2013-03-07 17 views
6

rx ve belirli bir sorgu ile sıkışmış durumdayım. sorun:Koşul değiştirilinceye kadar Rx groupby

Birçok tek güncelleştirme işlemleri sürekli akım üretilir. Operasyonlar eklenebilir veya silinebilir. Bu akışları tamponlamak ve o sırada birkaç işlem yapmak istiyorum, ancak siparişi korumak gerçekten çok önemli. Ayrıca, operasyonlar her X saniye

tamponlu ve diziler yapılmalıdır

Örnek:

insert(3)-delete(2)-insert(1)-delete(4) 

Ben test etmek için basit bir uygulama yazdım:

yılında:

insert-insert-insert-delete-delete-insert-delete-delete-delete-delete 

Out o ve ben az ya da daha az çalışır ama gelen ekleme/silme sırasına uymuyor

namespace RxTests 
{ 
using System; 
using System.Collections.Generic; 
using System.Globalization; 
using System.Linq; 
using System.Reactive.Concurrency; 
using System.Reactive.Linq; 
using System.Reactive.Subjects; 
using System.Text; 
using System.Threading; 

internal class Program 
{ 
    private static readonly Random Random = new Random(); 

    private static readonly CancellationTokenSource ProducerStopped = new CancellationTokenSource(); 

    private static readonly ISubject<UpdateOperation> operations = new Subject<UpdateOperation>(); 

    private static void Main(string[] args) 
    { 
     Console.WriteLine("Starting production"); 
     var producerScheduler = new EventLoopScheduler(); 
     var consumerScheduler = new EventLoopScheduler(); 
     var producer = 
      Observable.Interval(TimeSpan.FromSeconds(2)) 
         .SubscribeOn(producerScheduler) 
         .Subscribe(Produce, WriteProductionCompleted); 
     var consumer = 
      operations.ObserveOn(producerScheduler) 
         .GroupBy(operation => operation.Delete) 
         .SelectMany(observable => observable.Buffer(TimeSpan.FromSeconds(8), 50)) 
         .SubscribeOn(consumerScheduler) 
         .Subscribe(WriteUpdateOperations); 
     Console.WriteLine("Type any key to stop"); 
     Console.ReadKey(); 
     consumer.Dispose(); 
     producer.Dispose(); 
    } 

    private static void Produce(long time) 
    { 
     var delete = Random.NextDouble() < 0.5; 
     Console.WriteLine("Produce {0}, {1} at {2}", time + 1, delete, time); 
     var idString = (time + 1).ToString(CultureInfo.InvariantCulture); 
     var id = time + 1; 
     operations.OnNext(
      new UpdateOperation(id, delete, idString, time.ToString(CultureInfo.InvariantCulture))); 
    } 

    private static void WriteProductionCompleted() 
    { 
     Console.WriteLine("Production completed"); 
     ProducerStopped.Cancel(); 
    } 

    private static void WriteUpdateOperation(UpdateOperation updateOperation) 
    { 
     Console.WriteLine("Consuming {0}", updateOperation); 
    } 

    private static void WriteUpdateOperations(IList<UpdateOperation> updateOperation) 
    { 
     foreach (var operation in updateOperation) 
     { 
      WriteUpdateOperation(operation); 
     } 
    } 

    private class UpdateOperation 
    { 
     public UpdateOperation(long id, bool delete, params string[] changes) 
     { 
      this.Id = id; 
      this.Delete = delete; 
      this.Changes = new List<string>(changes ?? Enumerable.Empty<string>()); 
     } 

     public bool Delete { get; set; } 

     public long Id { get; private set; } 

     public IList<string> Changes { get; private set; } 

     public override string ToString() 
     { 
      var stringBuilder = new StringBuilder("{UpdateOperation "); 
      stringBuilder.AppendFormat("Id: {0}, Delete: {1}, Changes: [", this.Id, this.Delete); 
      if (this.Changes.Count > 0) 
      { 
       stringBuilder.Append(this.Changes.First()); 
       foreach (var change in this.Changes.Skip(1)) 
       { 
        stringBuilder.AppendFormat(", {0}", change); 
       } 
      } 

      stringBuilder.Append("]}"); 
      return stringBuilder.ToString(); 
     } 
    } 
} 

}

herkes doğru sorguyu bana yardımcı olabilir? Kullanılması aşağıdaki

using(query.Subscribe(Print)) 
{ 
    Console.ReadLine(); 
    producer.Dispose();   
} 

: Aşağıdaki satırlar küçük değişiklikler/sonuçlarını yazdırmak için JerKimball koduyla eklemeler vardır

Teşekkür

GÜNCELLEME 08.03.13 (JerKimball tarafından Öneriler) Yazdırma yöntemleri:

private static void Print(IObservable<IList<Operation>> operations) 
{ 
    operations.Subscribe(Print); 
} 

private static void Print(IList<Operation> operations) 
{ 
    var stringBuilder = new StringBuilder("["); 
    if (operations.Count > 0) 
    { 
     stringBuilder.Append(operations.First()); 
     foreach (var item in operations.Skip(1)) 
     { 
      stringBuilder.AppendFormat(", {0}", item); 
     } 
    } 

    stringBuilder.Append("]"); 
    Console.WriteLine(stringBuilder); 
} 

ve Çalıştırma için dizeye şu:

public override string ToString() 
{ 
    return string.Format("{0}:{1}", this.Type, this.Seq); 
} 

Sipariş korunur ancak:

  • Başka abonelik dahilinde abone hakkında emin değilim: doğru (yani O zamandan beri var bir soru olduğunu uzun zaman önce ve bana asla açık değildi)?
  • Hep (akış aynı Tipi ile ikiden fazla ardışık değerleri üretir bile) her bir listede en fazla iki unsurları var
+0

'Başka içinde abone olma konusunda emin değilim abonelik: doğru mu? <- Bununla ne demek istiyorsun? Bu nerede oluyor? (edit) Oh, anladım - "Yazdır" ın içinde ... evet, bunu yapmak istemiyorsun, sadece şu anda 'IDisposable' sızıyor olsaydın ' – JerKimball

+0

'Abone' bir 'eylem' alır bir aşırı yük var '- bunu kullanın; Bu yüzden benim örneğimde 'Console.WriteLine 'kullanıyorum. 'Yazdır' imzasını 'IList 'olarak değiştirirseniz, sadece' '(query.Subscribe (Yazdır))' ' – JerKimball

+0

' ı Seçebilirim IObservable >> seçeneğinden, bu yüzden kullanıyorum İlk Baskı (IObservable > işlemleri). Bu yüzden ikinci işlemleri tekrar tamamlayacağım. Yazarken – fra

cevap

1

en yeni bir yaklaşım (dolayısıyla yeni yanıt) deneyelim:

Öncelikle, bir anahtara dayalı öğelerin "çöküş" listeleri sipariş istinat olurken bir uzantısı yöntemi tanımlayalım:

public static class Ext 
{ 
    public static IEnumerable<List<T>> ToRuns<T, TKey>(
      this IEnumerable<T> source, 
      Func<T, TKey> keySelector) 
    { 
     using (var enumerator = source.GetEnumerator()) 
     { 
      if (!enumerator.MoveNext()) 
       yield break; 

      var currentSet = new List<T>(); 

      // inspect the first item 
      var lastKey = keySelector(enumerator.Current); 
      currentSet.Add(enumerator.Current); 

      while (enumerator.MoveNext()) 
      { 
       var newKey = keySelector(enumerator.Current); 
       if (!Equals(newKey, lastKey)) 
       { 
        // A difference == new run; return what we've got thus far 
        yield return currentSet; 
        lastKey = newKey; 
        currentSet = new List<T>(); 
       } 
       currentSet.Add(enumerator.Current); 
      } 

      // Return the last run. 
      yield return currentSet; 

      // and clean up 
      currentSet = new List<T>(); 
      lastKey = default(TKey); 
     } 
    } 
} 

Oldukça basit - bir IEnumerable<T> verilir, her alt liste aynı anahtara sahip olacak şekilde bir List<List<T>> geri verecektir.

Şimdi

, onu beslemek ve kullanımı:

var rnd = new Random(); 
var fakeSource = new Subject<Operation>(); 
var producer = Observable 
    .Interval(TimeSpan.FromMilliseconds(1000)) 
    .Subscribe(i => 
     { 
      var op = new Operation(); 
      op.Type = rnd.NextDouble() < 0.5 ? "insert" : "delete"; 
      fakeSource.OnNext(op); 
     });  

var singleSource = fakeSource 
    .Publish().RefCount(); 

var query = singleSource 
    // change this value to alter your "look at" time window 
    .Buffer(TimeSpan.FromSeconds(5))  
    .Select(buff => buff.ToRuns(op => op.Type).Where(run => run.Count > 0)); 

using(query.Subscribe(batch => 
{ 
    foreach(var item in batch) 
    { 
     Console.WriteLine("{0}({1})", item.First().Type, item.Count); 
    } 
})) 
{ 
    Console.ReadLine(); 
    producer.Dispose();  
} 

bir koşuşturma ver - burada ben tipik vadede bakın ne:

insert(4) 
delete(2) 
insert(1) 
delete(1) 
insert(1) 
insert(1) 
delete(1) 
insert(1) 
delete(2) 
delete(2) 
insert(2) 
delete(1) 
insert(1) 
delete(2) 
insert(2) 
+0

Çok teşekkür ederim! Tam olarak ihtiyacım olan şey bu. Sana çok hak edilen lütuf vereceğim! – fra

4

Galiba sen GroupByUntil bir karışımı sayesinde tüm aradıklarınızı alabilirsiniz , DistinctUntilChanged ve Buffer:

Bu örnek kod uyacak şekilde verdiği biraz gerektirir, ancak sorgu (ve kavram) tutun olmalıdır:

(düzenleme: doh - orada biraz cevapsız ...)

void Main() 
{ 
    var rnd = new Random(); 
    var fakeSource = new Subject<Operation>(); 
    var producer = Observable 
     .Interval(TimeSpan.FromMilliseconds(1000)) 
     .Subscribe(i => 
      { 
       var op = new Operation(); 
       op.Type = rnd.NextDouble() < 0.5 ? "insert" : "delete"; 
       fakeSource.OnNext(op); 
      });  
    var singleSource = fakeSource.Publish().RefCount(); 

    var query = singleSource 
     // We want to groupby until we see a change in the source 
     .GroupByUntil(
       i => i.Type, 
       grp => singleSource.DistinctUntilChanged(op => op.Type)) 
     // then buffer up those observed events in the groupby window 
     .Select(grp => grp.Buffer(TimeSpan.FromSeconds(8), 50)); 

    using(query.Subscribe(Console.WriteLine)) 
    { 
     Console.ReadLine(); 
     producer.Dispose();   
    } 
} 

public class Operation { 
    private static int _cnt = 0; 
    public Operation() { Seq = _cnt++; } 
    public int Seq {get; set;} 
    public string Type {get; set;}  
} 
+0

Merhaba! Teşekkürler, ama beklenen çıktıyı vermiyor. Aslında GroupBy'nin ihtiyaçlarım için uygun olmadığını düşünüyorum, çünkü iki akış arasında geçiş yapmak için daha fazlasına ihtiyacım var. Çözümünüzün çıktısını doğrulayabilir misiniz? Teşekkürler – fra

+0

@fra Hah - bir bölümü kaçırdı ... * iç çek * Düzenlerim ... tamam, güncellenmiş sürümü dene? – JerKimball

+0

çok teşekkür ederim, neredeyse oradayız. Hala bir kaç şüphe ve sorun var, lütfen güncellenmiş sorumu kontrol edin – fra