2014-07-04 4 views
2

Je travaille actuellement sur un projet Nodejs qui utilise MongoDB et surveille de près le flux de données du serveur Nodejs.Nodejs Async upsert question

Le code de mon serveur de nœud se passe comme suit:

Receive.js
1. Serveur nœud reçoit un fichier texte JSON
2. Serveur de nœud indique MongoDB (upsert), nous avons reçu un fichier

Process.js
3. serveur de nœud upserts JSON à MongoDB
4. serveur Node indique MongoDB nous avons traité ledit fichier.

Le problème est que parfois # 4 se produit avant # 2 malgré # 1 se produit toujours avant # 3. Mon programme de surveillance commence à afficher qu'il y a plus de fichiers traités que reçus. Y at-il un moyen de résoudre ce problème sans rendre ce serveur complètement synchrone?

Par exemple, si je vous envoie mon serveur Node 500 fichiers texte JSON, le nœud de temps dernier exécute # 2 devrait être avant la dernière fois # 4 est exécuté.

** Note: App.js appels .receive de Receive.js et Receive.js appels .Save de Process.js **

app.js

http.createServer(app).listen(app.get('port'), function(){ 
    mongodb.createConnection(function(db){ 
    if(db){ 
     console.log('success connecting to mongodb!'); 
    } 
    }); 
}); 
app.post('/log', logCollection.receive); 

-
Receive.js

exports.receive = function(req, res, next){ 
    var usagelog = req.body.log; 

    if(!usagelog){ 
    log4js.logger.error("Usagelog is Null!"); 
    res.end("fail"); 
    }else{  
    count++; 

    //Upsert the Receive Count for Monitor 
     mongodb.getConnection(function(db){ 

      dateFormat.initDate(); 
      //Upsert trip (usage logs) 
      var currentDateTime = moment().format('YYYY-MM-DD hh:mm:ss'); 
      db.collection("ReceiveCount").update({"date":dateFormat.currentDate(), "pid":process.pid }, 
        {"date":dateFormat.currentDate(), "IPAddress": ip.address(), "pid":process.pid, "countReceive" : count, "LastReceivedTime": currentDateTime}, 
        {upsert:true}, function(err, result) {    
      }); 
      }); 

    //End Receive count 
    var usagelogJSON = convertToJson(usagelog); 
    var usagelogWithCurrentDate = addUpdateTime(usagelogJSON); 
    usagelogDao.save(usagelogWithCurrentDate, res); 
    } 
}; 

-

Process.js

exports.save = function(usagelog, res){ 
    var selector = upsertCondition(usagelog); 

    mongodb.getConnection(function(db){ 
    //Upsert trip (usage logs) 
    db.collection(collectionName()).update(selector, usagelog, {upsert:true, fullResult:true}, function(err, result) { 
     if(err){ 
     log4js.logger.error(err); 
     res.end("fail"); 
     }else{ 

     if(result.nModified == 0) 
      countInsert++; 
     else 
      countUpdate++; 

     //Upsert Processed Count (both Updated and Inserted  
     db.collection("ReceiveCount").find({"date":dateFormat.currentDate(), "pid":process.pid },{}).toArray(function (err, docs) { 

      var receivecount = docs[0].countReceive; 
      var receivetime = docs[0].LastReceivedTime; 
      var currentDateTime = moment().format('YYYY-MM-DD hh:mm:ss'); 
      var MongoCount=0; 

      db.collection("raw_"+dateFormat.currentDate()).count(function(err, count){ 
       console.log("raw_"+dateFormat.currentDate()); 

       MongoCount = count; 
       console.log("Mongo count is :" +MongoCount); 
      }); 

      db.collection("ReceiveCount").update({"date":dateFormat.currentDate(), "pid":process.pid }, 
       {"date":dateFormat.currentDate(), "IPAddress": ip.address(), "pid":process.pid, "countUpdate" : countUpdate, "countInsert":countInsert, "countTotal":countUpdate+countInsert, "LastProcessTime": currentDateTime, 
       "countReceive":receivecount, "LastReceivedTime":receivetime, "MongoCount":MongoCount}, {upsert:true}, function(err, result) {  
      }); 
     }); 
     res.end("success"); 
     } 
    }); 
    }); 
}; 
+1

En dépit des upvotes ici il n'y a pas une question très claire en termes de SO. Il existe des moyens de gérer cela avec le traitement asynchrone et les bibliothèques qui aident aussi. Mais nous avons besoin de voir du code pour que cette question puisse être résolue. Montrez votre code afin que nous puissions voir où vous allez mal. Upvotes! = Réponses. –

+0

Mieux pour la mise à jour, mais je pense que le contexte de la façon dont vous appelez vos exportations de «recevoir» et «enregistrer» est la partie la plus importante de votre problème ici. –

Répondre

0

Vous pouvez utiliser le module async (https://github.com/caolan/async). Voici le code pseudo:

async.parallel([ 
    fun1(cb) { 
    tellDBYouReceivedFile(..., onFinished() { 
     cb(); 
    }); 
    }, 
    fun2(cb) { 
    saveYourFileToDB(..., onFinished() { 
     cb(); 
    }); 
    }, 
], function() { 
    tellDBYouProcessedFile(); 
}); 

Ce que vous essayez d'atteindre ne peut pas être fait de manière totalement asynchrone parce que la dernière étape (# 4) doit attendre # 2 et # 3 à terminer premier. L'exemple ci-dessus peut rendre asynchrones # 2 et # 3, et la garantie # 4 est exécutée en dernier.