2017-04-25 5 views
0

Je suis en train de mettre en œuvre le modèle distinct:Hadoop MapReduce modèle distinct avec inscriptibles sur commande produit clés en double

map(key, record): 
    emit record,null 
reduce(key, records): 
    emit key 

Ma clé est un complexe, sur mesure Writable. Si j'émets en réduire la clé et son hashcode:

context.write(key, new IntWtitable(key.hashCode()); 

Je reçois la sortie suivante:

key1 -1808937256 
key2 -768063202 
key3 906064410 
key2 -768063202 
key3 906064410 

En théorie, la sortie ne doit contenir que key1, key2 et key3 depuis que je suis en utilisant le HashPartitioner: les clés avec un code de hachage égal sont combinées dans la même partition. Ce n'est clairement pas le cas ici.

Si je convertir mon Writable complexe en un objet Text (et d'adapter les classes Mapper/Réducteur en conséquence), et à émettre dans le Mapper:

context.write(new Text(key.toString()), NullWritable.get()); 

... la sortie est comme prévu:

key1 1013632023 
key2 762485389 
key3 -1193948769 

Ok, et voici un exemple de travail minimal qui illustre le comportement.

entrée:

A A A A A 
B B B B B 
C C C C C 
A A A A A 
B B B B B 

La tâche MapReduce:

import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 
import java.util.ArrayList; 
import java.util.Arrays; 
import java.util.List; 

import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.ArrayWritable; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.Writable; 
import org.apache.hadoop.io.WritableComparable; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 


public class DistinctPattern extends Configured implements Tool { 
public static class DistinctMapper extends Mapper<Object, Text, ComplexObject, NullWritable> { 


    public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 
     ComplexObject o = new ComplexObject(value.toString()); 
     context.write(o, NullWritable.get()); 
    } 
} 

public static class DistinctReducer extends Reducer<ComplexObject, NullWritable, ComplexObject, IntWritable> { 


    public void reduce(ComplexObject key, Iterable<NullWritable> values, Context context) 
      throws IOException, InterruptedException { 

     context.write(key, new IntWritable(key.hashCode())); 
    } 
} 

public static class MyArrayWritable extends ArrayWritable { 

    public MyArrayWritable(Writable[] values) { 
     super(DatumObject.class, values); 
    } 

    public MyArrayWritable() { 
     super(DatumObject.class); 
    } 

    @Override 
    public String toString() { 
     return Arrays.toString(get()); 
    } 

} 

public static class DatumObject implements Writable { 
    private String datum; 

    public DatumObject() {} 

    public DatumObject(String d) { 
     datum = d; 
    } 

    @Override 
    public void readFields(DataInput in) throws IOException { 
     datum = in.readUTF(); 
    } 

    @Override 
    public void write(DataOutput out) throws IOException { 
     out.writeUTF(datum);  
    } 

    @Override 
    public String toString() { 
     return datum; 
    } 

    @Override 
    public int hashCode() { 
     return 31 * datum.hashCode(); 
    } 

} 

public static class ComplexObject implements WritableComparable<ComplexObject> { 
    private List<DatumObject> data = new ArrayList<>(); 

    public ComplexObject() {} 

    public ComplexObject(String d) { 
     String[] elements = d.split(" "); 
     for(int i = 0; i < elements.length; i++) 
      data.add(new DatumObject(elements[i])); 
    } 

    public int size() { 
     return data.size(); 
    } 

    @Override 
    public void readFields(DataInput in) throws IOException { 
     data.clear(); 
     MyArrayWritable m = new MyArrayWritable(); 
     m.readFields(in); 
     Writable[] w = m.get(); 
     for(int i = 0; i < w.length; i++) 
      data.add((DatumObject) w[i]); 

    } 

    @Override 
    public void write(DataOutput out) throws IOException { 
     MyArrayWritable m = new MyArrayWritable(data.toArray(new DatumObject[data.size()])); 
     m.write(out); 
    } 

    @Override 
    public int compareTo(ComplexObject o) { 
     if(this.equals(o)) 
      return 0; 

     if(o.size() < this.size()) 
      return -1; 

     return 1; 
    } 

    @Override 
    public boolean equals(Object obj) { 
     if(!(obj instanceof ComplexObject)) 
      return false; 

     ComplexObject other = (ComplexObject) obj; 
     return other.data.equals(data); 
    } 

    @Override 
    public int hashCode() { 
     return 31 * data.hashCode(); 
    } 

    @Override 
    public String toString() { 
     StringBuilder s= new StringBuilder(); 
     data.forEach(entry -> { 
      s.append(entry); 
      s.append(" "); 
     }); 

     return s.toString(); 
    } 

} 

@Override 
public int run(String[] args) throws Exception { 
    Job job = Job.getInstance(); 
    job.setJar("distinct.jar"); 
    job.setJarByClass(DistinctPattern.class); 
    job.setMapperClass(DistinctMapper.class); 
    job.setReducerClass(DistinctReducer.class); 
    job.setMapOutputKeyClass(ComplexObject.class); 
    job.setMapOutputValueClass(NullWritable.class); 
    job.setOutputKeyClass(ComplexObject.class); 
    job.setOutputValueClass(IntWritable.class); 

    FileInputFormat.addInputPath(job, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job, new Path(args[1])); 

    return job.waitForCompletion(true) ? 0 : 1; 
} 

public static void main(String[] args) throws Exception {  
    int exitCode = ToolRunner.run(new DistinctPattern(), args); 
    System.exit(exitCode); 
} 
} 

Résultats attendus:

A A A A A  368623362 
B B B B B  1285710467 
C C C C C  -2092169724 

sortie réelle:

A A A A A  368623362 
B B B B B  1285710467 
C C C C C  -2092169724 
A A A A A  368623362 
B B B B B  1285710467 

Qu'est-ce qui me manque?

PS: Hadoop 2.7.3

Répondre

0

Ok, a trouvé l'erreur (s) dans mon code. Tout d'abord, l'exemple de travail minimal ne dispose pas d'une mise en œuvre de la méthode equals en classe DatumObject:

@Override 
public boolean equals(Object obj) { 
    if(obj == null) 
     return false; 

    if(!(obj instanceof DatumObject)) 
     return false; 

    DatumObject other = (DatumObject) obj; 
     return other.datum.equals(datum); 
} 

En second lieu, un aspect que je ne pouvais pas reproduire dans l'exemple de travail minime, mais qui apparaît dans mon code actuel, est que tous mes classes key ont implémenté l'interface WritableComparable. En conséquence, je soupçonne que la phase de mélange n'a pas trié les clés comme prévu. Une fois que les méthodes compareTo ont été correctement implémentées dans toutes les classes composant ma valeur key (see class diagram here), le modèle distinct a fonctionné comme prévu.