2016-04-12 16 views
8

Yazılabilir verilerin henüz veri almaya hazır olmadığı Node.js'de okunabilir bir akışa okunabilir bir akışa bağlanmanın bir yolu var mı? Başka bir deyişle, okunabilir olanı yazılabilir ile bağlamak istiyorum, ancak yazım yöntemini tanımlamak da dahil olmak üzere yazılabilirliği, programın sonraki bir noktasında başlatmak istiyorum. Belki de yazma yöntemini uygulamalıyız, ancak okunabilir bir akışı duraklatmak için yazılabilir bir akışı duraklatmanın bir yolu var mı? Ya da veriyi yazılabilir hale getirmeden önce bir ara/dönüştürme akışı kullanabiliriz ve verileri burada tamponlayabiliriz!Verileri henüz veri almaya hazır olmayan yazılabilir akışa aktarma

readable.pipe(transform).pipe(writable); 

ama böyle bir şey yapmak istiyorum: Bir örnek olarak

, normalde yaptığımız bu mümkündür ve doğru, şimdiye kadar sahip nasıl yapılacağı ise

const tstrm = readable.pipe(transform); 

doSomethingAsync().then(function(){ 

     tstrm.pipe(writable); 

}); 

sadece merak her ikisini bulmakta sorun.

Verileri, bir ara yayın akışında, yazılabilir bir akışa bağlanma/aktarmadan önce arabelleğe almayı ve daha sonra, bağlandıktan sonra, herhangi bir yeni veriden önce arabelleğe alınmış verileri akışa almayı düşünüyorum. Yapmak için makul bir şey gibi görünüyor, bu konuda herhangi bir bilgi bulamıyor.

+0

, burada kabul cevap gibi görünüyor ben http://stackoverflow.com/questions/20317759/implementing-a-buffered-transform-stream –

cevap

2

Not, yazarın okuyabildiği veya kullanamayacağını taklit etmek için bir aralık kullanıyorum. yazar Şu son satır

readStrem.pipe(transformBbuffer).pipe(writeStream); 
aşağıdaki gibi okur yani

r.pipe(b).pipe(w); 

istediğini düşünüyorum vb arabelleğe başlamak için devlet güncellerdi yanlış dönerse Sen yani istediğiniz bu herhangi bir şekilde yapabilirsiniz

Örnek kod, tüm verileri arabelleğe almak için yapabileceğimiz bazı değişiklikler var. Koddan sonra tarif edeceğim.

https://nodejs.org/api/stream.html#stream_class_stream_transform_1

Bu kod ... Eğer akışları hakkında bilmeniz gereken ve daha docs olan her şey, onlar daha tam örneklerle yapabileceğini düşünüyorum ama onlar olduğu gibi oldukça iyi.

var fs  = require('fs'); 
var stream = require('stream') 
const util = require('util'); 
//const StringDecoder = require('string_decoder').StringDecoder; 
const Transform = require('stream').Transform; 
var check_buff = 0; 
var DRAIN_ME = 0; 

var r = fs.createReadStream('file1.txt').setEncoding('utf8'); 
var w = fs.createWriteStream('file2.txt'); 

var BufferStream = function() { 
    stream.Transform.apply(this, arguments); 
    this.buffer = []; 
}; 

util.inherits(BufferStream, stream.Transform); 

var intId; 
intId = setInterval(function(){ 
    if(check_buff % 3 == 0) { 
    DRAIN_ME = 1; 
    return; 
    } 
    DRAIN_ME = 0; 
},10); 

BufferStream.prototype._transform = function (chunk, encoding, done) { 
    this.buffer.push(String(chunk)); 
    while(DRAIN_ME > 0 && this.buffer.length > 0) { 
    this.push(this.buffer.shift()); 
    } 
    console.log(chunk.length); 
    console.log(this.buffer.length); 
    done(); 
}; 

var b = new BufferStream(); 
b.on('end', function(chunk) { 
    clearInterval(intId); 
}); 
r.pipe(b).pipe(w); 

Ben dönüşümü uygulamak için kanonik yol arıyorum/boru üzerinde çağrı gelene kadar tüm veri arabelleği dere yoluyla.

bu oluşturmak test etmek için ... o alan verileri yani durur çünkü şu

BufferStream.prototype._transform = function (chunk, encoding, done) { 
    this.buffer.push(String(chunk)); 

    console.log(chunk.length); 
    console.log(this.buffer.length); 
    done(); 
}; 
...... 
BufferStream.prototype._flush = function (cb) { 
    var len = this.buffer.length; 
    for (var i = 0; i < len; i++) { 
    this.push(this.buffer.shift()); 
    }; 
    cb(); 
}; 

Ayrıca yürürlükte yazılabilir akışı duraklar okunabilir akışı durdurabilir değişikliklerini yapın 100MB ya da daha fazla disk üzerinde oldukça büyük bir dosya ve bunu çalıştırın ...

var fs = require('fs'); 
var readableStream = fs.createReadStream('file1.txt'); 
var writableStream = fs.createWriteStream('file2.txt'); 

readableStream.setEncoding('utf8'); 

readableStream.on('data', function(chunk) { 
    var ready = 0; 
    readableStream.pause(); 
    setInterval(function(){ 
    if(ready == 0) { 
     //console.log('pausing'); 
     readableStream.pause(); 
     ready = 1; 
    } 
    else { 
     //console.log('resuming'); 
     readableStream.resume(); 
     ready = 0; 
    } 
    },100); 
    writableStream.write(chunk); 
}); 

Hemen duraklatmanın nedeni Çünkü aralık 10ms'u attığında dosya zaten yazılmış olabilir. Bunun varyasyonları var yani ...

var fs = require('fs'); 
var readableStream = fs.createReadStream('file1.txt'); 
var writableStream = fs.createWriteStream('file2.txt'); 
readableStream.setEncoding('utf8'); 

var ready = 0; 
setInterval(function(){ 
    if(ready == 0) { 
    //console.log('pausing'); 
    readableStream.pause(); 
    ready = 1; 
    } 
    else { 
    //console.log('resuming'); 
    readableStream.resume(); 
    ready = 0; 
    } 
},100); 

readableStream.on('data', function(chunk) { 
    writableStream.write(chunk); 
    readableStream.pause(); 
}); 
+0

sayesinde ne arıyorum evet okunabilir duraklatabileceğinizi yakındır Bu çok zor değil, ancak veriyi almaya hazır hale gelmesini beklerken verileri tamponlamak için bir geçiş/dönüşüm akışı kullanmaktan ne haber? –

İlgili konular