2017-09-27 3 views
1

J'utilise le Datastax spark-cassandra-connector pour accéder à certaines données dans Cassandra.Scala joinWithCassandraTable résultat (ou CassandraTableScanRDD) à l'ensemble de données

Pour pouvoir accéder efficacement à toutes les données dont j'ai besoin pour ma requête, je dois utiliser la méthode joinWithCassandraTable pour récupérer les données d'un groupe de partitions. Cela me donne un objet de classe com.datastax.spark.connector.rdd.CassandraTableScanRDD (ou similaire, pour tester, je suis en train d'utiliser la méthode standard sc.cassandraTable(ks, tbl) pour lire les données).

Le problème est que toutes les méthodes que j'ai besoin d'utiliser sur l'objet résultant nécessitent un objet de classe org.apache.spark.sql.Dataset.

J'ai fait beaucoup de recherches autour et n'ai pas été en mesure de trouver quelque chose pour aider - le plus proche que j'ai trouvé est this question similaire, que je ne pense pas avoir reçu une réponse suffisante, car il ignore la cas d'utilisation où la méthode recommandée pour accéder à toutes les données nécessaires est d'utiliser joinWithCassandraTable.

Je suis aussi nouveau à Java et Scala, donc désolé si je suis un peu lent. Toute aide serait massivement appréciée car je suis bloqué à ce point.

Merci, Akhil

Répondre

2

Ce que vous pouvez faire est de lire votre RDD dans un RDD [Ligne], puis changer cela dans une trame de données. Notre seul problème est que nous avons aussi besoin du schéma. Alors faisons cela en deux étapes.

permet d'obtenir d'abord le schéma de notre programme cible rejoindre

val schema = spark.read.cassandraFormat("dogabase", "test").load.schema 

/** 
schema: org.apache.spark.sql.types.StructType = 
StructType(StructField(owner,StringType,true), 
StructField(dog_id,IntegerType,true), 
StructField(dog_age,IntegerType,true), 
StructField(dog_name,StringType,true)) 
**/ 

nous pouvons alors faire org.apache.spark.sql.Row objets de notre Cassandra pilote lignes.

import org.apache.spark.sql.Row 
val joinResult = 
    sc.parallelize(Seq(Tuple1("Russ"))) 
    .joinWithCassandraTable("test", "dogabase") 
    .map{ case(_, cassandraRow) => Row(cassandraRow.columnValues:_*)} //Unpack our Cassandra row values into a spark.sql.Row 

Maintenant que nous avons un schéma et un RDD [ligne], nous pouvons utiliser la méthode createDataFrame de la session d'allumage

val dataset = spark.createDataFrame(joinResult, schema) 
dataset.show 

/** 
+-----+------+-------+--------+ 
|owner|dog_id|dog_age|dog_name| 
+-----+------+-------+--------+ 
| Russ|  1|  10| cara| 
| Russ|  2|  11|sundance| 
+-----+------+-------+--------+ 
**/ 

Et juste au cas où vous ne me croyez pas qu'un dataframe est Dataset

dataset.getClass 
Class[_ <: org.apache.spark.sql.DataFrame] = class org.apache.spark.sql.Dataset 

EDIT: convertisseurs possibles nécessaires

Certains types Cassandra ne sont pas valides b asis pour Spark Rows vous devrez peut-être les convertir. Cela pourrait être fait en écrivant une fonction de conversion rapide. Malheureusement, la conversion intégrée que le SCC utilise fait une représentation interne de sorte que nous ne pouvons pas utiliser ces conversions.

def convertToSpark(element:Any): Any = { 
    case time: org.joda.time.LocalDate => time.toDateTimeAtStartOfDay().toDate //Convert to java.util.Date 
    case other => other 
} 

Puis lors de vos lignes

cassandraRow.columnValues.map(convertToSpark):_* 
+0

C'est fantastique et résout aussi un tas d'autres problèmes que j'ai eu ... Merci beaucoup! Je vais mettre en œuvre cela demain et vous permettra de savoir comment je me passe :) –

+0

désolé de vous déranger à nouveau - cela semble être vraiment proche de travailler, sauf un de mes champs de Cassandra est une date et je vois l'exception 'Erreur lors de l'encodage: java.lang.RuntimeException: org.joda.time.LocalDate n'est pas un type externe valide pour le schéma de date'. Tu sais si ça a une solution évidente? Merci encore –

+0

Oh Spark :) Le problème est le type retourné par le pilote cassandra "joda's localdate" n'est pas compatible avec Spark.Donc, ce que vous devez faire est de convertir ces LocalDate à un type compatible spark. Je vous suggère d'utiliser les conversions intégrées au connecteur, mais celles-ci visent une représentation interne et ne sont pas autorisées pour les sources externes. Je vais mettre un exemple de code de conversion des types dans la réponse ci-dessus. – RussS