2016-07-28 1 views
5

J'ai créé un UDF simple pour convertir ou extraire des valeurs d'un champ temporel dans une étincelle temptabl in. J'inscris la fonction mais quand j'appelle la fonction using sql elle jette une exception NullPointerException. Voici ma fonction et mon processus d'exécution. J'utilise Zeppelin. Bizarrement, cela fonctionnait hier, mais cela a cessé de fonctionner ce matin.Fonction UDF Scala et Spark

Fonction

def convert(time:String) : String = { 
    val sdf = new java.text.SimpleDateFormat("HH:mm") 
    val time1 = sdf.parse(time) 
    return sdf.format(time1) 
} 

enregistrer la fonction

sqlContext.udf.register("convert",convert _) 

Test de la fonction sans SQL - Cela fonctionne

convert(12:12:12) -> returns 12:12 

Testez la fonction avec SQL dans Zeppelin ce PANNE.

%sql 
select convert(time) from temptable limit 10 

Structure de temptable

root 
|-- date: string (nullable = true) 
|-- time: string (nullable = true) 
|-- serverip: string (nullable = true) 
|-- request: string (nullable = true) 
|-- resource: string (nullable = true) 
|-- protocol: integer (nullable = true) 
|-- sourceip: string (nullable = true) 

Une partie du stacktrace que je reçois.

java.lang.NullPointerException 
    at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:643) 
    at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:652) 
    at org.apache.spark.sql.hive.HiveFunctionRegistry.lookupFunction(hiveUdfs.scala:54) 
    at org.apache.spark.sql.hive.HiveContext$$anon$3.org$apache$spark$sql$catalyst$analysis$OverrideFunctionRegistry$$super$lookupFunction(HiveContext.scala:376) 
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44) 
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$class.lookupFunction(FunctionRegistry.scala:44) 

Répondre

7

Utilisation UDF au lieu de définir une fonction directement

import org.apache.spark.sql.functions._ 

val convert = udf[String, String](time => { 
     val sdf = new java.text.SimpleDateFormat("HH:mm") 
     val time1 = sdf.parse(time) 
     sdf.format(time1) 
    } 
) 

le paramètre d'entrée d'un FDU est la colonne (ou colonnes). Et le type de retour est la colonne.

case class UserDefinedFunction protected[sql] (
    f: AnyRef, 
    dataType: DataType, 
    inputTypes: Option[Seq[DataType]]) { 

    def apply(exprs: Column*): Column = { 
    Column(ScalaUDF(f, dataType, exprs.map(_.expr), inputTypes.getOrElse(Nil))) 
    } 
}