2017-10-18 16 views
2

Donc, j'ai une table dans Sql Server avec la colonne Id et c'est une colonne d'identité. Le problème auquel je suis confronté est que lorsque j'essaie d'y insérer mon cadre de données, il se plaint que identity_insert est réglé sur 'off'. Maintenant, je le mets explicitement sur 'on' en utilisant jdbc mais comme il s'agit d'une variable de session sur le serveur sql, il est remis à 'off' au moment où la commande dataframe push est atteinte, car les deux sont des sessions différentes pour sql serveur.Comment faire pour pousser la base de données Spark vers la table Sql Server lorsque set_identity est désactivé?

Y a-t-il un moyen d'activer si 'on' et de pousser l'image dans la même session?

Une partie du code - Table Sql Server

create table dbo.testtable 
(
[Id] int identity, 
[Name] varchar(100), 
[Address] varchar(100), 
[ExtraColumn] int, 
[Age] int 
) 

Mon cadre de données -

case class TestClass(Id: Int, Name: String, Address: String, ExtraColumn: 
Int, Age: Int) 

val seqClass = Seq(TestClass(1, "kv", "riata", 2, 30), 
       TestClass(2, "xyz", "xyz's place", 2, 31), 
       TestClass(3, "abc", "abc's place", 2, 32)) 

val sparkSession = createSparkSession //creating through some method 
val df = sparkSession.sqlContext.createDataFrame(seqClass) 
JDBCUtils.setIdentityInsertOn(conn, JDBC.SQL_SERVER.TYPE, 
"testdb1.dbo.testtable", None) //my method to turn on identity_insert 

//code to push data frame to sql server 
df.coalesce(1).write.mode("append").jdbc(jdbcUrl,"testdb1.dbo.testtable", 
getConnectionProperties(username,password, dbType)) 

//getConnectionProperties is my own method that provides connection 
//properties for jdbc. 

Notez que les travaux surtout bien si je supprime la colonne Id de trame de données. Donc, le code fonctionne globalement, juste que je dois être capable de maintenir Id sur la base de données et le pousser à testtable. Pourquoi je ne peux pas simplement utiliser la génération d'identité à partir de testtable? Parce que le code ci-dessus fait partie d'un flux de travail complexe et que j'ai besoin de générer des colonnes Id comme ci-dessus dans le cadre de données.

Toute aide est appréciée!

Merci

Répondre

1

Capable de résoudre ce problème après avoir atteint vers Vaibhav hors ligne qui avait déjà mis en œuvre la solution. Je poste la même chose ici pour les autres à utiliser à l'avenir.

Faire une copie locale de SaveTable() et les fonctions dépendant de JDBCUtils.java au-dessous emplacement- https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala

Mise à jour de la fonction SavePartition insérer ci-dessous des lignes de code -

if(Identity_Insert_Off) { 
    val sql = "set IDENTITY_INSERT " + table + " ON"; 
    val statement = conn.createStatement() 
    statement.execute(sql) 
} 

avant la ligne 640 en boucle.

while (iterator.hasNext) {...} 

mise à jour si la condition basée sur le scénario (je suis en utilisant ce code uniquement pour SqlServer afin que la vérification de l'identité d'insertion Drapeau qui se transmet à la fonction)

Ci-dessous la requête peut être utilisée pour vérifier l'identité est activée ou Désactivé pour une table particulière-

SELECT OBJECTPROPERTY(OBJECT_ID('<TableName>'), 'TableHasIdentity');