2017-09-14 4 views
1

J'apprends tout sur Apache Cassandra 3.x.x et j'essaye de développer des trucs pour jouer. Le problème est que je veux stocker des données dans une table Cassandra qui contient ces colonnes:Conversion d'UnixTimestamp en TIMEUUID pour Cassandra

id (UUID - Primary Key) | Message (TEXT) | REQ_Timestamp (TIMEUUID) | Now_Timestamp (TIMEUUID) 

REQ_Timestamp a le moment où le message a quitté le client au niveau frontend. Par contre, le Timestamp est le moment où le message est finalement stocké à Cassandra. J'ai besoin des deux horodatages car je veux mesurer le temps qu'il faut pour gérer la requête depuis son origine jusqu'à ce que les données soient stockées en toute sécurité.

Créer le Now_Timestamp est facile, je viens d'utiliser la fonction now() et il génère automatiquement le TIMEUUID. Le problème se pose avec REQ_Timestamp. Comment puis-je convertir cet horodatage Unix en TIMEUUID afin que Cassandra puisse le stocker? Est-ce seulement possible?

L'architecture de mon backend est la suivante: je reçois les données dans un JSON du frontend à un service web qui le traite et le stocke dans Kafka. Ensuite, un travail Spark Streaming prend ce journal Kafka et le met à Cassandra.

Ceci est mon WebService qui met les données dans Kafka.

@Path("/") 
public class MemoIn { 

    @POST 
    @Path("/in") 
    @Consumes(MediaType.APPLICATION_JSON) 
    @Produces(MediaType.TEXT_PLAIN) 
    public Response goInKafka(InputStream incomingData){ 
     StringBuilder bld = new StringBuilder(); 
     try { 
      BufferedReader in = new BufferedReader(new InputStreamReader(incomingData)); 
      String line = null; 
      while ((line = in.readLine()) != null) { 
       bld.append(line); 
      } 
     } catch (Exception e) { 
      System.out.println("Error Parsing: - "); 
     } 
     System.out.println("Data Received: " + bld.toString()); 

     JSONObject obj = new JSONObject(bld.toString()); 
     String line = obj.getString("id_memo") + "|" + obj.getString("id_writer") + 
           "|" + obj.getString("id_diseased") 
           + "|" + obj.getString("memo") + "|" + obj.getLong("req_timestamp"); 

     try { 
      KafkaLogWriter.addToLog(line); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 

     return Response.status(200).entity(line).build(); 
    } 


} 

Voici mon Kafka Writer

package main.java.vcemetery.webservice; 

import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.ProducerRecord; 
import java.util.Properties; 
import org.apache.kafka.clients.producer.Producer; 

public class KafkaLogWriter { 

    public static void addToLog(String memo)throws Exception { 
     // private static Scanner in; 
      String topicName = "MemosLog"; 

      /* 
      First, we set the properties of the Kafka Log 
      */ 
      Properties props = new Properties(); 
      props.put("bootstrap.servers", "localhost:9092"); 
      props.put("acks", "all"); 
      props.put("retries", 0); 
      props.put("batch.size", 16384); 
      props.put("linger.ms", 1); 
      props.put("buffer.memory", 33554432); 
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 

      // We create the producer 
      Producer<String, String> producer = new KafkaProducer<>(props); 
      // We send the line into the producer 
      producer.send(new ProducerRecord<>(topicName, memo)); 
      // We close the producer 
      producer.close(); 

    } 
} 

Et enfin voici ce que j'ai de mon Spark streaming emploi

public class MemoStream { 

    public static void main(String[] args) throws Exception { 
     Logger.getLogger("org").setLevel(Level.ERROR); 
     Logger.getLogger("akka").setLevel(Level.ERROR); 

     // Create the context with a 1 second batch size 
     SparkConf sparkConf = new SparkConf().setAppName("KafkaSparkExample").setMaster("local[2]"); 
     JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(10)); 

     Map<String, Object> kafkaParams = new HashMap<>(); 
     kafkaParams.put("bootstrap.servers", "localhost:9092"); 
     kafkaParams.put("key.deserializer", StringDeserializer.class); 
     kafkaParams.put("value.deserializer", StringDeserializer.class); 
     kafkaParams.put("group.id", "group1"); 
     kafkaParams.put("auto.offset.reset", "latest"); 
     kafkaParams.put("enable.auto.commit", false); 

     /* Se crea un array con los tópicos a consultar, en este caso solamente un tópico */ 
     Collection<String> topics = Arrays.asList("MemosLog"); 

     final JavaInputDStream<ConsumerRecord<String, String>> kafkaStream = 
       KafkaUtils.createDirectStream(
         ssc, 
         LocationStrategies.PreferConsistent(), 
         ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams) 
       ); 

     kafkaStream.mapToPair(record -> new Tuple2<>(record.key(), record.value())); 
     // Split each bucket of kafka data into memos a splitable stream 
     JavaDStream<String> stream = kafkaStream.map(record -> (record.value().toString())); 
     // Then, we split each stream into lines or memos 
     JavaDStream<String> memos = stream.flatMap(x -> Arrays.asList(x.split("\n")).iterator()); 
     /* 
     To split each memo into sections of ids and messages, we have to use the code \\ plus the character 
      */ 
     JavaDStream<String> sections = memos.flatMap(y -> Arrays.asList(y.split("\\|")).iterator()); 
     sections.print(); 
     sections.foreachRDD(rdd -> { 
      rdd.foreachPartition(partitionOfRecords -> { 
       //We establish the connection with Cassandra 
       Cluster cluster = null; 
       try { 
        cluster = Cluster.builder() 
          .withClusterName("VCemeteryMemos") // ClusterName 
          .addContactPoint("127.0.0.1") // Host IP 
          .build(); 

       } finally { 
        if (cluster != null) cluster.close(); 
       } 
       while(partitionOfRecords.hasNext()){ 


       } 
      }); 
     }); 

     ssc.start(); 
     ssc.awaitTermination(); 

    } 
} 

Merci à l'avance.

Répondre

1

Cassandra n'a pas de fonction à convertir de Horodateur UNIX. Vous devez effectuer la conversion côté client.

Ref: https://docs.datastax.com/en/cql/3.3/cql/cql_reference/timeuuid_functions_r.html

+0

je référencement en fait le même document lorsque je posais la question. Des idées sur la façon de faire la conversion du côté client? Je suis coincé ici. –

+0

Dépend de quel client vous utilisez. Est-ce que c'est datastax javadriver? Peut-être que vous pouvez montrer une partie de votre code pour ce que vous faites déjà. –

+0

L'architecture de mon backend est la suivante: j'obtiens les données dans un JSON depuis l'interface vers un service web qui les traite et les stocke dans Kafka. Ensuite, un travail Spark Streaming prend ce journal Kafka et le met à Cassandra. Je vais éditer mon post original avec le code WebService/Kafka et le code Spark que j'ai écrit jusqu'ici. –