2016-09-12 2 views
0

J'ai deux groupes fonctionnant Kafka et étincelle séparément. Je veux créer un kafka-topic à partir du cluster spark. J'ai remarqué pour créer un sujet que nous devons invoquer Kafka-topics.sh qui ne sera pas disponible dans le cluster spark. La commande devrait être invoquée via Shell.Comment créer un sujet Kafka s'exécutant dans un cluster différent d'un autre cluster spark?

par exemple: /kafka_topics.sh --zookeeper: 2181 --create --topic test_topic

Ce script doit être appelé à partir de grappes d'allumage et il devrait s'exécuter sur le cluster Kafka. Quelqu'un peut-il m'aider?

Répondre

1

Vous pouvez avoir des dépendances java api et maven (kafka et zookeeper) pour créer un sujet kafka comme ci-dessous. Vous pouvez appeler le code à partir du code dans lequel vous envoyez une demande d'étincelle.

<dependency> 
    <groupId>com.101tec</groupId> 
    <artifactId>zkclient</artifactId> 
    <version>0.3</version> 
</dependency> 

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.10</artifactId> 
    <version>0.8.2.1</version> 
</dependency> 

import java.util.Properties; 
import org.I0Itec.zkclient.ZkClient; 
import org.I0Itec.zkclient.exception.ZkInterruptedException; 
import kafka.utils.ZKStringSerializer$; 
import kafka.admin.AdminUtils; 

public final class KafkaUtils { 
    public static void main(String[] args) throws Exception {  
     KafkaUtils.createTopic("x.x.x.x:2181,y.y.y.y:2181", "topicName", 1, 0, new Properties());  
    } 

    public static void createTopic(String zkHosts, String topicName, int numberOfPartition, int replicationFactor, Properties properties) { 
     ZkClient zkClient = null; 
     try { 
      zkClient = getZkClient(zkHosts); 
      AdminUtils.createTopic(zkClient, topicName, numberOfPartition, replicationFactor, properties); 
     } catch (Exception exception) { 
      exception.printStackTrace(); 
     } finally { 
      if (zkClient != null) { 
       try { 
        zkClient.close(); 
       } catch (ZkInterruptedException ex) { 
        ex.printStackTrace(); 
       } 

      } 
     } 
    } 

    private static ZkClient getZkClient(String zkHosts) { 
     ZkClient zkClient = null; 
     // Zookeeper sessionTimeoutMs 
     final int sessionTimeoutMs = 10000; 
     // Zookeeper connectionTimeoutMs 
     final int connectionTimeoutMs = 10000; 
     zkClient = new ZkClient(zkHosts, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer$.MODULE$); 
     return zkClient; 
    } 
} 

Ici x.x.x.x et y.y.y.y sont les hôtes du cluster ZK pour kafka. J'espère que cela t'aides.

+0

Cela aide beaucoup. Merci. Mais est-ce que je peux invoquer n'importe quel moyen en utilisant des scripts shell? like /kafka-topics.sh --zookeeper --top Du cluster spark ... Je ne suis pas sûr de SSH car les deux sont dans un cluster différent .. – Aru

+0

S'il y a une connectivité n/w entre spark et kafka et son zookeeper (2181 ports ouverts) clusters nœuds, alors une autre solution simple peut être est de télécharger le binaire kafka de http://kafka.apache.org/downloads.html et d'extraire sur l'un des nœuds du cluster spark et aller à KAFKA_HOME et utilisez la commande ci-dessous, qui peut faire partie de votre script shell. –

+0

bin/kafka-topics.sh --zookeeper xxxx: 2181, aaaa: 2181 --créer --topic topicName --partitions 1 --replication-facteur 1 –