J'ai un problème lorsque j'essaie d'insérer des données dans Hbase. Je courais le code scala sur la coque Spark Google Cloud et en essayant d'insérer les données de RDD dans Hbase (BigTable)Hbase Erreur de sérialisation lors de l'insertion de données provenant de RDD
Format de hbaseRDD: - RDD [(String, Carte [String, String])]
La chaîne est un identifiant de ligne et la carte contient sa colonne correspondante et ses valeurs.
codeest comme ceci: -
val tableName: String = "omniture";
val connection = BigtableConfiguration.connect("*******", "**********")
val admin = connection.getAdmin();
val table = connection.getTable(TableName.valueOf(tableName));
TRY 1 :
hbaseRDD.foreach{w =>
val put = new Put(Bytes.toBytes(w._1));
var ColumnValue = w._2
ColumnValue.foreach{x =>
put.addColumn(Bytes.toBytes("u"), Bytes.toBytes(x._1), Bytes.toBytes(x._2));
}
table.put(put);
}
TRY 2 :
hbaseRDD.map{w =>
val put = new Put(Bytes.toBytes(w._1));
var ColumnValue = w._2
ColumnValue.map{x =>
put.addColumn(Bytes.toBytes("u"), Bytes.toBytes(x._1), Bytes.toBytes(x._2));
}
table.put(put);
}
Bellow est l'erreur que je reçois: -
org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: com.google.cloud.bigtable.hbase.BigtableTable
Serialization stack:
- object not serializable (class: com.google.cloud.bigtable.hbase.BigtableTable, value: BigtableTable{hashCode=0x7d96618, project=cdp-dev-201706-01, instance=cdp-dev-cl-hbase-instance, table=omniture, host=bigtable.googleapis.com})
- field (class: logic.ingestion.Ingestion$$anonfun$insertTransactionData$1, name: table$1, type: interface org.apache.hadoop.hbase.client.Table)
- object (class logic.ingestion.Ingestion$$anonfun$insertTransactionData$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
... 27 more
Toute aide serait appréciée. Merci d'avance.