2017-08-25 4 views
0

Bonjour J'écris un petit trigger Cassandra qui envoie des informations à Kafka après l'avoir inséré dans une certaine table. Voici mon code de déclenchement:java.lang.NoClassDefFoundError lors de la création du trigger Cassandra

public class InsertDataTrigger implements ITrigger { 

    public Collection<Mutation> augment(Partition update) { 

     //checking if trigger works and some debug info; 
     SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); 
     System.out.println("Hello " + dateFormat.format(new Date())); 
     System.out.println("This Insert Data Trigger"); 
     System.out.println("default charset " + Charset.defaultCharset());  //IMPORTANT check if it's important 

     //here we're gonna build the message to kafka based on inserted data 
     try { 
      UnfilteredRowIterator it = update.unfilteredIterator(); 
      CFMetaData cfMetaData = update.metadata(); 

      System.out.println("PartitionKey " + new String(update.partitionKey().getKey().array())); 
      System.out.println("update.metadata().clusteringColumns().toString() " + update.metadata().clusteringColumns().toString()); 

      while (it.hasNext()) { 
       JSONObject message = new JSONObject(); 

       Unfiltered un = it.next(); 
       Clustering clt = (Clustering) un.clustering(); 

       message.put("partitionkey", new String(update.partitionKey().getKey().array())); 

       System.out.println("clt.toString(cfMetaData) " + clt.toString(cfMetaData)); 
       System.out.println("clt.getRawValues() " + new String(clt.getRawValues()[0].array())); 
       System.out.println("partition.columns().toString() " + update.columns().toString()); 

       message.put("datetime", new String(clt.getRawValues()[0].array())); 

       Iterator<Cell> cells = update.getRow(clt).cells().iterator(); 

       while (cells.hasNext()) { 
        Cell cell = cells.next(); 
        System.out.println("cell.column().name.toString() " + cell.column().name.toString()); 
        System.out.println("cell.toString()" + cell.toString()); 
        Double x = cell.value().getDouble(); 
        System.out.println("cell.value().getDouble() " + x); 
        //if(cell.column().name.toString() == "value") 
        System.out.println(x); 
        message.put(cell.column().name.toString(), x); 
        //else 
        // message.put(cell.column().name.toString(),cell.value().toString()); 
       } 
       System.out.println("un.toString()" + un.toString(cfMetaData)); 

       if (!message.isEmpty()) { 
        System.out.println(message.toString()); 

        //Sending data to kafka 
        Properties props = new Properties(); 
        props.put("bootstrap.servers", "localhost:9092"); 
        props.put("acks", "all"); 
        props.put("retries", 0); 
        props.put("batch.size", 16384); 
        props.put("linger.ms", 1); 
        props.put("buffer.memory", 33554432); 
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 

        Producer<String, String> producer = new KafkaProducer<>(props); 
        producer.send(new ProducerRecord<>("test", message.toString()));//move topic name to some properties 
        producer.close(); 
       } 


      } 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 

     return Collections.emptyList(); 
    } } 

Et voici mon fichier pom:

<?xml version="1.0" encoding="UTF-8"?> 
    <project xmlns="http://maven.apache.org/POM/4.0.0" 
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 

    <build> 
     <plugins> 
      <plugin> 
       <groupId>org.apache.maven.plugins</groupId> 
       <artifactId>maven-compiler-plugin</artifactId> 
       <version>3.1</version> 
       <configuration> 
        <source>1.8</source> 
        <target>1.8</target> 
       </configuration> 
      </plugin> 
     </plugins> 
    </build> 

    <modelVersion>4.0.0</modelVersion> 

    <groupId>io.github.carldata</groupId> 
    <artifactId>InsertDataTrigger</artifactId> 
    <version>1.0</version> 

    <dependencies> 
     <!-- https://mvnrepository.com/artifact/org.apache.cassandra/cassandra-all --> 
     <dependency> 
      <groupId>org.apache.cassandra</groupId> 
      <artifactId>cassandra-all</artifactId> 
      <version>3.11.0</version> 
     </dependency> 

     <dependency> 
      <groupId>org.apache.kafka</groupId> 
      <artifactId>kafka-clients</artifactId> 
      <version>0.11.0.0</version> 
     </dependency> 
    </dependencies> 

</project> 

Le projet se construit bien et crée un fichier jar, mais lorsque je tente de créer déclencheur dans cassandra il échoue à l'exception mentionnée .

Répondre

2

Il est très probable que le fichier kafka-clients ne figure pas dans le répertoire lib de Cassandra. À moins que votre projet n'inclue cette dépendance (c.-à-d. Construire un pot gras/uber).

Vous pouvez avoir des problèmes avec les conflits de dépendances dans le client kafka jar et les dépendances Cassandra. En particulier org.xerial.snappy snappy-java ont des versions différentes. Cela peut fonctionner mais c'est quelque chose à surveiller. Si c'est un problème, vous pouvez construire votre pot de clients Kafka avec ses dépendances ombragées afin qu'ils ne soient pas en conflit.

+0

Merci beaucoup J'ai créé uber jar et il a résolu le problème. – CodeDog