2017-02-04 1 views
1

Ce programme python3 tente de produire une liste de fréquence de mots à partir d'un fichier texte en utilisant map/reduce. Je voudrais savoir comment commander le nombre de mots, représenté comme «compte» dans la déclaration de rendement du deuxième réducteur de sorte que les plus grandes valeurs de compte apparaissent en dernier. À l'heure actuelle, la queue des résultats ressembler à ceci:Mapper/réduire l'ordre de comptage en deux étapes

"0002" "wouldn" 
"0002" "wrap" 
"0002" "x" 
"0002" "xxx" 
"0002" "young" 
"0002" "zone" 

Pour le contexte, je passe un fichier texte de mot dans le programme python3 comme ceci:

python MapReduceWordFreqCounter.py book.txt 

Voici le code pour MapReduceWordFreqCounter.py:

from mrjob.job import MRJob 
from mrjob.step import MRStep 
import re 

# ignore whitespace characters 
WORD_REGEXP = re.compile(r"[\w']+") 

class MapReduceWordFreqCounter(MRJob): 

    def steps(self): 
     return [ 
      MRStep(mapper=self.mapper_get_words, 
        reducer=self.reducer_count_words), 
      MRStep(mapper=self.mapper_make_counts_key, 
        reducer = self.reducer_output_words) 
     ] 

    def mapper_get_words(self, _, line): 
     words = WORD_REGEXP.findall(line) 
     for word in words: 
      yield word.lower(), 1 

    def reducer_count_words(self, word, values): 
     yield word, sum(values) 

    def mapper_make_counts_key(self, word, count): 
     yield str(count).rjust(4,'0'), word 

    def reducer_output_words(self, count, words): 
     for word in words: 
      yield count, word 

if __name__ == '__main__': 
    MapReduceWordFreqCounter.run()   

Répondre

1

Vous devez définir un comparateur de tri personnalisé pour votre travail.

Si vous l'avez écrit en Java, il ressemblerait

job.setSortComparatorClass(SortKeyComparator.class); 

et vous devrez fournir une classe qui donne l'ordre inverse

public class SortKeyComparator extends Text.Comparator { 

    @Override 
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 
     return (-1) * super.compare(b1, s1, l1, b2, s2, l2); 
    } 
} 

Je suppose que python Hadoop api a quelques Simular méthodes pour faire ce tour.

0

Pour l'étape MRJob Reduce, il n'est pas attendu que les résultats soient triés par la touche 'count'.

Ici, l'importation MRJob vous permet d'exécuter le code localement et sur un cluster AWS Elastic MapReduce. MRJob fait le gros du travail pour l'exécution car il utilise l'API Yarn et le streaming Hadoop pour le transfert de données distribuées entre la carte et la réduction des tâches.

Par exemple, pour exécuter localement, vous pouvez exécuter cette MRJob comme: python MapReduceWordFreqCounter.py books.txt> counts.txt

Pour exécuter sur un seul nœud DME: python MapReduceWordFreqCounter.py -r emr books.txt> counts.txt

Pour exécuter sur 25 nœuds de DME: python MapReduceWordFreqCounter.py -r emr --num-EC2-instances = 25 books.txt> counts.txt

Pour résoudre le distribuer Travail EMR (remplacez votre ID de travail): python -m mrjob.tools.emr.fetch_logs --find-failure j-1NXEMBAEQFDFT

Ici, lors de l'exécution sur quatre nœuds, les résultats réduits sont classés mais se trouvent dans quatre sections différentes du fichier de sortie. Il s'avère que le fait de forcer le réducteur à produire un seul fichier ordonné ne présente pas d'avantages en termes de performances par rapport à la simple commande des résultats dans une étape de post-traitement. Ainsi, une façon de résoudre cette question spécifique est d'utiliser le type de commande Linux:

sort word_frequency_list.txt > sorted_word_frequency_list.txt 

qui produit ces 'tailed' résultats:

"0970" "de" "1191" "a" " 1292" « le » « 1420 » « votre » « 1561 » « vous » « 1828 » « à »

Plus généralement, il existe des cadres sur Hadoop qui sont idéales pour ce genre de traitement.Pour ce problème, Pig peut être utilisé en lecture dans le fichier traité et ordonner les comptes.

Le cochon peut être lancé via le shell Grunt ou via des scripts Pig (en utilisant la syntaxe Pig Latin sensible à la casse). scripts Pig suivent le modèle suivant: 1) Instruction LOAD pour lire les données 2) Une série de déclaration « de transformation » pour traiter les données 3) Une déclaration DUMP/STORE pour enregistrer les résultats

Pour commander les comptes en utilisant porc:

reducer_count_output = LOAD 'word_frequency_list.txt' using PigStorage(' ') AS (word_count:chararray, word_name:chararray); 
counts_words_ordered = ORDER reducer_count_output BY word_count ASC; 
STORE counts_words_ordered INTO 'counts_words_ordered' USING PigStorage(':', '-schema');