2017-09-22 5 views
3

J'ai essayé de faire un simple modèle de régression de forêt aléatoire sur PySpark. J'ai une expérience décente de Machine Learning sur R. Cependant, pour moi, ML sur Pyspark semble complètement différent - en particulier quand il s'agit de la gestion des variables catégoriques, l'indexation des chaînes, et OneHotEncoding (Quand il n'y a que des variables numériques, j'ai pu effectuer une régression RF juste en suivant les exemples). Bien qu'il y ait beaucoup d'exemples disponibles pour manipuler des variables catégorielles, comme this et this, je n'ai eu aucun succès avec aucun d'entre eux comme la plupart d'entre eux sont allés sur ma tête (probablement à cause de ma méconnaissance de Python ML). Je serai reconnaissant à tous ceux qui peuvent aider à résoudre ce problème.Régression de forêt aléatoire pour les entrées catégorielles sur PySpark

Voici ma tentative: inputfile is here

from pyspark.mllib.linalg import Vectors 
from pyspark.ml import Pipeline 
from pyspark.ml.feature import StringIndexer, VectorIndexer 
from pyspark.ml.classification import DecisionTreeClassifier 
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder 
from pyspark.ml.evaluation import MulticlassClassificationEvaluator 
from pyspark.sql.types import Row 
from pyspark.sql.functions import col, round 
train = sqlContext.read.format('com.databricks.spark.csv').options(header='true',inferschema = "true").load('filename.csv') 
train.cache() 
train.dtypes 

La sortie est:

DataFrame[ID: int, Country: string, Carrier: double, TrafficType: string, ClickDate: timestamp, Device: string, Browser: string, OS: string, RefererUrl: string, UserIp: string, ConversionStatus: string, ConversionDate: string, ConversionPayOut: string, publisherId: string, subPublisherId: string, advertiserCampaignId: double, Fraud: double] 

Ensuite je choisis mes variables d'intérêt:

IMP = ["Country","Carrier","TrafficType","Device","Browser","OS","Fraud","ConversionPayOut"] 
train = train.fillna("XXX") 
train = train.select([column for column in train.columns if column in IMP]) 
from pyspark.sql.types import DoubleType 
train = train.withColumn("ConversionPayOut", train["ConversionPayOut"].cast("double")) 
train.cache() 

sortie est:

DataFrame[Country: string, Carrier: double, TrafficType: string, Device: string, Browser: string, OS: string, ConversionPayOut: double, Fraud: double] 

Ma variable dépendante est ConversionPayOut, précédemment un type de chaîne est maintenant converti en un type double. De là commence ma confusion: Basé sur this post, j'ai compris que je devais convertir mes variables catégoriques stringtype en vecteurs codés à un jet. Voici ma tentative que:

D'abord un StringIndexing:

`

from pyspark.ml import Pipeline 
from pyspark.ml.feature import StringIndexer 
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(junk) for column in list(set(junk.columns)-set(['Carrier','ConversionPayOut','Fraud'])) ] 
pipeline = Pipeline(stages=indexers) 
train_catind = pipeline.fit(train).transform(train) 
train_catind.show() 

`

Sortie de StringIndexing:

`

+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+ 
|Country|Carrier|TrafficType| Device|  Browser|  OS| ConversionPayOut|Fraud|TrafficType_index|Country_index|Browser_index|OS_index|Device_index| 
+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+ 
|  TH| 20.0|   A| Lava|  chrome|Android|    41.6| 0.0|    0.0|   1.0|   0.0|  0.0|   7.0| 
|  BR| 217.0|   A|  LG|  chrome|Android|  26.2680574| 0.0|    0.0|   2.0|   0.0|  0.0|   5.0| 
|  TH| 20.0|   A|Generic|  chrome|Android|    41.6| 0.0|    0.0|   1.0|   0.0|  0.0|   0.0|` 


Next, I think, I have to do the OneHOtEncoding of the String Indexes: 

`

from pyspark.ml.feature import OneHotEncoder, StringIndexer 
indexers_ON = [OneHotEncoder(inputCol=column, outputCol=column+"_Vec") for column in filter(lambda x: x.endswith('_index'), train_catind.columns) ] 
pipeline = Pipeline(stages=indexers_ON) 
train_OHE = pipeline.fit(train_catind).transform(train_catind) 
train_OHE.show() 

`

Out après un chaud encodage ressemble à ceci:

`

+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+---------------------+-----------------+-----------------+-------------+----------------+ 
|Country|Carrier|TrafficType| Device|  Browser|  OS| ConversionPayOut|Fraud|TrafficType_index|Country_index|Browser_index|OS_index|Device_index|TrafficType_index_Vec|Country_index_Vec|Browser_index_Vec| OS_index_Vec|Device_index_Vec| 
+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+---------------------+-----------------+-----------------+-------------+----------------+ 
|  TH| 20.0|   A| Lava|  chrome|Android|    41.6| 0.0|    0.0|   1.0|   0.0|  0.0|   7.0|  (1,[0],[1.0])| (9,[1],[1.0])| (5,[0],[1.0])|(1,[0],[1.0])| (15,[7],[1.0])| 
|  BR| 217.0|   A|  LG|  chrome|Android|  26.2680574| 0.0|    0.0|   2.0|   0.0|  0.0|   5.0|  (1,[0],[1.0])| (9,[2],[1.0])| (5,[0],[1.0])|(1,[0],[1.0])| (15,[5],[1.0])| 
|  TH| 20.0|   A|Generic|  chrome|Android|    41.6| 0.0|    0.0|   1.0|   0.0|  0.0|   0.0|  (1,[0],[1.0])| (9,[1],[1.0])| (5,[0],[1.0])|(1,[0],[1.0])| (15,[0],[1.0])| 

`

Je suis désemparés quant à la manière de faire avancer . En fait, je n'ai aucune idée de ce que les paquets Spark Machine Learning exigent de nous pour faire ce codage à chaud et quels sont ceux qui ne le font pas.

Ce serait vraiment génial d'apprendre à tous les débutants de PySpark si la communauté StackOverflow pouvait clarifier la marche à suivre.

Répondre

1

Pour exécuter Random Forest sur vos données pré-traitées, vous pouvez procéder avec le code ci-dessous.

from pyspark.ml.feature import VectorAssembler 
from pyspark.ml.classification import RandomForestClassifier 

#use VectorAssembler to combine all the feature columns into a single vector column 
assemblerInputs = ["Carrier","Fraud","Country_index_Vec","TrafficType_index_Vec","Device_index_Vec","Browser_index_Vec","OS_index_Vec"] 
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features") 
pipeline = Pipeline(stages=assembler) 
df = pipeline.fit(train_OHE).transform(train_OHE) 
df = df.withColumn("label", train_OHE.ConversionPayOut) 

#randomly split data into training and test dataset 
(train_data, test_data) = df.randomSplit([0.7, 0.3], seed = 111) 

# train RandomForest model 
rf = RandomForestClassifier(labelCol="label", featuresCol="features") 
rf_model = rf.fit(train_data) 

# Make predictions on test data 
predictions = rf_model.transform(test_data) 


Hope this helps!

+0

Merci pour la réponse. C'était semblable à ce que j'ai essayé. Mais j'ai rencontré de nouvelles erreurs après avoir exécuté le VectorAssembler. Pouvez-vous s'il vous plaît jeter un oeil à cette question. https://stackoverflow.com/questions/46377686/how-to-match-and-replace-in-pyspark-when-columns-contain-vectors – kasa

+0

@kasa Peux-tu essayer cet extrait de code et fais-nous savoir si tu es toujours avoir la même erreur? – Prem