0

Comment puis-je générer le résultat de l'agrégation MongoDB dans la collection sans remplacer la collection d'une autre sortie d'agrégation?Agréger à partir de plusieurs collections

-je besoin d'obtenir des données uniquement avec $ sur: « tempCollection », parce que j'ai 500mln documents et obtenir pipeline stage limit

var q = [ 
    {$match: query}, 
    {$group: {_id: '$hash'}}, 
    {$out: 'tempCollection'} 
]; 

async.parallel([ 
    function(callback) { 
    firstCollection.aggregate(q, callback); 
    }, 
    function(callback) { 
    secondCollection.aggregate(q, callback); 
    }, 

    ... 

], function() { 

    // I want to get all from tempCollection (with pagination) here 

}); 
+1

Mauvaise construction de votre question. '$ out' ** toujours ** remplace. Que veux-tu faire ici vraiment? "Ajouter" les deux résultats dans une seule collection? Ou "fusionner" basé sur certaines valeurs communes "accumulant" d'autres valeurs à partir des deux résultats? Soyez également précis si c'est le pilote de nœud de base ou quelque chose d'autre comme mongoose, ou moine ou autre. –

+0

J'utilise Mongoose. J'ai besoin d'obtenir toutes les valeurs de hachage distinctes de toute façon (fusion ou écriture dans une collection, ou etc). –

+0

Choisissez-en un. "fusionner" - signifie que vous avez une "clé" commune ou des champs qui composent une "clé" et que vous avez l'intention d '"incrémenter" une autre valeur où la même clé est trouvée. "concaténer" - signifie que vous voulez juste que les deux ensembles de résultats se retrouvent dans une seule collection. Notez dans ce dernier que la "clé" doit vraiment être différente, ou artificiellement faite. –

Répondre

1

L'essentiel ici est que l'option $out que jamais « remplace » sortie sur la collection cible. Donc, pour faire quoi que ce soit d'autre, vous devez travailler via une connexion client plutôt que de simplement sortir sur le serveur.

Votre meilleure option ici avec mongoose est d'entrer directement dans le pilote sous-jacent et d'avoir accès au node stream interface tel que supporté par le pilote.

exemple Trival, mais il montre la voie de base à la structure:

var async = require('async'), 
    mongoose = require('mongoose'), 
    Schema = mongoose.Schema; 

mongoose.connect('mongodb://localhost/aggtest'); 

var testSchema = new Schema({},{ "_id": false, strict: false }); 


var ModelA = mongoose.model('ModelA', testSchema), 
    ModelB = mongoose.model('ModelB', testSchema), 
    ModelC = mongoose.model('ModelC', testSchema); 

function processCursor(cursor,target,callback) { 

    cursor.on("end",callback); 
    cursor.on("error",callback); 

    cursor.on("data",function(data) { 
    cursor.pause(); 
    target.update(
     { "_id": data._id }, 
     { "$setOnInsert": { "_id": data._id } }, 
     { "upsert": true }, 
     function(err) { 
     if (err) callback(err); 
     cursor.resume(); 
     } 
    ); 
    }); 
} 

async.series(
    [ 
    // Clean data 
    function(callback) { 
     async.each([ModelA,ModelB,ModelC],function(model,callback) { 
     model.remove({},callback); 
     },callback); 
    }, 

    // Sample data 
    function(callback) { 
     async.each([ModelA,ModelB],function(model,callback) { 
     async.each([1,2,3],function(id,callback) { 
      model.create({ "_id": id },callback); 
     },callback); 
     },callback); 
    }, 

    // Run merge 
    function(callback) { 
     async.parallel(
     [ 
      function(callback) { 
      var cursor = ModelA.collection.aggregate(
       [ 
       { "$group": { "_id": "$_id" } } 
       ], 
       { "batchSize": 25 } 
      ); 

      processCursor(cursor,ModelC,callback) 
      }, 
      function(callback) { 

      var cursor = ModelB.collection.aggregate(
       [ 
       { "$group": { "_id": "$_id" } } 
       ], 
       { "batchSize": 25 } 
      ); 

      processCursor(cursor,ModelC,callback) 
      } 
     ], 
     callback 
    ); 
    }, 

    // Get merged 
    function(callback) { 
     ModelC.find({},function(err,results) { 
     console.log(results); 
     callback(err); 
     }); 
    } 
    ], 
    function(err) { 
    if (err) throw err; 
    mongoose.disconnect(); 
    } 
); 

Oustide de cela, alors vous allez avoir besoin de $out aux collections « séparées », puis de les fusionner avec une .update() similaire processus, mais pour le garder "côté serveur" alors vous devez utiliser .eval().

Ce n'est pas bien, mais c'est la seule façon de garder les opérations sur le serveur. Vous pouvez également modifier cela avec "Bulk" opérations (à nouveau via la même interface natif .collection) pour un débit un peu plus. Mais les options se résument à "lire à travers le client" ou "évaluer".