1
0

[HUDI-2359] Add basic "hoodie_is_deleted" unit tests to TestDataSource classes

This commit is contained in:
Rahil Chertara
2021-08-22 21:55:11 -07:00
parent 486bc7dc3b
commit 694300477f
2 changed files with 64 additions and 0 deletions

View File

@@ -782,6 +782,33 @@ class TestCOWDataSource extends HoodieClientTestBase {
assertEquals(enableDropPartitionColumns, !resultContainPartitionColumn)
}
@Test
def testHoodieIsDeletedCOW(): Unit = {
  // Verifies soft deletes on a COW table: rows written with
  // `_hoodie_is_deleted = true` must disappear from subsequent snapshot reads.
  val numRecords = 100
  val numRecordsToDelete = 2

  // Bootstrap the table with an initial batch of inserts.
  val insertRecords = recordsToStrings(dataGen.generateInserts("000", numRecords)).toList
  val insertDF = spark.read.json(spark.sparkContext.parallelize(insertRecords, 2))
  insertDF.write.format("org.apache.hudi")
    .options(commonOpts)
    .mode(SaveMode.Overwrite)
    .save(basePath)

  val snapshotAfterInsert = spark.read.format("org.apache.hudi")
    .load(basePath + "/*/*/*/*")
  assertEquals(numRecords, snapshotAfterInsert.count())

  // Pick a couple of rows, strip the Hudi metadata columns, and mark them
  // as deleted via the `_hoodie_is_deleted` sentinel column.
  val rowsToDelete = snapshotAfterInsert.limit(numRecordsToDelete)
  val metaColumns = rowsToDelete.columns.filter(_.startsWith("_hoodie_"))
  val deleteDF = rowsToDelete
    .drop(metaColumns: _*)
    .withColumn("_hoodie_is_deleted", lit(true).cast(BooleanType))
  deleteDF.write.format("org.apache.hudi")
    .options(commonOpts)
    .mode(SaveMode.Append)
    .save(basePath)

  // The snapshot view must no longer surface the soft-deleted rows.
  val snapshotAfterDelete = spark.read.format("org.apache.hudi")
    .load(basePath + "/*/*/*/*")
  assertEquals(numRecords - numRecordsToDelete, snapshotAfterDelete.count())
}
def copyOnWriteTableSelect(enableDropPartitionColumns: Boolean): Boolean = {
val records1 = recordsToStrings(dataGen.generateInsertsContainsAllPartitions("000", 3)).toList
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))

View File

@@ -33,6 +33,7 @@ import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDat
import org.apache.log4j.LogManager
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.BooleanType
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
@@ -783,4 +784,40 @@ class TestMORDataSource extends HoodieClientTestBase {
val fs = tempPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
assertEquals(true, fs.listStatus(tempPath).isEmpty)
}
@Test
def testHoodieIsDeletedMOR(): Unit = {
  // Verifies soft deletes on a MOR table: rows appended with
  // `_hoodie_is_deleted = true` must be filtered out of snapshot queries.
  val numRecords = 100
  val numRecordsToDelete = 2

  // Bulk-insert the initial batch using the short-trip schema, with inline
  // compaction disabled so the delete path exercises log-file merging.
  val schema = HoodieTestDataGenerator.SHORT_TRIP_SCHEMA
  val insertRecords =
    recordsToStrings(dataGen.generateInsertsAsPerSchema("000", numRecords, schema)).toList
  val insertDF = spark.read.json(spark.sparkContext.parallelize(insertRecords, 2))
  insertDF.write.format("org.apache.hudi")
    .options(commonOpts)
    .option("hoodie.compact.inline", "false")
    .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
    .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
    .mode(SaveMode.Overwrite)
    .save(basePath)

  val snapshotAfterInsert = spark.read.format("org.apache.hudi")
    .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
    .load(basePath + "/*/*/*/*")
  assertEquals(numRecords, snapshotAfterInsert.count())

  // Take a few rows, drop the Hudi metadata columns, and flag them deleted
  // through the `_hoodie_is_deleted` sentinel column.
  val rowsToDelete = snapshotAfterInsert.limit(numRecordsToDelete)
  val metaColumns = rowsToDelete.columns.filter(_.startsWith("_hoodie_"))
  val deleteDF = rowsToDelete
    .drop(metaColumns: _*)
    .withColumn("_hoodie_is_deleted", lit(true).cast(BooleanType))
  deleteDF.write.format("org.apache.hudi")
    .options(commonOpts)
    .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
    .mode(SaveMode.Append)
    .save(basePath)

  // Snapshot reads must merge the delete log records and hide those rows.
  val snapshotAfterDelete = spark.read.format("org.apache.hudi")
    .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
    .load(basePath + "/*/*/*/*")
  assertEquals(numRecords - numRecordsToDelete, snapshotAfterDelete.count())
}
}