5

J'ai créé le pipeline suivant: Gestionnaire de tâches -> SQS -> Travailleur scraper (mon application) -> AWS Firehose -> Fichiers S3 -> Spark -> (?) Redshift.Comment traiter les fichiers S3 incrémentiels dans Spark

Certaines choses que je suis en train de résoudre/améliorer et je serais heureux d'orientation:

  1. Le grattoir pourrait obtenir des données dupliquées, et les rincer à nouveau à firehose, ce qui se traduira par dups à étincelle. Dois-je résoudre cela dans l'étincelle en utilisant la fonction Distinct AVANT de commencer mes calculs?
  2. Je ne supprime pas les fichiers traités S3, donc les données deviennent de plus en plus volumineuses. Est-ce une bonne pratique? (Ayant s3 comme base de données d'entrée) Ou dois-je traiter chaque fichier et le supprimer une fois l'étincelle terminée? Actuellement je fais sc.textFile("s3n://...../*/*/*") - qui collectera TOUS mes fichiers de seau et exécutera des calculs dessus.
  3. Pour placer les résultats dans Redshift (ou s3) -> comment puis-je le faire de façon incrémentielle? c'est-à-dire que si le s3 devient de plus en plus gros, le redshift aura des données dupliquées ... Dois-je toujours le rincer avant? Comment?
+0

vous pouvez avoir votre seau pour les éléments à traiter, et une fois qu'ils ont été poussés, vous les déplacez dans un autre seau afin de conserver une copie si nécessaire, mais vous ne les retraiterez pas une seconde fois –

Répondre

0

J'ai rencontré ces problèmes avant même si pas dans un seul pipeline. Voici ce que j'ai fait.

  1. duplications Suppression

    a. J'ai utilisé BloomFilter pour supprimer les duplications locales. Notez que le doc est relativement incomplet, mais vous pouvez enregistrer/charger/union/croiser facilement les objets de filtre bloom. Vous pouvez même faire reduce sur les filtres.

    b. Si vous enregistrez les données directement de Spark à RedShift, il est probable que vous ayez besoin de consacrer du temps et des efforts pour mettre à jour BloomFilter pour le lot en cours, le diffuser, puis filtrer pour vous assurer qu'il n'y a pas de duplications globales. Avant j'ai utilisé une contrainte UNIQUE dans RDS et ignorer l'erreur, mais malheureusement RedShift does not honour the constraint.

  2. et 3. Les données deviennent de plus

J'utilise un cluster DME pour exécuter s3-dist-cp command pour déplacer & des données de fusion (car il y a généralement beaucoup de petits fichiers journaux, dont la performance de l'impact Spark). S'il vous arrive d'utiliser EMR pour héberger votre cluster Spark, ajoutez simplement une étape avant votre analyse pour déplacer les données d'un compartiment à l'autre. L'étape prend la command-runner.jar comme le pot sur mesure et la commande ressemble

s3-dist-cp --src=s3://INPUT_BUCKET/ --dest=s3://OUTPUT_BUCKET_AND_PATH/ --groupBy=".*\.2016-08-(..)T.*" --srcPattern=".*\.2016-08.*" --appendToLastFile --deleteOnSuccess 

Notez que le distcp d'origine ne supporte pas les fichiers fusion.

Généralement, vous devriez éviter d'avoir des données traitées et non traitées ensemble dans le même compartiment (ou au moins, chemin).