0

Excuses si le titre est trop vague, mais j'ai eu du mal à le formuler correctement. En gros, j'essaie de savoir si Apache Spark, avec Apache Kafka, est capable de synchroniser les données de ma base de données relationnelle avec Elasticsearch.Rejoindre des données de diffusion dans Apache Spark

Mon plan consiste à utiliser l'un des connecteurs Kafka pour lire les données du SGBDR et les insérer dans les rubriques Kafka. Ce serait la DRE du modèle et du DDL. Tout à fait de base, Report et Product tables qui ont beaucoup à plusieurs rapports qui existent dans ReportProduct tableau: ERD

CREATE TABLE dbo.Report (
    ReportID INT NOT NULL PRIMARY KEY, 
    Title NVARCHAR(500) NOT NULL, 
    PublishedOn DATETIME2 NOT NULL); 

CREATE TABLE dbo.Product (
    ProductID INT NOT NULL PRIMARY KEY, 
    ProductName NVARCHAR(100) NOT NULL); 

CREATE TABLE dbo.ReportProduct (
    ReportID INT NOT NULL, 
    ProductID INT NOT NULL, 
    PRIMARY KEY (ReportID, ProductID), 
    FOREIGN KEY (ReportID) REFERENCES dbo.Report (ReportID), 
    FOREIGN KEY (ProductID) REFERENCES dbo.Product (ProductID)); 

INSERT INTO dbo.Report (ReportID, Title, PublishedOn) 
VALUES (1, N'Yet Another Apache Spark StackOverflow question', '2017-09-12T19:15:28'); 

INSERT INTO dbo.Product (ProductID, ProductName) 
VALUES (1, N'Apache'), (2, N'Spark'), (3, N'StackOverflow'), (4, N'Random product'); 

INSERT INTO dbo.ReportProduct (ReportID, ProductID) 
VALUES (1, 1), (1, 2), (1, 3), (1, 4); 

SELECT * 
FROM dbo.Report AS R 
INNER JOIN dbo.ReportProduct AS RP 
    ON RP.ReportID = R.ReportID 
INNER JOIN dbo.Product AS P 
    ON P.ProductID = RP.ProductID; 

Mon but est de le transformer en document avec la structure suivante:

{ 
    "ReportID":1, 
    "Title":"Yet Another Apache Spark StackOverflow question", 
    "PublishedOn":"2017-09-12T19:15:28+00:00", 
    "Product":[ 
    { 
     "ProductID":1, 
     "ProductName":"Apache" 
    }, 
    { 
     "ProductID":2, 
     "ProductName":"Spark" 
    }, 
    { 
     "ProductID":3, 
     "ProductName":"StackOverflow" 
    }, 
    { 
     "ProductID":4, 
     "ProductName":"Random product" 
    } 
    ] 
} 

J'ai été capable de former un tel type de structure en utilisant des données statiques que j'ai mockées localement:

report.join(
    report_product.join(product, "product_id") 
    .groupBy("report_id") 
    .agg(
     collect_list(struct("product_id", "product_name")).alias("product") 
    ), "report_id").show 

Mais je me rends compte que c'est trop basique et les flux vont être beaucoup plus compliqués.

Les données changent de manière irrégulière, les rapports et leurs produits changent constamment, les produits sont changés de temps en temps (la plupart du temps sur une base hebdomadaire).

Je souhaite répliquer tout type de modification dans Elasticsearch qui s'est produite dans l'une de ces tables.

Répondre

1
  1. Kafka Connect pour tirer les données de votre source DB - vous pouvez utiliser la JDBC Source qui est disponible dans le cadre de Confluent Platform (ou separately), et peut aussi vouloir enquêter sur kafka-connect-cdc-mssql

  2. Une fois que vous Si vous avez les données dans Kafka, utilisez soit le Kafka Streams API pour manipuler les données comme vous le souhaitez, ou regardez la nouvelle publication KSQL. Le choix dépendra de votre préférence pour le codage en Java (avec Kafka Streams) ou la manipulation de données dans un environnement de type SQL (avec KSQL). Peu importe, la sortie de ces deux sera un autre sujet de Kafka.

  3. Enfin, STREAM le sujet Kafka ci-dessus en utilisant le ElasticSearch ElasticSearch Kafka Connect plugin (disponible here, ou dans le cadre de la Confluent Platform)

+0

Cela sonne vraiment bien. D'après les recherches que j'ai faites auparavant, Kafka ne vous permet pas de rejoindre des clés non partagées, ce qui pourrait être le cas pour moi. KSQL résout-il cela? –

+0

Vous pouvez répartir facilement en utilisant KSQL, ce qui, je pense, contournerait ce problème. Je n'ai pas essayé cependant. –