2011-11-27 20 views
6

Stream.DataAvailable öğesi yanlış olana kadar aşağıdaki gözlemlenebilir yinelemeyi nasıl yapabilirim? Şu anda hiç durmuyor gibi görünüyor.Fast Repeat TakeWhile sonsuz döngüye neden oluyor

AsyncReadChunk ve Gözlemlenebilir. Defer bölümünde ilerleyin OnNext çağrısı yapın ve OnCompleted call. Yineleme OnNext çağrısını aldığında, onu TakeWhile'a geçirir. TakeWhile'ın memnun olmadığında, gözlemlenebilir olanı tamamlar ama OnNext'in hemen ardından gelen OnCompleted'in, tekrar tekrar gözlemlenebilir ve tekrarlayan sonsuz döngüye neden olmasını sağladığını düşünüyorum.

Bu davranışı nasıl düzeltebilirim?

public static IObservable<byte[]> AsyncRead(this NetworkStream stream, int bufferSize) 
{ 
    return Observable.Defer(() => 
     { 
      try 
      { 
       return stream.DataAvailable ? AsyncReadChunk(stream, bufferSize) : Observable.Return(new byte[0]); 
      } 
      catch (Exception) 
      { 
       return Observable.Return(new byte[0]); 
      } 
     }) 
     .Repeat() 
     .TakeWhile((dataChunk, index) => dataChunk.Length > 0); 
} 
+4

Sorunu nasıl çözeceğinizi anlamanız için çok teşekkürler ve çözümünüzü paylaştığınız için teşekkür ederiz. Bununla birlikte, sorunun cevabını, sorunuzu düzenlemek yerine, yanıt olarak gönderebilir misiniz? –

+0

Samet, kendi cevabınızı sorudan çıkardım ve topluluk vikisi olarak işaretlenmiş ayrı bir cevaba aktardım. –

cevap

2

KENDİNİ YANIT: (Aşağıda Samet söz yazarı tarafından gönderildi bir cevaptır Ancak, o sorunun bir parçası olarak yanıt gönderdiniz Ayrı içine taşınıyorum.. cevap, yazar bunu kendisi hareket etmedi çünkü, toplum wiki olarak işaretlenmesi.)


Ben zamanlayıcıları ile bir sorun olduğunu üstlenmeden tarafından keşfedildi. Geri Dönme, CurrentThread'i kullanırken Sıfır komutunu kullanır. Sabit kod aşağıda.

public static IObservable<byte[]> AsyncRead(this NetworkStream stream, int bufferSize) 
    { 
     return Observable.Defer(() => 
            { 
             try 
             { 
              return stream.DataAvailable ? AsyncReadChunk(stream, bufferSize) : Observable.Return(new byte[0], Scheduler.CurrentThread); 
             } 
             catch (Exception) 
             { 
              return Observable.Return(new byte[0], Scheduler.CurrentThread); 
             } 
            }) 
      .Repeat() 
      .TakeWhile((dataChunk, index) => dataChunk.Length > 0); 
    } 
İlgili konular