From 7a6272fba150fa9d7acd0b57cc4041ec49019faf Mon Sep 17 00:00:00 2001 From: KnightChess <981159963@qq.com> Date: Fri, 8 Apr 2022 14:26:40 +0800 Subject: [PATCH] [HUDI-3781] fix spark delete sql can not delete record (#5215) --- .../spark/sql/hudi/ProvidesHoodieConfig.scala | 5 +- .../spark/sql/hudi/TestDeleteTable.scala | 47 +++++++++++++++++++ 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala index 74255473b..31fb0ad6c 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala @@ -255,7 +255,10 @@ trait ProvidesHoodieConfig extends Logging { val hoodieProps = getHoodieProps(catalogProperties, tableConfig, sparkSession.sqlContext.conf) val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable) - withSparkConf(sparkSession, hoodieCatalogTable.catalogProperties) { + // operation can not be overwrite + val options = hoodieCatalogTable.catalogProperties.-(OPERATION.key()) + + withSparkConf(sparkSession, options) { Map( "path" -> path, RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","), diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala index f005a14d7..9c693f962 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala @@ -151,4 +151,51 @@ class TestDeleteTable extends TestHoodieSqlBase { } } } + + test("Test Delete Table with op upsert") { + withTempDir { tmp => + Seq("cow", "mor").foreach {tableType => + val tableName = generateTableName + // create table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName' + | tblproperties ( + | type = '$tableType', + | primaryKey = 'id', + | preCombineField = 'ts', + | hoodie.datasource.write.operation = 'upsert' + | ) + """.stripMargin) + // insert data to table + spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000") + checkAnswer(s"select id, name, price, ts from $tableName")( + Seq(1, "a1", 10.0, 1000) + ) + + // delete data from table + spark.sql(s"delete from $tableName where id = 1") + checkAnswer(s"select count(1) from $tableName") ( + Seq(0) + ) + + spark.sql(s"insert into $tableName select 2, 'a2', 10, 1000") + spark.sql(s"delete from $tableName where id = 1") + checkAnswer(s"select id, name, price, ts from $tableName")( + Seq(2, "a2", 10.0, 1000) + ) + + spark.sql(s"delete from $tableName") + checkAnswer(s"select count(1) from $tableName")( + Seq(0) + ) + } + } + } }