2015-08-24 19 views
8

Sadece NoFlo.js'den ilham aldıktan sonra highland.js'yi öğreniyorum. Akımların tekrarlı olarak çalışabilmesini istiyorum. Bu karma örnekte, iki ile çarpılan bir sayı elde edeceğiz ve sonuçları < = 512'ye göre filtreleyeceğiz. Sayının çarpılmasından sonra sisteme geri besleniyor. Çalıştığım kod, ancak boru hattındaki doto işlevini çıkarırsam herhangi bir sayı işlemez. Verilerimi returnPipe ürününe yanlış gönderdiğimden şüpheleniyorum. Verileri bir sisteme geri aktarmanın daha iyi bir yolu var mı? Neyi kaçırıyorum? re-emitting the source stream iken bir dere kapalı spin doto: belgelerineHighlandjs'deki dairesel veri akışı

### 
    input>--m--->multiplyBy2>---+ 
      |     | 
      |     | 
      +---<returnPipe<----+ 
### 

H = require('highland') 

input = H([1]) 
returnPipe = H.pipeline(
    H.doto((v)->console.log(v)) 
) 
H.merge([input,returnPipe]) 
.map((v)-> return v * 2) 
.filter((v)-> return v <= 512) 
.pipe(returnPipe) 
+0

Sayıların hiçbirini işlemediğini nereden biliyorsunuz? – RadleyMith

+0

Doto çağrısı olduğu sürece çalışır. Sadece verileri, doto gibi gereksiz bir işleve bırakmak zorunda kalmadan borulamak istedim. –

cevap

5

. Bu, boru hattı söz konusu olduğunda, akıntının içinden geçmekte olduğu bir işlev olduğu anlamına gelir. doto'u çektiğinizde, orijinal akış, bir sonraki yinelemede dönüş akışından geri dönüş yapmaz.

Boru hattını kullanacaksanız, akışını alan ve akış yayınlayan bir yöntem iletmeniz gerekir. Örneğin, H.pipeline çağrısında H.map((v)=>{console.log(v); return v;}) gibi bir şeyle doto yöntemi yerini alabilecek ve bu yöntem bir akış tüketir ve bir dere yayar beri, bu .pipe(returnPipe)

üzerinde akışı içine geri geçirildiğinde akmaya devam edecek

DÜZENLEME: Sorunuza cevap vermek için, let input = H([1]) bildirdiğinizde, aslında burada bir akış oluşturuyorsunuz. Sen boru hattı ve returnPipe herhangi bir başvuru kaldırmak ve aşağıdaki kodla aynı çıktıyı üretebilir:

let input = H([1]); 

input.map((v)=> { 
    return v * 2; 
}) 
.filter((v)=> { 
    if (v <= 512) { 
    console.log(v); 
    } 
    return v <= 512; 
}) 
.pipe(input); 
+0

Bu mantıklı. Teşekkür ederim. Sadece sezgisel olarak, verileri herhangi bir işlev olmadan borulayabilmem gerektiği gibi geliyor. Tüketecek ve yeniden yayacak olan bir boru hattı gibi bir şey var mı? –

+0

Yaptığım düzenlemedeki örneğe bakın. Bir akımın fikri, tükettiği ve yeniden yaydığı ve “H (source)” aslında bir akış nesnesi yarattığından beri, ayrı bir “boru hattı” oluşturmaya gerek yoktur. Boru hattı fikrinin akışın içinden geçmesini istediğiniz bir dizi işlev için uygun bir sarmalayıcı sağlamak olduğuna inanıyorum. – jaredkwright

+0

Bu asla bana hiç olmazdı. Çok teşekkürler. +50 değerinde! –

1

Benim orijinal niyet highland.js bir özyinelemeli dosya okuyucu yazmaktı. Highland.js github sorunları listesine I posted ve Victor Vu bunu fantastik bir yazmayla bir araya getirmeme yardımcı oldu.

H = require('highland') 
fs = require('fs') 
fsPath = require('path') 

### 
    directory >---m----------> dirFilesStream >-------------f----> out 
       |           | 
       |           | 
       +-------------< returnPipe <--------------+ 

    legend: (m)erge (f)ork 

+ directory   has the initial file 
+ dirListStream  does a directory listing 
+ out    prints out the full path of the file 
+ directoryFilter runs stat and filters on directories 
+ returnPipe  the only way i can 

### 

directory = H(['someDirectory']) 
mergePoint = H() 
dirFilesStream = mergePoint.merge().flatMap((parentPath) -> 
    H.wrapCallback(fs.readdir)(parentPath).sequence().map (path) -> 
    fsPath.join parentPath, path 
) 
out = dirFilesStream 
# Create the return pipe without using pipe! 
returnPipe = dirFilesStream.observe().flatFilter((path) -> 
    H.wrapCallback(fs.stat)(path).map (v) -> 
    v.isDirectory() 
) 
# Connect up the merge point now that we have all of our streams. 
mergePoint.write directory 
mergePoint.write returnPipe 
mergePoint.end() 
# Release backpressure. 
out.each H.log