2013-05-08 25 views
13
  • 2 akışları akışları?Concatenate iki (veya n)

    stream1.pipe(outStream); stream2.pipe(outStream) yapamıyorum, çünkü akış içeriği bir araya getirildi.

  • N akışları:

    akışlarının belirsiz bir sayıda, örneğin yayan bir EventEmitter Verilen bir deyimsel (özlü) yolu beraber birleştirilmiş tüm akışları ile bir dere olsun ne

    eventEmitter.emit('stream', stream1) 
    eventEmitter.emit('stream', stream2) 
    eventEmitter.emit('stream', stream3) 
    ... 
    eventEmitter.emit('end') 
    

    ?

cevap

11

combined-stream paketi, akışları birleştirir. README Örnek:

var CombinedStream = require('combined-stream'); 
var fs = require('fs'); 

var combinedStream = CombinedStream.create(); 
combinedStream.append(fs.createReadStream('file1.txt')); 
combinedStream.append(fs.createReadStream('file2.txt')); 

combinedStream.pipe(fs.createWriteStream('combined.txt')); 

Ben tek seferde tüm akışları eklemek zorunda inanıyoruz. Sıra boşsa, combinedStream otomatik olarak sona erer. Bakınız issue #5.

stream-stream kütüphane açık .end sahip alternatif, ama çok daha az popüler ve tahminen de test edilmiş değil. Düğüm 0.10 düğümünün2 API'sini kullanır (bkz. this discussion).

3

Bunu daha özlü yapabilmek, ancak burada çalışan biri olabilir:

var util = require('util'); 
var EventEmitter = require('events').EventEmitter; 

function ConcatStream(streamStream) { 
    EventEmitter.call(this); 
    var isStreaming = false, 
    streamsEnded = false, 
    that = this; 

    var streams = []; 
    streamStream.on('stream', function(stream){ 
    stream.pause(); 
    streams.push(stream); 
    ensureState(); 
    }); 

    streamStream.on('end', function() { 
    streamsEnded = true; 
    ensureState(); 
    }); 

    var ensureState = function() { 
    if(isStreaming) return; 
    if(streams.length == 0) { 
     if(streamsEnded) 
     that.emit('end'); 
     return; 
    } 
    isStreaming = true; 
    streams[0].on('data', onData); 
    streams[0].on('end', onEnd); 
    streams[0].resume(); 
    }; 

    var onData = function(data) { 
    that.emit('data', data); 
    }; 

    var onEnd = function() { 
    isStreaming = false; 
    streams[0].removeAllListeners('data'); 
    streams[0].removeAllListeners('end'); 
    streams.shift(); 
    ensureState(); 
    }; 
} 

util.inherits(ConcatStream, EventEmitter); 

Biz streams (akışlarının kuyruk ile devlet izlemek; arkasına push ve shift dan öndeki), isStreaming ve streamsEnded. Yeni bir akış aldığımızda, onu itiyoruz ve bir akış bittiğinde, dinlemeyi ve değiştirmeyi bırakıyoruz. Akarsu akışı sona erdiğinde, streamsEnded'u ayarladık.

Bu olayların her birinde, içinde bulunduğumuz durumu kontrol ederiz. Eğer zaten akış yapıyorsak (bir akış oluşturuyorsak), hiçbir şey yapmayız. Sıra boşsa ve streamsEnded ayarlanmışsa, end olayını yayarız. Kuyrukta bir şey varsa, onu devam ettirir ve olaylarını dinleriz.

* pause ve resume danışma, bu nedenle bazı akışları doğru çalışmayabilir ve belleğe alma gerektirecektir unutmayın. Bu alıştırma okura bırakılmıştır.

tüm bu yapmış olan, onunla bir ConcatStream oluşturarak, bir EventEmitter inşa ve bir end olay ardından iki stream olayları yayan tarafından n=2 davayı yapardı. Eminim daha mantıklı bir şekilde yapılabilir, ama sahip olduğumuz şeyi de kullanabiliriz.

+0

Teşekkür Aaron! Biraz var olan kütüphane olabileceğine inanıyorum, böylece üç çizgide çözebilirim. Eğer yoksa, çözümünüzü bir pakete çıkarabileceğimi düşünüyorum. Kodunuzu bir MIT lisansı kapsamında kullanabilir miyim? –

+0

Ah, akış akışı kitaplığını buldu. Cevabımı gör. –

+0

@JoLiss Ayrıca önce bir şey aradım, ancak bu seçeneği bulamadım. Eğer hala istiyorsanız, kodumu bir kütüphanede kullanabilirsiniz. –

1

streamee.js, düğüm 1'e dayalı bir dizi akım trafosu ve bestecidir.0+ akışları ve concatenate yöntemi içerir:

var stream1ThenStream2 = streamee.concatenate([stream1, stream2]); 
+0

Teşekkürler, ben kontrol edeceğim. Bu Node 0.10 var mı? –

+0

Evet Düğüm 0.10, ancak eski tarz akışları README – atamborrino

2

https://github.com/joepie91/node-combined-stream2 (., Yukarıda tarif edilmiştir) kombine akım modülü için bir damla-Streams2 uyumlu değiştirme otomatik Streams1 sarar akışları olduğu. Kombine-stream2 için

örnek kod:

var CombinedStream = require('combined-stream2'); 
var fs = require('fs'); 

var combinedStream = CombinedStream.create(); 
combinedStream.append(fs.createReadStream('file1.txt')); 
combinedStream.append(fs.createReadStream('file2.txt')); 

combinedStream.pipe(fs.createWriteStream('combined.txt')); 
3

bu vanilya ile yapılabilir

import { PassThrough } from 'stream' 
const merge = (...streams) => { 
    let pass = new PassThrough() 
    let waiting = streams.length 
    for (let stream of streams) { 
     pass = stream.pipe(pass, {end: false}) 
     stream.once('end',() => --waiting === 0 && pass.emit('end')) 
    } 
    return pass 
} 
5

Basit reduce operasyon nodejs ince olmalıdır nodejs!

const {PassThrough} = require('stream') 

let joined = [s0, s1, s2, ...sN].reduce((pt, s, i, a) => { 
    s.pipe(pt, {end: false}) 
    s.once('end',() => a.every(s => s.ended) && pt.emit('end')) 
    return pt 
}, new PassThrough()) 

Alkış;)

+0

'da yazıldığı gibi 0.10+ akışa sarabilirsiniz. Bu 'katıldı' gibi görünüyor undefined olacaktır. –

+1

sabit. teşekkürler;) @Mark_M – Ivo

+0

UYARI: Bu, tüm akışların, verilerin sıralanmasına neden olmaktan çok, verilerin sıralanmasına ilişkin herhangi bir bakımdan, PassThrough akışına paralel olarak bağlanmasına neden olur. Bu yaklaşımın amacı olan –

İlgili konular