Ma pile est uwsgi avec gevents. J'essaye d'enrouler mes points d'extrémité d'api avec un décorateur pour pousser toutes les données de demande (URL, méthode, corps et réponse) à un sujet de kafka, mais cela ne fonctionne pas. Ma théorie est parce que j'utilise gevents, et j'essaye de les exécuter en mode asynchrone, le fil asynchrone qui pousse réellement à kafka, n'est pas capable de fonctionner avec des gevents. Et si j'essaie de synchroniser la méthode, alors cela ne marche pas non plus, il meurt dans l'agent de production, c'est-à-dire après production, l'appel ne revient jamais. Bien que les deux méthodes fonctionnent bien sur shell python et si je lance uwsgi sur les threads.Comment faire fonctionner kafka-python ou pykafka en tant que producteur async avec uwsgi et gevent?
suit le code d'échantillon: 1. avec kafka-python (async)
try:
kafka_producer = KafkaProducer(bootstrap_servers=KAFKAHOST.split(','))
except NoBrokersAvailable:
logger.info(u'Kafka Host not available: {}'.format(KAFKAHOST))
kafka_producer = None
def send_message_to_kafka(topic, key, message):
"""
:param topic: topic name
:param key: key to decide partition
:param message: json serializable object to send
:return:
"""
if not kafka_producer:
logger.info(u'Kafka Host not available: {}'.format(KAFKAHOST))
return
data = json.dumps(message)
try:
start = time.time()
kafka_producer.send(topic, key=str(key), value=data)
logger.info(u'Time take to push to Kafka: {}'.format(time.time() - start))
except KafkaTimeoutError as e:
logger.info(u'Message not sent: {}'.format(KAFKAHOST))
logger.info(e)
pass
except Exception as e:
logger.info(u'Message not sent: {}'.format(KAFKAHOST))
logger.exception(e)
pass
avec py-kafka (sync):
try: client = KafkaClient(hosts=KAFKAHOST) except Exception as e: logger.info(u'Kafka Host Not Found: {}'.format(KAFKAHOST)) client = None def send_message_to_kafka(topic, key, message): """ :param topic: topic name :param key: key to decide partition :param message: json serializable object to send :return: """ if not client: logger.info(u'Kafka Host is None') return data = json.dumps(message) try: start = time.time() topic = client.topics[topic] with topic.get_sync_producer() as producer: producer.produce(data, partition_key='{}'.format(key)) logger.info(u'Time take to push to Kafka: {}'.format(time.time() - start)) except Exception as e: logger.exception(e) pass
des nouvelles de kafka-python? –