2015-10-02 3 views
1

Les propriétés de configuration de Kakfa ont donc été modifiées pour les producteurs dans l'API 0.8.2; Après avoir travaillé dessus et avoir compilé Java, je reçois une exception. Le producteur vise à mes nœuds de grappe Kafka_2.9.1-0.8.2.1, et je reçois cette exception sur le DefaultSerializer pas instanciation:Kafka 2.9.1 producteur 0.8.2.1 compiler vs dépendances d'exécution

Exception in thread "main" org.apache.kafka.common.KafkaException: Could not instantiate class kafka.serializer.DefaultEncoder Does it have a public no-argument constructor? 
     at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:235) 
     at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:136) 
     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:216) 
........ 

Compte tenu de cela est mis en œuvre Kakfa Je me demande si la compilation avec Kafka comme la dépendance était pas suffisant car peut-être que je devais emballer dans un ou plusieurs pots Kafka lors de la course. Je n'ai pas trouvé de documentation (à jour ou autre) à ce sujet. Y a-t-il un pot d'exécution pour le producteur qui me manque?

Pour référence, j'inclus mon build.gradle ici (c'est un peu brouillon). Les exclusions dans la compilation étaient un nouvel ajout après avoir déjà obtenu cette erreur, donc l'erreur se produit avec ou sans ces lignes dans le bloc de dépendances. J'ai essayé seulement de compter sur le module kafka-client pour 0.8.2 mais je ne pense pas que cela fonctionne pour un producteur. Voici le fichier:

buildscript { 
    repositories { 
     mavenCentral() 
    } 
    dependencies { 
     classpath 'com.google.protobuf:protobuf-gradle-plugin:0.7.0' 
    } 
} 

group 'lamblin' 
version '0.1-SNAPSHOT' 

apply plugin: 'application' 
apply plugin: "com.google.protobuf" 

sourceCompatibility = 1.7 
targetCompatibility = 1.7 

// Eliminates bootstrap class warning from javac 
//tasks.withType(Compile) { 
// options.bootClasspath = "$JDK6_HOME/jre/lib/rt.jar" 
//} 

repositories { 
    mavenCentral() 
} 

dependencies { 
    testCompile group: 'junit', name: 'junit', version: '4.11' 
    compile group: 'com.google.guava', name: 'guava', version: '18.0' 
    compile group: 'com.google.protobuf', name: 'protobuf-java-util', version: '3.0.0-beta-1' 
    compile group: 'com.google.transit', name: 'gtfs-realtime-bindings', version: '0.0.4' 
    compile group: 'com.offbytwo', name: 'docopt', version: '0.6.0.20150202' 
    //compile group: 'org.apache.kafka', name: 'kafka_2.9.1', version: '0.8.2.1' { 
    compile ('org.apache.kafka:kafka_2.9.1:0.8.2.1') { 
     exclude group: 'com.sun.jmx', module: 'jmxri' 
     exclude group: 'javax.jmx', module: 'jms' 
     exclude group: 'com.sun.jdmk', module: 'jmxtools' 
    } 
} 

protobuf { 
    generateProtoTasks { 
     all().each { task -> 
      task.builtins { 
       python { } 
      } 
     } 
    } 
    protoc { 
     // artifact = 'com.google.protobuf:protoc:3.0.0-alpha-3' 
     artifact = 'com.google.protobuf:protoc:2.6.1' 
    } 
} 
// First Application Script 
mainClassName = "com.insight.lamblin.GtfsToJson" 
applicationName = "gtfsToJson" 

// Subsequent Scripts 
task createAllStartScripts() << { 
    // This task is added to by a loop over the scripts array creating-sub tasks 
} 
def scripts = [ 'gtfsToJson': 'com.insight.lamblin.GtfsToJson', 
       'rawGtfsKafkaProducer': 'com.insight.lamblin.RawGtfsKafkaProducer' 
] 
scripts.each() { scriptName, className -> 
    def t = tasks.create(name: scriptName+'StartScript', type: CreateStartScripts) { 
     mainClassName = className 
     applicationName = scriptName 
     outputDir = new File(project.buildDir, 'scripts') 
     classpath = jar.outputs.files + project.configurations.runtime 
    } 
    applicationDistribution.into("bin") { 
     from(t) 
     fileMode = 0755 
    } 
    createAllStartScripts.dependsOn(t) 
} 

Répondre

1

Scène: Dans un sous-sol de l'église indéfinissable un cercle de chaises pliantes en métal est occupé par une variété d'hommes d'âge moyen et quelques femmes, dont la plupart ont des lunettes et semblent désintéressés. Il y a une boîte de café et quelques demi-beignets disposés sur une plaque de plastique à leur tour sur une table pliée, non couverte, pliée par un mur adjacent à l'entrée. Daniel: Salut, je m'appelle Daniel, et je suis un ... (sanglot) ... écumeur de documentation.
Groupe (lentement): Bienvenue Daniel.

Ce morceau de jeu d'écran a été écrit parce qu'il semble que ma question de Kafka n'a attiré que des grillons, donc je le garde un peu intéressant ici ... se sentir seul. Pour ma défense, il ya 10 documents de kafka.apache.org qui semblent faire autorité sur les paramètres de propriété pour les producteurs. Le kafka.serializer.DefaultSerializer est très important et commun dans presque tous les exemples de propriétés de paramètre, et les exemples de producteur Java manquent complètement de détails sur les propriétés ou sur l'exécution de l'exemple de code.

De même, malgré le nom "default", il n'y a pas de valeur par défaut pour cette propriété, donc l'obligation de la définir. Cela semble être un détail stupide, mais cela doit avoir du sens pour quelqu'un de l'équipe de développement de Kafka. Lors de l'exécution d'un producteur Kafka écrit en Java, le producteur doit définir des encodeurs à partir des quelques codeurs spécifiques Java disponibles. Celui-ci semble être spécifique à Scala. Pour Java, vous êtes intéressé par org.apache.kafka.common.serialization et vous l'équivalent du sérialiseur par défaut est: ByteArraySerializer. Si vous définissez key.serializer et value.serializer cela devrait fonctionner. La meilleure façon de les définir est d'utiliser les chaînes statiques dans ProducerConfig comme ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG.

L'installation est un peu comme:

import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.ProducerConfig; 
import org.apache.kafka.clients.producer.ProducerRecord; 
... 
import java.util.Properties; 
... 
Properties props = new Properties(); 
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
      "172.31.22.7:9092,172.31.22.6:9092,172.31.22.5:9092,172.31.22.4:9092"); 
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
      "org.apache.kafka.common.serialization.ByteArraySerializer"); 
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
      "org.apache.kafka.common.serialization.ByteArraySerializer"); 
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props); 
... 
+0

parfois .. Je viens de lire la section « Démarrage rapide » ... Je sais qu'il est mal ... ;-) –