1
0

[HUDI-1922] Bulk insert with row writer supports mor table (#2981)

This commit is contained in:
leesf
2021-05-26 00:40:22 +08:00
committed by GitHub
parent afa6bc0b10
commit 112732db81
2 changed files with 54 additions and 50 deletions

View File

@@ -25,14 +25,12 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@@ -95,7 +93,9 @@ public class DataSourceInternalWriterHelper {
public void createInflightCommit() { public void createInflightCommit() {
metaClient.getActiveTimeline().transitionRequestedToInflight( metaClient.getActiveTimeline().transitionRequestedToInflight(
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, instantTime), Option.empty()); new HoodieInstant(State.REQUESTED,
CommitUtils.getCommitActionType(operationType, metaClient.getTableType()),
instantTime), Option.empty());
} }
public HoodieTable getHoodieTable() { public HoodieTable getHoodieTable() {

View File

@@ -117,56 +117,60 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
} }
} }
test("test bulk insert dataset with datasource impl") { List(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
initSparkContext("test_bulk_insert_datasource") .foreach(tableType => {
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path") test("test bulk insert dataset with datasource impl for " + tableType) {
try { initSparkContext("test_bulk_insert_datasource")
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
try {
val hoodieFooTableName = "hoodie_foo_tbl" val hoodieFooTableName = "hoodie_foo_tbl"
//create a new table //create a new table
val fooTableModifier = Map("path" -> path.toAbsolutePath.toString, val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName, HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
"hoodie.bulkinsert.shuffle.parallelism" -> "4", DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> tableType,
DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL, "hoodie.bulkinsert.shuffle.parallelism" -> "4",
DataSourceWriteOptions.ENABLE_ROW_WRITER_OPT_KEY -> "true", DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key", DataSourceWriteOptions.ENABLE_ROW_WRITER_OPT_KEY -> "true",
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.SimpleKeyGenerator") DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier) DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.SimpleKeyGenerator")
val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
// generate the inserts // generate the inserts
val schema = DataSourceTestUtils.getStructTypeExampleSchema val schema = DataSourceTestUtils.getStructTypeExampleSchema
val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema) val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
val records = DataSourceTestUtils.generateRandomRows(100) val records = DataSourceTestUtils.generateRandomRows(100)
val recordsSeq = convertRowListToSeq(records) val recordsSeq = convertRowListToSeq(records)
val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType) val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
// write to Hudi // write to Hudi
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df) HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df)
// collect all parition paths to issue read of parquet files // collect all parition paths to issue read of parquet files
val partitions = Seq(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, val partitions = Seq(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH,
HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH) HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH)
// Check the entire dataset has all records still // Check the entire dataset has all records still
val fullPartitionPaths = new Array[String](3) val fullPartitionPaths = new Array[String](3)
for (i <- 0 until fullPartitionPaths.length) { for (i <- 0 until fullPartitionPaths.length) {
fullPartitionPaths(i) = String.format("%s/%s/*", path.toAbsolutePath.toString, partitions(i)) fullPartitionPaths(i) = String.format("%s/%s/*", path.toAbsolutePath.toString, partitions(i))
}
// fetch all records from parquet files generated from write to hudi
val actualDf = sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
// remove metadata columns so that expected and actual DFs can be compared as is
val trimmedDf = actualDf.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
.drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
.drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
assert(df.except(trimmedDf).count() == 0)
} finally {
spark.stop()
FileUtils.deleteDirectory(path.toFile)
}
} }
})
// fetch all records from parquet files generated from write to hudi
val actualDf = sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
// remove metadata columns so that expected and actual DFs can be compared as is
val trimmedDf = actualDf.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
.drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
.drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
assert(df.except(trimmedDf).count() == 0)
} finally {
spark.stop()
FileUtils.deleteDirectory(path.toFile)
}
}
test("test insert dataset without precombine field") { test("test insert dataset without precombine field") {
val session = SparkSession.builder() val session = SparkSession.builder()
@@ -493,7 +497,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
initSparkContext("test build sync config") initSparkContext("test build sync config")
val addSqlTablePropertiesMethod = val addSqlTablePropertiesMethod =
HoodieSparkSqlWriter.getClass.getDeclaredMethod("addSqlTableProperties", HoodieSparkSqlWriter.getClass.getDeclaredMethod("addSqlTableProperties",
classOf[SQLConf], classOf[StructType], classOf[Map[_,_]]) classOf[SQLConf], classOf[StructType], classOf[Map[_, _]])
addSqlTablePropertiesMethod.setAccessible(true) addSqlTablePropertiesMethod.setAccessible(true)
val schema = DataSourceTestUtils.getStructTypeExampleSchema val schema = DataSourceTestUtils.getStructTypeExampleSchema
@@ -512,7 +516,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
val buildSyncConfigMethod = val buildSyncConfigMethod =
HoodieSparkSqlWriter.getClass.getDeclaredMethod("buildSyncConfig", classOf[Path], HoodieSparkSqlWriter.getClass.getDeclaredMethod("buildSyncConfig", classOf[Path],
classOf[Map[_,_]]) classOf[Map[_, _]])
buildSyncConfigMethod.setAccessible(true) buildSyncConfigMethod.setAccessible(true)
val hiveSyncConfig = buildSyncConfigMethod.invoke(HoodieSparkSqlWriter, val hiveSyncConfig = buildSyncConfigMethod.invoke(HoodieSparkSqlWriter,