2017-10-19 15 views
1

Je sais que Spark sait répartir le travail à effectuer sur un nouveau noeud qui est démarré en cas de panne d'un autre nœud par exemple.Spark Dataset sélectif recalculer

Je voudrais savoir si cela peut être utilisé dans d'autres cas d'utilisation.

Supposons que j'ai un arbre de transformations et d'actions. Que se passe-t-il lorsque l'un des datasets/dataframes est mis à jour (par exemple, un nouveau fichier a été importé). Dans ce cas, je voudrais répéter seulement les transformations et les actions qui sont impactées et liées à ce changement. D'autres transformations et actions non liées doivent être utilisées depuis le cache car elles n'ont pas été impactées.

Maintenant, si je n'avais que quelques-uns de ces cadres de données et des transformations et actions, je peux le faire manuellement. Mais j'ai quelques douzaines ou plus de DF et d'actions et j'essaie de comprendre si l'étincelle a quelque chose de construit dans le cadre qui peut m'aider ici.

Voici un exemple de mon code:

val carLines = spark 
    .read 
    .option("header", "true") 
    .schema(carLineSchema) 
    .csv("src/test/resources/cars") 

val ageMappingFunction: Int => String = (age: Int) => if (age > 80) "old" else "young" 
// 
val _age = udf.register("_age", ageMappingFunction) 

val personLines = spark 
    .read 
    .option("header", "true") 
    .schema(personLineSchema) 
    .csv("src/test/resources/persons") 
    .withColumn("_age", _age($"age")) 

val accidentsLines = spark 
    .read 
    .option("header", "true") 
    .schema(accidentLineSchema) 
    .csv("src/test/resources/accidents") 

val carOwners = personLines 
    .withColumnRenamed("id", "driver_id") 
    .join(carLines, Seq("driver_id"), "left") 
    .withColumnRenamed("id", "car_id") 
    .withColumnRenamed("car_make", "car_maker") 
    .withColumnRenamed("driver_id", "id") 

Voici maintenant quelques transformations:

val accidentsWithDrivers = accidentsLines 
    .join(personLines.withColumnRenamed("id", "driver_id"), "driver_id") 

val accidentsPerDriverID = accidentsWithDrivers 
    .groupBy("driver_id") 
    .agg(Map(
    "name" -> "count" 
)) 
    .withColumnRenamed("count(name)", "accident_count") 
    .withColumnRenamed("driver_id", "id") 

val finalTable = carOwners 
    .join(numberOfCarsPerDriver, Seq("id", "name", "age", "_age")) 
    .join(accidentsPerDriverID, "id") 

Alors je fais des actions (pour simplifier, je vais utiliser 'show'):

carOwners.show(true) 
numberOfCarsPerDriver.show(true) 
finalTable.show(true) 

Alors - ce que je demande est si accidentsLines a changé, mais pas carLines ou personLines. Pouvons-nous faire la transformation carOwners avec des valeurs en cache de carLines et personLines? Puis-je en quelque sorte utiliser RDD # cache() api pour survivre entre les différentes exécutions de pilote en supposant que je veux le garder en mémoire dans le cluster spark?

Répondre

1

se trouve que je dois utiliser soit job-server ou utiliser le support IgniteRDD d'Apache Ignite:

//WRITE 
val igniteContext = new IgniteContext(spark.sparkContext, "ignite-config.xml", true) 
val schema = dataframe.schema 
val rdd = dataframe.rdd 
igniteContext.ignite().getOrCreateCache("ignite-cache").put("schema", schema) 
igniteContext.fromCache(name).saveValues(rdd) 

//READ 
val schema = igniteContext.ignite() 
    .getOrCreateCache[String, StructType]("ignite-cache") 
    .get("schema") 
    .asInstanceOf[StructType] 

    val igniteRdd: IgniteRDD[String, Row] = igniteContext.fromCache(name) 
    val rdd = igniteRdd.map(a => a._2) 
    val dataframe = spark.createDataFrame(rdd, schema)