val sc = new SparkContext(conf)
val streamContext = new StreamingContext(sc, Seconds(1))
val log = Logger.getLogger("sqsLog")
val sqs = streamContext.receiverStream(new SQSReceiver("queue")
.at(Regions.US_EAST_1)
.withTimeout(5))
val jsonRows = sqs.mapPartitions(partitions => {
val s3Client = new AmazonS3Client(new BasicCredentialsProvider(sys.env("AWS_ACCESS_KEY_ID"), sys.env("AWS_SECRET_ACCESS_KEY")))
val txfm = new LogLine2Json
val log = Logger.getLogger("parseLog")
val sqlSession = SparkSession
.builder()
.getOrCreate()
val parsedFormat = new SimpleDateFormat("yyyy-MM-dd/")
val parsedDate = parsedFormat.format(new java.util.Date())
val outputPath = "/tmp/spark/presto"
partitions.map(messages => {
val sqsMsg = Json.parse(messages)
System.out.println(sqsMsg)
val bucketName = Json.stringify(sqsMsg("Records")(0)("s3")("bucket")("name")).replace("\"", "")
val key = Json.stringify(sqsMsg("Records")(0)("s3")("object")("key")).replace("\"", "")
System.out.println(bucketName)
System.out.println(key)
val obj = s3Client.getObject(new GetObjectRequest(bucketName, key))
val stream = obj.getObjectContent()
scala.io.Source.fromInputStream(stream).getLines().map(line => {
try{
val str = txfm.parseLine(line)
val jsonDf = sqlSession.read.schema(sparrowSchema.schema).json(str)
jsonDf.write.mode("append").format("orc").option("compression","zlib").save(outputPath)
}
catch {
case e: Throwable => {log.info(line); "";}
}
}).filter(line => line != "{}")
})
})
streamContext.start()
streamContext.awaitTermination()
Mon travail est vraiment simple, nous prenons une clé S3 de SQS. Le contenu du fichier est nginx log et nous analysons cela en utilisant notre analyseur qui fonctionne fichier. LogLine2Json
Il convertit le journal au format JSON puis nous écrirons cela au format orc
.Spark Aucune opération de sortie enregistrée, donc rien à exécuter mais j'écris dans un fichier
Mais je reçois cette erreur
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:513)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:573)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
at SparrowOrc$.main(sparrowOrc.scala:159)
at SparrowOrc.main(sparrowOrc.scala)
Je comprends que Spark a besoin d'une action sinon il ne fonctionnera pas. Mais j'ai ce code pour écrire dans un fichier orc. Je ne suis pas sûr si je dois faire autre chose?
jsonDf.write.mode("append").format("orc").option("compression","zlib").save(outputPath)
Il devrait y avoir une chaîne d'appels sur votre trame de données initiale menant à un appel write ou writeStream. Vous écrivez une trame de données, mais comme il s'agit d'un effet secondaire, il n'y a aucune raison de commencer à traiter la trame de données initiale. Les E/S classiques sont difficiles à utiliser car elles fonctionnent en dehors de la chaîne de traitement. Les opérations hadoop de Spark sont compatibles avec S3, je crois: https://www.cloudera.com/documentation/enterprise/5-5-x/topics/spark_s3.html –