2015-04-07 14 views
8

Ben bu şekilde pitonun çoklu işlem paketini kullanmaya çalışıyorum: Havuzun süreçlerinin itibarenBir işlemi çok işlemli olarak nasıl iptal edebilirim. Zaman aşımından sonra ne yapmalıyım?

featureClass = [[1000,k,1] for k in drange(start,end,step)] #list of arguments 
for f in featureClass: 
    pool .apply_async(worker, args=f,callback=collectMyResult) 
pool.close() 
pool.join 

Ben, sonuç döndürmek için 60s fazla sürebilir olanları bekleyen önlemek istiyorum. Mümkün mü?

+0

İşçi nasıl görünüyor? Bunu bir "çok işlemciyle" yapmanın en kolay yolu "işçi" yi kesintiye uğratmaktır, ancak bu, ne yaptığına bağlı olarak mümkün olmayabilir. – dano

+0

worker, bir liste girişi ve bir liste çıktısı ile basit bir işlevdir – farhawa

+0

Aslında ne yapıyor? Listede yinelendiğini sanıyorum, ama her bir öğe üzerinde ne tür işlemler yapıyor? Her operasyon ne kadar sürer? – dano

cevap

10

worker işlevinizi değiştirmenize gerek kalmadan bunu yapmanın bir yolu. Buradaki fikir, çalışanı bir arka plan iş parçacığında worker arayacak ve ardından timeout saniye için bir sonuç bekleyecek olan başka bir işleve sarmaktır. Zaman aşımı süresi dolarsa, o aniden iplik worker sonlandırır bir istisna, yürütülüyor yükseltir:

import multiprocessing 
from multiprocessing.dummy import Pool as ThreadPool 
from functools import partial 

def worker(x, y, z): 
    pass # Do whatever here 

def collectMyResult(result): 
    print("Got result {}".format(result)) 

def abortable_worker(func, *args, **kwargs): 
    timeout = kwargs.get('timeout', None) 
    p = ThreadPool(1) 
    res = p.apply_async(func, args=args) 
    try: 
     out = res.get(timeout) # Wait timeout seconds for func to complete. 
     return out 
    except multiprocessing.TimeoutError: 
     print("Aborting due to timeout") 
     p.terminate() 
     raise 

if __name__ == "__main__": 
    pool = multiprocessing.Pool() 
    featureClass = [[1000,k,1] for k in drange(start,end,step)] #list of arguments 
    for f in featureClass: 
     abortable_func = partial(abortable_worker, worker, timeout=3) 
     pool.apply_async(abortable_func, args=f,callback=collectMyResult) 
    pool.close() 
    pool.join() 

multiprocessing.TimeoutError yükseltecektir zaman aşımı Herhangi fonksiyonu. Bunun, bir zaman aşımı gerçekleştiğinde geri aramanın yürütülmeyeceği anlamına geldiğini unutmayın. Bu kabul edilemezse, raise numaralı telefonu aramak yerinenumaralı telefon numarasını aramak için abortable_worker numaralı except bloğunu değiştirin.

+0

Tam olarak ihtiyacım olan şey bu! teşekkürler – farhawa

+0

Sadece bir soru daha, kodunuzdaki paralel işlemlerin sayısı nedir? – farhawa

+0

@wajdi Her zaman betiği çalıştıran makinedeki CPU sayısına eşit olan varsayılan işlem sayısını kullanır. Numarayı belirtmek isterseniz, onu 'çok işlemli' Yapıcı 'yapıcısına iletin: havuz = çok işlemcili.Olum (4) '. – dano

0

Çalışan çalışan zamanını ayarlamak için gevent.Timeout'u kullanabiliriz. gevent tutorial

from multiprocessing.dummy import Pool 
#you should install gevent. 
from gevent import Timeout 
from gevent import monkey 
monkey.patch_all() 
import time 

def worker(sleep_time): 
    try: 

     seconds = 5 # max time the worker may run 
     timeout = Timeout(seconds) 
     timeout.start() 
     time.sleep(sleep_time) 
     print "%s is a early bird"%sleep_time 
    except: 
     print "%s is late(time out)"%sleep_time 

pool = Pool(4) 

pool.map(worker, range(10)) 


output: 
0 is a early bird 
1 is a early bird 
2 is a early bird 
3 is a early bird 
4 is a early bird 
8 is late(time out) 
5 is late(time out) 
6 is late(time out) 
7 is late(time out) 
9 is late(time out) 
+1

Lütfen, bazı yazılı yorumlar verin. –

+1

Maymun eki neden gerekli? Bu, engellemeyi engelleyen şeyler yapmak için yapması gereken bir şey mi? – grisaitis

+0

Bu benim için çalışmadı. Maymun yama attı hataları – Kat

İlgili konular