2016-09-02 2 views
3

J'ai essayé de signaler un bug que je rencontrais dans mclapply en ce qui concerne les grandes valeurs de retour n'étant pas autorisés.Réduction du temps système avec des fonctions parallèles dans R

Apparemment, le bug a été corrigé dans les versions de développement, mais je suis plus intéressé par le commentaire que le répondeur fait:

il y avait une limite de 2 Go la taille des objets sérialisés qui par exemple mclapply peut renvoyer à partir des processus fourchus et cet exemple tente 16GB. Cela a été levé (pour les versions 64 bits) dans R-devel, mais cette utilisation est très inhabituel et plutôt inefficace (l'exemple a besoin ca 150Go à cause de toutes les copies impliqués dans (un) sérialisation)

Si utiliser mclapply pour faire des calculs parallèles avec de grandes données est inefficace, alors quelle est la meilleure façon de le faire? Mon besoin de faire ce genre de chose ne fait qu'augmenter, et je suis en train de rencontrer des goulots d'étranglement partout. Les tutoriels que j'ai vus ont été des introductions assez simples sur la façon d'utiliser les fonctions, mais pas nécessairement comment utiliser efficacement les fonctions dans la gestion des compromis. La documentation a un petit texte de présentation sur ce compromis:

mc.preschedule: si réglé sur « TRUE », alors le calcul est d'abord divisé à (au plus) que de nombreux emplois sont là sont des noyaux et la des travaux sont démarrés, chaque travail couvrant éventuellement plus d'une valeur . Si la valeur est "FALSE", alors un travail est forké pour chaque valeur de 'X'. Le premier est meilleur pour les calculs courts ou grand nombre de valeurs dans 'X', ce dernier est meilleur pour les emplois qui ont une variance élevée de temps d'achèvement et pas trop de valeurs de 'X' par rapport à 'mc.cores'

et

Par défaut (« mc.preschedule = TRUE ») dont l'entrée « X » est divisé en autant de parties qu'il y a actuellement noyaux (les valeurs sont réparties pour les noyaux séquentiellement , c.-à-d. première valeur au noyau 1, deuxième au noyau 2 , ... (core + 1) -th valeur au core 1, etc.) puis un processus est fourchu à chaque cœur et les résultats sont collectés.

Sans pré-ordonnancement, un travail séparé est généré pour chaque valeur de 'X'. Pour vous assurer que pas plus les emplois »mc.cores de sont en cours d'exécution à une fois, une fois ce nombre a été fourchue le processus maître attend pour un enfant de terminer avant la prochaine fourche

Benchmarking ces choses se fiable une beaucoup de temps car certains problèmes ne se manifestent qu'à grande échelle, et il est alors difficile de comprendre ce qui se passe. Donc, avoir un meilleur aperçu du comportement des fonctions serait utile.

modifier:

Je n'ai pas un exemple précis, parce que j'utilise mclapply beaucoup et je voulais mieux savoir comment réfléchir sur les implications de performance. Et pendant que j'écris sur le disque, je contournerais l'erreur, je ne pense pas que cela aiderait en ce qui concerne la (dé) sérialisation qui doit se produire, qui devrait également passer par le disque IO.

Un flux de travail serait la suivante: Prenez une grande matrice clairsemée M, et l'écrire sur le disque en morceaux (par exemple M1-M100) parce que M lui-même ne correspond pas à la mémoire.

dire maintenant, pour chaque utilisateur i dans I il y a Ci colonnes M que je veux ajouter et globale au niveau de l'utilisateur. Avec des données plus petites, ce serait relativement trivial:

m = matrix(runif(25), ncol=5) 
df = data.frame(I=sample(1:6, 20, replace=T), C=sample(1:5, 20, replace=T)) 
somefun = function(m) rowSums(m) 
res = sapply(sort(unique(df$I)), function(i) somefun(m[,df[df$I == i,]$C])) 

Mais avec des données plus importantes, mon approche était de diviser le data.frame d'utilisateur/colonnes dans différentes data.frames sur la base duquel la matrice M1-M100 la colonne serait , faites une boucle parallèle sur ces data.frames, lisez dans la matrice associée, puis bouclez sur les utilisateurs, en extrayant les colonnes et en appliquant ma fonction, puis en prenant la liste de sortie, et en bouclant à nouveau et en réagrégeant. Ce n'est pas idéal si j'ai une fonction qui ne peut pas être réagrégée comme ça (pour l'instant, ce n'est pas un problème), mais je suis apparemment en train de mélanger trop de données avec cette approche.

+1

Sans connaître la structure de votre problème, il est difficile de dire comment vous pourriez faire mieux. J'ai regardé votre exemple dans le rapport de bug et c'est probablement juste pour démontrer que vous ne pouviez pas retourner de gros objets. Vous pourriez vouloir regarder dans les paquets de mémoire partagée.Vous devez afficher un exemple minimal reproductible qui capture la structure de votre problème, tout en étant assez simple à comprendre. Je ne pense pas que la planification des tâches (comme dans 'mc.preschedule') vous mènera partout. – cryo111

+1

J'ai posté un exemple simple de ce que je fais, mais les complications viennent de tout ce qui se passe avec les E/S du disque, la sérialisation et de tels problèmes. Est ce que j'essaye de faire clair? – James

Répondre

0

Pour limiter le surdébit pour un N modérément grand, il est presque toujours préférable d'utiliser mc.preschedule = TRUE (c'est-à-dire diviser le travail en autant de blocs qu'il y a de cœurs).

Il semble que votre principal compromis est entre l'utilisation de la mémoire et le processeur. C'est-à-dire que vous ne pouvez que paralléliser jusqu'à ce que les processus en cours atteignent le maximum de votre RAM. Une chose à considérer est que les différents travailleurs peuvent lire le même objet dans votre session R sans duplication. Ainsi, seuls les objets modifiés/créés dans l'appel de fonction parallèle ont leur empreinte mémoire additionnée pour chaque cœur. Si vous avez un maximum de mémoire, je vous suggère de diviser votre calcul en plusieurs sous-jobs et de les boucler séquentiellement (avec un lapply, par exemple), en appelant mclapply dans cette boucle pour paralléliser chaque sous-programme, et peut-être enregistrer la sortie du sous-job sur le disque pour éviter de tout garder en mémoire.

2

J'espère que ma réponse n'est pas trop tard, mais je pense que votre exemple peut être géré en utilisant la mémoire partagée/fichiers via le paquet bigmemory.

Créons les données

library(bigmemory) 
library(parallel) 

#your large file-backed matrix (all values initialized to 0) 
#it can hold more than your RAM as it is written to a file 
m=filebacked.big.matrix(nrow=5, 
         ncol=5, 
         type="double", 
         descriptorfile="file_backed_matrix.desc", 
         backingfile="file_backed_matrix", 
         backingpath="~") 

#be careful how to fill the large matrix with data 
set.seed(1234) 
m[]=c(matrix(runif(25), ncol=5)) 
#print the data to the console 
m[] 

#your user-col mapping 
#I have added a unique idx that will be used below 
df = data.frame(unique_idx=1:20, 
       I=sample(1:6, 20, replace=T), 
       C=sample(1:5, 20, replace=T)) 

#the file-backed matrix that will hold the results 
resm=filebacked.big.matrix(nrow=nrow(df), 
          ncol=2, 
          type="double",init = NA_real_, 
          descriptorfile="res_matrix.desc", 
          backingfile="res_backed_matrix", 
          backingpath="~") 

#the first column of resm will hold the unique idx of df 
resm[,1]=df$unique_idx 
resm[] 

Maintenant, nous allons passer à la fonction que vous souhaitez exécuter. Vous avez écrit rowSums mais en déduisant de votre texte, vous vouliez dire colSums. J'ai changé cela en conséquence.

somefun = function(x) { 
    #attach the file-backed big.matrix 
    #it makes the matrix "known" to the R process (no copying involved!) 
    #input 
    tmp=attach.big.matrix("~/file_backed_matrix.desc") 
    #output 
    tmp_out=attach.big.matrix("~/res_matrix.desc") 

    #store the output in the file-backed matrix resm 
    tmp_out[x$unique_idx,2]=c(colSums(tmp[,x$C,drop=FALSE])) 
    #return a little more than the colSum result 
    list(pid=Sys.getpid(), 
     user=x$I[1], 
     col_idx=x$C) 
} 

Faites le calcul parallèle sur tous les noyaux

#perform colSums using different threads 
res=mclapply(split(df,df$I),somefun,mc.cores = detectCores()) 

les résultats

#processes IDs 
unname(sapply(res,function(x) x$pid)) 
#28231 28232 28233 28234 28231 28232 

#users 
unname(sapply(res,function(x) x$user)) 
#1 2 3 4 5 6 

#column indexes 
identical(sort(unname(unlist(sapply(res,function(x) x$col_idx)))),sort(df$C)) 
#[1] TRUE 

#check result of colSums 
identical(lapply(split(df,df$I),function(x) resm[x$unique_idx,2]), 
      lapply(split(df,df$I),function(x) colSums(m[,x$C,drop=FALSE]))) 
#[1] TRUE 

Edit: Je prends la parole devant votre commentaire dans mon édition. Le stockage des résultats dans la matrice de sortie sauvegardée par fichier resm fonctionne comme prévu.

+0

J'ai essayé d'utiliser les paquets Rdsm et bigmemory pour faire exactement cela, en écrivant le résultat de chaque processus dans une colonne de la matrice. Je l'avais configuré pour que chaque processus n'écrive dans une colonne qu'une seule fois. Cependant, les résultats ne sont pas les mêmes dans les petits exemples de jouets, alors il semble que l'écriture de colonnes différentes dans la même matrice à travers la mémoire partagée ne soit pas une opération thread-safe? Je suppose qu'il y a peut-être eu un bug dans mon code. Je vais essayer d'afficher un compte rendu de cette tentative plus tard. Merci! – James

+0

@James J'ai abordé votre commentaire dans mon édition. Ma réponse inclut maintenant la sortie vers une matrice partagée. Le code fonctionne comme prévu. – cryo111