2017-08-27 4 views
0

IntroUtiliser le céleri pour distribuer le traitement des mathématiques

Hé les gars, je suis très nouveau pour le céleri et les files d'attente tâche en général, donc j'ai une question qui est probablement assez naïve. Je veux prendre un fichier .csv plutôt grand (qui est converti en un DataFrame pandas) et y exécuter un test pearson (une fonction mathématique de statistiques) sur toutes les paires de colonnes. Cela prend environ 9 minutes à cause d'un seul noyau et nous avons des centaines de ces fichiers .csv! Je souhaite donc diviser ce traitement entre tous les cœurs de notre cluster à 3 serveurs. Voici un prototype de mon code jusqu'ici ....

from celery import Celery 
import numpy as np 
import pandas as pd 
import scipy.stats as stats 
import itertools 

app = Celery() 

minute_CSV = pd.read_csv('./test_dataframe.csv') 

cycle_length = 300 
row_max = minute_CSV.shape[0] 
r_vector_data = pd.DataFrame() 

column_combinations = itertools.combinations(minute_CSV.filter(regex='FREQ').keys(),2) 
xy_cols = list(column_combinations) 

@app.task 
def data_processing(minute_CSV, cycle_length, row_max, x, y): 
    return np.array([stats.pearsonr(minute_CSV[x][c-cycle_length:c], 
    minute_CSV[y][c-cycle_length:c])[0] for c in range(cycle_length,row_max)]) 

for i in range(0, len(xy_cols)): 
    x = xy_cols[i][0] 
    y = xy_cols[i][1] 
    r_vector_data[x + ' to ' + y] = data_processing.delay(minute_CSV, cycle_length, row_max, x, y) 

pd.DataFrame.to_csv(r_vector_data, processed_dataframe.csv) 

Quand je lance ce que je reçois ce message:

"[1200 lignes x 870 colonnes] ne JSON sérialisable"

le Math

la façon dont fonctionne la corrélation de Pearson est le suivant: prendre 300 (dans mon cas) des rangées successives de deux c olumns, exécutez la corrélation et stockez le résultat dans un nouveau DataFrame (r_vector_data). Ceci est fait pour les lignes: (0..299), (1..300), (2..301), et ainsi de suite.

En outre, ce script considère seulement un fichier .csv, mais sera modifié plus tard :).

Réflexions sur où aller à partir d'ici? Comment pourrais-je utiliser céleri pour accomplir cela, parce que je suis un peu perdu dans la documentation.

Merci!

Répondre

0

Vous voyez l'erreur car Celery essaie de sérialiser JSON minute_CSV. Par défaut, tous les messages de Celery sont codés à l'aide de JSON. Voir http://docs.celeryproject.org/projects/kombu/en/latest/userguide/serialization.html pour plus d'informations à ce sujet.

Pour limiter le transfert de données, vous souhaitez probablement uniquement envoyer les lignes pertinentes pour chaque appel à votre tâche data_processing.