Je suis nouveau à étincelle et Cassandra à la fois. J'essaie d'obtenir des fonctionnalités agrégées en utilisant spark + java sur les données de Cassandra.Spark Cassandra Java intégration Problèmes
Je ne suis pas en mesure d'extraire les données Cassandra dans mon code. J'ai lu plusieurs discussions et j'ai découvert qu'il y a des problèmes de compatibilité avec les étincelles et le connecteur Sparandra. J'ai essayé beaucoup de régler mon problème mais je n'ai pas réussi à le réparer.
ci-dessous pom.xml Recherche (. Veuillez ne me dérange pas de dépendances supplémentaires aussi je dois vous assurer que la bibliothèque est l'origine du problème) -
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>IBeatCassPOC</groupId>
<artifactId>ibeatCassPOC</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!--CASSANDRA START-->
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-mapping</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-extras</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>com.sparkjava</groupId>
<artifactId>spark-core</artifactId>
<version>2.5.4</version>
</dependency>
<!--https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector_2.10-->
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.10</artifactId>
<version>2.0.0-M3</version>
</dependency>
<!--CASSANDRA END-->
<!-- Kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.1</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.2</version>
</dependency>
<!-- Spark -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.4.0</version>
</dependency>
<!-- Logging -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>2.1.0</version>
</dependency>
<!-- Spark-Kafka -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.4.0</version>
</dependency>
<!-- Jackson -->
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.13</version>
</dependency>
<!-- Google Collection Library -->
<dependency>
<groupId>com.google.collections</groupId>
<artifactId>google-collections</artifactId>
<version>1.0-rc2</version>
</dependency>
<!--UA Detector dependency for AgentType in PageTrendLog-->
<dependency>
<groupId>net.sf.uadetector</groupId>
<artifactId>uadetector-core</artifactId>
<version>0.9.12</version>
</dependency>
<dependency>
<groupId>net.sf.uadetector</groupId>
<artifactId>uadetector-resources</artifactId>
<version>2013.12</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-stream</artifactId>
<version>4.0.4</version>
</dependency>
<!-- MongoDb Java Connector -->
<!-- <dependency> <groupId>org.mongodb</groupId> <artifactId>mongo-java-driver</artifactId>
<version>2.13.0</version> </dependency> -->
</dependencies>
code Java source utilisé pour récupérer les données -
import com.datastax.spark.connector.japi.CassandraJavaUtil;
import com.datastax.spark.connector.japi.CassandraRow;
import com.datastax.spark.connector.japi.rdd.CassandraJavaRDD;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import java.util.ArrayList;
public class ReadCassData {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("Spark-Cassandra Integration");
sparkConf.setMaster("local[4]");
sparkConf.set("spark.cassandra.connection.host", "stagingServer22");
sparkConf.set("spark.cassandra.connection.port", "9042");
sparkConf.set("spark.cassandra.connection.timeout_ms", "5000");
sparkConf.set("spark.cassandra.read.timeout_ms", "200000");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
String keySpaceName = "testKeyspace";
String tableName = "testTable";
CassandraJavaRDD<CassandraRow> cassandraRDD = CassandraJavaUtil.javaFunctions(javaSparkContext).cassandraTable(keySpaceName, tableName);
System.out.println("Cassandra Count" + cassandraRDD.cassandraCount());
final ArrayList<CassandraRow> data = new ArrayList<CassandraRow>();
cassandraRDD.reduce(new Function2<CassandraRow, CassandraRow, CassandraRow>() {
public CassandraRow call(CassandraRow v1, CassandraRow v2) throws Exception {
System.out.println("hello");
System.out.println(v1 + " ____ " + v2);
data.add(v1);
data.add(v2);
return null;
}
});
System.out.println("data Size -" + data.size());
}
}
exception rencontrée est -
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 0.0 failed 1 times, most recent failure: Lost task 2.0 in stage 0.0 (TID 2, localhost): java.lang.NoSuchMethodError: org.apache.spark.TaskContext.getMetricsSources(Ljava/lang/String;)Lscala/collection/Seq;
at org.apache.spark.metrics.MetricsUpdater$.getSource(MetricsUpdater.scala:20)
at org.apache.spark.metrics.InputMetricsUpdater$.apply(InputMetricsUpdater.scala:56)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.compute(CassandraTableScanRDD.scala:329)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
J'ai un cluster Cassandra déployé sur un emplacement distant et la version Cassandra utilisée est 3.9.
Veuillez indiquer quelles sont les dépendances compatibles. Je ne peux pas changer ma version de Cassandra (actuellement 3.9). Veuillez indiquer quelle version spark/spark-cassandra-connector doit être utilisée pour exécuter avec succès des tâches de réduction de la carte sur la base de données.
Merci et salutations,
Vibhav
PS - Si quelqu'un choisit de bas-vote cette question, s'il vous plaît ne pas mentionner la raison même.
Quelle est la version d'étincelle réelle sur l'environnement que vous utilisez? – FaigB
@GaigB J'utilise spark-core-2.5.4 et spark-cassandra-connecteur -> spark-cassandra-connector_2.10 (version - 2.0.0-M3). Pouvez-vous s'il vous plaît me suggérer un exemple de référence qui collabore avec cassandra-spark et java? –