2017-10-09 5 views
0

J'essaye de construire une application avec l'API de serveur de travaux d'étincelle (pour l'étincelle 2.2.0). Mais j'ai trouvé qu'il n'y a pas de support pour namedObject avec sparkSession. mes regards comme:Pourquoi il n'y a pas de support pour sparkSession avec namedObject dans le serveur de jobs d'étincelles?

import com.typesafe.config.Config 
import org.apache.spark.sql.SparkSession 
import org.apache.spark.storage.StorageLevel 
import org.scalactic._ 
import spark.jobserver.{NamedDataFrame, NamedObjectSupport, SparkSessionJob} 
import spark.jobserver.api.{JobEnvironment, SingleProblem, ValidationProblem} 

import scala.util.Try 

object word1 extends SparkSessionJob with NamedObjectSupport { 
    type JobData = Seq[String] 
    type JobOutput = String 

def runJob(sparkSession: SparkSession, runtime: JobEnvironment, data: JobData): JobOutput = 
{ 
    val df = sparkSession.sparkContext.parallelize(data) 
    val ndf = NamedDataFrame(df, true, StorageLevel.MEMORY_ONLY) 
    this.namedObjects.update("df1", ndf) 
    this.namedObjects.getNames().toString 
} 


def validate(sparkSession: SparkSession, runtime: JobEnvironment, config: Config): 
    JobData Or Every[ValidationProblem] = { 
Try(config.getString("input.string").split(" ").toSeq) 
    .map(words => Good(words)) 
    .getOrElse(Bad(One(SingleProblem("No input.string param")))) 
    } 

} 

mais il y a erreur à la ligne this.namedObjects.update(). Je pense qu'ils n'ont pas de support pour namedObject. tandis que le même code compile avec SparkJob:

object word1 extends SparkJob with NamedObjectSupport 

Y at-il un soutien de namedObjects avec sparksession? Si ce n'est pas le cas, que faut-il faire pour conserver les données/ensembles de données?

Répondre

0

Je l'ai compris. c'était une erreur stupide de mon côté. à partir de https://github.com/spark-jobserver/spark-jobserver/blob/master/job-server-api/src/main/scala/spark/jobserver/NamedObjectSupport.scala#L138. comme il est dit:

// NamedObjectSupport n'est plus nécessaire en raison de JobEnvironment dans api.SparkJobBase. C'est aussi // importé dans le vieux spark.jobserver.SparkJobBase automatiquement pour la compatibilité.

@Deprecated 
trait NamedObjectSupport 

Par conséquent, pour accéder à ces fonctionnalités dont nous avons besoin de modifier ce code dans:

import com.typesafe.config.Config 
import org.apache.spark.sql.SparkSession 
import org.apache.spark.storage.StorageLevel 
import org.scalactic._ 
import spark.jobserver.{NamedDataFrame, NamedObjectSupport, SparkSessionJob} 
import spark.jobserver.api.{JobEnvironment, SingleProblem, ValidationProblem} 

import scala.util.Try 

object word1 extends SparkSessionJob with NamedObjectSupport { 
    type JobData = Seq[String] 
    type JobOutput = String 

def runJob(sparkSession: SparkSession, runtime: JobEnvironment, data: JobData): JobOutput = 
    { 
    val df = sparkSession.sparkContext.parallelize(data) 
    val ndf = NamedDataFrame(df, true, StorageLevel.MEMORY_ONLY) 
    runtime.namedObjects.update("df1", ndf) 
    runtime.namedObjects.getNames().toString 
    } 


def validate(sparkSession: SparkSession, runtime: JobEnvironment, config: Config): 
    JobData Or Every[ValidationProblem] = { 
Try(config.getString("input.string").split(" ").toSeq) 
    .map(words => Good(words)) 
    .getOrElse(Bad(One(SingleProblem("No input.string param")))) 
    } 

}