2012-08-16 13 views
18

Aynı anda çalışan örneklerin sayısını sınırlarken (örneğin, sistemimde bulunan CPU çekirdek sayısı), aynı anda birden çok program.py örneğini çalıştırmak istiyorum.). Örneğin, 10 çekirdeğim varsa ve toplam 1000 program.py işlemi yapmam gerekiyorsa, herhangi bir zamanda yalnızca 10 örnek oluşturulacak ve çalışacaktır.Çalışan işlemlerin sayısını sınırlarken Python'da çok işlem yapma

Çoklu işlem modülünü, çoklu iş parçacığı kullanmayı ve sıraları kullanmayı denedim, ancak kolay bir uygulamaya kendini ödünç vermek için bana görünen hiçbir şey yok. Sahip olduğum en büyük sorun, aynı anda çalışan süreçlerin sayısını sınırlamak için bir yol bulmak. Bu önemlidir çünkü bir seferde 1000 süreç oluşturursam, bir çatal bombasına denk olur. Süreçlerden program aracılığıyla döndürülen sonuçlara (diskten çıktılar) ihtiyacım yok ve işlemlerin tümü birbirinden bağımsız olarak çalışıyor.

Python veya hatta bash'ta nasıl uygulayabileceğime dair bana bir öneri veya örnek verebilir misiniz? Şimdiye kadar yazdığım kodları kuyruğu kullanarak gönderiyorum, ancak amaçlandığı gibi çalışmıyor ve zaten yanlış yola girmiş olabilir.

Çok teşekkürler. yerine Python daha

+2

[Python işlem havuzları] 'nı denediniz mi (http://docs.python.org/library/multiprocessing.html#module-multiprocessing.pool)? – C2H5OH

+0

Bunu yapmanın en kolay yolu, "multiprocessing.pool" öğesini oluşturan ve işçinin (program.py) iş parçacığını yansıtan bir "denetleyici" programı oluşturmaktır; – jozzas

+0

Teşekkürler, bunu deneyeceğim; ilk denememde bir nedenden dolayı multiprocessing.pool'un istediğim gibi olmadığına vardım, ama şimdi doğru görünüyor. Bu durumda, çalışan iş parçacıkları sadece program.py (alt işlemle birlikte bir iş parçacığı olarak mı? Popen) oluştururlar? Takip edebileceğim kaba bir örnek veya şablon uygulaması gönderir misiniz? – steadfast

cevap

2

Bash komut, ama basit paralel işlem için sıklıkla kullanıyorum:

#!/usr/bin/env bash 
waitForNProcs() 
{ 
nprocs=$(pgrep -f $procName | wc -l) 
while [ $nprocs -gt $MAXPROCS ]; do 
    sleep $SLEEPTIME 
    nprocs=$(pgrep -f $procName | wc -l) 
done 
} 
SLEEPTIME=3 
MAXPROCS=10 
procName=myPython.py 
for file in ./data/*.txt; do 
waitForNProcs 
./$procName $file & 
done 

Veya çok basit durumlar için

, başka bir seçenek P procs sayısını belirler Xargs olduğunu

find ./data/ | grep txt | xargs -P10 -I SUB ./myPython.py SUB 
3

Bir işlem süpervizörü kullanmalısınız. Bir yaklaşım, "programlı" yapmak için Circus tarafından sağlanan API'yi kullanıyor olabilir, dokümantasyon sitesi artık çevrimdışıdır ancak bunun sadece bir geçici problem olduğunu düşünüyorum, bunun için de Circus'u kullanabilirsiniz. Başka bir yaklaşım ise supervisord kullanıyor ve işlemin numprocs parametresini sahip olduğunuz çekirdek sayısına ayarlıyor.

Circus kullanıldığı bir örnek:

from circus import get_arbiter 

arbiter = get_arbiter("myprogram", numprocesses=3) 
try: 
    arbiter.start() 
finally: 
    arbiter.stop() 
21

Sana Pool.map yaklaşımı size çok mantıklı değil belirtti biliyoruz. Harita, bir çalışma kaynağı vermenin kolay bir yoludur ve her bir öğeye uygulanabilecek bir kalburdur. Harita için func, verilen arg üzerinde gerçek işi yapmak için herhangi bir giriş noktası olabilir.

o sizin için doğru görünmüyorsa

, bir Üretici-Tüketici deseni kullanma hakkında buraya oldukça ayrıntılı cevabı var: https://stackoverflow.com/a/11196615/496445

Esasen, bir Kuyruk yaratmak ve n işçi sayısını başlar. Ardından sıraya ana iş parçacığından beslenir veya kuyruğu besleyen bir Üretici işlemi oluşturulur. İşçiler kuyruğundan çalışmaya devam ediyorlar ve başladığınız süreçlerin sayısından daha fazla eş zamanlı çalışma olmayacak.

Ayrıca, kuyruğa bir sınır koyma seçeneğine de sahipsiniz; böylece, üreticinin tükettiği hız ve kaynaklara da kısıtlamalar koymanız gerekiyorsa, çok fazla bekleyen iş olduğunda üreticiyi engeller.

Çağrılan iş işlevi, istediğiniz her şeyi yapabilir. Bu, bazı sistem komutlarının etrafında bir sarıcı olabilir veya python lib'inizi içe aktarabilir ve ana rutini çalıştırabilir. Orada, keyfi çalıştırılabilirlerinizi sınırlı kaynaklar altında çalıştırmak için yapılandırmaları ayarlamanıza izin veren belirli süreç yönetimi sistemleri vardır, ancak bu sadece bunu yapmak için temel bir python yaklaşımıdır.

Temel Havuz: Benim o other answer itibaren

Snippet'ler

from multiprocessing import Pool 

def do_work(val): 
    # could instantiate some other library class, 
    # call out to the file system, 
    # or do something simple right here. 
    return "FOO: %s" % val 

pool = Pool(4) 
work = get_work_args() 
results = pool.map(do_work, work) 

bir süreç yöneticisi ve yapımcı kullanma

from multiprocessing import Process, Manager 
import time 
import itertools 

def do_work(in_queue, out_list): 
    while True: 
     item = in_queue.get() 

     # exit signal 
     if item == None: 
      return 

     # fake work 
     time.sleep(.5) 
     result = item 

     out_list.append(result) 


if __name__ == "__main__": 
    num_workers = 4 

    manager = Manager() 
    results = manager.list() 
    work = manager.Queue(num_workers) 

    # start for workers  
    pool = [] 
    for i in xrange(num_workers): 
     p = Process(target=do_work, args=(work, results)) 
     p.start() 
     pool.append(p) 

    # produce data 
    # this could also be started in a producer process 
    # instead of blocking 
    iters = itertools.chain(get_work_args(), (None,)*num_workers) 
    for item in iters: 
     work.put(item) 

    for p in pool: 
     p.join() 

    print results 
+0

Çok iyi bir örnek, http://stackoverflow.com/questions/6905264/python-multiprocessing-utilizes-only-one-core adresinde açıkladıkları gibi CPUS sayısını alarak geliştirdim ve böylece sayısal olarak num_workers'ı ayarlayabilirim Makinenin CPU'ları. –

0

çoklu işlem kullanma konusunda birçok cevaplar varken .pool, h üzerinde çok sayıda kod parçacığı yok çok işlemciyi kullanmak için kullanacağız.Process, gerçekten bellek kullanımı önemli olduğunda daha yararlıdır. 1000 işlemin başlatılması CPU'yu aşırı yükler ve belleği kaybeder. Her işlem ve veri aktarım hatları bellek yoğunsa, OS veya Python kendi başına paralel işlem sayısını sınırlar. İşlemciye CPU'lara gönderilen eşzamanlı iş sayısını sınırlamak için aşağıdaki kodu geliştirdim. Parti boyutu, CPU çekirdeği sayısıyla orantılı olarak ölçeklendirilebilir. Windows PC'mde, parti başına iş sayısı, mevcut CPU garantilerinin 4 katına kadar verimli olabilir.

import multiprocessing 
def func_to_be_multiprocessed(q,data): 
    q.put(('s')) 
q = multiprocessing.Queue() 
worker = [] 
for p in range(number_of_jobs): 
    worker[p].append(multiprocessing.Process(target=func_to_be_multiprocessed, \ 
     args=(q,data)...)) 
num_cores = multiprocessing.cpu_count() 
Scaling_factor_batch_jobs = 3.0 
num_jobs_per_batch = num_cores * Scaling_factor_batch_jobs 
num_of_batches = number_of_jobs // num_jobs_per_batch 
for i_batch in range(num_of_batches): 
    floor_job = i_batch * num_jobs_per_batch 
    ceil_job = floor_job + num_jobs_per_batch 
    for p in worker[floor_job : ceil_job]: 
             worker.start() 
    for p in worker[floor_job : ceil_job]: 
             worker.join() 
for p in worker[ceil_job :]: 
          worker.start() 
for p in worker[ceil_job :]: 
          worker.join() 
for p in multiprocessing.active_children(): 
          p.terminate() 
result = [] 
for p in worker: 
    result.append(q.get()) 

tek sorun varsa toplu iş her tamamlayamadı, ve asma duruma yol açar, işlerin toplu geri kalanı başlatılan edilmeyecektir. Bu nedenle, işlenecek işlevin uygun hata işleme rutinlerine sahip olması gerekir.

İlgili konular