2017-08-29 1 views
1

Mon problème est basé sur la question similaire ici PySpark: Add a new column with a tuple created from columns, avec la différence que j'ai une liste de valeurs au lieu d'une valeur par colonne. Par exemple:Créer un tuple sur deux colonnes - PySpark

from pyspark.sql import Row 
df = sqlContext.createDataFrame([Row(v1=[u'2.0', u'1.0', u'9.0'], v2=[u'9.0', u'7.0', u'2.0']),Row(v1=[u'4.0', u'8.0', u'9.0'], v2=[u'1.0', u'1.0', u'2.0'])]) 

    +---------------+---------------+ 
    |    v1|    v2| 
    +---------------+---------------+ 
    |[2.0, 1.0, 9.0]|[9.0, 7.0, 2.0]| 
    |[2.0, 1.0, 9.0]|[9.0, 7.0, 2.0]| 
    +---------------+---------------+ 

Ce que je suis en train de faire quelque chose de similaire comme zip élément par élément pour les listes par ligne, mais je ne peux pas comprendre ce dans pyspark 1.6:

+---------------+---------------+--------------------+ 
|    v1|    v2|    v_tuple| 
+---------------+---------------+--------------------+ 
|[2.0, 1.0, 9.0]|[9.0, 7.0, 2.0]|[(2.0,9.0), (1.0,...| 
|[4.0, 8.0, 9.0]|[1.0, 1.0, 2.0]|[(4.0,1.0), (8.0,...| 
+---------------+---------------+--------------------+ 

Note: la taille des tableaux peut varier d'une rangée à l'autre, mais c'est toujours la même chose pour la même rangée dans le sens des colonnes.

Répondre

3

Si la taille des tableaux varie d'une ligne à vous aurez besoin et UDF:

from pyspark.sql.functions import udf 

@udf("array<struct<_1:double,_2:double>>") 
def zip_(xs, ys): 
    return list(zip(xs, ys)) 

df.withColumn("v_tuple", zip_("v1", "v2")) 

Spark 1.6:

from pyspark.sql.types import * 

zip_ = udf(
    lambda xs, ys: list(zip(xs, ys)), 
    ArrayType(StructType([StructField("_1", DoubleType()), StructField("_2", DoubleType())])))