2012-11-19 32 views
32

Çok sayıda alt problem içeren büyük bir sayısal problemi çözmeye çalışıyorum ve farklı çekirdekler üzerine farklı bağımsız alt problemleri ayırmak için Python'un çoklu işlem modülünü (özellikle Pool.map) kullanıyorum. Her alt problem bir çok alt-alt-problemin hesaplanmasını içerir ve bu sonuçları henüz herhangi bir işlemle hesaplanmamışlarsa bir dosyaya saklayarak etkin bir şekilde belleğe almaya çalışıyorum, aksi halde hesaplamayı atlayın ve sadece sonuçları dosyadan okuyun.Python çoklu işleme güvenli bir şekilde bir dosyaya yazıyor

Dosyaları ile eşzamanlılık sorunları yaşıyorum: farklı işlemler bazen bir alt-altproblemin hesaplanmış olup olmadığını (sonuçların saklanacağı dosyaya bakarak) kontrol edip, olmadığını görüyorlar. hesaplamayı çalıştırın, ardından sonuçları aynı dosyaya aynı anda yazmaya çalışın. Böyle çarpışmalar yazmayı nasıl önleyebilirim?

+3

ara dışında kullanılan belgeleri bir örnek [ 'multiprocessing.Lock'] (http://docs.python.org/2/library/multiprocessing.html#synchronization-between-processes), birden çok eşitlemek süreçler. –

+11

Diğer çalışan işlemleriyle beslenebilecek bir girdi olarak Kuyruk ile tek bir işlem yazma sonucuna sahip olabilirsiniz. Tüm işçi süreçlerinin salt okunur olması güvenli olacağına inanıyorum. – GP89

+0

İşlerimi daha karmaşık hale getirmek için, aynı ağa bağlı dosya sisteminde alt-alt-sorunlara her bir sonuç yazarken, aynı anda bir kümede birden çok farklı büyük ana problemi çalıştırdığımı belirtmeliydim. Böylece birbirinden ayrı makinelerde çalışan süreçlerden kaynaklanan çarpışmaları tamamen alabiliyorum (bu yüzden çoklu işlem gibi çözümler kullanmıyorum. Kilit işe yarayacaktır). –

cevap

63

@ GP89 iyi bir çözümden bahsetmiştir. Yazma görevlerini, dosyaya tek yazma erişimi olan özel bir işleme göndermek için bir sıra kullanın. Diğer tüm çalışanlar sadece erişimi okudu. Bu çarpışmaları ortadan kaldıracaktır. İşte apply_async kullanan bir örnektir, ama çok harita ile çalışacaktır:

import multiprocessing as mp 
import time 

fn = 'c:/temp/temp.txt' 

def worker(arg, q): 
    '''stupidly simulates long running process''' 
    start = time.clock() 
    s = 'this is a test' 
    txt = s 
    for i in xrange(200000): 
     txt += s 
    done = time.clock() - start 
    with open(fn, 'rb') as f: 
     size = len(f.read()) 
    res = 'Process' + str(arg), str(size), done 
    q.put(res) 
    return res 

def listener(q): 
    '''listens for messages on the q, writes to file. ''' 

    f = open(fn, 'wb') 
    while 1: 
     m = q.get() 
     if m == 'kill': 
      f.write('killed') 
      break 
     f.write(str(m) + '\n') 
     f.flush() 
    f.close() 

def main(): 
    #must use Manager queue here, or will not work 
    manager = mp.Manager() 
    q = manager.Queue()  
    pool = mp.Pool(mp.cpu_count() + 2) 

    #put listener to work first 
    watcher = pool.apply_async(listener, (q,)) 

    #fire off workers 
    jobs = [] 
    for i in range(80): 
     job = pool.apply_async(worker, (i, q)) 
     jobs.append(job) 

    # collect results from the workers through the pool result queue 
    for job in jobs: 
     job.get() 

    #now we are done, kill the listener 
    q.put('kill') 
    pool.close() 

if __name__ == "__main__": 
    main() 

iyi şanslar,

Mike

Bu geçici sonuçlarını kaydetmek için Manager'ı kullanmak gerektiğini bana bakıyor
+1

Hey Mike, cevap için teşekkürler. Sanırım bu sorularımı ifade ettiğim gibi işe yarayacaktı, ama soruna verilen yorumlarda ana hatlarıyla belirtildiği gibi tam bir sorunu çözeceğinden emin değilim, özellikle ağ üzerinde birkaç makinede çalışan birkaç ana programım var. dosya sistemi, hepsi aynı dosyaya yazmaya çalışacak süreçlere sahip olabilir. (FWIW, bir süre önce şahsi sorunumun etrafından habersiz bir şekilde geçtim ama başkalarının da benzer sorunları olması durumunda yorum yapıyorum.) –

+1

Gerçekten bu kadar çok kez tekrar etmek istiyorum. Bu benim için pek çok kez yardımcı oldu. Bugün bir kez daha. – Eduardo

+0

Teşekkürler Mike - MP Queues'i nasıl kullanacağım konusunda mücadele ediyordum. Örneğiniz onu çok net ve anlaşılır kılıyor. – Anurag

0

bir listeye ve sonuçları listeden bir dosyaya yazın. Ayrıca, işlemek istediğiniz nesneyi ve yönetilen listeyi geçmek için starmap kullanın. İlk adım, yönetilen listeyi içeren starmap'e geçirilecek parametrenin oluşturulmasıdır.

from multiprocessing import Manager 
from multiprocessing import Pool 
import pandas as pd``` 

def worker(row, param): 
    # do something here and then append it to row 
    x = param**2 
    row.append(x) 

if __name__ == '__main__': 
    pool_parameter = [] # list of objects to process 
    with Manager() as mgr: 
     row = mgr.list([]) 

     # build list of parameters to send to starmap 
     for param in pool_parameter: 
      params.append([row,param]) 

     with Pool() as p: 
      p.starmap(worker, params) 

Bu noktadan, listeyi nasıl ele alacağınıza karar vermeniz gerekir. Eğer tonlarca RAM'iniz varsa ve büyük bir veri seti pandaları kullanarak birleştirme konusunda çekinmeyin. Ardından, dosyayı csv veya turşu olarak kolayca kaydedebilirsiniz.

 df = pd.concat(row, ignore_index=True) 

     df.to_pickle('data.pickle') 
     df.to_csv('data.csv') 
İlgili konular