2017-09-20 1 views
0

Existe-t-il un moyen d'exécuter mon programme spark et d'être protégé des fichiers en cours de modification?Spark code à protéger de FileNotFoundExceptions?

Le code commence par la lecture d'un fichier parquet (pas d'erreurs lors de la lecture):

val mappings = spark.read.parquet(S3_BUCKET_PATH + "/table/mappings/") 

Il fait ensuite des transformations avec l'exemple de données,

val newTable = mappings.join(anotherTable, 'id) 

Ces transformations prennent des heures (ce qui est un autre problème).

Parfois, les finitions d'emploi, d'autres fois, il meurt avec le message similaire suivant:

org.apache.spark.SparkException: Job avorté en raison de l'échec de l'étape: Tâche 6 au stade 1014,0 a échoué 4 fois , échec le plus récent: tâche perdue 6.3 dans l'étape 1014.0 (TID 106820, 10.127.251.252, exécuteur 5): java.io.FileNotFoundException: Aucun fichier ou répertoire: s3a: // bucket1/table/mappings/part-00007 -21eac9c5-yyzz-4295-a6ef-5f3bb13bed64.snappy.parquet

Nous pensons qu'un autre travail consiste à changer les fichiers en dessous de nous, mais n'ont pas été en mesure de trouver le coupable.

+0

essayez d'exécuter avec l'exécution spéculative. – philantrovert

+0

Cela se produit lorsqu'un autre processus tente d'écrire/modifier des fichiers parquet dans ce répertoire. Assurez-vous de lire les fichiers après la fin d'un autre processus. –

+0

Un autre problème à propos du travail à long terme, il est difficile de suggérer quelque chose sans la configuration de votre travail. –

Répondre

1

Ceci est un problème très compliqué à résoudre ici. Si les données sous-jacentes changent pendant que vous travaillez sur la même image, l'étincelle échouera. La raison en est que lorsque la base de données a été créée, le RDD sous-jacent connaissait l'emplacement des données et le DAG qui lui était associé. Maintenant, si les données sous-jacentes ont soudainement changé par un travail, RDD n'a pas d'option mais l'échoue.

Une possibilité d'activation réessayer, spéculation etc, mais néanmoins le problème existe. Généralement, si vous avez une table dans le parquet et que vous voulez lire en même temps, partitionnez la table par date ou heure, puis l'écriture se fera dans la partition différente tandis que la lecture se fera dans une partition différente.

Maintenant, avec le problème de jointure prend beaucoup de temps. Si vous lisez les données à partir de s3, rejoignez et réécrivez à s3, la performance sera plus lente. Parce que maintenant le hadoop doit d'abord récupérer les données de s3 puis effectuer l'opération (le code ne va pas aux données). Bien que l'appel réseau soit rapide, j'ai effectué une expérience avec s3 vs EMR FS et j'ai trouvé 50% de ralentissement avec s3.

Une alternative consiste à copier les données de s3 vers HDFS, puis à exécuter la jointure. Cela vous protégera de l'écrasement des données et les performances seront plus rapides. Une dernière chose si vous utilisez spark 2.2 s3 est douloureusement lent en raison de la dépréciation de DirectOutputCommiter. Cela peut être une autre raison du ralentissement

0

Vous ne pouvez pas utiliser s3a en toute sécurité en tant que sortie de travail directe dans Spark, non sans courir le risque de corrompre les données. Même si l'exécution spéculative est désactivée, le fait que s3 affiche une incohérence indique qu'il risque de trouver les mauvais fichiers à renommer. Et si vous enchaînez votre travail, vous devez ajouter suffisamment de temps pour que les fichiers mis à jour se propagent autour des serveurs de réplication S3, afin d'obtenir une liste cohérente.

Il y a des indices de SPARK-18512 ici; énumérer les incohérences dans S3.

Comme le dit Avishek: travail avec le serveur HDFS local pour une chaîne de requêtes, copie jusqu'à s3a à la fin