2015-11-05 6 views
5

Aşağıdaki kodla yapmaya çalıştığım şey, bir liste listesini okumak ve bunları checker adlı işleve koymak ve checker işlevinin sonucuyla log_result anlaşması yapmaktır. Multithreading kullanarak bunu yapmaya çalışıyorum çünkü rows_to_parse değişken isminin gerçekte milyonlarca satırı vardır, bu nedenle birden fazla çekirdek kullanılması bu işlemi önemli miktarda hızlandırmalıdır.Pandalar veri karesine çok işlemcili yazma

Şu anda bu kod çalışmıyor ve Python çöküyor.

Kaygılar ve Sorunlar ben:

  1. satır güncellenmesi gerekiyor hangi aksi log_result karıştı alacak çünkü süreç boyunca dizin korumak için değişken df düzenlenen varolan df ister.
  2. apply_async'un bu görevi yerine getirmesi için uygun çoklu işlem işlevi olmadığından emin değilim çünkü bilgisayarın okuduğu ve yazdığını düşündüğüm sırasına inanıyorum, muhtemelen bozuk olabilir ???
  3. df 'u yazmak ve okumak için bir sıraya ihtiyaç duyulabilir, ancak bunu yapmak için nasıl gideceğime emin değilim.

Yardımlarınız için teşekkür ederiz. Çoklu işlem böyle

import pandas as pd 
import multiprocessing 
from functools import partial 

def checker(a,b,c,d,e): 
    match = df[(df['a'] == a) & (df['b'] == b) & (df['c'] == c) & (df['d'] == d) & (df['e'] == e)] 
    index_of_match = match.index.tolist() 
    if len(index_of_match) == 1: #one match in df 
     return index_of_match 
    elif len(index_of_match) > 1: #not likely because duplicates will be removed prior to: if "__name__" == __main__: 
     return [index_of_match[0]] 
    else: #no match, returns a result which then gets processed by the else statement in log_result. this means that [a,b,c,d,e] get written to the df 
     return [a,b,c,d,e] 



def log_result(result, dataf): 
    if len(result) == 1: # 
     dataf.loc[result[0]]['e'] += 1 
    else: #append new row to exisiting df 
     new_row = pd.DataFrame([result],columns=cols) 
     dataf = dataf.append(new_row,ignore_index=True) 


def apply_async_with_callback(parsing_material, dfr): 
    pool = multiprocessing.Pool() 
    for var_a, var_b, var_c, var_d, var_e in parsing_material: 
     pool.apply_async(checker, args = (var_a, var_b, var_c, var_d, var_e), callback = partial(log_result,dataf=dfr)) 
    pool.close() 
    pool.join() 



if __name__ == '__main__': 
    #setting up main dataframe 
    cols = ['a','b','c','d','e'] 
    existing_data = [["YES","A","16052011","13031999",3], 
        ["NO","Q","11022003","15081999",3], 
        ["YES","A","22082010","03012001",9]] 

    #main dataframe 
    df = pd.DataFrame(existing_data,columns=cols) 

    #new data 
    rows_to_parse = [['NO', 'A', '09061997', '06122003', 5], 
        ['YES', 'W', '17061992', '26032012', 6], 
        ['YES', 'G', '01122006', '07082014', 2], 
        ['YES', 'N', '06081992', '21052008', 9], 
        ['YES', 'Y', '18051995', '24011996', 6], 
        ['NO', 'Q', '11022003', '15081999', 3], 
        ['NO', 'O', '20112004', '28062008', 0], 
        ['YES', 'R', '10071994', '03091996', 8], 
        ['NO', 'C', '09091998', '22051992', 1], 
        ['YES', 'Q', '01051995', '02012000', 3], 
        ['YES', 'Q', '26022015', '26092007', 5], 
        ['NO', 'F', '15072002', '17062001', 8], 
        ['YES', 'I', '24092006', '03112003', 2], 
        ['YES', 'A', '22082010', '03012001', 9], 
        ['YES', 'I', '15072016', '30092005', 7], 
        ['YES', 'Y', '08111999', '02022006', 3], 
        ['NO', 'V', '04012016', '10061996', 1], 
        ['NO', 'I', '21012003', '11022001', 6], 
        ['NO', 'P', '06041992', '30111993', 6], 
        ['NO', 'W', '30081992', '02012016', 6]] 


    apply_async_with_callback(rows_to_parse, df) 
+0

Başka nedir: #no match, yapmakta olduğunuz df'ye yazmak için argümanlar vermek? Eğer [a, b, c, d, e] 'i döndürdüğünüzde, kodunuzun gerçekten tamamlanacağını fakat başka problemleriniz olacağını da, hiçbir zaman veriyi hiçbir yerde kullanmazsınız. –

+0

bunu işaretlediğiniz için teşekkür ederim, kodu değiştirdim. '' a, b, c, d, e] '' log_result' işlevindeki df'ye yazılır. – user3374113

+0

'kısmi (log_result, dataf = dfr)' 'log_results' imzasıyla eşleşmiyor – mdurant

cevap

8

Güncellenmesi DataFrames işe gitmiyor: Bu her öylesine O ekleme için) (n O (çok verimsiz

dataf = dataf.append(new_row,ignore_index=True) 

Bir kere (n^2) 'de Tercih edilen yol, bazı nesnelerin tek geçişte bir araya getirilmesidir.

Bir diğeri için ve daha da önemlisi, dataf her güncelleme için kilitlenmiyor, bu yüzden iki işlemin çakışmayacağı konusunda bir garanti yok (tahmin ediyorum) Bu python çöküyor)

Son olarak, ekleme yerinde değil, geri çağırma bittiğinde dataf değişkeni atılır! ve üst dataf için hiçbir değişiklik yapılmaz.


Biz MultiProcessing list veya dict kullanabilirsiniz. eğer sipariş verirseniz veya sipariş verirseniz (örneğin, numaralandır), listenin async'ten iyi tanımlanmış bir sırayla döndürülmediğini unutmayın.
(ya da biz kendimizi Lock uygulayan bir nesne oluşturmak Eli Bendersky görebiliyordu.)
Yani aşağıdaki değişiklikler yapılmıştır: zaman uyumsuz bittikten sonra

df = pd.DataFrame(existing_data,columns=cols) 
# becomes 
df = pd.DataFrame(existing_data,columns=cols) 
d = MultiProcessing.list([df]) 

dataf = dataf.append(new_row,ignore_index=True) 
# becomes 
d.append(new_row) 

Şimdi, sahip bir MultiProcessing.list arasında DataFrames.

pd.concat(d, ignore_index=True) 

hile yapmak mı: Bu (ve ignore_index) istenilen sonucu elde etmek için Concat edebilirsiniz.


Not: Her aşamada newRow DataFrame oluşturarak icar pandalar doğrudan tek seferde bir DataFrame için yerleşim listeleri ayrıştırmak olması da daha az verimlidir. Umarım bu bir oyuncak örneğidir, gerçekten çok sayıda parçanın MultiProcessing ile kazanmalarını oldukça istersiniz (50kb'yi bir kural olarak duydum ...), bir seferde hiçbir zaman bir satır olmayacak. burada kazan. Kenara


: (bir argüman denetleyicisi olarak, bu durumda) Kodunuzdaki (df gibi) globalsi kullanmaktan kaçınmalısınız, bu fonksiyonlarda hç çok temiz.

İlgili konular