2014-06-16 3 views
4

J'ai un programme Python 2.7 qui extrait les données des sites Web et les enregistre dans une base de données. Il suit le modèle de producteur de consommation et est écrit en utilisant le module de filetage.Récupération de données avec l'asyncio de Python dans un ordre séquentiel

Juste pour le plaisir, je voudrais réécrire ce programme en utilisant le nouveau module asyncio (à partir de 3.4), mais je ne peux pas comprendre comment faire correctement.

L'exigence la plus cruciale est que le programme doit extraire les données du même site Web dans un ordre séquentiel. Par exemple, pour une url « http://a-restaurant.com » il faut d'abord obtenir « http://a-restaurant.com/menu/0 », puis « http://a-restaurant.com/menu/1 », puis« http://a-restaurant.com/menu/2 » ... Si elles ne sont pas récupérés pour le site Web cesse de pages livrer tout à fait et vous devez commencer à 0.

Cependant fetch pour un autre site (« http://another-restaurant.com ») peut (et doit) exécuter en même temps (les autres sites ont également la restriction sequantial).

Le module d'enfilage convient bien pour cela car je peux créer des threads séparés pour chaque site web et dans chaque thread il peut attendre jusqu'à ce qu'une page ait fini de charger avant d'aller en chercher une autre.

Voici un extrait de code grossièrement simplifiée de la version de filetage (Python 2.7):

class FetchThread(threading.Threading) 
    def __init__(self, queue, url) 
     self.queue = queue 
     self.baseurl = url 
    ... 
    def run(self) 
     # Get 10 menu pages in a sequantial order 
     for food in range(10): 
      url = self.baseurl + '/' + str(food) 
      text = urllib2.urlopen(url).read() 
      self.queue.put(text) 
      ... 
def main() 
    queue = Queue.Queue() 
    urls = ('http://a-restaurant.com/menu', 'http://another-restaurant.com/menu') 
    for url in urls: 
     fetcher = FetchThread(queue, url) 
     fetcher.start() 
     ... 

Et voici comment j'ai essayé de le faire avec asyncio (3.4.1):

@asyncio.coroutine 
def fetch(url): 
    response = yield from aiohttp.request('GET', url) 
    response = yield from response.read_and_close() 
    return response.decode('utf-8') 

@asyncio.coroutine 
def print_page(url): 
    page = yield from fetch(url) 
    print(page) 


l = [] 
urls = ('http://a-restaurant.com/menu', 'http://another-restaurant.com/menu') 
for url in urls: 
    for food in range(10): 
     menu_url = url + '/' + str(food) 
     l.append(print_page(menu_url)) 

loop.run_until_complete(asyncio.wait(l)) 

Et il récupère et imprime tout dans un ordre non séquentiel. Eh bien, je suppose que c'est l'idée de ces coroutines. Ne devrais-je pas utiliser aiohttp et juste aller chercher avec urllib? Mais est-ce que les récupérations pour le premier restaurant bloquent alors les récupérations pour les autres restaurants? Suis-je juste en train de penser que c'est complètement faux? (Ceci est juste un test pour essayer d'aller chercher les choses dans un ordre séquentiel.)

Répondre

5

Votre code actuel fonctionnera bien pour le restaurant qui ne se soucie pas de la commande séquentielle des demandes . Les dix demandes du menu s'exécuteront simultanément et seront imprimées sur stdout dès qu'elles seront terminées.

Évidemment, cela ne fonctionnera pas pour le restaurant qui nécessite des demandes séquentielles. Vous devez factoriser un peu pour cela au travail:

@asyncio.coroutine 
def fetch(url): 
    response = yield from aiohttp.request('GET', url) 
    response = yield from response.read_and_close() 
    return response.decode('utf-8') 

@asyncio.coroutine 
def print_page(url): 
    page = yield from fetch(url) 
    print(page) 

@syncio.coroutine 
def print_pages_sequential(url, num_pages): 
    for food in range(num_pages): 
     menu_url = url + '/' + str(food) 
     yield from print_page(menu_url) 

l = [print_pages_sequential('http://a-restaurant.com/menu', 10)] 

conc_url = 'http://another-restaurant.com/menu' 
for food in range(10): 
    menu_url = conc_url + '/' + str(food) 
    l.append(print_page(menu_url)) 

loop.run_until_complete(asyncio.wait(l)) 

Au lieu d'ajouter les dix demandes pour le restaurant séquentiel à la liste, nous ajoutons un coroutine à la liste qui itérer sur toutes les dix pages de manière séquentielle. La façon dont cela fonctionne est que yield from print_page arrêtera l'exécution de print_pages_sequential jusqu'à ce que la demande print_page soit terminée, mais elle le fera sans bloquer les autres coroutines qui s'exécutent simultanément (comme tous les appels print_page que vous ajoutez à l). En faisant cela, toutes vos demandes «d'un autre restaurant» peuvent se dérouler simultanément, comme vous le souhaitez, et vos demandes de «restaurant» se dérouleront de manière séquentielle, mais sans bloquer les «autres». restaurant "demandes.

Edit:

Si tous les sites ont la même exigence séquentielle fetching, la logique peut être simplifiée plus:

l = [] 
urls = ["http://a-restaurant.com/menu", "http://another-restaurant.com/menu"] 
for url in urls: 
    menu_url = url + '/' + str(food) 
    l.append(print_page_sequential(menu_url, 10)) 

loop.run_until_complete(asyncio.wait(l)) 
+0

Merci @dano. Pour être clair: tous les restaurants ont besoin d'aller chercher séquentiellement dans leurs menus, mais je voudrais aller chercher les données du premier restaurant et du second restaurant en même temps (juste leurs fetchs de menu respectifs doivent être séquentiels). Donc je suppose que la solution est de 'l = [print_pages_sequential ('http://a-restaurant.com/menu', 10), print_pages_sequential ('http://another-restaurant.com/menu', 10)]' puis exécutez 'loop.run_until_complete (asyncio.wait (l))' (Impossible de tester cela maintenant.) – mat

+1

@ user3313978 Ah, désolé, j'ai mal compris cette exigence. Votre hypothèse sur la solution étant donné que la contrainte est correcte. J'ai mis à jour ma réponse pour refléter la nouvelle contrainte. – dano

+0

Cela ne démarre toujours pas les requêtes dans l'ordre, @dano. Malheureusement, 'gather' et' wait' programment toutes les coroutines passées dans un ordre non-déterministe car elles les enveloppent dans 'Task's. Voir [asyncio numéro 432] (https://github.com/python/asyncio/issues/432). Une solution de contournement consiste à attribuer manuellement une tâche de boucle à chacun de vos objets coroutine avant de les transmettre à 'gather' ou' wait'. par exemple. 'l.append (loop.create_task (print_page_sequential (menu_url, 10)))' ' –

2

asyncio.Task est remplacement pour threading.Thread dans asyncio monde. asyncio.async crée également une nouvelle tâche.

asyncio.gather est très pratique d'attendre plusieurs coroutines, je le préfère au lieu de asyncio.wait.

@asyncio.coroutine 
def fetch(url): 
    response = yield from aiohttp.request('GET', url) 
    response = yield from response.read_and_close() 
    return response.decode('utf-8') 

@asyncio.coroutine 
def print_page(url): 
    page = yield from fetch(url) 
    print(page) 

@asyncio.coroutine 
def process_restaurant(url): 
    for food in range(10): 
     menu_url = url + '/' + str(food) 
     yield from print_page(menu_url) 

urls = ('http://a-restaurant.com/menu', 'http://another-restaurant.com/menu') 
coros = [] 
for url in urls: 
    coros.append(asyncio.Task(process_restaurant(url))) 

loop.run_until_complete(asyncio.gather(*coros)) 
+0

Bon à savoir. 'asyncio' semble un peu plus complexe que prévu. BTW, 'def process_restaurant (url)' manque un niveau d'indentation. – mat

+0

Le balisage pour 'process_restaurant' est corrigé. Merci pour le rapport. –

Questions connexes