J'ai une table Hive qui contient les données des appels clients. Pour simplifier, considérons qu'il a 2 colonnes, la première colonne contient l'ID client et la deuxième colonne contient l'horodatage de l'appel (horodatage unix).Calcul des différences entre les enregistrements successifs dans Hadoop avec les requêtes Hive
Je peux interroger cette table pour trouver tous les appels pour chaque client:
SELECT * FROM mytable SORT BY customer_id, call_time;
Le résultat est:
Customer1 timestamp11
Customer1 timestamp12
Customer1 timestamp13
Customer2 timestamp21
Customer3 timestamp31
Customer3 timestamp32
...
Est-il possible de créer une requête Hive qui renvoie, pour chaque client , à partir du deuxième appel, l'intervalle de temps entre deux appels successifs? Pour l'exemple ci-dessus cette requête devrait retourner:
Customer1 timestamp12-timestamp11
Customer1 timestamp13-timestamp12
Customer3 timestamp32-timestamp31
...
J'ai essayé d'adapter les solutions du sql solution, mais je suis coincé avec les limites Hive: it accepts subqueries only in FROM et joins must contain only equalities.
Merci.
EDIT1:
J'ai essayé d'utiliser une fonction UDF Hive:
public class DeltaComputerUDF extends UDF {
private String previousCustomerId;
private long previousCallTime;
public String evaluate(String customerId, LongWritable callTime) {
long callTimeValue = callTime.get();
String timeDifference = null;
if (customerId.equals(previousCustomerId)) {
timeDifference = new Long(callTimeValue - previousCallTime).toString();
}
previousCustomerId = customerId;
previousCallTime = callTimeValue;
return timeDifference;
}}
et l'utiliser avec le nom "delta".
Mais il semble (à partir des journaux et du résultat) qu'il est utilisé au moment de la MAP. 2 problèmes se posent de ceci:
Première: Les données de la table doivent être triées par ID client et l'horodatage AVANT d'utiliser cette fonction. La requête:
SELECT customer_id, call_time, delta(customer_id, call_time) FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time;
ne fonctionne pas parce que la partie de tri est effectuée au temps de REDUCE, longtemps après que ma fonction soit utilisée.
Je peux trier les données de la table avant d'utiliser la fonction, mais je ne suis pas content car c'est un surcoût que j'espère éviter.
Deuxième: Dans le cas d'une configuration distribuée Hadoop, les données sont réparties entre les trackers d'emploi disponibles. Je crois donc qu'il y aura plusieurs instances de cette fonction, une pour chaque mappeur, de sorte qu'il est possible de diviser les mêmes données clients entre deux mappeurs. Dans ce cas, je vais perdre les appels des clients, ce qui n'est pas acceptable.
Je ne sais pas comment résoudre ce problème. Je sais que DISTRIBUTE BY assure que toutes les données avec une valeur spécifique sont envoyées au même réducteur (assurant ainsi que SORT fonctionne comme prévu), quelqu'un sait-il s'il y a quelque chose de similaire pour le mapper?
Ensuite, je prévois de suivre la suggestion de libjack d'utiliser un script de réduction. Ce "calcul" est nécessaire entre d'autres requêtes de la ruche, donc je veux essayer tout ce que Hive propose, avant de passer à un autre outil, comme suggéré par Balaswamy vaddeman.
EDIT2:
j'ai commencé à étudier la solution de scripts personnalisés. Mais, dans la première page du chapitre 14 dans la programmation livre Hive (ce chapitre présente les scripts personnalisés), je trouve le paragraphe suivant:
Le streaming est généralement moins efficace que le codage des UDFs comparables ou objets inputFormat. Sérialisation et désérialisation des données pour le passer et hors de la conduite est relativement inefficace. Il est également plus difficile de déboguer le programme complet d'une manière unifiée. Cependant, il est utile pour le prototypage rapide et pour tirer parti du code existant qui n'est pas écrit en Java. Pour Hive les utilisateurs qui ne veulent pas écrire du code Java, il peut être une approche très efficace .
Il était donc clair que les scripts personnalisés ne sont pas la meilleure solution en termes d'efficacité.
Mais comment conserver ma fonction UDF, mais m'assurer qu'elle fonctionne comme prévu dans une configuration Hadoop distribuée? J'ai trouvé la réponse à cette question dans la section Internes UDF de la page wiki UDF du manuel de langue. Si j'écris ma requête:
SELECT customer_id, call_time, delta(customer_id, call_time) FROM (SELECT customer_id, call_time FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;
il est exécuté à temps et REDUIRE BY et DISTRIBUER TRIER PAR constructions garantissent que tous les enregistrements du même client sont en cours de traitement par le même réducteur, pour des appels. Par conséquent, l'UDF ci-dessus et cette construction de requête résolvent mon problème.
(Désolé pour ne pas ajouter les liens, mais je ne suis pas autorisé à le faire parce que je n'ai pas assez de points de réputation)
Je pense que c'est très similaire à [cette question] (http://stackoverflow.com/questions/14028796/reduce-a-set-of-rows-in-hive-to-another-set-of-rows) J'ai répondu en utilisant une carte personnalisée/réduire dans la ruche. Vous devrez juste fournir le script de réduction approprié. – libjack
Je ne sais pas comment faire cela dans la ruche, mais il y a une API en cascade pour le faire. Il y a quelque chose qui s'appelle buffer in cascading.http: //docs.cascading.org/cascading/2.0/userguide/html/ch05s05.html –