2017-08-21 3 views
1

Je joue avec Machine Learning dans PySpark et j'utilise un RandomForestClassifier. J'ai utilisé Sklearn jusqu'à maintenant. J'utilise CrossValidator pour régler les paramètres et obtenir le meilleur modèle. Un exemple de code provenant du site Web de Spark est ci-dessous. D'après ce que j'ai lu, je ne comprends pas si l'étincelle distribue aussi le réglage des paramètres ou est le même que dans le cas de GridSearchCV de Sklearn.Est-ce que CrossValidator dans PySpark distribue l'exécution?

Toute aide serait vraiment appréciée.

from pyspark.ml import Pipeline 
from pyspark.ml.classification import LogisticRegression 
from pyspark.ml.evaluation import BinaryClassificationEvaluator 
from pyspark.ml.feature import HashingTF, Tokenizer 
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder 

# Prepare training documents, which are labeled. 
training = spark.createDataFrame([ 
    (0, "a b c d e spark", 1.0), 
    (1, "b d", 0.0), 
    (2, "spark f g h", 1.0), 
    (3, "hadoop mapreduce", 0.0), 
    (4, "b spark who", 1.0), 
    (5, "g d a y", 0.0), 
    (6, "spark fly", 1.0), 
    (7, "was mapreduce", 0.0), 
    (8, "e spark program", 1.0), 
    (9, "a e c l", 0.0), 
    (10, "spark compile", 1.0), 
    (11, "hadoop software", 0.0) 
], ["id", "text", "label"]) 

# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr. 
tokenizer = Tokenizer(inputCol="text", outputCol="words") 
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features") 
lr = LogisticRegression(maxIter=10) 
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr]) 

# We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance. 
# This will allow us to jointly choose parameters for all Pipeline stages. 
# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator. 
# We use a ParamGridBuilder to construct a grid of parameters to search over. 
# With 3 values for hashingTF.numFeatures and 2 values for lr.regParam, 
# this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from. 
paramGrid = ParamGridBuilder() \ 
    .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \ 
    .addGrid(lr.regParam, [0.1, 0.01]) \ 
    .build() 

crossval = CrossValidator(estimator=pipeline, 
          estimatorParamMaps=paramGrid, 
          evaluator=BinaryClassificationEvaluator(), 
          numFolds=2) # use 3+ folds in practice 

# Run cross-validation, and choose the best set of parameters. 
cvModel = crossval.fit(training) 
+0

des conseils ou si la question n'est pas claire, veuillez aviser – nEO

Répondre

2

Ce n'est pas le cas. validation croisée est mis en œuvre comme une plaine nested for loop:

for i in range(nFolds): 
    ... 
    for j in range(numModels): 
     ... 

Seul le processus de modèles individuels de formation est distribué.

0

J'ai trouvé la réponse. Comme répondu par d'autres utilisateurs, le processus n'est pas parralélisé est une opération en série. Cependant, il existe un module spark_sklearn qui pourrait être utilisé pour cette recherche de grille qui le distribue mais ne distribue pas le bâtiment du modèle. Donc c'est le compromis.

Voici le code à l'aide spark_sklearn GridSearchCV

%pyspark 

""" 
DATA - https://kdd.ics.uci.edu/databases/20newsgroups/mini_newsgroups.tar.gz 
METHOD 1 - USING GRIDSEARCH CV FROM SPARK_SKLEARN MODULE BY DATABRICKS 
DOCUMENTATION - https://databricks.com/blog/2016/02/08/auto-scaling-scikit-learn-with-apache-spark.html 
THIS IS DISTRIBUTED OPERATION AS MENTIONED ON THE WEBSITE 
""" 
from spark_sklearn import GridSearchCV 
from pyspark.ml.feature import HashingTF,StopWordsRemover,IDF,Tokenizer 
from pyspark.ml import Pipeline 
from pyspark.sql.types import StructField, StringType, StructType 
from pyspark.ml.feature import IndexToString, StringIndexer 
from spark_sklearn.converter import Converter 
from sklearn.pipeline import Pipeline as S_Pipeline 
from sklearn.ensemble import RandomForestClassifier as S_RandomForestClassifier 

path = 's3://sparkzepellin/mini_newsgroups//*' 
news = sc.wholeTextFiles(path) 
print "Toal number of documents = ",news.count() 

# print 5 samples 
news.takeSample(False,5, 1) 

# Using sqlContext createa dataframe 
schema = ["id", "text", "topic"] 
fields = [StructField(field_name, StringType(), True) for field in schema] 
schema = StructType(fields) 

# Applying the schema decalred above as an RDD 
newsgroups = news.map(lambda (localPath, text): (localPath.split("/")[-1], text, localPath.split("/")[-2])) 
df = sqlContext.createDataFrame(newsgroups, schema) 

df_new = StringIndexer(inputCol="topic", outputCol="label").fit(df).transform(df) 

# Build a pipeline with tokenier, hashing TF, IDF, and finally a RandomForest 
tokenizer = Tokenizer().setInputCol("text").setOutputCol("words") 
hashingTF = HashingTF().setInputCol("words").setOutputCol("rawFeatures") 
idf = IDF().setInputCol("rawFeatures").setOutputCol("features") 

pipeline=Pipeline(stages=[tokenizer, hashingTF, idf]) 
data = pipeline.fit(df_new).transform(df_new) 

# Using Converter, convert to pandas dataframe (numpy) 
# to run on distributed sklearn using spark_sklearn package 
converter = Converter(sc) 
new_df = Converter.toPandas(data.select(data.features.alias("text"), "label")) 

# Sklearn pipeline 
s_pipeline = S_Pipeline([ 
      ('rf', S_RandomForestClassifier()) 
     ]) 

# Random parameters 
parameters = { 
    'rf__n_estimators': (10, 20), 
    'rf__max_depth': (2, 10) 
} 

# Run GridSearchCV using the above defined parameters on the pipeline created 
gridSearch = GridSearchCV(sc, s_pipeline, parameters) 
GS = gridSearch.fit(new_df.text.values, new_df.rating.values) 

Une autre façon de le faire serait d'utiliser la méthode carte pour paralléliser l'opération et reprendre la mesure tels que la précision.

+0

Cette solution n'est pas bonne. Il met en parallèle CV mais pas l'ajustement. Vous pouvez paralléliser l'un des deux processus: apprentissage ou validation croisée. – eliasah