Je suis en train d'exécuter un travail de diffusion structurée sur un cluster Spark 2.2 qui s'exécute sur AWS. J'utilise un seau S3 dans eu-central-1 pour le point de contrôle. Certains se actions sur les travailleurs semblent échouer au hasard avec l'erreur suivante:Spark Structured Streaming avec S3 ne réussit pas à démarrer
17/10/04 13:20:34 WARN TaskSetManager: Lost task 62.0 in stage 19.0 (TID 1946, 0.0.0.0, executor 0): java.lang.IllegalStateException: Error committing version 1 into HDFSStateStore[id=(op=0,part=62),dir=s3a://bucket/job/query/state/0/62]
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:198)
at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3$$anon$1.hasNext(statefulOperators.scala:230)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$doExecute$1$$anonfun$4.apply(HashAggregateExec.scala:99)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$doExecute$1$$anonfun$4.apply(HashAggregateExec.scala:97)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: XXXXXXXXXXX, AWS Error Code: SignatureDoesNotMatch, AWS Error Message: The request signature we calculated does not match the signature you provided. Check your key and signing method., S3 Extended Request ID: abcdef==
at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
at com.amazonaws.services.s3.AmazonS3Client.copyObject(AmazonS3Client.java:1507)
at com.amazonaws.services.s3.transfer.internal.CopyCallable.copyInOneChunk(CopyCallable.java:143)
at com.amazonaws.services.s3.transfer.internal.CopyCallable.call(CopyCallable.java:131)
at com.amazonaws.services.s3.transfer.internal.CopyMonitor.copy(CopyMonitor.java:189)
at com.amazonaws.services.s3.transfer.internal.CopyMonitor.call(CopyMonitor.java:134)
at com.amazonaws.services.s3.transfer.internal.CopyMonitor.call(CopyMonitor.java:46)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
... 3 more
Le travail est subbmitted avec les options suivantes pour permettre des seaux UE-1 central:
--packages org.apache.hadoop:hadoop-aws:2.7.4
--conf spark.hadoop.fs.s3a.endpoint=s3.eu-central-1.amazonaws.com
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
--conf spark.executor.extraJavaOptions=-Dcom.amazonaws.services.s3.enableV4=true
--conf spark.driver.extraJavaOptions=-Dcom.amazonaws.services.s3.enableV4=true
--conf spark.hadoop.fs.s3a.access.key=xxxxx
--conf spark.hadoop.fs.s3a.secret.key=xxxxx
Je l'ai déjà essayé de générer une clé d'accès sans caractères spéciaux et en utilisant des politiques d'instance, les deux ont eu le même effet.
N'utilisez pas S3 pour le point de contrôle. Comme S3 ne fournit qu'une * consistance finale * en lecture-écriture-écriture, il n'y a aucune garantie que lorsque HDFSBackedStateStore' répertorie les fichiers, ou tente de renommer un fichier, il existera dans le compartiment S3 même s'il vient d'être écrit . –
Que puis-je utiliser d'autre? Lors de l'utilisation de HDFS, le journal des modifications devient si grand qu'il ne démarre pas –
Utilisez HDFS. De quel journal de changement parlons-nous? –