2016-05-12 3 views
0

Je dois utiliser le filtre bloom dans l'algorithme de réduction side join pour filtrer une de mes entrées, mais j'ai un problème avec la fonction readFields qui désérialise le flux d'entrée d'un cache distribué (filtre bloom) dans un filtre de floraison.Bloom Filter dans MapReduce

public class BloomJoin { 

    //function map : input transaction.txt 
    public static class TransactionJoin extends 
      Mapper<LongWritable, Text, Text, Text> { 

     private Text CID=new Text(); 
     private Text outValue=new Text(); 

     public void map(LongWritable key, Text value, Context context) 
       throws IOException, InterruptedException { 

      String line = value.toString(); 
       String record[] = line.split(",", -1); 
       CID.set(record[1]); 

       outValue.set("A"+value); 
       context.write(CID, outValue); 
       } 
     } 
    //function map : input customer.txt 
      public static class CustomerJoinMapper extends 
        Mapper<LongWritable, Text, Text, Text> { 

       private Text outkey=new Text(); 
       private Text outvalue = new Text(); 
       private BloomFilter bfilter = new BloomFilter(); 
       public void setup(Context context) throws IOException { 

        URI[] files = DistributedCache 
          .getCacheFiles(context.getConfiguration()); 

        // if the files in the distributed cache are set 
        if (files != null) { 
        System.out.println("Reading Bloom filter from: " 
        + files[0].getPath()); 
        // Open local file for read. 

        DataInputStream strm = new DataInputStream(new FileInputStream(
        files[0].toString())); 
        bfilter.readFields(strm); 
        strm.close(); 

        // Read into our Bloom filter. 

        } else { 
        throw new IOException(
        "Bloom filter file not set in the DistributedCache."); 
        } 
       }; 

       public void map(LongWritable key, Text value, Context context) 
         throws IOException, InterruptedException { 
        String line = value.toString(); 
        String record[] = line.split(",", -1); 

         outkey.set(record[0]); 
         if (bfilter.membershipTest(new Key(outkey.getBytes()))) { 
         outvalue.set("B"+value); 
         context.write(outkey, outvalue); 
         } 
      } 
      } 

    //function reducer: join customer with transaction 
    public static class JoinReducer extends 
      Reducer<Text, Text, Text, Text> { 

     private ArrayList<Text> listA = new ArrayList<Text>(); 
     private ArrayList<Text> listB = new ArrayList<Text>(); 


     @Override 
     public void reduce(Text key, Iterable<Text> values, Context context) 
       throws IOException, InterruptedException { 

      listA.clear(); 
      listB.clear(); 

        for (Text t : values) { 
       if (t.charAt(0) == 'A') { 
        listA.add(new Text(t.toString().substring(1))); 
        System.out.println("liste A: "+listA); 
       } else /* if (t.charAt('0') == 'B') */{ 
        listB.add(new Text(t.toString().substring(1))); 
        System.out.println("listeB :"+listB); 
       } 
      } 

      executeJoinLogic(context); 
     } 

     private void executeJoinLogic(Context context) throws IOException, 
       InterruptedException { 
       if (!listA.isEmpty() && !listB.isEmpty()) { 
        for (Text A : listB) { 
         for (Text B : listA) { 
          context.write(A, B); 
          System.out.println("A="+A+",B="+B); 
         } 
        } 
       } 

     } 
    } 

    public static void main(String[] args) throws Exception { 

     Configuration conf = new Configuration(); 
     Path bloompath=new Path("/user/biadmin/ezzaki/bloomfilter/output/part-00000"); 
     DistributedCache.addCacheFile(bloompath.toUri(),conf); 
     Job job = new Job(conf, "Bloom Join"); 
     job.setJarByClass(BloomJoin.class); 
     String[] otherArgs = new GenericOptionsParser(conf, args) 
     .getRemainingArgs(); 
     if (otherArgs.length != 3) { 
    System.err 
      .println("ReduceSideJoin <Transaction data> <Customer data> <out> "); 
    System.exit(1); 
            } 
     MultipleInputs.addInputPath(job, new Path(otherArgs[0]), 
       TextInputFormat.class,TransactionJoin.class); 
     MultipleInputs.addInputPath(job, new Path(otherArgs[1]), 
       TextInputFormat.class, CustomerJoinMapper.class); 

     job.setReducerClass(JoinReducer.class); 

     FileOutputFormat.setOutputPath(job, new Path(otherArgs[2])); 
     //job.setMapOutputKeyClass(Text.class); 
     //job.setMapOutputValueClass(Text.class); 
     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(Text.class); 
     System.exit(job.waitForCompletion(true) ? 0 : 3); 
    } 
} 

Comment puis-je résoudre ce problème?

+0

Veuillez ajouter le code et l'exception d'erreur à votre question ... –

+0

J'ai ajouté le code de ma classe et l'image d'erreur si elle n'apparaît pas c'est: java.io.EOFException: at java.io. DataInputStream.readInt/at org.apache.hadoop.util.bloom.Filter.readFields/at org.apache.hadoop.util.bloom.BloomFilter.readFields – Fatiso

Répondre

1

Pouvez-vous essayer de changer

URI[] files = DistributedCache.getCacheFiles(context.getConfiguration()); 

à

Path[] cacheFilePaths = DistributedCache.getLocalCacheFiles(conf); 
for (Path cacheFilePath : cacheFilePaths) {  
    DataInputStream fileInputStream = fs.open(cacheFilePath); 
} 
bloomFilter.readFields(fileInputStream); 
fileInputStream.close(); 

Aussi, je pense que vous utilisez côté de la carte rejoindre et non moins côté, puisque vous utilisez le cache distribué dans Mapper.

+0

merci pour votre réponse mais le problème reste avec la fonction: readFields j'ai dans ecxeption: java.io.EOFException: à java.io.DataInputStream.readInt/à org.apache.hadoop.util.bloom.Filter.readFields/à org.apache.hadoop.util.bloom.BloomFilter.readFields – Fatiso

+0

c'est une jointure de réduction parce que la jointure est faite dans le côté de réduction et j'essaye de filtrer une de mes entrées dans le côté de carte avant d'envoyer le PKV au réducteur – Fatiso