2017-01-31 3 views
2

Je suis nouveau à étincelle et Cassandra à la fois. J'essaie d'obtenir des fonctionnalités agrégées en utilisant spark + java sur les données de Cassandra.Spark Cassandra Java intégration Problèmes

Je ne suis pas en mesure d'extraire les données Cassandra dans mon code. J'ai lu plusieurs discussions et j'ai découvert qu'il y a des problèmes de compatibilité avec les étincelles et le connecteur Sparandra. J'ai essayé beaucoup de régler mon problème mais je n'ai pas réussi à le réparer.
ci-dessous pom.xml Recherche (. Veuillez ne me dérange pas de dépendances supplémentaires aussi je dois vous assurer que la bibliothèque est l'origine du problème) -

<?xml version="1.0" encoding="UTF-8"?> 
    <project xmlns="http://maven.apache.org/POM/4.0.0" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
<modelVersion>4.0.0</modelVersion> 

<groupId>IBeatCassPOC</groupId> 
<artifactId>ibeatCassPOC</artifactId> 
<version>1.0-SNAPSHOT</version> 
<dependencies> 

    <!--CASSANDRA START--> 
    <dependency> 
     <groupId>com.datastax.cassandra</groupId> 
     <artifactId>cassandra-driver-core</artifactId> 
     <version>3.0.0</version> 
    </dependency> 

    <dependency> 
     <groupId>com.datastax.cassandra</groupId> 
     <artifactId>cassandra-driver-mapping</artifactId> 
     <version>3.0.0</version> 
    </dependency> 

    <dependency> 
     <groupId>com.datastax.cassandra</groupId> 
     <artifactId>cassandra-driver-extras</artifactId> 
     <version>3.0.0</version> 
    </dependency> 
    <dependency> 
     <groupId>com.sparkjava</groupId> 
     <artifactId>spark-core</artifactId> 
     <version>2.5.4</version> 
    </dependency> 

    <!--https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector_2.10--> 
    <dependency> 
     <groupId>com.datastax.spark</groupId> 
     <artifactId>spark-cassandra-connector_2.10</artifactId> 
     <version>2.0.0-M3</version> 
    </dependency> 
    <!--CASSANDRA END--> 
    <!-- Kafka --> 
    <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.10</artifactId> 
     <version>0.8.2.0</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka-clients</artifactId> 
     <version>0.8.2.1</version> 
    </dependency> 

    <dependency> 
     <groupId>commons-codec</groupId> 
     <artifactId>commons-codec</artifactId> 
     <version>1.2</version> 
    </dependency> 

    <!-- Spark --> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-core_2.10</artifactId> 
     <version>1.4.0</version> 
    </dependency> 

    <!-- Logging --> 
    <dependency> 
     <groupId>log4j</groupId> 
     <artifactId>log4j</artifactId> 
     <version>1.2.17</version> 
    </dependency> 
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 --> 
    <dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-sql_2.10</artifactId> 
    <version>2.1.0</version> 
</dependency> 

    <!-- Spark-Kafka --> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming-kafka_2.10</artifactId> 
     <version>1.4.0</version> 
    </dependency> 


    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_2.10</artifactId> 
     <version>1.4.0</version> 
    </dependency> 
    <!-- Jackson --> 
    <dependency> 
     <groupId>org.codehaus.jackson</groupId> 
     <artifactId>jackson-mapper-asl</artifactId> 
     <version>1.9.13</version> 
    </dependency> 

    <!-- Google Collection Library --> 
    <dependency> 
     <groupId>com.google.collections</groupId> 
     <artifactId>google-collections</artifactId> 
     <version>1.0-rc2</version> 
    </dependency> 

    <!--UA Detector dependency for AgentType in PageTrendLog--> 
    <dependency> 
     <groupId>net.sf.uadetector</groupId> 
     <artifactId>uadetector-core</artifactId> 
     <version>0.9.12</version> 
    </dependency> 
    <dependency> 
     <groupId>net.sf.uadetector</groupId> 
     <artifactId>uadetector-resources</artifactId> 
     <version>2013.12</version> 
    </dependency> 

    <dependency> 
     <groupId>com.esotericsoftware</groupId> 
     <artifactId>kryo</artifactId> 
     <version>3.0.3</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-sql_2.10</artifactId> 
     <version>1.3.0</version> 
    </dependency> 

    <dependency> 
     <groupId>org.twitter4j</groupId> 
     <artifactId>twitter4j-stream</artifactId> 
     <version>4.0.4</version> 
    </dependency> 

    <!-- MongoDb Java Connector --> 
    <!-- <dependency> <groupId>org.mongodb</groupId> <artifactId>mongo-java-driver</artifactId> 
     <version>2.13.0</version> </dependency> --> 

</dependencies> 

code Java source utilisé pour récupérer les données -

import com.datastax.spark.connector.japi.CassandraJavaUtil; 
    import com.datastax.spark.connector.japi.CassandraRow; 
    import com.datastax.spark.connector.japi.rdd.CassandraJavaRDD; 
    import org.apache.spark.SparkConf; 
    import org.apache.spark.api.java.JavaRDD; 
    import org.apache.spark.api.java.JavaSparkContext; 
    import org.apache.spark.api.java.function.Function; 
    import org.apache.spark.api.java.function.Function2;     
    import java.util.ArrayList; 

    public class ReadCassData { 
     public static void main(String[] args) { 

      SparkConf sparkConf = new SparkConf(); 
      sparkConf.setAppName("Spark-Cassandra Integration"); 
      sparkConf.setMaster("local[4]"); 
      sparkConf.set("spark.cassandra.connection.host", "stagingServer22"); 
      sparkConf.set("spark.cassandra.connection.port", "9042"); 

      sparkConf.set("spark.cassandra.connection.timeout_ms", "5000"); 
      sparkConf.set("spark.cassandra.read.timeout_ms", "200000"); 


      JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf); 
      String keySpaceName = "testKeyspace"; 
      String tableName = "testTable"; 

      CassandraJavaRDD<CassandraRow> cassandraRDD = CassandraJavaUtil.javaFunctions(javaSparkContext).cassandraTable(keySpaceName, tableName); 
      System.out.println("Cassandra Count" + cassandraRDD.cassandraCount()); 
      final ArrayList<CassandraRow> data = new ArrayList<CassandraRow>(); 

      cassandraRDD.reduce(new Function2<CassandraRow, CassandraRow, CassandraRow>() { 
       public CassandraRow call(CassandraRow v1, CassandraRow v2) throws Exception { 
        System.out.println("hello"); 
        System.out.println(v1 + " ____ " + v2); 
        data.add(v1); 
        data.add(v2); 
        return null; 
       } 
      }); 
      System.out.println("data Size -" + data.size()); 

     } 
    } 

exception rencontrée est -

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 0.0 failed 1 times, most recent failure: Lost task 2.0 in stage 0.0 (TID 2, localhost): java.lang.NoSuchMethodError: org.apache.spark.TaskContext.getMetricsSources(Ljava/lang/String;)Lscala/collection/Seq; 
     at org.apache.spark.metrics.MetricsUpdater$.getSource(MetricsUpdater.scala:20) 
     at org.apache.spark.metrics.InputMetricsUpdater$.apply(InputMetricsUpdater.scala:56) 
     at com.datastax.spark.connector.rdd.CassandraTableScanRDD.compute(CassandraTableScanRDD.scala:329) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) 
     at org.apache.spark.scheduler.Task.run(Task.scala:70) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 

    Driver stacktrace: 
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256) 
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) 
     at scala.Option.foreach(Option.scala:236) 
     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411) 
     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 

J'ai un cluster Cassandra déployé sur un emplacement distant et la version Cassandra utilisée est 3.9.

Veuillez indiquer quelles sont les dépendances compatibles. Je ne peux pas changer ma version de Cassandra (actuellement 3.9). Veuillez indiquer quelle version spark/spark-cassandra-connector doit être utilisée pour exécuter avec succès des tâches de réduction de la carte sur la base de données.

Merci et salutations,

Vibhav

PS - Si quelqu'un choisit de bas-vote cette question, s'il vous plaît ne pas mentionner la raison même.

+0

Quelle est la version d'étincelle réelle sur l'environnement que vous utilisez? – FaigB

+0

@GaigB J'utilise spark-core-2.5.4 et spark-cassandra-connecteur -> spark-cassandra-connector_2.10 (version - 2.0.0-M3). Pouvez-vous s'il vous plaît me suggérer un exemple de référence qui collabore avec cassandra-spark et java? –

Répondre

0

Veuillez vous référer à Cassandra Spark Connector pour la version appropriée du connecteur en fonction de la version de votre étincelle dans votre environnement. Il devrait être 1.5, 1.6 ou 2.0

2

J'ai essayé avec la connexion avec l'étincelle et ai employé le connecteur de cassandra d'étincelle dans le scala.

val étincelle = "com.datastax.spark" %% "Spark-cassandra-connecteur" % "1.6.0"

val sparkCore = "org.apache.spark" %% "étincelles SQL" % "1.6.1"

Et ci-dessous est mon code de travail -

import com.datastax.driver.dse.graph.GraphResultSet 
import com.spok.util.LoggerUtil 
import com.datastax.spark.connector._ 
import org.apache.spark._ 

object DseSparkGraphFactory extends App { 

    val dseConn = {   

    LoggerUtil.info("Connecting with DSE Spark Cluster....") 
     val conf = new SparkConf(true) 
      .setMaster("local[*]") 
      .setAppName("test") 
      .set("spark.cassandra.connection.host", "Ip-Address") 
     val sc = new SparkContext(conf) 
     val rdd = sc.cassandraTable("spokg_test", "Url_p") 
     rdd.collect().map(println) 

    } 
+0

pouvez-vous s'il vous plaît spécifier quelle version de cassandra et étincelle utilisez-vous ..? D'après votre réponse, je peux remarquer que le connecteur spark-cassandra utilisé est 1.6.0 .. –

+0

oui c'est la version du connecteur spark et la version cassandra était 3.9 –

+0

Pouvez-vous également indiquer quelle est la version de Spark que vous utilisez ..? –

0

Après POM a fonctionné pour moi:

<dependencies> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-core_2.10</artifactId> 
     <version>1.6.2</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-sql_2.10</artifactId> 
     <version>1.6.2</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_2.10</artifactId> 
     <version>1.6.2</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming-kafka_2.10</artifactId> 
     <version>1.6.2</version> 
    </dependency> 

    <dependency> 
     <groupId>com.datastax.spark</groupId> 
     <artifactId>spark-cassandra-connector_2.10</artifactId> 
     <version>1.2.1</version> 
    </dependency> 

    <dependency> 
     <groupId>com.datastax.spark</groupId> 
     <artifactId>spark-cassandra-connector-java_2.10</artifactId> 
     <version>1.2.1</version> 
    </dependency> 


    <dependency> 
     <groupId>jdk.tools</groupId> 
     <artifactId>jdk.tools</artifactId> 
     <version>1.6</version> 
     <scope>system</scope> 
     <systemPath>D:\Jars\tools-1.6.0.jar</systemPath> 
    </dependency> 
</dependencies> 

Vérifiez it.I succès données de streaming intégralement ingérées de Kafka à Cassandra. De même, vous pouvez extraire des données dans javaRDD.