Il y a plusieurs façons de faire la même chose, étant donné que ci-dessus fonctionne comme l'application mono-thread, vous pouvez même exécuter le même distribué par la simple introduction d'un simple MR comme celui-ci complete note here
public class LogReducer extends Reducer<Text, IntWritable, NullWritable, MongoUpdateWritable> {
@Override
public void reduce(final Text pKey,
final Iterable<IntWritable> pValues,
final Context pContext)
throws IOException, InterruptedException{
int count = 0;
for(IntWritable val : pValues){
count += val.get();
}
BasicBSONObject query = new BasicBSONObject("devices", new ObjectId(pKey.toString()));
BasicBSONObject update = new BasicBSONObject("$inc", new BasicBSONObject("logs_count", count));
pContext.write(null, new MongoUpdateWritable(query, update, true, false));
}
}
Ou par table de ruche comme indiqué here sans avoir besoin d'écrire beaucoup de code. Ou par étincelle ici
package com.mongodb.spark_examples;
import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.config.WriteConfig;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SparkSession;
import org.bson.Document;
import static java.util.Arrays.asList;
import java.util.HashMap;
import java.util.Map;
public final class WriteToMongoDBWriteConfig {
public static void main(final String[] args) throws InterruptedException {
SparkSession spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
.getOrCreate();
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
// Create a custom WriteConfig
Map<String, String> writeOverrides = new HashMap<String, String>();
writeOverrides.put("collection", "spark");
writeOverrides.put("writeConcern.w", "majority");
WriteConfig writeConfig = WriteConfig.create(jsc).withOptions(writeOverrides);
// Create a RDD of 10 documents
JavaRDD<Document> sparkDocuments = jsc.parallelize(asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).map
(new Function<Integer, Document>() {
public Document call(final Integer i) throws Exception {
return Document.parse("{spark: " + i + "}");
}
});
/*Start Example: Save data from RDD to MongoDB*****************/
MongoSpark.save(sparkDocuments, writeConfig);
/*End Example**************************************************/
jsc.close();
}
}
Scala
def main(args: Array[String]): Unit = {
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.input.uri", "mongodb://192.168.2.13:28111,192.168.2.14:28112,192.168.2.15:28113/test.myCollection")
.config("spark.mongodb.output.uri", "mongodb://192.168.2.13:28111,192.168.2.14:28112,192.168.2.15:28113/test.myCollection")
.getOrCreate()
import org.apache.spark.sql.SparkSession
val writeConfig = WriteConfig(Map("collection" -> "spark", "writeConcern.w" -> "majority"), Some(WriteConfig(spark.sparkContext)))
val sparkDocuments = spark.sparkContext.parallelize((1 to 10).map(i => Document.parse(s"{spark: $i}")))
MongoSpark.save(sparkDocuments, writeConfig)
}
Apache Spark est équipé de connecteurs ... mongo Quoi qu'il en soit, en fonction de la source de données, si tout ce que vous faites est le téléchargement d'un fichier texte et le streaming à MongoImport, peut-être que HDFS n'est pas nécessaire –
Également, avez-vous mal lu cette bibliothèque? * permet d'utiliser MongoDB (ou les fichiers de sauvegarde dans son format de données, BSON) comme source d'entrée, ** ou destination de sortie *** –