2015-11-15 7 views
15

Aynı simülasyonu farklı parametrelerle bir döngüde çalıştırıyorum. Her simülasyon, sadece okunan, asla değiştirilmeyen bir panda DataFrame (data) kullanır. ipyparallel (ipython paralel) kullanarak, simülasyonlar önce Bence her motorun genel değişken uzaya bu DataFrames koyabilirsiniz başlatın:Ipyparallel istemci ve uzak motorlar arasında en iyi statik verileri nasıl paylaşılır?

view['data'] = data 

motorları daha sonra çalıştırmak olsun tüm simülasyonlar için DataFrame erişebilir onlar. Verilerin kopyalanması işlemi (eğer turşu yapılmışsa, data 40MB'dir) sadece birkaç saniyedir. Ancak, simülasyonların sayısı arttığında bellek kullanımının çok büyük olduğu görülmektedir. Bu paylaşılan veri her motor için yerine her görev için kopyalanıyor hayal ediyorum. Motorlu bir istemciden statik salt okunur verileri paylaşmanın en iyi yolu nedir? Motor başına bir kez kopyalamak kabul edilebilir, ancak ideal olarak sadece bir kez bir kez kopyalanmalıdır (host1'de 4 motor ve host2'de 8 motor var). Simülasyon sayıları (~ 50), o zaman başlamak için bir süre alır küçüktür ama ilerleme baskı ifadeleri görmeye başlarsanız

from ipyparallel import Client 
import pandas as pd 

rc = Client() 
view = rc[:] # use all engines 
view.scatter('id', rc.ids, flatten=True) # So we can track which engine performed what task 

def do_simulation(tweaks): 
    """ Run simulation with specified tweaks """ 
    # Do sim stuff using the global data DataFrame 
    return results, id, tweaks 

if __name__ == '__main__': 
    data = pd.read_sql("SELECT * FROM my_table", engine) 
    threads = [] # store list of tweaks dicts 
    for i in range(4): 
     for j in range(5): 
      for k in range(6): 
       threads.append(dict(i=i, j=j, k=k) 

    # Set up globals for each engine. This is the read-only DataFrame 
    view['data'] = data 
    ar = view.map_async(do_simulation, threads) 

    # Our async results should pop up over time. Let's measure our progress: 
    for idx, (results, id, tweaks) in enumerate(ar): 
     print 'Progress: {}%: Simulation {} finished on engine {}'.format(100.0 * ar.progress/len(ar), idx, id) 
     # Store results as a pickle for the future 
     pfile = '{}_{}_{}.pickle'.format(tweaks['i'], tweaks['j'], tweaks['j']) 
     # Save our results to a pickle file 
     pd.to_pickle(results, out_file_path + pfile) 

    print 'Total execution time: {} (serial time: {})'.format(ar.wall_time, ar.serial_time) 

:

İşte benim kod. Tuhaf bir şekilde, aynı motora birden fazla görev atanacak ve bu motor için atanan tüm görevlerin tamamlanmasına kadar bir yanıt göremiyorum. Tek bir simülasyon görevi her tamamlandığında enumerate(ar)'dan bir yanıt görmeyi beklerdim.

Simulasyon sayımları büyükse (~ 1000), başlamak uzun zaman alıyor, CPU'ların tüm motorlarda gaz attığını görüyorum, ancak uzun bir zamana kadar (~ 40mins) ilerleyen baskı ifadeleri görülmüyor ve 'un ilerleme durumunu gördüğümde, görevlerin büyük bir blok (> 100) aynı motora gittiğini ve bazı ilerlemeden önce bu motorun tamamlanmasını beklediğini görün. Bir motor tamamlandığında, 4 saniye boyunca yeni yanıtlar sağlayan ar nesnesini gördüm - bu, çıktı pikap dosyalarını yazmak için zaman gecikmesi olabilir.

Son olarak, host1 ipycontroller görevini de çalıştırır ve bellek kullanımı çılgın gibi gider (bir Python görevi,> 6 GB RAM, 3GB kullanarak bir çekirdek görevinin kullanıldığını gösterir). Host2 motoru gerçekten çok fazla bellek kullanımı göstermiyor. Bu başak hafızaya ne sebep olur?

+0

Hiç bir çözüm buldunuz mu? Bu şu anda karşı karşıya olduğumuz bir sorundur. – DrSAR

cevap

6

Bu mantığı birkaç yıl önce bir kodda kullandım ve this'u kullanıyorum. O zaman başlamak için bir süre alır simülasyon sayımları küçükse

shared_dict = { 
    # big dict with ~10k keys, each with a list of dicts 
} 

balancer = engines.load_balanced_view() 

with engines[:].sync_imports(): # your 'view' variable 
    import pandas as pd 
    import ujson as json 

engines[:].push(shared_dict) 

results = balancer.map(lambda i: (i, my_func(i)), id) 
results_data = results.get() 

(~ 50), ama ilerleme baskı ifadeleri görmeye başlarsınız: My kod şey gibiydi. Garip bir şekilde, birden fazla görev aynı motora atanacak ve bu motoru için bu atanmış görevlerin tamamlanmasına kadar bir yanıt göremiyorum. Tek bir simülasyon görevi tamamlandığında, her zaman numarasından bir cevap görmeyi beklerim. Benim durumumda

, my_func() bir dosyaya yazılı günlüğü çok sayıda mesaj koymak karmaşık bir yöntem olduğunu, bu yüzden benim baskı ifadeleri vardı.

Görev ataması hakkında, load_balanced_view() kullandığı gibi, kütüphaneye gittim ve yolunu buldum.Simülasyon sayıları (~ 1000) büyükse

, ~ (i CPU'lar tüm motorlarında basıyorsunuz bkz başlamak için uzun bir zaman alır, ancak hiçbir ilerleme baskı ifadeleri uzun zamandır kadar görülür 40mins) ve 'a baktığımda ilerlemeyi görüyorum, görevin büyük bir bloğu (> 100) aynı motora gitti ve bazı ilerlemeyi sağlamadan önce bir motordan tamamlanmasını bekledi. Bir motor tamamlandığında, ar nesnesini 4 saniye boyunca yeni yanıtlar sağladığını gördüm - bu çıkış pikap dosyalarını yazmak için zaman gecikmesi olabilir.

Uzun zamandır bunu yaşamamıştım, o yüzden hiçbir şey söyleyemem.

Umarım bu sizin probleminize ışık tutabilir.


PS: Ben yorumunda söylediği gibi, sen multiprocessing.Pool deneyebilirsin. Sanırım büyük ve salt okunur bir veriyi global değişken olarak kullanarak paylaşmaya çalışmadım. Denemek istiyorum, çünkü it seems to work.

+1

bu, orta verilerle iyi ölçeklenmiyor, korkarım. Örneğin, verileriniz 5 GB ise ve 8 çekirdekli yerel bir makinede 8 motor kullanıyorsanız, 40 GB veriyi kopyalayabilirsiniz. –

+0

Hmm, şimdi bunu bilmek güzel. Çok işlemcili kitaplıktan Havuzu denediniz mi? Çok-işlemli kullanarak salt okunur verileri global değişken olarak paylaşmaya çalışmadım. Denemek istiyorum çünkü işe yaramış gibi görünüyor (ref: https://kaushikghose.wordpress.com/tag/python/). – paulochf

+1

İçgörü için teşekkürler. Bu soruyu sorduğumdan bu yana [iPyParallel evrimleşmiştir] (https://ipyparallel.readthedocs.io/en/latest/details.html) ve şimdi ortak bir set üzerinde çalışan işçileri ile çoklu istemci makinelerini yönetmek için PyZMQ kullanıyor görevler. Bu, çalışmayı daha kolay bulduğum Redis Queuing'i andırıyor ve iPyParallel – hamx0r

İlgili konular