Merge pull request #3541 from rahil-c/rahil-c/HUDI-2359
[HUDI-2359] Add basic "hoodie_is_deleted" unit tests to TestDataSource classes
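
For context: Hudi treats an incoming record whose "_hoodie_is_deleted" field is set to true as a delete during upsert, so records can be removed through the regular write path without a separate delete operation. Below is a minimal sketch of the pattern the new tests exercise; the table path, write options, and field names are illustrative placeholders (not values from this diff), and an active SparkSession named `spark` is assumed.

    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.functions.lit
    import org.apache.spark.sql.types.BooleanType

    // Hypothetical minimal write config; record key / partition / precombine
    // fields must match your table's schema.
    val hudiWriteOpts = Map(
      "hoodie.table.name" -> "trips",
      "hoodie.datasource.write.recordkey.field" -> "_row_key",
      "hoodie.datasource.write.partitionpath.field" -> "partition",
      "hoodie.datasource.write.precombine.field" -> "timestamp"
    )

    // Read back a few records, strip the Hudi metadata columns, and flag them deleted.
    val table = spark.read.format("org.apache.hudi").load("/tmp/hudi_trips/*/*/*/*")
    val before = table.count()
    val toDelete = table
      .limit(2)
      .drop(table.columns.filter(_.startsWith("_hoodie_")): _*)
      .withColumn("_hoodie_is_deleted", lit(true).cast(BooleanType))

    // Upserting records flagged with _hoodie_is_deleted = true removes them from the table.
    toDelete.write.format("org.apache.hudi")
      .options(hudiWriteOpts)
      .mode(SaveMode.Append)
      .save("/tmp/hudi_trips")

    // A subsequent snapshot read returns two fewer records.
    assert(spark.read.format("org.apache.hudi").load("/tmp/hudi_trips/*/*/*/*").count() == before - 2)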
@@ -782,6 +782,33 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assertEquals(enableDropPartitionColumns, !resultContainPartitionColumn)
   }
 
+  @Test
+  def testHoodieIsDeletedCOW(): Unit = {
+    val numRecords = 100
+    val numRecordsToDelete = 2
+    val records0 = recordsToStrings(dataGen.generateInserts("000", numRecords)).toList
+    val df0 = spark.read.json(spark.sparkContext.parallelize(records0, 2))
+    df0.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val snapshotDF0 = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*/*/*")
+    assertEquals(numRecords, snapshotDF0.count())
+
+    val df1 = snapshotDF0.limit(numRecordsToDelete)
+    val dropDf = df1.drop(df1.columns.filter(_.startsWith("_hoodie_")): _*)
+    val df2 = dropDf.withColumn("_hoodie_is_deleted", lit(true).cast(BooleanType))
+    df2.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    val snapshotDF2 = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*/*/*")
+    assertEquals(numRecords - numRecordsToDelete, snapshotDF2.count())
+  }
+
   def copyOnWriteTableSelect(enableDropPartitionColumns: Boolean): Boolean = {
     val records1 = recordsToStrings(dataGen.generateInsertsContainsAllPartitions("000", 3)).toList
     val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
@@ -33,6 +33,7 @@ import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDat
 import org.apache.log4j.LogManager
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types.BooleanType
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
@@ -783,4 +784,40 @@ class TestMORDataSource extends HoodieClientTestBase {
     val fs = tempPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
     assertEquals(true, fs.listStatus(tempPath).isEmpty)
   }
+
+  @Test
+  def testHoodieIsDeletedMOR(): Unit = {
+    val numRecords = 100
+    val numRecordsToDelete = 2
+    val schema = HoodieTestDataGenerator.SHORT_TRIP_SCHEMA
+    val records0 = recordsToStrings(dataGen.generateInsertsAsPerSchema("000", numRecords, schema)).toList
+    val inputDF0 = spark.read.json(spark.sparkContext.parallelize(records0, 2))
+    inputDF0.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option("hoodie.compact.inline", "false")
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val snapshotDF0 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+      .load(basePath + "/*/*/*/*")
+    assertEquals(numRecords, snapshotDF0.count())
+
+    val df1 = snapshotDF0.limit(numRecordsToDelete)
+    val dropDf = df1.drop(df1.columns.filter(_.startsWith("_hoodie_")): _*)
+
+    val df2 = dropDf.withColumn("_hoodie_is_deleted", lit(true).cast(BooleanType))
+    df2.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    val snapshotDF2 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+      .load(basePath + "/*/*/*/*")
+    assertEquals(numRecords - numRecordsToDelete, snapshotDF2.count())
+  }
 }
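
A note on the two variants: the COW test verifies the delete with a plain read, since copy-on-write rewrites base files at commit time. The MOR test additionally pins TABLE_TYPE to MERGE_ON_READ on both writes, uses bulk_insert for the initial load, disables inline compaction (presumably so the delete records remain in log files rather than being compacted away), and reads with QUERY_TYPE = snapshot so that log-file records are merged with base files at query time.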