2017-09-01 5 views
1

J'ai besoin de vider périodiquement la sortie du client kafka dans un fichier Excel. J'utilise le code suivant:Dump le kafka (kafka-python) dans un fichier txt

from kafka import KafkaConsumer 
from kafka import KafkaProducer 
import json,time 
from xlutils.copy import copy  
from xlrd import open_workbook 
import pandas 

consumer = KafkaConsumer(bootstrap_servers='localhost:9092') 
KafkaConsumer() 
consumer.subscribe("test") 

rowx=0 
colx=0 

for msg in consumer: 
     book_ro = open_workbook("twitter.xls") 
     book = copy(book_ro) # creates a writeable copy 
     sheet1 = book.get_sheet(0) # get a first sheet 
     sheet1.write(rowx,colx, msg[6]) 
     book.save("twitter.xls") 

Maintenant, mon problème est que le code n'est pas efficace. Pour chaque message, j'ai besoin d'ouvrir, d'écrire, puis de sauvegarder le fichier Excel. Y at-il une approche pour ouvrir l'Excel une fois, écrire, puis le fermer (pour un lot de messages et pas dans la boucle for)? tnx

+0

Pourquoi fermer le fichier? –

Répondre

0

Oui, ouvrir, écrire, enregistrer et fermer avec chaque message est inefficace, vous pouvez le faire dans un lot. Mais encore besoin de le faire en consommant une boucle.

msg_buffer = [] 
buffer_size = 100 
for msg in consumer: 
     msg_buffer.append(msg[6]) 
     if len(msg_buffer) >= buffer_size: 
      book_ro = open_workbook("twitter.xls") 
      book = copy(book_ro) # creates a writeable copy 
      for _msg in msg_buffer: 
       sheet1 = book.get_sheet(0) # get a first sheet 
       sheet1.write(rowx,colx, _msg) 
      book.save("twitter.xls") 
      msg_buffer = [] 

Vous pourriez penser que ce sera 100 fois plus rapide que nobatch.

Mise à jour pour commentaire:

Oui, d'habitude, nous allons rester dans cette boucle pour toujours, il utilise en interne sondage pour aller chercher le nouveau message, envoyer des battements de cœur et COMMIT offset. Et si votre but est de consommer des messages de ce sujet et d'enregistrer des messages, cela devrait être une boucle longue.

Ceci est de conception kafka-python, vous devriez utiliser comme ceci pour consommer un message ou utiliser le consumer.poll().

Quant à savoir pourquoi vous pouvez utiliser for msg in consumer:, parce que le consommateur est un objet iterator, sa classe implémente l'__iter__ et __next__, il sous-jacente utilise un outil de récupération pour récupérer les enregistrements. plus de détails de mise en œuvre que vous pourriez vous référer https://github.com/dpkp/kafka-python/blob/master/kafka/consumer/group.py

+0

Merci pour la note. Je me demande si nous restons dans le "pour msg dans la boucle de consommation" pour toujours? Dans ce cas, votre code est OK. ou existe-t-il une fonction de rappel (quand le client reçoit vraiment un message, il appelle une fonction pour faire sth)? Quel est le type d'objet consommateur? est-ce une liste ou quoi? – user2867237

+0

répondre à la question. – GuangshengZuo