2017-06-08 2 views
1

Je souhaite créer un Spark Transformer personnalisé en Java.Créer une étincelle Transformer In Java personnalisée

Le Transformer est un préprocesseur de texte qui agit comme un Tokenizer. Il prend une colonne d'entrée et une colonne de sortie en tant que paramètres. J'ai regardé autour de moi et j'ai trouvé 2 traits de Scala HasInputCol et HasOutputCol.

Comment puis-je créer une classe qui étend Transformer et implémente HasInputCol et OutputCol?

Mon but est d'avoir quelque chose comme ça.

// Dataset that have a String column named "text" 
    DataSet<Row> dataset; 

    CustomTransformer customTransformer = new CustomTransformer(); 
    customTransformer.setInputCol("text"); 
    customTransformer.setOutputCol("result"); 

    // result that have 2 String columns named "text" and "result" 
    DataSet<Row> result = customTransformer.transform(dataset); 

Répondre

1

Vous voulez probablement hériter votre CustomTransformer de org.apache.spark.ml.UnaryTransformer. Vous pouvez essayer quelque chose comme ceci:

import org.apache.spark.ml.UnaryTransformer; 
import org.apache.spark.ml.util.Identifiable$; 
import org.apache.spark.sql.types.DataType; 
import org.apache.spark.sql.types.DataTypes; 
import scala.Function1; 
import scala.collection.JavaConversions$; 
import scala.collection.immutable.Seq; 

import java.util.Arrays; 

public class MyCustomTransformer extends UnaryTransformer<String, scala.collection.immutable.Seq<String>, MyCustomTransformer> 
{ 
    private final String uid = Identifiable$.MODULE$.randomUID("mycustom"); 

    @Override 
    public String uid() 
    { 
     return uid; 
    } 


    @Override 
    public Function1<String, scala.collection.immutable.Seq<String>> createTransformFunc() 
    { 
     // can't use labmda syntax :(
     return new scala.runtime.AbstractFunction1<String, Seq<String>>() 
     { 
      @Override 
      public Seq<String> apply(String s) 
      { 
       // do the logic 
       String[] split = s.toLowerCase().split("\\s"); 
       // convert to Scala type 
       return JavaConversions$.MODULE$.iterableAsScalaIterable(Arrays.asList(split)).toList(); 
      } 
     }; 
    } 


    @Override 
    public void validateInputType(DataType inputType) 
    { 
     super.validateInputType(inputType); 
     if (inputType != DataTypes.StringType) 
      throw new IllegalArgumentException("Input type must be string type but got " + inputType + "."); 
    } 

    @Override 
    public DataType outputDataType() 
    { 
     return DataTypes.createArrayType(DataTypes.StringType, true); // or false? depends on your data 
    } 
} 
+0

Cela ne fonctionne pas. Je suppose que c'est à cause d'un bug. Je reçois '' '' java.lang.IllegalArgumentException: l'exigence a échoué: Le paramètre d7ac3108-799c-4aed-a093-c85d12833a4e__inputCol n'appartient pas à fe3d99ba-e4eb-4e95-9412-f84188d936e3.''' – LonsomeHell

+0

@LonsomeHell, juste pour vérifier , êtes-vous sûr de l'avoir configuré avec une colonne d'entrée valide? – SergGr

+0

Oui, j'ai utilisé setInput avec un nom de colonne valide. – LonsomeHell

0

Comme SergGr suggéré, vous pouvez prolonger UnaryTransformer. Cependant, c'est assez difficile.

REMARQUE: Tous les commentaires ci-dessous s'appliquent à Spark version 2.2.0.

Pour résoudre le problème décrit dans SPARK-12606, où ils recevaient "...Param null__inputCol does not belong to...", vous devez mettre en œuvre String uid() comme ceci:

@Override 
public String uid() { 
    return getUid(); 
} 

private String getUid() { 

    if (uid == null) { 
     uid = Identifiable$.MODULE$.randomUID("mycustom"); 
    } 
    return uid; 
} 

Apparemment, ils ont été dans le uid Initialisation constructeur. Mais la chose est que inputCol (et outputCol) de UnaryTransformer est initialisé avant que uid soit initialisé dans la classe d'héritage. Voir HasInputCol:

final val inputCol: Param[String] = new Param[String](this, "inputCol", "input column name") 

Voici comment Param est construit:

def this(parent: Identifiable, name: String, doc: String) = this(parent.uid, name, doc) 

Ainsi, lorsque parent.uid est évalué, la mise en œuvre personnalisée uid() est appelée et à ce point uid est encore nulle. En implémentant uid() avec évaluation paresseuse vous vous assurez que uid() ne renvoie jamais null.

Dans votre cas si:

Param d7ac3108-799c-4aed-a093-c85d12833a4e__inputCol does not belong to fe3d99ba-e4eb-4e95-9412-f84188d936e3 

il semble être un peu différent. Parce que "d7ac3108-799c-4aed-a093-c85d12833a4e" != "fe3d99ba-e4eb-4e95-9412-f84188d936e3", il semble que votre implémentation de la méthode uid() renvoie une nouvelle valeur à chaque appel. Peut-être que dans votre cas, il a été mis en œuvre si:

@Override 
public String uid() { 
    return Identifiable$.MODULE$.randomUID("mycustom"); 
} 

Par ailleurs, lors de l'extension UnaryTransformer, assurez-vous que la fonction de transformation est Serializable.