0

J'ai un problème lorsque j'essaie d'insérer des données dans Hbase. Je courais le code scala sur la coque Spark Google Cloud et en essayant d'insérer les données de RDD dans Hbase (BigTable)Hbase Erreur de sérialisation lors de l'insertion de données provenant de RDD

Format de hbaseRDD: - RDD [(String, Carte [String, String])]

La chaîne est un identifiant de ligne et la carte contient sa colonne correspondante et ses valeurs.

code

est comme ceci: -

val tableName: String = "omniture"; 

val connection = BigtableConfiguration.connect("*******", "**********") 
val admin = connection.getAdmin(); 
val table = connection.getTable(TableName.valueOf(tableName)); 

TRY 1 : 
    hbaseRDD.foreach{w => 

     val put = new Put(Bytes.toBytes(w._1)); 
     var ColumnValue = w._2 

     ColumnValue.foreach{x =>  


     put.addColumn(Bytes.toBytes("u"), Bytes.toBytes(x._1), Bytes.toBytes(x._2)); 

          } 
     table.put(put); 

     }  

TRY 2 : 
     hbaseRDD.map{w => 

     val put = new Put(Bytes.toBytes(w._1)); 
     var ColumnValue = w._2 

     ColumnValue.map{x =>  

     put.addColumn(Bytes.toBytes("u"), Bytes.toBytes(x._1), Bytes.toBytes(x._2)); 

          } 
     table.put(put); 

     } 

Bellow est l'erreur que je reçois: -

org.apache.spark.SparkException: Task not serializable 
Caused by: java.io.NotSerializableException: com.google.cloud.bigtable.hbase.BigtableTable 
Serialization stack: 
     - object not serializable (class: com.google.cloud.bigtable.hbase.BigtableTable, value: BigtableTable{hashCode=0x7d96618, project=cdp-dev-201706-01, instance=cdp-dev-cl-hbase-instance, table=omniture, host=bigtable.googleapis.com}) 
     - field (class: logic.ingestion.Ingestion$$anonfun$insertTransactionData$1, name: table$1, type: interface org.apache.hadoop.hbase.client.Table) 
     - object (class logic.ingestion.Ingestion$$anonfun$insertTransactionData$1, <function1>) 
     at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) 
     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) 
     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295) 
     ... 27 more 

Toute aide serait appréciée. Merci d'avance.

Répondre

0

En ce qui concerne de: - Writing to HBase via Spark: Task not serializable

Bellow est la bonne façon de le faire: -

hbaseRDD.foreachPartition {w => 

      val tableName: String = "omniture"; 

      val connection = BigtableConfiguration.connect("cdp-dev-201706-01", "cdp-dev-cl-hbase-instance") 
      val admin = connection.getAdmin(); 

      val table = connection.getTable(TableName.valueOf(tableName)); 

      w.foreach {f=> 

      var put = new Put(Bytes.toBytes(f._1)) 

      var ColumnValue = f._2 
       ColumnValue.foreach{x =>  
         put.addColumn(Bytes.toBytes("u"), Bytes.toBytes(x._1), Bytes.toBytes(x._2)); 
           } 
      table.put(put); 
      } 

     }  

     hbaseRDD.collect(); 

Détails sont bien expliquées dans le lien ci-dessus