diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala index 6137c4c63..f005a14d7 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala @@ -64,4 +64,91 @@ class TestDeleteTable extends TestHoodieSqlBase { } } } + + test("Test Delete Table On Non-PK Condition") { + withTempDir { tmp => + Seq("cow", "mor").foreach {tableType => + /** non-partitioned table */ + val tableName = generateTableName + // create table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName' + | tblproperties ( + | type = '$tableType', + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + """.stripMargin) + + // insert data to table + spark.sql( + s""" + |insert into $tableName + |values (1, 'a1', 10.0, 1000), (2, 'a2', 20.0, 1000), (3, 'a2', 30.0, 1000) + """.stripMargin) + checkAnswer(s"select id, name, price, ts from $tableName")( + Seq(1, "a1", 10.0, 1000), + Seq(2, "a2", 20.0, 1000), + Seq(3, "a2", 30.0, 1000) + ) + + // delete data on non-pk condition + spark.sql(s"delete from $tableName where name = 'a2'") + checkAnswer(s"select id, name, price, ts from $tableName")( + Seq(1, "a1", 10.0, 1000) + ) + + /** partitioned table */ + val ptTableName = generateTableName + "_pt" + // create table + spark.sql( + s""" + |create table $ptTableName ( + | id int, + | name string, + | price double, + | ts long, + | pt string + |) using hudi + | location '${tmp.getCanonicalPath}/$ptTableName' + | tblproperties ( + | type = '$tableType', + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + | partitioned by (pt) + """.stripMargin) + + // insert data to table + spark.sql( + s""" + |insert into $ptTableName + |values (1, 'a1', 10.0, 1000, "2021"), (2, 'a2', 20.0, 1000, "2021"), (3, 'a2', 30.0, 1000, "2022") + """.stripMargin) + checkAnswer(s"select id, name, price, ts, pt from $ptTableName")( + Seq(1, "a1", 10.0, 1000, "2021"), + Seq(2, "a2", 20.0, 1000, "2021"), + Seq(3, "a2", 30.0, 1000, "2022") + ) + + // delete data on non-pk condition + spark.sql(s"delete from $ptTableName where name = 'a2'") + checkAnswer(s"select id, name, price, ts, pt from $ptTableName")( + Seq(1, "a1", 10.0, 1000, "2021") + ) + + spark.sql(s"delete from $ptTableName where pt = '2021'") + checkAnswer(s"select id, name, price, ts, pt from $ptTableName")( + Seq.empty: _* + ) + } + } + } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala index 82d067cfb..57c4a9729 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala @@ -27,18 +27,19 @@ class TestUpdateTable extends TestHoodieSqlBase { spark.sql( s""" |create table $tableName ( - | ID int, - | NAME string, - | PRICE double, - | TS long + | id int, + | name string, + | price double, + | ts long |) using hudi | location '${tmp.getCanonicalPath}/$tableName' - | options ( + | tblproperties ( | type = '$tableType', - | primaryKey = 'ID', - | preCombineField = 'TS' + | primaryKey = 'id', + | preCombineField = 'ts' | ) """.stripMargin) + // insert data to table spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000") checkAnswer(s"select id, name, price, ts from $tableName")( @@ -60,9 +61,10 @@ class TestUpdateTable extends TestHoodieSqlBase { } } - test("Test ignoring case for Update Table") { + test("Test Update Table On Non-PK Condition") { withTempDir { tmp => Seq("cow", "mor").foreach {tableType => + /** non-partitioned table */ val tableName = generateTableName // create table spark.sql( @@ -80,6 +82,92 @@ class TestUpdateTable extends TestHoodieSqlBase { | preCombineField = 'ts' | ) """.stripMargin) + + // insert data to table + spark.sql(s"insert into $tableName values (1, 'a1', 10.0, 1000), (2, 'a2', 20.0, 1000)") + checkAnswer(s"select id, name, price, ts from $tableName")( + Seq(1, "a1", 10.0, 1000), + Seq(2, "a2", 20.0, 1000) + ) + + // update data on non-pk condition + spark.sql(s"update $tableName set price = 11.0, ts = 1001 where name = 'a1'") + checkAnswer(s"select id, name, price, ts from $tableName")( + Seq(1, "a1", 11.0, 1001), + Seq(2, "a2", 20.0, 1000) + ) + + /** partitioned table */ + val ptTableName = generateTableName + "_pt" + // create table + spark.sql( + s""" + |create table $ptTableName ( + | id int, + | name string, + | price double, + | ts long, + | pt string + |) using hudi + | location '${tmp.getCanonicalPath}/$ptTableName' + | tblproperties ( + | type = '$tableType', + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + | partitioned by (pt) + """.stripMargin) + + // insert data to table + spark.sql( + s""" + |insert into $ptTableName + |values (1, 'a1', 10.0, 1000, "2021"), (2, 'a2', 20.0, 1000, "2021"), (3, 'a2', 30.0, 1000, "2022") + """.stripMargin) + checkAnswer(s"select id, name, price, ts, pt from $ptTableName")( + Seq(1, "a1", 10.0, 1000, "2021"), + Seq(2, "a2", 20.0, 1000, "2021"), + Seq(3, "a2", 30.0, 1000, "2022") + ) + + // update data on non-pk condition + spark.sql(s"update $ptTableName set price = price * 1.1, ts = ts + 1 where name = 'a2'") + checkAnswer(s"select id, name, price, ts, pt from $ptTableName")( + Seq(1, "a1", 10.0, 1000, "2021"), + Seq(2, "a2", 22.0, 1001, "2021"), + Seq(3, "a2", 33.0, 1001, "2022") + ) + + spark.sql(s"update $ptTableName set price = price + 5, ts = ts + 1 where pt = '2021'") + checkAnswer(s"select id, name, price, ts, pt from $ptTableName")( + Seq(1, "a1", 15.0, 1001, "2021"), + Seq(2, "a2", 27.0, 1002, "2021"), + Seq(3, "a2", 33.0, 1001, "2022") + ) + } + } + } + + test("Test ignoring case for Update Table") { + withTempDir { tmp => + Seq("cow", "mor").foreach {tableType => + val tableName = generateTableName + // create table + spark.sql( + s""" + |create table $tableName ( + | ID int, + | NAME string, + | PRICE double, + | TS long + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName' + | options ( + | type = '$tableType', + | primaryKey = 'ID', + | preCombineField = 'TS' + | ) + """.stripMargin) // insert data to table spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000") checkAnswer(s"select id, name, price, ts from $tableName")(