2017-09-25 5 views
1

J'essaie d'utiliser spark_apply pour exécuter la fonction R ci-dessous sur une table Spark. Cela fonctionne bien si ma table d'entrée est faible (par exemple 5000 lignes), mais après ~ 30 minutes renvoie une erreur lorsque la table est modérément importante (par exemple 5.000.000 lignes): sparklyr worker rscript failure, check worker logs for detailsLa fonction spark_apply de Sparklyr semble fonctionner sur un seul exécuteur et échoue sur un ensemble de données modérément grand

En regardant l'interface utilisateur Spark montre qu'il ya seulement une seule tâche est créée et un seul exécuteur est appliqué à cette tâche.

Quelqu'un peut-il donner des conseils sur les raisons pour lesquelles cette fonction échoue pour un ensemble de données de 5 millions de lignes? Le problème pourrait-il être qu'un exécuteur unique est fait pour faire tout le travail, et échouer?

# Create data and copy to Spark 
testdf <- data.frame(string_id=rep(letters[1:5], times=1000), # 5000 row table 
       string_categories=rep(c("", "1", "2 3", "4 5 6", "7"), times=1000)) 
testtbl <- sdf_copy_to(sc, testdf, overwrite=TRUE, repartition=100L, memory=TRUE) 

# Write function to return dataframe with strings split out 
myFunction <- function(inputdf){ 
    inputdf$string_categories <- as.character(inputdf$string_categories) 
    inputdf$string_categories=with(inputdf, ifelse(string_categories=="", "blank", string_categories)) 
    stringCategoriesList <- strsplit(inputdf$string_categories, ' ') 
    outDF <- data.frame(string_id=rep(inputdf$string_id, times=unlist(lapply(stringCategoriesList, length))), 
        string_categories=unlist(stringCategoriesList)) 
return(outDF) 
} 

# Use spark_apply to run function in Spark 
outtbl <- testtbl %>% 
    spark_apply(myFunction, 
      names=c('string_id', 'string_categories')) 
outtbl 

Répondre

2
  1. L'erreur sparklyr worker rscript failure, check worker logs for details est écrit par le noeud du conducteur et signale que l'erreur réelle doit être trouvée dans les journaux des travailleurs. Habituellement, les journaux des travailleurs peuvent être consultés en ouvrant stdout à partir de l'onglet de l'exécuteur dans l'interface utilisateur Spark; les journaux doivent contenir RScript: entrées décrivant ce que l'exécuteur traite et le spécifique de l'erreur.

  2. En ce qui concerne la seule tâche en cours de création, lorsque columns ne sont pas spécifiés avec des types en spark_apply(), il faut calculer un sous-ensemble du résultat de deviner les types de colonnes, pour éviter cela, passer les types de colonnes explicites comme suit:

    outtbl <- testtbl %>% spark_apply( myFunction, columns=list( string_id = "character", string_categories = "character"))

  3. Si vous utilisez sparklyr 0.6.3, mise à jour ou sparklyr 0.6.4devtools::install_github("rstudio/sparklyr"), puisque sparklyr 0.6.3 contient un temps d'attente incorrect dans certains cas limites où la distribution des paquets est activé et plus d'un exécuteur testamentaire exécute dans chaque nœud.

  4. Sous une charge élevée, il est courant de manquer de mémoire. L'augmentation du nombre de partitions pourrait résoudre ce problème car cela réduirait la quantité totale de mémoire requise pour traiter ce jeu de données. Essayez d'exécuter ce que:

    testtbl %>% sdf_repartition(1000) %>% spark_apply(myFunction, names=c('string_id', 'string_categories'))

  5. Il pourrait également être le cas que la fonction renvoie une exception pour quelques-unes des partitions en raison de la logique de la fonction, vous pouvez voir si tel est le cas en utilisant tryCatch() ignorer les erreurs, puis trouver quelles sont les valeurs manquantes et pourquoi la fonction échouerait pour ces valeurs. Je voudrais commencer par quelque chose comme:

    myFunction <- function(inputdf){ tryCatch({ inputdf$string_categories <- as.character(inputdf$string_categories) inputdf$string_categories=with(inputdf, ifelse(string_categories=="", "blank", string_categories)) stringCategoriesList <- strsplit(inputdf$string_categories, ' ') outDF <- data.frame(string_id=rep(inputdf$string_id, times=unlist(lapply(stringCategoriesList, length))), string_categories=unlist(stringCategoriesList)) return(outDF) }, error = function(e) { return( data.frame(string_id = c(0), string_categories = c("error")) ) }) }

+0

Merci pour cette réponse complète! L'augmentation du nombre de partitions a résolu le problème, mais il y a aussi beaucoup d'informations supplémentaires qui me permettront d'aller de l'avant. – jay