Excuses si le titre est trop vague, mais j'ai eu du mal à le formuler correctement. En gros, j'essaie de savoir si Apache Spark, avec Apache Kafka, est capable de synchroniser les données de ma base de données relationnelle avec Elasticsearch.Rejoindre des données de diffusion dans Apache Spark
Mon plan consiste à utiliser l'un des connecteurs Kafka pour lire les données du SGBDR et les insérer dans les rubriques Kafka. Ce serait la DRE du modèle et du DDL. Tout à fait de base, Report
et Product
tables qui ont beaucoup à plusieurs rapports qui existent dans ReportProduct
tableau:
CREATE TABLE dbo.Report (
ReportID INT NOT NULL PRIMARY KEY,
Title NVARCHAR(500) NOT NULL,
PublishedOn DATETIME2 NOT NULL);
CREATE TABLE dbo.Product (
ProductID INT NOT NULL PRIMARY KEY,
ProductName NVARCHAR(100) NOT NULL);
CREATE TABLE dbo.ReportProduct (
ReportID INT NOT NULL,
ProductID INT NOT NULL,
PRIMARY KEY (ReportID, ProductID),
FOREIGN KEY (ReportID) REFERENCES dbo.Report (ReportID),
FOREIGN KEY (ProductID) REFERENCES dbo.Product (ProductID));
INSERT INTO dbo.Report (ReportID, Title, PublishedOn)
VALUES (1, N'Yet Another Apache Spark StackOverflow question', '2017-09-12T19:15:28');
INSERT INTO dbo.Product (ProductID, ProductName)
VALUES (1, N'Apache'), (2, N'Spark'), (3, N'StackOverflow'), (4, N'Random product');
INSERT INTO dbo.ReportProduct (ReportID, ProductID)
VALUES (1, 1), (1, 2), (1, 3), (1, 4);
SELECT *
FROM dbo.Report AS R
INNER JOIN dbo.ReportProduct AS RP
ON RP.ReportID = R.ReportID
INNER JOIN dbo.Product AS P
ON P.ProductID = RP.ProductID;
Mon but est de le transformer en document avec la structure suivante:
{
"ReportID":1,
"Title":"Yet Another Apache Spark StackOverflow question",
"PublishedOn":"2017-09-12T19:15:28+00:00",
"Product":[
{
"ProductID":1,
"ProductName":"Apache"
},
{
"ProductID":2,
"ProductName":"Spark"
},
{
"ProductID":3,
"ProductName":"StackOverflow"
},
{
"ProductID":4,
"ProductName":"Random product"
}
]
}
J'ai été capable de former un tel type de structure en utilisant des données statiques que j'ai mockées localement:
report.join(
report_product.join(product, "product_id")
.groupBy("report_id")
.agg(
collect_list(struct("product_id", "product_name")).alias("product")
), "report_id").show
Mais je me rends compte que c'est trop basique et les flux vont être beaucoup plus compliqués.
Les données changent de manière irrégulière, les rapports et leurs produits changent constamment, les produits sont changés de temps en temps (la plupart du temps sur une base hebdomadaire).
Je souhaite répliquer tout type de modification dans Elasticsearch qui s'est produite dans l'une de ces tables.
Cela sonne vraiment bien. D'après les recherches que j'ai faites auparavant, Kafka ne vous permet pas de rejoindre des clés non partagées, ce qui pourrait être le cas pour moi. KSQL résout-il cela? –
Vous pouvez répartir facilement en utilisant KSQL, ce qui, je pense, contournerait ce problème. Je n'ai pas essayé cependant. –