2017-07-19 1 views
0
val sc = new SparkContext(conf) 

val streamContext = new StreamingContext(sc, Seconds(1)) 

val log = Logger.getLogger("sqsLog") 
val sqs = streamContext.receiverStream(new SQSReceiver("queue") 
    .at(Regions.US_EAST_1) 
    .withTimeout(5)) 


val jsonRows = sqs.mapPartitions(partitions => { 
    val s3Client = new AmazonS3Client(new BasicCredentialsProvider(sys.env("AWS_ACCESS_KEY_ID"), sys.env("AWS_SECRET_ACCESS_KEY"))) 

    val txfm = new LogLine2Json 
    val log = Logger.getLogger("parseLog") 
    val sqlSession = SparkSession 
    .builder() 
    .getOrCreate() 

    val parsedFormat = new SimpleDateFormat("yyyy-MM-dd/") 
    val parsedDate = parsedFormat.format(new java.util.Date()) 
    val outputPath = "/tmp/spark/presto" 

    partitions.map(messages => { 
    val sqsMsg = Json.parse(messages) 
    System.out.println(sqsMsg) 

    val bucketName = Json.stringify(sqsMsg("Records")(0)("s3")("bucket")("name")).replace("\"", "") 
    val key = Json.stringify(sqsMsg("Records")(0)("s3")("object")("key")).replace("\"", "") 
    System.out.println(bucketName) 
    System.out.println(key) 
    val obj = s3Client.getObject(new GetObjectRequest(bucketName, key)) 
    val stream = obj.getObjectContent() 
    scala.io.Source.fromInputStream(stream).getLines().map(line => { 
     try{ 
      val str = txfm.parseLine(line) 
      val jsonDf = sqlSession.read.schema(sparrowSchema.schema).json(str) 
      jsonDf.write.mode("append").format("orc").option("compression","zlib").save(outputPath) 
     } 
     catch { 
      case e: Throwable => {log.info(line); "";} 
     } 
     }).filter(line => line != "{}") 
    }) 
}) 

streamContext.start() 
streamContext.awaitTermination() 

Mon travail est vraiment simple, nous prenons une clé S3 de SQS. Le contenu du fichier est nginx log et nous analysons cela en utilisant notre analyseur qui fonctionne fichier. LogLine2Json Il convertit le journal au format JSON puis nous écrirons cela au format orc.Spark Aucune opération de sortie enregistrée, donc rien à exécuter mais j'écris dans un fichier

Mais je reçois cette erreur

java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute 
    at scala.Predef$.require(Predef.scala:224) 
    at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163) 
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:513) 
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:573) 
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572) 
    at SparrowOrc$.main(sparrowOrc.scala:159) 
    at SparrowOrc.main(sparrowOrc.scala) 

Je comprends que Spark a besoin d'une action sinon il ne fonctionnera pas. Mais j'ai ce code pour écrire dans un fichier orc. Je ne suis pas sûr si je dois faire autre chose?

jsonDf.write.mode("append").format("orc").option("compression","zlib").save(outputPath) 
+0

Il devrait y avoir une chaîne d'appels sur votre trame de données initiale menant à un appel write ou writeStream. Vous écrivez une trame de données, mais comme il s'agit d'un effet secondaire, il n'y a aucune raison de commencer à traiter la trame de données initiale. Les E/S classiques sont difficiles à utiliser car elles fonctionnent en dehors de la chaîne de traitement. Les opérations hadoop de Spark sont compatibles avec S3, je crois: https://www.cloudera.com/documentation/enterprise/5-5-x/topics/spark_s3.html –

Répondre

0

Tout d'abord map n'est pas une action. C'est une transformation. Spark n'a aucune raison d'exécuter ce code.

Ensuite, vous devriez éviter les effets secondaires dans les transformations, et vous ne devriez jamais les utiliser, si l'exactitude de la sortie est requise. Enfin, l'utilisation des fonctions standard io dans les systèmes distribués est généralement dénuée de sens.

vous devriez examiner les options existantes Dans l'ensemble des DStream puits, et si aucun entre eux est approprié dans votre scénario, écrivez votre propre à l'aide d'une action (foreach, foreachPartition).

+0

Pouvez-vous expliquer plus pourquoi l'utilisation de la fonction io n'a pas de sens? – toy