2017-04-25 1 views
3

BakgroundUtiliser Spark pour obtenir les noms de toutes les colonnes qui ont une valeur supérieure à un certain seuil

Nous décharger des données Redshift dans S3 puis le charger dans une trame de données comme ceci:

df = spark.read.csv(path, schema=schema, sep='|') 

Nous utilisons PySpark et AWS EMR (version 5.4.0) avec Spark 2.1.0.

Problème

J'ai une table de Redshift qui est lu dans PySpark au format CSV. Les enregistrements sont dans ce genre de format:

url,category1,category2,category3,category4 
http://example.com,0.6,0.0,0.9,0.3 

url est VARCHAR et la catégorievaleurs sont FLOAT entre 0,0 et 1,0. Ce que je veux faire est de générer un nouveau DataFrame avec une seule ligne par catégorie où la valeur dans l'ensemble de données original était au-dessus d'un certain seuil X. Par exemple, si le seuil était fixé à 0.5 alors je voudrais mon nouvel ensemble de données pour ressembler à ceci:

url,category 
http://example.com,category1 
http://example.com,category3 

Je suis nouveau Spark/PySpark donc je ne sais pas comment/si cela est possible de le faire toute aide serait appréciée!

EDIT:

voulu ajouter ma solution (basée sur le code de Pushkr). Nous avons une tonne de catégories à charger afin d'éviter hardcoding chaque sélection je l'ai fait les suivantes:

parsed_df = None 
for column in column_list: 
    if not parsed_df: 
     parsed_df = df.select(df.url, when(df[column]>threshold,column).otherwise('').alias('cat')) 
    else: 
     parsed_df = parsed_df.union(df.select(df.url, when(df[column]>threshold,column).otherwise(''))) 
if parsed_df is not None: 
    parsed_df = parsed_df.filter(col('cat') != '') 

column_list est une liste générée précédemment des noms de colonnes de catégorie et seuil est la valeur minimale requise pour sélectionnez la catégorie.

Merci encore!

+0

pourquoi ne pas essayer databrick paquet redshift qui déchargent et créer dataframe dans une seule instruction .. – Nitin

Répondre

1

Voici quelque chose que j'ai essayé -

data = [('http://example.com',0.6,0.0,0.9,0.3),('http://example1.com',0.6,0.0,0.9,0.3)] 

df = spark.createDataFrame(data)\ 
    .toDF('url','category1','category2','category3','category4') 

from pyspark.sql.functions import * 



df\ 
    .select(df.url,when(df.category1>0.5,'category1').otherwise('').alias('category'))\ 
    .union(\ 
    df.select(df.url,when(df.category2>0.5,'category2').otherwise('')))\ 
    .union(\ 
    df.select(df.url,when(df.category3>0.5,'category3').otherwise('')))\ 
    .union(\ 
    df.select(df.url,when(df.category4>0.5,'category4').otherwise('')))\ 
    .filter(col('category')!= '')\ 
    .show() 

sortie:

+-------------------+---------+ 
|    url| category| 
+-------------------+---------+ 
| http://example.com|category1| 
|http://example1.com|category1| 
| http://example.com|category3| 
|http://example1.com|category3| 
+-------------------+---------+ 
+0

Cela a parfaitement fonctionné! Je vous remercie! – fed0ra