2

Nous avons créé deux ensemble de données phraseDataFrame, phraseDataFrame2 où search replace doit se produire. PhraseDataFrame2 stocke les termes de recherche et de remplacement.Rechercher et remplacer dans Apache Spark

Nous avons également effectué tous les 11 types de jointure «interne», «externe», «complet», «fullouter», «leftouter», «gauche», «rightouter», «droite», «leftsemi», «leftanti» ',' croiser 'aucun d'entre eux nous a donné le résultat.

Pouvez-vous s'il vous plaît laissez-nous savoir Où nous allons mal et veuillez nous diriger dans la bonne direction.

 List<Row> data = Arrays.asList(
      RowFactory.create(0, "Allen jeevi pramod Allen"), 
      RowFactory.create(1,"sandesh Armstrong jeevi"), 
      RowFactory.create(2,"harsha Nischay DeWALT")); 

     StructType schema = new StructType(new StructField[] { 
     new StructField("label", DataTypes.IntegerType, false, 
      Metadata.empty()), 
     new StructField("sentence", DataTypes.StringType, false, 
      Metadata.empty()) }); 
     Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema); 


     List<Row> data2 = Arrays.asList(
      RowFactory.create("Allen", "Apex Tool Group"), 
      RowFactory.create("Armstrong","Apex Tool Group"), 
      RowFactory.create("DeWALT","StanleyBlack")); 

     StructType schema2 = new StructType(new StructField[] { 
     new StructField("label2", DataTypes.StringType, false, 
      Metadata.empty()), 
     new StructField("sentence2", DataTypes.StringType, false, 
      Metadata.empty()) }); 
     Dataset<Row> sentenceDataFrame2 = spark.createDataFrame(data2, schema2); 

     Dataset<Row> remainingElements=sentenceDataFrame.join(sentenceDataFrame2,sentenceDataFrame.col("label").equalTo(sentenceDataFrame2.col("label2")),"cross"); 
     System.out.println("Left anti join count :"+remainingElements.count()); 

Entrée

Allen de Pramod Allen
Sandesh Armstrong
Harsha Nischay DeWALT

attendu Sortie

Apex Tool Group Jivi Pramod Tool Group Apex
Sandesh Apex Tool Group Jivi
Harsha Nischay StanleyBlack

Répondre

3

Pour se joindre à des conditions qui ne nécessitent pas égalités simples comme celui-ci, vous allez avoir besoin d'utiliser Spark fonctions définies par l'utilisateur (UDF).

Voici un extrait de code JUnit qui ne sera pas compilé directement, mais affichera les importations et la logique pertinentes. L'API Java est assez verbeuse, cependant. Je vais laisser le problème de faire cela dans Scala comme un exercice pour le lecteur. Ce sera beaucoup plus concis. L'importation statique est requise pour les méthodes callUDF() et col().

import static org.apache.spark.sql.functions.*; 

import org.apache.spark.sql.*; 
import org.apache.spark.sql.api.java.UDF2; 
import org.apache.spark.sql.api.java.UDF3; 
import org.apache.spark.sql.types.DataTypes; 
import org.apache.spark.sql.types.Metadata; 
import org.apache.spark.sql.types.StructField; 
import org.apache.spark.sql.types.StructType; 

@Test 
public void testSomething() { 
    List<Row> data = Arrays.asList(
     RowFactory.create(0, "Allen jeevi pramod Allen"), 
     RowFactory.create(1, "sandesh Armstrong jeevi"), 
     RowFactory.create(2, "harsha Nischay DeWALT") 
    ); 

    StructType schema = new StructType(new StructField[] { 
     new StructField("label", DataTypes.IntegerType, false, Metadata.empty()), 
     new StructField("sentence", DataTypes.StringType, false, Metadata.empty()) 
    }); 
    Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema); 

    List<Row> data2 = Arrays.asList(
     RowFactory.create("Allen", "Apex Tool Group"), 
     RowFactory.create("Armstrong","Apex Tool Group"), 
     RowFactory.create("DeWALT","StanleyBlack") 
    ); 

    StructType schema2 = new StructType(new StructField[] { 
     new StructField("label2", DataTypes.StringType, false, Metadata.empty()), 
     new StructField("sentence2", DataTypes.StringType, false, Metadata.empty()) 
    }); 
    Dataset<Row> sentenceDataFrame2 = spark.createDataFrame(data2, schema2); 

    UDF2<String, String, Boolean> contains = new UDF2<String, String, Boolean>() { 
     private static final long serialVersionUID = -5239951370238629896L; 

     @Override 
     public Boolean call(String t1, String t2) throws Exception { 
      return t1.contains(t2); 
     } 
    }; 
    spark.udf().register("contains", contains, DataTypes.BooleanType); 

    UDF3<String, String, String, String> replaceWithTerm = new UDF3<String, String, String, String>() { 
     private static final long serialVersionUID = -2882956931420910207L; 

     @Override 
     public String call(String t1, String t2, String t3) throws Exception { 
      return t1.replaceAll(t2, t3); 
     } 
    }; 
    spark.udf().register("replaceWithTerm", replaceWithTerm, DataTypes.StringType); 

    Dataset<Row> joined = sentenceDataFrame.join(sentenceDataFrame2, callUDF("contains", sentenceDataFrame.col("sentence"), sentenceDataFrame2.col("label2"))) 
              .withColumn("sentence_replaced", callUDF("replaceWithTerm", sentenceDataFrame.col("sentence"), sentenceDataFrame2.col("label2"), sentenceDataFrame2.col("sentence2"))) 
              .select(col("sentence_replaced")); 

    joined.show(false); 
} 

Sortie:

+--------------------------------------------+ 
|sentence_replaced       | 
+--------------------------------------------+ 
|Apex Tool Group jeevi pramod Apex Tool Group| 
|sandesh Apex Tool Group jeevi    | 
|harsha Nischay StanleyBlack     | 
+--------------------------------------------+ 
+0

Merci @Ivan Gozali cela a fonctionné parfaitement. – Nischay

2

toujours face à un problème similaire

Entrée

Allen Armstrong de Pramod Allen
Sandesh Armstrong Jivi
Harsha nischay DeWALT

Sortie

outil Apex Group Armstrong Pramod outil Apex Group
Allen Apex Tool Group de Pramod Allen
Group Sandesh Apex Tool Jivi
Harsha nischay StanleyBlack

Sortie prévue

Outil Apex Apex Tool Group Groupe pramod Apex Tool Group
groupe sandesh Apex Tool Jivi
harsha nischay StanleyBlack

A obtenu cette sortie quand il y a plusieurs remplacements font dans une rangée.

Existe-t-il une autre méthode à suivre pour obtenir la bonne sortie? Ou est-ce la limitation de UDF?

5

Nous pouvons utiliser les fonctions replaceAll et UDF pour obtenir la sortie attendue.

public class Test { 

    public static void main(String[] args) { 
     JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("SparkJdbcDs").setMaster("local[*]")); 
     SQLContext sqlContext = new SQLContext(sc); 
     SparkSession spark = SparkSession.builder().appName("JavaTokenizerExample").getOrCreate(); 

     List<Row> data = Arrays.asList(
     RowFactory.create(0, "Allen jeevi pramod Allen"), 
     RowFactory.create(1, "sandesh Armstrong jeevi"), 
     RowFactory.create(2, "harsha Nischay DeWALT") 
    ); 

     StructType schema = new StructType(new StructField[] { 
     new StructField("label", DataTypes.IntegerType, false, 
       Metadata.empty()), 
     new StructField("sentence", DataTypes.StringType, false, 
       Metadata.empty()) }); 
     Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema); 
     UDF1 mode = new UDF1<String, String>() { 
      public String call(final String types) throws Exception { 
       return types.replaceAll("Allen", "Apex Tool Group") 
       .replaceAll("Armstrong","Apex Tool Group") 
       .replaceAll(""DeWALT","StanleyBlack"") 
      } 
     }; 

     sqlContext.udf().register("mode", mode, DataTypes.StringType); 

     sentenceDataFrame.createOrReplaceTempView("people"); 
     Dataset<Row> newDF = sqlContext.sql("SELECT mode(sentence), label FROM people").withColumnRenamed("UDF(sentence)", "sentence"); 
     newDF.show(false); 
} 
} 

sortie

+--------------------------------------------+------+ 
    |sentence         |label | 
    +--------------------------------------------+------+ 
    |Apex Tool Group jeevi pramod Apex Tool Group| 0 | 
    |sandesh Apex Tool Group jeevi    | 1 | 
    |harsha Nischay StanleyBlack     | 2 | 
    +--------------------------------------------+------+