J'ai une Dataframe que je veux utiliser pour la prédiction avec un modèle existant. J'obtiens une erreur lors de l'utilisation de la méthode de transformation de mon modèle.Erreur lors du transfert de données d'une trame dans un fichier ML vectorielIndexerModel
Voici comment je traite les données de formation.
forecast.printSchema()
Le schéma de mon dataframe:
root
|-- PM10: double (nullable = false)
|-- rain_3h: double (nullable = false)
|-- is_rain: double (nullable = false)
|-- wind_deg: double (nullable = false)
|-- wind_speed: double (nullable = false)
|-- humidity: double (nullable = false)
|-- is_newYear: double (nullable = false)
|-- season: double (nullable = false)
|-- is_rushHour: double (nullable = false)
|-- PM10_average: double (nullable = false)
Impression des premières lignes
forecast.show(5)
+----+-------+-------+--------+----------+--------+----------+------+-----------+------------+
|PM10|rain_3h|is_rain|wind_deg|wind_speed|humidity|is_newYear|season|is_rushHour|PM10_average|
+----+-------+-------+--------+----------+--------+----------+------+-----------+------------+
| 1.1| 1.0| 0.0| 15.0048| 7.27| 0.0| 0.0| 0.0| 0.0| 1.2|
| 1.1| 1.0| 0.0| 15.0048| 7.27| 0.0| 0.0| 0.0| 0.0| 1.2|
| 1.1| 1.0| 0.0| 15.0048| 7.27| 0.0| 0.0| 0.0| 0.0| 1.2|
| 1.1| 1.0| 0.0| 15.0048| 7.27| 0.0| 0.0| 0.0| 0.0| 1.2|
| 1.1| 1.0| 0.0| 15.0048| 7.27| 0.0| 0.0| 0.0| 0.0| 1.2|
+----+-------+-------+--------+----------+--------+----------+------+-----------+------------+
only showing top 5 rows
Préparation des caractéristiques
assembler = VectorAssembler(
inputCols=["rain_3h", "is_rain", "wind_deg", "wind_speed", "humidity", "is_newYear", "season", "is_rushHour", "PM10_average"],
outputCol="features")
output = assembler.transform(forecast)
output.registerTempTable("output")
features = spark.sql("SELECT features, PM10 as label FROM output")
features.printSchema()
+--------------------+-----+
| features|label|
+--------------------+-----+
|(9,[0,2,3,8],[1.0...| 1.1|
|(9,[0,2,3,8],[1.0...| 1.1|
|(9,[0,2,3,8],[1.0...| 1.1|
|(9,[0,2,3,8],[1.0...| 1.1|
|(9,[0,2,3,8],[1.0...| 1.1|
+--------------------+-----+
only showing top 5 rows
Et en passant les données au modèle
model = PipelineModel.load(path)
predict = model.transform(features)
predict.printSchema()
root
|-- features: vector (nullable = true)
|-- label: double (nullable = false)
|-- indexedFeatures: vector (nullable = true)
|-- prediction: double (nullable = true)
predict.show(5)
leader dans cette erreur:
17/09/16 19:12:25 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/hdp/current/spark2-client/python/pyspark/sql/dataframe.py", line 287, in show
print(self._jdf.showString(n, truncate))
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o235.showString.
: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$11: (vector) => vector)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply5_1$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.execution.TakeOrderedAndProjectExec$$anonfun$executeCollect$1.apply(limit.scala:132)
at org.apache.spark.sql.execution.TakeOrderedAndProjectExec$$anonfun$executeCollect$1.apply(limit.scala:132)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.executeCollect(limit.scala:132)
at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1935)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1934)
at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2576)
at org.apache.spark.sql.Dataset.head(Dataset.scala:1934)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2149)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.NoSuchElementException: key not found: 1.0
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:59)
at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:339)
at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:317)
at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:362)
at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:362)
... 33 more