2016-07-01 4 views
2

Nous avons Spark très simple travail en streaming (en Java), qui est:Kafka + évolutivité Spark

  • lecture JSONs de Kafka via DirectStream (acquittements sur les messages Kafka sont désactivés)
  • analyse syntaxique des JSONs en POJO (en utilisant GSON - nos messages ne sont que ~ 300 octets)
  • Plan du POJO à tuple de valeur clé (valeur = objet)
  • reduceByKey (Réduir fonction - toujours comparer 1 champ - qualité - des objets et laisse l'instance d'objet avec une qualité supérieure)
  • stocker le résultat dans l'état (via mapWithState stocke l'objet avec la plus haute qualité par clé)
  • stocker le résultat à HDFS

Les JSONs sont générés avec l'ensemble de 1000 ID (clés) et tous les événements sont distribués aléatoirement aux partitions de sujets Kafka. Cela signifie également que l'ensemble résultant d'objets est max 1000, car le travail stocke uniquement l'objet avec la qualité la plus élevée pour chaque ID.

Nous avons l'exécution des tests de performance sur AWS DME (m4.xlarge = 4 noyaux, 16 Go de mémoire) avec les paramètres suivants:

  • nombre d'exécuteurs = nombre de noeuds (par exemple 1 par noeud exécuteur)
  • nombre de partitions Kafka = nombre de noeuds (par exemple dans notre cas aussi exécuteurs)
  • taille du lot = 10 (s)
  • fenêtre glissante = 20 (s)
  • taille de la fenêtre = 600 (s)
  • taille de bloc = 2000 (ms)
  • défaut Parallélisme - a essayé différents paramètres, cependant les meilleurs résultats obtenir lorsque le parallélisme par défaut est = nombre de noeuds/exécuteurs

groupe Kafka ne contient que 1 courtier, qui est utilisé au maximum ~ 30-40% pendant la charge de pointe (nous pré-remplissons les données sur le sujet et exécutons ensuite le test de manière indépendante). Nous avons essayé d'augmenter num.io.threads et num.network.threads, mais sans amélioration significative.

L'qu'il résulte de tests de performance (environ 10 minutes de charge continue) étaient (maître de fils et les noeuds du pilote sont au-dessus des chiffres de noeud bas):

  • deux noeuds - capables de traiter max.150 000 événements/s sans aucun retard de traitement
  • 5 noeuds - 280 000 événements/s =>25% pénalité si on le compare à attendre "évolutivité presque linéaire"
  • 10 noeuds - 380 000 événements/s =>pénalité de 50% si on les compare à attendre « presque une évolutivité linéaire »

L'utilisation du processeur en cas de 2 noeuds a été ~

Nous avons également joué dans d'autres contextes, notamment: - test faible nombre/élevé de partitions - test faible/élevé/valeur par défaut de defaultParallelism - test avec un plus grand nombre d'exécuteurs (c.-à-d. diviser les ressources par ex. 30 exécuteurs au lieu de 10) mais les paramètres ci-dessus nous donnaient les meilleurs résultats.

Donc - la question - est Kafka + Spark (presque) linéaire évolutive? Si cela devrait être beaucoup plus évolutif, que nos tests montrés - comment cela peut être amélioré. Notre objectif est de soutenir des centaines/milliers d'exécuteurs Spark (c'est-à-dire que l'évolutivité est cruciale pour nous).

+0

Votre cas d'utilisation fait un shuffle de données complet dans le reduceByKey, qui deviendra de plus en plus cher à mesure que vous mettez votre cluster à l'échelle. Au moins, la performance globale est limitée par les performances de la performance de l'exécuteur le moins performant, qui ne peut que s'aggraver lorsque vous ajoutez des exécuteurs. Pouvez-vous essayer en utilisant un partitionneur Kafka pour avoir tous les messages pour un ID donné dans une seule partition? Cela devrait permettre une échelle presque linéaire je pense. – C4stor

+1

Combien de serveurs kafka avez-vous dans le cluster? Avez-vous une configuration de réplication de partition entre eux? –

+0

Il y a beaucoup de différentes parties à prendre en compte.Combien de partitions a votre cluster Kafka? Quelle est la taille de votre intervalle de point de contrôle? –

Répondre

3

Nous avons résolu ce par:

  • augmenter la capacité du groupe Kafka
    • plus de puissance CPU - augmenter le nombre de noeuds pour Kafka (1 nœud Kafka par 2 noeuds de exectur d'allumage semblaient être fines)
    • plus courtiers - en gros 1 courtier par exécuteur testamentaire nous a donné les meilleurs résultats
  • réglage par défaut parallèle approprié ism (nombre de cœurs dans le cluster * 2)
  • en vérifiant que tous les nœuds auront env. la même quantité de travail taille des lots
    • /blockSize doit être ~ égale ou un multiple de nombre d'exécuteurs

À la fin, nous avons été en mesure d'atteindre 1 100 000 événements/s traité par le cluster spark avec 10 noeuds exécuteurs. Le réglage effectué a également augmenté les performances sur les configurations avec moins de nœuds -> nous avons atteint une évolutivité pratiquement linéaire lors de la mise à l'échelle de 2 à 10 nœuds exécuteurs spark (m4.xlarge sur AWS). Au début, la CPU sur le nœud Kafka ne se rapprochait pas des limites, mais elle n'était pas capable de répondre aux demandes des exécutants Spark. Thnx pour toutes les suggestions, en particulier pour @ArturBiesiadowski, qui a suggéré que le cluster Kafka était incorrect.