2017-01-24 1 views
2

Je joue avec les flux et la fonctionnalité async/wait. Ce que j'ai jusqu'ici est:attendre pour la fonction avec rappel

let logRecord = ((record, callback) => { 
    console.log(record); 
    return callback(); 
}); 

let importCSVfromPath = async((csv_path) => { 
    return new Promise(function(resolve, reject) { 
     var parser = parse(); 
     var input = fs.createReadStream(csv_path); 
     var transformer = transform(logRecord, {parallel: 1}); 

     input.on('error', (err) => { 
      reject(err); 
     }); 
     input.on('finish',()=> { 
      resolve(); 
     }); 

     input.pipe(parser).pipe(transformer); 
    }); 
}); 

Maintenant, je veux remplacer logRecord avec importRecord. Le problème est que cette fonction doit utiliser des fonctions qui font déjà partie de la pile asynchrone.

let importRecord = async((record) => { 
    ....... 
    await(insertRow(row)); 
}); 

Quelle est la bonne façon de procéder?

+0

Que fournit les fonctions 'async' et' await' ici? – lonesomeday

Répondre

1

C'est un peu plus compliqué que cela - les flux node.js ne sont pas adaptés (du moins pas encore) aux méthodes es7 async/await.

Si vous souhaitez développer ceci par vous-même, pensez à écrire une classe dérivée de Readable stream. L'implémentation d'une interface basée sur les promesses est une tâche assez complexe, mais c'est possible.

Si vous êtes satisfait de l'utilisation d'un framework sous licence permissif, jetez un oeil à Scramjet. Avec elle, votre code ressemblera à ceci (la plupart de l'exemple est l'analyse du CSV - Je vais ajouter un assistant dans la prochaine version):

fs.createReadStream("file.csv")  // open your file 
    .pipe(new StringStream())   // pass to scramjet 
    .split("\n")      // split by line 
    .parse((line) => line.split(",")) // convert lines to arrays 
    .map(async (line) => {    // run asynchrounous mapping 
     await importRecord(line);  // import log to DB 
     return logRecord(line);  // return some log for the output 
    }) 
    .pipe(process.stdout);    // pipe the output wherever you like 

Je crois qu'il est exactement ce que vous cherchez et il sera exécutez vos importations d'enregistrements en parallèle, tout en conservant l'ordre de sortie.