0

J'ai un cas d'utilisation où les informations d'événement sur les capteurs sont insérées en continu dans MySQL. Nous devons envoyer cette information avec un traitement dans un sujet de Kafka toutes les 1 ou 2 minutes. J'utilise Spark pour envoyer cette information au sujet de Kafka et pour maintenir le CDC dans la table de Phoenix. J'emploie un travail de Cron pour exécuter le travail d'allumage toutes les minutes 1.Comment envoyer des messages dans l'ordre d'Apache Spark à Kafka topic

Le problème auquel je suis actuellement confronté est l'ordre des messages, j'ai besoin d'envoyer ces messages en horodatage croissant pour mettre fin au sujet système Kafka (qui a 1 partition). Mais la plupart des commandes de messages sont perdues en raison de plus d'une partition DataFrame spark envoie des informations en même temps au sujet Kafka.

Actuellement, je reformule mon DataFrame en 1, afin de maintenir l'ordre des messages, mais ce n'est pas une solution à long terme car je perds l'informatique distribuée par étincelles.

Si vous avez une meilleure solution, n'hésitez pas à suggérer.

+0

Pourriez-vous montrer comment vous insérez des données dans MySQL? – user8371915

+0

@ user8371915 Les données sont insérées par les applications dont le travail consiste à capturer des événements de capteur et à les insérer dans mysql db, ces applications ne sont pas sous mon contrôle. – nilesh1212

+0

Alors MySQL est la source et Kafka est un évier? Il n'est pas clair pourquoi l'ordre est pertinent, mais en général vous ne pouvez pas avoir de garanties d'ordre et de fin à la fin du parallélisme. – user8371915

Répondre

0

Je suis en mesure d'obtenir l'ordre des messages selon l'horodatage ascendant dans une certaine mesure en réparant mes données avec les clés et en appliquant le tri au sein d'une partition.

val pairJdbcDF = jdbcTable.map(row => ((row.getInt(0), row.getString(4)), s"${row.getInt(0)},${row.getString(1)},${row.getLong(2)},${row. /*getDecimal*/ getString(3)},${row.getString(4)}")) 
     .toDF("Asset", "Message") 
val repartitionedDF = pairJdbcDF.repartition(getPartitionCount, $"Asset") 
     .select($"Message") 
     .select(expr("(split(Message, ','))[0]").cast("Int").as("Col1"), 
      expr("(split(Message, ','))[1]").cast("String").as("TS"), 
      expr("(split(Message, ','))[2]").cast("Long").as("Col3"), 
      expr("(split(Message, ','))[3]").cast("String").as("Col4"), 
      expr("(split(Message, ','))[4]").cast("String").as("Value")) 
     .sortWithinPartitions($"TS", $"Value")