2012-08-11 18 views
10

ÖNEMLİ: sonuçlarınızın açıklaması ve biraz daha ayrıntılı, nesnelerin/genellikle olaylar dizisi Gruba gerekRx operatörü

benim cevaba da bir göz atın ve filtrelemek için lütfen çoğaltılır, bunları bir TimeSpan aralığı ile tamponlar. Ben mermer diyagramların çeşit daha iyi anlatmaya çalışacağım:

X-X-X-X-X-Y-Y-Y-Z-Z-Z-Z-X-X-Y-Z-Z 

üretecektir X, Y ve Z farklı olay türleri vardır ve '---' aralık anlamına

X---Y---Z---X---Y---Z 

. Ayrıca, yapardım da ortak bir temel sınıf çünkü her türlü geçerli bir anahtar özelliğiyle farklı nasıl bir:

X, Y, Z : A 

ve A bir özellik Anahtarı içerir. Xa anlamı X.Key = bir gösterim kullanılarak, son bir örnek olacaktır:

X.a-X.b-X.a-Y.b-Y.c-Z.a-Z.a-Z.c-Z.b-Z.c 

X.a-X.b---Y.b-Y.c-Z.a-Z.c-Z.b 

kimse beni birlikte gerekli Linq operatörleri (muhtemelen DistinctUntilChanged ve Tampon) koyarak yardım edebilir üretecektir Bu davranışa ulaşmak? Teşekkür

GÜNCELLEME 18.08.12: istendiği gibi

, ben daha iyi bir açıklama vermeye çalışıyoruz. Olayları bir web servisine toplayıp gönderen cihazlarımız var. Bu cihazların eski bir mantığı vardır (ve geriye dönük uyumluluk nedeniyle bunu değiştiremeyiz) ve bir onay aldıkça sürekli bir etkinlik gönderirler; onayladıktan sonra, sıradaki olayları sıraya gönderir, vb. Olaylar, birimin ağ adresini ve her aygıt için sıradaki olayları ayıran bazı diğer özellikleri içerir. Bir olay şöyle görünür:

class Event 
{ 
    public string NetworkAddress { get; } 

    public string EventCode { get; } 

    public string AdditionalAttribute { get; } 
} 

gol (biz toplu olarak bunu yapmak istemiyorum bu yüzden) veritabanında bilgi, seçkin olaylar tüm cihazlardan alınan her 5 saniyede işleme depolama olmasıdır ve ack'i cihaza yollamak. Muhtemelen basit bir kombinasyon olduğunu düşünüyorum

Device 'a'   : -[a1]-[a1]-[a1]----------------[a2]-[a2]-[a2]-[a3]-[a3]-[a3]-... 
Device 'b'   : ------[b1]-[b1]-[b2]-[b2]-[b2]------[b3]-[b3]-[b4]-[b4]-[b4]-... 

Time    : ------------[1s]-----------[2s]------------[3s]------------[4s]- 
DB/acks (rx output) : ------------[P1]-----------[P2]------------[P3]------------[P4]- 

P1: Server stores and acknowledges [a1] and [b1] 
P2: "  "  " "   [b2] 
P3: "  "  " "   [a2] and [b3] 
P4: "  "  " "   [a3] and [b4] 

Sonunda:

Device 'a': 
Event 1 (a1): NetworkAddress = '1', EventCode = A, AdditionalAttribute = 'x' 
Event 2 (a2): NetworkAddress = '1', EventCode = A, AdditionalAttribute = 'y' 
Event 3 (a3): NetworkAddress = '1', EventCode = B, AdditionalAttribute = 'x' 

Device 'b': 
Event 1 (b1): NetworkAddress = '2', EventCode = A, AdditionalAttribute = 'y' 
Event 2 (b2): NetworkAddress = '2', EventCode = B, AdditionalAttribute = 'x' 
Event 3 (b3): NetworkAddress = '2', EventCode = B, AdditionalAttribute = 'y' 
Event 4 (b4): NetworkAddress = '2', EventCode = C, AdditionalAttribute = 'x' 

Pn are the operations done by our server, explained later 

Olası mermer diyagramı (giriş akışları + çıkış akımı): tek iki cihaz ve bazı olaylarla bir örnek yapalım temel operatörler, ama ben Rx için yeniyim ve biraz çıkışım var, çünkü aynı çıktı akışını elde etmek için çok sayıda operatör (veya operatör kombinasyonları) var gibi görünüyor.

Güncelleme 19.08.12: Bu kod, bir sunucu üzerinde çalışır ve ben deneklerin davranışı hakkında emin değilim ... bellek sızıntılarını olmadan günlerce çalışması gerektiğini

unutmayın.Şu anda, her bir olay için, bir sorguda (konuların kullanımı konusunda yanılmıyorsam) bir Konunun OnNext'ini sorgulayabilmem için bir hizmet üzerinde bir basma işlemi çağırıyorum.

Güncelleme 20.08.12:

doğrulama testi dahil Güncel uygulanması; Bu ne denedim olduğunu ve @yamen

public interface IEventService 
{ 
    // Persists the events 
    void Add(IEnumerable<Event> events); 
} 

public class Event 
{ 
    public string Description { get; set; } 
} 

/// <summary> 
/// Implements the logic to handle events. 
/// </summary> 
public class EventManager : IDisposable 
{ 
    private static readonly TimeSpan EventHandlingPeriod = TimeSpan.FromSeconds(5); 

    private readonly Subject<EventMessage> subject = new Subject<EventMessage>(); 

    private readonly IDisposable subscription; 

    private readonly object locker = new object(); 

    private readonly IEventService eventService; 

    /// <summary> 
    /// Initializes a new instance of the <see cref="EventManager"/> class. 
    /// </summary> 
    /// <param name="scheduler">The scheduler.</param> 
    public EventManager(IEventService eventService, IScheduler scheduler) 
    { 
     this.eventService = eventService; 
     this.subscription = this.CreateQuery(scheduler); 
    } 

    /// <summary> 
    /// Pushes the event. 
    /// </summary> 
    /// <param name="eventMessage">The event message.</param> 
    public void PushEvent(EventMessage eventMessage) 
    { 
     Contract.Requires(eventMessage != null); 
     this.subject.OnNext(eventMessage); 
    } 

    /// <summary> 
    /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. 
    /// </summary> 
    /// <filterpriority>2</filterpriority> 
    public void Dispose() 
    { 
     this.Dispose(true); 
    } 

    private void Dispose(bool disposing) 
    { 
     if (disposing) 
     { 
      // Dispose unmanaged resources 
     } 

     this.subject.Dispose(); 
     this.subscription.Dispose(); 
    } 

    private IDisposable CreateQuery(IScheduler scheduler) 
    { 
     var buffered = this.subject 
      .DistinctUntilChanged(new EventComparer()) 
      .Buffer(EventHandlingPeriod, scheduler); 

     var query = buffered 
      .Subscribe(this.HandleEvents); 
     return query; 
    } 

    private void HandleEvents(IList<EventMessage> eventMessages) 
    { 
     Contract.Requires(eventMessages != null); 
     var events = eventMessages.Select(this.SelectEvent); 
     this.eventService.Add(events); 
    } 

    private Event SelectEvent(EventMessage message) 
    { 
     return new Event { Description = "evaluated description" }; 
    } 

    private class EventComparer : IEqualityComparer<EventMessage> 
    { 
     public bool Equals(EventMessage x, EventMessage y) 
     { 
      return x.NetworkAddress == y.NetworkAddress && x.EventCode == y.EventCode && x.Attribute == y.Attribute; 
     } 

     public int GetHashCode(EventMessage obj) 
     { 
      var s = string.Concat(obj.NetworkAddress + "_" + obj.EventCode + "_" + obj.Attribute); 
      return s.GetHashCode(); 
     } 
    } 
} 

public class EventMessage 
{ 
    public string NetworkAddress { get; set; } 

    public byte EventCode { get; set; } 

    public byte Attribute { get; set; } 

    // Other properties 
} 

Ve testin önerdiği aynı görünüyor:

public void PushEventTest() 
    { 
     const string Address1 = "A:2.1.1"; 
     const string Address2 = "A:2.1.2"; 

     var eventServiceMock = new Mock<IEventService>(); 

     var scheduler = new TestScheduler(); 
     var target = new EventManager(eventServiceMock.Object, scheduler); 
     var eventMessageA1 = new EventMessage { NetworkAddress = Address1, EventCode = 1, Attribute = 4 }; 
     var eventMessageB1 = new EventMessage { NetworkAddress = Address2, EventCode = 1, Attribute = 5 }; 
     var eventMessageA2 = new EventMessage { NetworkAddress = Address1, EventCode = 1, Attribute = 4 }; 
     scheduler.Schedule(() => target.PushEvent(eventMessageA1)); 
     scheduler.Schedule(TimeSpan.FromSeconds(1),() => target.PushEvent(eventMessageB1)); 
     scheduler.Schedule(TimeSpan.FromSeconds(2),() => target.PushEvent(eventMessageA1)); 

     scheduler.AdvanceTo(TimeSpan.FromSeconds(6).Ticks); 

     eventServiceMock.Verify(s => s.Add(It.Is<List<Event>>(list => list.Count == 2)), Times.Once()); 

     scheduler.Schedule(TimeSpan.FromSeconds(3),() => target.PushEvent(eventMessageB1)); 

     scheduler.AdvanceTo(TimeSpan.FromSeconds(11).Ticks); 

     eventServiceMock.Verify(s => s.Add(It.Is<List<Event>>(list => list.Count == 1)), Times.Once()); 
    } 

Ayrıca, ben yazılım olmadan günlerce çalıştırabilir gerçekten önemli olduğunu tekrar sözler sorunlar, binlerce mesajla ilgileniyor. Netleştirmek için: test mevcut uygulama ile geçmiyor.

+1

sorunuzun 'X.a-X.b --- Y.b-Y.c-Z.a-Z.c-Z.b' tek' --- 'aralığını gösterir final dizisi. Bu doğru mu, yoksa aralık her bir değer arasında mı olmalı? – Enigmativity

+0

Eğer mermer diyagramları bir kaynak olarak sağladıysanız ve birbirinin altında 'ölçeklemek' hedefine ulaşırsanız ya da size yardımcı olacak 'gerçek' bir örnek sağlamanız yararlı olacaktır. – yamen

+0

Yakalama Enigmativity için teşekkürler, ben çıkış @yamen sabitleyeceğim Daha fazla ayrıntı ekleyeceğim – fra

cevap

4

ben ne istiyorsunuz bu tam yaparsa emin değilim, ama açıkça sonra group anahtar kelime kullanarak ve elementler çeşitli IObservable s bunları ayrı ayrı yeniden birleştirilmesini önce işlemek için gruba olabilir.

E.g.

subject.OnNext(new X { Key = 'a' }); 
Thread.Sleep(100); 
subject.OnNext(new X { Key = 'b' }); 
Thread.Sleep(100); 
subject.OnNext(new X { Key = 'a' }); 
Thread.Sleep(100); 
... 
subject.OnCompleted(); 
: Biz sınıf tanımlarını varsa gibi

class A 
{ 
    public char Key { get; set; } 
} 

class X : A { } 
... 

ve Subject<A>

Subject<A> subject = new Subject<A>(); 

sonra

var buffered = 
    from a in subject 
    group a by new { Type = a.GetType(), Key = a.Key } into g 
    from buffer in g.Buffer(TimeSpan.FromMilliseconds(300)) 
    where buffer.Any() 
    select new 
    { 
     Count = buffer.Count, 
     Type = buffer.First().GetType().Name, 
     Key = buffer.First().Key 
    }; 

buffered.Do(Console.WriteLine).Subscribe(); 

Verdiğiniz verilerle test edebilirsiniz yazabilir

Ge için Sağladığınız çıktı t:

{ Count = 2, Type = X, Key = a } 
{ Count = 1, Type = X, Key = b } 
{ Count = 1, Type = Y, Key = b } 
{ Count = 1, Type = Y, Key = c } 
{ Count = 2, Type = Z, Key = a } 
{ Count = 2, Type = Z, Key = c } 
{ Count = 1, Type = Z, Key = b } 
+0

% 100 kesin olmasa bile, ben burada ödül verdim çünkü cevap zamanında soru yeterince açık değildi. Her neyse, cevap oldukça iyi tartışılmıştır. – fra

2

bu tam olarak ne istediğini, ancak sizin kullanım şeklini destekler görünüyor emin değilim.

Öncelikle (kolayca ihtiyaçlarınıza göre bu değiştirebilir) kullanmasına temel sınıf tanımlayalım:

public class MyEvent 
{ 
    public string NetworkAddress { set; get; } 
    public string EventCode { set; get; } 
} 

en IObservable<MyEvent> bir dizi olarak cihazlarınızı kuralım - Farklı olarak bu mevcut olabilir, ve tabi ki, tabi ki uyum sağlamak için değişmek zorundadır. Bu cihazların her biri 0,5 ila 1,5 saniye arasında rastgele bir gecikme ile bir değer üretecektir.Şimdi

var deviceA = new MyEvent[] { new MyEvent() {NetworkAddress = "A", EventCode = "1"}, 
           new MyEvent() {NetworkAddress = "A", EventCode = "1"}, 
           new MyEvent() {NetworkAddress = "A", EventCode = "2"} }; 

var deviceB = new MyEvent[] { new MyEvent() {NetworkAddress = "B", EventCode = "1"}, 
           new MyEvent() {NetworkAddress = "B", EventCode = "2"}, 
           new MyEvent() {NetworkAddress = "B", EventCode = "2"}, 
           new MyEvent() {NetworkAddress = "B", EventCode = "3"} }; 

var random = new Random();         

var deviceARand = deviceA.ToObservable().Select(a => Observable.Return(a).Delay(TimeSpan.FromMilliseconds(random.Next(500,1500)))).Concat(); 
var deviceBRand = deviceB.ToObservable().Select(b => Observable.Return(b).Delay(TimeSpan.FromMilliseconds(random.Next(500,1500)))).Concat(); 

var devices = new IObservable<MyEvent>[] { deviceARand, deviceBRand }; 

onları 'farklı' yapmak, en bu bireysel cihaz akışlarının tüm almasına izin ve tek bir ana akışına bunları birleştirmek:

var stream = devices.Aggregate(Observable.Empty<MyEvent>(), (acc, device) => acc.DistinctUntilChanged(a => a.EventCode).Merge(device)); 

Bu akış alınırken tüketilmesi var ki bir kez periyodik sadece Buffer ile tamponlama meselesidir: aramalar ve deneylerden sonra

stream.Buffer(TimeSpan.FromSeconds(1)).Subscribe(x => { /* code here works on a list of the filtered events per second */ }); 
0

, birlikte beklediğim çıktı üretir koymak bazı kod:

static void Main(string[] args) 
    { 
     const string Address1 = "A:2.1.1"; 
     const string Address2 = "A:2.1.2"; 
     var comparer = new EventComparer(); 
     var eventMessageA1 = new EventMessage { NetworkAddress = Address1, EventCode = 1, Attribute = 4 }; 
     var eventMessageB1 = new EventMessage { NetworkAddress = Address2, EventCode = 1, Attribute = 5 }; 
     var eventMessageA2 = new EventMessage { NetworkAddress = Address1, EventCode = 1, Attribute = 5 }; 
     var list = new[] { eventMessageA1, eventMessageA1, eventMessageB1, eventMessageA2, eventMessageA1, eventMessageA1 }; 

     var queue = new BlockingCollection<EventMessage>(); 
     Observable.Interval(TimeSpan.FromSeconds(2)).Subscribe 
      (
       l => list.ToList().ForEach(m => 
       { 
        Console.WriteLine("Producing {0} on thread {1}", m, Thread.CurrentThread.ManagedThreadId); 
        queue.Add(m); 
       }) 
      ); 

     // subscribing 
     queue.GetConsumingEnumerable() 
      .ToObservable() 
      .Buffer(TimeSpan.FromSeconds(5)) 
      .Subscribe(e => 
       { 
        Console.WriteLine("Queue contains {0} items", queue.Count); 
        e.Distinct(comparer).ToList().ForEach(m => 
        Console.WriteLine("{0} - Consuming: {1} (queue contains {2} items)", DateTime.UtcNow, m, queue.Count)); 
       } 
      ); 

     Console.WriteLine("Type enter to exit"); 
     Console.ReadLine(); 
    } 

    public class EventComparer : IEqualityComparer<EventMessage> 
    { 
     public bool Equals(EventMessage x, EventMessage y) 
     { 
      var result = x.NetworkAddress == y.NetworkAddress && x.EventCode == y.EventCode && x.Attribute == y.Attribute; 
      return result; 
     } 

     public int GetHashCode(EventMessage obj) 
     { 
      var s = string.Concat(obj.NetworkAddress + "_" + obj.EventCode + "_" + obj.Attribute); 
      return s.GetHashCode(); 
     } 
    } 

    public class EventMessage 
    { 
     public string NetworkAddress { get; set; } 

     public byte EventCode { get; set; } 

     public byte Attribute { get; set; } 

     public override string ToString() 
     { 
      const string Format = "{0} ({1}, {2})"; 
      var s = string.Format(Format, this.NetworkAddress, this.EventCode, this.Attribute); 
      return s; 
     } 
    } 

Her halükarda, uygulamanın izlenmesi, bunun bir bellek sızıntısına neden olduğu görülmektedir. Soruma sorum var:

  • Bellek sızıntısına neden oluyor? [lütfen aşağıdaki güncelleştirmeyi görün]
  • bunu yapmanın en iyi yolu budur (ilk gözlenebilirde ayrıysa, diğer olayları sonraki arabelleklere alamıyorum, ancak her bir arabelleğin içindeki öğeler diğerleri)?
  • Test zamanlayıcısını kullanarak nasıl test yapabilirim?

GÜNCELLEME:

bellek artışı sonra değeri istikrarlı, sadece birkaç dakika sürer gibi görünüyor. Uzun bir test yapacağım. Elbette, bu kesinlikle kabul edilebilir bir davranış olacaktır.

GÜNCELLEME 26.08.12: Zaten önceki güncellemede belirtildiği gibi

  • , bellek kullanımı artıyor ancak (ve yavaşça) başlatma işleminden sonra bazı dakika. 8 saat sonra hafıza kaç KB arasında normal dalgalanmalar)
  • bu question benimkine çok benzer ve önerilen Drenaj uzatma hala teyit edilmesi benim sorunum()
iyi geçerli olabilir ile sabitti tüketilen

Her neyse, soru test zamanlayıcısı kullanarak birim testleri için hala açık olduğunu düşünüyorum.

sayesinde Francesco