2017-09-23 1 views
2

J'ai un dataframe Dask qui ressemble à ceci:agrègent un dataframe Dask et produire une trame de données d'agrégats

url  referrer session_id ts     customer 
url1 ref1  xxx  2017-09-15 00:00:00 a.com 
url2 ref2  yyy  2017-09-15 00:00:00 a.com 
url2 ref3  yyy  2017-09-15 00:00:00 a.com 
url1 ref1  xxx  2017-09-15 01:00:00 a.com 
url2 ref2  yyy  2017-09-15 01:00:00 a.com 

Je veux regrouper les données sur l'URL et l'horodatage, les valeurs de colonnes globales et produire une trame de données qui serait ressemble à ceci:

customer url ts     page_views visitors referrers 
a.com url1 2017-09-15 00:00:00 1   1  [ref1] 
a.com url2 2017-09-15 00:00:00 2   2  [ref2, ref3] 

Dans SQL Spark, je peux le faire comme suit:

select 
    customer, 
    url, 
    ts, 
    count(*) as page_views, 
    count(distinct(session_id)) as visitors, 
    collect_list(referrer) as referrers 
from df 
group by customer, url, ts 

Y a-t-il un moyen de le faire avec les dataframes Dask? J'ai essayé, mais je ne peux calculer les colonnes agrégées séparément, comme suit:

# group on timestamp (rounded) and url 
grouped = df.groupby(['ts', 'url']) 

# calculate page views (count rows in each group) 
page_views = grouped.size() 

# collect a list of referrer strings per group 
referrers = grouped['referrer'].apply(list, meta=('referrers', 'f8')) 

# count unique visitors (session ids) 
visitors = grouped['session_id'].count() 

Mais je ne peux pas sembler trouver une bonne façon de produire une trame de données combinée que j'ai besoin.

+0

Y at-il une bonne façon de faire cela dans Pandas? Est-ce que ça marche pour dask.dataframe? – MRocklin

Répondre

1

Ce qui suit ne fait œuvre:

gb = df.groupby(['customer', 'url', 'ts']) 
gb.apply(lambda d: pd.DataFrame({'views': len(d), 
    'visitiors': d.session_id.count(), 
    'referrers': [d.referer.tolist()]})).reset_index() 

(en présumant les visiteurs doivent être uniques selon le sql ci-dessus) Vous pouvez définir la meta de la sortie.

+0

Nice! Va-t-il forcer toutes les données dans la mémoire sur une machine, si je construis un 'pd.DataFrame' hors de mes données? En ce moment, c'est un exemple de jouet, mais le vrai travail fonctionnerait avec des gigaoctets de données distribuées. –

+0

Il semblait fonctionner avec des données exactement comme les vôtres; vous devriez essayer de fournir un méta-paramètre http://dask.pydata.org/en/latest/dataframe-api.html#dask.dataframe.groupby.DataFrameGroupBy.apply – mdurant

+0

Vous avez raison, cela a fonctionné avec les données exactement comme je l'ai spécifié dans cet exemple. Il n'a pas fonctionné sur un exemple un peu plus grand de données lues à partir de parquet partitionné. Je voudrais comprendre ce qui ne va pas avec celui-là - je vais déposer un problème dans dask avec mon échantillon de données. Stackoverflow ne semble pas être un bon endroit pour ça. Je vous remercie! –