2012-11-21 6 views
0

j'ai des données de transaction commeFetch tuples filtré de fonctionnement du groupe dans Pig Latin

txn_id, receiver_userid, sender_userid, montant

1,1,2,50

2,1,2,100

3,1,2,500
4,5,3,100
5,5,3,200
6,5,3,300
7,6,2,200

8,6,1,200

Maintenant, je veux trouver tous les utilisateurs de récepteur qui ont reçu plus de 2 transactions de même un autre utilisateur, j'ai commencé mon travail écrit PIG comme

txnrecord = LOAD './txndata' USING PigStorage(',') AS (txn_id:int, receiver_userid:int, sender_userid:int, amount:int); 
grptxn1 = GROUP txnrecord BY (receiver_userid, sender_userid); 
txncount = FOREACH grptxn1 GENERATE FLATTEN(group) as (receiver_userid, sender_userid), COUNT(txnrecord) as num_txns, SUM(txnrecord.amount) as total_sum; 
txncount1 = FILTER txncount by num_txns > 2; 
dump txncount1; 

Au-dessus me donne des agrégats de groupe corrects, mais mes exigences supplémentaires sont

1) trouver record du groupe agrégé et leur ensemble associé de tuples (de txns individuels), par exemple - si agrège mon groupe dit userid 1 a reçu 3 opérations de userid 2 , J'ai besoin d'avoir les trois tuples stockés dans un autre fichier de données.

2) Groupe classe ne correspond pas à> 2 état de transaction doit être ignorée (ici deux derniers enregistrements doivent être ignorés)

3) Je veux assigner séquence à mes agrégats de groupe et même séquence doit être utilisée comme clé de liaison dans leurs tuples de transaction associés (pour identifier ces trois enregistrements de transaction sont associés à un agrégat de groupe particulier). J'expérimente en utilisant diverses fonctions mais jusqu'ici pas de chance.

Les pointeurs d'aide sont appréciés, Thx.

Répondre

0

Vous pouvez porter des sacs créés par le GROUP BY, ils contiennent toutes les colonnes d'origine, pour vérifier le faire

DESCRIBE grptxn1; 

Pour répondre à des exigences 1 et 2:

txnrecord = LOAD './txndata' USING PigStorage(',') AS (txn_id:int, receiver_userid:int, sender_userid:int, amount:int); 
grptxn1 = GROUP txnrecord BY (receiver_userid, sender_userid); 
txncount = FOREACH grptxn1 GENERATE FLATTEN(group) as (receiver_userid, sender_userid), 
txnrecord, -- carry bags through the filter 
COUNT(txnrecord) as num_txns, SUM(txnrecord.amount) as total_sum ; 
txncount1 = FILTER txncount by num_txns > 2; 
tran_dump = FOREACH txncount1 GENERATE FLATTEN(txnrecord); 
STORE tran_dump INTO 'another data file'; 

txncount2 = FOREACH txncount1 GENERATE (receiver_userid, sender_userid, num_txns, total_sum); 
dump txncount2; 

Exigence 3 est pas facile à faire dans MapReduce sans le rendre vraiment lent ou en utilisant un courtier en réseau ID. Peut-être que vous n'en auriez pas besoin puisque le FLATTEN (txnrecord) videra toutes les colonnes présentes dans le fichier d'entrée.

+0

Merci, 1 et 2 sont faits ...... laissez-moi essayer quelque chose pour 3. – Rushik

Questions connexes