Vous pouvez avoir des dépendances java api et maven (kafka et zookeeper) pour créer un sujet kafka comme ci-dessous. Vous pouvez appeler le code à partir du code dans lequel vous envoyez une demande d'étincelle.
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.3</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.1</version>
</dependency>
import java.util.Properties;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import kafka.utils.ZKStringSerializer$;
import kafka.admin.AdminUtils;
public final class KafkaUtils {
public static void main(String[] args) throws Exception {
KafkaUtils.createTopic("x.x.x.x:2181,y.y.y.y:2181", "topicName", 1, 0, new Properties());
}
public static void createTopic(String zkHosts, String topicName, int numberOfPartition, int replicationFactor, Properties properties) {
ZkClient zkClient = null;
try {
zkClient = getZkClient(zkHosts);
AdminUtils.createTopic(zkClient, topicName, numberOfPartition, replicationFactor, properties);
} catch (Exception exception) {
exception.printStackTrace();
} finally {
if (zkClient != null) {
try {
zkClient.close();
} catch (ZkInterruptedException ex) {
ex.printStackTrace();
}
}
}
}
private static ZkClient getZkClient(String zkHosts) {
ZkClient zkClient = null;
// Zookeeper sessionTimeoutMs
final int sessionTimeoutMs = 10000;
// Zookeeper connectionTimeoutMs
final int connectionTimeoutMs = 10000;
zkClient = new ZkClient(zkHosts, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer$.MODULE$);
return zkClient;
}
}
Ici x.x.x.x et y.y.y.y sont les hôtes du cluster ZK pour kafka. J'espère que cela t'aides.
Cela aide beaucoup. Merci. Mais est-ce que je peux invoquer n'importe quel moyen en utilisant des scripts shell? like /kafka-topics.sh --zookeeper --top Du cluster spark ... Je ne suis pas sûr de SSH car les deux sont dans un cluster différent .. –
Aru
S'il y a une connectivité n/w entre spark et kafka et son zookeeper (2181 ports ouverts) clusters nœuds, alors une autre solution simple peut être est de télécharger le binaire kafka de http://kafka.apache.org/downloads.html et d'extraire sur l'un des nœuds du cluster spark et aller à KAFKA_HOME et utilisez la commande ci-dessous, qui peut faire partie de votre script shell. –
bin/kafka-topics.sh --zookeeper xxxx: 2181, aaaa: 2181 --créer --topic topicName --partitions 1 --replication-facteur 1 –