0

J'essaie d'extraire les données de Cassandra 3.9 en utilisant SparkCassandraConnector. J'ai plusieurs Jobs Spark (1.6) qui utilise les mêmes données. Donc, je l'ai mis en cache en utilisant le code suivant. enter image description hereErreur lors de l'extraction des blocs d'un autre noeud d'amorçage lors de la mise en cache des Rdds

Code Spark:

sc.parallelize(partitions, 2*sc.defaultParallelism).map(x => new Partition(x)).joinWithCassandraTable("KEYSPACE","COLUMNFAMILY").on(SomeColumns("partitionkey")).select("partitionkey", "cookie", "query").cache() 

Mais peu de tâches s'échoué et plaids l'exception suivante:

org.apache.spark.storage.BlockFetchException: Failed to fetch block from 1 locations. Most recent failure cause: 
     at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:595) 
     at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:585) 
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
     at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:585) 
     at org.apache.spark.storage.BlockManager.getRemote(BlockManager.scala:570) 
     at org.apache.spark.storage.BlockManager.get(BlockManager.scala:630) 
     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:268) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
     at org.apache.spark.scheduler.Task.run(Task.scala:89) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 
    Caused by: java.io.IOException: Connection from ubuntu/172.16.0.27:56727 closed 
     at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124) 
     at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739) 
     at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659) 
     at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) 
     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) 
     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) 
     ... 1 more 

enter image description here

Où que je ne reçois pas d'exception alors que nous ne mettons pas en cache les données. De plus, chaque nœud contient le mappage pour les adresses IP respectives ubuntu et ubuntu1 dans leur fichier hôte.

En outre, comme mentionné dans la capture d'écran, il a divisé l'ensemble des données 8 parité. Connecteur SparkCassandra aurait dû distribuer les emplois intelligously, mais pourquoi Locality_Level il montre est ANY Ce qui implique qu'il n'a pas pu trouver les données sur le même nœud, pourquoi?

Répondre

0

Ce problème a été résolu pour la nouvelle version:

une erreur qui se produit en raison de l'échec de récupérations des blocs non aléatoire (tels que des émissions ou des blocs de RDD mises en cache).

https://issues.apache.org/jira/browse/SPARK-14209 
https://issues.apache.org/jira/browse/SPARK-17484