2017-02-11 1 views
0

Maintenant, je suis en train de se connecter à partir des messages MQTT à kafka (étincelle en train de diffuser dans kafka)Comment Conver les données de charge utile de MQTT type de chaîne de kafka

J'ai utilisé ce connecteur https://github.com/evokly/kafka-connect-mqtt

Et Spark-2.1. 0, Kafka - 0.10.1.1

Saprk sortie de streaming tels que cette

({"schema":{"type":"string","optional":false},"payload":"mqtt"},{"schema":{"type":"bytes","optional":false},"payload":"MTIzMTIz"}) 

et code du producteur

object mqttProducer { 
def main(args: Array[String]) { 
val brokerUrl = "tcp://ip" 
val topic = "mqtt" 
val msg = "123123" 

var client: MqttClient = null 

// Creating new persistence for mqtt client 
val persistence = new MqttDefaultFilePersistence("/tmp") 

try { 
    // mqtt client with specific url and client id 
    client = new MqttClient(brokerUrl, MqttClient.generateClientId, persistence) 

    client.connect() 

    val msgTopic = client.getTopic(topic) 
    val message = new MqttMessage(msg.getBytes("utf-8")) 

    while (true) { 
    msgTopic.publish(message) 
    println("Publishing Data, Topic : %s, Message : %s".format(msgTopic.getName, message)) 
    Thread.sleep(1000) 
    } 
} 

catch { 
    case e: MqttException => println("Exception Caught: " + e) 
} 

finally { 
    client.disconnect() 
} 

et étincelles en streaming code de la consommation de kafka

package hb.test1 
import org.apache.kafka.common.serialization.StringDeserializer 
import org.apache.spark.SparkConf 
import org.apache.spark.streaming.Seconds 
import org.apache.spark.streaming.StreamingContext 
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe 
import org.apache.spark.streaming.kafka010.KafkaUtils 
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent 

object test2 { 

    def main(args: Array[String]): Unit = { 

val sparkConf = new SparkConf().setAppName("app") 
val ssc = new StreamingContext(sparkConf, Seconds(1))  


val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> 
    "servers ip", 
    "key.deserializer" -> classOf[StringDeserializer], 
    "value.deserializer" -> classOf[StringDeserializer], 
    "group.id" -> "use_a_separate_group_id_for_each_stream", 
    "auto.offset.reset" -> "latest", 
    "enable.auto.commit" -> (false: java.lang.Boolean) 
) 
val topics = Array("mqtt-kafka") 
    val stream = KafkaUtils.createDirectStream[String, String](
    ssc, 
    PreferConsistent, 
    Subscribe[String, String](topics, kafkaParams) 
) 

val testStream = stream.map(x => (x.key, x.value)) 


testStream.print() 

ssc.start() 
ssc.awaitTermination() 
    } 
} 

Comment pourrais-je obtenir cordes de Byte? s'il vous plaît aider les gars

Répondre

1

Cette charge "MTIzMTIz" est la chaîne "123123" juste codé en base64. Si vous voulez simplement prendre la charge utile MQTT et l'envoyer à Kafka sans encodage base64, vous devez utiliser un ByteArrayConverter. Dans ma config pour le même connecteur mqtt je mis le convertisseur de valeur comme ceci:

« value.converter »: « io.confluent.connect.replicator.util.ByteArrayConverter »

Le ByteArrayConverter ci-dessus est livré avec le Confluent La distribution d'entreprise, mais il existe d'autres Kafka Connect ByteArrayConverters open source tels que celui inclus avec le connecteur qubole/streamx kafka-connect-s3.

https://github.com/qubole/streamx/blob/8b9d43008d9901b8e6020404f137944ed97522e2/src/main/java/com/qubole/streamx/ByteArrayConverter.java

Il est KIP-128 pour ajouter un ByteArrayConverter standard au cadre Kafka Connect

https://cwiki.apache.org/confluence/display/KAFKA/KIP-128%3A+Add+ByteArrayConverter+for+Kafka+Connect

MISE À JOUR: Kafka 0.11 est maintenant libéré et est livré avec un ByteArrayConverter. Configurez "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter" et vous devriez obtenir la charge utile mqtt brute sans encodage Base64.