IntroUtiliser le céleri pour distribuer le traitement des mathématiques
Hé les gars, je suis très nouveau pour le céleri et les files d'attente tâche en général, donc j'ai une question qui est probablement assez naïve. Je veux prendre un fichier .csv plutôt grand (qui est converti en un DataFrame pandas) et y exécuter un test pearson (une fonction mathématique de statistiques) sur toutes les paires de colonnes. Cela prend environ 9 minutes à cause d'un seul noyau et nous avons des centaines de ces fichiers .csv! Je souhaite donc diviser ce traitement entre tous les cœurs de notre cluster à 3 serveurs. Voici un prototype de mon code jusqu'ici ....
from celery import Celery
import numpy as np
import pandas as pd
import scipy.stats as stats
import itertools
app = Celery()
minute_CSV = pd.read_csv('./test_dataframe.csv')
cycle_length = 300
row_max = minute_CSV.shape[0]
r_vector_data = pd.DataFrame()
column_combinations = itertools.combinations(minute_CSV.filter(regex='FREQ').keys(),2)
xy_cols = list(column_combinations)
@app.task
def data_processing(minute_CSV, cycle_length, row_max, x, y):
return np.array([stats.pearsonr(minute_CSV[x][c-cycle_length:c],
minute_CSV[y][c-cycle_length:c])[0] for c in range(cycle_length,row_max)])
for i in range(0, len(xy_cols)):
x = xy_cols[i][0]
y = xy_cols[i][1]
r_vector_data[x + ' to ' + y] = data_processing.delay(minute_CSV, cycle_length, row_max, x, y)
pd.DataFrame.to_csv(r_vector_data, processed_dataframe.csv)
Quand je lance ce que je reçois ce message:
"[1200 lignes x 870 colonnes] ne JSON sérialisable"
le Math
la façon dont fonctionne la corrélation de Pearson est le suivant: prendre 300 (dans mon cas) des rangées successives de deux c olumns, exécutez la corrélation et stockez le résultat dans un nouveau DataFrame (r_vector_data). Ceci est fait pour les lignes: (0..299), (1..300), (2..301), et ainsi de suite.
En outre, ce script considère seulement un fichier .csv, mais sera modifié plus tard :).
Réflexions sur où aller à partir d'ici? Comment pourrais-je utiliser céleri pour accomplir cela, parce que je suis un peu perdu dans la documentation.
Merci!