2017-07-28 2 views
1

J'ai le tableau suivant pyspark 1.6.1:Comment puis-je intégrer/calculer des produits scalaires dans Pyspark 1.6.1?

+--------+-----+--------------------+ 
|  key|carid|    data| 
+--------+-----+--------------------+ 
| time| 1|[0.2, 0.4, 0.5, 0...| 
|velocity| 1|[2.0, 2.1, 2.3, 0...| 
| time| 2|[0.1, 0.35, 0.4, 0..| 
|velocity| 2|[1.0, 1.1, 3.3, 0...| 
| time| 3|[0.3, 0.6, 0.7, 0...| 
|velocity| 3|[2.3, 2.1, 2.3, 0...| 
+--------+-----+--------------------+ 

C'est que j'ai un certain nombre de voitures et pour chaque voiture je un tableau avec horodatages non équidistants et un tableau avec des valeurs de vitesse. Je veux calculer la distance chaque voiture a conduit.

+-----+------ -+ 
|carid|distance| 
+-----+--------+ 
| 1|  100| 
| 2|  102| 
| 3|  85| 
+-----+--------+ 

Je veux calculer cela en trapézoïdal l'intégration numérique (ou simplement scalar_product (diff (horodatage), la vitesse) Comment puis-je faire en pyspark 1.6.1 ?

Répondre

0

Pouvez-vous essayer ce code sur des données réelles et faites-nous savoir si elle a résolu votre problème?

import numpy as np 
import pyspark.sql.functions as f 
from pyspark.sql.types import FloatType 

df = sc.parallelize([ 
    ['time', 1, [0.2, 0.4, 0.5 ]], 
    ['velocity',1, [2.0, 2.1, 2.3 ]], 
    ['time', 2, [0.1, 0.35, 0.4]], 
    ['velocity',2, [1.0, 1.1, 3.3 ]] 
]).toDF(('key', 'carid', 'data')) 
df.show() 

df1 = df.sort('carid','key').groupby("carid").agg(f.collect_list("data").alias("timeVelocityPair")) 

def modify_values(l): 
    val = np.trapz(l[1], x=l[0]) 
    return float(val) 
modified_val = f.udf(modify_values, FloatType()) 
final_df = df1.withColumn("distance", modified_val("timeVelocityPair")).drop("timeVelocityPair") 
final_df.show() 
+0

Quand j'execute « DF1 = ... » Je reçois toujours le message d'erreur « Pas de gestionnaire pour Hive classe udf org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectList car: seuls les arguments de type primitif sont acceptés d mais le tableau a été passé en tant que paramètre 1 ..; ".. – Mandy

+0

Pour moi, cela fonctionne, mais pour résoudre cette erreur, je suggère de suivre ce [lien] (https://stackoverflow.com/questions/38117360/aggregate-array -type-in-spark-dataframe) ou peut être mis à niveau vers la prochaine version (consultez ce [bug] (https://issues.apache.org/jira/browse/HIVE-10427)). Pour l'instant, pouvez-vous exécuter ce code sur un système autonome plutôt que sur un cluster et voir si la solution fonctionne pour vous? – Prem

+0

Merci, mais je suis à la recherche d'une solution dans pyspark 1.6.1, pas scala, et la mise à niveau n'est pas une option pour moi. – Mandy