2009-05-08 15 views
4

Un réducteur de wordcount simple Ruby ressemble à ceci:Paralléliser les réducteurs Ruby dans Hadoop?

#!/usr/bin/env ruby 
wordcount = Hash.new 
STDIN.each_line do |line| 
keyval = line.split("|") 
wordcount[keyval[0]] = wordcount[keyval[0]].to_i+keyval[1].to_i 
end 

wordcount.each_pair do |word,count| 
puts "#{word}|#{count}" 
end 

il est dans la STDIN tous les cartographes valeurs intermédiaires. Pas d'une clé spécifique. Donc, en réalité, il n'y a qu'un seul réducteur pour tous (et non pas réducteur par mot ou par ensemble de mots).

Cependant, sur les exemples Java, j'ai vu cette interface qui obtient une clé et une liste de valeurs comme inout. Ce qui signifie que les valeurs de carte intermidiate sont regroupées par clé avant réduits et réducteurs peuvent fonctionner en parallèle:

public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { 
      public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { 
       int sum = 0; 
       while (values.hasNext()) { 
       sum += values.next().get(); 
       } 
       output.collect(key, new IntWritable(sum)); 
      } 
      } 

Est-ce une Java seule caractéristique? Ou puis-je le faire avec Hadoop Streaming en utilisant Ruby?

Répondre

5

Les réducteurs s'exécutent toujours en parallèle, que vous utilisiez le streaming ou non (si vous ne le voyez pas, vérifiez que la configuration du travail est définie pour autoriser plusieurs tâches de réduction - voir mapred.reduce.tasks dans votre cluster ou configuration de travail). La différence réside dans le fait que le framework intègre les choses un peu mieux lorsque vous utilisez Java par rapport au streaming.

Pour Java, la tâche de réduction obtient un itérateur sur toutes les valeurs d'une clé particulière. Cela facilite la lecture des valeurs si vous faites, par exemple, la somme des résultats de la carte dans votre tâche de réduction. En streaming, vous obtenez littéralement un flux de paires clé-valeur. Vous garanti que les valeurs seront triées par clé, et que pour une clé donnée ne sera pas répartie entre les tâches de réduction, mais tout suivi d'état dont vous avez besoin est à vous. Par exemple, en Java sortie de votre carte est à votre réducteur symboliquement sous la forme

key1, {val1, val2, val3} key2, {val7, Val8}

Avec le streaming, votre résultat ressemble plutôt comme

écrire un réducteur qui calcule la somme des valeurs pour chaque touche, vous aurez besoin d'une variable

key1, val1 key1, val2 key1, val3 key2, val7 key2, Val8

par exemple, pour stocker le dernier clé que vous avez vu et une variable pour stocker la somme. Chaque fois que vous lisez une nouvelle paire clé-valeur, vous procédez comme suit:

  1. Vérifiez si la clé est différente de la dernière clé.
  2. si c'est le cas, affichez votre clé et votre somme actuelle, et réinitialisez la somme à zéro.
  3. Ajoutez la valeur actuelle à votre somme et définissez la dernière clé de la clé actuelle.

HTH.

1

Je n'ai pas essayé Hadoop Streaming moi-même mais à la lecture des documents, je pense que vous pouvez obtenir un comportement parallèle similaire. Au lieu de transmettre une clé avec les valeurs associées à chaque réducteur, le streaming groupera la sortie du mappeur au moyen des clés. Il garantit également que les valeurs avec les mêmes clés ne seront pas réparties sur plusieurs réducteurs. Ceci est quelque peu différent de la fonctionnalité Hadoop normale, mais même ainsi, le travail de réduction sera réparti sur plusieurs réducteurs. Essayez d'utiliser l'option -verbose pour obtenir plus d'informations sur ce qui se passe réellement. Vous pouvez également essayer d'expérimenter avec l'option -D mapred.reduce.tasks=X où X est le nombre désiré de réducteurs.