2017-05-20 1 views
0

J'ai utilisé le MongoDB Connector for Spark pour charger des DataFrames à partir de collections MongoDB. Je voudrais déplacer plus de mon processus ETL dans Spark et vouloir obtenir des fichiers de 1-2 Go dans Spark à partir d'un service Java qui effectue l'ingestion et l'analyse de base des fichiers. Comme j'ai déjà un cluster MongoDB, il serait facile de déposer des données au format JSON dans GridFS, et je préfèrerais ne pas configurer un système de fichiers en cluster ou HDFS juste pour cela.Chargement d'un DataFrame Spark 2.x à partir de MongoDB GridFS

Le connecteur Mongo Spark ne sait rien de GridFS. Le MongoDB Connector for Hadoop a un GridFSInputFormat, documenté dans un JIRA comment.

Je vois la vieille classe SparkContext a une méthode newAPIHadoopFile() qui prend un InputFormat pour construire un RDD, mais je pensais que SparkSession était la nouvelle hotness.

Est-il possible que Spark charge un DataFrame à partir d'un Hadoop InputFormat comme le GridFSInputFormat? Je veux lire un fichier JSON-lines à partir de GridFS, déduire le schéma, et finir avec un DataSet[Row]. Et y a-t-il quelque chose de complètement fou dans cette approche?

Répondre

0

Pas grave à la fin. J'ai ajouté le connecteur Hadoop Mongo:

libraryDependencies += "org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "2.0.2" 

Et utilisé pour obtenir une RDD[(NullWritable, Text)], qui se transforme facilement en RDD[String] avec un appel à map, puis à DataFrame avec sparkSession.read.json:

/** Loads a DataFrame from a MongoDB GridFS file in JSON-lines format */ 
def loadJsonLinesFromGridFSFile(gridFsId: String): DataFrame = { 
    jsonLinesToDataFrame(loadRDDFromGridFSFile(gridFsId)) 
} 

/** Uses the Mongo Hadoop plugin to load an RDD of lines from a GridFS file */ 
private def loadRDDFromGridFSFile(gridFsId: String): RDD[String] = { 
    val conf = new Configuration() 
    val uri = config.uri.getCredentials 
    conf.set("mongo.input.uri", "mongodb://127.0.0.1/somedb.fs") 
    conf.set("mongo.input.format", classOf[GridFSInputFormat].getName) 
    conf.set("mongo.input.query", s"{ _id: { $$oid: '$gridFsId' } }") 
    sparkSession.sparkContext.newAPIHadoopRDD(
    conf, classOf[GridFSInputFormat], classOf[NullWritable], classOf[BinaryComparable]).map(_._2.toString) 
} 

private def jsonLinesToDataFrame(rdd: RDD[String]): DataFrame = { 
    sparkSession.read.json(rdd) 
}