2017-09-25 5 views
1

J'essaye d'écrire une application de filtrage collaborative vanilla simple, fonctionnant sur Google Cloud Dataproc. Les données se trouvent dans BigQuery. J'ai implémenté ceci selon ce tutoriel: https://cloud.google.com/dataproc/docs/tutorials/bigquery-sparkmlIllegalStateException sur Google Cloud Dataproc

Maintenant le problème est que lors de l'exécution de cet exemple (légèrement modifié) je reçois une IllegalStateException. Plus précisément ici est le stacktrace:

17/09/25 10:55:37 ERROR org.apache.spark.scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job 
Traceback (most recent call last): 
File "/tmp/af84ad68-0259-4ca1-b464-a118a96f0742/marketing-pages-collaborative-filtering.py", line 109, in <module> 
compute_recommendations() 
File "/tmp/af84ad68-0259-4ca1-b464-a118a96f0742/marketing-pages-collaborative-filtering.py", line 59, in compute_recommendations 
conf=conf) 
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line 646, in newAPIHadoopRDD 
File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1133, in __call__ 
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco 
File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 319, in get_return_value 
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, marketing-pages-collaborative-filtering-w-1.c.dg-dev-personalization.internal): java.lang.IllegalStateException: Found known file 'data-000000000002.json' with index 2, which isn't less than or equal to than endFileNumber 1! 
    at com.google.cloud.hadoop.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:197) 
    at com.google.cloud.hadoop.io.bigquery.DynamicFileListRecordReader.setEndFileMarkerFile(DynamicFileListRecordReader.java:327) 
    at com.google.cloud.hadoop.io.bigquery.DynamicFileListRecordReader.nextKeyValue(DynamicFileListRecordReader.java:177) 
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:182) 
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1336) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1336) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1324) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1324) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:748) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at scala.Option.foreach(Option.scala:257) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1873) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1886) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1899) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1324) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) 
    at org.apache.spark.rdd.RDD.take(RDD.scala:1298) 
    at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:203) 
    at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:582) 
    at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 
    at py4j.Gateway.invoke(Gateway.java:280) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:214) 
    at java.lang.Thread.run(Thread.java:748) 
Caused by: java.lang.IllegalStateException: Found known file 'data-000000000002.json' with index 2, which isn't less than or equal to than endFileNumber 1! 
    at com.google.cloud.hadoop.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:197) 
    at com.google.cloud.hadoop.io.bigquery.DynamicFileListRecordReader.setEndFileMarkerFile(DynamicFileListRecordReader.java:327) 
    at com.google.cloud.hadoop.io.bigquery.DynamicFileListRecordReader.nextKeyValue(DynamicFileListRecordReader.java:177) 
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:182) 
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1336) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1336) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1324) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1324) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    ... 1 more 

17/09/25 10:55:37 INFO org.spark_project.jetty.server.ServerConnector: Stopped [email protected]{HTTP/1.1}{0.0.0.0:4040} 
ERROR: (gcloud.dataproc.jobs.submit.pyspark) Job [af84ad68-0259-4ca1-b464-a118a96f0742] entered state [ERROR] while waiting for [DONE]. 

Je pense avoir identifié le problème, mais je ne peux pas trouver la cause du problème. L'extrait de code correspondant est le suivant:

table_rdd = spark.sparkContext.newAPIHadoopRDD(
    "com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat", 
    "org.apache.hadoop.io.LongWritable", 
    "com.google.gson.JsonObject", 
    conf=conf) 

table_json = table_rdd.map(lambda x: x[1]) 
visit_data = sparkSession.read.json(table_json) 

D'abord, je crée le RDD conformément au didacticiel de Google. L'étape suivante consiste à extraire les éléments JSON à partir du RDD, et ceci est ensuite lu dans une table, que nous pouvons interroger. La stacktrace montre que l'exception se produit lors de l'affectation de conf, mais le code fonctionne jusqu'à ce que j'appelle sparkSession.read.json(table_json) parce que je l'ai compris, spark fonctionne paresseusement et essaie ensuite d'accéder aux fichiers JSON réels qui ont été exportés depuis BigQuery.

Le problème est que Spark trouve plus de fichiers JSON qu'il ne devrait l'être. Selon ce comment dans le code de la bibliothèque BigQuery Hadoop, même si tout tient dans une partition, le minimum est de deux, de sorte que BigQuery reconnaît l'exportation en tant que telle. Il dit aussi qu'il génère un fichier de marqueur de fin, ce qui, pour autant que je puisse le dire, est juste un fichier JSON vide. Mais lors de l'exécution du code, l'exportation générée par BigQuery a plus de 2 fichiers nécessaires (1 contenant des données et 1 comme marqueur de fin). Il génère jusqu'à 5 fichiers JSON, qui contiennent parfois seulement 1 ou 2 lignes de BigQuery.

Je suis assez sûr que c'est le problème, que l'exportation est en quelque sorte fausse. Mais je ne peux pas savoir pourquoi cela arrive et comment le réparer. Toute aide est appréciée.

MISE À JOUR:

J'ai essayé autre chose. J'ai supprimé la table dans BigQuery et l'ai peuplé à partir de zéro. Cela a résolu le problème avec l'exportation. Il n'y a que deux fichiers maintenant. Mais je pense que le problème persiste. Je vais essayer d'ajouter des lignes via les fonctions Cloud (ce qui se passerait dans mon application) et ensuite mettre à jour le comportement.

MISE À JOUR 2:

Alors après avoir attendu un jour et en ajoutant quelques lignes par des inserts en continu à l'aide d'une fonction Cloud, la question se produit de nouveau. D'une certaine manière, les exportations sont partitionnées par jour. Ce ne serait pas un problème si chaque jour obtient son propre fragment, mais cela n'arrive malheureusement pas.

+0

Avez-vous un jobid BigQuery à partager? Vous pouvez également contacter l'équipe Google directement à [email protected] pour partager votre ID de projet. Votre évaluation est correcte: il ne devrait jamais y avoir de fichiers numérotés après le fichier de longueur zéro «marqueur de fin». Utilisez-vous les «inserts de diffusion en continu» de BigQuery pour ajouter des lignes, ou ajoutez-vous des tâches «lourdes»? –

+0

@DennisHuo Merci pour la réponse. J'ai déjà contacté l'équipe google, mais ils prennent beaucoup de temps pour traiter cela. Pour insérer nous utilisons des inserts de streaming, sur les fonctions Cloud. D'une manière ou d'une autre, les fichiers supplémentaires n'apparaissent que si nous attendons un jour et ajoutons des lignes supplémentaires. –

+0

Je ne suis pas sûr si l'exécution de requêtes contre BQ pour construire des systèmes ML est une bonne approche. Ici, dans [ce projet] (https://github.com/WillianFuks/PySpark-RecSys) que j'ai créé, vous pouvez voir que j'ai des exportateurs qui lancent des requêtes en BQ et exportent les résultats vers GCS, puis je les lis en étincelle. Jamais eu de problème et cela fonctionne assez vite (et cela évite aussi de lancer des requêtes tout le temps qui coûte de l'argent, cette approche ne fonctionne qu'une seule fois). Par coïncidence, il implémente également un système de recommandation mais en utilisant l'algorithme DIMSUM. –

Répondre

2

Il s'agit d'un bogue dans BigQuery (qui renvoie les statistiques de nombre de fichiers en sortie qui n'incluent pas le fichier d'enregistrement nul). Le correctif pour ce problème a été soumis et son déploiement se terminera dans environ une semaine. En attendant, une solution de contournement du problème est peut-être de définir l'indicateur "mapred.bq.input.sharded.export.enable" (a.k.a. ENABLE_SHARDED_EXPORT_KEY) sur false dans votre config hadoop lors de la configuration de votre travail DataProc.

MISE À JOUR:
A ce jour le 6 octobre 2017, le correctif est désormais 100% roulé sur BigQuery.

+0

Y a-t-il un ticket ou un rapport de bug quelque part? Je rencontre ce problème lorsque vous essayez d'utiliser le connecteur BigQuery avec PySpark sur DataProc pour lire les données d'une table. – zo7

+0

Nous n'avons pas encore de rapport de bogue public déposé, mais si vous voyez toujours cette erreur, il se peut que ce soit un cas de coin qui produise la même erreur une fois que la table atteint un certain état. Nous avons un bug interne qui le suit, mais n'hésitez pas à envoyer un rapport de bug à BigQuery si vous pensez que c'est utile. En attendant, vous pouvez utiliser la méthode suggérée dans la réponse pour être débloqué. Merci! –