2017-05-15 1 views
0

Sur un serveur Ubuntu, j'ai configuré Divolte Collector pour collecter des données de navigation à partir de sites Web. Les données sont écrites sur un canal Kafka appelé divolte-data. En mettant en place un consommateur Kafka, je vois les données à venir dans:Comment lire le canal Kafka de divolte-data avec Druid-Tranquility (pour Superset)?

V0:j2ive5p1:QHQbOuiuZFozAVQfKqNWJoNstJhEZE85V0:j2pz3aw7:sDHKs71nHrTB5b_1TkKvWWtQ_rZDrvc2D0:B4aEGBSVgTXgxqB85aj4dGeoFjCqpeEGbannerClickMozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/58.0.3029.96 Chrome/58.0.3029.96 Safari/537.36ChromiumChromium8Google Inc. and contributorsBrowser58.0.3029.96"Personal computer 
LinuxCanonical Ltd. 

Je voudrais visualiser les données avec Superset qui a Airbnb plusieurs connecteurs à des bases de données communes, y compris druid.io (qui peut lire Spark).

Il semble que Divolte stocke les données dans Kafka de manière non structurée. Mais apparemment, il peut cartographier les données de manière structurée. Les données d'entrée doivent-elles être structurées en JSON (comme le dit la documentation)?

Et puis comment lire à partir de Druid-Tranquility les données reçues au canal Kafka divolte-data? J'ai essayé de changer le nom du canal dans les exemples de conf mais ce consommateur reçoit alors le message zéro.

Répondre

0

La solution que j'ai trouvée est que je peux traiter les messages Kafka en Python, par exemple avec la librairie Kafka Python, ou Confluent Kafka Python, et ensuite je vais décoder les messages avec les lecteurs Avro.

Edit: Voici une mise à jour sur la façon dont je l'ai fait:

Je pensais que la bibliothèque Avro était juste pour lire les fichiers Avro, mais il résolu en fait le problème de décodage des messages Kafka, comme suit: Je d'abord importer les bibliothèques et donne le fichier de schéma en tant que paramètre, puis crée une fonction pour décoder le message dans un dictionnaire, que je peux utiliser dans la boucle consommateur.

from confluent_kafka import Consumer, KafkaError 
from avro.io import DatumReader, BinaryDecoder 
import avro.schema 

schema = avro.schema.Parse(open("data_sources/EventRecord.avsc").read()) 
reader = DatumReader(schema) 

def decode(msg_value): 
    message_bytes = io.BytesIO(msg_value) 
    decoder = BinaryDecoder(message_bytes) 
    event_dict = reader.read(decoder) 
    return event_dict 

c = Consumer() 
c.subscribe(topic) 
running = True 
while running: 
    msg = c.poll() 
    if not msg.error(): 
     msg_value = msg.value() 
     event_dict = decode(msg_value) 
     print(event_dict) 
    elif msg.error().code() != KafkaError._PARTITION_EOF: 
     print(msg.error()) 
     running = False