2017-10-20 42 views
0

Je ne suis pas un expert Java, mais je connais les bases de Java et j'essaie toujours de comprendre le code Java en profondeur chaque fois qu'il se présente. Ce pourrait être un doute vraiment stupide, mais j'aimerais faire comprendre clairement dans mon esprit.
Je poste dans la communauté Java, parce que mon doute ne concerne que Java. Depuis quelques mois je travaille avec hadoop et j'ai découvert que hadoop utilise ses propres types, qui sont enroulés autour des types primitifs de Java afin d'augmenter l'efficacité pour envoyer des données sur le réseau sur la base de la sérialisation et de la désérialisation.Comment les types encapsulés fonctionnent-ils dans Hadoop?

Ma confusion commence ici, disons que nous avons des données HDFS à traiter en utilisant suivant le code Java en cours d'exécution dans le code Hadoop

org.apache.hadoop.io.IntWritable; 
org.apache.hadoop.io.LongWritable; 
org.apache.hadoop.io.Text; 
org.apache.hadoop.mapreduce.Mapper; 

import java.io.IOException; 
public class WordCountMapper 
{ 
extends Mapper<LongWritable,Text,Text,IntWritable> 
@Override 
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ 
} 
} 
String line = value.toString(); 
for (String word : line.split(" ")){ 
if(word.length()>0){ 
context.write(new Text(word),new IntWritable(1)); 
} 

Dans les types de ce code Hadoop sont comme ça LongWritable, texte, IntWritable.
Permet de ramasser Type de texte qui est enroulé autour du type de chaîne de Java (corrigez-moi si je me trompe).
Mon doute est là quand nous passons ces paramètres à notre carte de méthode dans le code ci-dessus, comment ces paramètres obtient interagir avec le code qui est en import package i.e org.apache.hadoop.io.Text;

Ci-dessous le code de classe de texte

package org.apache.hadoop.io; 

import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 
import java.nio.ByteBuffer; 
import java.nio.CharBuffer; 
import java.nio.charset.CharacterCodingException; 
import java.nio.charset.Charset; 
import java.nio.charset.CharsetDecoder; 
import java.nio.charset.CharsetEncoder; 
import java.nio.charset.CodingErrorAction; 
import java.nio.charset.MalformedInputException; 
import java.text.CharacterIterator; 
import java.text.StringCharacterIterator; 
import java.util.Arrays; 
import org.apache.avro.reflect.Stringable; 
import org.apache.commons.logging.Log; 
import org.apache.commons.logging.LogFactory; 
import org.apache.hadoop.classification.InterfaceAudience.Public; 
import org.apache.hadoop.classification.InterfaceStability.Stable; 



@Stringable 
@InterfaceAudience.Public 
@InterfaceStability.Stable 
public class Text 
    extends BinaryComparable 
    implements WritableComparable<BinaryComparable> 
{ 
    private static final Log LOG = LogFactory.getLog(Text.class); 

    private static ThreadLocal<CharsetEncoder> ENCODER_FACTORY = new ThreadLocal() 
    { 
    protected CharsetEncoder initialValue() { 
     return Charset.forName("UTF-8").newEncoder().onMalformedInput(CodingErrorAction.REPORT).onUnmappableCharacter(CodingErrorAction.REPORT); 
    } 
    }; 



    private static ThreadLocal<CharsetDecoder> DECODER_FACTORY = new ThreadLocal() 
    { 
    protected CharsetDecoder initialValue() { 
     return Charset.forName("UTF-8").newDecoder().onMalformedInput(CodingErrorAction.REPORT).onUnmappableCharacter(CodingErrorAction.REPORT); 
    } 
    }; 



    private static final byte[] EMPTY_BYTES = new byte[0]; 
    private byte[] bytes; 
    private int length; 

    public Text() 
    { 
    bytes = EMPTY_BYTES; 
    } 


    public Text(String string) 
    { 
    set(string); 
    } 

    public Text(Text utf8) 
    { 
    set(utf8); 
    } 


    public Text(byte[] utf8) 
    { 
    set(utf8); 
    } 




    public byte[] getBytes() 
    { 
    return bytes; 
    } 

    public int getLength() 
    { 
    return length; 
    } 








    public int charAt(int position) 
    { 
    if (position > length) return -1; 
    if (position < 0) { return -1; 
    } 
    ByteBuffer bb = (ByteBuffer)ByteBuffer.wrap(bytes).position(position); 
    return bytesToCodePoint(bb.slice()); 
    } 

    public int find(String what) { 
    return find(what, 0); 
    } 


    public int find(String what, int start) 
    { 
    try 
    { 
     ByteBuffer src = ByteBuffer.wrap(bytes, 0, length); 
     ByteBuffer tgt = encode(what); 
     byte b = tgt.get(); 
     src.position(start); 

     while (src.hasRemaining()) { 
     if (b == src.get()) { 
      src.mark(); 
      tgt.mark(); 
      boolean found = true; 
      int pos = src.position() - 1; 
      while (tgt.hasRemaining()) { 
      if (!src.hasRemaining()) { 
       tgt.reset(); 
       src.reset(); 
       found = false; 

      } 
      else if (tgt.get() != src.get()) { 
       tgt.reset(); 
       src.reset(); 
       found = false; 
      } 
      } 

      if (found) return pos; 
     } 
     } 
     return -1; 
    } 
    catch (CharacterCodingException e) { 
     e.printStackTrace(); } 
    return -1; 
    } 

    public void set(String string) 
    { 
    try 
    { 
     ByteBuffer bb = encode(string, true); 
     bytes = bb.array(); 
     length = bb.limit(); 
    } catch (CharacterCodingException e) { 
     throw new RuntimeException("Should not have happened " + e.toString()); 
    } 
    } 


    public void set(byte[] utf8) 
    { 
    set(utf8, 0, utf8.length); 
    } 

    public void set(Text other) 
    { 
    set(other.getBytes(), 0, other.getLength()); 
    } 






    public void set(byte[] utf8, int start, int len) 
    { 
    setCapacity(len, false); 
    System.arraycopy(utf8, start, bytes, 0, len); 
    length = len; 
    } 






    public void append(byte[] utf8, int start, int len) 
    { 
    setCapacity(length + len, true); 
    System.arraycopy(utf8, start, bytes, length, len); 
    length += len; 
    } 



    public void clear() 
    { 
    length = 0; 
    } 










    private void setCapacity(int len, boolean keepData) 
    { 
    if ((bytes == null) || (bytes.length < len)) { 
     if ((bytes != null) && (keepData)) { 
     bytes = Arrays.copyOf(bytes, Math.max(len, length << 1)); 
     } else { 
     bytes = new byte[len]; 
     } 
    } 
    } 



    public String toString() 
    { 
    try 
    { 
     return decode(bytes, 0, length); 
    } catch (CharacterCodingException e) { 
     throw new RuntimeException("Should not have happened " + e.toString()); 
    } 
    } 

    public void readFields(DataInput in) 
    throws IOException 
    { 
    int newLength = WritableUtils.readVInt(in); 
    setCapacity(newLength, false); 
    in.readFully(bytes, 0, newLength); 
    length = newLength; 
    } 

    public static void skip(DataInput in) throws IOException 
    { 
    int length = WritableUtils.readVInt(in); 
    WritableUtils.skipFully(in, length); 
    } 




    public void write(DataOutput out) 
    throws IOException 
    { 
    WritableUtils.writeVInt(out, length); 
    out.write(bytes, 0, length); 
    } 

    public boolean equals(Object o) 
    { 
    if ((o instanceof Text)) 
     return super.equals(o); 
    return false; 
    } 

Puis-je savoir s'il vous plaît quand nous exécutons le code hadoop ci-dessus, les données dans HDFS circule à travers les paramètres que nous avons mentionnés dans la méthode de la carte.
Une fois que le premier ensemble de données de HDFS a atteint le paramètre Texte, comment il circule dans la classe org.apache.hadoop.io.Text?
Je veux dire d'où commence-t-il (je suppose qu'il commence à partir de la méthode set en classe parce qu'il a les mêmes paramètres que la méthode map mentionnée, ai-je raison?)
Où change-t-il du type chaîne normale au texte taper le code? Mon deuxième doute est: lorsque les données sont stockées dans le type de texte, alors qui le frappe pour commencer à faire la sérilisation? Je veux dire qui appelle cela écrire (DataOutput out), et qui appelle readFields (DataInput in) une fois que les données sont atteintes à destination sur le réseau?
Comment ça marche, et où dois-je regarder?

J'espère que ce que je demande est clair.

Répondre

0

Comme toutes les opérations réseau ou disque, tout est transféré en octets. La classe Text désérialise les octets en UTF-8. Les writables déterminent comment les données sont représentées et les comparables déterminent comment les données sont classées. Le paramètre InputFormat défini dans le Job détermine les Writables à attribuer à une map ou à réduire Task.

Un InputSplit détermine comment diviser et lire un flux d'octets brut dans le Writables

Une tâche carte est lancée sur chaque InputSplit

Reportez-vous https://hadoop.apache.org/docs/stable/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html