2017-09-22 19 views
0

j'ai un ensemble de fichiers très ennuyeux structuré comme si « pivot table »:Spark/Hive - données du groupe dans un format

userId string, 
eventType string, 
source string, 
errorCode string, 
startDate timestamp, 
endDate timestamp 

Chaque fichier peut contenir un nombre arbitraire d'enregistrements par eventId, avec plus ou moins eventTypes et sources, et code différent et date de début/fin pour chacun.

Existe-t-il un moyen dans Hive ou Spark de regrouper tous ces éléments ensemble sur userId, un peu comme une valeur-clé, où la valeur est la liste de tous les champs associés à l'ID utilisateur? Plus précisément, j'aimerais qu'il soit saisi par eventType et source. Fondamentalement, je veux échanger la longueur de la table pour la largeur, un peu comme un tableau croisé dynamique. Mon but pour cela est d'être éventuellement stocké sous forme de fichier Apache Parquet ou Avro pour une analyse plus rapide dans le futur.

Voici un exemple:

données Source:

userId, eventType, source, errorCode, startDate, endDate 
552113, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.432', '2017-09-01 12:01:45.452' 
284723, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.675', '2017-09-01 12:01:45.775' 
552113, 'TRADE', 'MERCH', 0, '2017-09-01 12:01:47.221', '2017-09-01 12:01:46.229' 
552113, 'CHARGE', 'MERCH', 0, '2017-09-01 12:01:48.123', '2017-09-01 12:01:48.976' 
284723, 'REFUND', 'MERCH', 1, '2017-09-01 12:01:48.275', '2017-09-01 12:01:48.947' 
552113, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:49.908', '2017-09-01 12:01:50.623' 
284723, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:50.112', '2017-09-01 12:01:50.777' 

Objectif:

userId, eventTypeAckProvider, sourceAckProvider, errorCodeAckProvider, startDateAckProvider, endDateAckProvider, eventTypeTradeMerch, sourceTradeMerch, errorCodeTradeMerch, startDateTradeMerch, endDateTradeMerch, eventTypeChargeMerch, sourceChargeMerch, errorCodeChargeMerch, startDateChargeMerch, endDateChargeMerch, eventTypeCloseProvider, sourceCloseProvider, errorCodeCloseProvider, startDateCloseProvider, endDateCloseProvider, eventTypeRefundMerch, sourceRefundMerch, errorCodeRefundMerch, startDateRefundMerch, endDateRefundMerch 
552113, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.432', '2017-09-01 12:01:45.452', 'TRADE', 'MERCH', 0, '2017-09-01 12:01:47.221', '2017-09-01 12:01:46.229', 'CHARGE', 'MERCH', 0, '2017-09-01 12:01:48.123', '2017-09-01 12:01:48.976', 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:49.908', '2017-09-01 12:01:50.623', NULL, NULL, NULL, NULL, NULL 
284723, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.675', '2017-09-01 12:01:45.775', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:50.112', '2017-09-01 12:01:50.777', 'REFUND', 'MERCH', 1, '2017-09-01 12:01:48.275', '2017-09-01 12:01:48.947' 

Les noms de champs ou de l'ordre n'a pas d'importance, tant que je peux les distinguer.

J'ai essayé deux méthodes déjà obtenir ce travail:

  1. sélectionner manuellement chaque combinaison de la table et rejoindre à un ensemble de données maître. Cela fonctionne très bien, et se parallélise bien, mais ne permet pas un nombre arbitraire de valeurs pour les champs clés, et nécessite que le schéma soit prédéfini.
  2. Utilisez Spark pour créer un dictionnaire d'enregistrements key: value où chaque valeur est un dictionnaire. Fondamentalement, bouclez l'ensemble de données, ajoutez une nouvelle clé au dictionnaire si elle n'existe pas, et pour cette entrée, ajoutez un nouveau champ au dictionnaire de valeur s'il n'existe pas. Cela fonctionne magnifiquement, mais est extrêmement lent et ne se parallélise pas bien, si c'est le cas. Aussi, je ne suis pas sûr que ce soit un format compatible Avro/Parquet.

Existe-t-il des alternatives à ces deux méthodes? Ou même une meilleure structure que mon objectif?

Répondre

1

Voulez-vous avoir quelque chose comme ça?

from pyspark.sql.functions import struct, col, create_map, collect_list 

df = sc.parallelize([ 
    ['552113', 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.432', '2017-09-01 12:01:45.452'], 
    ['284723', 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.675', '2017-09-01 12:01:45.775'], 
    ['552113', 'TRADE', 'MERCH', 0, '2017-09-01 12:01:47.221', '2017-09-01 12:01:46.229'], 
    ['552113', 'CHARGE', 'MERCH', 0, '2017-09-01 12:01:48.123', '2017-09-01 12:01:48.976'], 
    ['284723', 'REFUND', 'MERCH', 1, '2017-09-01 12:01:48.275', '2017-09-01 12:01:48.947'], 
    ['552113', 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:49.908', '2017-09-01 12:01:50.623'], 
    ['284723', 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:50.112', '2017-09-01 12:01:50.777'] 
]).toDF(('userId', 'eventType', 'source', 'errorCode', 'startDate', 'endDate')) 
df.show() 

new_df = df.withColumn("eventType_source", struct([col('eventType'), col('source')])).\ 
    withColumn("errorCode_startEndDate", struct([col('errorCode'), col('startDate'), col('endDate')])) 

new_df = new_df.groupBy('userId').agg(collect_list(create_map(col('eventType_source'), col('errorCode_startEndDate'))).alias('event_detail')) 
new_df.show() 
+0

Merci! Cela semble que ça pourrait marcher! Je l'ai essayé sur mon jeu de données en direct et il revient à peu près ce que je voulais, en termes de regroupement. Cependant, je ne suis pas familier avec la structure de données "liste de cartes", et je ne trouve rien de documenté nulle part sur ses opérations. Je suppose qu'une question de suivi serait, Comment puis-je interagir avec cette structure de données? A titre d'exemple, comment puis-je obtenir les attributs CHARGE/MERCH pour un utilisateur spécifique? –

+1

Heureux que cela a aidé! Je pense que cela peut aider à démarrer avec la question de suivi: 'de la chaîne d'importation itertools; new_df.printSchema(); rdd1 = new_df.where (col ('userId') == '552113'). Select ('événement_détail'). Rdd.flatMap (lambda x: chaîne (* (x))); keys = rdd1.map (lambda x: x.keys()). Collect(); values ​​= rdd1.map (lambda x: x.values ​​()). Collect(); '' keys' et 'values' sont ceux qui doivent être recherchés. – Prem

0

Pouvez-vous essayer cela et donner vos commentaires,

>>> from pyspark.sql import SparkSession 
>>> from pyspark.sql import functions as F 
>>> from pyspark.sql.types import * 

>>> spark = SparkSession.builder.getOrCreate() 

>>> l=[(552113, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.432', '2017-09-01 12:01:45.452'),(284723, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.675', '2017-09-01 12:01:45.775'),(552113, 'TRADE', 'MERCH', 0, '2017-09-01 12:01:47.221', '2017-09-01 12:01:46.229'),(552113, 'CHARGE', 'MERCH', 0, '2017-09-01 12:01:48.123', '2017-09-01 12:01:48.976'),(284723, 'REFUND', 'MERCH', 1, '2017-09-01 12:01:48.275', '2017-09-01 12:01:48.947'),(552113, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:49.908', '2017-09-01 12:01:50.623'),(284723, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:50.112', '2017-09-01 12:01:50.777')] 

>>> df = spark.createDataFrame(l,['userId', 'eventType', 'source', 'errorCode', 'startDate','endDate']) 
>>> df.show(10,False) 
+------+---------+--------+---------+-----------------------+-----------------------+ 
|userId|eventType|source |errorCode|startDate    |endDate    | 
+------+---------+--------+---------+-----------------------+-----------------------+ 
|552113|ACK  |PROVIDER|0  |2017-09-01 12:01:45.432|2017-09-01 12:01:45.452| 
|284723|ACK  |PROVIDER|0  |2017-09-01 12:01:45.675|2017-09-01 12:01:45.775| 
|552113|TRADE |MERCH |0  |2017-09-01 12:01:47.221|2017-09-01 12:01:46.229| 
|552113|CHARGE |MERCH |0  |2017-09-01 12:01:48.123|2017-09-01 12:01:48.976| 
|284723|REFUND |MERCH |1  |2017-09-01 12:01:48.275|2017-09-01 12:01:48.947| 
|552113|CLOSE |PROVIDER|0  |2017-09-01 12:01:49.908|2017-09-01 12:01:50.623| 
|284723|CLOSE |PROVIDER|0  |2017-09-01 12:01:50.112|2017-09-01 12:01:50.777| 
+------+---------+--------+---------+-----------------------+-----------------------+ 

>>> myudf = F.udf(lambda *cols : cols,ArrayType(StringType())) #composition to create rowwise list 
>>> df1 = df.select('userId',myudf('eventType', 'source', 'errorCode','startDate', 'endDate').alias('val_list')) 

>>> df2 = df1.groupby('userId').agg(F.collect_list('val_list')) # grouped on userId 

>>> eventtypes = ['ACK','TRADE','CHARGE','CLOSE','REFUND'] # eventtypes and the order required in output 

>>> def f(Vals): 
     aggVals = [typ for x in eventtypes for typ in Vals if typ[0] == x] # to order the grouped data based on eventtypes above 
     if len(aggVals) == 5: 
      return aggVals 
     else: 
      missngval = [(idx,val) for idx,val in enumerate(eventtypes)if val not in zip(*aggVals)[0]] # get missing eventtypes with their index to create null 
      for idx,val in missngval: 
       aggVals.insert(idx,[None]*5) 
      return aggVals 

>>> myudf2 = F.udf(f,ArrayType(ArrayType(StringType()))) 
>>> df3 = df2.select('userId',myudf2('agg_list').alias('values')) 

>>> df4 = df3.select(['userId']+[df3['values'][i][x] for i in range(5) for x in range(5)]) # to select from Array[Array] 

>>> oldnames = df4.columns 
>>> destnames = ['userId', 'eventTypeAckProvider', 'sourceAckProvider', 'errorCodeAckProvider', 'startDateAckProvider', 'endDateAckProvider', 'eventTypeTradeMerch', 'sourceTradeMerch', 'errorCodeTradeMerch', 'startDateTradeMerch', 'endDateTradeMerch', 'eventTypeChargeMerch', 'sourceChargeMerch', 'errorCodeChargeMerch', 'startDateChargeMerch', 'endDateChargeMerch', 'eventTypeCloseProvider', 'sourceCloseProvider', 'errorCodeCloseProvider', 'startDateCloseProvider', 'endDateCloseProvider', 'eventTypeRefundMerch', 'sourceRefundMerch', 'errorCodeRefundMerch', 'startDateRefundMerch', 'endDateRefundMerch'] 

>>> finalDF = reduce(lambda d,idx : d.withColumnRenamed(oldnames[idx],destnames[idx]),range(len(oldnames)),df4) # Renaming the columns 
>>> finalDF.show()  
+------+--------------------+-----------------+--------------------+-----------------------+-----------------------+-------------------+----------------+-------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+----------------------+-------------------+----------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+ 
|userId|eventTypeAckProvider|sourceAckProvider|errorCodeAckProvider|startDateAckProvider |endDateAckProvider  |eventTypeTradeMerch|sourceTradeMerch|errorCodeTradeMerch|startDateTradeMerch |endDateTradeMerch  |eventTypeChargeMerch|sourceChargeMerch|errorCodeChargeMerch|startDateChargeMerch |endDateChargeMerch  |eventTypeCloseProvider|sourceCloseProvider|errorCodeCloseProvider|startDateCloseProvider |endDateCloseProvider |eventTypeRefundMerch|sourceRefundMerch|errorCodeRefundMerch|startDateRefundMerch |endDateRefundMerch  | 
+------+--------------------+-----------------+--------------------+-----------------------+-----------------------+-------------------+----------------+-------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+----------------------+-------------------+----------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+ 
|284723|ACK     |PROVIDER   |0     |2017-09-01 12:01:45.675|2017-09-01 12:01:45.775|null    |null   |null    |null     |null     |null    |null    |null    |null     |null     |CLOSE     |PROVIDER   |0      |2017-09-01 12:01:50.112|2017-09-01 12:01:50.777|REFUND    |MERCH   |1     |2017-09-01 12:01:48.275|2017-09-01 12:01:48.947| 
|552113|ACK     |PROVIDER   |0     |2017-09-01 12:01:45.432|2017-09-01 12:01:45.452|TRADE    |MERCH   |0     |2017-09-01 12:01:47.221|2017-09-01 12:01:46.229|CHARGE    |MERCH   |0     |2017-09-01 12:01:48.123|2017-09-01 12:01:48.976|CLOSE     |PROVIDER   |0      |2017-09-01 12:01:49.908|2017-09-01 12:01:50.623|null    |null    |null    |null     |null     | 
+------+--------------------+-----------------+--------------------+-----------------------+-----------------------+-------------------+----------------+-------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+----------------------+-------------------+----------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+