2013-02-27 14 views
8

Çok işlemcili paket (Ubuntu 12.04 on Amazon EC2 üzerinde 1.7.0 ile python 2.73) ile basit bir sayısal temelli matris cebri hesaplamaları yaparken bir sistem hatası alıyorum (aşağıda gösterilmektedir) . Kodum daha küçük matris boyutları için iyi çalışıyor ancak daha büyük olanlar için çöküyor (bol miktarda kullanılabilir bellek)Çoklu işlem kullanarak alt işlemlerin çalıştırılması sırasında sistem hatası

Kullandığım matrislerin boyutu önemli (1000000x10 float yoğun matrisler için kodum düzgün çalışıyor ancak 1000000x500 olanlar için çöküyor - I Bu matrisleri alt süreçlere/yollardan geçiriyorum). 10 vs 500 bir çalışma zamanı parametresidir, her şey aynı kalır (giriş verileri, diğer çalışma zamanı parametreleri vb.)

Ayrıca, daha büyük matrisler için python3 kullanarak aynı (taşınmış) kodu çalıştırmayı denedim alt süreçler uyku/boşta moduna geçerler (python 2.7'deki gibi çökmek yerine) ve program/alt işlemlerin hiçbir şey yapmadan orada dururlar. Daha küçük matrisler için kod python3 ile iyi çalışır.

Exception in thread Thread-5: Traceback (most recent call last): 
File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner 
    self.run() File "/usr/lib/python2.7/threading.py", line 504, in run 
    self.__target(*self.__args, **self.__kwargs) File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks 
    put(task) SystemError: NULL result without error in PyObject_Call 

Multiprocessing kodu kullandığım:

herhangi bir öneriniz çok

Hata mesajı (Burada fikir gelmiyordu am) mutluluk duyacağız

Aşağıda
def runProcessesInParallelAndReturn(proc, listOfInputs, nParallelProcesses): 
    if len(listOfInputs) == 0: 
     return 
    # Add result queue to the list of argument tuples. 
    resultQueue = mp.Manager().Queue() 
    listOfInputsNew = [(argumentTuple, resultQueue) for argumentTuple in listOfInputs] 
    # Create and initialize the pool of workers. 
    pool = mp.Pool(processes = nParallelProcesses) 
    pool.map(proc, listOfInputsNew) 
    # Run the processes. 
    pool.close() 
    pool.join() 
    # Return the results. 
    return [resultQueue.get() for i in range(len(listOfInputs))] 

olduğunu " proc "her alt işlem için çalıştırılır. Temel olarak, numpy kullanarak birçok doğrusal denklem sistemini çözer (alt süreç içinde gerekli matrisleri oluşturur) ve sonuçları başka bir matris olarak döndürür. Bir kez daha, bir çalışma zamanı parametresinin daha küçük değerleri için iyi çalışır, ancak daha büyük olanlar için çöker (veya python3'te asılı kalır).

def solveForLFV(param): 
    startTime = time.time() 
    (chunkI, LFVin, XY, sumLFVinOuterProductLFVallPlusPenaltyTerm, indexByIndexPurch, outerProductChunkSize, confWeight), queue = param 
    LFoutChunkSize = XY.shape[0] 
    nLFdim = LFVin.shape[1] 
    sumLFVinOuterProductLFVpurch = np.zeros((nLFdim, nLFdim)) 
    LFVoutChunk = np.zeros((LFoutChunkSize, nLFdim)) 
    for LFVoutIndex in xrange(LFoutChunkSize): 
     LFVInIndexListPurch = indexByIndexPurch[LFVoutIndex] 
     sumLFVinOuterProductLFVpurch[:, :] = 0. 
     LFVInIndexChunkLow, LFVInIndexChunkHigh = getChunkBoundaries(len(LFVInIndexListPurch), outerProductChunkSize) 
     for LFVInIndexChunkI in xrange(len(LFVInIndexChunkLow)): 
      LFVinSlice = LFVin[LFVInIndexListPurch[LFVInIndexChunkLow[LFVInIndexChunkI] : LFVInIndexChunkHigh[LFVInIndexChunkI]], :] 
      sumLFVinOuterProductLFVpurch += sum(LFVinSlice[:, :, np.newaxis] * LFVinSlice[:, np.newaxis, :]) 
     LFVoutChunk[LFVoutIndex, :] = np.linalg.solve(confWeight * sumLFVinOuterProductLFVpurch + sumLFVinOuterProductLFVallPlusPenaltyTerm, XY[LFVoutIndex, :]) 
    queue.put((chunkI, LFVoutChunk)) 
    print 'solveForLFV: ', time.time() - startTime, 'sec' 
    sys.stdout.flush() 
+0

Proc işlevinin kodunu paylaşabilir misiniz? – barracel

+0

Sadece yaptım. Proc'un argümanlarını açıklamamıştım - bunlardan bazıları matrisler, bazıları liste listesi ve bazıları sadece yüzer/tamsayılar. Her bir alt işlemden sonuçları döndürmek için sıra kullanılır. – Yevgeny

cevap

5

500.000.000 çok büyük: float64 kullanıyorsanız, 4 milyar bayt veya 4 GB. (10.000.000 float dizisi 80 milyon bayt, ya da 80 MB daha küçük olurdu.) Sorunun, bir işlemin bir boru üzerinde alt işlemlere gönderilmesi için dizileri toplamaya çalışan çok işlemciyle bir ilgisi olmasını bekliyorum.

Bir unix platformunda olduğunuzdan, bu davranışı fork() (çok işlemcinin işçileri oluşturmak için kullanılır) bellek devralma davranışını kullanarak önleyebilirsiniz. Yorumlar tarafından tanımlanan bu hack (this project sökük) ile büyük bir başarı elde ettim.

### A helper for letting the forked processes use data without pickling. 
_data_name_cands = (
    '_data_' + ''.join(random.sample(string.ascii_lowercase, 10)) 
    for _ in itertools.count()) 
class ForkedData(object): 
    ''' 
    Class used to pass data to child processes in multiprocessing without 
    really pickling/unpickling it. Only works on POSIX. 

    Intended use: 
     - The master process makes the data somehow, and does e.g. 
      data = ForkedData(the_value) 
     - The master makes sure to keep a reference to the ForkedData object 
      until the children are all done with it, since the global reference 
      is deleted to avoid memory leaks when the ForkedData object dies. 
     - Master process constructs a multiprocessing.Pool *after* 
      the ForkedData construction, so that the forked processes 
      inherit the new global. 
     - Master calls e.g. pool.map with data as an argument. 
     - Child gets the real value through data.value, and uses it read-only. 
    ''' 
    # TODO: does data really need to be used read-only? don't think so... 
    # TODO: more flexible garbage collection options 
    def __init__(self, val): 
        g = globals() 
        self.name = next(n for n in _data_name_cands if n not in g) 
        g[self.name] = val 
        self.master_pid = os.getpid() 

    @property 
    def value(self): 
        return globals()[self.name] 

    def __del__(self): 
        if os.getpid() == self.master_pid: 
            del globals()[self.name] 
İlgili konular