
[HUDI-1991] Fixing drop dups exception in bulk insert row writer path (#3055)

Sivabalan Narayanan authored on 2021-06-13 21:55:52 -04:00, committed by GitHub
parent 6e78682cea
commit 7d9f9d7d82
2 changed files with 47 additions and 11 deletions


@@ -333,6 +333,9 @@ object HoodieSparkSqlWriter {
     val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
     sparkContext.getConf.registerAvroSchemas(schema)
     log.info(s"Registered avro schema : ${schema.toString(true)}")
+    if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
+      throw new HoodieException("Dropping duplicates with bulk_insert in row writer path is not supported yet")
+    }
     val params = parameters.updated(HoodieWriteConfig.AVRO_SCHEMA, schema.toString)
     val writeConfig = DataSourceUtils.createHoodieConfig(schema.toString, path.get, tblName, mapAsJavaMap(params))
     val hoodieDF = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, writeConfig, df, structName, nameSpace)
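For context, the added guard fires before HoodieDatasetBulkInsertHelper is invoked, so a datasource write that combines bulk_insert, the row writer, and drop-duplicates now fails fast with the HoodieException above. A minimal sketch of such a write, assuming the usual string forms of these option keys; the table path, table name, and input columns are placeholders:

import org.apache.spark.sql.{SaveMode, SparkSession}

// Hypothetical driver program; running it is expected to fail with the new exception.
val spark = SparkSession.builder().appName("drop-dups-guard").master("local[2]").getOrCreate()
val df = spark.range(0, 10).selectExpr("cast(id as string) as _row_key", "'2021/06/13' as partition", "id as ts")

df.write.format("hudi")
  .option("hoodie.table.name", "hoodie_foo_tbl")
  .option("hoodie.datasource.write.operation", "bulk_insert")
  .option("hoodie.datasource.write.row.writer.enable", "true")      // row writer path
  .option("hoodie.datasource.write.insert.drop.duplicates", "true") // now rejected up front
  .option("hoodie.datasource.write.recordkey.field", "_row_key")
  .option("hoodie.datasource.write.partitionpath.field", "partition")
  .mode(SaveMode.Append)
  .save("/tmp/hoodie_foo_tbl") // placeholder path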


@@ -172,17 +172,50 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
       }
     })
-  test("test insert dataset without precombine field") {
-    val session = SparkSession.builder()
-      .appName("test_insert_without_precombine")
-      .master("local[2]")
-      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-      .getOrCreate()
+  test("test drop duplicates row writing for bulk_insert") {
+    initSparkContext("test_append_mode")
     val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
     try {
-      val sqlContext = session.sqlContext
-      val sc = session.sparkContext
       val hoodieFooTableName = "hoodie_foo_tbl"
       //create a new table
+      val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
+        HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
+        DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
+        "hoodie.bulkinsert.shuffle.parallelism" -> "4",
+        DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
+        DataSourceWriteOptions.ENABLE_ROW_WRITER_OPT_KEY -> "true",
+        INSERT_DROP_DUPS_OPT_KEY -> "true",
+        DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
+        DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
+        DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.SimpleKeyGenerator")
+      val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
+      // generate the inserts
+      val schema = DataSourceTestUtils.getStructTypeExampleSchema
+      val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+      val records = DataSourceTestUtils.generateRandomRows(100)
+      val recordsSeq = convertRowListToSeq(records)
+      val df = spark.createDataFrame(spark.sparkContext.parallelize(recordsSeq), structType)
+      // write to Hudi
+      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df)
+      fail("Drop duplicates with bulk insert in row writing should have thrown exception")
+    } catch {
+      case e: HoodieException => assertTrue(e.getMessage.contains("Dropping duplicates with bulk_insert in row writer path is not supported yet"))
+    } finally {
+      spark.stop()
+      FileUtils.deleteDirectory(path.toFile)
+    }
+  }
+  test("test insert dataset without precombine field") {
+    initSparkContext("test_bulk_insert_datasource")
+    val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
+    try {
+      val sqlContext = spark.sqlContext
+      val sc = spark.sparkContext
+      val hoodieFooTableName = "hoodie_foo_tbl"
+      //create a new table
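The new "test drop duplicates row writing for bulk_insert" test asserts the failure with a try/fail/catch block. An equivalent, slightly more idiomatic ScalaTest form would use intercept; a hedged sketch, reusing the sqlContext, fooTableParams and df values defined in that test:

import org.apache.hudi.exception.HoodieException

// Alternative to fail()/catch: intercept returns the thrown exception,
// so the message check becomes a plain assert on the result.
val e = intercept[HoodieException] {
  HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df)
}
assert(e.getMessage.contains("Dropping duplicates with bulk_insert in row writer path is not supported yet"))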
@@ -201,7 +234,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
       val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
       val records = DataSourceTestUtils.generateRandomRows(100)
       val recordsSeq = convertRowListToSeq(records)
-      val df = session.createDataFrame(sc.parallelize(recordsSeq), structType)
+      val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
       // write to Hudi
       HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams - DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, df)
@@ -215,7 +248,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
       }
       // fetch all records from parquet files generated from write to hudi
-      val actualDf = session.sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
+      val actualDf = spark.sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
       // remove metadata columns so that expected and actual DFs can be compared as is
       val trimmedDf = actualDf.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
@@ -224,7 +257,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
       assert(df.except(trimmedDf).count() == 0)
     } finally {
-      session.stop()
+      spark.stop()
       FileUtils.deleteDirectory(path.toFile)
     }
   }
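Both tests now call initSparkContext rather than building their own SparkSession. The helper itself is not part of this diff; judging from the removed builder lines, it presumably looks roughly like the following sketch (the field names spark and sc are taken from the test bodies, everything else is assumed):

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

// Assumed members of HoodieSparkSqlWriterSuite (not shown in this diff):
// a shared SparkSession/SparkContext pair re-initialised per test.
var spark: SparkSession = _
var sc: SparkContext = _

def initSparkContext(appName: String): Unit = {
  spark = SparkSession.builder()
    .appName(appName)
    .master("local[2]")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
  sc = spark.sparkContext
}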