J'ai essayé de faire un simple modèle de régression de forêt aléatoire sur PySpark. J'ai une expérience décente de Machine Learning sur R. Cependant, pour moi, ML sur Pyspark semble complètement différent - en particulier quand il s'agit de la gestion des variables catégoriques, l'indexation des chaînes, et OneHotEncoding (Quand il n'y a que des variables numériques, j'ai pu effectuer une régression RF juste en suivant les exemples). Bien qu'il y ait beaucoup d'exemples disponibles pour manipuler des variables catégorielles, comme this et this, je n'ai eu aucun succès avec aucun d'entre eux comme la plupart d'entre eux sont allés sur ma tête (probablement à cause de ma méconnaissance de Python ML). Je serai reconnaissant à tous ceux qui peuvent aider à résoudre ce problème.Régression de forêt aléatoire pour les entrées catégorielles sur PySpark
Voici ma tentative: inputfile is here
from pyspark.mllib.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.types import Row
from pyspark.sql.functions import col, round
train = sqlContext.read.format('com.databricks.spark.csv').options(header='true',inferschema = "true").load('filename.csv')
train.cache()
train.dtypes
La sortie est:
DataFrame[ID: int, Country: string, Carrier: double, TrafficType: string, ClickDate: timestamp, Device: string, Browser: string, OS: string, RefererUrl: string, UserIp: string, ConversionStatus: string, ConversionDate: string, ConversionPayOut: string, publisherId: string, subPublisherId: string, advertiserCampaignId: double, Fraud: double]
Ensuite je choisis mes variables d'intérêt:
IMP = ["Country","Carrier","TrafficType","Device","Browser","OS","Fraud","ConversionPayOut"]
train = train.fillna("XXX")
train = train.select([column for column in train.columns if column in IMP])
from pyspark.sql.types import DoubleType
train = train.withColumn("ConversionPayOut", train["ConversionPayOut"].cast("double"))
train.cache()
sortie est:
DataFrame[Country: string, Carrier: double, TrafficType: string, Device: string, Browser: string, OS: string, ConversionPayOut: double, Fraud: double]
Ma variable dépendante est ConversionPayOut
, précédemment un type de chaîne est maintenant converti en un type double. De là commence ma confusion: Basé sur this post, j'ai compris que je devais convertir mes variables catégoriques stringtype en vecteurs codés à un jet. Voici ma tentative que:
D'abord un StringIndexing:
`
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(junk) for column in list(set(junk.columns)-set(['Carrier','ConversionPayOut','Fraud'])) ]
pipeline = Pipeline(stages=indexers)
train_catind = pipeline.fit(train).transform(train)
train_catind.show()
`
Sortie de StringIndexing:
`
+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+
|Country|Carrier|TrafficType| Device| Browser| OS| ConversionPayOut|Fraud|TrafficType_index|Country_index|Browser_index|OS_index|Device_index|
+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+
| TH| 20.0| A| Lava| chrome|Android| 41.6| 0.0| 0.0| 1.0| 0.0| 0.0| 7.0|
| BR| 217.0| A| LG| chrome|Android| 26.2680574| 0.0| 0.0| 2.0| 0.0| 0.0| 5.0|
| TH| 20.0| A|Generic| chrome|Android| 41.6| 0.0| 0.0| 1.0| 0.0| 0.0| 0.0|`
Next, I think, I have to do the OneHOtEncoding of the String Indexes:
`
from pyspark.ml.feature import OneHotEncoder, StringIndexer
indexers_ON = [OneHotEncoder(inputCol=column, outputCol=column+"_Vec") for column in filter(lambda x: x.endswith('_index'), train_catind.columns) ]
pipeline = Pipeline(stages=indexers_ON)
train_OHE = pipeline.fit(train_catind).transform(train_catind)
train_OHE.show()
`
Out après un chaud encodage ressemble à ceci:
`
+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+---------------------+-----------------+-----------------+-------------+----------------+
|Country|Carrier|TrafficType| Device| Browser| OS| ConversionPayOut|Fraud|TrafficType_index|Country_index|Browser_index|OS_index|Device_index|TrafficType_index_Vec|Country_index_Vec|Browser_index_Vec| OS_index_Vec|Device_index_Vec|
+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+---------------------+-----------------+-----------------+-------------+----------------+
| TH| 20.0| A| Lava| chrome|Android| 41.6| 0.0| 0.0| 1.0| 0.0| 0.0| 7.0| (1,[0],[1.0])| (9,[1],[1.0])| (5,[0],[1.0])|(1,[0],[1.0])| (15,[7],[1.0])|
| BR| 217.0| A| LG| chrome|Android| 26.2680574| 0.0| 0.0| 2.0| 0.0| 0.0| 5.0| (1,[0],[1.0])| (9,[2],[1.0])| (5,[0],[1.0])|(1,[0],[1.0])| (15,[5],[1.0])|
| TH| 20.0| A|Generic| chrome|Android| 41.6| 0.0| 0.0| 1.0| 0.0| 0.0| 0.0| (1,[0],[1.0])| (9,[1],[1.0])| (5,[0],[1.0])|(1,[0],[1.0])| (15,[0],[1.0])|
`
Je suis désemparés quant à la manière de faire avancer . En fait, je n'ai aucune idée de ce que les paquets Spark Machine Learning exigent de nous pour faire ce codage à chaud et quels sont ceux qui ne le font pas.
Ce serait vraiment génial d'apprendre à tous les débutants de PySpark si la communauté StackOverflow pouvait clarifier la marche à suivre.
Merci pour la réponse. C'était semblable à ce que j'ai essayé. Mais j'ai rencontré de nouvelles erreurs après avoir exécuté le VectorAssembler. Pouvez-vous s'il vous plaît jeter un oeil à cette question. https://stackoverflow.com/questions/46377686/how-to-match-and-replace-in-pyspark-when-columns-contain-vectors – kasa
@kasa Peux-tu essayer cet extrait de code et fais-nous savoir si tu es toujours avoir la même erreur? – Prem