2014-06-23 31 views
13

Birden çok işlem kullanarak paralel olarak bir işlev uygulamak istiyorum. Sorun, bir işlev çağrısı bir segmentasyon hatası tetiklerse, Havuzun sonsuza kadar askıda kalmasıdır. Böyle bir şey olduğunda ne olduğunu algılayan ve bir hata meydana getiren bir Havuz'u nasıl oluşturabileceğimi bilen var mı?çok işlemli. Çocuk segmentasyon hatasına neden oluyorsa kilitleniyor

Aşağıdaki örnek bunu yeniden nasıl gösterir (gerektirir scikit-öğrenme> 0.14) yerine belki daha ziyade el çocuğu yaratacak Pool().imap() kullanmanın

import numpy as np 
from sklearn.ensemble import gradient_boosting 
import time 

from multiprocessing import Pool 

class Bad(object): 
    tree_ = None 


def fit_one(i): 
    if i == 3: 
     # this will segfault              
     bad = np.array([[Bad()] * 2], dtype=np.object) 
     gradient_boosting.predict_stages(bad, 
             np.random.rand(20, 2).astype(np.float32), 
             1.0, np.random.rand(20, 2)) 
    else: 
     time.sleep(1) 
    return i 


pool = Pool(2) 
out = pool.imap_unordered(fit_one, range(10)) 
# we will never see 3 
for o in out: 
    print o 
+2

Segmentasyon hatası düzeltildi mi? Genellikle segfault'lar _undefined_ davranışı olan ve bir segfault'a neden olmak için guarenteed olmayan geçersiz bellek erişiminden kaynaklanır. –

+0

Cevap yok, ama ben bu işlib.Parallel sonsuza kadar asmak gibi görünüyor. Anlayabildiğim kadarıyla, segfault'u döndürmenin veya çok işlemcili bir "watchdog" zaman aşımı eklemenin bir yolu yoktur. –

+1

Aslında, bir zaman aşımı dekoratör ekleyebilirsiniz? Burada gösterildiği gibi: http://code.activestate.com/recipes/577028/ –

cevap

1

Process() kendinizi işler. Bahse girerim, geri dönen nesnenin herhangi bir çocuğun statüsünü almasına izin verir. Takıldıklarını bilirsiniz.

0

Hatanızı ele alıp alamayacağını görmek için örneğinizi çalıştırmadım, ancak eşzamanlı işlemleri deneyin. My_function (i) 'i fit_one (i) ile değiştirin. __name__=='__main__': yapısını koruyun. eşzamanlı geleceklerin buna ihtiyacı var gibi görünüyor. Aşağıdaki kod makinemde test ediliyor, bu yüzden umarım doğruca sizin üzerinizde çalışacaktır. yorumların açıklandığı gibi sen concurrent.Futures.ProcessPoolExecutor yerine multiprocessing.Pool kullanırsanız

import concurrent.futures 

def my_function(i): 
    print('function running') 
    return i 

def run(): 
    number_processes=4 
    executor = concurrent.futures.ProcessPoolExecutor(number_processes) 
    futures = [executor.submit(my_function,i) for i in range(10)] 
    concurrent.futures.wait(futures) 

    for f in futures: 
     print(f.result()) 

if __name__ == '__main__': 
    run() 
+0

python 3.2 gerektirir – Ninga

+0

Ben sadece işe yarayabilir düşünüyorum çünkü tüm süreçleri tamamladıktan sonra iade itilebilir 'futures' üzerinde her türlü yöntemleri arayabilirsiniz. Bu yüzden hatayı bir adım ileriye götürebilir. – Ninga

0

, bu sadece Python 3 çalışmaktadır. Eğer Python 2, kalıyorsanız

buldum iyi seçenek Pool.apply_async ve Pool.map_async tarafından döndürülen sonuç nesneler üzerinde timeout argüman kullanmaktır. Bu sürece bir alt süreç bir görevi tamamlamak için ne kadar süreceğini için bir üst sınır var gibi çalışır

pool = Pool(2) 
out = pool.map_async(fit_one, range(10)) 
for o in out: 
    print o.get(timeout=1000) # allow 1000 seconds max 

: Örneğin.

1

Bu bir known bug, issue #22393, in Python dur. Düzeltilene kadar multiprocessing.pool'u kullandığınız sürece anlamlı bir çözüm yoktur. Bu bağlantıda bir yama mevcuttur, ancak ana sürümüne henüz henüz entegre edilmemiştir, dolayısıyla Python'un kararlı sürümü sorunu çözmez.

İlgili konular