2016-05-04 11 views
0

Quelqu'un peut-il me dire comment Pig instancie des objets UDF? J'ai utilisé Pig pour construire un pipeline pour traiter certaines données. J'ai déployé le pipeline dans le cluster multi-noeud Hadoop Et je veux enregistrer tous les résultats intermédiaires qui sont produits après chaque étape dans le pipeline. J'ai donc écrit un fichier UDF en Java qui va ouvrir une connexion HTTP à l'initialisation et transmettre les données au exec. En outre, je vais fermer la connexion dans finalize de l'objet.Comment Pig instancie-t-il des objets UDF

Mon script peut être simplifiée comme suit:

REGISTER MyPackage.jar; 
DEFINE InterStore test.InterStore('localhost', '58888'); 
DEFINE Clean  test.Clean(); 

raw = LOAD 'mydata'; 
cleaned = FILTER (FOREACH raw GENERATE FLATTEN(Clean(*))) BY NOT ($0 MATCHES ''); 
cleaned = FOREACH cleaned GENERATE FLATTEN(InterStore(*)); 
named = FOREACH cleaned GENERATE $1 AS LocationID, $2 AS AccessCount; 
named = FOREACH named GENERATE FLATTEN(InterStore(*)) AS (LocationID, AccessCount); 
grp = GROUP named BY LocationID; 
grp = FOREACH grp GENERATE FLATTEN(InterStore(*)) AS (group, named:{(LocationID, AccessCount)}); 
sum = FOREACH grp GENERATE group AS LocationID, SUM(named.AccessCount) AS TotalAccesses; 
sum = FOREACH sum GENERATE FLATTEN(InterStore(*)) AS (LocationID, TotalAccesses); 
ordered = ORDER sum BY TotalAccesses DESC; 
STORE ordered INTO 'result'; 

Et le code pour Interstore peut être simplifié comme ci-dessous:

class InterStore extends EvalFunc<Tuple>{ 
    HttpURLConnection con; //Avoid redundant connection establishment in exec 
    public InterStore(String ip, String port) throws IOException 
    { 
    URL url = new URL("http://" + ip + ':' + port); 
    con = (HttpURLConnection)url.openConnection(); 
    con.setRequestMethod("PUT"); 
    con.setDoOutput(true); 
    con.setDoInput(true); 
    } 
    public Tuple exec(Tuple input) throws IOException 
    { 
    con.getOutputStream().write((input.toDelimitedString(",")+'\n').getBytes()); 
    return input; 
    } 
    @Override 
    protected void finalize() throws Throwable 
    { 
    con.getOutputStream().close(); 
    int respcode = con.getResponseCode(); 
    BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream())); 
    System.out.printf("Resp Code:%d, %s\n", respcode, in.readLine()); 
    in.close(); 
    } 
} 

Cependant, je trouve que la connexion HTTP ne peut pas transmettre des données avec succès comme il le fait en mode local. Comment faire face à cela?

+0

Salut, si vous pouvez partager le script où vous avez essayé utiliser votre UDF qui pourrait aider. – kecso

+0

J'ai ajouté un échantillon de code. Thx ~ – Trams

Répondre

0

Y a-t-il un service d'écoute sur 'localhost', '58888'?

Notez que l'hôte local sont diffère par chaque nœud d'exécution, vous voudrez peut-être faire:

%default LHOST `localhost` 

et utiliser cette variable comme paramètre

DEFINE InterStore test.InterStore('$LHOST', '58888'); 

En général, je ferais quelques impressions dans l'UDF et vérifiez les paramètres qui lui sont passés, et testez la connexion (comme ping et vérifiez si le port est accessible depuis le nœud hadoop)