0

J'ai créé un JavaPairInputDStream et essayé d'enregistrer les données consommées dans les tables Cassandra. Mais face à des problèmes et ne savez pas comment commencer avec le code:Spark Streaming Enregistrement dans Cassandra Tableau

Voici le code que je l'ai écrit pour SparkStreaming:

package com.test.anna.KafkaSpark; 

import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapColumnTo; 
import static com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.*; 
import java.util.Arrays; 
import java.util.Collections; 
import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.Set; 
import java.util.function.Function; 

import com.datastax.driver.core.Session; 
import org.apache.spark.SparkConf; 
import org.apache.spark.SparkContext; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.FlatMapFunction; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.api.java.JavaDStream; 
import org.apache.spark.streaming.api.java.JavaPairInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.apache.spark.streaming.kafka.KafkaUtils; 

import com.datastax.spark.connector.cql.CassandraConnector; 
import com.datastax.spark.connector.japi.CassandraJavaUtil; 
import com.datastax.spark.connector.writer.RowWriterFactory; 

import kafka.serializer.StringDecoder; 
import scala.Tuple2; 
import java.util.Map; 

public class SparkStreamingConsumer { 

    public static void main(String[] args) { 
     // TODO Auto-generated method stub 
     SparkConf conf = new SparkConf() 
       .setAppName("kafka-sandbox") 
       .setMaster("local[*]"); 
     JavaSparkContext sc = new JavaSparkContext(conf); 
     JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(20000)); 

     // TODO: processing pipeline 
     Map<String, String> kafkaParams = new HashMap(); 
     kafkaParams.put("metadata.broker.list", "localhost:9092"); 
     kafkaParams.put("zookeeper.connect","localhost:2181"); 
     Set<String> topics = Collections.singleton("test6"); 
     System.out.println("Size of topic--->>>>"+topics.size()); 
     JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc, 
       String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics); 

     directKafkaStream.foreachRDD(rdd -> { 
      System.out.println("Message Received "+rdd.values().take(1)); 
      System.out.println("--- New RDD with " + rdd.partitions().size() 
       + " partitions and " + rdd.count() + " records"); 
      rdd.foreach(record -> System.out.println(record._2)); 
      }); 

     directKafkaStream.foreachRDD(rdd ->{    
      rdd.foreachPartition(item ->{ 
       while (item.hasNext()) {  


     ssc.start(); 
     ssc.awaitTermination(); 
    } 

} 

bien vouloir me faire savoir comment enregistrer ces données à la table Cassandra, petit morceau code aiderait beaucoup :)

Merci à l'avance.

+0

Les documents couvrent assez bien: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md – maasg

Répondre

0

Utilisez le connecteur étincelle cassandra de DataStax spark-cassandra et vous pouvez obtenir jarre d'ici spark-cassandra connector jar

Voici l'exemple de code

import com.datastax.driver.core.Session; 
import com.datastax.spark.connector.cql.CassandraConnector; 
import org.apache.spark.SparkConf; 

SparkConf conf = new SparkConf(); 
conf.setAppName(APP_NAME); 
conf.setMaster(NODE); 
conf.set("spark.cassandra.connection.host", CASSANDRA_HOST); 
conf.set("spark.cassandra.auth.username", CASSANDRA_USER); 
conf.set("spark.cassandra.auth.password", CASSANDRA_PASS); 

final JavaSparkContext jpc = new JavaSparkContext(conf); 
final CassandraConnector connector=CassandraConnector.apply(jpc.getConf()); 
final Session session=connector.openSession(); 

Vous pouvez utiliser cet objet Session pour l'enregistrement des données à un keyspace

+0

Ouais cette chose que j'ai utilisé. C'est une insertion simple dans les tables de Cassandra. Je cherche quelque chose où je peux ingérer directement les données provenant du streaming Spark à Cassandra. –