8

Rx.NET için biraz yeniyim. Abonelerin herhangi biri tarafından atılabilecek bir istisna yakalamak mümkün mü? Aşağıdakileri yapın ...Bir Abonelik OnNext Eyleminden atılabilen istisnaları yakalamak

handler.FooStream.Subscribe(
      _ => throw new Exception("Bar"), 
      _ => { }); 

Şu anda aşağıdakilerin bir örneğiyle abonelik bazında yetişiyorum. Uygulama sadece bir Bekleme iş parçacığı uyanmak için bir ManualResetEvent kullanır. Bazı iyi bir yolu olmalı gibi

public interface IExceptionCatcher 
{ 
    Action<T> Exec<T>(Action<T> action); 
} 

ve böylece gibi kullanarak ...

handler.FooStream.Subscribe(
      _exceptionCatcher.Exec<Foo>(_ => throw new Exception("Bar")), //It's disappointing that this generic type can't be inferred 
      _ => { }); 

hissediyorum. Rx.NET'teki tüm hata işleme yetenekleri özellikle hata gözlenebilirleriyle uğraşırken mi?

DÜZENLEME: Her isteğim, benim uygulamam https://gist.github.com/1409829 (arabirim ve uygulama prod kodunda farklı düzeneklere ayrılmıştır). Geribildirim açığız. Bu aptalca görünebilir, ancak birçok farklı Rx abonesini yönetmek için kale windsor kullanıyorum.

var exceptionCatcher = 
    new ExceptionCatcher(e => 
           { 
            Logger.FatalException(
             "Exception caught, shutting down.", e); 
            // Deal with unmanaged resources here 
           }, false); 


/* 
* Normally the code below exists in some class managed by an IoC container. 
* 'catcher' would be provided by the container. 
*/ 
observable /* do some filtering, selecting, grouping etc */ 
    .SubscribeWithExceptionCatching(processItems, catcher); 

cevap

8

yerleşik Gözlenebilen operatörler senin ne yapmayın: Bu istisna yakalayıcı observable IObservable örneği olduğunu Daha sonra böyle kullanılacak bu

windsorContainer.Register(Component.For<IExceptionCatcher>().Instance(catcher)); 

gibi kap ile kayıtlı varsayılan olarak (olaylar gibi) sormak, ancak bunu yapacak bir uzantı yöntemi yapabilirsiniz.

public static IObservable<T> IgnoreObserverExceptions<T, TException>(
           this IObservable<T> source 
           ) where TException : Exception 
{ 
    return Observable.CreateWithDisposable<T>(
     o => source.Subscribe(
      v => { try { o.OnNext(v); } 
        catch (TException) { } 
      }, 
      ex => o.OnError(ex), 
      () => o.OnCompleted() 
      )); 
} 

Sonra, gözlemlediğiniz herhangi bir davranışı almak için bu yönteme göre gözlemlenebilir.

+0

Teşekkürler, sorumu yanıtladınız, ancak OnNext'i deneme/yakalama özel durumunuz yakalayacağından emin misiniz? Biri, abone olunan kodun başka bir iş parçacığı üzerinde çalıştırılmasına neden olacak şekilde iade edilen IObservable ile bir şeyler yapabilir. Ben aslında benim Subject.OnNext çağrı etrafında bir try/catch koymak için çalıştı ama istisnalar yakalanmadı. Ancak bir SubscribeWithExceptionHandling yöntemi veya bir şey oluşturabilirsiniz. – drstevens

+1

@drstevens Aynı iş parçacığının istisnalarını yakalar. Gözlemciniz, kendi istisnalarını atayan asenkron işlemlerini başlatıyorsa, bu onları yakalamayacaktır. –

+1

Rx işlemlerinin kaçının yeni bir iş parçacığı (ya da havuzdaki görev) ile sonuçlandığını düşünürsek, durumun böyle olması muhtemeldir. "Handler.FooStream" ve "Abone" arasında bir "GroupByUntil" (...) arasında. SelectMany (...). Tampon (bir zaman) ile. Örneğinizi takip eden bir ExceptionWithCatch öğesi yaratma işlemini tamamladım ve Exception'ı yakaladıktan sonra OnError işleyicisine aktarılan aynı eylemi kullanıyor. – drstevens