2017-09-01 54 views

Multiprocessing with the "fork" context blocks on Linux/Intel Xeon with Python 3.6.1?

Problem description
I slightly adapted the code from this answer (see below). However, when I run this script on Linux (i.e. from the command line: python script_name.py), it prints jobs running: x for every job and then appears to hang. When I use the spawn start method instead (mp.set_start_method('spawn')), it works fine and immediately starts printing the value of the counter variable (see the listener function).
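For reference, a minimal sketch of choosing the start method per pool instead of globally. It assumes a Linux host where "fork" is available; square and run_jobs are hypothetical names used only for illustration and are not part of the script below.

```python
import multiprocessing as mp

def square(x):
    # Hypothetical stand-in for the real worker function.
    return x * x

def run_jobs(method, values):
    # get_context() returns a context bound to one start method without
    # changing the process-wide default, unlike mp.set_start_method().
    ctx = mp.get_context(method)
    with ctx.Pool(2) as pool:
        return pool.map(square, values)

if __name__ == "__main__":
    # Lists the start methods this platform supports.
    print(mp.get_all_start_methods())
    # Pass 'spawn' instead of 'fork' to compare the two behaviours.
    print(run_jobs("fork", [1, 2, 3]))
```

Using a context object keeps the choice local to one pool, which makes it easier to compare fork and spawn within the same script.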

Question

  • Why does it only work when spawning processes?
  • How can I adjust the code so that it works with fork? (Probably because it is faster.)

Code

import io 
import csv 
import multiprocessing as mp 

NEWLINE = '\n' 

def file_searcher(file_path): 
    parsed_file = csv.DictReader(io.open(file_path, 'r', encoding='utf-8'), delimiter='\t') 

    manager = mp.Manager() 
    q = manager.Queue() 
    pool = mp.Pool(mp.cpu_count()) 

    # put listener to work first 
    watcher = pool.apply_async(listener, (q,)) 

    jobs = [] 
    for row in parsed_file:
        print('jobs running: ' + str(len(jobs) + 1))
        job = pool.apply_async(worker, (row, q))
        jobs.append(job)

    # collect results from the workers through the pool result queue
    for job in jobs:
        job.get()

    #now we are done, kill the listener 
    q.put('kill') 
    pool.close() 
    pool.join() 

def worker(genome_row, q): 
    complete_data = [] 
    #data processing 
    #ftp connection to retrieve data 
    #etc. 
    q.put(complete_data) 
    return complete_data 

def listener(q): 
    '''listens for messages on the q, writes to file. ''' 
    f = io.open('output.txt', 'w', encoding='utf-8') 
    counter = 0 
    while 1:
        m = q.get()
        counter += 1
        print(counter)
        if m == 'kill':
            break
        for x in m:
            f.write(x + NEWLINE)
        f.flush()
    f.close() 

if __name__ == "__main__": 
    file_searcher('path_to_some_tab_del_file.txt') 

Processor information

Architecture:   x86_64 
CPU op-mode(s):  32-bit, 64-bit 
Byte Order:   Little Endian 
CPU(s):    20 
On-line CPU(s) list: 0-19 
Thread(s) per core: 1 
Core(s) per socket: 1 
Socket(s):    20 
NUMA node(s):   2 
Vendor ID:    GenuineIntel 
CPU family:   6 
Model:     45 
Model name:   Intel(R) Xeon(R) CPU E5-2660 v3 @ 2.60GHz 
Stepping:    2 
CPU MHz:    2596.501 
BogoMIPS:    5193.98 
Hypervisor vendor:  VMware 
Virtualization type: full 
L1d cache:    32K 
L1i cache:    32K 
L2 cache:    256K 
L3 cache:    25600K 
NUMA node0 CPU(s):  0-19 

Linux kernel version

    3.10.0-514.26.2.el7.x86_64 
    

    Python version

    Python 3.6.1 :: Continuum Analytics, Inc. 
    

    Log

    As @yacc suggested, I added logging code, which produces the following log:

    [server scripts]$ python main_v3.py 
    [INFO/SyncManager-1] child process calling self.run() 
    [INFO/SyncManager-1] created temp directory /tmp/pymp-2a9stjh6 
    [INFO/SyncManager-1] manager serving at '/tmp/pymp-2a9stjh6/listener-jxwseclw' 
    [DEBUG/MainProcess] requesting creation of a shared 'Queue' object 
    [DEBUG/SyncManager-1] 'Queue' callable returned object with id '7f0842da56a0' 
    [DEBUG/MainProcess] INCREF '7f0842da56a0' 
    [DEBUG/MainProcess] created semlock with handle 139673691570176 
    [DEBUG/MainProcess] created semlock with handle 139673691566080 
    [DEBUG/MainProcess] created semlock with handle 139673691561984 
    [DEBUG/MainProcess] created semlock with handle 139673691557888 
    [DEBUG/MainProcess] added worker 
    [DEBUG/MainProcess] added worker 
    [DEBUG/ForkPoolWorker-2] INCREF '7f0842da56a0' 
    [DEBUG/MainProcess] added worker 
    [INFO/ForkPoolWorker-2] child process calling self.run() 
    [DEBUG/MainProcess] added worker 
    [DEBUG/ForkPoolWorker-4] INCREF '7f0842da56a0' 
    [INFO/ForkPoolWorker-4] child process calling self.run() 
    [DEBUG/MainProcess] added worker 
    [DEBUG/ForkPoolWorker-3] INCREF '7f0842da56a0' 
    [INFO/ForkPoolWorker-3] child process calling self.run() 
    [DEBUG/MainProcess] added worker 
    [DEBUG/MainProcess] added worker 
    [DEBUG/ForkPoolWorker-6] INCREF '7f0842da56a0' 
    [DEBUG/ForkPoolWorker-5] INCREF '7f0842da56a0' 
    [INFO/ForkPoolWorker-6] child process calling self.run() 
    [INFO/ForkPoolWorker-5] child process calling self.run() 
    [DEBUG/MainProcess] added worker 
    [DEBUG/ForkPoolWorker-7] INCREF '7f0842da56a0' 
    [DEBUG/ForkPoolWorker-8] INCREF '7f0842da56a0' 
    [INFO/ForkPoolWorker-7] child process calling self.run() 
    [INFO/ForkPoolWorker-8] child process calling self.run() 
    [DEBUG/MainProcess] added worker 
    [DEBUG/ForkPoolWorker-9] INCREF '7f0842da56a0' 
    [INFO/ForkPoolWorker-9] child process calling self.run() 
    [DEBUG/MainProcess] added worker 
    [DEBUG/ForkPoolWorker-10] INCREF '7f0842da56a0' 
    [INFO/ForkPoolWorker-10] child process calling self.run() 
    [DEBUG/MainProcess] added worker 
    [DEBUG/ForkPoolWorker-11] INCREF '7f0842da56a0' 
    [INFO/ForkPoolWorker-11] child process calling self.run() 
    [DEBUG/MainProcess] added worker 
    [DEBUG/ForkPoolWorker-12] INCREF '7f0842da56a0' 
    [INFO/ForkPoolWorker-12] child process calling self.run() 
    [DEBUG/MainProcess] added worker 
    [DEBUG/ForkPoolWorker-13] INCREF '7f0842da56a0' 
    [INFO/ForkPoolWorker-13] child process calling self.run() 
    [DEBUG/MainProcess] added worker 
    [DEBUG/ForkPoolWorker-14] INCREF '7f0842da56a0' 
    [INFO/ForkPoolWorker-14] child process calling self.run() 
    [DEBUG/MainProcess] added worker 
    [DEBUG/ForkPoolWorker-15] INCREF '7f0842da56a0' 
    [INFO/ForkPoolWorker-15] child process calling self.run() 
    [DEBUG/MainProcess] added worker 
    [DEBUG/ForkPoolWorker-16] INCREF '7f0842da56a0' 
    [INFO/ForkPoolWorker-16] child process calling self.run() 
    [DEBUG/MainProcess] added worker 
    [DEBUG/ForkPoolWorker-17] INCREF '7f0842da56a0' 
    [INFO/ForkPoolWorker-17] child process calling self.run() 
    [DEBUG/MainProcess] added worker 
    [DEBUG/ForkPoolWorker-18] INCREF '7f0842da56a0' 
    [INFO/ForkPoolWorker-18] child process calling self.run() 
    [DEBUG/MainProcess] added worker 
    [DEBUG/ForkPoolWorker-19] INCREF '7f0842da56a0' 
    [INFO/ForkPoolWorker-19] child process calling self.run() 
    [DEBUG/MainProcess] added worker 
    [DEBUG/ForkPoolWorker-20] INCREF '7f0842da56a0' 
    [INFO/ForkPoolWorker-20] child process calling self.run() 
    jobs running: 1 
    jobs running: 2 
    jobs running: 3 
    jobs running: 4 
    [DEBUG/ForkPoolWorker-21] INCREF '7f0842da56a0' 
    [INFO/ForkPoolWorker-21] child process calling self.run() 
    jobs running: 5 
    jobs running: 6 
    jobs running: 7 
    [DEBUG/ForkPoolWorker-2] INCREF '7f0842da56a0' 
    jobs running: 8 
    written to file 
    jobs running: 9 
    jobs running: 10 
    [DEBUG/ForkPoolWorker-2] thread 'MainThread' does not own a connection 
    [DEBUG/ForkPoolWorker-2] making connection to manager 
    jobs running: 11 
    jobs running: 12 
    jobs running: 13 
    jobs running: 14 
    jobs running: 15 
    [DEBUG/SyncManager-1] starting server thread to service 'ForkPoolWorker-2' 
    jobs running: 16 
    jobs running: 17 
    jobs running: 18 
    jobs running: 19 
    [DEBUG/ForkPoolWorker-4] INCREF '7f0842da56a0' 
    [DEBUG/ForkPoolWorker-3] INCREF '7f0842da56a0' 
    [DEBUG/ForkPoolWorker-5] INCREF '7f0842da56a0' 
    [DEBUG/ForkPoolWorker-6] INCREF '7f0842da56a0' 
    [DEBUG/ForkPoolWorker-7] INCREF '7f0842da56a0' 
    [DEBUG/ForkPoolWorker-8] INCREF '7f0842da56a0' 
    [DEBUG/ForkPoolWorker-10] INCREF '7f0842da56a0' 
    [DEBUG/ForkPoolWorker-9] INCREF '7f0842da56a0' 
    [DEBUG/ForkPoolWorker-11] INCREF '7f0842da56a0' 
    [DEBUG/ForkPoolWorker-13] INCREF '7f0842da56a0' 
    [DEBUG/ForkPoolWorker-14] INCREF '7f0842da56a0' 
    [DEBUG/ForkPoolWorker-12] INCREF '7f0842da56a0' 
    [DEBUG/ForkPoolWorker-15] INCREF '7f0842da56a0' 
    [DEBUG/ForkPoolWorker-16] INCREF '7f0842da56a0' 
    [DEBUG/ForkPoolWorker-18] INCREF '7f0842da56a0' 
    [DEBUG/ForkPoolWorker-17] INCREF '7f0842da56a0' 
    [DEBUG/ForkPoolWorker-20] INCREF '7f0842da56a0' 
    [DEBUG/ForkPoolWorker-19] INCREF '7f0842da56a0' 
    [DEBUG/ForkPoolWorker-21] INCREF '7f0842da56a0' 
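
The logging code itself is not shown in the question. Output in this "[LEVEL/ProcessName] message" format is what multiprocessing.log_to_stderr() produces, so a minimal sketch, assuming the call is made once at the start of the script (enable_mp_logging is a hypothetical helper name), could look like this:

```python
import logging
import multiprocessing as mp

def enable_mp_logging(level=logging.DEBUG):
    # log_to_stderr() attaches a stderr handler to the multiprocessing
    # logger, formatted as "[LEVEL/ProcessName] message".
    logger = mp.log_to_stderr()
    logger.setLevel(level)
    return logger

if __name__ == "__main__":
    logger = enable_mp_logging()
    logger.info("multiprocessing logging enabled")
```

With the level set to DEBUG, the manager, the pool, and every worker report events such as process start-up and proxy reference counting.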
    
  • Could you provide information about your Linux, Python, MP package, and hardware/processor versions? – yacc

  • I added the information you asked for (see the edit) @yacc. I could not figure out how to get the MP package version. I hope you can find the problem. – CodeNoob

  • multiprocessing is part of the core library, so it has the same version as the rest. For me (Python 3.4.3) the code runs fine (the only thing I changed was removing the csv reader and reading a plain file instead). Have you tried to reproduce it anywhere else? – MacHala
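
Since multiprocessing ships with the standard library, the details requested in the comments can be collected from Python itself. A small sketch (environment_report is a hypothetical helper, not something used in the thread):

```python
import multiprocessing as mp
import platform

def environment_report():
    # multiprocessing has no separate version number; it matches the
    # interpreter version reported here.
    return {
        "python": platform.python_version(),
        "implementation": platform.python_implementation(),
        "kernel": platform.release(),
        "machine": platform.machine(),
        "cpu_count": mp.cpu_count(),
        "start_methods": mp.get_all_start_methods(),
    }

if __name__ == "__main__":
    for key, value in environment_report().items():
        print(key, value, sep=": ")
```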

    Answer

    I have no idea why you are running into this problem, but isn't this code a simpler way of doing the same thing?

    import csv
    import multiprocessing as mp

    NEWLINE = '\n'

    def file_searcher(file_path):
        # Backslashes continue the multi-line with statement.
        with open(file_path, 'r', encoding='utf-8') as input_file, \
                open('output.txt', 'w', encoding='utf-8') as f, \
                mp.Pool() as pool:
            parsed_file = csv.DictReader(input_file, delimiter='\t')
            # imap yields the results in input order as workers finish.
            for result in pool.imap(worker, parsed_file):
                # Each result is a list of strings; write one per line.
                for x in result:
                    f.write(x + NEWLINE)

    def worker(genome_row):
        complete_data = []
        # data processing
        # ftp connection to retrieve data
        # etc.
        return complete_data

    if __name__ == "__main__":
        file_searcher('path_to_some_tab_del_file.txt')
    