2015-08-01 3 views
2

Existe-t-il un moyen direct d'adresser l'erreur suivante ou globalement une meilleure façon d'utiliser Hive pour obtenir la jointure dont j'ai besoin? La sortie vers une table stockée n'est pas une exigence car je peux me contenter d'un INSERT OVERWRITE LOCAL DIRECTORY à un csv. J'essaie d'effectuer la jointure croisée suivante. ipint est une table de 9 Go et la géolite est de 270 Mo.La jointure croisée de la ruche échoue sur la carte locale join

CREATE TABLE iplatlong_sample AS 
SELECT ipintegers.networkinteger, geoiplite.latitude, geoiplite.longitude 
FROM geoiplite 
CROSS JOIN ipintegers 
WHERE ipintegers.networkinteger >= geoiplite.network_start_integer AND ipintegers.networkinteger <= geoiplite.network_last_integer; 

J'utilise CROSS JOIN sur ipintegers au lieu de geoiplite parce que j'ai lu que la règle est la plus petite table pour être à gauche, plus à droite.

Carte et réduire les étapes complètes à 100% selon HIVE, mais

2015-08-01 04: 45: 36947 Stage-1 carte = 100%, réduire = 100%, cumulatif CPU 8767,09 sec

MapReduce temps total de CPU cumulée: 0 jours 2 heures 26 minutes 7 secondes 90 msec

terminé Job = job_201508010407_0001

S tage-8 est sélectionné par le résolveur de condition.

journal d'exécution à: /tmp/myuser/.log

2015-08-01 04:45:38 À partir de lancer une tâche locale pour traiter carte rejoindre; mémoire maximale = 12221153280

d'exécution a échoué avec le statut de sortie: 3

Obtenir des informations d'erreur

La tâche a échoué!

ID Tâche: Stage-8

Journaux:

/tmp/myuser/hive.log

Failed: Erreur d'exécution, code de retour 3 de org.apache.hadoop.hive. ql.exec.mr.MapredLocalTask ​​

MapReduce emploi Lancé: Job 0: Carte: 38 Réduire: 1 CPU cumulatif: 8767,09 sec
HDFS Lire: 9438495086 HDFS Ecrire: 8575548486 SUCC ESS

Ma config ruche:

SET hive.mapred.local.mem=40960; 
SET hive.exec.parallel=true; 
SET hive.exec.compress.output=true; 
SET hive.exec.compress.intermediate = true; 
SET hive.optimize.skewjoin = true; 
SET mapred.compress.map.output=true; 
SET hive.stats.autogather=false; 

J'ai varié SET hive.auto.convert.join entre vrai et faux mais avec le même résultat.

Voici les erreurs dans le journal de sortie de/tmp/myuser/hive.log

$ tail -12 -f tmp/mysyer/hive.log 

2015-08-01 07:30:46,086 ERROR exec.Task (SessionState.java:printError(419)) - Execution failed with exit status: 3 
2015-08-01 07:30:46,086 ERROR exec.Task (SessionState.java:printError(419)) - Obtaining error information 
2015-08-01 07:30:46,087 ERROR exec.Task (SessionState.java:printError(419)) - 
Task failed! 
Task ID: 
    Stage-8 

Logs: 

2015-08-01 07:30:46,087 ERROR exec.Task (SessionState.java:printError(419)) - /tmp/myuser/hive.log 
2015-08-01 07:30:46,087 ERROR mr.MapredLocalTask (MapredLocalTask.java:execute(268)) - Execution failed with exit status: 3 
2015-08-01 07:30:46,094 ERROR ql.Driver (SessionState.java:printError(419)) - FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask 

Je courais le client ruche sur le Maître, une instance Google Cloud Platform de type de type n1-highmem-8 (8 CPU, 52FR) et les travailleurs sont n1-highmem-4 (4CPU 26FR), mais je soupçonne après MAP et REDUCE qu'une jointure locale (comme implicite) a lieu sur le Maître. Quoiqu'il en soit, dans les bdutils I configuré JAVAOPTS pour les noeuds de travail (n1-highmem-4) à: n1-highmem-4

SOLUTION EDIT: La solution consiste à organiser les données les données de distance dans un arbre de portée.

Répondre

1

Je ne pense pas qu'il soit possible d'effectuer ce type de force brute de jointure croisée - il suffit de multiplier les numéros de ligne, c'est un peu incontrôlable. Vous avez besoin d'optimisations, ce que je ne pense pas que la ruche soit encore capable.

Mais ce problème peut-il être résolu en temps O (N1 + N2) à condition que vous ayez vos données triées (quelle ruche peut faire pour vous) - vous parcourez les deux listes simultanément, à chaque étape obtenant un entier ip , voir si des intervalles commencent sur cet entier, en les ajoutant, en supprimant ceux qui ont fini, en émettant des tuples correspondants, et ainsi de suite. Pseudocode:

intervals=[] 
ipintegers = iterator(ipintegers_sorted_file) 
intervals = iterator(intervals_sorted_on_start_file) 
for x in ipintegers: 
    intervals = [i for i in intervals if i.end >= x] 

    while(intervals.current.start<=x): 
     intervals.append(intervals.current) 
     intervals.next() 
    for i in intervals: 
     output_match(i, x) 

Maintenant, si vous avez une fonction de script/UDF externe qui sait lire la plus petite table et obtient des entiers IP comme entrée et recrache tuples correspondant en sortie, vous pouvez utiliser la ruche et SELECT TRANSFORM pour diffuser le entrées à lui. Ou vous pouvez probablement simplement exécuter cet algorithme sur une machine locale avec deux fichiers d'entrée, parce que c'est juste O (N), et même 9 gigaoctets de données est très faisable.

+0

En effet, ce sera en quelque sorte ma prochaine approche. Je vais avoir des tables de tri de ruche par ipinteger et ip range. Les limites supérieure et inférieure de la plage * doivent être * uniques. Ainsi, le script lira un ipinteger, vérifiez les limites supérieure et inférieure de la plage. Si faux, intervalle suivant. Si la valeur est true, émettez et démarrez la comparaison au dernier intervalle vérifié. –

+0

OK Je pense que ma mise en œuvre est O (n * m) encore et prendra plusieurs jours. Comment le feriez-vous plus vite? https://gist.github.com/bbopen/f94e407cef881085599f –

+0

Bien sûr O (n * m) prendra une éternité. Mise à jour de ma réponse avec pseudocode. – letitbee