2013-01-08 32 views
5

3 işlemi başlatıyorum ve bunların bir diziyi (i) işlemine karşılık gelen dizinde paylaşılan diziye koymasını istiyorum. Aşağıdaki koduPython: çoklu işleme ve c_char_p dizisi

Bak, üretilen çıktı şöyle olur:

['test 0', None, None] 
['test 1', 'test 1', None] 
['test 2', 'test 2', 'test 2'] 

Neden Test 0 'test 1 tarafından üzerine ve test 2 tarafından test 1 olsun? Benim istediğim

(sırası önemli değildir) 'dir:

['test 0', None, None] 
['test 0', 'test 1', None] 
['test 0', 'test 1', 'test 2'] 

kodu:

#!/usr/bin/env python 

import multiprocessing 
from multiprocessing import Value, Lock, Process, Array 
import ctypes 
from ctypes import c_int, c_char_p 

class Consumer(multiprocessing.Process): 
    def __init__(self, task_queue, result_queue, arr, lock): 
      multiprocessing.Process.__init__(self) 
      self.task_queue = task_queue 
      self.result_queue = result_queue 
      self.arr = arr 
      self.lock = lock 

    def run(self): 
      proc_name = self.name 
      while True: 
       next_task = self.task_queue.get() 
       if next_task is None: 
        self.task_queue.task_done() 
        break    
       answer = next_task(arr=self.arr, lock=self.lock) 
       self.task_queue.task_done() 
       self.result_queue.put(answer) 
      return 

class Task(object): 
    def __init__(self, i): 
     self.i = i 

    def __call__(self, arr=None, lock=None): 
     with lock: 
      arr[self.i] = "test %d" % self.i 
      print arr[:] 

    def __str__(self): 
     return 'ARC' 

    def run(self): 
     print 'IN' 

if __name__ == '__main__': 
    tasks = multiprocessing.JoinableQueue() 
    results = multiprocessing.Queue() 

    arr = Array(ctypes.c_char_p, 3) 

    lock = multiprocessing.Lock() 

    num_consumers = multiprocessing.cpu_count() * 2 
    consumers = [Consumer(tasks, results, arr, lock) for i in xrange(num_consumers)] 

    for w in consumers: 
     w.start() 

    for i in xrange(3): 
     tasks.put(Task(i)) 

    for i in xrange(num_consumers): 
     tasks.put(None) 

Python 2.7.3 (Ubuntu) koşuyorum

cevap

5

Bu sorun görünüyor this one benzeri. Burada J.F. Sebastian, ödevin arr[i] no'lu arr[i] no'lu adrese atanmasını, sadece atamayı yapan alt işlem için anlamlı olan bir bellek adresine atfetmişti. Diğer alt süreçler bu adrese bakarken çöp toplar.

Bu sorunu önlemek için en az iki yol vardır.

import multiprocessing as mp 

class Consumer(mp.Process): 
    def __init__(self, task_queue, result_queue, lock, lst): 
      mp.Process.__init__(self) 
      self.task_queue = task_queue 
      self.result_queue = result_queue 
      self.lock = lock 
      self.lst = lst 

    def run(self): 
      proc_name = self.name 
      while True: 
       next_task = self.task_queue.get() 
       if next_task is None: 
        self.task_queue.task_done() 
        break    
       answer = next_task(lock = self.lock, lst = self.lst) 
       self.task_queue.task_done() 
       self.result_queue.put(answer) 
      return 

class Task(object): 
    def __init__(self, i): 
     self.i = i 

    def __call__(self, lock, lst): 
     with lock: 
      lst[self.i] = "test {}".format(self.i) 
      print([lst[i] for i in range(3)]) 

if __name__ == '__main__': 
    tasks = mp.JoinableQueue() 
    results = mp.Queue() 
    manager = mp.Manager() 
    lst = manager.list(['']*3) 

    lock = mp.Lock() 
    num_consumers = mp.cpu_count() * 2 
    consumers = [Consumer(tasks, results, lock, lst) for i in xrange(num_consumers)] 

    for w in consumers: 
     w.start() 

    for i in xrange(3): 
     tasks.put(Task(i)) 

    for i in xrange(num_consumers): 
     tasks.put(None) 

    tasks.join() 

başka bir yolu mp.Array('c', 10) gibi sabit bir büyüklüğü olan bir ortak dizi kullanmaktır: biri multiprocessing.manager listesini kullanmaktır.

import multiprocessing as mp 

class Consumer(mp.Process): 
    def __init__(self, task_queue, result_queue, arr, lock): 
      mp.Process.__init__(self) 
      self.task_queue = task_queue 
      self.result_queue = result_queue 
      self.arr = arr 
      self.lock = lock 

    def run(self): 
      proc_name = self.name 
      while True: 
       next_task = self.task_queue.get() 
       if next_task is None: 
        self.task_queue.task_done() 
        break    
       answer = next_task(arr = self.arr, lock = self.lock) 
       self.task_queue.task_done() 
       self.result_queue.put(answer) 
      return 

class Task(object): 
    def __init__(self, i): 
     self.i = i 

    def __call__(self, arr, lock): 
     with lock: 
      arr[self.i].value = "test {}".format(self.i) 
      print([a.value for a in arr]) 

if __name__ == '__main__': 
    tasks = mp.JoinableQueue() 
    results = mp.Queue() 
    arr = [mp.Array('c', 10) for i in range(3)] 

    lock = mp.Lock() 
    num_consumers = mp.cpu_count() * 2 
    consumers = [Consumer(tasks, results, arr, lock) for i in xrange(num_consumers)] 

    for w in consumers: 
     w.start() 

    for i in xrange(3): 
     tasks.put(Task(i)) 

    for i in xrange(num_consumers): 
     tasks.put(None) 

    tasks.join() 

Ben mp.Array('c', 10) bellek adresi asla değişmesini sağlayın sabit bir boyuta sahip olduğundan mp.Array(ctypes.c_char_p, 3) değişken bir boyut sahipken mp.Array(ctypes.c_char_p, 3) değil yaptığında bu işleri nedeni, olduğunu spekülasyon, böylece arr[i] olduğunda bellek adresi değişebilir daha büyük bir dizeye atanır.

Belki de bu the docs, bu devletler ne zaman uyarı budur

paylaştıkça bellekte bir işaretçi bu belirli adres alanında bir konuma işaret edeceği hatırlamak saklamak mümkün olsa

süreci. Ancak, işaretçinin ikinci bir işlemin bağlamında geçersiz olması ve işaretçiyi 'dan kaldırmaya çalışmak, büyük olasılıkla ikinci işlemin çökmesine neden olabilir.

+0

Bir milyar kez teşekkür ederiz! Her iki çözümünüz de gerçekten çalışıyor :) J.F. Sebastian'ın bu yazısına rastladım ama bir nedenden ötürü uygulayamadım ... doh! Şimdi bana heykelini nerede kurmam gerektiğini söyle! Tekrar teşekkürler ... – Ujoux

+0

İlginç soru ve coşkunuz için teşekkürler! Stackoverflow hakkında daha fazla görüşmek dileğiyle. Heykeller gelince - ben onay işareti yukarıda üstündeki tıklayarak oldukça müthiş bir yapar; ^) – unutbu

+0

Ben gerekli üne sahip olur olmaz yapmaz, unutmayacağım;) – Ujoux

İlgili konular