2010-06-29 24 views
5

Merhaba, bir yapımcı ve tüketici ile işbirliği yapmak için iş parçacığı istiyorum. Tüketici oldukça yavaş ve üretici çok hızlı ve patlamalarda çalışıyor. Örneğin, tüketici 20 saniye başına bir mesajı işleyebilir ve üretici bir saniyede 10 mesaj üretebilir, ancak bir kerede uzun bir süre bunu yapabilir, böylece tüketici yetişebilir. Eğer problemi açıklayanC# ileti dizisi iletişimi

Stream commonStream; 
AutoResetEvent commonLock; 

void Producer() 
{ 
    while (true) 
    { 
    magic.BlockUntilMagicAvalible(); 
    byte[] buffer = magic.Produce(); 
    commonStream.Write(buffer); 
    commonLock.Set(); 
    } 
} 

void Consumer() 
{ 
    while(true) 
    { 
    commonLock.WaitOne(); 
    MagicalObject o = binarySerializer.Deserialize(commonStream); 
    DoSomething(o); 
    } 
} 
+0

Hangi sürümleri kullanıyorsunuz, tam olarak bu şeyler için v4 için bazı yeni şeyler var. –

+0

.Net 3.5; Yorumlar en az 15 karakter uzunluğunda olmalıdır. –

cevap

11

Eğer bu şekilde tampon hiçbir veri yoksa uyuyacak bir BlockingCollection

int maxBufferCap = 500; 
BlockingCollection<MagicalObject> Collection 
          = new BlockingCollection<MagicalObject>(maxBufferCap); 
void Producer() 
{ 
    while (magic.HasMoreMagic) 
    { 
     this.Collection.Add(magic.ProduceMagic()); 
    } 
    this.Collection.CompleteAdding(); 
} 

void Consumer() 
{ 
    foreach (MagicalObject magicalObject in this.Collection.GetConsumingEnumerable()) 
    { 
     DoSomthing(magicalObject); 
    } 
} 

foreach hattını kullanarak bunu yapabilirsiniz Ayrıca koleksiyona bir şey eklendiğinde otomatik olarak kendini uyandırır.

Azami arabelleği ayarlamamın nedeni, üreticinizin tüketiciden çok daha hızlı olması durumunda, daha fazla nesne koleksiyona eklendikçe çok fazla bellek tüketmeniz olabilir. Tampon boyutuna ulaşıldığında engelleme koleksiyonunu oluştururken maksimum tampon büyüklüğünü ayarlayarak üreticinin Add çağrısı, tüketici tarafından koleksiyondan bir öğe çıkarılıncaya kadar engellenecektir.

BlockingCollection sınıfının başka bir avantajı, istediğiniz kadar üretici ve tüketiciye sahip olması, 1: 1 oranında olması gerekmemektedir. DoSomthing bunu destekliyorsa, Sen bir kuyruk ve zamanlayıcı kullanarak istediğiniz şeye ulaşırsınız

void ConsumersInParalell() 
{ 
    //This assumes the method signature of DoSomthing is one of the following: 
    // Action<MagicalObject> 
    // Action<MagicalObject, ParallelLoopState> 
    // Action<MagicalObject, ParallelLoopState, long> 
    Paralell.ForEach(this.Collection.GetConsumingEnumerable(), DoSomthing); 
} 
+2

TPL'nin .NET 3.5'e geri yüklendiğini unutmayın: http://codeblog.theg2.net/2010/02/tpl-and-parallelforeach-in- net-35-using.html –

0

(hatta Parallel.ForEach kullanmak ve veri kaynağı olarak tüketen enumerable kullanın) bilgisayarın çekirdek başına bir foreach döngü olabilir. Üretici kuyruğa değerler ekler ve tüketici zamanlayıcısını başlatır. Tüketici zamanlayıcının geçen olay (bir Threadpool iş parçacığı üzerindedir) zamanlayıcıyı durdurur ve boşalana kadar sıraya geçer, sonra kaybolur (gereksiz yoklama yok). Tüketici hala çalışırken üretici sıraya ekleyebilir.

System.Timers.Timer consumerTimer; 
Queue<byte[]> queue = new Queue<byte[]>(); 

void Producer() 
{ 
    consumerTimer = new System.Timers.Timer(1000); 
    consumerTimer.Elapsed += new System.Timers.ElapsedEventHandler(consumerTimer_Elapsed); 
    while (true) 
    { 
     magic.BlockUntilMagicAvailable(); 
     lock (queue) 
     { 
      queue.Enqueue(magic.Produce()); 
      if (!consumerTimer.Enabled) 
      { 
       consumerTimer.Start(); 
      } 
     } 
    } 
} 

void consumerTimer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) 
{ 
    while (true) 
    { 
     consumerTimer.Stop(); 
     lock (queue) 
     { 
      if (queue.Count > 0) 
      { 
       DoSomething(queue.Dequeue()); 
      } 
      else 
      { 
       break; 
      } 
     } 
    } 
} 
+0

snippet'iniz iş parçacığı güvenli değil ... ve benim yok yok yok yok yok –

+0

Bu konu hakkında güvenli olmayan nedir? Ve yoklama yok - zamanlayıcı, üretici sıraya eklediğinde sadece etkinleştirilen tek vuruştur. –

-1

Mutex'in ürünlerini kullanıyorum. Fikir, her ikisinin de farklı iş parçacıklarında çalışmasıdır. Tüketici iş parçacığı, Producer tarafından serbest bırakılıncaya kadar süresiz olarak oturacağı bir muteks tarafından kilitlenir. Daha sonra, devam etmek için Producer'ı bırakarak verileri paralel olarak işleyecektir. Tüketici, tamamlandığında yeniden kilitlenir. Bu Tüketici uyandırmaya bekler iken çok az milisaniye tarafından Yapımcısı yavaşlatır}

(Kod başlık açıp diğer kalite bitleri kısalık için atlanmıştır.)

// Pre-create mutex owned by Producer thread, then start Consumer thread. 
Mutex mutex = new Mutex(true); 
Queue<T> queue = new Queue<T>(); 

void Producer_AddData(T data) 
{ 
    lock (queue) { 
    queue.Enqueue(GetData()); 
    } 

    // Release mutex to start thread: 
    mutex.ReleaseMutex(); 
    mutex.WaitOne(); 
} 

void Consumer() 
{ 
    while(true) 
    { 
    // Wait indefinitely on mutex 
    mutex.WaitOne(); 
    mutex.ReleaseMutex(); 

    T data; 
    lock (queue) { 
     data = queue.Dequeue(); 
    } 
    DoSomething(data); 
    } 

ve muteksi serbest bırakın. Bununla yaşayabilirsen.

+0

'BlockingCollection' kullanımı çok daha iyidir. Öncelikle, muteksleri kullanmaktan daha doğru olduğu zaman daha açık ve modelinizden farklı olarak üretici ve tüketici paralel çalışabilir; Kodunuzun * üretmekte * veya * tüketiyor olmasını, ancak her ikisini de değil. Ayrıca, tek bir üreticinin veya birden fazla tüketicinin, aynı zamanda önemsiz olan bir engelleme koleksiyonundan farklı olarak, iyi bir şekilde ölçeklenmez. Bir engelleme koleksiyonunun faydaları olan daha karmaşık bir muteks temelli yaklaşım kullanabilirdiniz, fakat bu bir iş * lot * ve daha az okunabilir/sürdürülebilir olacaktır. – Servy

+0

BlockingColletion, 4.5'ı çalıştırabildiğim için kullanılamaz. Eğer yapabilirsem muhtemelen bu doğru çözüm olurdu. Ancak bu kod paralel olarak çalışır. Açık olmayabilirdim, ama ikisi farklı konularda. Bunu bir iş parçacığında ağır SQL sorguları çalıştırmak için kullanıyorum, başka bir iş parçacığı üzerinde veri toplarken ve benim için iyi çalışıyor. – Ben

+0

BlockingCollection, 4.5 değil 4.5'de eklendi. – Servy