2017-04-16 26 views
5

Aşağıdaki kodu var:RxJava: gözlemlenebilir ve varsayılan iplik

map: thread=background-thread-1 
onNext: thread=background-thread-1, value=map-1 
onComplete: thread=background-thread-1 

Önemli detay: Ben başka bir iş parçacığı subscribe yöntemini arıyorum (main iplik burada

Observable.create(new ObservableOnSubscribe<String>() { 
      @Override 
      public void subscribe(@NonNull final ObservableEmitter<String> s) throws Exception { 
       Thread thread = new Thread(new Runnable() { 
        @Override 
        public void run() { 
         s.onNext("1"); 
         s.onComplete(); 
        } 
       }); 
       thread.setName("background-thread-1"); 
       thread.start(); 
      } 
     }).map(new Function<String, String>() { 
      @Override 
      public String apply(@NonNull String s) throws Exception { 
       String threadName = Thread.currentThread().getName(); 
       logger.logDebug("map: thread=" + threadName); 
       return "map-" + s; 
      } 
     }).subscribe(new Observer<String>() { 
      @Override 
      public void onSubscribe(Disposable d) {} 

      @Override 
      public void onNext(String s) { 
       String threadName = Thread.currentThread().getName(); 
       logger.logDebug("onNext: thread=" + threadName + ", value=" + s); 
      } 

      @Override 
      public void onError(Throwable e) {} 

      @Override 
      public void onComplete() { 
       String threadName = Thread.currentThread().getName(); 
       logger.logDebug("onComplete: thread=" + threadName); 
      } 
     }); 

Ve çıkış var Android'de). Observable sınıf senkron ve varsayılan olarak ve doğru, (s.onNext) olayları yayar aynı iş parçacığı üzerinde her şey (map gibi operatörler + bildiren abonelerine) gerçekleştirir gibi

Yani görünüyor? Acaba ... amaçlanan davranış mı yoksa sadece bir şeyi yanlış anlamış mıyım? Aslında, en az onNext ve onComplete geri aramalarının, bir yayıcı olayda değil, arayanın iş parçacığı üzerinde çağrılacağını beklerdim. Bu özel durumda gerçek arayanın iş parçasının önemli olmadığını doğru anlıyor muyum? En azından, zaman uyumsuz olarak olaylar oluşturulduğunda.

Başka bir endişe - eğer bazı harici kaynaklardan bir parametre olarak Gözlemlenebilir (yani kendi başıma üretemiyorum) alsam ne olur? ... benim için kullanıcı olup olmadığını kontrol etmenin bir yolu yoktur. Senkronize veya senkronize olmayan ve sadece subscribeOn ve observeOn yöntemleri ile geri arama almak istediğimi açıkça belirtmem gerekiyor, değil mi?

Teşekkürler!

cevap

4

RxJava eşzamanlılık hakkında henüz tartışılmamış. Eğer observeOn/subscribeOn gibi başka herhangi bir mekanizma kullanmıyorsanız, abone ipinde değerler üretecektir. Lütfen operatörlerdeki Thread gibi düşük seviyeli yapıları kullanmayın, sözleşmeyi bozabilirsiniz.

İplik kullanımı nedeniyle, onNext, çağıran Konudan ('background-thread-1') çağrılacaktır. Abonelik, çağrıda (UI-Thread) gerçekleşir. Zincirdeki her operatör 'background-thread-1'-calling-Thread'den çağrılır. On the subscription ayrıca 'background-thread-1' den çağrılacaktır.

Çağıran iş parçacığı üzerinde değer üretmek istiyorsanız, aşağıdakileri kullanın: subscribeOn. İpliği ana kullanım yerine geri döndürmek istiyorsanız zincirin bir yerinde observeOn. Büyük olasılıkla abone olmadan önce.

Örnek: Aşağıda

Observable.just(1,2,3) // creation of observable happens on Computational-Threads 
      .subscribeOn(Schedulers.computation()) // subscribeOn happens only once in chain. Last will overwrite 
      .map(integer -> integer) // map happens on Computational-Threads 
      .observeOn(AndroidSchedulers.mainThread()) // Will switch every onNext to Main-Thread 
      .subscribe(integer -> { 
       // called from mainThread 
      }); 

iyi explanitation olduğunu. http://tomstechnicalblog.blogspot.de/2016/02/rxjava-understanding-observeon-and.html