2016-09-07 1 views
1

J'ai besoin de lire les fichiers csv qui seront pompés de l'équipement de télémétrie dans un emplacement sur le nuage et de stocker les données pertinentes dans le magasin Mongodb. J'utilise Spark Streaming pour lire de nouveaux fichiers (ils arrivent toutes les minutes, parfois même plus fréquemment) et en utilisant le connecteur MomgoDB-Spark. Le problème est que les données ne sont pas chargées dans MomgoDB. J'ai ajouté les étapes show() de Dataframe dans mon code et elles sont correctement affichées sur la console, ce qui signifie que l'application Streaming lit et traite les fichiers comme prévu. Mais la dernière étape de l'enregistrement de MongoDB ne se passe pas. Mon code se présente comme suitSpark Streaming avec backend MongoDB

reqdata.foreachRDD { edata => 
    import sqlContext.implicits._ 
    val loaddata = edata.map(w => EnergyData(w(0).toString,w(1).toString,w(2).toString)).toDF() 
    loaddata.show() 
    loaddata.printSchema(); 
    MongoSpark.save(loaddata.write.option("uri","mongodb://127.0.0.1:27017/storedata.energydata").mode("overwrite")) 
} 

ssc.start() 

La fonction loaddata.show() affiche les données très bien.

J'ai vérifié les journaux MongoDB et trouvé quelques lignes étranges comme

« 2016-09-07T08: 12: 30,109-0700 connexion I RESEAU [initandlisten] a accepté de 127.0.0.1:55694 # 212 (3 connexions maintenant ouvertes) 2016-09-07T08: 12: 30,111-0700 Je commande [conn212] CMD: baisse storedata.energydata »

maintenant, je ne comprends pas pourquoi Mongo baisserait la collection du tout . Toute aide serait très appréciée

+3

_Je ne comprends pas pourquoi Mongo baisserait la collection à ALL_ - '.mode (« écraser ») ' – zero323

Répondre

0

je l'ai résolu moi-même en changeant le mode d'économie d'append:

MongoSpark.save(loaddata.write.option("uri","mongodb://127.0.0.1:27017/storedata.energydata").mode("append"))