Quelqu'un peut-il me dire comment Pig instancie des objets UDF? J'ai utilisé Pig pour construire un pipeline pour traiter certaines données. J'ai déployé le pipeline dans le cluster multi-noeud Hadoop
Et je veux enregistrer tous les résultats intermédiaires qui sont produits après chaque étape dans le pipeline. J'ai donc écrit un fichier UDF en Java qui va ouvrir une connexion HTTP à l'initialisation et transmettre les données au exec
. En outre, je vais fermer la connexion dans finalize
de l'objet.Comment Pig instancie-t-il des objets UDF
Mon script peut être simplifiée comme suit:
REGISTER MyPackage.jar;
DEFINE InterStore test.InterStore('localhost', '58888');
DEFINE Clean test.Clean();
raw = LOAD 'mydata';
cleaned = FILTER (FOREACH raw GENERATE FLATTEN(Clean(*))) BY NOT ($0 MATCHES '');
cleaned = FOREACH cleaned GENERATE FLATTEN(InterStore(*));
named = FOREACH cleaned GENERATE $1 AS LocationID, $2 AS AccessCount;
named = FOREACH named GENERATE FLATTEN(InterStore(*)) AS (LocationID, AccessCount);
grp = GROUP named BY LocationID;
grp = FOREACH grp GENERATE FLATTEN(InterStore(*)) AS (group, named:{(LocationID, AccessCount)});
sum = FOREACH grp GENERATE group AS LocationID, SUM(named.AccessCount) AS TotalAccesses;
sum = FOREACH sum GENERATE FLATTEN(InterStore(*)) AS (LocationID, TotalAccesses);
ordered = ORDER sum BY TotalAccesses DESC;
STORE ordered INTO 'result';
Et le code pour Interstore peut être simplifié comme ci-dessous:
class InterStore extends EvalFunc<Tuple>{
HttpURLConnection con; //Avoid redundant connection establishment in exec
public InterStore(String ip, String port) throws IOException
{
URL url = new URL("http://" + ip + ':' + port);
con = (HttpURLConnection)url.openConnection();
con.setRequestMethod("PUT");
con.setDoOutput(true);
con.setDoInput(true);
}
public Tuple exec(Tuple input) throws IOException
{
con.getOutputStream().write((input.toDelimitedString(",")+'\n').getBytes());
return input;
}
@Override
protected void finalize() throws Throwable
{
con.getOutputStream().close();
int respcode = con.getResponseCode();
BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
System.out.printf("Resp Code:%d, %s\n", respcode, in.readLine());
in.close();
}
}
Cependant, je trouve que la connexion HTTP ne peut pas transmettre des données avec succès comme il le fait en mode local. Comment faire face à cela?
Salut, si vous pouvez partager le script où vous avez essayé utiliser votre UDF qui pourrait aider. – kecso
J'ai ajouté un échantillon de code. Thx ~ – Trams