Je dois utiliser un filtre de Bloom dans l'algorithme de jointure côté reduce (reduce-side join) pour filtrer l'une de mes entrées, mais j'ai un problème avec la fonction readFields,
qui désérialise le flux d'entrée lu depuis le cache distribué (le filtre de Bloom) dans un objet BloomFilter. Titre : Filtre de Bloom dans MapReduce
public class BloomJoin {
//function map : input transaction.txt
public static class TransactionJoin extends
Mapper<LongWritable, Text, Text, Text> {
private Text CID=new Text();
private Text outValue=new Text();
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String record[] = line.split(",", -1);
CID.set(record[1]);
outValue.set("A"+value);
context.write(CID, outValue);
}
}
//function map : input customer.txt
public static class CustomerJoinMapper extends
Mapper<LongWritable, Text, Text, Text> {
private Text outkey=new Text();
private Text outvalue = new Text();
private BloomFilter bfilter = new BloomFilter();
public void setup(Context context) throws IOException {
URI[] files = DistributedCache
.getCacheFiles(context.getConfiguration());
// if the files in the distributed cache are set
if (files != null) {
System.out.println("Reading Bloom filter from: "
+ files[0].getPath());
// Open local file for read.
DataInputStream strm = new DataInputStream(new FileInputStream(
files[0].toString()));
bfilter.readFields(strm);
strm.close();
// Read into our Bloom filter.
} else {
throw new IOException(
"Bloom filter file not set in the DistributedCache.");
}
};
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String record[] = line.split(",", -1);
outkey.set(record[0]);
if (bfilter.membershipTest(new Key(outkey.getBytes()))) {
outvalue.set("B"+value);
context.write(outkey, outvalue);
}
}
}
//function reducer: join customer with transaction
/**
 * Reducer that, for each join key, pairs every "B"-tagged customer record
 * with every "A"-tagged transaction record (inner join) and writes the pair
 * as (customer record, transaction record).
 */
public static class JoinReducer extends
        Reducer<Text, Text, Text, Text> {
    // Buffers reused for every key; cleared at the start of each reduce call.
    private ArrayList<Text> listA = new ArrayList<Text>();
    private ArrayList<Text> listB = new ArrayList<Text>();

    /**
     * Splits the values of one key into the two tagged lists and joins them.
     *
     * @param key     join key (customer id)
     * @param values  tagged records: first character 'A' marks a
     *                transaction, anything else is treated as a customer
     * @param context sink for the joined pairs
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        listA.clear();
        listB.clear();
        for (Text t : values) {
            // Drop the one-character source tag and keep the rest in a
            // fresh Text (the iterated object is presumably reused by the
            // framework, so it must not be stored directly).
            Text untagged = new Text(t.toString().substring(1));
            if (t.charAt(0) == 'A') {
                listA.add(untagged);
            } else {
                listB.add(untagged);
            }
        }
        executeJoinLogic(context);
    }

    /**
     * Writes the cross product of the buffered customer and transaction
     * records; writes nothing when either side is empty.
     *
     * @param context sink for the joined pairs
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    private void executeJoinLogic(Context context) throws IOException,
            InterruptedException {
        if (listA.isEmpty() || listB.isEmpty()) {
            return;
        }
        // listB holds customers ("B" tag), listA holds transactions ("A" tag).
        for (Text customer : listB) {
            for (Text transaction : listA) {
                context.write(customer, transaction);
            }
        }
    }
}
/**
 * Configures and runs the Bloom-filtered reduce-side join job.
 *
 * @param args generic Hadoop options followed by three paths:
 *             {@code transaction-data customer-data output}
 * @throws Exception if job setup or execution fails
 */
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Parse generic options first: the Job copies the configuration when it
    // is constructed, so options applied afterwards would not reach the job.
    String[] otherArgs = new GenericOptionsParser(conf, args)
            .getRemainingArgs();
    if (otherArgs.length != 3) {
        System.err
                .println("Usage: BloomJoin <Transaction data> <Customer data> <out>");
        System.exit(1);
    }
    // The cache file must also be registered before the Job is created.
    Path bloompath = new Path("/user/biadmin/ezzaki/bloomfilter/output/part-00000");
    DistributedCache.addCacheFile(bloompath.toUri(), conf);
    Job job = new Job(conf, "Bloom Join");
    job.setJarByClass(BloomJoin.class);
    // One mapper class per input so each side of the join gets its own tag.
    MultipleInputs.addInputPath(job, new Path(otherArgs[0]),
            TextInputFormat.class, TransactionJoin.class);
    MultipleInputs.addInputPath(job, new Path(otherArgs[1]),
            TextInputFormat.class, CustomerJoinMapper.class);
    job.setReducerClass(JoinReducer.class);
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
    // Both mappers and the reducer emit Text keys and Text values.
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    System.exit(job.waitForCompletion(true) ? 0 : 3);
}
}
Comment puis-je résoudre ce problème?
Veuillez ajouter le code et l'exception levée à votre question ... –
J'ai ajouté le code de ma classe et l'image de l'erreur ; si elle n'apparaît pas, la voici : java.io.EOFException at java.io.DataInputStream.readInt / at org.apache.hadoop.util.bloom.Filter.readFields / at org.apache.hadoop.util.bloom.BloomFilter.readFields – Fatiso