2011-01-19 18 views
7

Bazı mesajları dönüştürmek ve küçük bir gecikme sonrasında bunları aktarmak için Reaktif Uzantılar kullanmak istiyorum.Geciktirme ve tekilleştirme Reaktif Uzantılar (Rx) kullanarak

mesajları şuna benzer:

  • gecikme uzunluğu:

    class OutMsg 
    { 
        int GroupId { get; set; } 
        string Content { get; set; } 
        OutMsg(InMsg in) 
        { 
         GroupId = in.GroupId; 
         Content = Transform(in.Content); // function omitted 
        } 
    } 
    

    gereksinimleri bir çift vardır:

    class InMsg 
    { 
        int GroupId { get; set; } 
        int Delay { get; set; } 
        string Content { get; set; } 
    } 
    

    çıktı şuna benzer mesajın içeriğine bağlıdır.

  • Her iletide, bir GroupId
  • Varsa, daha önce GroupId ile aktarımı bekleyen gecikmeli mesaj olarak daha yeni bir mesaj gelirse, ilk mesaj bırakılmalı ve sadece ikinci mesaj yeni bir gecikme periyodundan sonra iletilmelidir.

Verilen bir gözlemlenebilir <InMsg> ve bir Gönderme işlevi:

IObservable<InMsg> inMsgs = ...; 

void Send(OutMsg o) 
{ 
    ... // publishes transformed messages 
} 

Ben dönüşümü gerçekleştirmek için seçin kullanabilirsiniz anlıyoruz.

void SetUp() 
{ 
    inMsgs.Select(i => new OutMsg(i)).Subscribe(Send); 
} 
  • nasıl bir mesaj geciktirmek belirtmek başvurabilir? (Bu, iletilerin siparişinin teslim edilmemesiyle sonuçlanabileceğini/içermesi gerektiğini unutmayın.)
  • Aynı GroupId ile iletileri nasıl kaldırabilirim?
  • Rx bu problemi çözebilir mi?
  • Bunu çözmenin başka bir yolu var mı?

cevap

7

Sen çıkışını geciktirmek için ve Switch emin yeni değerleri grubunda önceki değerlerin yerine yapmak için bir IGroupedObservable, Delay yapmak için GroupBy kullanabilirsiniz:

IObservable<InMsg> inMessages; 

inMessages 
    .GroupBy(msg => msg.GroupId) 
    .Select(group => 
     { 
      return group.Select(groupMsg => 
       { 
        TimeSpan delay = TimeSpan.FromMilliseconds(groupMsg.Delay); 
        OutMsg outMsg = new OutMsg(); // map InMsg -> OutMsg here 

        return Observable.Return(outMsg).Delay(delay); 
       }) 
       .Switch(); 
     }) 
     .Subscribe(outMsg => Console.Write("OutMsg received")); 

uygulanmasına ilişkin bir not: eğer bir gruplandırılmış değer mesajı (yani. gecikmeden sonra), bu

+0

Bununla bir oyun yaşadım ve beklediğim şeyi yapmıyor. Abonelik bir "System.Collections.Generic.AnonymousObservable'1 [OutMsg]" – chillitom

+0

alır.Visual Studio'da "Seç" i seçerseniz, bir "IObservable " döndürdüğünü bildirmelidir. Eğer IObservable > 'i döndürdüyseniz, Switch'i aramazsınız –

0

Rx çerçevekullanarak gecikmesini çözer yeni gecikme başlayacak gönderilir sonra geldi 210 uzatma yöntemi. Geciktirme ile kuyruklama, Gecikmeden sonra normal bir LINQ sıralaması uygulanarak çözülebilir, ardından DistinctUntilChanged.

Güncelleme: Buradaki gecikme yaklaşımı tek başına çalışmaz. Bir şekilde, gelen mesajların peşinde olduğunuz gecikme içinde sıraya girmeniz gerekir. Bu, BufferWithTime uzantı yöntemi ile gerçekleştirilir. Bu yöntem, daha sonra bir sonraki gözlemciye satır içi yayınlamadan önce çift kopyalayabileceğiniz mesaj listelerini döndürecektir.