2012-12-20 27 views
6

Çok büyük bir görevi bölmek için çoklu işlem modülünü kullanıyorum. Çoğunlukla çalışıyor, ama tasarımımla bariz bir şey eksik olmalıyım, çünkü bu şekilde, tüm verilerin işlenip işlenmediğini etkin bir şekilde söylemem çok zor.Çok işlemcili - üretici/tüketici dizaynı

Çalıştırdığım iki ayrı görevim var; diğerini besleyen Sanırım bu bir üretici/tüketici sorunudur. Üreticilerin kuyruğu doldurduğu ve tüketicilerin kuyruğu okuduğu ve işlem yaptığı tüm süreçler arasında paylaşılan bir Kuyruk kullanıyorum. Sorun, sonlu bir miktar veri bulunmasıdır, bu nedenle bir noktada herkesin tüm verilerin işlendiğini bilmesi gerekir, böylece sistem düzgün bir şekilde kapanabilir.

map_async() işlevini kullanmak mantıklı görünebilir, ancak üreticiler sıraya dolduğundan, ön taraftaki tüm öğeleri bilmiyorum, bu yüzden bir süre döngüsüne girmem gerekiyor ve apply_async() öğesini kullanın ve her şeyin bir zaman aşımı ile ne zaman yapıldığını algılamaya çalışın ... çirkin.

Bariz bir şeyi kaçırıyormuş gibi hissediyorum. Bu nasıl daha iyi tasarlanabilir? bu sevimsiz alır nerede

PRODCUER

class ProducerProcess(multiprocessing.Process): 
    def __init__(self, item, consumer_queue): 
     self.item = item 
     self.consumer_queue = consumer_queue 
     multiprocessing.Process.__init__(self) 

    def run(self): 
     for record in get_records_for_item(self.item): # this takes time 
      self.consumer_queue.put(record) 

def start_producer_processes(producer_queue, consumer_queue, max_running): 
    running = [] 

    while not producer_queue.empty(): 
     running = [r for r in running if r.is_alive()] 
     if len(running) < max_running: 
      producer_item = producer_queue.get() 
      p = ProducerProcess(producer_item, consumer_queue) 
      p.start() 
      running.append(p) 
     time.sleep(1) 

İşte

if __name__ == "__main__": 
    manager = multiprocessing.Manager() 
    consumer_queue = manager.Queue(1024*1024) 
    producer_queue = manager.Queue() 

    producer_items = xrange(0,10) 

    for item in producer_items: 
     producer_queue.put(item) 

    p = multiprocessing.Process(target=start_producer_processes, args=(producer_queue, consumer_queue, 8)) 
    p.start() 

    consumer_pool = multiprocessing.Pool(processes=16, maxtasksperchild=1) 

def process_consumer_chunk(queue, chunksize=10000): 
    for i in xrange(0, chunksize): 
     try: 
      # don't wait too long for an item 
      # if new records don't arrive in 10 seconds, process what you have 
      # and let the next process pick up more items. 

      record = queue.get(True, 10) 
     except Queue.Empty:     
      break 

     do_stuff_with_record(record) 

ANA TÜKETİCİ olduğunu. Harita kullanamıyorum çünkü tüketilecek liste aynı anda dolduruluyor. Bu yüzden bir süre döngüsüne girip zaman aşımını tespit etmeye çalışmalıyım. Üreticiler hala bunu doldurmaya çalışırken tüketici_devi boş olabilir, bu yüzden boş bir kuyruğu bu yüzden bir çıkışın üzerinde algılayamam.

timed_out = False 
    timeout= 1800 
    while 1: 
     try: 
      result = consumer_pool.apply_async(process_consumer_chunk, (consumer_queue,), dict(chunksize=chunksize,)) 
      if timed_out: 
       timed_out = False 

     except Queue.Empty: 
      if timed_out: 
       break 

      timed_out = True 
      time.sleep(timeout) 
     time.sleep(1) 

    consumer_queue.join() 
    consumer_pool.close() 
    consumer_pool.join() 

belki I) (get ana iş parçacığı kayıtları ve kuyruğu geçme yerine tüketici içine bu geçebileceği düşünülmektedir, ama ben aynı sorunla bu şekilde sona düşünüyorum. Hala bir süre çalışmalıyım ve apply_async() kullanın. Herhangi bir öneri için şimdiden teşekkür ederiz!

cevap

2

İşe sonuna sinyal bir manager.Event kullanabilirsiniz. Bu etkinlik tüm süreçleriniz arasında paylaşılabilir ve daha sonra bunu ana işleminizden bildirdiğinizde, diğer çalışanlar daha sonra hassas bir şekilde kapanabilir.

Böylece, müşterileriniz, ayarlandıktan sonra etkinliğin ayarlanmasını ve işlenmesini bekler. Bu bayrak size yapımcı ipliklerle ve bu bütün, daha sonra tüketici parçacığı üzerinde katılabilir tamamlamak edilir bir join yapabilirsiniz ayarlamak için zaman

belirlemek.

+0

Bunun işe yarayacağını düşünüyorum. Teşekkür ederim! Join() açıklamanızdan nasıl çalıştığından emin değilim, ama sanırım bir yol buldum. Etkinliği start_producer_process() sürecine aktarıyorum ve tüm üreticilerin tüketici_koruma ekledikten sonra onu() ayarlayın. Bu noktada (ana iş parçacığına dönüş), eğer tüketim_sayısı boş kalırsa, bu her şeyin işlendiği anlamına gelir, böylece while döngüsünden güvenli bir şekilde ayrılabilirim. – user1914881

+0

Kafa karıştırıcı kısım için özür dilerim, katılım ana iş parçacığında olurdu, böylece üreticiniz bittikten ve tüketiciler işlerini yapmaya yeni başladıktan sonra programdan çıkmayacaksınız. – sean

0

ayrık olay benzetimini yapmak için çok işlem/iş parçacığı yerine SimPy'u şiddetle tavsiye ediyorum.

İlgili konular