rx ve belirli bir sorgu ile sıkışmış durumdayım. sorun:Koşul değiştirilinceye kadar Rx groupby
Birçok tek güncelleştirme işlemleri sürekli akım üretilir. Operasyonlar eklenebilir veya silinebilir. Bu akışları tamponlamak ve o sırada birkaç işlem yapmak istiyorum, ancak siparişi korumak gerçekten çok önemli. Ayrıca, operasyonlar her X saniye
tamponlu ve diziler yapılmalıdır
Örnek:
insert(3)-delete(2)-insert(1)-delete(4)
Ben test etmek için basit bir uygulama yazdım:
yılında:
insert-insert-insert-delete-delete-insert-delete-delete-delete-delete
Out o ve ben az ya da daha az çalışır ama gelen ekleme/silme sırasına uymuyor
namespace RxTests
{
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
internal class Program
{
private static readonly Random Random = new Random();
private static readonly CancellationTokenSource ProducerStopped = new CancellationTokenSource();
private static readonly ISubject<UpdateOperation> operations = new Subject<UpdateOperation>();
private static void Main(string[] args)
{
Console.WriteLine("Starting production");
var producerScheduler = new EventLoopScheduler();
var consumerScheduler = new EventLoopScheduler();
var producer =
Observable.Interval(TimeSpan.FromSeconds(2))
.SubscribeOn(producerScheduler)
.Subscribe(Produce, WriteProductionCompleted);
var consumer =
operations.ObserveOn(producerScheduler)
.GroupBy(operation => operation.Delete)
.SelectMany(observable => observable.Buffer(TimeSpan.FromSeconds(8), 50))
.SubscribeOn(consumerScheduler)
.Subscribe(WriteUpdateOperations);
Console.WriteLine("Type any key to stop");
Console.ReadKey();
consumer.Dispose();
producer.Dispose();
}
private static void Produce(long time)
{
var delete = Random.NextDouble() < 0.5;
Console.WriteLine("Produce {0}, {1} at {2}", time + 1, delete, time);
var idString = (time + 1).ToString(CultureInfo.InvariantCulture);
var id = time + 1;
operations.OnNext(
new UpdateOperation(id, delete, idString, time.ToString(CultureInfo.InvariantCulture)));
}
private static void WriteProductionCompleted()
{
Console.WriteLine("Production completed");
ProducerStopped.Cancel();
}
private static void WriteUpdateOperation(UpdateOperation updateOperation)
{
Console.WriteLine("Consuming {0}", updateOperation);
}
private static void WriteUpdateOperations(IList<UpdateOperation> updateOperation)
{
foreach (var operation in updateOperation)
{
WriteUpdateOperation(operation);
}
}
private class UpdateOperation
{
public UpdateOperation(long id, bool delete, params string[] changes)
{
this.Id = id;
this.Delete = delete;
this.Changes = new List<string>(changes ?? Enumerable.Empty<string>());
}
public bool Delete { get; set; }
public long Id { get; private set; }
public IList<string> Changes { get; private set; }
public override string ToString()
{
var stringBuilder = new StringBuilder("{UpdateOperation ");
stringBuilder.AppendFormat("Id: {0}, Delete: {1}, Changes: [", this.Id, this.Delete);
if (this.Changes.Count > 0)
{
stringBuilder.Append(this.Changes.First());
foreach (var change in this.Changes.Skip(1))
{
stringBuilder.AppendFormat(", {0}", change);
}
}
stringBuilder.Append("]}");
return stringBuilder.ToString();
}
}
}
}
herkes doğru sorguyu bana yardımcı olabilir? Kullanılması aşağıdaki
using(query.Subscribe(Print))
{
Console.ReadLine();
producer.Dispose();
}
: Aşağıdaki satırlar küçük değişiklikler/sonuçlarını yazdırmak için JerKimball koduyla eklemeler vardır
Teşekkür
GÜNCELLEME 08.03.13 (JerKimball tarafından Öneriler) Yazdırma yöntemleri:
private static void Print(IObservable<IList<Operation>> operations)
{
operations.Subscribe(Print);
}
private static void Print(IList<Operation> operations)
{
var stringBuilder = new StringBuilder("[");
if (operations.Count > 0)
{
stringBuilder.Append(operations.First());
foreach (var item in operations.Skip(1))
{
stringBuilder.AppendFormat(", {0}", item);
}
}
stringBuilder.Append("]");
Console.WriteLine(stringBuilder);
}
ve Çalıştırma için dizeye şu:
public override string ToString()
{
return string.Format("{0}:{1}", this.Type, this.Seq);
}
Sipariş korunur ancak:
- Başka abonelik dahilinde abone hakkında emin değilim: doğru (yani O zamandan beri var bir soru olduğunu uzun zaman önce ve bana asla açık değildi)?
- Hep (akış aynı Tipi ile ikiden fazla ardışık değerleri üretir bile) her bir listede en fazla iki unsurları var
'Başka içinde abone olma konusunda emin değilim abonelik: doğru mu? <- Bununla ne demek istiyorsun? Bu nerede oluyor? (edit) Oh, anladım - "Yazdır" ın içinde ... evet, bunu yapmak istemiyorsun, sadece şu anda 'IDisposable' sızıyor olsaydın ' – JerKimball
'Abone' bir 'eylem' alır bir aşırı yük var '- bunu kullanın; Bu yüzden benim örneğimde 'Console.WriteLine 'kullanıyorum. 'Yazdır' imzasını 'IList 'olarak değiştirirseniz, sadece' '(query.Subscribe (Yazdır))' ' –
JerKimball
' ı Seçebilirim IObservable >> seçeneğinden, bu yüzden kullanıyorum İlk Baskı (IObservable > işlemleri). Bu yüzden ikinci işlemleri tekrar tamamlayacağım. Yazarken –
fra