From 7d9f9d7d8241bfb70d50c557b0194cc8a87b6ee7 Mon Sep 17 00:00:00 2001
From: Sivabalan Narayanan
Date: Sun, 13 Jun 2021 21:55:52 -0400
Subject: [PATCH] [HUDI-1991] Fixing drop dups exception in bulk insert row
 writer path (#3055)

---
 .../apache/hudi/HoodieSparkSqlWriter.scala      |  3 +
 .../HoodieSparkSqlWriterSuite.scala             | 55 +++++++++++++++----
 2 files changed, 47 insertions(+), 11 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 794ea6f98..f992a9702 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -333,6 +333,9 @@ object HoodieSparkSqlWriter {
     val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
     sparkContext.getConf.registerAvroSchemas(schema)
     log.info(s"Registered avro schema : ${schema.toString(true)}")
+    if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
+      throw new HoodieException("Dropping duplicates with bulk_insert in row writer path is not supported yet")
+    }
     val params = parameters.updated(HoodieWriteConfig.AVRO_SCHEMA, schema.toString)
     val writeConfig = DataSourceUtils.createHoodieConfig(schema.toString, path.get, tblName, mapAsJavaMap(params))
     val hoodieDF = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, writeConfig, df, structName, nameSpace)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
index f4b1d824d..10141fb23 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
@@ -172,17 +172,50 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
     }
   })
 
-  test("test insert dataset without precombine field") {
-    val session = SparkSession.builder()
-      .appName("test_insert_without_precombine")
-      .master("local[2]")
-      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-      .getOrCreate()
+  test("test drop duplicates row writing for bulk_insert") {
+    initSparkContext("test_append_mode")
     val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
     try {
 
-      val sqlContext = session.sqlContext
-      val sc = session.sparkContext
+      val hoodieFooTableName = "hoodie_foo_tbl"
+
+      //create a new table
+      val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
+        HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
+        DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
+        "hoodie.bulkinsert.shuffle.parallelism" -> "4",
+        DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
+        DataSourceWriteOptions.ENABLE_ROW_WRITER_OPT_KEY -> "true",
+        INSERT_DROP_DUPS_OPT_KEY -> "true",
+        DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
+        DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
+        DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.SimpleKeyGenerator")
+      val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
+
+      // generate the inserts
+      val schema = DataSourceTestUtils.getStructTypeExampleSchema
+      val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+      val records = DataSourceTestUtils.generateRandomRows(100)
+      val recordsSeq = convertRowListToSeq(records)
+      val df = spark.createDataFrame(spark.sparkContext.parallelize(recordsSeq), structType)
+      // write to Hudi
+      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df)
+      fail("Drop duplicates with bulk insert in row writing should have thrown exception")
+    } catch {
+      case e: HoodieException => assertTrue(e.getMessage.contains("Dropping duplicates with bulk_insert in row writer path is not supported yet"))
+    } finally {
+      spark.stop()
+      FileUtils.deleteDirectory(path.toFile)
+    }
+  }
+
+  test("test insert dataset without precombine field") {
+    initSparkContext("test_bulk_insert_datasource")
+    val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
+    try {
+
+      val sqlContext = spark.sqlContext
+      val sc = spark.sparkContext
       val hoodieFooTableName = "hoodie_foo_tbl"
 
       //create a new table
@@ -201,7 +234,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
       val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
       val records = DataSourceTestUtils.generateRandomRows(100)
       val recordsSeq = convertRowListToSeq(records)
-      val df = session.createDataFrame(sc.parallelize(recordsSeq), structType)
+      val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
       // write to Hudi
       HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append,
         fooTableParams - DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, df)
@@ -215,7 +248,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
       }
 
       // fetch all records from parquet files generated from write to hudi
-      val actualDf = session.sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
+      val actualDf = spark.sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
 
       // remove metadata columns so that expected and actual DFs can be compared as is
       val trimmedDf = actualDf.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
@@ -224,7 +257,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
 
       assert(df.except(trimmedDf).count() == 0)
     } finally {
-      session.stop()
+      spark.stop()
       FileUtils.deleteDirectory(path.toFile)
     }
   }
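
Note (not part of the patch): the sketch below is a minimal, hypothetical datasource write that the new guard would reject, shown for readers who hit this from the Spark datasource API rather than the test suite. The string option keys are assumed to correspond to the constants used above (INSERT_DROP_DUPS_OPT_KEY, ENABLE_ROW_WRITER_OPT_KEY, OPERATION_OPT_KEY, etc.); verify them against your Hudi release.

// Hypothetical example -- assumed option keys, not taken from the patch.
import org.apache.spark.sql.{SaveMode, SparkSession}

object BulkInsertDropDupsExample extends App {
  val spark = SparkSession.builder()
    .appName("bulk_insert_drop_dups_example")
    .master("local[2]")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
  import spark.implicits._

  // two rows with the same record key, so drop-duplicates semantics would matter here
  val df = Seq((1, "a", 1L, "2021/06/13"), (1, "a", 2L, "2021/06/13"))
    .toDF("_row_key", "name", "ts", "partition")

  // bulk_insert + row writer + insert.drop.duplicates now fails fast with
  // "Dropping duplicates with bulk_insert in row writer path is not supported yet"
  df.write.format("hudi")
    .option("hoodie.table.name", "hoodie_foo_tbl")
    .option("hoodie.datasource.write.operation", "bulk_insert")
    .option("hoodie.datasource.write.row.writer.enable", "true")
    .option("hoodie.datasource.write.insert.drop.duplicates", "true") // triggers the new guard
    .option("hoodie.datasource.write.recordkey.field", "_row_key")
    .option("hoodie.datasource.write.partitionpath.field", "partition")
    .option("hoodie.datasource.write.precombine.field", "ts")
    .mode(SaveMode.Append)
    .save("/tmp/hoodie_foo_tbl")
}

Failing fast mirrors the expectation in the new test: rather than silently ignoring the drop-duplicates flag, the row-writer bulk_insert path surfaces the unsupported combination to the caller.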