2011-05-19 10 views
9

Je suis quelque peu confus avec l'état actuel du support mapreduce dans GAE. Selon les docs http://code.google.com/p/appengine-mapreduce/ phase de réduction n'est pas encore pris en charge, mais dans la description de la session à partir de I/O 2011 (http://www.youtube.com/watch?v=EIxelKcyCC0) il est écrit "Il est maintenant possible d'exécuter des tâches Map Reduce complet sur App Engine". Je me demande si je peux utiliser MapReduce dans cette tâche:Exemple de compteur simple utilisant mapreduce dans Google App Engine

Ce que je veux faire:

Je modèle de voiture avec la couleur sur le terrain:

class Car(db.Model): 
    color = db.StringProperty() 

Je veux courir MapReduce (de de temps en temps, défini par cron) qui peut calculer le nombre de voitures dans chaque couleur et stocker ce résultat dans le magasin de données. On dirait un travail bien adapté pour mapreduce (mais si je me trompe), la phase "map" donnera des paires (, 1) pour chaque entité Car, et la phase "reduce" devrait fusionner ces données par color_name en me donnant les résultats attendus . Résultat final Je veux obtenir sont des entités avec des données calculées stockées dans le magasin de données, quelque chose comme ça:

class CarsByColor(db.Model): 
    color_name = db.StringProperty() 
    cars_num = db.IntegerProperty() 

Problème: Je ne sais pas comment implémenter dans AppEngine ... La vidéo montre des exemples avec des fonctions map et reduce définies, mais elles semblent être des exemples très généraux non liés au datastore. Tous les autres exemples que j'ai trouvés utilisent une fonction pour traiter les données de DatastoreInputReader, mais ils semblent être seulement la phase "map", il n'y a pas d'exemple de comment faire le "réduire" (et comment stocker réduire les résultats dans le magasin de données).

Répondre

6

Je fournis ici la solution que j'ai trouvée en utilisant éventuellement mapreduce de GAE (sans phase de réduction). Si j'avais commencé à partir de zéro, j'aurais probablement utilisé la solution fournie par Drew Sears.

Il fonctionne en python GAE 1.5.0

Dans app.YAML J'ai ajouté le gestionnaire pour MapReduce:

- url: /mapreduce(/.*)? 
    script: $PYTHON_LIB/google/appengine/ext/mapreduce/main.py 

et le gestionnaire pour mon code pour MapReduce (j'utilise url/mapred_update pour recueillir les résultats produits par MapReduce):

- url: /mapred_.* 
    script: mapred.py 

Créé mapreduce.yaml pour le traitement des entités automobiles:

mapreduce: 
- name: Color_Counter 
    params: 
    - name: done_callback 
    value: /mapred_update 
    mapper: 
    input_reader: google.appengine.ext.mapreduce.input_readers.DatastoreInputReader 
    handler: mapred.process 
    params: 
    - name: entity_kind 
     default: models.Car 

Explication: done_callback est une URL appelée après que mapreduce ait terminé ses opérations. mapred.process est une fonction qui traite les entités individuelles et les compteurs de mise à jour (elle est définie dans le fichier mapred.py). Modèle Car est défini dans models.py

mapred.py:

from models import CarsByColor 
from google.appengine.ext import db 
from google.appengine.ext.mapreduce import operation as op 
from google.appengine.ext.mapreduce.model import MapreduceState 

from google.appengine.ext import webapp 
from google.appengine.ext.webapp.util import run_wsgi_app 

def process(entity): 
    """Process individual Car""" 
    color = entity.color 
    if color: 
     yield op.counters.Increment('car_color_%s' % color) 

class UpdateCounters(webapp.RequestHandler): 
    """Create stats models CarsByColor based on the data 
    gathered by mapreduce counters""" 
    def post(self): 
     """Called after mapreduce operation are finished""" 
     # Finished mapreduce job id is passed in request headers 
     job_id = self.request.headers['Mapreduce-Id'] 
     state = MapreduceState.get_by_job_id(job_id) 
     to_put = [] 
     counters = state.counters_map.counters 
     # Remove counter not needed for stats 
     del counters['mapper_calls'] 
     for counter in counters.keys(): 
      stat = CarsByColor.get_by_key_name(counter) 
      if not stat: 
       stat = CarsByColor(key_name=counter, 
           name=counter) 
      stat.value = counters[counter] 
      to_put.append(stat) 
     db.put(to_put) 

     self.response.headers['Content-Type'] = 'text/plain' 
     self.response.out.write('Updated.') 


application = webapp.WSGIApplication(
            [('/mapred_update', UpdateCounters)], 
            debug=True) 
def main(): 
    run_wsgi_app(application) 

if __name__ == "__main__": 
    main()    

Il est légèrement modifiée du modèle définition CarsByColor par rapport à la question.

Vous pouvez démarrer manuellement le travail mapreduce à partir de l'URL: http://yourapp/mapreduce/ et, espérons-le, de cron (je n'ai pas encore testé le cron).

9

Vous n'avez pas vraiment besoin d'une phase de réduction. Vous pouvez accomplir cela avec une chaîne de tâches linéaire, plus ou moins comme suit:

def count_colors(limit=100, totals={}, cursor=None): 
    query = Car.all() 
    if cursor: 
    query.with_cursor(cursor) 
    cars = query.fetch(limit) 
    for car in cars: 
    try: 
     totals[car.color] += 1 
    except KeyError: 
     totals[car.color] = 1 
    if len(cars) == limit: 
    cursor = query.cursor() 
    return deferred.defer(count_colors, limit, totals, cursor) 
    entities = [] 
    for color in totals: 
    entity = CarsByColor(key_name=color) 
    entity.cars_num = totals[color] 
    entities.append(entity) 
    db.put(entities) 

deferred.defer(count_colors) 

Cela devrait itérer sur toutes vos voitures, passez un curseur de requête et un décompte en cours d'exécution à une série de tâches ad-hoc, et le magasin les totaux à la fin.

Une phase de réduction peut être utile si vous devez fusionner des données provenant de plusieurs banques de données, plusieurs modèles ou plusieurs index dans un même modèle. Comme je ne pense pas que cela vous achèterait quoi que ce soit.

Autre option: utilisez la file d'attente de tâches pour gérer les compteurs actifs pour chaque couleur. Lorsque vous créez une voiture, lancez une tâche pour incrémenter le total de cette couleur. Lorsque vous mettez à jour une voiture, lancez une tâche pour décrémenter l'ancienne couleur et une autre pour incrémenter la nouvelle couleur. Mettez à jour les compteurs de manière transactionnelle pour éviter les conditions de course.