0

Salut les gens, j'ai une fonction qui charge un ensemble de données à partir des emplacements S3 et renvoie les données intéressantesApplication conditionnelle de `filter` /` WHERE` à une étincelle `Dataset` /` Dataframe`

private def filterBrowseIndex(spark: SparkSession, s3BrowseIndex: String, mids: Seq[String] = Seq(), indices: Seq[String] = Seq()): Dataset[BrowseIndex] = { 
import spark.implicits._ 

spark 
    .sparkContext.textFile(s3BrowseIndex) 
    // split text dataset 
    .map(line => line.split("\\s+")) 
    // get types for attributes 
    .map(BrowseIndex.strAttributesToBrowseIndex) 
    // cast it to a dataset (requires implicit conversions) 
    .toDS() 
    // pick rows for the given marketplaces 
    .where($"mid".isin(mids: _*)) 
    // pick rows for the given indices 
    .where($"index".isin(indices: _*)) 

} Cette implémentation filtrera tout si quelqu'un fournit mids = Seq() ou indices = Seq(). D'autre part je voudrais que la sémantique soit "appliquer cette clause où seulement si mids n'est pas vide" (idem pour indices) de sorte qu'aucun filtrage ne se produise si l'utilisateur de la fonction fournit des séquences vides.

Existe-t-il une façon fonctionnelle de le faire?

Répondre

2

Raphael Roth est un bon choix pour le problème spécifique de l'application d'un filtre, si cela ne vous dérange pas la logique légèrement alambiquée. La solution générale qui fonctionne pour toute transformation conditionnelle (pas filtrer juste et ne pas faire tout simplement rien sur l'une des branches de décision), est d'utiliser transform, par exemple,

spark 
    .sparkContext.textFile(s3BrowseIndex) 
    // split text dataset 
    .map(line => line.split("\\s+")) 
    // get types for attributes 
    .map(BrowseIndex.strAttributesToBrowseIndex) 
    // cast it to a dataset (requires implicit conversions) 
    .toDS() 
    .transform { ds => 
    // pick rows for the given marketplaces 
    if (mids.isEmpty) ds 
    else ds.where($"mid".isin(mids: _*)) 
    } 
    .transform { ds => 
    // pick rows for the given indices 
    if (indices.isEmpty) ds 
    else ds.where($"index".isin(indices: _*)) 
    } 

Si vous utilisez des ensembles de données d'un type stable (ou dataframes, qui sont Dataset[Row]), transform peuvent être très utiles que vous pouvez construire des séquences de fonctions de transformation et les appliquer:

transformations.foldLeft(ds)(_ transform _) 

Dans de nombreux cas, cette approche aide à la réutilisation du code et la testabilité.

+1

Fonctionne pour moi merci! –

1

vous pouvez utiliser l'évaluation de court-circuit, cela ne devrait appliquer le filtre si le fournit Seq s ne sont pas vides: la réponse de

import org.apache.spark.sql.functions.lit 

spark 
    .sparkContext.textFile(s3BrowseIndex) 
    // split text dataset 
    .map(line => line.split("\\s+")) 
    // get types for attributes 
    .map(BrowseIndex.strAttributesToBrowseIndex) 
    // cast it to a dataset (requires implicit conversions) 
    .toDS() 
    // pick rows for the given marketplaces 
    .where(lit(mids.isEmpty) or $"mid".isin(mids: _*)) 
    // pick rows for the given indices 
    .where(lit(indices.isEmpty) or $"index".isin(indices: _*)) 
+0

Merci Raphael, votre solution fonctionne. Je l'ai upvoted! Je vais choisir la réponse de Sim comme la réponse à cette question mais pour sa généralité et sa logique plus facile. –