2014-07-12 31 views
14

multiprocessing.Pool nesnesini kullandığınızı ve kurucunun initializer ayarını kullanarak, daha sonra genel ad alanında bir kaynak oluşturan bir başlatıcı işlevini ilettiğinizi varsayın. Kaynağın bir içerik yöneticisi olduğunu varsayalım. Sürecin ömrü boyunca yaşamak zorunda olduğu sürece bağlamda yönetilen kaynağın yaşam döngüsünü nasıl ele alırsınız, ama sonunda düzgün bir şekilde temizlenmelidir?İçerik yöneticileri ve çok işlemcili havuzlar

Şimdiye kadar, ben biraz böyle bir şey var: Burada andan itibaren

resource_cm = None 
resource = None 


def _worker_init(args): 
    global resource 
    resource_cm = open_resource(args) 
    resource = resource_cm.__enter__() 

, havuz süreçleri kaynak kullanabilirsiniz. Çok uzak çok iyi. Ancak temizleme işlemi biraz daha zorlayıcıdır, çünkü multiprocessing.Pool sınıfı destructor veya deinitializer argümanını sağlamamaktadır.

Fikirlerimden biri, atexit modülünü kullanmak ve temizleyiciyi başlatıcıya kaydettirmektir. Bunun gibisi:

Bu iyi bir yaklaşım mıdır? Bunu yapmanın daha kolay bir yolu var mı?

DÜZENLEME: atexit çalışmıyor gibi görünüyor. En azından yukarıda kullandığım şekilde değil, şu an itibariyle bu problem için hala bir çözümüm yok.

cevap

21

İlk olarak, bu gerçekten harika bir soru! multiprocessing kodunda biraz etrafına kazma sonra ben bunu yapmanın bir yolunu bulduğumu düşünüyorum:

Bir multiprocessing.Pool başlattığınızda, içten Pool nesne havuzu her üyesi için bir multiprocessing.Process nesnesi oluşturur.

def _bootstrap(self): 
    from . import util 
    global _current_process 
    try: 
     # ... (stuff we don't care about) 
     util._finalizer_registry.clear() 
     util._run_after_forkers() 
     util.info('child process calling self.run()') 
     try: 
      self.run() 
      exitcode = 0 
     finally: 
      util._exit_function() 
     # ... (more stuff we don't care about) 

run yöntem aslında target Eğer Process nesneyi verdi çalışır şudur: o alt süreçler başlatırken, bunlar şöyle bir _bootstrap fonksiyonunu çağırın. İş öğelerinin bir iç sıraya gelmesini bekleyen, uzun süre çalışan bir döngüye sahip bir yöntem olan bir Pool işlemi için. Bizim için gerçekten ilginç olan şey,self.run'dan sonra oldu: util._exit_function() çağrılıyor.

Sonradan anlaşıldı ki, bu işlevi aradığınızı gibi bir çok geliyor bazı temiz yukarı yapar: Aslında

def _run_finalizers(minpriority=None): 
    ''' 
    Run all finalizers whose exit priority is not None and at least minpriority 

    Finalizers with highest priority are called first; finalizers with 
    the same priority will be called in reverse order of creation. 
    ''' 

yöntemi: İşte

def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers, 
        active_children=active_children, 
        current_process=current_process): 
    # NB: we hold on to references to functions in the arglist due to the 
    # situation described below, where this function is called after this 
    # module's globals are destroyed. 

    global _exiting 

    info('process shutting down') 
    debug('running all "atexit" finalizers with priority >= 0') # Very interesting! 
    _run_finalizers(0) 

_run_finalizers ait docstring'ini var finalizer geri aramaların bir listesi aracılığıyla çalışır ve bunları yürütür:

items = [x for x in _finalizer_registry.items() if f(x)] 
items.sort(reverse=True) 

for key, finalizer in items: 
    sub_debug('calling %s', finalizer) 
    try: 
     finalizer() 
    except Exception: 
     import traceback 
     traceback.print_exc() 

Perf vb. Peki _finalizer_registry'a nasıl gireceğiz?kayıt defterine bir geri arama ekleyerek sorumludur multiprocessing.util yılında Finalize denilen belgesiz nesne vardır:

import multiprocessing 
from multiprocessing.util import Finalize 

resource_cm = None 
resource = None 

class Resource(object): 
    def __init__(self, args): 
     self.args = args 

    def __enter__(self): 
     print("in __enter__ of %s" % multiprocessing.current_process()) 
     return self 

    def __exit__(self, *args, **kwargs): 
     print("in __exit__ of %s" % multiprocessing.current_process()) 

def open_resource(args): 
    return Resource(args) 

def _worker_init(args): 
    global resource 
    print("calling init") 
    resource_cm = open_resource(args) 
    resource = resource_cm.__enter__() 
    # Register a finalizer 
    Finalize(resource, resource.__exit__, exitpriority=16) 

def hi(*args): 
    print("we're in the worker") 

if __name__ == "__main__": 
    pool = multiprocessing.Pool(initializer=_worker_init, initargs=("abc",)) 
    pool.map(hi, range(pool._processes)) 
    pool.close() 
    pool.join() 

Çıktı:

calling init 
in __enter__ of <Process(PoolWorker-1, started daemon)> 
calling init 
calling init 
in __enter__ of <Process(PoolWorker-2, started daemon)> 
in __enter__ of <Process(PoolWorker-3, started daemon)> 
calling init 
in __enter__ of <Process(PoolWorker-4, started daemon)> 
we're in the worker 
we're in the worker 
we're in the worker 
we're in the worker 
in __exit__ of <Process(PoolWorker-1, started daemon)> 
in __exit__ of <Process(PoolWorker-2, started daemon)> 
in __exit__ of <Process(PoolWorker-3, started daemon)> 
in __exit__ of <Process(PoolWorker-4, started daemon)> 

class Finalize(object): 
    ''' 
    Class which supports object finalization using weakrefs 
    ''' 
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None): 
     assert exitpriority is None or type(exitpriority) is int 

     if obj is not None: 
      self._weakref = weakref.ref(obj, self) 
     else: 
      assert exitpriority is not None 

     self._callback = callback 
     self._args = args 
     self._kwargs = kwargs or {} 
     self._key = (exitpriority, _finalizer_counter.next()) 
     self._pid = os.getpid() 

     _finalizer_registry[self._key] = self # That's what we're looking for! 

Ok, bu nedenle bir örnek haline hep birlikte koyarak

__exit__ görebildiğiniz kadarıyla tüm çalışanlarımıza join() havuzunu çağırıyoruz.

İlgili konular