[HUDI-1636] Support Builder Pattern To Build Table Properties For HoodieTableConfig (#2596)
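
This change replaces the static HoodieTableMetaClient.initTableType(...) and initTableTypeWithBootstrap(...) helpers, whose long positional argument lists were easy to misorder, with a fluent builder (HoodieTableMetaClient.withPropertyBuilder()) that names each table property explicitly. A minimal sketch of the new call shape, using only setters that appear in this diff (the table name, base path, and Hadoop configuration are placeholder values):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hudi.DataSourceWriteOptions
    import org.apache.hudi.common.model.HoodieTableType
    import org.apache.hudi.common.table.HoodieTableMetaClient

    // Placeholder table name and base path, for illustration only.
    val metaClient = HoodieTableMetaClient.withPropertyBuilder()
      .setTableType(HoodieTableType.COPY_ON_WRITE)
      .setTableName("example_table")
      .setPayloadClassName(DataSourceWriteOptions.DEFAULT_PAYLOAD_OPT_VAL)
      .initTable(new Configuration(), "/tmp/example_table")

    // As the writer hunk below shows, initTable(...) hands back the
    // initialized HoodieTableMetaClient, so the table config stays reachable:
    val tableConfig = metaClient.getTableConfig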
@@ -38,7 +38,7 @@ import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, B
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
-import org.apache.hudi.internal.{DataSourceInternalWriterHelper, HoodieDataSourceInternalWriter}
+import org.apache.hudi.internal.DataSourceInternalWriterHelper
 import org.apache.hudi.sync.common.AbstractSyncTool
 import org.apache.log4j.LogManager
 import org.apache.spark.SPARK_VERSION
@@ -111,9 +111,14 @@ private[hudi] object HoodieSparkSqlWriter {
     if (!tableExists) {
       val archiveLogFolder = parameters.getOrElse(
         HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived")
-      val tableMetaClient = HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get,
-        tableType, tblName, archiveLogFolder, parameters(PAYLOAD_CLASS_OPT_KEY),
-        null.asInstanceOf[String], parameters.getOrDefault(PRECOMBINE_FIELD_OPT_KEY, null))
+
+      val tableMetaClient = HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(tableType)
+        .setTableName(tblName)
+        .setArchiveLogFolder(archiveLogFolder)
+        .setPayloadClassName(parameters(PAYLOAD_CLASS_OPT_KEY))
+        .setPreCombineField(parameters.getOrDefault(PRECOMBINE_FIELD_OPT_KEY, null))
+        .initTable(sparkContext.hadoopConfiguration, path.get)
       tableConfig = tableMetaClient.getTableConfig
     }

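Note: the builder's initTable(...) returns the initialized HoodieTableMetaClient, which is why the surrounding code can keep assigning tableConfig = tableMetaClient.getTableConfig unchanged.
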
@@ -261,10 +266,15 @@ private[hudi] object HoodieSparkSqlWriter {
     if (!tableExists) {
       val archiveLogFolder = parameters.getOrElse(
         HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived")
-      HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration, path,
-        HoodieTableType.valueOf(tableType), tableName, archiveLogFolder, parameters(PAYLOAD_CLASS_OPT_KEY),
-        null, parameters.getOrDefault(PRECOMBINE_FIELD_OPT_KEY, null),
-        bootstrapIndexClass, bootstrapBasePath)
+      HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(HoodieTableType.valueOf(tableType))
+        .setTableName(tableName)
+        .setArchiveLogFolder(archiveLogFolder)
+        .setPayloadClassName(parameters(PAYLOAD_CLASS_OPT_KEY))
+        .setPreCombineField(parameters.getOrDefault(PRECOMBINE_FIELD_OPT_KEY, null))
+        .setBootstrapIndexClass(bootstrapIndexClass)
+        .setBootstrapBasePath(bootstrapBasePath)
+        .initTable(sparkContext.hadoopConfiguration, path)
     }

     val jsc = new JavaSparkContext(sqlContext.sparkContext)

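The bootstrap variant maps onto the same builder: the extra positional arguments of the removed initTableTypeWithBootstrap(...) become the explicit setBootstrapIndexClass(...) and setBootstrapBasePath(...) setters.
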
@@ -44,8 +44,11 @@ class TestStreamingSource extends StreamTest {
   test("test cow stream source") {
     withTempDir { inputDir =>
       val tablePath = s"${inputDir.getCanonicalPath}/test_cow_stream"
-      HoodieTableMetaClient.initTableType(spark.sessionState.newHadoopConf(), tablePath,
-        COPY_ON_WRITE, getTableName(tablePath), DataSourceWriteOptions.DEFAULT_PAYLOAD_OPT_VAL)
+      HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(COPY_ON_WRITE)
+        .setTableName(getTableName(tablePath))
+        .setPayloadClassName(DataSourceWriteOptions.DEFAULT_PAYLOAD_OPT_VAL)
+        .initTable(spark.sessionState.newHadoopConf(), tablePath)

       addData(tablePath, Seq(("1", "a1", "10", "000")))
       val df = spark.readStream
@@ -91,8 +94,11 @@ class TestStreamingSource extends StreamTest {
   test("test mor stream source") {
     withTempDir { inputDir =>
       val tablePath = s"${inputDir.getCanonicalPath}/test_mor_stream"
-      HoodieTableMetaClient.initTableType(spark.sessionState.newHadoopConf(), tablePath,
-        MERGE_ON_READ, getTableName(tablePath), DataSourceWriteOptions.DEFAULT_PAYLOAD_OPT_VAL)
+      HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(MERGE_ON_READ)
+        .setTableName(getTableName(tablePath))
+        .setPayloadClassName(DataSourceWriteOptions.DEFAULT_PAYLOAD_OPT_VAL)
+        .initTable(spark.sessionState.newHadoopConf(), tablePath)

       addData(tablePath, Seq(("1", "a1", "10", "000")))
       val df = spark.readStream