2013-02-01 3 views
7

J'ai une table Hive qui contient les données des appels clients. Pour simplifier, considérons qu'il a 2 colonnes, la première colonne contient l'ID client et la deuxième colonne contient l'horodatage de l'appel (horodatage unix).Calcul des différences entre les enregistrements successifs dans Hadoop avec les requêtes Hive

Je peux interroger cette table pour trouver tous les appels pour chaque client:

SELECT * FROM mytable SORT BY customer_id, call_time; 

Le résultat est:

Customer1 timestamp11 
Customer1 timestamp12 
Customer1 timestamp13 
Customer2 timestamp21 
Customer3 timestamp31 
Customer3 timestamp32 
... 

Est-il possible de créer une requête Hive qui renvoie, pour chaque client , à partir du deuxième appel, l'intervalle de temps entre deux appels successifs? Pour l'exemple ci-dessus cette requête devrait retourner:

Customer1 timestamp12-timestamp11 
Customer1 timestamp13-timestamp12 
Customer3 timestamp32-timestamp31 
... 

J'ai essayé d'adapter les solutions du sql solution, mais je suis coincé avec les limites Hive: it accepts subqueries only in FROM et joins must contain only equalities.

Merci.

EDIT1:

J'ai essayé d'utiliser une fonction UDF Hive:

public class DeltaComputerUDF extends UDF { 
private String previousCustomerId; 
private long previousCallTime; 

public String evaluate(String customerId, LongWritable callTime) { 
    long callTimeValue = callTime.get(); 
    String timeDifference = null; 

    if (customerId.equals(previousCustomerId)) { 
     timeDifference = new Long(callTimeValue - previousCallTime).toString(); 
    } 

    previousCustomerId = customerId; 
    previousCallTime = callTimeValue; 

    return timeDifference; 
}} 

et l'utiliser avec le nom "delta".

Mais il semble (à partir des journaux et du résultat) qu'il est utilisé au moment de la MAP. 2 problèmes se posent de ceci:

Première: Les données de la table doivent être triées par ID client et l'horodatage AVANT d'utiliser cette fonction. La requête:

SELECT customer_id, call_time, delta(customer_id, call_time) FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time; 

ne fonctionne pas parce que la partie de tri est effectuée au temps de REDUCE, longtemps après que ma fonction soit utilisée.

Je peux trier les données de la table avant d'utiliser la fonction, mais je ne suis pas content car c'est un surcoût que j'espère éviter.

Deuxième: Dans le cas d'une configuration distribuée Hadoop, les données sont réparties entre les trackers d'emploi disponibles. Je crois donc qu'il y aura plusieurs instances de cette fonction, une pour chaque mappeur, de sorte qu'il est possible de diviser les mêmes données clients entre deux mappeurs. Dans ce cas, je vais perdre les appels des clients, ce qui n'est pas acceptable.

Je ne sais pas comment résoudre ce problème. Je sais que DISTRIBUTE BY assure que toutes les données avec une valeur spécifique sont envoyées au même réducteur (assurant ainsi que SORT fonctionne comme prévu), quelqu'un sait-il s'il y a quelque chose de similaire pour le mapper?

Ensuite, je prévois de suivre la suggestion de libjack d'utiliser un script de réduction. Ce "calcul" est nécessaire entre d'autres requêtes de la ruche, donc je veux essayer tout ce que Hive propose, avant de passer à un autre outil, comme suggéré par Balaswamy vaddeman.

EDIT2:

j'ai commencé à étudier la solution de scripts personnalisés. Mais, dans la première page du chapitre 14 dans la programmation livre Hive (ce chapitre présente les scripts personnalisés), je trouve le paragraphe suivant:

Le streaming est généralement moins efficace que le codage des UDFs comparables ou objets inputFormat. Sérialisation et désérialisation des données pour le passer et hors de la conduite est relativement inefficace. Il est également plus difficile de déboguer le programme complet d'une manière unifiée. Cependant, il est utile pour le prototypage rapide et pour tirer parti du code existant qui n'est pas écrit en Java. Pour Hive les utilisateurs qui ne veulent pas écrire du code Java, il peut être une approche très efficace .

Il était donc clair que les scripts personnalisés ne sont pas la meilleure solution en termes d'efficacité.

Mais comment conserver ma fonction UDF, mais m'assurer qu'elle fonctionne comme prévu dans une configuration Hadoop distribuée? J'ai trouvé la réponse à cette question dans la section Internes UDF de la page wiki UDF du manuel de langue. Si j'écris ma requête:

SELECT customer_id, call_time, delta(customer_id, call_time) FROM (SELECT customer_id, call_time FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t; 

il est exécuté à temps et REDUIRE BY et DISTRIBUER TRIER PAR constructions garantissent que tous les enregistrements du même client sont en cours de traitement par le même réducteur, pour des appels. Par conséquent, l'UDF ci-dessus et cette construction de requête résolvent mon problème.

(Désolé pour ne pas ajouter les liens, mais je ne suis pas autorisé à le faire parce que je n'ai pas assez de points de réputation)

+0

Je pense que c'est très similaire à [cette question] (http://stackoverflow.com/questions/14028796/reduce-a-set-of-rows-in-hive-to-another-set-of-rows) J'ai répondu en utilisant une carte personnalisée/réduire dans la ruche. Vous devrez juste fournir le script de réduction approprié. – libjack

+0

Je ne sais pas comment faire cela dans la ruche, mais il y a une API en cascade pour le faire. Il y a quelque chose qui s'appelle buffer in cascading.http: //docs.cascading.org/cascading/2.0/userguide/html/ch05s05.html –

Répondre

11

Il est une vieille question, mais pour les futures références, je vous écris ici une autre proposition:

Ruche Windowing functions permet d'utiliser les valeurs précédentes/suivantes dans votre requête.

Une requête de code de simili peut être:

SELECT customer_id, LAG (call_time, 1, 0) sur (PARTITION PAR COMMANDE PAR customer_id call_time rangées 1 précédent) - call_time à partir de matable;

0

Peut-être que quelqu'un rencontre une exigence similaire, la solution que je trouve est la suivante:

1) Créer une fonction personnalisée:

package com.example; 
// imports (they depend on the hive version) 
@Description(name = "delta", value = "_FUNC_(customer id column, call time column) " 
    + "- computes the time passed between two succesive records from the same customer. " 
    + "It generates 3 columns: first contains the customer id, second contains call time " 
    + "and third contains the time passed from the previous call. This function returns only " 
    + "the records that have a previous call from the same customer (requirements are not applicable " 
    + "to the first call)", extended = "Example:\n> SELECT _FUNC_(customer_id, call_time) AS" 
    + "(customer_id, call_time, time_passed) FROM (SELECT customer_id, call_time FROM mytable " 
    + "DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;") 
public class DeltaComputerUDTF extends GenericUDTF { 
private static final int NUM_COLS = 3; 

private Text[] retCols; // array of returned column values 
private ObjectInspector[] inputOIs; // input ObjectInspectors 
private String prevCustomerId; 
private Long prevCallTime; 

@Override 
public StructObjectInspector initialize(ObjectInspector[] ois) throws UDFArgumentException { 
    if (ois.length != 2) { 
     throw new UDFArgumentException(
       "There must be 2 arguments: customer Id column name and call time column name"); 
    } 

    inputOIs = ois; 

    // construct the output column data holders 
    retCols = new Text[NUM_COLS]; 
    for (int i = 0; i < NUM_COLS; ++i) { 
     retCols[i] = new Text(); 
    } 

    // construct output object inspector 
    List<String> fieldNames = new ArrayList<String>(NUM_COLS); 
    List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(NUM_COLS); 
    for (int i = 0; i < NUM_COLS; ++i) { 
     // column name can be anything since it will be named by UDTF as clause 
     fieldNames.add("c" + i); 
     // all returned type will be Text 
     fieldOIs.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector); 
    } 

    return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs); 
} 

@Override 
public void process(Object[] args) throws HiveException { 
    String customerId = ((StringObjectInspector) inputOIs[0]).getPrimitiveJavaObject(args[0]); 
    Long callTime = ((LongObjectInspector) inputOIs[1]).get(args[1]); 

    if (customerId.equals(prevCustomerId)) { 
     retCols[0].set(customerId); 
     retCols[1].set(callTime.toString()); 
     retCols[2].set(new Long(callTime - prevCallTime).toString()); 
     forward(retCols); 
    } 

    // Store the current customer data, for the next line 
    prevCustomerId = customerId; 
    prevCallTime = callTime; 
} 

@Override 
public void close() throws HiveException { 
    // TODO Auto-generated method stub 

} 

} 

2) Créer un bocal contenant cette fonction. Supposons que le nom de jar est myjar.jar.

3) Copiez le pot sur la machine avec Hive. Supposons qu'il est placé dans/tmp

4) définissent la fonction personnalisée à l'intérieur de la ruche:

ADD JAR /tmp/myjar.jar; 
CREATE TEMPORARY FUNCTION delta AS 'com.example.DeltaComputerUDTF'; 

5) exécution de la requête:

SELECT delta(customer_id, call_time) AS (customer_id, call_time, time_difference) FROM 
    (SELECT customer_id, call_time FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t; 

Remarques:

une. J'ai supposé que la colonne call_time stocke les données comme bigint. Dans le cas où il s'agit d'une chaîne, dans la fonction de processus, nous l'extrayons sous forme de chaîne (comme nous le faisons avec customerId), puis l'analysons à Long

b. J'ai décidé d'utiliser un UDTF au lieu de UDF parce que de cette façon il génère toutes les données dont il a besoin. Sinon (avec UDF), les données générées doivent être filtrées pour ignorer les valeurs NULL.Ainsi, avec la fonction UDF (DeltaComputerUDF) décrite dans la première édition du message original, la requête sera:

SELECT customer_id, call_time, time_difference 
FROM 
    (
    SELECT delta(customer_id, call_time) AS (customer_id, call_time, time_difference) 
    FROM 
     (
     SELECT customer_id, call_time FROM mytable 
     DISTRIBUTE BY customer_id 
     SORT BY customer_id, call_time 
     ) t 
    ) u 
WHERE time_difference IS NOT NULL; 

c. Les deux fonctions (UDF et UDTF) fonctionnent comme vous le souhaitez, quel que soit l'ordre des lignes dans la table (il n'est donc pas nécessaire de trier les données de la table par identifiant client et temps d'appel avant d'utiliser les fonctions delta)

1

Vous pouvez utiliser explicite MAP-REDUCE avec d'autres langages de programmation comme Java ou Python. Où émettre de la carte {cutomer_id,call_time} et dans le réducteur, vous obtiendrez {customer_id,list{time_stamp}} et dans le réducteur, vous pouvez trier ces horodatages et peut traiter les données.

Questions connexes