Rx

2014-07-16 30 views
8
'da Hot Concat

Observable.Concat, gözlenebilirleri birleştiren bir uygulamadır, ancak ikinci IObservable<T> yalnızca ilk tamamlandığında aboneliği yapar. Rx

http://www.introtorx.com/content/v1.0.10621.0/12_CombiningSequences.html#Concat

bir "HotConcat" herhangi uygulaması var mı? Observable.Merge numarasına benzer, ancak ilk önce abonelik öğelerinin öğelerini ve ardından alt öğeleri içeren yayınlama sırasını koruyarak. Bir şey gibi: Hot Concat

Bunu ReplaySubject<T> kullanmak mümkündür biliyorum ama bunun nedeni performans ve bellek kullanımı etkilerin çok iyi görünüyor değil ..

cevap

6

Bir süredir kullanıyorum uygulamam. Bu uygulama, 'u BufferUntilSubscribed operatörünü, Connect numaralı telefonu aradığınızda arabelleğe almaya başlayacak ve arabelleğe alınan sonuçları ilk aboneye dağıtacak olan IConnectableObservable'a dönüştürür. İlk abonenin "yakalandığı" olduğunda, tamponlama duracak ve aboneye geldikleri anda canlı etkinlikler verilecektir. İşte BufferUntilSubscribed ait implemementation var

public static IObservable<T> HotConcat<T>(params IObservable<T>[] sources) 
{ 
    var s2 = sources.Select(s => s.BufferUntilSubscribed()); 
    var subscriptions = new CompositeDisposable(s2.Select(s2 => s2.Connect()).ToArray()); 
    return Observable.Create<T>(observer => 
    { 
     var s = new SingleAssignmentDisposable(); 
     var d = new CompositeDisposable(subscriptions); 
     d.Add(s); 

     s.Disposable = s2.Concat().Subscribe(observer); 

     return d; 
    }); 
} 

: Bunu sahip olduktan

, gibi bir şey olarak HotConcat yazabilir

private class BufferUntilSubscribedObservable<T> : IConnectableObservable<T> 
{ 
    private readonly IObservable<T> _source; 
    private readonly IScheduler _scheduler; 
    private readonly Subject<T> _liveEvents; 
    private bool _observationsStarted; 
    private Queue<T> _buffer; 
    private readonly object _gate; 

    public BufferUntilSubscribedObservable(IObservable<T> source, IScheduler scheduler) 
    { 
     _source = source; 
     _scheduler = scheduler; 
     _liveEvents = new Subject<T>(); 
     _buffer = new Queue<T>(); 
     _gate = new object(); 
     _observationsStarted = false; 
    } 

    public IDisposable Subscribe(IObserver<T> observer) 
    { 
     lock (_gate) 
     { 
      if (_observationsStarted) 
      { 
       return _liveEvents.Subscribe(observer); 
      } 

      _observationsStarted = true; 

      var bufferedEvents = GetBuffers().Concat().Finally(RemoveBuffer); // Finally clause to remove the buffer if the first observer stops listening. 
      return Observable.Merge(_liveEvents, bufferedEvents).Subscribe(observer); 
     } 
    } 

    public IDisposable Connect() 
    { 
     return _source.Subscribe(OnNext, _liveEvents.OnError, _liveEvents.OnCompleted); 
    } 

    private void RemoveBuffer() 
    { 
     lock (_gate) 
     { 
      _buffer = null; 
     } 
    } 

    /// <summary> 
    /// Acquires a lock and checks the buffer. If it is empty, then replaces it with null and returns null. Else replaces it with an empty buffer and returns the old buffer. 
    /// </summary> 
    /// <returns></returns> 
    private Queue<T> GetAndReplaceBuffer() 
    { 
     lock (_gate) 
     { 
      if (_buffer == null) 
      { 
       return null; 
      } 

      if (_buffer.Count == 0) 
      { 
       _buffer = null; 
       return null; 
      } 

      var result = _buffer; 
      _buffer = new Queue<T>(); 
      return result; 
     } 
    } 

    /// <summary> 
    /// An enumerable of buffers that will complete when a call to GetAndReplaceBuffer() returns a null, e.g. when the observer has caught up with the incoming source data. 
    /// </summary> 
    /// <returns></returns> 
    private IEnumerable<IObservable<T>> GetBuffers() 
    { 
     Queue<T> buffer; 
     while ((buffer = GetAndReplaceBuffer()) != null) 
     { 
      yield return buffer.ToObservable(_scheduler); 
     } 
    } 

    private void OnNext(T item) 
    { 
     lock (_gate) 
     { 
      if (_buffer != null) 
      { 
       _buffer.Enqueue(item); 
       return; 
      } 
     } 

     _liveEvents.OnNext(item); 
    } 
} 

/// <summary> 
/// Returns a connectable observable, that once connected, will start buffering data until the observer subscribes, at which time it will send all buffered data to the observer and then start sending new data. 
/// Thus the observer may subscribe late to a hot observable yet still see all of the data. Later observers will not see the buffered events. 
/// </summary> 
/// <param name="source"></param> 
/// <param name="scheduler">Scheduler to use to dump the buffered data to the observer.</param> 
/// <returns></returns> 
public static IConnectableObservable<T> BufferUntilSubscribed<T>(this IObservable<T> source, IScheduler scheduler) 
{ 
    return new BufferUntilSubscribedObservable<T>(source, scheduler); 
} 

/// <summary> 
/// Returns a connectable observable, that once connected, will start buffering data until the observer subscribes, at which time it will send all buffered data to the observer and then start sending new data. 
/// Thus the observer may subscribe late to a hot observable yet still see all of the data. Later observers will not see the buffered events. 
/// </summary> 
/// <param name="source"></param> 
/// <returns></returns> 
public static IConnectableObservable<T> BufferUntilSubscribed<T>(this IObservable<T> source) 
{ 
    return new BufferUntilSubscribedObservable<T>(source, Scheduler.Immediate); 
} 
+0

Güzel uygulanması, @Brandon .. eşzamanlılık sorunları hakkında güvenli görünüyor , bir soru, neden 'Gözetlemek()' 'Observable.Create ' önce ve onun içinde değil IObservables kullanıyorsunuz? –

+0

Gözlemlenebilirinizin çok sıcak olduğu ve abone olmanızdan önce bunları tamponlamak istediğiniz teorisi üzerine. Buna ihtiyacınız yoksa Oluşturun'un içine taşıyın. – Brandon

+0

Evet, elbette, ama yönergeleri takip etmeliyiz .. Bunun hakkında komik bir şey, 'Observable.Concat' hakkında okuduğum mermer diyagramların çoğu, işlemle ilgili yanlış fikre (veya görüntüye) sahip (eğer sıcak gözlenebilirse) kullanılmış). –

1

Böyle bir kompozisyonun bilmiyorum fonksiyon, ama ihtiyaçlarınızı karşılayan bir yazabilirsiniz.

İşte yazma girişimim bir. Öğeleri, yalnızca bir kez oynatılana kadar bellekte tutacaktır. Ama bence daha temiz bir uygulama yapmanın bir yolu olmalı.

public static IObservable<T> HotConcat<T>(this IObservable<T> first, IObservable<T> second) 
{ 
    return Observable.Create<T>(observer => 
    { 
     var queue = new Queue<Notification<T>>(); 

     var secondSubscription = second.Materialize().Subscribe(item => 
     { 
      if (queue == null) 
       return; 

      lock (queue) 
      { 
       queue.Enqueue(item); 
      } 
     }); 

     var secondReplay = Observable.Create<T>(secondObserver => 
     { 
      while (true) 
      { 
       Notification<T> item = null; 

       lock (queue) 
       { 
        if (queue.Count > 0) 
        { 
         item = queue.Dequeue(); 
        } 
        else 
        { 
         secondObserver.OnCompleted(); 
         secondSubscription.Dispose(); 
         queue = null; 
         break; 
        } 
       } 

       if (item != null) 
        item.Accept(secondObserver); 
      } 

      return secondSubscription; 
     }); 

     return first.Concat(secondReplay).Concat(second).Subscribe(observer); 
    }); 
} 
İlgili konular