diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
index b2e7134bd..0f27ee9b8 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
@@ -25,14 +25,12 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
-
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.hadoop.conf.Configuration;
@@ -95,7 +93,9 @@ public class DataSourceInternalWriterHelper {
 
   public void createInflightCommit() {
     metaClient.getActiveTimeline().transitionRequestedToInflight(
-        new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, instantTime), Option.empty());
+        new HoodieInstant(State.REQUESTED,
+            CommitUtils.getCommitActionType(operationType, metaClient.getTableType()),
+            instantTime), Option.empty());
   }
 
   public HoodieTable getHoodieTable() {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
index d53b59e68..7793386dd 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
@@ -117,56 +117,60 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
     }
   }
 
-  test("test bulk insert dataset with datasource impl") {
-    initSparkContext("test_bulk_insert_datasource")
-    val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
-    try {
+  List(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+    .foreach(tableType => {
+      test("test bulk insert dataset with datasource impl for " + tableType) {
+        initSparkContext("test_bulk_insert_datasource")
+        val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
+        try {
 
-      val hoodieFooTableName = "hoodie_foo_tbl"
+          val hoodieFooTableName = "hoodie_foo_tbl"
 
-      //create a new table
-      val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
-        HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
-        "hoodie.bulkinsert.shuffle.parallelism" -> "4",
-        DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
-        DataSourceWriteOptions.ENABLE_ROW_WRITER_OPT_KEY -> "true",
-        DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
-        DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
-        DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.SimpleKeyGenerator")
-      val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
+          //create a new table
+          val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
+            HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
+            DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> tableType,
+            "hoodie.bulkinsert.shuffle.parallelism" -> "4",
+            DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
+            DataSourceWriteOptions.ENABLE_ROW_WRITER_OPT_KEY -> "true",
+            DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
+            DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
+            DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.SimpleKeyGenerator")
+          val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
 
-      // generate the inserts
-      val schema = DataSourceTestUtils.getStructTypeExampleSchema
-      val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
-      val records = DataSourceTestUtils.generateRandomRows(100)
-      val recordsSeq = convertRowListToSeq(records)
-      val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
-      // write to Hudi
-      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df)
+          // generate the inserts
+          val schema = DataSourceTestUtils.getStructTypeExampleSchema
+          val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+          val records = DataSourceTestUtils.generateRandomRows(100)
+          val recordsSeq = convertRowListToSeq(records)
+          val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
+          // write to Hudi
+          HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df)
 
-      // collect all parition paths to issue read of parquet files
-      val partitions = Seq(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH,
-        HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH)
-      // Check the entire dataset has all records still
-      val fullPartitionPaths = new Array[String](3)
-      for (i <- 0 until fullPartitionPaths.length) {
-        fullPartitionPaths(i) = String.format("%s/%s/*", path.toAbsolutePath.toString, partitions(i))
+          // collect all parition paths to issue read of parquet files
+          val partitions = Seq(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH,
+            HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH)
+          // Check the entire dataset has all records still
+          val fullPartitionPaths = new Array[String](3)
+          for (i <- 0 until fullPartitionPaths.length) {
+            fullPartitionPaths(i) = String.format("%s/%s/*", path.toAbsolutePath.toString, partitions(i))
+          }
+
+          // fetch all records from parquet files generated from write to hudi
+          val actualDf = sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
+
+          // remove metadata columns so that expected and actual DFs can be compared as is
+          val trimmedDf = actualDf.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
+            .drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
+            .drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
+
+          assert(df.except(trimmedDf).count() == 0)
+        } finally {
+          spark.stop()
+          FileUtils.deleteDirectory(path.toFile)
+        }
       }
-
-      // fetch all records from parquet files generated from write to hudi
-      val actualDf = sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
-
-      // remove metadata columns so that expected and actual DFs can be compared as is
-      val trimmedDf = actualDf.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
-        .drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
-        .drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
-
-      assert(df.except(trimmedDf).count() == 0)
-    } finally {
-      spark.stop()
-      FileUtils.deleteDirectory(path.toFile)
-    }
-  }
+    })
 
   test("test insert dataset without precombine field") {
     val session = SparkSession.builder()
@@ -493,7 +497,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
     initSparkContext("test build sync config")
 
     val addSqlTablePropertiesMethod = HoodieSparkSqlWriter.getClass.getDeclaredMethod("addSqlTableProperties",
-      classOf[SQLConf], classOf[StructType], classOf[Map[_,_]])
+      classOf[SQLConf], classOf[StructType], classOf[Map[_, _]])
     addSqlTablePropertiesMethod.setAccessible(true)
 
     val schema = DataSourceTestUtils.getStructTypeExampleSchema
@@ -512,7 +516,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
 
     val buildSyncConfigMethod = HoodieSparkSqlWriter.getClass.getDeclaredMethod("buildSyncConfig", classOf[Path],
-      classOf[Map[_,_]])
+      classOf[Map[_, _]])
     buildSyncConfigMethod.setAccessible(true)
 
     val hiveSyncConfig = buildSyncConfigMethod.invoke(HoodieSparkSqlWriter,