1
EventStore

Le problème

Je suis un Microservice comme la mise en œuvre d'un agrégat d'approvisionnement événement qui, à son tour, est mis en œuvre en tant que Flink FlatMapFunction. Dans la configuration de base, l'agrégat lit les événements et les commandes de deux sujets kafka. Ensuite, il écrit de nouveaux événements sur ce premier sujet et traite les résultats dans un troisième sujet. Par conséquent, Kafka agit en tant que magasin d'événements. Espoir ce dessin aide:Récupération cohérence de l'État dans Flink lors de l'utilisation Kafka

RPC Request        RPC Result 
    |             | 
    ~~~~> Commands-|    |---> Results ~~~~~~| 
       |-->Aggregate--| 
    ~> Input evs. -|    |---> output evs. ~~~ 
    |             | 
    ~~~~~<~~~~~~~~~~~<~~~feedbak loop~~~~~<~~~~~~~~<~~~ 

En raison du fait que Kafka ne checkpoined, les commandes pourraient être rejoué deux fois et il semble que les événements de sortie pourrait être reformulée deux fois le sujet.

Comment l'état peut-il être récupéré dans ces cas avec des messages répétés? Est-il possible pour l'agrégat de savoir quand ses flux d'entrée sont à jour pour commencer à traiter les commandes?

Mes pensées

J'ai pensé plusieurs solutions:

  1. Si Flink implémente un rollback des événements non confirmées, un évier pourrait être mis en œuvre qui se le courant décalé par rapport à la source de l'événement. Au redémarrage, ce collecteur supprimerait les événements plus récents que les décalages dans la rubrique kafka. À sa manière, KafkaSource et KafkaSink seraient générés à partir du même constructeur et exposés ensuite à la topologie. Cette solution pose un problème important étant donné que d'autres services peuvent lire les événements les plus récents dans le sujet et provoquer des incohérences.

  2. Si la suppression des événements de Flink n'est pas possible dans 2, la source statefull peut potentiellement lire les événements du décalage et essayer de faire correspondre les événements répétés dans l'agrégat et les supprimer. Ces options ne semblent pas robustes car il peut y avoir des situations dans lesquelles les correctifs ne sont pas déterministes et sujets à des failles car ils devraient être repensés pour chaque agrégat et topologie et ne garantiraient pas la récupération (par exemple en cas de redémarrages consécutifs). C'est donc une mauvaise solution.

  3. Une approche différente est celle-ci. Il s'agit de créer un KafkaSource spécial avec deux filigranes spéciaux: le premier, KafkaSourceStartedWatermark, sera toujours envoyé au démarrage de la source pour avertir les opérateurs dépendants. Lorsque ce filigrane est envoyé, la source enregistre en interne le décalage Kafka actuel. La seconde, KafkaSourceUpToDateWatermark, est envoyée par la source lorsque le décalage est atteint. Ces filigranes voyageraient le long de la topologie de manière transparente. L'opérateur doit être capable de gérer ces filigranes, en implémentant une interface spéciale WatermarkNotifiable. Ensuite, l'agrégat pourra mettre en mémoire tampon ou supprimer des commandes RPC jusqu'à ce qu'il soit à jour dans chaque source d'entrée.

    interface WatermarkNotifiable { 
        void started(String watermarkId);//KafkaSourceStartedWatermark watermark 
        void upToDate(String watermarkId);//KafkaSOurceUpToDateWatermark watermark 
    } 
    
  4. Si la mise en œuvre de l'infrastructure dans 3 est impossible, le KafkaSource pourrait mettre en œuvre un constructeur spécifiant un événement de filigrane spécial qui pourrait se rendre aux opérateurs, mais cela nécessiterait que tous les opérateurs dépendent de ces filigranes un re -émets alors.

  5. Une autre approche différente consiste à ne pas traiter les commandes antérieures à un critère. Par exemple, les commandes ont un horodatage d'entrée. Si le temps est utilisé, la synchronisation de l'heure est critique.

remet en question StackOverflow connexes

  1. Using Kafka as a (CQRS) Eventstore. Good idea?
  2. Kafka - Know if Consumer is up to date
  3. Kafka & Flink duplicate messages on restart

Répondre

0

Créer un nouveau type d'opérateur Conmuter. C'est comme une source. Il est composé de plusieurs sources représentant des sujets d'événement et de commande. Il commence à "récupérer" l'état. Dans cet état, il lit des sujets d'événements jusqu'à leur dernier. Pendant ce temps, pour les commandes, il les stocke ou les laisse tomber. Une fois à jour, il considère récupéré et "ouvre" le chemin aux commandes. Il pourrait être mis en œuvre séparément en tant que source plus un opérateur.

FlinkKafkaProducerXX n'est pas suffisant pour faire cela, mais ce serait la base pour l'implémenter.