2012-11-19 9 views
9

Zaman tabanlı olayları raporlama amacıyla toplamak için kullanılan bir C# (.NET 4.5) uygulaması yazıyorum. Sorgulama mantığımı hem gerçek zamanlı hem de geçmiş veriler için yeniden kullanılabilir hale getirmek için Reaktif Uzantılar (2.0) ve bunların IScheduler altyapısını (HistoricalScheduler ve arkadaşları) kullanıyorum.Neden Observable.Generate() System.StackOverflowException?

Örneğin, olayların bir listesini oluşturmak varsayalım (kronolojik ama çakışabilir!) Kendilerine ait yük onların damgası ist ve sabit süreli tampon göre dağılımlarını bilmek istiyorum:

const int num = 100000; 
const int dist = 10; 

var events = new List<DateTimeOffset>(); 
var curr = DateTimeOffset.Now; 
var gap = new Random(); 

var time = new HistoricalScheduler(curr); 

for (int i = 0; i < num; i++) 
{ 
    events.Add(curr); 
    curr += TimeSpan.FromMilliseconds(gap.Next(dist)); 
} 

var stream = Observable.Generate<int, DateTimeOffset>(
    0, 
    s => s < events.Count, 
    s => s + 1, 
    s => events[s], 
    s => events[s], 
    time); 

stream.Buffer(TimeSpan.FromMilliseconds(num), time) 
    .Subscribe(l => Console.WriteLine(time.Now + ": " + l.Count)); 

time.AdvanceBy(TimeSpan.FromMilliseconds(num * dist)); 

aşağıdaki yığın izleme ile System.StackOverflowException bu kod sonuçları Running (aşağı son 3 çizgileri tüm yol İt's):

mscorlib.dll!System.Threading.Interlocked.Exchange<System.IDisposable>(ref System.IDisposable location1, System.IDisposable value) + 0x3d bytes  
System.Reactive.Core.dll!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x37 bytes  
System.Reactive.Core.dll!System.Reactive.Concurrency.ScheduledItem<System.DateTimeOffset>.Cancel() + 0x23 bytes  
... 
System.Reactive.Core.dll!System.Reactive.Disposables.AnonymousDisposable.Dispose() + 0x4d bytes  
System.Reactive.Core.dll!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x4f bytes  
System.Reactive.Core.dll!System.Reactive.Concurrency.ScheduledItem<System.DateTimeOffset>.Cancel() + 0x23 bytes  
... 

Tamam, sorun listeye bağlı olarak, Observable.Generate() benim kullanımı geliyor gibi görünüyor boyut ()) ve programlayıcı seçiminden bağımsız olarak.

Neyi yanlış yapıyorum? Veya daha genel olarak, kendi zaman damgalarını sağlayan bir IEnumerable olayından IObservable oluşturmak için tercih edilen yol ne olurdu?

+1

Bu hatayla karşılaşmadan önce num 'ne kadar büyük olabilir? Ayrıca, hata ayıklayıcısında bunu tek adımda atarsanız, hatayı görmeden önce yürüten son kod satırı nedir? –

+0

Bana göre, kritik eşik = "num = 51600" değerinde görünüyor (Sürüm yapılandırmasında, Debug yapılandırmasında biraz daha az). Gözlenebilir sekans tamamen yaratılmış gibi görünüyor. Observable.Generate() 'için lamdba ifadelerinde kesme noktalarına vurabilirim. Son durum, Console.WriteLine() 'son çağrısından sonra atılır. –

+1

Anlayın, bu sadece bir tahmindir, ancak akış her elemanın atmaya çalıştığından kuşkusuz gözüküyor ve her eleman akışı elden geçirmeye çalışıyor. Sonunda, 'İptal' veya 'Atma' için temel olarak özyinelemeli çağrılar ile sonuçlanırsınız, bu da yığınınızı (varsayılan boyutu 1 megabayttır) üfler. Bunun neden olduğunu söylemek için 'Gözlenebilir' ile yeterince tanıdık değilim. –

cevap

3

(güncelleme: at bakın:

(RX ile v1.0, orijinal Observable.Generate() aslında işe yarıyor gibi! Aman, ben daha önce o işaretli) yanıtın alt kısmı)

Sorun, Observable.Generate'un nasıl çalıştığıdır - bağımsız değişkenlere dayalı bir corecursive (içten içe dönüşü düşünün) düşünün; Bu argümanlar çok iç içe geçmiş yardımcı jeneratör üretiyorsa, yığınınızı üflersiniz.

Bu noktadan, 'dan çok şey iletiyorum (önümde Rx kaynağım yok) (aşağıya bakın), ama bahse girerim, sizin tanımınıza göre, :

initial_state => 
generate_next(initial_state) => 
generate_next(generate_next(initial_state)) => 
generate_next(generate_next(generate_next(initial_state))) => 
generate_next(generate_next(generate_next(generate_next(initial_state)))) => ... 

Arama yığınının taşması için yeterince büyük olana kadar devam eder.Yani, bir yöntem imzası + int sayacınız, bu, özyinelemeli çağrı başına 8-16 bayt gibi bir şey (daha çok, devlet makine jeneratörü nasıl uygulandığına bağlı olarak), bu yüzden 60.000 ses yaklaşık sağ (1M/16 ~ 62500 maksimum) derinlik)

DÜZENLEME: kaynağını çekti - doğruladı: Bu gibi görünür üret ait "Çalıştır" yöntemi - Generate için iç içe geçmiş aramaların dikkat:

protected override IDisposable Run(
    IObserver<TResult> observer, 
    IDisposable cancel, 
    Action<IDisposable> setSink) 
{ 
    if (this._timeSelectorA != null) 
    { 
     Generate<TState, TResult>.α α = 
       new Generate<TState, TResult>.α(
        (Generate<TState, TResult>) this, 
        observer, 
        cancel); 
     setSink(α); 
     return α.Run(); 
    } 
    if (this._timeSelectorR != null) 
    { 
     Generate<TState, TResult>.δ δ = 
       new Generate<TState, TResult>.δ(
        (Generate<TState, TResult>) this, 
        observer, 
        cancel); 
     setSink(δ); 
     return δ.Run(); 
    } 
    Generate<TState, TResult>._ _ = 
      new Generate<TState, TResult>._(
        (Generate<TState, TResult>) this, 
        observer, 
        cancel); 
    setSink(_); 
    return _.Run(); 
} 

EDIT: Derp, teklif vermedi herhangi bir alternatif ... işte işe yarayabilir:

(EDIT: sabit Enumerable.Range, böylece akış boyutuile çarpılmayacak)

const int num = 160000; 
const int dist = 10; 

var events = new List<DateTimeOffset>(); 
var curr = DateTimeOffset.Now; 
var gap = new Random(); 
var time = new HistoricalScheduler(curr); 

for (int i = 0; i < num; i++) 
{ 
    events.Add(curr); 
    curr += TimeSpan.FromMilliseconds(gap.Next(dist)); 
} 

    // Size too big? Fine, we'll chunk it up! 
const int chunkSize = 10000; 
var numberOfChunks = events.Count/chunkSize; 

    // Generate a whole mess of streams based on start/end indices 
var streams = 
    from chunkIndex in Enumerable.Range(0, (int)Math.Ceiling((double)events.Count/chunkSize) - 1) 
    let startIdx = chunkIndex * chunkSize 
    let endIdx = Math.Min(events.Count, startIdx + chunkSize) 
    select Observable.Generate<int, DateTimeOffset>(
     startIdx, 
     s => s < endIdx, 
     s => s + 1, 
     s => events[s], 
     s => events[s], 
     time); 

    // E pluribus streamum 
var stream = Observable.Concat(streams); 

stream.Buffer(TimeSpan.FromMilliseconds(num), time) 
    .Subscribe(l => Console.WriteLine(time.Now + ": " + l.Count)); 

time.AdvanceBy(TimeSpan.FromMilliseconds(num * dist)); 
+0

Teşekkürler, bu harika! Ayrıca kendi çözümümden daha verimli görünüyor. Yine de, aritmetinizde küçük bir hatayı düzeltmek zorunda kaldım, ama (düzenlemeye bakın). RX'de niçin yinelemeli uygulamaya ihtiyaç duyulduğunu hala pek göremiyorum. Sonuçta, RX v1.0 (60.000 büyüklüğünün ötesinde) ile çalışmak gibi görünüyor. Yine de, güzel bir soruşturma, akıllı bir çözüm. Tekrar teşekkürler! –

+0

Sorun değil! Heh - Aslında etkilendim sadece * bir * matematik hatası vardı ...;) – JerKimball

3

Tamam, durum geçişleri olarak lamdba ifadeleri gerektirmeyen farklı bir fabrika yöntemi alıyorum ve artık herhangi bir yığın taşması görmüyorum.

var stream = Observable.Create<DateTimeOffset>(o => 
    { 
     foreach (var e in events) 
     { 
      time.Schedule(e,() => o.OnNext(e)); 
     } 

     time.Schedule(events[events.Count - 1],() => o.OnCompleted()); 

     return Disposable.Empty; 
    }); 

Elle önce olayları zamanlama aboneliği dönen görünüyor: Bu benim soruya doğru cevap olarak nitelemek olsaydı henüz emin Özür, ancak çalışıyor ve ben razı olmama burada paylaşmak düşünce (!) bana garip, ama bu durumda lambda ifadesinin içinde yapılabilir.

Bu yaklaşım hakkında yanlış bir şey varsa, lütfen beni düzeltin. Ayrıca, orijinal kodumu ihlal ettiğim System.Reactive arasındaki örtük varsayımları duymaktan memnuniyet duyarım. Ama bir alternatif vermedi fark -