2017-06-09 3 views
3

Scala/Spark, je suis en train de faire ce qui suit:Comment se connecter sur un champ binaire?

val portCalls_Ports = 
    portCalls.join(ports, portCalls("port_id") === ports("id"), "inner") 

Cependant, je reçois l'erreur suivante:

Exception in thread "main" org.apache.spark.sql.AnalysisException: 
    binary type expression port_id cannot be used in join conditions; 

Il est vrai que c'est un type binaire:

root 
|-- id: binary (nullable = false) 
|-- port_id: binary (nullable = false) 
    . 
    . 
    . 

+--------------------+--------------------+ 
|     id|    port_id| 
+--------------------+--------------------+ 
|[FB 89 A0 FF AA 0...|[B2 B2 84 B9 52 2...| 

tel que ports("id").

J'utilise les bibliothèques suivantes:

scalaVersion := "2.11.11" 
libraryDependencies ++= Seq(
    // Spark dependencies 
    "org.apache.spark" %% "spark-hive" % "1.6.2", 
    "org.apache.spark" %% "spark-mllib" % "1.6.2", 
    // Third-party libraries 
    "postgresql" % "postgresql" % "9.1-901-1.jdbc4", 
    "net.sf.jopt-simple" % "jopt-simple" % "5.0.3" 
) 

Notez que j'utilise JDBC pour lire les tables de base de données.

Quelle est la meilleure façon de résoudre ce problème?

+0

expressions de type binaire peuvent être utilisés dans des conditions de jointure dans Spark 2.1.0, mais pas dans les versions antérieures. – suj1th

+1

J'ai supprimé la balise jdbc, car ce problème semble être purement interne et n'est pas lié à l'utilisation de jdbc. –

Répondre

3

Pre Spark 2.1.0, la meilleure solution que je connaisse est d'utiliser la fonction base64 pour convertir les colonnes binaires en chaînes et comparer ces:

import org.apache.spark.sql.functions._ 

val portCalls_Ports = 
    portCalls.join(ports, base64(portCalls("port_id")) === base64(ports("id")), "inner") 
+0

Désolé - édité le message pour inclure l'importation; Je recommande de prendre l'habitude d'ajouter cette importation à chaque morceau de code lié à DataFrame;) –