[HUDI-1105] Adding dedup support for Bulk Insert w/ Rows (#2206)
This commit is contained in:
committed by
GitHub
parent
8f7ad8b178
commit
16e90d30ea
@@ -144,7 +144,13 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
|
||||
// generate the inserts
|
||||
val schema = DataSourceTestUtils.getStructTypeExampleSchema
|
||||
val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
|
||||
val records = DataSourceTestUtils.generateRandomRows(1000)
|
||||
val inserts = DataSourceTestUtils.generateRandomRows(1000)
|
||||
|
||||
// add some updates so that preCombine kicks in
|
||||
val toUpdateDataset = sqlContext.createDataFrame(DataSourceTestUtils.getUniqueRows(inserts, 40), structType)
|
||||
val updates = DataSourceTestUtils.updateRowsWithHigherTs(toUpdateDataset)
|
||||
val records = inserts.union(updates)
|
||||
|
||||
val recordsSeq = convertRowListToSeq(records)
|
||||
val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
|
||||
// write to Hudi
|
||||
@@ -161,6 +167,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
|
||||
|
||||
// fetch all records from parquet files generated from write to hudi
|
||||
val actualDf = sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
|
||||
val resultRows = actualDf.collectAsList()
|
||||
|
||||
// remove metadata columns so that expected and actual DFs can be compared as is
|
||||
val trimmedDf = actualDf.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
|
||||
@@ -448,9 +455,9 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
|
||||
.foreach(tableType => {
|
||||
test("test schema evolution for " + tableType) {
|
||||
initSparkContext("test_schema_evolution")
|
||||
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
|
||||
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path_schema_evol")
|
||||
try {
|
||||
val hoodieFooTableName = "hoodie_foo_tbl_" + tableType
|
||||
val hoodieFooTableName = "hoodie_foo_tbl_schema_evolution_" + tableType
|
||||
//create a new table
|
||||
val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
|
||||
HoodieWriteConfig.TABLE_NAME.key -> hoodieFooTableName,
|
||||
|
||||
Reference in New Issue
Block a user