1

Nous avons besoin de calculer la matrice de distance comme jaccard sur l'énorme collection de Dataset dans l'étincelle. Face à deux problèmes. Veuillez nous aider à donner des instructions.Utiliser la fonction de carte dans Apache Spark pour un fonctionnement énorme

Edition 1

import info.debatty.java.stringsimilarity.Jaccard; 

    //sample Data set creation 
    List<Row> data = Arrays.asList(
       RowFactory.create("Hi I heard about Spark", "Hi I Know about Spark"), 
       RowFactory.create("I wish Java could use case classes","I wish C# could use case classes"), 
       RowFactory.create("Logistic,regression,models,are,neat","Logistic,regression,models,are,neat")); 

    StructType schema = new StructType(new StructField[] {new StructField("label", DataTypes.StringType, false,Metadata.empty()), 
       new StructField("sentence", DataTypes.StringType, false,Metadata.empty()) }); 
       Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema); 

       // Distance matrix object creation 
       Jaccard jaccard=new Jaccard(); 

       //Working on each of the member element of dataset and applying distance matrix. 
       Dataset<String> sentenceDataFrame1 =sentenceDataFrame.map(
         (MapFunction<Row, String>) row -> "Name: " + jaccard.similarity(row.getString(0),row.getString(1)),Encoders.STRING() 
       ); 
       sentenceDataFrame1.show(); 

Aucune erreur de temps de compilation. Mais faire exception temps d'exécution comme:

org.apache.spark.SparkException: Tâche non sérialisable

Numéro 2
De plus, nous devons trouver la paire est d'avoir plus de points pour lesquels nous avons besoin de déclarer certains variables Nous devons également effectuer d'autres calculs, nous sommes confrontés à beaucoup de difficultés.
Même si j'essaie de déclarer une variable simple comme compteur dans MapBlock, nous ne sommes pas en mesure de capturer la valeur incrémentée. Si nous déclarons en dehors du bloc Map, nous recevons beaucoup d'erreurs de compilation.

int counter=0; 
     Dataset<String> sentenceDataFrame1 =sentenceDataFrame.map(
       (MapFunction<Row, String>) row -> { 
        System.out.println("Name: " + row.getString(1)); 
        //int counter = 0; 
        counter++; 
        System.out.println("Counter: " + counter); 
        return counter+""; 

       },Encoders.STRING() 

     ); 

S'il vous plaît nous donne des indications. Merci.

Répondre

1

Jaccard jaccard = new Jaccard();

Cette classe est-elle sérialisable?

Dans spark, tout le code que vous écrivez à l'intérieur de Transformations est instancié sur le pilote sérialisé et envoyé aux exécuteurs.

Comme vous l'avez utiliser les fonctions lambda:

  1. toutes les classes utilisées de la classe externe à l'intérieur lambda doit être sérialisable. Si l'on utilise même une méthode de la classe externe à l'intérieur de lambda, on s'attend à ce que la classe externe soit sérialisable.

Pour avoir la compréhension des détails s'il vous plaît se référer à:

http://bytepadding.com/big-data/spark/spark-code-analysis/

http://bytepadding.com/big-data/spark/understanding-spark-serialization/

Partie 2:

  1. Essayez de trouver le produit cartésien N CROSS N à étincelle. Essayez de trouver un algorithme plus intelligent pour trouver la paire.

Plus d'entrées sur la question aidera à fournir une meilleure réponse.