2016-06-08 3 views
4

Ma pile est uwsgi avec gevents. J'essaye d'enrouler mes points d'extrémité d'api avec un décorateur pour pousser toutes les données de demande (URL, méthode, corps et réponse) à un sujet de kafka, mais cela ne fonctionne pas. Ma théorie est parce que j'utilise gevents, et j'essaye de les exécuter en mode asynchrone, le fil asynchrone qui pousse réellement à kafka, n'est pas capable de fonctionner avec des gevents. Et si j'essaie de synchroniser la méthode, alors cela ne marche pas non plus, il meurt dans l'agent de production, c'est-à-dire après production, l'appel ne revient jamais. Bien que les deux méthodes fonctionnent bien sur shell python et si je lance uwsgi sur les threads.Comment faire fonctionner kafka-python ou pykafka en tant que producteur async avec uwsgi et gevent?

suit le code d'échantillon: 1. avec kafka-python (async)

try: 
     kafka_producer = KafkaProducer(bootstrap_servers=KAFKAHOST.split(',')) 
    except NoBrokersAvailable: 
     logger.info(u'Kafka Host not available: {}'.format(KAFKAHOST)) 
     kafka_producer = None 


    def send_message_to_kafka(topic, key, message): 
     """ 
     :param topic: topic name 
     :param key: key to decide partition 
     :param message: json serializable object to send 
     :return: 
     """ 
     if not kafka_producer: 
      logger.info(u'Kafka Host not available: {}'.format(KAFKAHOST)) 
      return 
     data = json.dumps(message) 
     try: 
      start = time.time() 
      kafka_producer.send(topic, key=str(key), value=data) 
      logger.info(u'Time take to push to Kafka: {}'.format(time.time() - start)) 
     except KafkaTimeoutError as e: 
      logger.info(u'Message not sent: {}'.format(KAFKAHOST)) 
      logger.info(e) 
      pass 
     except Exception as e: 
      logger.info(u'Message not sent: {}'.format(KAFKAHOST)) 
      logger.exception(e) 
      pass 
  1. avec py-kafka (sync):

    try: 
        client = KafkaClient(hosts=KAFKAHOST) 
    except Exception as e: 
        logger.info(u'Kafka Host Not Found: {}'.format(KAFKAHOST)) 
        client = None 
    
    
    def send_message_to_kafka(topic, key, message): 
        """ 
        :param topic: topic name 
        :param key: key to decide partition 
        :param message: json serializable object to send 
        :return: 
        """ 
        if not client: 
         logger.info(u'Kafka Host is None') 
         return 
        data = json.dumps(message) 
        try: 
         start = time.time() 
         topic = client.topics[topic] 
         with topic.get_sync_producer() as producer: 
          producer.produce(data, partition_key='{}'.format(key)) 
         logger.info(u'Time take to push to Kafka: {}'.format(time.time() - start)) 
        except Exception as e: 
         logger.exception(e) 
         pass 
    
+0

des nouvelles de kafka-python? –

Répondre

4

J'ai plus d'expérience avec pykafka afin que je puisse répondre à cette section. pykafka utilise un gestionnaire de threads enfichable et la prise en charge de gevent est intégrée. Vous devez instancier le KafkaClient avec use_greenlets=True. Docs here

Autres réflexions sur votre approche. La création d'un nouvel objet de sujet et d'un producteur pour chaque message est extrêmement coûteuse. Il vaut mieux créer une fois et réutiliser.

# setup once 
client = KafkaClient(hosts=KAFKAHOST, use_greenlets=True) 
topic = client.topics[topic] 
producer = topic.get_sync_producer() 

def send_message_to_kafka(producer, key, message): 
    """ 
    :param producer: pykafka producer 
    :param key: key to decide partition 
    :param message: json serializable object to send 
    :return: 
    """ 

    data = json.dumps(message) 
    try: 
     start = time.time() 
     producer.produce(data, partition_key='{}'.format(key)) 
     logger.info(u'Time take to push to Kafka: {}'.format(time.time() - start)) 
    except Exception as e: 
     logger.exception(e) 
     pass # for at least once delivery you will need to catch network errors and retry. 

Enfin, Kafka tire parti de sa vitesse de mise en lot et de compression. L'utilisation du producteur de synchronisation empêche le client d'exploiter ces fonctionnalités. Cela fonctionnera, mais il est plus lent et utilise plus d'espace. Certaines applications nécessitent une synchronisation, mais il peut être judicieux de repenser votre application aux messages par lots si vous rencontrez des problèmes de performances.

+0

Merci, vous m'avez sauvé la vie ... trois jours je luttais avec kafka-python et j'ai décidé de passer à pykafka et use_greenlets –