Je suis en train de mettre en œuvre le modèle distinct:Hadoop MapReduce modèle distinct avec inscriptibles sur commande produit clés en double
map(key, record):
emit record,null
reduce(key, records):
emit key
Ma clé est un complexe, sur mesure Writable
. Si j'émets en réduire la clé et son hashcode:
context.write(key, new IntWtitable(key.hashCode());
Je reçois la sortie suivante:
key1 -1808937256
key2 -768063202
key3 906064410
key2 -768063202
key3 906064410
En théorie, la sortie ne doit contenir que key1
, key2
et key3
depuis que je suis en utilisant le HashPartitioner
: les clés avec un code de hachage égal sont combinées dans la même partition. Ce n'est clairement pas le cas ici.
Si je convertir mon Writable
complexe en un objet Text
(et d'adapter les classes Mapper/Réducteur en conséquence), et à émettre dans le Mapper
:
context.write(new Text(key.toString()), NullWritable.get());
... la sortie est comme prévu:
key1 1013632023
key2 762485389
key3 -1193948769
Ok, et voici un exemple de travail minimal qui illustre le comportement.
entrée:
A A A A A
B B B B B
C C C C C
A A A A A
B B B B B
La tâche MapReduce:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class DistinctPattern extends Configured implements Tool {
public static class DistinctMapper extends Mapper<Object, Text, ComplexObject, NullWritable> {
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
ComplexObject o = new ComplexObject(value.toString());
context.write(o, NullWritable.get());
}
}
public static class DistinctReducer extends Reducer<ComplexObject, NullWritable, ComplexObject, IntWritable> {
public void reduce(ComplexObject key, Iterable<NullWritable> values, Context context)
throws IOException, InterruptedException {
context.write(key, new IntWritable(key.hashCode()));
}
}
public static class MyArrayWritable extends ArrayWritable {
public MyArrayWritable(Writable[] values) {
super(DatumObject.class, values);
}
public MyArrayWritable() {
super(DatumObject.class);
}
@Override
public String toString() {
return Arrays.toString(get());
}
}
public static class DatumObject implements Writable {
private String datum;
public DatumObject() {}
public DatumObject(String d) {
datum = d;
}
@Override
public void readFields(DataInput in) throws IOException {
datum = in.readUTF();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(datum);
}
@Override
public String toString() {
return datum;
}
@Override
public int hashCode() {
return 31 * datum.hashCode();
}
}
public static class ComplexObject implements WritableComparable<ComplexObject> {
private List<DatumObject> data = new ArrayList<>();
public ComplexObject() {}
public ComplexObject(String d) {
String[] elements = d.split(" ");
for(int i = 0; i < elements.length; i++)
data.add(new DatumObject(elements[i]));
}
public int size() {
return data.size();
}
@Override
public void readFields(DataInput in) throws IOException {
data.clear();
MyArrayWritable m = new MyArrayWritable();
m.readFields(in);
Writable[] w = m.get();
for(int i = 0; i < w.length; i++)
data.add((DatumObject) w[i]);
}
@Override
public void write(DataOutput out) throws IOException {
MyArrayWritable m = new MyArrayWritable(data.toArray(new DatumObject[data.size()]));
m.write(out);
}
@Override
public int compareTo(ComplexObject o) {
if(this.equals(o))
return 0;
if(o.size() < this.size())
return -1;
return 1;
}
@Override
public boolean equals(Object obj) {
if(!(obj instanceof ComplexObject))
return false;
ComplexObject other = (ComplexObject) obj;
return other.data.equals(data);
}
@Override
public int hashCode() {
return 31 * data.hashCode();
}
@Override
public String toString() {
StringBuilder s= new StringBuilder();
data.forEach(entry -> {
s.append(entry);
s.append(" ");
});
return s.toString();
}
}
@Override
public int run(String[] args) throws Exception {
Job job = Job.getInstance();
job.setJar("distinct.jar");
job.setJarByClass(DistinctPattern.class);
job.setMapperClass(DistinctMapper.class);
job.setReducerClass(DistinctReducer.class);
job.setMapOutputKeyClass(ComplexObject.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(ComplexObject.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new DistinctPattern(), args);
System.exit(exitCode);
}
}
Résultats attendus:
A A A A A 368623362
B B B B B 1285710467
C C C C C -2092169724
sortie réelle:
A A A A A 368623362
B B B B B 1285710467
C C C C C -2092169724
A A A A A 368623362
B B B B B 1285710467
Qu'est-ce qui me manque?
PS: Hadoop 2.7.3