2017-02-20 3 views
0

Je connais les flux de nœuds, mais je me bats sur les meilleures pratiques pour l'abstraction de code que je réutilise beaucoup dans une étape de tuyau unique.Nœud - Résumé des étapes de tuyau dans la fonction

Voici une version allégée de ce que je vous écris aujourd'hui:

inputStream 
.pipe(csv.parse({columns:true}) 
.pipe(csv.transform(function(row) {return transform(row); })) 
.pipe(csv.stringify({header: true}) 
.pipe(outputStream); 

Le travail réel se produit dans transform(). Les seules choses qui changent vraiment sont inputStream, transform() et outputStream. Comme je l'ai dit, c'est une version dépouillée de ce que j'utilise réellement. J'ai beaucoup d'erreur de gestion et d'enregistrement sur chaque étape de tuyau, ce qui est finalement la raison pour laquelle j'essaie de faire abstraction du code.

Ce que je suis à la recherche d'écrire est une seule étape de tuyau, comme ceci:

inputStream 
.pipe(csvFunction(transform(row))) 
.pipe(outputStream); 

Ce que j'ai du mal à comprendre comment transformer ces étapes de tuyau en une seule fonction qui accepte un flux et retourne un flux J'ai regardé des bibliothèques comme Through2 mais je ne suis pas sûr de savoir comment cela me mène là où j'essaye d'aller.

Répondre

0

Voici ce que je fini par aller avec. J'ai utilisé la bibliothèque through2 et le streaming API of the csv library pour créer la fonction de tuyau que je cherchais.

var csv = require('csv'); 
    through = require('through2'); 

module.exports = function(transformFunc) { 
    parser = csv.parse({columns:true, relax_column_count:true}), 
    transformer = csv.transform(function(row) { 
     return transformFunc(row); 
    }), 
    stringifier = csv.stringify({header: true}); 

    return through(function(chunk,enc,cb){ 
     var stream = this; 

      parser.on('data', function(data){ 
       transformer.write(data); 
      }); 

      transformer.on('data', function(data){ 
       stringifier.write(data); 
      }); 

      stringifier.on('data', function(data){ 
       stream.push(data); 
      }); 

      parser.write(chunk); 

      parser.removeAllListeners('data'); 
      transformer.removeAllListeners('data'); 
      stringifier.removeAllListeners('data'); 
      cb(); 
    }) 
} 

Il est intéressant de noter la partie où je retire les écouteurs d'événements vers la fin, cela était dû à des erreurs en cours d'exécution dans la mémoire où j'avais créé trop d'auditeurs d'événements. J'ai d'abord essayé de résoudre ce problème en écoutant des événements avec once, mais cela a empêché les morceaux suivants d'être lus et transmis à l'étape de pipe suivante. Faites-moi savoir si quelqu'un a des commentaires ou des idées supplémentaires.

2

Vous pouvez utiliser la classe PassThrough comme ceci:

var PassThrough = require('stream').PassThrough; 

var csvStream = new PassThrough(); 
csvStream.on('pipe', function (source) { 
    // undo piping of source 
    source.unpipe(this); 
    // build own pipe-line and store internally 
    this.combinedStream = 
    source.pipe(csv.parse({columns: true})) 
     .pipe(csv.transform(function (row) { 
     return transform(row); 
     })) 
     .pipe(csv.stringify({header: true})); 
}); 

csvStream.pipe = function (dest, options) { 
    // pipe internal combined stream to dest 
    return this.combinedStream.pipe(dest, options); 
}; 

inputStream 
    .pipe(csvStream) 
    .pipe(outputStream); 
+0

Comment passer la référence de transformation (ligne) dans le flux? Ne serais-je pas mieux d'utiliser un flux de transformation? J'ai lu que PassThrough est plus pour surveiller un flux que d'y travailler? – AdamPat

+0

Dépend de la façon dont vous empaquetez la gestion de votre flux. Placez simplement la fonction dans un endroit visible ou placez-la comme référence. Et bien sûr vous pouvez implémenter votre solution dans un flux de transformation (regardez ici https://nodejs.org/api/stream.html#stream_implementing_a_transform_stream). Montrer l'idée avec un flux PassThrough était juste le moyen le plus simple que j'ai trouvé. – Marc