2013-06-01 40 views
8

Bu notun uzunluğu için şimdiden özür dilerim. Daha kısa sürede yapmak için çok zaman harcadım ve bu onu alabildiğim kadar küçüktü.rxjava ve clojure asynchrony gizemi: gelecek vaatler ve aracılar, benim

Bir gizem var ve yardımlarınız için minnettar olacağım. Bu gizem, çevrimiçi örneklerden bir çift basit observable s üzerinden Clojure'de yazdığım bir rxjava observer1 davranışından geliyor.

Gözlemlenebilir biri, gözlemcilerinin onNext işleyicisine eşzamanlı olarak ileti gönderir ve ilkeli gözlemcim beklenildiği gibi davranır.

Diğer gözlenebilir eşzamansız bir Clojure future aracılığıyla başka bir iş parçacığında aynı şeyi yapar. Tam olarak aynı gözlemci, onNext numaralı telefona gönderilen tüm olayları yakalamaz; Sadece kuyrukta rastgele bir sayıda mesajı kaybetmek gibi görünüyor.

promise d onCompleted için bekleme sona ermesi ve bir agent kollektör gönderilen tüm etkinlikler için bir bekleme sona ermesi arasındaki aşağıdaki kasıtlı ırk yoktur. promise kazanırsa, onCompleted için false ve agent içinde büyük olasılıkla kısa bir kuyruk görmeyi beklerim. agent kazanırsa, onCompleted için true ve agent kuyruğundaki tüm iletileri görmeyi beklerim. Beklemediğim bir sonuç onCompleted için true VE agent'dan kısa bir kuyruk. Ama Murphy uyumuyor, ben de aynısını gördüm. Çöp toplamanın hatalı mı, yoksa Clojure'ın STM'sine mi, yoksa aptallığa mı, yoksa başka bir şeye mi?

Kaynağı, kendi bağımsız formunun sırasına göre burada sunarım, böylece lein repl aracılığıyla doğrudan çalıştırılabilir. Netflix rxjava ait 0.9.0 sürümüne bağımlılığı beyan öncelikle Leiningen proje dosyası, project.clj,: Şimdi

(defproject expt2 "0.1.0-SNAPSHOT" 
    :description "FIXME: write description" 
    :url "http://example.com/FIXME" 
    :license {:name "Eclipse Public License" 
      :url "http://www.eclipse.org/legal/epl-v10.html"} 
    :dependencies [[org.clojure/clojure    "1.5.1"] 
       [com.netflix.rxjava/rxjava-clojure "0.9.0"]] 
    :main expt2.core) 

, ad ve Clojure gereksinimi ve Java ithalatı üç yoldan çekil için cermonials vardır :

(ns expt2.core 
    (:require clojure.pprint) 
    (:refer-clojure :exclude [distinct]) 
    (:import [rx Observable subscriptions.Subscriptions])) 

Son olarak, konsola çıkış için bir makro:

(defmacro pdump [x] 
    `(let [x# ~x] 
    (do (println "----------------") 
     (clojure.pprint/pprint '~x) 
     (println "~~>") 
     (clojure.pprint/pprint x#) 
     (println "----------------") 
     x#))) 

Son olarak, benim gözlemci. Gözlemlenebilir herhangi bir onNext tarafından gönderilen iletileri toplamak için bir agent kullanın. Potansiyel onError toplamak için atom kullanıyorum. onCompleted için promise kullanın, böylece gözlemcinin dışındaki tüketiciler bekleyebilir.

(defn- subscribe-collectors [obl] 
    (let [;; Keep a sequence of all values sent: 
     onNextCollector  (agent []) 
     ;; Only need one value if the observable errors out: 
     onErrorCollector  (atom nil) 
     ;; Use a promise for 'completed' so we can wait for it on 
     ;; another thread: 
     onCompletedCollector (promise)] 
    (letfn [;; When observable sends a value, relay it to our agent" 
      (collect-next  [item] (send onNextCollector (fn [state] (conj state item)))) 
      ;; If observable errors out, just set our exception; 
      (collect-error  [excp] (reset! onErrorCollector  excp)) 
      ;; When observable completes, deliver on the promise: 
      (collect-completed [ ] (deliver onCompletedCollector true)) 
      ;; In all cases, report out the back end with this: 
      (report-collectors [ ] 
       (pdump 
       ;; Wait for everything that has been sent to the agent 
       ;; to drain (presumably internal message queues): 
       {:onNext  (do (await-for 1000 onNextCollector) 
           ;; Then produce the results: 
           @onNextCollector) 
       ;; If we ever saw an error, here it is: 
       :onError  @onErrorCollector 
       ;; Wait at most 1 second for the promise to complete; 
       ;; if it does not complete, then produce 'false'. 
       ;; I expect if this times out before the agent 
       ;; times out to see an 'onCompleted' of 'false'. 
       :onCompleted (deref onCompletedCollector 1000 false) 
       }))] 
     ;; Recognize that the observable 'obl' may run on another thread: 
     (-> obl 
      (.subscribe collect-next collect-error collect-completed)) 
     ;; Therefore, produce results that wait, with timeouts, on both 
     ;; the completion event and on the draining of the (presumed) 
     ;; message queue to the agent. 
     (report-collectors)))) 

Şimdi, işte bir senkron gözlemlenebilir. Gözlemcilerinin onNext boğazından 25 mesaj atıyor, ardından onCompleted s numaralarını çağırıyor.

(defn- customObservableBlocking [] 
    (Observable/create 
    (fn [observer]      ; This is the 'subscribe' method. 
     ;; Send 25 strings to the observer's onNext: 
     (doseq [x (range 25)] 
     (-> observer (.onNext (str "SynchedValue_" x)))) 
     ; After sending all values, complete the sequence: 
     (-> observer .onCompleted) 
     ; return a NoOpSubsription since this blocks and thus 
     ; can't be unsubscribed (disposed): 
     (Subscriptions/empty)))) 

Biz bu gözlemlenebilir bizim gözlemci abone: Bu beklenen işleri olarak

;;; The value of the following is the list of all 25 events: 
(-> (customObservableBlocking) 
    (subscribe-collectors)) 

ve biz konsolda İşte

{:onNext (do (await-for 1000 onNextCollector) @onNextCollector), 
:onError @onErrorCollector, 
:onCompleted (deref onCompletedCollector 1000 false)} 
~~> 
{:onNext 
["SynchedValue_0" 
    "SynchedValue_1" 
    "SynchedValue_2" 
    "SynchedValue_3" 
    "SynchedValue_4" 
    "SynchedValue_5" 
    "SynchedValue_6" 
    "SynchedValue_7" 
    "SynchedValue_8" 
    "SynchedValue_9" 
    "SynchedValue_10" 
    "SynchedValue_11" 
    "SynchedValue_12" 
    "SynchedValue_13" 
    "SynchedValue_14" 
    "SynchedValue_15" 
    "SynchedValue_16" 
    "SynchedValue_17" 
    "SynchedValue_18" 
    "SynchedValue_19" 
    "SynchedValue_20" 
    "SynchedValue_21" 
    "SynchedValue_22" 
    "SynchedValue_23" 
    "SynchedValue_24"], 
:onError nil, 
:onCompleted true} 
---------------- 

aşağıdaki sonuçları görmek yapar bir zaman uyumsuz gözlemlenebilir tam olarak aynı şey, sadece bir future 's iş parçacığı:

(defn- customObservableNonBlocking [] 
    (Observable/create 
    (fn [observer]      ; This is the 'subscribe' method 
     (let [f (future 
       ;; On another thread, send 25 strings: 
       (doseq [x (range 25)] 
        (-> observer (.onNext (str "AsynchValue_" x)))) 
       ; After sending all values, complete the sequence: 
       (-> observer .onCompleted))] 
     ; Return a disposable (unsubscribe) that cancels the future: 
     (Subscriptions/create #(future-cancel f)))))) 

;;; For unknown reasons, the following does not produce all 25 events: 
(-> (customObservableNonBlocking) 
    (subscribe-collectors)) 

Ancak, sürpriz, konsolda gördüğümüz şu: onCompleted için true, promise DID NOT TIME-OUT; ancak asynch mesajlarından sadece bazıları. Gördüğümüz mesajların gerçek sayısı, koşula göre değişiyor ve oyunda bazı eşzamanlılık fenomeninin varlığına işaret ediyor. İpuçları takdir edildi.

---------------- 
{:onNext (do (await-for 1000 onNextCollector) @onNextCollector), 
:onError @onErrorCollector, 
:onCompleted (deref onCompletedCollector 1000 false)} 
~~> 
{:onNext 
["AsynchValue_0" 
    "AsynchValue_1" 
    "AsynchValue_2" 
    "AsynchValue_3" 
    "AsynchValue_4" 
    "AsynchValue_5" 
    "AsynchValue_6"], 
:onError nil, 
:onCompleted true} 
---------------- 

cevap

7

ajan üzerinde await-forblok tüm eylemler böylece sevk kadar geçerli iş parçacığı anlamına kadar sizin beklemektedir sonra gerçekleşmesi anlamına gelir meydana gelmiş ajanlara, (bu konuya veya ajandan) orada hala ajana mesaj gönderebilecek başka bir iş parçacığı var ve durumunuzda olan şey. Ajandaki bekleyişiniz sona erdikten ve haritadaki :onNext anahtarındaki değerini deref ettikten sonra, beklemeden sonra doğru olan tamamlanmış sözün bekleyişini bekleyiniz, ancak bu süre zarfında diğer mesajlar gönderilir. vektöre toplanacak ajan.

Temelde o zaman maddesi üzerinde herhangi daha send çağrılar olarak sonra meydana gelebilecek orada coz ajanlar için bekleyin sonra tamamlanmasını bekleyin ve anlamı haritasında ilk anahtar olarak :onCompleted anahtarı alarak bu çözebilir Şimdiye kadar teslim edildi.

{:onCompleted (deref onCompletedCollector 1000 false) 
:onNext  (do (await-for 0 onNextCollector) 
           @onNextCollector) 
:onError  @onErrorCollector 
} 
+0

Onaylandı ve test edildi. –