
Reading a JSON object with a JSONArray inside into an RDD using Scala, without using a DataFrame

I was able to read a JSON object into an RDD using the following code, without using a DataFrame. Here is my JSON object:

{"first":"John","last":"Smith","address":{"line1":"1 main street","city":"San Francisco","state":"CA","zip":"94101"}} 

Here is the code that reads it into an RDD:

package com.spnotes.spark

import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.databind.ObjectMapper
import com.typesafe.scalalogging.Logger
import org.apache.spark.{SparkContext, SparkConf}
import org.slf4j.LoggerFactory
import scala.collection.mutable.ArrayBuffer

class Person {
    @JsonProperty var first: String = null
    @JsonProperty var last: String = null
    @JsonProperty var address: Address = null
    override def toString = s"Person(first=$first, last=$last, address=$address)"
}

class Address {
    @JsonProperty var line1: String = null
    @JsonProperty var line2: String = null
    @JsonProperty var city: String = null
    @JsonProperty var state: String = null
    @JsonProperty var zip: String = null
    override def toString = s"Address(line1=$line1, line2=$line2, city=$city, state=$state, zip=$zip)"
}

object JSONFileReaderWriter {
    // val logger = Logger(LoggerFactory.getLogger("JSONFileReaderWriter"))
    val mapper = new ObjectMapper()

    def main(argv: Array[String]): Unit = {

        if (argv.length != 2) {
            println("Please provide 2 parameters <inputfile> <outputfile>")
            System.exit(1)
        }
        val inputFile = argv(0)
        val outputFile = argv(1)

        println(inputFile)
        println(outputFile)

        // logger.debug(s"Read json from $inputFile and write to $outputFile")

        val sparkConf = new SparkConf().setMaster("local[1]").setAppName("JSONFileReaderWriter")
        val sparkContext = new SparkContext(sparkConf)
        val errorRecords = sparkContext.accumulator(0)
        val records = sparkContext.textFile(inputFile)

        val results = records.flatMap { record =>
            try {
                Some(mapper.readValue(record, classOf[Person]))
            } catch {
                case e: Exception =>
                    errorRecords += 1
                    None
            }
        } //.filter(person => person.address.city.equals("mumbai"))

        results.saveAsTextFile(outputFile)

        println("Number of bad records " + errorRecords)
    }
}

But when there is a JSONArray inside the JSONObject, I could not figure out how to extend the code. Any help is really appreciated.

Here is the JSONObject that I want to read into an RDD without using a DataFrame:

{"first":"John","last":"Smith","address":[{"line1":"1 main street","city":"San Francisco","state":"CA","zip":"94101"},{"line1":"2 main street","city":"Palo Alto","state":"CA","zip":"94305"}]} 

I do not want to use Spark SQL.

Answer

Here is the solution to this problem. These are the updated JSON objects:

{"first":"John","last":"Smith","address":[{"line1":"1 main street","city":"San Francisco","state":"CA","zip":"94101"},{"line1":"1 main street","city":"sunnyvale","state":"CA","zip":"94000"}]} 
{"first":"Tim","last":"Williams","address":[{"line1":"1 main street","city":"Mountain View","state":"CA","zip":"94300"},{"line1":"1 main street","city":"San Jose","state":"CA","zip":"92000"}]} 

Here is the code:

package com.myspark.spark

import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.{read, write}
import org.apache.spark.{SparkContext, SparkConf}

case class Person(first: String, last: String, address: List[Address]) {
    override def toString = s"Person(first=$first, last=$last, address=$address)"
}

case class Address(line1: String, city: String, state: String, zip: String) {
    override def toString = s"Address(line1=$line1, city=$city, state=$state, zip=$zip)"
}

object JSONFileReaderWriter {
    def main(argv: Array[String]): Unit = {

        if (argv.length != 2) {
            println("Please provide 2 parameters <inputfile> <outputfile>")
            System.exit(1)
        }
        val inputFile = argv(0)
        val outputFile = argv(1)
        val sparkConf = new SparkConf().setMaster("local[1]").setAppName("JSONFileReaderWriter")
        val sparkContext = new SparkContext(sparkConf)
        val errorRecords = sparkContext.accumulator(0)
        val records = sparkContext.textFile(inputFile)

        val dataObjsRDD = records.map { myrecord =>
            implicit val formats = DefaultFormats // Workaround as DefaultFormats is not serializable
            val jsonObj = parse(myrecord)
            // "address" is a JSON array; pick the first element and extract one field from it.
            val addresses = jsonObj \ "address"
            println((addresses(0) \ "city").extract[String])
            // Extract the whole record, including the List[Address], into the Person case class.
            jsonObj.extract[Person]
        }

        dataObjsRDD.saveAsTextFile(outputFile)
    }
}
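
If you also want the filtering step that is commented out in the question, here is a small sketch against the case classes above (the city name is just an example value):

// Keep only people who have at least one address in the given city.
val sfResidents = dataObjsRDD.filter(_.address.exists(_.city == "San Francisco"))
sfResidents.collect().foreach(println)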

Here is the build.sbt file:

name := "JSONFileReaderWriter" 

version := "1.0" 

scalaVersion := "2.11.8" 

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.2" 

libraryDependencies += "org.json4s" %% "json4s-jackson" % "3.2.11" 

I used the following to run the code on my local machine:

spark-submit --master local[1] target/scala-2.11/jsonfilereaderwriter_2.11-1.0.jar input/inputarray.json output 

The inputarray.json file contains the JSON objects shown at the top of this answer. After running the code, you should see the following output:

San Francisco 
Mountain View 

I used json4s to read the file. Hopefully this helps anyone else who has the same problem.
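
For completeness, if you would rather stay with Jackson as in the question's original code, one possible way to handle the array (a sketch, assuming the same ObjectMapper setup as above and not tested here) is to declare the address field as a java.util.List, since Jackson picks up the element type from the field's generic signature:

import java.util.{List => JList}
import com.fasterxml.jackson.annotation.JsonProperty

// Address stays the same as in the question; only Person changes.
class Person {
    @JsonProperty var first: String = null
    @JsonProperty var last: String = null
    @JsonProperty var address: JList[Address] = null // was a single Address before
    override def toString = s"Person(first=$first, last=$last, address=$address)"
}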