2017-09-15 10 views
0

J'ai un graveur foreach personnalisé pour le streaming Spark. Pour chaque ligne j'écris à la source JDBC. Je veux aussi faire un peu de recherche rapide avant d'effectuer une opération JDBC et mettre à jour la valeur après avoir effectué des opérations JDBC, comme "Step-1" et "Step-3" dans l'exemple ci-dessous ...Base de données en mémoire permanente dans Apache Spark

ne veulent pas utiliser des bases de données externes comme REDIS, MongoDB. Je veux quelque chose avec impression faible pied comme RocksDB, Derby, etc ...

Je suis d'accord avec le stockage d'un fichier par application, comme points de reprise, je vais créer un dossier interne-db ...

Je ne voyais pas en mémoire DB pour Spark ..

def main(args: Array[String]): Unit = { 

val brokers = "quickstart:9092" 
val topic = "safe_message_landing_app_4" 

val sparkSession = SparkSession.builder().master("local[*]").appName("Ganesh-Kafka-JDBC-Streaming").getOrCreate(); 

val sparkContext = sparkSession.sparkContext; 
sparkContext.setLogLevel("ERROR") 
val sqlContext = sparkSession.sqlContext; 

val kafkaDataframe = sparkSession.readStream.format("kafka") 
    .options(Map("kafka.bootstrap.servers" -> brokers, "subscribe" -> topic, 
    "startingOffsets" -> "latest", "group.id" -> " Jai Ganesh", "checkpoint" -> "cp/kafka_reader")) 
    .load() 

kafkaDataframe.printSchema() 
kafkaDataframe.createOrReplaceTempView("kafka_view") 
val sqlDataframe = sqlContext.sql("select concat (topic, '-' , partition, '-' , offset) as KEY, string(value) as VALUE from kafka_view") 

val customForEachWriter = new ForeachWriter[Row] { 
    override def open(partitionId: Long, version: Long) = { 
    println("Open Started ==> partitionId ==> " + partitionId + " ==> version ==> " + version) 
    true 
    } 

    override def process(value: Row) = { 
    // Step 1 ==> Lookup a key in persistent KEY-VALUE store 

    // JDBC operations 

    // Step 3 ==> Update the value in persistent KEY-VALUE store 
    } 

    override def close(errorOrNull: Throwable) = { 
    println(" ************** Closed ****************** ") 
    } 
} 

val yy = sqlDataframe 
    .writeStream 
    .queryName("foreachquery") 
    .foreach(customForEachWriter) 
    .start() 

yy.awaitTermination() 

sparkSession.close(); 

}

+1

Demandez-vous à propos de https: //db.apache. org/derby/docs/10.13/devguide/cdevdvlpinmemdb.html? Je ne sais pas vraiment ce qu'est une "base de données persistante en mémoire", sauf si vous parlez d'utiliser du matériel comme NVM? Sans matériel spécial, une base de données en mémoire Derby n'est PAS durable. –

+1

Ce que je voulais dire en -mémoire signifiait ... mysql, redis fonctionne comme un processus séparé ... ce que je ne veux pas ... derby se charge dans les programmes de pilotes spark et des exécuteurs je veux me connecter au derby ... coz mon étincelle le travail qui est exécuté par le fil sera sur 5 machines .. donc je peux utiliser derby est étincelle ... et cela fonctionnera pour mon besoin étape 1 et 3 ... mais ne supporte pas MVCC donc je pense à la base de données H2 ... donc je veux faire l'expérience de l'utilisation de Derby et H2 est Spark – Manjesh

+2

OK. Le terme de Derby pour ce moteur de base de données "in-process" est "embedded", et oui cela fonctionne bien pour intégrer Derby dans une autre application (Java). Vous avez raison: Derby n'est pas un moteur de base de données MVCC. Pour commencer avec Derby, je recommande les tutoriels à: https://db.apache.org/derby/docs/10.13/getstart/ –

Répondre

1

Manjesh,

Ce que vous cherchez, « Spark et votre en mémoire DB comme un cluster sans soudure, partage d'un seul espace de processus ", avec le support de MVCC est exactement ce que SnappyData fournit. Avec SnappyData, les tables sur lesquelles vous souhaitez rechercher rapidement se trouvent dans le même processus que votre travail de diffusion en continu Spark. Check it out here

SnappyData possède une licence Apache V2 pour le produit principal et l'utilisation spécifique à laquelle vous faites référence est disponible dans le téléchargement OSS.

(Divulgation: Je suis un employé SnappyData et il est logique de fournir une réponse spécifique du produit à cette question parce que le produit est la réponse à la question)