2016-12-26 1 views
0

J'ai un flux de données infini issu d'un processus en fourche. Je veux que ce flux soit traité par un module et parfois je veux dupliquer les données de ce flux pour être traité par un autre module (par exemple surveiller un flux de données mais si quelque chose d'intéressant se produit je veux enregistrer les n octets suivants complément d'enquête).Fractionnement de flux NodeJS

Alors supposons que le scénario suivant:

  1. Je commence le programme et commencer à consommer le flux lisible
  2. 2 secondes plus tard, je veux traiter les mêmes données pour 1 sec par un lecteur de flux différent
  3. Une fois que le temps est écoulé, je veux fermer le deuxième consommateur, mais le consommateur d'origine doit rester intacte.

Voici un extrait de code pour cela:

var stream = process.stdout; 

stream.pipe(detector); // Using the first consumer 

function startAnotherConsumer() { 
    stream2 = new PassThrough(); 
    stream.pipe(stream2); 

    // use stream2 somewhere else 
} 

function stopAnotherConsumer() { 
    stream.unpipe(stream2); 
} 

Mon problème ici est que le unpiping stream2 ne soit pas fermé. Si je l'appelle stream.end() après la commande unpipe, puis il se bloque avec l'erreur:

events.js:160 
     throw er; // Unhandled 'error' event 
    ^

Error: write after end 
    at writeAfterEnd (_stream_writable.js:192:12) 
    at PassThrough.Writable.write (_stream_writable.js:243:5) 
    at Socket.ondata (_stream_readable.js:555:20) 
    at emitOne (events.js:101:20) 
    at Socket.emit (events.js:188:7) 
    at readableAddChunk (_stream_readable.js:176:18) 
    at Socket.Readable.push (_stream_readable.js:134:10) 
    at Pipe.onread (net.js:548:20) 

J'ai même essayé de mettre en pause le flux source pour aider le tampon à être vidées à partir du second courant, mais il ne fonctionne pas non plus:

function stopAnotherConsumer() { 
    stream.pause(); 
    stream2.once('unpipe', function() { 
     stream.resume(); 
     stream2.end(); 
    }); 
    stream.unpipe(stream2); 
} 

Même erreur que précédemment (écriture après fin).

Comment résoudre le problème? Mon intention initiale était de dupliquer les données diffusées à partir d'un point, puis de fermer le deuxième flux après un certain temps.

Note: I tried to use this answer to make it work.

Répondre

0

Comme il n'y avait pas de réponses, je publie ma solution (patchwork). Au cas où quelqu'un en aurait un meilleur, ne le retenez pas.

Un nouveau flux:

const Writable = require('stream').Writable; 
const Transform = require('stream').Transform; 

class DuplicatorStream extends Transform { 
    constructor(options) { 
     super(options); 

     this.otherStream = null; 
    } 

    attachStream(stream) { 
     if (!stream instanceof Writable) { 
      throw new Error('DuplicatorStream argument is not a writeable stream!'); 
     } 

     if (this.otherStream) { 
      throw new Error('A stream is already attached!'); 
     } 

     this.otherStream = stream; 
     this.emit('attach', stream); 
    } 

    detachStream() { 
     if (!this.otherStream) { 
      throw new Error('No stream to detach!'); 
     } 

     let stream = this.otherStream; 
     this.otherStream = null; 
     this.emit('detach', stream); 
    } 

    _transform(chunk, encoding, callback) { 
     if (this.otherStream) { 
      this.otherStream.write(chunk); 
     } 

     callback(null, chunk); 
    } 
} 

module.exports = DuplicatorStream; 

Et l'utilisation:

var stream = process.stdout; 
var stream2; 

duplicatorStream = new DuplicatorStream(); 
stream.pipe(duplicatorStream); // Inserting my duplicator stream in the chain 
duplicatorStream.pipe(detector); // Using the first consumer 

function startAnotherConsumer() { 
    stream2 = new stream.PassThrough(); 
    duplicatorStream.attachStream(stream2); 

    // use stream2 somewhere else 
} 

function stopAnotherConsumer() { 
    duplicatorStream.once('detach', function() { 
     stream2.end(); 
    }); 
    duplicatorStream.detachStream(); 
}