2014-07-14 7 views
1

Je suis un utilisateur de R. Je connais très peu de choses sur la commande Linux, PuTTY ou Hadoop/Hive. Alors s'il vous plaît corrigez-moi, si j'ai tort.Utilisation de R sur un cluster Hadoop

Je travaille maintenant avec une équipe. Ils ont un système Ubuntu en cours d'exécution sur un cluster. Je peux utiliser PuTTY pour accéder à ce système Ubuntu et accéder aux fichiers de données en utilisant le code:

user$hadoop fs -ls /datafolder/ 

ou en utilisant la ruche:

user$hive 
hive>use datafolder; 
hive>show tables; 

Au contraire, l'équipe que je travaille avec sait très peu sur R, alors ils veulent que je fasse la partie R. J'ai installé R sur le cluster, et j'ai également installé rJava HRive et d'autres paquets dans R. (Je ne suis pas sûr d'avoir fait cela correctement, mais R semble fonctionner correctement).

Maintenant, je peux faire quelques tests. Je peux exécuter le code ci-dessous sur le cluster de R @:

user$R 
>install.packages(c('Rcpp', 'RJSONIO', 'bitops', 'digest', 'functional', 'stringr', 'plyr', 'reshape2','caTools')) 

>Sys.setenv(HADOOP_CMD="/opt/cloudera/bin/hadoop") 
>Sys.setenv(HADOOP_HOME="/opt/cloudera/lib/hadoop") 
>Sys.setenv(HADOOP_STREAMING="/opt/cloudera/lib/hadoop-mapreduce/hadoop-streaming.jar") 

>library(rmr2) 
>library(rhdfs) 
>hdfs.init() 

Test:

>ints = to.dfs(1:10) 
>calc = mapreduce(input = ints, map = function(k,v) cbind(v, v/2, 2*v)) 
>test <- from.dfs(calc) 
>test 

je peux charger avec succès "test" de retour à l'aide from.dfs. Il me semblait que je peux enregistrer un ensemble de données factice pour Hadoop, et peut récupérer de Hadoop avec succès (correct?)

Maintenant, ma question est, comment laisser R importer ces ensembles de données que je peux voir de

user$hadoop fs -ls /datafolder/ 

ou

>hive use datafolder; 

Répondre

1

cela est par exemple de wordcount avec un résultat de la charge de retour à R:

Sys.setenv(HADOOP_CMD="/usr/bin/hadoop") 
    Sys.setenv(HADOOP_STREAMING="/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming.jar") 
    Sys.setenv(JAVA_HOME="/usr/java/jdk1.7.0_55-cloudera") 
    Sys.setenv(HADOOP_COMMON_LIB_NATIVE_DIR="/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/hadoop/lib/native") 
    Sys.setenv(HADOOP_OPTS="-Djava.library.path=HADOOP_HOME/lib") 
    library(rhdfs) 
    hdfs.init() 
    library(rmr2) 

    ## space and word delimiter 
    map <- function(k,lines) { 
     words.list <- strsplit(lines, '\\s') 
     words <- unlist(words.list) 
     return(keyval(words, 1)) 
    } 
    reduce <- function(word, counts) { 
     keyval(word, sum(counts)) 
    } 
    wordcount <- function (input, output=NULL) { 
     mapreduce(input=input, output=output, input.format="text", map=map, reduce=reduce) 
    } 

    ## variables 
    hdfs.root <- '/user/node' 
    hdfs.data <- file.path(hdfs.root, 'data') 
    hdfs.out <- file.path(hdfs.root, 'out') 

    ## run mapreduce job 
    ##out <- wordcount(hdfs.data, hdfs.out) 
    system.time(out <- wordcount(hdfs.data, hdfs.out)) 

    ## fetch results from HDFS 
    results <- from.dfs(out) 
    results.df <- as.data.frame(results, stringsAsFactors=F) 
    colnames(results.df) <- c('word', 'count') 

    ##head(results.df) 
    ## sorted output TOP10 
    head(results.df[order(-results.df$count),],10) 
Questions connexes