2009-11-04 15 views
38

Birden çok işlem tarafından paralel olarak işlenmesini istediğim çok büyük (salt okunur) bir veri dizim var.Python çoklu işlemede Pool.map ile Array (shared memory) nasıl birleştirilir?

Pool.map işlevini seviyorum ve bu verilerdeki işlevlerin paralel olarak hesaplanmasını kullanmak istiyorum.

İşlemler arasında paylaşılan bellek verilerini kullanmak için Değer veya Dizi sınıfını kullanabileceğini gördüm. Bunu kullanmaya çalıştığınızda Ama RuntimeError olsun: 'Pool.map işlevini kullanırken SynchronizedString nesneleri sadece miras yoluyla işlemleri arasında paylaşılması gereken: Burada

Yapmak ne çalışıyorum basitleştirilmiş bir örnektir:

from sys import stdin 
from multiprocessing import Pool, Array 

def count_it(arr, key): 
    count = 0 
    for c in arr: 
    if c == key: 
     count += 1 
    return count 

if __name__ == '__main__': 
    testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf" 
    # want to share it using shared memory 
    toShare = Array('c', testData) 

    # this works 
    print count_it(toShare, "a") 

    pool = Pool() 

    # RuntimeError here 
    print pool.map(count_it, [(toShare,key) for key in ["a", "b", "s", "d"]]) 

Burada yanlış yaptığım kişi bana söyleyebilir mi?

Ne yapmak isterim, işlem havuzunda oluşturulduktan sonra yeni oluşturulmuş bir paylaşılan bellek ayırma dizisi hakkında işlemlere bilgi aktarmaktır.

+1

Maalesef bu mümkün değil. MP belgelerine göre önerilen yol, miras (çatallı platformlarda) kullanmaktır. Burada sadece okunan verileri okumak için normalde bir global kullanılacaktır, ancak okuma/yazma iletişimi için paylaşılan bir Dizi kullanabilir. Forking ucuzdur, böylece verileri aldığınız zaman Pool'u yeniden yaratabilirsiniz, daha sonra kapatın. Ne yazık ki, Windows'da bu mümkün değil - geçici bir paylaşılan bellek dizisi kullanmaktır (salt okunur durumda bile) ancak bu yalnızca süreç oluştururken alt işlemlere geçirilebilir (erişim listesine eklenmeleri gerektiğini hayal ediyorum) ... paylaşılan bellek segmenti için – robince

+0

ve bu mantığın alt işlem başlangıcı dışında uygulanmadığı). Paylaşılan veri dizisini, gösterdiğim şekilde Havuz başlangıcında veya bir İşlemi benzer şekilde iletebilirsiniz. Paylaşılan bir bellek dizisini açık bir havuza aktaramazsınız - bellekten sonra havuz oluşturmak zorundasınız. Bunun etrafındaki kolay yollar, maksimum boyutta bir arabellek ayırmak veya yalnızca Havuz başlamadan önce gereken boyutu bildiğinizde diziyi ayırmaktır. Eğer global değişkenlerinizin altında kalıyorsanız, havuzlar Windows'ta çok pahalı olmamalı - global değişkenler otomatik olarak ... – robince

+0

alınıp işlenen alt süreçlere gönderildi - bu yüzden başlangıçta yeterli büyüklükte bir arabellek hazırladım. Umarım küresel değişkenlerinizin miktarı küçüktür), daha sonra Pool, daha iyidir. Sorununuzu iyi niyetle anlamanız ve çözmeniz için zaman ayırdım - sorunuzu düzenlemeden önce - çalışmamıza izin vermek isteyip istemediğinizi anladığım halde, umarım sonunda farklı/daha iyi bir şey olmazsa cevabımı kabul etmeyi düşünebilirsiniz. boyunca. – robince

cevap

35

tekrar deneniyor: listede anahtarı gibi geçirilir argümanlar olarak aktarıldı (dekapaj ile). Verilerin serileştirilmesi mantıklı değildir - veri, paylaşılan hafızadır. Yani paylaşılan diziyi global hale getirmelisin. Bence ilk cevabımda olduğu gibi, bir modülün özniteliği olarak onu koymak daha da güzel, ancak örneğinizde küresel bir değişken olarak bırakmanız da iyi çalışıyor. Verileri çataldan önce ayarlamak istemediğinizi belirterek, burada değiştirilmiş bir örnek var. Mümkün olan birden fazla paylaşımlı diziye sahip olmak istiyorsanız (ve bu nedenle de toShare'i bir argüman olarak iletmek istediğinizde), benzer şekilde genel bir dizi paylaşılan dizinin listesini oluşturabilir ve endeksi count_it'e (for c in toShare[i]: olur) geçirebilirsiniz.

from sys import stdin 
from multiprocessing import Pool, Array, Process 

def count_it(key): 
    count = 0 
    for c in toShare: 
    if c == key: 
     count += 1 
    return count 

if __name__ == '__main__': 
    # allocate shared array - want lock=False in this case since we 
    # aren't writing to it and want to allow multiple processes to access 
    # at the same time - I think with lock=True there would be little or 
    # no speedup 
    maxLength = 50 
    toShare = Array('c', maxLength, lock=False) 

    # fork 
    pool = Pool() 

    # can set data after fork 
    testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf" 
    if len(testData) > maxLength: 
     raise ValueError, "Shared array too small to hold data" 
    toShare[:len(testData)] = testData 

    print pool.map(count_it, ["a", "b", "s", "d"]) 

[DÜZENLEME: Yukarıdaki çünkü çatal kullanmayan pencerelerine çalışmaz. Ancak, aşağıda yine Pool kullanarak Windows üzerinde çalışmak, bu yüzden bu istediğini yakın olduğunu düşünüyor ki: harita diziyi ama Süreci turşu ve alışkanlık neden

from sys import stdin 
from multiprocessing import Pool, Array, Process 
import mymodule 

def count_it(key): 
    count = 0 
    for c in mymodule.toShare: 
    if c == key: 
     count += 1 
    return count 

def initProcess(share): 
    mymodule.toShare = share 

if __name__ == '__main__': 
    # allocate shared array - want lock=False in this case since we 
    # aren't writing to it and want to allow multiple processes to access 
    # at the same time - I think with lock=True there would be little or 
    # no speedup 
    maxLength = 50 
    toShare = Array('c', maxLength, lock=False) 

    # fork 
    pool = Pool(initializer=initProcess,initargs=(toShare,)) 

    # can set data after fork 
    testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf" 
    if len(testData) > maxLength: 
     raise ValueError, "Shared array too small to hold data" 
    toShare[:len(testData)] = testData 

    print pool.map(count_it, ["a", "b", "s", "d"]) 

emin değil Havuz olacak - Bence belki de pencerelerdeki alt işlem başlatması noktasında aktarılır. Verilerin hala çataldan sonra ayarlandığını unutmayın.

+0

Çatallı platformlarda bile, yeni paylaşılan verileri fork'den sonra toShare'e ekleyemezsiniz çünkü her işlem bu noktada kendi bağımsız kopyasına sahip olacaktır. –

+0

Yani asıl sorun, bir Array hakkındaki bilgiyi nasıl seçebileceğimiz ve böylece diğer süreçten gönderilip bağlanabileceğimiz gibi görünüyor. –

+0

@James - hayır bu doğru değil. Dizi, çataldan önce ayarlanmalıdır, ancak daha sonra tüm çocuklar arasında görünür olan değişikliklerle değiştirilebilen bellek paylaşılır. Örneğe bakalım - veriyi diziye koydum * çataldan sonra (havuz() başlatıldığında ortaya çıkan). Bu veriler, çataldan sonra çalışma zamanında elde edilebilir ve önceden ayrılmış paylaşımlı bellek segmentine uyduğu sürece, orada kopyalanabilir ve tüm çocuklardan görülebilir. – robince

2

Veriler okunduğunda, modülünde yalnızca Havuzundan gelen bir çatalda bir değişken yapın. Daha sonra tüm çocuk süreçleri ona erişebilmeli ve yazmamak şartıyla kopyalanmayacaktır.

import myglobals # anything (empty .py file) 
myglobals.data = [] 

def count_it(key): 
    count = 0 
    for c in myglobals.data: 
     if c == key: 
      count += 1 
    return count 

if __name__ == '__main__': 
myglobals.data = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf" 

pool = Pool() 
print pool.map(count_it, ["a", "b", "s", "d"]) 

Eğer lock=False kelime argümanı ile deneyebilirsiniz rağmen Array kullanmak denemek istiyorum yoksa

(varsayılan olarak geçerlidir).

+0

Ben, globals kullanımının güvenli olduğuna inanmıyorum ve kesinlikle süreçlerin çatallanmadığı pencerelerde çalışmaz. –

+0

Nasıl güvenli değil? Verilere sadece okuma erişimine ihtiyacınız varsa, sorun yoktur. Yanlışlıkla yazıyorsanız, değiştirilen sayfa çocuk işlemi için kopyalanacaktır, böylece kötü bir şey olmayacaktır (örneğin diğer süreçlere müdahale etmeyecektir). Haklısın, pencerelerde işe yaramıyor ... – robince

+0

Çatal tabanlı platformlarda güvenli olduğu doğru. Ancak, işlem havuzu oluşturulduktan sonra büyük miktarda veri paylaşmak için paylaşımlı bellek tabanlı bir yol olup olmadığını bilmek istiyorum. –

4

Gördüğüm sorun, Pool'un argüman listesi aracılığıyla paylaşılan veriyi desteklememesidir. Hata mesajı "nesneleri yalnızca devralma yoluyla işlemler arasında paylaşılmalıdır" anlamına gelir. Paylaşılan verilerin, Pool sınıfını kullanarak paylaşmak istiyorsanız global, yani global olması gerekir.

Bunları açıkça iletmeniz gerekiyorsa, çok işlemciyi kullanmak zorunda kalabilirsiniz. değişebilir sıranın elemanlarının

from multiprocessing import Process, Array, Queue 

def count_it(q, arr, key): 
    count = 0 
    for c in arr: 
    if c == key: 
     count += 1 
    q.put((key, count)) 

if __name__ == '__main__': 
    testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf" 
    # want to share it using shared memory 
    toShare = Array('c', testData) 

    q = Queue() 
    keys = ['a', 'b', 's', 'd'] 
    workers = [Process(target=count_it, args = (q, toShare, key)) 
    for key in keys] 

    for p in workers: 
    p.start() 
    for p in workers: 
    p.join() 
    while not q.empty(): 
    print q.get(), 

Output: ('s', 9) ('a', 2) ('b', 3) ('d', 12)

sipariş: İşte yeniden işlenmiş örnektir.

Bunu daha genel ve Havuza benzer hale getirmek için, sabit bir N sayıda İşlem oluşturabilir, anahtarların listesini N parçalarına bölebilir ve ardından Süreç hedefi olarak bir sarmalayıcı işlevi kullanabilirsiniz. çoklu işlem paylaşılan bellek Diziler olamaz -

Temelde ben hata mesajı o söylediklerini kastediyor), ben sadece ödül gördüğümüz gibi

def wrapper(q, arr, keys): 
    for k in keys: 
    count_it(q, arr, k) 
-1

The multiprocessing.sharedctypes module provides functions for allocating ctypes objects from shared memory which can be inherited by child processes.

sharedctypes kullanımınız yanlış. bu diziyi ana işlemden mi yoksa yoksa açıkça iletmeyi mi tercih edersiniz? Eski durumda, diğer cevapların önerdiği gibi küresel bir değişken oluşturmanız gerekir. Ama sharedctypes'u açıkça geçmek için kullanmanıza gerek yok, sadece orijinal testData'u geçirin.

BTW, Pool.map() kullanımınız yanlış. Bu map() builtin ile aynı arabirime sahiptir (starmap() ile karıştırdınız?). Aşağıda, dizinin açıkça iletilmesiyle ilgili örnek verilmiştir:

from multiprocessing import Pool 

def count_it((arr, key)): 
    count = 0 
    for c in arr: 
     if c == key: 
      count += 1 
    return count 

if __name__ == '__main__': 
    testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf" 
    pool = Pool() 
    print pool.map(count_it, [(testData, key) for key in ["a", "b", "s", "d"]]) 
+0

Onun istediği bu değildi çünkü teoride testData çok büyük olacak - ve bu yöntem onu ​​turşu (ekstra bellek gerektiriyor) ile sonuçlandırarak ve her bir işleme (en az n x orijinal depolama gerektiren) kopyalamaya neden oluyor. – robince

+0

@thrope: haklısın, bu yüzden her iki yoldan da bahsetmiştim. Küresel değişkeni kullanmanın örneği açık olmalı, dolayısıyla listeye gerek yok. –

+1

@Denis - yep, ama ne yazık ki, global yöntem Windows üzerinde çalışmaz - çatal ve unix kopyalama-yazma üzerine dayanır. Pencerelerde global yöntemi kullanırsa, çok işlemli veriyi alıp her alt süreçe gönderir - daha fazla bellek gerektirir. – robince

İlgili konular