2012-03-21 22 views
9

Altprocess.Popen kullanarak aradığım bir yürütülebilir program var. Daha sonra, bir veriyi başka bir iş parçacığıyla doldurulan bir Kuyruktan okuyan bir iş parçacığı kullanarak stdin aracılığıyla bazı verilerle beslemeyi planlıyorum. Çıkış, başka bir iş parçacığı içinde stdout borusu kullanılarak okunmalı ve yine bir Kuyrukta sıralanmalıdır.python: thread altprocess çıktısını okuma

Önceki araştırmamdan anladığım kadarıyla, Queue ile thread kullanmak iyi bir uygulamadır.

Dışa aktarma, maalesef, hızlı bir şekilde yazılan her satır için bir yanıt vermeyecek, böylece basit yazma, okuma döngüsü döngüleri bir seçenek değildir. Yürütülebilir bazı iç çoklu iş parçacığı uygular ve çıktının alındığı anda çıktısını istiyorum, bu nedenle ek okuyucu iş parçacığı. Sadece her satırı (shuffleline.py) olarak karıştırılır yürütülebilir test etmek için bir örnek olarak

:

#!/usr/bin/python -u 
import sys 
from random import shuffle 

for line in sys.stdin: 
    line = line.strip() 

    # shuffle line 
    line = list(line) 
    shuffle(line) 
    line = "".join(line) 

    sys.stdout.write("%s\n"%(line)) 
    sys.stdout.flush() # avoid buffers 

bu zaten mümkün olduğunca tamponsuz olduğuna dikkat edin. Yoksa öyle değil mi?

#!/usr/bin/python -u 
import sys 
import Queue 
import threading 
import subprocess 

class WriteThread(threading.Thread): 
    def __init__(self, p_in, source_queue): 
     threading.Thread.__init__(self) 
     self.pipe = p_in 
     self.source_queue = source_queue 

    def run(self): 
     while True: 
      source = self.source_queue.get() 
      print "writing to process: ", repr(source) 
      self.pipe.write(source) 
      self.pipe.flush() 
      self.source_queue.task_done() 

class ReadThread(threading.Thread): 
    def __init__(self, p_out, target_queue): 
     threading.Thread.__init__(self) 
     self.pipe = p_out 
     self.target_queue = target_queue 

    def run(self): 
     while True: 
      line = self.pipe.readline() # blocking read 
      if line == '': 
       break 
      print "reader read: ", line.rstrip() 
      self.target_queue.put(line) 

if __name__ == "__main__": 

    cmd = ["python", "-u", "./shuffleline.py"] # unbuffered 
    proc = subprocess.Popen(cmd, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE) 

    source_queue = Queue.Queue() 
    target_queue = Queue.Queue() 

    writer = WriteThread(proc.stdin, source_queue) 
    writer.setDaemon(True) 
    writer.start() 

    reader = ReadThread(proc.stdout, target_queue) 
    reader.setDaemon(True) 
    reader.start() 

    # populate queue 
    for i in range(10): 
     source_queue.put("string %s\n" %i) 
    source_queue.put("") 

    print "source_queue empty: ", source_queue.empty() 
    print "target_queue empty: ", target_queue.empty() 

    import time 
    time.sleep(2) # expect some output from reader thread 

    source_queue.join() # wait until all items in source_queue are processed 
    proc.stdin.close() # should end the subprocess 
    proc.wait() 

bu aşağıdaki çıkışı (python2.7) vermek:

writing to process: 'string 0\n' 
writing to process: 'string 1\n' 
writing to process: 'string 2\n' 
writing to process: 'string 3\n' 
writing to process: 'string 4\n' 
writing to process: 'string 5\n' 
writing to process: 'string 6\n' 
source_queue empty: writing to process: 'string 7\n' 
writing to process: 'string 8\n' 
writing to process: 'string 9\n' 
writing to process: '' 
True 
target_queue empty: True 

sonra 2 saniye boyunca hiçbir şey ...

reader read: rgsn0i t 
reader read: nrg1sti 
reader read: tis n2rg 
reader read: snt gri3 
reader read: nsri4 tg 
reader read: stir5 gn 
reader read: gnri6ts 
reader read: ngrits7 
reader read: 8nsrt ig 
reader read: sg9 nitr 

binişimi Bu benim soyunmuş bir test programı başlangıçta bekleniyor. Ancak, alt işlemin çıktısı, alt yordamının sona ermesinden sonra görünene kadar görünmez. Daha fazla hat borusu ile çıktı aldım, bu yüzden stdout borusunda bir önbellek problemi var. Burada yayınlanan diğer sorulara göre stdout'un (alt süreç içinde) temizlenmesi en azından Linux üzerinde çalışmalıdır. Eğer olmadıkça

cevap

7

Senin sorunun, başlamak konuları kullanarak daha kötü subprocess modülünü ilgisi veya konuları (oldukları gibi sorunlu), hatta karıştırma subprocesses ve konuları (bir çok kötü bir fikir vardır Eğer code.google.com/p/python-subprocess32 alabilirsiniz Python 3.2 en alt süreç modülünün backport) kullanarak veya print ifadeler gibi (birden çok iş parçacığı aynı şeyleri erişen.)

ne olur senin shuffleline.py programı tamponlar olduğunu. Çıkışta değil, girişte. Çok açık olmasa da, bir fileobject üzerinde yinelediğinizde, Python genellikle 8k baytlık blokları okuyacaktır.

for line in sys.stdin: 
    line = line.strip() 
    .... 

Bunu, ya (EOF '' döndüren) sys.stdin.readline() aranacak bir süre döngüsü kullanmamaktadır isterseniz::

sys.stdin bir FileObject olduğu için, lütfen for döngü EOF veya tam bloğun kadar tamponlayacaktır
while True: 
    line = sys.stdin.readline() 
    if not line: 
     break 
    line = line.strip() 
    ... 

veya ikinci bağımsız değişkeni ("sentinel") kadar ilk bağımsız değişken çağıran bir yineleyici oluşturur iter() iki bağımsız değişken formu döndürülür kullanımı:

for line in iter(sys.stdin.readline, ''): 
    line = line.strip() 
    ... 

Bunun için iş parçacığı kullanmamayı önerdim, ancak alt işlemcinin borularında engellemeyen G/Ç yerine, hatta süreçleri ve diğer şeyleri birbirine bağlamanın birçok yolu olan twisted.reactor.spawnProcess gibi bir şey bile önermediysem de berbat olurum Tüketici ve üretici olarak.

+0

Teşekkürler, bu çözüm! – muckl

+1

Alt işlem ve iş parçacığı karışımının neden bu kadar korkunç bir yaklaşım olduğunu sorabilir miyim? Hiçbir şey olmuyorken, engellemesiz I/O’yu tekrar tekrar aramaktan daha zarif görünüyor. Açıkçası, iş parçacıkları herhangi bir evreli olmayan veri yapılarına erişmemelidir, ancak sadece bir Kuyruktan okuma veya yazma güvenli görünüyor. Python3.2 backport'daki değişiklikler benimki gibi basit bir durum için önemli midir? – muckl

+3

Özellikle iş parçacığı ve alt işlemlerle ilgili sorun, iplik ve çatalı karıştırma problemidir. Http://www.linuxprogrammingblog.com/threads-and-fork-think-twice-before-using-them ve diğer benzer makalelere göz atın. Python 3.2 alt işlem backport bu konular etrafında çalışır. Genel olarak iş parçacıklarına gelince, asıl sorun kontrol etmek ve hata ayıklamak zor olmasıdır. Örneğin, onları "dıştan" dışından öldüremezsiniz, bu yüzden bir iplik bir okumada veya bir yazıda sıkışmışsa, bu konuda yapabileceğiniz hiçbir şey yoktur. –

İlgili konular