2017-10-16 10 views
0

J'essaie de déplacer des données de hdfs vers mongodb. Je suis capable de réaliser ceci par la ligne de commande comme ci-dessous.Exportation de données de hdfs vers mongodb

hadoop fs -text "/user/name.txt" | mongoimport --host 127.0.0.1:27018 -d cds -c hello --type tsv --headerline 

Je dois écrire un code scala pour cela. J'ai plusieurs fichiers dans le système de fichiers. J'ai vérifié le connecteur mongo-hadoop mais j'ai besoin du contraire. Lecture de fichiers depuis hdfs et vidage dans mongodb dans scala.

+0

Apache Spark est équipé de connecteurs ... mongo Quoi qu'il en soit, en fonction de la source de données, si tout ce que vous faites est le téléchargement d'un fichier texte et le streaming à MongoImport, peut-être que HDFS n'est pas nécessaire –

+0

Également, avez-vous mal lu cette bibliothèque? * permet d'utiliser MongoDB (ou les fichiers de sauvegarde dans son format de données, BSON) comme source d'entrée, ** ou destination de sortie *** –

Répondre

0

Il y a plusieurs façons de faire la même chose, étant donné que ci-dessus fonctionne comme l'application mono-thread, vous pouvez même exécuter le même distribué par la simple introduction d'un simple MR comme celui-ci complete note here

public class LogReducer extends Reducer<Text, IntWritable, NullWritable, MongoUpdateWritable> { 

    @Override 
    public void reduce(final Text pKey, 
         final Iterable<IntWritable> pValues, 
         final Context pContext) 
      throws IOException, InterruptedException{ 

     int count = 0; 
     for(IntWritable val : pValues){ 
      count += val.get(); 
     } 

     BasicBSONObject query = new BasicBSONObject("devices", new ObjectId(pKey.toString())); 
     BasicBSONObject update = new BasicBSONObject("$inc", new BasicBSONObject("logs_count", count)); 
     pContext.write(null, new MongoUpdateWritable(query, update, true, false)); 
    } 

} 

Ou par table de ruche comme indiqué here sans avoir besoin d'écrire beaucoup de code. Ou par étincelle ici

package com.mongodb.spark_examples; 

import com.mongodb.spark.MongoSpark; 
import com.mongodb.spark.config.WriteConfig; 

import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.sql.SparkSession; 

import org.bson.Document; 

import static java.util.Arrays.asList; 

import java.util.HashMap; 
import java.util.Map; 


public final class WriteToMongoDBWriteConfig { 

    public static void main(final String[] args) throws InterruptedException { 

    SparkSession spark = SparkSession.builder() 
     .master("local") 
     .appName("MongoSparkConnectorIntro") 
     .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection") 
     .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection") 
     .getOrCreate(); 

    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext()); 

    // Create a custom WriteConfig 
    Map<String, String> writeOverrides = new HashMap<String, String>(); 
    writeOverrides.put("collection", "spark"); 
    writeOverrides.put("writeConcern.w", "majority"); 
    WriteConfig writeConfig = WriteConfig.create(jsc).withOptions(writeOverrides); 

    // Create a RDD of 10 documents 
    JavaRDD<Document> sparkDocuments = jsc.parallelize(asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).map 
     (new Function<Integer, Document>() { 
     public Document call(final Integer i) throws Exception { 
       return Document.parse("{spark: " + i + "}"); 
      } 
     }); 

    /*Start Example: Save data from RDD to MongoDB*****************/ 
    MongoSpark.save(sparkDocuments, writeConfig); 
    /*End Example**************************************************/ 

    jsc.close(); 

    } 

} 

Scala

def main(args: Array[String]): Unit = { 
     import org.apache.spark.sql.SparkSession 

    val spark = SparkSession.builder() 
     .master("local") 
     .appName("MongoSparkConnectorIntro") 
     .config("spark.mongodb.input.uri", "mongodb://192.168.2.13:28111,192.168.2.14:28112,192.168.2.15:28113/test.myCollection") 
     .config("spark.mongodb.output.uri", "mongodb://192.168.2.13:28111,192.168.2.14:28112,192.168.2.15:28113/test.myCollection") 
     .getOrCreate() 
    import org.apache.spark.sql.SparkSession 

    val writeConfig = WriteConfig(Map("collection" -> "spark", "writeConcern.w" -> "majority"), Some(WriteConfig(spark.sparkContext))) 
    val sparkDocuments = spark.sparkContext.parallelize((1 to 10).map(i => Document.parse(s"{spark: $i}"))) 

    MongoSpark.save(sparkDocuments, writeConfig) 

    }