2016-02-12 3 views
0

Je suis novice dans ElasticSearch, et j'essaie d'ajouter des entrées à un index dans ElasticSearch en utilisant des connexions simultanées du ElasticSearch ConnectionPool [via le Transport class].ElasticSearch ConnectionPool utilisant la bibliothèque elasticsearch-py

Voici mon code:

import elasticsearch 
from elasticsearch.transport import Transport 

def init_connection(): 
    transport = Transport([{'host':SERVER_URL}], port=SERVER_PORT, randomize_hosts=False) 
    transport.add_connection(host=SERVER_URL+SERVER_PORT) 
    return transport 

def add_entries_to_es(id, name): 
    transport = init_connection() 
    doc = { 
      'name': name, 
      'postDate': datetime.datetime.now(), 
      'valid': "true", 
      'suggest': { 
       "input": name, 
       'output': name, 
       'payload': {'domain_id': id} 
       } 
      } 
    conn = transport.getConnection() 
    es = elasticsearch.Elasticsearch(connection_class=conn) 
    res = es.index(index=ES_INDEX_NAME, doc_type=ES_DOC_TYPE, id=id, body=doc) 
    ... 

Et je reçois l'erreur suivante:

File "/my_project/elastichelper.py", line 23, in init_connection 
transport.add_connection(host=SERVER_URL+SERVER_PORT) 
File "/Library/Python/2.7/site-packages/elasticsearch/transport.py", line 139, in add_connection 
self.set_connections(self.hosts) 
File "/Library/Python/2.7/site-packages/elasticsearch/transport.py", line 169, in set_connections 
connections = map(_create_connection, hosts) 
File "/Library/Python/2.7/site-packages/elasticsearch/transport.py", line 161, in _create_connection 
kwargs.update(host) 
ValueError: dictionary update sequence element #0 has length 1; 2 is required 

Je ne sais pas si Transport class est la bonne façon de instancier un ConnectionPool dans ElasticSearch. Cependant, j'ai lu des documents que Transport class gère l'instanciation des connexions individuelles ainsi que la création d'un pool de connexion pour les maintenir.

Je ne reçois pas la bonne façon d'instancier un ConnectionPool et utiliser les connexions de la piscine efficacement. Lire et googler n'a pas aidé en ma faveur.

Je suis également au courant de l'API helpers.bulk(), mais je suis confus à propos de l'utiliser car avec l'ajout d'entrées à l'index, je supprime également les entrées invalides.

Répondre

0

Je trouve que simplement en utilisant l'instance ElasticSearch classes avec une valeur timeout appropriée set [pour moi,timeout=30était assez bon] pour la méthode index travaillé. Comme ceci:

doc = { 
     'name': name, 
     'postDate': datetime.datetime.now(), 
     'valid': "true", 
     'suggest': { 
      "input": name, 
      'output': name, 
      'payload': {'domain_id': id} 
      } 
     } 
es = elasticsearch.Elasticsearch() 
res = es.index(index=ES_INDEX_NAME, doc_type=ES_DOC_TYPE, id=id, body=doc, timeout=30) 

J'avais d'abord fait face à des problèmes avec timeout simples ElasticSearch instance de classe, qui a été fixée par les changements ci-dessus.

Je n'ai pas besoin d'utiliser explicitement les instances de classe Transport ou Connection.