2016-11-21 30 views
9

40 GB veri üzerinde hesaplamalar yapıyorum. Her dosya, json satırlarını içeren sıkıştırılmış bir gzip dosyasıdır. Her dosyada en fazla 500.000 satır veya yaklaşık 500 MB bulunur. 128 cpu ve 1952 GB bellek ile çalışan bir Amazon örneğim var. Yapmaya çalıştığım şey, her dosyayı olabildiğince çabuk işlemek.Python Çoklu İşlem Havuzu Yeterli İşlemler Oluşturmuyor

def initializeLock(l): 

    global lock 
    lock = l 

if __name__ == '__main__': 
    directory = '/home/ubuntu/[directory_containing_files]/*.gz' 
    file_names = glob.glob(directory) 

    lock = Lock() 
    pool = Pool(initializer=initializeLock, initargs=(lock,)) 
    pool.map(do_analysis, file_names) 
    pool.close() 
    pool.join() 

Ne olmasını beklediğiniz süreçlerin büyük miktarda içindir oluşturulacak ve her biri bir dosya işler:

böyle çoklu işlem Havuzları kullanıyorum. Gerçekte olan şey başlangıçta 100'ün üzerinde süreç yaratılıyor. Bu noktada hafızamın yaklaşık% 85'ini kullanıyorum, bu harika! Sonra her biri tamamlandı. Sonunda çalışan işlemlerin miktarı yaklaşık 10'a düşüyor. Bu noktada, sadece belleğin% 5'ini kullanıyorum. Periyodik olarak ek işlemler başlatılır, ancak hiçbir zaman 100 ya da daha fazla çalışmaya devam etmez. Bu kadar büyük belleğe sahip bu boş hafızaya sahibim, ama çoğu zaman en fazla 10 işlemde çalışıyorum.

Tüm dosyalar tamamlanana kadar 100 işlemi çalıştırmaya devam etmek için nasıl bir fikir edineceğinize dair bir fikir var mı?

DÜZENLEME:

ben uygulamaya bazı günlük ekledi. Başlangıçta 127 işlem yükler, sanırım bunun 128 CPU'su var ve süreçler yüklendiğinde kullanımdaydı. Bazı işlemler başarıyla tamamlanır ve sonuç kaydedilir. Daha sonra bir noktada tüm çalışan işlemlerin birkaçı bitiyor. Kaç dosya bittiğini kontrol ettiğimde 127'den sadece 22'si tamamlandı. Sonra sadece 5-10 işlemlerini kullanarak çalışır ve bunların hepsi başarıyla bitirir. Düşünüyorum belki hafızası tükenmiş ve çöküyor. Ama neden? Çok fazla belleğim ve çok fazla CPU'm var.

DÜZENLEME 2:

Yani sorunu buldum. Sorun, do_analysis yönteminde bir kilit oluşturuyordu ve tüm süreçler aynı zamanda bitiyordu ve kilidin serbest kalmasını bekliyordu. Süreçler durmadı, uyuyorlardı. Yani bu beni başka bir soruya getiriyor: Burada ana amacım birçok json satırı olan her bir dosyayı almak, json satırındaki ID özelliğini almak ve daha sonra aynı kimliğe sahip diğer satırları içeren bir dosyaya eklemektir. Eğer dosya mevcut değilse onu yaratıyorum. Yaptığım dosyaya, başka bir işlem tarafından erişilmesini önlemek için erişildiği zaman bir kilit oluşturuldu. İşte benim kodum.

for key, value in dataframe.iteritems(): 
    if os.path.isfile(file_name): 
     lock.acquire() 
     value.to_csv(filename), mode='a', header=False, encoding='utf-8') 
     lock.release() 
    else: 
     value.to_csv(filename), header=True, encoding='utf-8') 

yüzden şimdi dosyalara eklenecek yaratıcı bir şekilde düşünmeye çalışıyorum, ama diğer her sürecini engelleme değilim. Çok fazla veri ile uğraşıyorum ve iki dosyaya aynı anda erişilmesi gereken şans düşük, ama yine de olacak. Bu yüzden, bir dosya eklendiğinde, başka bir işlemin bu dosyayı açmaya çalışmadığından emin olmam gerekir.

+1

Havuzun argümanlarına işlem sayısını eklemeyi deneyin (initializer = initializeLock, process = 100, initargs = (kilit,)) –

+1

, "pool.map" yerine "pool.imap_unordered" işlevini kullanmayı düşündünüz mü? –

+0

@SeregaLuchko Bunu denedim. Aynı şey olsa da oldu. – Gabriel

cevap

1

Girişiniz için herkese teşekkürler. İşte benim için şu anki çözümüm, önümüzdeki haftada daha verimli hale getirmeyi planlıyorum. Ben Martin'in tavsiyesini aldım ve hepsini bir kez yaptıktan sonra dosyaları yapıştırıyorum, ancak daha fazla dosya üretirken, bir kuyruğa yapıştırma yapmak için bir süreç işine sahip olan daphtdazz çözümünü uygulamak için çalışmak istiyorum.

def do_analyis(file): 
    # To keep the file names unique, I append the process id to the end 
    process_id = multiprocessing.current_process().pid 

    # doing analysis work... 

    for key, value in dataframe.iteritems(): 
     if os.path.isfile(filename): 
      value.to_csv(filename), mode='a', header=False, encoding='utf-8') 
     else: 
      value.to_csv(filename), header=True, encoding='utf-8') 

def merge_files(base_file_name): 
    write_directory = 'write_directory' 
    all_files = glob.glob('{0}*'.format(base_file_name)) 

    is_file_created = False 

    for file in all_files: 
     if is_file_created: 
      print 'File already exists, appending' 
      dataframe = pandas.read_csv(file, index_col=0) 
      dataframe.to_csv('{0}{1}.csv'.format(write_directory, os.path.basename(base_file_name)), mode='a', header=False, encoding='utf-8') 
     else: 
      print 'File does not exist, creating.' 
      dataframe = pandas.read_csv(file, index_col=0) 
      dataframe.to_csv('{0}{1}.csv'.format(write_directory, os.path.basename(base_file_name)), header=True, encoding='utf-8') 
      is_file_created = True 


if __name__ == '__main__': 
    # Run the code to do analysis and group files by the id in the json lines 
    directory = 'directory' 
    file_names = glob.glob(directory) 
    pool = Pool() 
    pool.imap_unordered(do_analysis, file_names, 1) 
    pool.close() 
    pool.join() 

    # Merge all of the files together 
    base_list = get_unique_base_file_names('file_directory') 
    pool = Pool() 
    pool.imap_unordered(merge_files, base_list, 100) 
    pool.close() 
    pool.join() 

Bu, dosyanın sonuna eklenen benzersiz bir işlem kimliği ile her dosyayı kaydeder sonra geri gider ve json dosyasında kimliğe göre tüm dosyaları alır ve bunları birbirine birleştirmek. Dosyaları oluştururken, cpu kullanımı% 60-70 arasındadır. Bu iyi. Dosyaları birleştirirken, cpu kullanımı yaklaşık% 8'dir. Bunun nedeni, dosyaların çok hızlı bir şekilde birleştirilmesidir çünkü sahip olduğum tüm işlemci işlem gücüne ihtiyacım yoktur. Bu çözüm işe yarıyor.Fakat daha verimli olabilirdi. İkisini de aynı anda yapmak için çalışacağım. Herhangi bir öneri memnuniyetle karşılanmaktadır.

İlgili konular