2010-09-10 3 views
35

J'ai une simulation qui a un grand agrégat et une étape de combinaison au milieu. J'ai prototypé ce processus en utilisant la fonction ddply() de plyr qui fonctionne parfaitement pour un pourcentage énorme de mes besoins. Mais j'ai besoin que cette étape d'agrégation soit plus rapide puisque je dois faire des simulations 10K. Je suis déjà en train de mettre à l'échelle les simulations en parallèle, mais si cette étape était plus rapide, je pourrais grandement diminuer le nombre de nœuds dont j'ai besoin.R: accélérer les opérations "group by"

est ici une simplification raisonnable de ce que je suis en train de faire:

library(Hmisc) 

# Set up some example data 
year <- sample(1970:2008, 1e6, rep=T) 
state <- sample(1:50, 1e6, rep=T) 
group1 <- sample(1:6, 1e6, rep=T) 
group2 <- sample(1:3, 1e6, rep=T) 
myFact <- rnorm(100, 15, 1e6) 
weights <- rnorm(1e6) 
myDF <- data.frame(year, state, group1, group2, myFact, weights) 

# this is the step I want to make faster 
system.time(aggregateDF <- ddply(myDF, c("year", "state", "group1", "group2"), 
        function(df) wtd.mean(df$myFact, weights=df$weights) 
           ) 
      ) 

Tous les conseils ou suggestions sont appréciés!

+1

Non lié à la performance, mais checkout 'weighted.mean' dans la base – hadley

+1

Oh, c'est pratique. Vous pouvez voir que j'ai appris R en recherchant pour ce que j'ai besoin de faire;) –

Répondre

37

Au lieu de la trame de données normale R, vous pouvez utiliser une trame de données immuable qui renvoie des pointeurs vers l'original lorsque vous sous-ensemble et peut être beaucoup plus rapide:

idf <- idata.frame(myDF) 
system.time(aggregateDF <- ddply(idf, c("year", "state", "group1", "group2"), 
    function(df) wtd.mean(df$myFact, weights=df$weights))) 

# user system elapsed 
# 18.032 0.416 19.250 

Si je devais écrire un fonction plyr sur mesure exactement à cette situation, je ferais quelque chose comme ceci:

system.time({ 
    ids <- id(myDF[c("year", "state", "group1", "group2")], drop = TRUE) 
    data <- as.matrix(myDF[c("myFact", "weights")]) 
    indices <- plyr:::split_indices(seq_len(nrow(data)), ids, n = attr(ids, "n")) 

    fun <- function(rows) { 
    weighted.mean(data[rows, 1], data[rows, 2]) 
    } 
    values <- vapply(indices, fun, numeric(1)) 

    labels <- myDF[match(seq_len(attr(ids, "n")), ids), 
    c("year", "state", "group1", "group2")] 
    aggregateDF <- cbind(labels, values) 
}) 

# user system elapsed 
# 2.04 0.29 2.33 

il est tellement plus rapide car il évite la copie des données, extraire uniquement le sous-ensemble nécessaire pour chaque calcul quand c'est calculé. La commutation des données en forme de matrice donne un autre coup de pouce de vitesse car la sous-matrice est beaucoup plus rapide que la sous-trame de trame de données.

+3

'idata.frame' a été ajouté dans plyr 1.0. – hadley

+0

Je me suis trompé avec la création d'index et autres avec data.table et avais tout abandonné sur cette idée. J'espérais une amélioration de 50%. Cela dépasse de loin mes attentes. –

+0

avoir un peu de mal à faire ce droit d'exécution ... Mais j'apprends comme je vais ... j'avais changé les données à myDF, mais je ne sais pas où le problème est .. –

7

Utilisez-vous la dernière version de plyr (note: cela n'a pas encore été fait pour tous les miroirs CRAN)? Si c'est le cas, vous pouvez simplement exécuter cela en parallèle.

Voici l'exemple llply, mais la même chose devrait appliquer à ddply:

x <- seq_len(20) 
    wait <- function(i) Sys.sleep(0.1) 
    system.time(llply(x, wait)) 
    # user system elapsed 
    # 0.007 0.005 2.005 

    library(doMC) 
    registerDoMC(2) 
    system.time(llply(x, wait, .parallel = TRUE)) 
    # user system elapsed 
    # 0.020 0.011 1.038 

Edit:

Eh bien, d'autres approches looping sont pires, donc cela exige probablement soit (a) C/Code C++ ou (b) une réflexion plus fondamentale sur la façon dont vous le faites. Je n'ai même pas essayé d'utiliser by() parce que c'est très lent dans mon expérience.

groups <- unique(myDF[,c("year", "state", "group1", "group2")]) 
system.time(
aggregateDF <- do.call("rbind", lapply(1:nrow(groups), function(i) { 
    df.tmp <- myDF[myDF$year==groups[i,"year"] & myDF$state==groups[i,"state"] & myDF$group1==groups[i,"group1"] & myDF$group2==groups[i,"group2"],] 
    cbind(groups[i,], wtd.mean(df.tmp$myFact, weights=df.tmp$weights)) 
})) 
) 

aggregateDF <- data.frame() 
system.time(
for(i in 1:nrow(groups)) { 
    df.tmp <- myDF[myDF$year==groups[i,"year"] & myDF$state==groups[i,"state"] & myDF$group1==groups[i,"group1"] & myDF$group2==groups[i,"group2"],] 
    aggregateDF <- rbind(aggregateDF, data.frame(cbind(groups[i,], wtd.mean(df.tmp$myFact, weights=df.tmp$weights)))) 
} 
) 
+0

qui m'aide dans le cas de la machine unique, mais je suis déjà en train de souffler à Hadoop et de surcharger chaque nœud (plus de processus que les processeurs). Mais je suis très heureux de voir la parallélisation en faire un plyr! –

8

je serais le profil avec la base R

g <- with(myDF, paste(year, state, group1, group2)) 
x <- with(myDF, c(tapply(weights * myFact, g, sum)/tapply(weights, g, sum))) 
aggregateDF <- myDF[match(names(x), g), c("year", "state", "group1", "group2")] 
aggregateDF$V1 <- x 

Sur ma machine, il faut comparer à 5sec 67sec avec le code d'origine.

EDIT Je viens de trouver une autre vitesse avec fonction rowsum:

g <- with(myDF, paste(year, state, group1, group2)) 
X <- with(myDF, rowsum(data.frame(a=weights*myFact, b=weights), g)) 
x <- X$a/X$b 
aggregateDF2 <- myDF[match(rownames(X), g), c("year", "state", "group1", "group2")] 
aggregateDF2$V1 <- x 

Il faut 3s!

+2

La seconde prend 5 secondes sur mon ordinateur, si plyr est toujours en train de battre la base;) (Plus il ordonne les lignes correctement) – hadley

+2

Mais merci pour le pointeur vers 'rowsum' - il est si difficile de suivre la pléthore de fonctions d'agrégation dans la base R. – hadley

+0

Il fallait que ce soit aussi un moyen de le faire, mais j'avais du mal à le comprendre. J'ai généralement cette lutte avec la famille d'application. –

5

J'utilise habituellement un vecteur d'index avec tapply lorsque la fonction appliquée a plusieurs vecteurs args:

system.time(tapply(1:nrow(myDF), myDF[c('year', 'state', 'group1', 'group2')], function(s) weighted.mean(myDF$myFact[s], myDF$weights[s]))) 
# user system elapsed 
# 1.36 0.08 1.44 

J'utilise un emballage simple qui est équivalent, mais cache le désordre:

tmapply(list(myDF$myFact, myDF$weights), myDF[c('year', 'state', 'group1', 'group2')], weighted.mean) 

Édité pour inclure le tmapply pour commentaire ci-dessous:

tmapply = function(XS, INDEX, FUN, ..., simplify=T) { 
    FUN = match.fun(FUN) 
    if (!is.list(XS)) 
    XS = list(XS) 
    tapply(1:length(XS[[1L]]), INDEX, function(s, ...) 
    do.call(FUN, c(lapply(XS, `[`, s), list(...))), ..., simplify=simplify) 
} 
+0

c'est très chouette de voir ça fait en base R. Merci! –

+1

Juste pour ajouter: 'as.data.frame (as.table (RESULTS))' c'est un moyen facile de créer 'data.frame' depuis la sortie. – Marek

+0

Est-ce le 'tmapply' que vous utilisez? https://stat.ethz.ch/pipermail/r-help/2002-October/025773.html – Shane

25

plus 2x et speedup code plus concis:

library(data.table) 
dtb <- data.table(myDF, key="year,state,group1,group2") 
system.time( 
    res <- dtb[, weighted.mean(myFact, weights), by=list(year, state, group1, group2)] 
) 
# user system elapsed 
# 0.950 0.050 1.007 

Mon premier post, donc s'il vous plaît être gentil;)


De data.table fonction v1.9.2, setDT est exportée qui va convertir data.frame à data.tablepar référence (en accord avec data.table jargon - toutes les fonctions set* modifient l'objet par référence). Cela signifie, pas de copie inutile, et est donc rapide. Vous pouvez le chronométrer, mais ce sera négligent.

require(data.table) 
system.time({ 
    setDT(myDF) 
    res <- myDF[, weighted.mean(myFact, weights), 
      by=list(year, state, group1, group2)] 
}) 
# user system elapsed 
# 0.970 0.024 1.015 

Ceci est aussi opposé à 1.264 secondes avec la solution OP ci-dessus, où data.table(.) est utilisé pour créer dtb.

+0

Bon article! Merci d'avoir répondu. Pour être cohérent avec les autres méthodes, cependant, l'étape qui crée la table de données et l'index doit être à l'intérieur de l'étape system.time(). –

+2

En effet, mais il reste le plus rapide cependant. Il serait bien d'avoir une option pour opérer sur data.tables ou utiliser data.tables sous le capot (je viens de découvrir data.table en cherchant des solutions au même problème, mais je préférerais un ddply-like syntaxe pour ce cas). – datasmurf