Çok büyük bir görevi bölmek için çoklu işlem modülünü kullanıyorum. Çoğunlukla çalışıyor, ama tasarımımla bariz bir şey eksik olmalıyım, çünkü bu şekilde, tüm verilerin işlenip işlenmediğini etkin bir şekilde söylemem çok zor.Çok işlemcili - üretici/tüketici dizaynı
Çalıştırdığım iki ayrı görevim var; diğerini besleyen Sanırım bu bir üretici/tüketici sorunudur. Üreticilerin kuyruğu doldurduğu ve tüketicilerin kuyruğu okuduğu ve işlem yaptığı tüm süreçler arasında paylaşılan bir Kuyruk kullanıyorum. Sorun, sonlu bir miktar veri bulunmasıdır, bu nedenle bir noktada herkesin tüm verilerin işlendiğini bilmesi gerekir, böylece sistem düzgün bir şekilde kapanabilir.
map_async() işlevini kullanmak mantıklı görünebilir, ancak üreticiler sıraya dolduğundan, ön taraftaki tüm öğeleri bilmiyorum, bu yüzden bir süre döngüsüne girmem gerekiyor ve apply_async() öğesini kullanın ve her şeyin bir zaman aşımı ile ne zaman yapıldığını algılamaya çalışın ... çirkin.
Bariz bir şeyi kaçırıyormuş gibi hissediyorum. Bu nasıl daha iyi tasarlanabilir? bu sevimsiz alır nerede
PRODCUER
class ProducerProcess(multiprocessing.Process):
def __init__(self, item, consumer_queue):
self.item = item
self.consumer_queue = consumer_queue
multiprocessing.Process.__init__(self)
def run(self):
for record in get_records_for_item(self.item): # this takes time
self.consumer_queue.put(record)
def start_producer_processes(producer_queue, consumer_queue, max_running):
running = []
while not producer_queue.empty():
running = [r for r in running if r.is_alive()]
if len(running) < max_running:
producer_item = producer_queue.get()
p = ProducerProcess(producer_item, consumer_queue)
p.start()
running.append(p)
time.sleep(1)
İşte
if __name__ == "__main__":
manager = multiprocessing.Manager()
consumer_queue = manager.Queue(1024*1024)
producer_queue = manager.Queue()
producer_items = xrange(0,10)
for item in producer_items:
producer_queue.put(item)
p = multiprocessing.Process(target=start_producer_processes, args=(producer_queue, consumer_queue, 8))
p.start()
consumer_pool = multiprocessing.Pool(processes=16, maxtasksperchild=1)
def process_consumer_chunk(queue, chunksize=10000):
for i in xrange(0, chunksize):
try:
# don't wait too long for an item
# if new records don't arrive in 10 seconds, process what you have
# and let the next process pick up more items.
record = queue.get(True, 10)
except Queue.Empty:
break
do_stuff_with_record(record)
ANA TÜKETİCİ olduğunu. Harita kullanamıyorum çünkü tüketilecek liste aynı anda dolduruluyor. Bu yüzden bir süre döngüsüne girip zaman aşımını tespit etmeye çalışmalıyım. Üreticiler hala bunu doldurmaya çalışırken tüketici_devi boş olabilir, bu yüzden boş bir kuyruğu bu yüzden bir çıkışın üzerinde algılayamam.
timed_out = False
timeout= 1800
while 1:
try:
result = consumer_pool.apply_async(process_consumer_chunk, (consumer_queue,), dict(chunksize=chunksize,))
if timed_out:
timed_out = False
except Queue.Empty:
if timed_out:
break
timed_out = True
time.sleep(timeout)
time.sleep(1)
consumer_queue.join()
consumer_pool.close()
consumer_pool.join()
belki I) (get ana iş parçacığı kayıtları ve kuyruğu geçme yerine tüketici içine bu geçebileceği düşünülmektedir, ama ben aynı sorunla bu şekilde sona düşünüyorum. Hala bir süre çalışmalıyım ve apply_async() kullanın. Herhangi bir öneri için şimdiden teşekkür ederiz!
Bunun işe yarayacağını düşünüyorum. Teşekkür ederim! Join() açıklamanızdan nasıl çalıştığından emin değilim, ama sanırım bir yol buldum. Etkinliği start_producer_process() sürecine aktarıyorum ve tüm üreticilerin tüketici_koruma ekledikten sonra onu() ayarlayın. Bu noktada (ana iş parçacığına dönüş), eğer tüketim_sayısı boş kalırsa, bu her şeyin işlendiği anlamına gelir, böylece while döngüsünden güvenli bir şekilde ayrılabilirim. – user1914881
Kafa karıştırıcı kısım için özür dilerim, katılım ana iş parçacığında olurdu, böylece üreticiniz bittikten ve tüketiciler işlerini yapmaya yeni başladıktan sonra programdan çıkmayacaksınız. – sean