Bu notun uzunluğu için şimdiden özür dilerim. Daha kısa sürede yapmak için çok zaman harcadım ve bu onu alabildiğim kadar küçüktü.rxjava ve clojure asynchrony gizemi: gelecek vaatler ve aracılar, benim
Bir gizem var ve yardımlarınız için minnettar olacağım. Bu gizem, çevrimiçi örneklerden bir çift basit observable
s üzerinden Clojure'de yazdığım bir rxjava observer
1 davranışından geliyor.
Gözlemlenebilir biri, gözlemcilerinin onNext
işleyicisine eşzamanlı olarak ileti gönderir ve ilkeli gözlemcim beklenildiği gibi davranır.
Diğer gözlenebilir eşzamansız bir Clojure future
aracılığıyla başka bir iş parçacığında aynı şeyi yapar. Tam olarak aynı gözlemci, onNext
numaralı telefona gönderilen tüm olayları yakalamaz; Sadece kuyrukta rastgele bir sayıda mesajı kaybetmek gibi görünüyor.
promise
d onCompleted
için bekleme sona ermesi ve bir agent
kollektör gönderilen tüm etkinlikler için bir bekleme sona ermesi arasındaki aşağıdaki kasıtlı ırk yoktur. promise
kazanırsa, onCompleted
için false
ve agent
içinde büyük olasılıkla kısa bir kuyruk görmeyi beklerim. agent
kazanırsa, onCompleted
için true
ve agent
kuyruğundaki tüm iletileri görmeyi beklerim. Beklemediğim bir sonuç onCompleted
için true
VE agent
'dan kısa bir kuyruk. Ama Murphy uyumuyor, ben de aynısını gördüm. Çöp toplamanın hatalı mı, yoksa Clojure'ın STM'sine mi, yoksa aptallığa mı, yoksa başka bir şeye mi?
Kaynağı, kendi bağımsız formunun sırasına göre burada sunarım, böylece lein repl
aracılığıyla doğrudan çalıştırılabilir. Netflix rxjava ait 0.9.0
sürümüne bağımlılığı beyan öncelikle Leiningen proje dosyası, project.clj
,: Şimdi
(defproject expt2 "0.1.0-SNAPSHOT"
:description "FIXME: write description"
:url "http://example.com/FIXME"
:license {:name "Eclipse Public License"
:url "http://www.eclipse.org/legal/epl-v10.html"}
:dependencies [[org.clojure/clojure "1.5.1"]
[com.netflix.rxjava/rxjava-clojure "0.9.0"]]
:main expt2.core)
, ad ve Clojure gereksinimi ve Java ithalatı üç yoldan çekil için cermonials vardır :
(ns expt2.core
(:require clojure.pprint)
(:refer-clojure :exclude [distinct])
(:import [rx Observable subscriptions.Subscriptions]))
Son olarak, konsola çıkış için bir makro:
(defmacro pdump [x]
`(let [x# ~x]
(do (println "----------------")
(clojure.pprint/pprint '~x)
(println "~~>")
(clojure.pprint/pprint x#)
(println "----------------")
x#)))
Son olarak, benim gözlemci. Gözlemlenebilir herhangi bir onNext
tarafından gönderilen iletileri toplamak için bir agent
kullanın. Potansiyel onError
toplamak için atom
kullanıyorum. onCompleted
için promise
kullanın, böylece gözlemcinin dışındaki tüketiciler bekleyebilir.
(defn- subscribe-collectors [obl]
(let [;; Keep a sequence of all values sent:
onNextCollector (agent [])
;; Only need one value if the observable errors out:
onErrorCollector (atom nil)
;; Use a promise for 'completed' so we can wait for it on
;; another thread:
onCompletedCollector (promise)]
(letfn [;; When observable sends a value, relay it to our agent"
(collect-next [item] (send onNextCollector (fn [state] (conj state item))))
;; If observable errors out, just set our exception;
(collect-error [excp] (reset! onErrorCollector excp))
;; When observable completes, deliver on the promise:
(collect-completed [ ] (deliver onCompletedCollector true))
;; In all cases, report out the back end with this:
(report-collectors [ ]
(pdump
;; Wait for everything that has been sent to the agent
;; to drain (presumably internal message queues):
{:onNext (do (await-for 1000 onNextCollector)
;; Then produce the results:
@onNextCollector)
;; If we ever saw an error, here it is:
:onError @onErrorCollector
;; Wait at most 1 second for the promise to complete;
;; if it does not complete, then produce 'false'.
;; I expect if this times out before the agent
;; times out to see an 'onCompleted' of 'false'.
:onCompleted (deref onCompletedCollector 1000 false)
}))]
;; Recognize that the observable 'obl' may run on another thread:
(-> obl
(.subscribe collect-next collect-error collect-completed))
;; Therefore, produce results that wait, with timeouts, on both
;; the completion event and on the draining of the (presumed)
;; message queue to the agent.
(report-collectors))))
Şimdi, işte bir senkron gözlemlenebilir. Gözlemcilerinin
onNext
boğazından 25 mesaj atıyor, ardından
onCompleted
s numaralarını çağırıyor.
(defn- customObservableBlocking []
(Observable/create
(fn [observer] ; This is the 'subscribe' method.
;; Send 25 strings to the observer's onNext:
(doseq [x (range 25)]
(-> observer (.onNext (str "SynchedValue_" x))))
; After sending all values, complete the sequence:
(-> observer .onCompleted)
; return a NoOpSubsription since this blocks and thus
; can't be unsubscribed (disposed):
(Subscriptions/empty))))
Biz bu gözlemlenebilir bizim gözlemci abone: Bu beklenen işleri olarak
;;; The value of the following is the list of all 25 events:
(-> (customObservableBlocking)
(subscribe-collectors))
ve biz konsolda İşte
{:onNext (do (await-for 1000 onNextCollector) @onNextCollector),
:onError @onErrorCollector,
:onCompleted (deref onCompletedCollector 1000 false)}
~~>
{:onNext
["SynchedValue_0"
"SynchedValue_1"
"SynchedValue_2"
"SynchedValue_3"
"SynchedValue_4"
"SynchedValue_5"
"SynchedValue_6"
"SynchedValue_7"
"SynchedValue_8"
"SynchedValue_9"
"SynchedValue_10"
"SynchedValue_11"
"SynchedValue_12"
"SynchedValue_13"
"SynchedValue_14"
"SynchedValue_15"
"SynchedValue_16"
"SynchedValue_17"
"SynchedValue_18"
"SynchedValue_19"
"SynchedValue_20"
"SynchedValue_21"
"SynchedValue_22"
"SynchedValue_23"
"SynchedValue_24"],
:onError nil,
:onCompleted true}
----------------
aşağıdaki sonuçları görmek yapar bir zaman uyumsuz gözlemlenebilir tam olarak aynı şey, sadece bir future
's iş parçacığı:
(defn- customObservableNonBlocking []
(Observable/create
(fn [observer] ; This is the 'subscribe' method
(let [f (future
;; On another thread, send 25 strings:
(doseq [x (range 25)]
(-> observer (.onNext (str "AsynchValue_" x))))
; After sending all values, complete the sequence:
(-> observer .onCompleted))]
; Return a disposable (unsubscribe) that cancels the future:
(Subscriptions/create #(future-cancel f))))))
;;; For unknown reasons, the following does not produce all 25 events:
(-> (customObservableNonBlocking)
(subscribe-collectors))
Ancak, sürpriz, konsolda gördüğümüz şu: onCompleted
için true
, promise
DID NOT TIME-OUT; ancak asynch mesajlarından sadece bazıları. Gördüğümüz mesajların gerçek sayısı, koşula göre değişiyor ve oyunda bazı eşzamanlılık fenomeninin varlığına işaret ediyor. İpuçları takdir edildi.
----------------
{:onNext (do (await-for 1000 onNextCollector) @onNextCollector),
:onError @onErrorCollector,
:onCompleted (deref onCompletedCollector 1000 false)}
~~>
{:onNext
["AsynchValue_0"
"AsynchValue_1"
"AsynchValue_2"
"AsynchValue_3"
"AsynchValue_4"
"AsynchValue_5"
"AsynchValue_6"],
:onError nil,
:onCompleted true}
----------------
Onaylandı ve test edildi. –