2009-10-21 8 views
3

Commençons par un exemple simple. Un flux de données HTTP est dans le format suivant:Traitement de flux asynchrone en Python

MESSAGE_LENGTH, 2 bytes 
MESSAGE_BODY, 
REPEAT... 

Actuellement, j'utilise urllib2 pour récupérer des données de transmission en continu de processus comme ci-dessous:

length = response.read(2) 
while True: 
    data = response.read(length) 
    DO DATA PROCESSING 

Il fonctionne, mais étant donné que tous les messages sont en taille de 50 -100 octets, la méthode ci-dessus limite la taille de la mémoire tampon à chaque lecture, ce qui peut nuire aux performances.

Est-il possible d'utiliser des threads séparés pour la récupération et le traitement des données?

Répondre

0

Oui, bien sûr, et il existe de nombreuses techniques différentes pour le faire. Vous finirez généralement par avoir un ensemble de processus qui récupère uniquement les données et augmente le nombre de processus dans ce pool jusqu'à ce que vous ayez plus ou moins de bande passante. Ces processus stockent les données quelque part, puis vous avez d'autres processus ou threads qui sélectionnent les données et les traitent depuis l'emplacement où elles sont stockées. Donc, la réponse à votre question est "Oui", votre prochaine question sera "Comment" et ensuite les gens qui sont vraiment bons dans ce domaine voudront en savoir plus. :-)

Si vous faites cela à grande échelle, cela peut être très compliqué, et vous ne voulez pas qu'ils se superposent, et il y a des modules en Python qui vous aident à faire tout cela. La bonne façon de le faire dépend beaucoup de l'échelle dont nous parlons, si vous voulez l'exécuter sur plusieurs processeurs, ou même sur des machines complètement séparées, et de combien de données nous parlons.

Je l'ai seulement fait une fois, et sur une échelle pas très massive, mais j'ai fini par avoir un processus qui a une longue liste d'URL à traiter, et un autre processus qui a pris cette liste et l'a envoyé à un ensemble de processus séparés simplement en mettant des fichiers avec des URL dans des répertoires distincts qui ont travaillé comme des "files d'attente". Les processus séparés qui ont récupéré les URL regarderaient dans leur propre répertoire de file d'attente, récupèreraient l'URL et la colleraient dans un autre répertoire "outqueue", où j'aurais un autre processus qui enverrait ces fichiers dans un autre ensemble de répertoires pour les processus de traitement . Cela a bien fonctionné, pourrait être exécuté du réseau avec NFS si nécessaire (bien que nous n'ayons jamais essayé cela) et pourrait être augmenté à des tas de processus sur des tas de machines si besoin (bien que nous ne l'ayons jamais fait non plus).

Il peut y avoir des manières plus intelligentes.

1

Oui, peut être fait et n'est pas si difficile, si votre format est essentiellement fixe. Je l'ai utilisé avec httplib dans Python 2.2.3 et j'ai trouvé qu'il avait une performance abyssale dans la façon dont nous l'avons piraté ensemble (en fait, monkey corrigeant une couche de socket basée sur select() dans httplib). L'astuce consiste à obtenir le socket et à faire le tampon vous-même, donc vous ne vous battez pas avec les couches intermédiaires (faites pour des performances horribles quand nous avions le tampon httplib pour le décodage http, le buffer de couche socket pour lire)).

Ensuite, ayez un statemachine qui récupère les nouvelles données de la socket lorsque cela est nécessaire et qui envoie les blocs complétés dans une file d'attente qui alimente vos threads de traitement.

Je l'utilise pour transférer des fichiers, somme de contrôle (zlib.ADLER32) eux dans un thread supplémentaire et les écris au système de fichiers dans un troisième thread.Permet un débit soutenu d'environ 40 Mo/s sur ma machine locale via des sockets et avec un surcoût HTTP/en bloc.

Questions connexes