J'ai un flux de données infini issu d'un processus en fourche. Je veux que ce flux soit traité par un module et parfois je veux dupliquer les données de ce flux pour être traité par un autre module (par exemple surveiller un flux de données mais si quelque chose d'intéressant se produit je veux enregistrer les n octets suivants complément d'enquête).Fractionnement de flux NodeJS
Alors supposons que le scénario suivant:
- Je commence le programme et commencer à consommer le flux lisible
- 2 secondes plus tard, je veux traiter les mêmes données pour 1 sec par un lecteur de flux différent
- Une fois que le temps est écoulé, je veux fermer le deuxième consommateur, mais le consommateur d'origine doit rester intacte.
Voici un extrait de code pour cela:
var stream = process.stdout;
stream.pipe(detector); // Using the first consumer
function startAnotherConsumer() {
stream2 = new PassThrough();
stream.pipe(stream2);
// use stream2 somewhere else
}
function stopAnotherConsumer() {
stream.unpipe(stream2);
}
Mon problème ici est que le unpiping stream2 ne soit pas fermé. Si je l'appelle stream.end()
après la commande unpipe
, puis il se bloque avec l'erreur:
events.js:160
throw er; // Unhandled 'error' event
^
Error: write after end
at writeAfterEnd (_stream_writable.js:192:12)
at PassThrough.Writable.write (_stream_writable.js:243:5)
at Socket.ondata (_stream_readable.js:555:20)
at emitOne (events.js:101:20)
at Socket.emit (events.js:188:7)
at readableAddChunk (_stream_readable.js:176:18)
at Socket.Readable.push (_stream_readable.js:134:10)
at Pipe.onread (net.js:548:20)
J'ai même essayé de mettre en pause le flux source pour aider le tampon à être vidées à partir du second courant, mais il ne fonctionne pas non plus:
function stopAnotherConsumer() {
stream.pause();
stream2.once('unpipe', function() {
stream.resume();
stream2.end();
});
stream.unpipe(stream2);
}
Même erreur que précédemment (écriture après fin).
Comment résoudre le problème? Mon intention initiale était de dupliquer les données diffusées à partir d'un point, puis de fermer le deuxième flux après un certain temps.
Note: I tried to use this answer to make it work.