From bc883db5de5832fa429bbb04a35d3606fdacdb2a Mon Sep 17 00:00:00 2001
From: pengzhiwei
Date: Fri, 5 Mar 2021 14:10:27 +0800
Subject: [PATCH] [HUDI-1636] Support Builder Pattern To Build Table Properties
 For HoodieTableConfig (#2596)

---
 .../hudi/cli/commands/TableCommand.java       |  12 +-
 .../HoodieBackedTableMetadataWriter.java      |  11 +-
 .../TestHoodieClientOnCopyOnWriteStorage.java |  23 +-
 .../org/apache/hudi/client/TestMultiFS.java   |  14 +-
 .../hudi/client/TestTableSchemaEvolution.java |  15 +-
 .../hudi/testutils/FunctionalTestHarness.java |  12 +-
 .../common/table/HoodieTableMetaClient.java   | 276 +++++++++++-------
 .../timeline/TestHoodieActiveTimeline.java    |   8 +-
 .../common/testutils/HoodieTestUtils.java     |   9 +-
 .../java/HoodieJavaWriteClientExample.java    |   7 +-
 .../spark/HoodieWriteClientExample.java       |   7 +-
 .../org/apache/hudi/util/StreamerUtil.java    |  16 +-
 .../integ/testsuite/HoodieTestSuiteJob.java   |   8 +-
 .../apache/hudi/HoodieSparkSqlWriter.scala    |  26 +-
 .../hudi/functional/TestStreamingSource.scala |  14 +-
 .../hudi/hive/testutils/HiveTestUtil.java     |  22 +-
 .../hudi/utilities/HDFSParquetImporter.java   |   8 +-
 .../deltastreamer/BootstrapExecutor.java      |  14 +-
 .../utilities/deltastreamer/DeltaSync.java    |  20 +-
 .../TestHoodieSnapshotExporter.java           |   9 +-
 .../testutils/UtilitiesTestBase.java          |   7 +-
 21 files changed, 341 insertions(+), 197 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
index 168de26e6..d25e0c853 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
@@ -22,7 +22,6 @@ import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.HoodiePrintHelper;
 import org.apache.hudi.cli.TableHeader;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
-import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.exception.TableNotFoundException;
 
@@ -106,10 +105,13 @@ public class TableCommand implements CommandMarker {
       throw new IllegalStateException("Table already existing in path : " + path);
     }
 
-    final HoodieTableType tableType = HoodieTableType.valueOf(tableTypeStr);
-    HoodieTableMetaClient.initTableType(HoodieCLI.conf, path, tableType, name, archiveFolder,
-        payloadClass, layoutVersion);
-
+    HoodieTableMetaClient.withPropertyBuilder()
+      .setTableType(tableTypeStr)
+      .setTableName(name)
+      .setArchiveLogFolder(archiveFolder)
+      .setPayloadClassName(payloadClass)
+      .setTimelineLayoutVersion(layoutVersion)
+      .initTable(HoodieCLI.conf, path);
     // Now connect to ensure loading works
     return connect(path, layoutVersion, false, 0, 0, 0);
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 662941002..dbd678f2b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -288,9 +288,14 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
       String createInstantTime = latestInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
       LOG.info("Creating a new metadata table in " + metadataWriteConfig.getBasePath() + " at instant " + createInstantTime);
 
-      HoodieTableMetaClient.initTableType(hadoopConf.get(), metadataWriteConfig.getBasePath(),
-          HoodieTableType.MERGE_ON_READ, tableName, "archived", HoodieMetadataPayload.class.getName(),
-          HoodieFileFormat.HFILE.toString());
+      HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(HoodieTableType.MERGE_ON_READ)
+        .setTableName(tableName)
+        .setArchiveLogFolder("archived")
+        .setPayloadClassName(HoodieMetadataPayload.class.getName())
+        .setBaseFileFormat(HoodieFileFormat.HFILE.toString())
+        .initTable(hadoopConf.get(), metadataWriteConfig.getBasePath());
+
       initTableMetadata();
 
       // List all partitions in the basePath of the containing dataset
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index 938385430..41ee4b208 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -349,9 +349,11 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
         .withProps(config.getProps()).withTimelineLayoutVersion(
             VERSION_0).build();
-    HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(), metaClient.getTableType(),
-        metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(),
-        metaClient.getTableConfig().getPayloadClass(), VERSION_0);
+    HoodieTableMetaClient.withPropertyBuilder()
+      .fromMetaClient(metaClient)
+      .setTimelineLayoutVersion(VERSION_0)
+      .initTable(metaClient.getHadoopConf(), metaClient.getBasePath());
+
     SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);
 
     // Write 1 (only inserts)
@@ -493,10 +495,11 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
         .withProps(config.getProps()).withMergeAllowDuplicateOnInserts(true).withTimelineLayoutVersion(
             VERSION_0).build();
+    HoodieTableMetaClient.withPropertyBuilder()
+      .fromMetaClient(metaClient)
+      .setTimelineLayoutVersion(VERSION_0)
+      .initTable(metaClient.getHadoopConf(), metaClient.getBasePath());
 
-    HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(), metaClient.getTableType(),
-        metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(),
-        metaClient.getTableConfig().getPayloadClass(), VERSION_0);
     SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);
 
     // Write 1 (only inserts)
@@ -629,9 +632,11 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
         .withBloomIndexUpdatePartitionPath(true)
         .withGlobalSimpleIndexUpdatePartitionPath(true)
         .build()).withTimelineLayoutVersion(VERSION_0).build();
-    HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(),
-        metaClient.getTableType(), metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(),
-        metaClient.getTableConfig().getPayloadClass(), VERSION_0);
+
+    HoodieTableMetaClient.withPropertyBuilder()
+      .fromMetaClient(metaClient)
+      .setTimelineLayoutVersion(VERSION_0)
+      .initTable(metaClient.getHadoopConf(), metaClient.getBasePath());
 
     // Set rollback to LAZY so no inflights are deleted
     hoodieWriteConfig.getProps().put(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY_PROP,
         HoodieFailedWritesCleaningPolicy.LAZY.name());
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
index b0bd54f1f..83761c985 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
@@ -75,8 +75,11 @@ public class TestMultiFS extends HoodieClientTestHarness {
   @Test
   public void readLocalWriteHDFS() throws Exception {
     // Initialize table and filesystem
-    HoodieTableMetaClient.initTableType(hadoopConf, dfsBasePath, HoodieTableType.valueOf(tableType),
-        tableName, HoodieAvroPayload.class.getName());
+    HoodieTableMetaClient.withPropertyBuilder()
+      .setTableType(tableType)
+      .setTableName(tableName)
+      .setPayloadClass(HoodieAvroPayload.class)
+      .initTable(hadoopConf, dfsBasePath);
 
     // Create write client to write some records in
     HoodieWriteConfig cfg = getHoodieWriteConfig(dfsBasePath);
@@ -100,8 +103,11 @@ public class TestMultiFS extends HoodieClientTestHarness {
     assertEquals(readRecords.count(), records.size(), "Should contain 100 records");
 
     // Write to local
-    HoodieTableMetaClient.initTableType(hadoopConf, tablePath, HoodieTableType.valueOf(tableType),
-        tableName, HoodieAvroPayload.class.getName());
+    HoodieTableMetaClient.withPropertyBuilder()
+      .setTableType(tableType)
+      .setTableName(tableName)
+      .setPayloadClass(HoodieAvroPayload.class)
+      .initTable(hadoopConf, tablePath);
 
     String writeCommitTime = localWriteClient.startCommit();
     LOG.info("Starting write commit " + writeCommitTime);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
index 9bcacc982..7065247de 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
@@ -150,9 +150,11 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
     tableType = HoodieTableType.MERGE_ON_READ;
 
     // Create the table
-    HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(),
-        HoodieTableType.MERGE_ON_READ, metaClient.getTableConfig().getTableName(),
-        metaClient.getArchivePath(), metaClient.getTableConfig().getPayloadClass(), VERSION_1);
+    HoodieTableMetaClient.withPropertyBuilder()
+      .fromMetaClient(metaClient)
+      .setTableType(HoodieTableType.MERGE_ON_READ)
+      .setTimelineLayoutVersion(VERSION_1)
+      .initTable(metaClient.getHadoopConf(), metaClient.getBasePath());
 
     HoodieWriteConfig hoodieWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA);
     SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);
@@ -295,9 +297,10 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
   @Test
   public void testCopyOnWriteTable() throws Exception {
     // Create the table
-    HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(),
-        HoodieTableType.COPY_ON_WRITE, metaClient.getTableConfig().getTableName(),
-        metaClient.getArchivePath(), metaClient.getTableConfig().getPayloadClass(), VERSION_1);
+    HoodieTableMetaClient.withPropertyBuilder()
+      .fromMetaClient(metaClient)
+      .setTimelineLayoutVersion(VERSION_1)
+      .initTable(metaClient.getHadoopConf(), metaClient.getBasePath());
 
     HoodieWriteConfig hoodieWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA);
     SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
index 1db1b7f46..fc02e6d4d 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
@@ -24,7 +24,6 @@ import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieAvroPayload;
-import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -117,10 +116,13 @@ public class FunctionalTestHarness implements SparkProvider, DFSProvider, Hoodie
 
   @Override
   public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, String basePath, Properties props) throws IOException {
-    props.putIfAbsent(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP_NAME, PARQUET.toString());
-    props.putIfAbsent(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, RAW_TRIPS_TEST_NAME);
-    props.putIfAbsent(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, COPY_ON_WRITE.name());
-    props.putIfAbsent(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME, HoodieAvroPayload.class.getName());
+    props = HoodieTableMetaClient.withPropertyBuilder()
+      .setTableName(RAW_TRIPS_TEST_NAME)
+      .setTableType(COPY_ON_WRITE)
+      .setPayloadClass(HoodieAvroPayload.class)
+      .setBaseFileFormat(PARQUET.toString())
+      .fromProperties(props)
+      .build();
     return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, props);
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 8aa0a3d42..5de3b9ad0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.FailSafeConsistencyGuard;
 import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
 import org.apache.hudi.common.fs.NoOpConsistencyGuard;
+import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
@@ -309,112 +310,6 @@ public class HoodieTableMetaClient implements Serializable {
     return archivedTimeline;
   }
 
-  /**
-   * Helper method to initialize a table, with given basePath, tableType, name, archiveFolder, payloadClass and
-   * base file format.
- */ - public static HoodieTableMetaClient initTableTypeWithBootstrap(Configuration hadoopConf, String basePath, HoodieTableType tableType, - String tableName, String archiveLogFolder, String payloadClassName, - String baseFileFormat, String preCombineField, String bootstrapIndexClass, - String bootstrapBasePath) throws IOException { - return initTableType(hadoopConf, basePath, tableType, tableName, - archiveLogFolder, payloadClassName, null, - baseFileFormat, preCombineField, bootstrapIndexClass, bootstrapBasePath); - } - - public static HoodieTableMetaClient initTableTypeWithBootstrap(Configuration hadoopConf, String basePath, HoodieTableType tableType, - String tableName, String archiveLogFolder, String payloadClassName, - String baseFileFormat, String bootstrapIndexClass, - String bootstrapBasePath) throws IOException { - return initTableType(hadoopConf, basePath, tableType, tableName, - archiveLogFolder, payloadClassName, null, - baseFileFormat, null, bootstrapIndexClass, bootstrapBasePath); - } - - public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath, HoodieTableType tableType, - String tableName, String archiveLogFolder, String payloadClassName, - String baseFileFormat, String preCombineField) throws IOException { - return initTableType(hadoopConf, basePath, tableType, tableName, - archiveLogFolder, payloadClassName, null, baseFileFormat, preCombineField, - null, null); - } - - public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath, HoodieTableType tableType, - String tableName, String archiveLogFolder, String payloadClassName, - String baseFileFormat) throws IOException { - return initTableType(hadoopConf, basePath, tableType, tableName, - archiveLogFolder, payloadClassName, null, baseFileFormat, null, - null, null); - } - - /** - * Used primarily by tests, examples. 
- */ - public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath, HoodieTableType tableType, - String tableName, String payloadClassName, String preCombineField) throws IOException { - return initTableType(hadoopConf, basePath, tableType, tableName, null, payloadClassName, - null, preCombineField); - } - - public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath, HoodieTableType tableType, - String tableName, String payloadClassName) throws IOException { - return initTableType(hadoopConf, basePath, tableType, tableName, null, payloadClassName, - null, (String) null); - } - - public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath, HoodieTableType tableType, - String tableName, String archiveLogFolder, String payloadClassName, - String preCombineField, Integer timelineLayoutVersion) throws IOException { - return initTableType(hadoopConf, basePath, tableType, tableName, archiveLogFolder, payloadClassName, - timelineLayoutVersion, null, preCombineField, null, null); - } - - public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath, HoodieTableType tableType, - String tableName, String archiveLogFolder, String payloadClassName, - Integer timelineLayoutVersion) throws IOException { - return initTableType(hadoopConf, basePath, tableType, tableName, archiveLogFolder, payloadClassName, - timelineLayoutVersion, null, null, null, null); - } - - private static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath, HoodieTableType tableType, - String tableName, String archiveLogFolder, String payloadClassName, - Integer timelineLayoutVersion, - String baseFileFormat, String preCombineField, - String bootstrapIndexClass, String bootstrapBasePath) throws IOException { - Properties properties = new Properties(); - properties.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, tableName); - properties.setProperty(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, tableType.name()); - properties.setProperty(HoodieTableConfig.HOODIE_TABLE_VERSION_PROP_NAME, String.valueOf(HoodieTableVersion.current().versionCode())); - if (tableType == HoodieTableType.MERGE_ON_READ && payloadClassName != null) { - properties.setProperty(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME, payloadClassName); - } - - if (null != archiveLogFolder) { - properties.put(HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, archiveLogFolder); - } - - if (null != timelineLayoutVersion) { - properties.put(HoodieTableConfig.HOODIE_TIMELINE_LAYOUT_VERSION, String.valueOf(timelineLayoutVersion)); - } - - if (null != baseFileFormat) { - properties.setProperty(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP_NAME, baseFileFormat.toUpperCase()); - } - - if (null != bootstrapIndexClass) { - properties.put(HoodieTableConfig.HOODIE_BOOTSTRAP_INDEX_CLASS_PROP_NAME, bootstrapIndexClass); - } - - if (null != bootstrapBasePath) { - properties.put(HoodieTableConfig.HOODIE_BOOTSTRAP_BASE_PATH, bootstrapBasePath); - } - - if (null != preCombineField) { - properties.put(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD, preCombineField); - } - return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, properties); - } - /** * Helper method to initialize a given path as a hoodie table with configs passed in as as Properties. 
    *
@@ -688,4 +583,173 @@ public class HoodieTableMetaClient implements Serializable {
     }
   }
 
+  public static PropertyBuilder withPropertyBuilder() {
+    return new PropertyBuilder();
+  }
+
+  public static class PropertyBuilder {
+
+    private HoodieTableType tableType;
+    private String tableName;
+    private String archiveLogFolder;
+    private String payloadClassName;
+    private Integer timelineLayoutVersion;
+    private String baseFileFormat;
+    private String preCombineField;
+    private String bootstrapIndexClass;
+    private String bootstrapBasePath;
+
+    private PropertyBuilder() {
+
+    }
+
+    public PropertyBuilder setTableType(HoodieTableType tableType) {
+      this.tableType = tableType;
+      return this;
+    }
+
+    public PropertyBuilder setTableType(String tableType) {
+      return setTableType(HoodieTableType.valueOf(tableType));
+    }
+
+    public PropertyBuilder setTableName(String tableName) {
+      this.tableName = tableName;
+      return this;
+    }
+
+    public PropertyBuilder setArchiveLogFolder(String archiveLogFolder) {
+      this.archiveLogFolder = archiveLogFolder;
+      return this;
+    }
+
+    public PropertyBuilder setPayloadClassName(String payloadClassName) {
+      this.payloadClassName = payloadClassName;
+      return this;
+    }
+
+    public PropertyBuilder setPayloadClass(Class<? extends HoodieRecordPayload> payloadClass) {
+      return setPayloadClassName(payloadClass.getName());
+    }
+
+    public PropertyBuilder setTimelineLayoutVersion(Integer timelineLayoutVersion) {
+      this.timelineLayoutVersion = timelineLayoutVersion;
+      return this;
+    }
+
+    public PropertyBuilder setBaseFileFormat(String baseFileFormat) {
+      this.baseFileFormat = baseFileFormat;
+      return this;
+    }
+
+    public PropertyBuilder setPreCombineField(String preCombineField) {
+      this.preCombineField = preCombineField;
+      return this;
+    }
+
+    public PropertyBuilder setBootstrapIndexClass(String bootstrapIndexClass) {
+      this.bootstrapIndexClass = bootstrapIndexClass;
+      return this;
+    }
+
+    public PropertyBuilder setBootstrapBasePath(String bootstrapBasePath) {
+      this.bootstrapBasePath = bootstrapBasePath;
+      return this;
+    }
+
+    public PropertyBuilder fromMetaClient(HoodieTableMetaClient metaClient) {
+      return setTableType(metaClient.getTableType())
+        .setTableName(metaClient.getTableConfig().getTableName())
+        .setArchiveLogFolder(metaClient.getArchivePath())
+        .setPayloadClassName(metaClient.getTableConfig().getPayloadClass());
+    }
+
+    public PropertyBuilder fromProperties(Properties properties) {
+      if (properties.containsKey(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME)) {
+        setTableName(properties.getProperty(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME));
+      }
+      if (properties.containsKey(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME)) {
+        setTableType(properties.getProperty(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME));
+      }
+      if (properties.containsKey(HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME)) {
+        setArchiveLogFolder(
+            properties.getProperty(HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME));
+      }
+      if (properties.containsKey(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME)) {
+        setPayloadClassName(
+            properties.getProperty(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME));
+      }
+      if (properties.containsKey(HoodieTableConfig.HOODIE_TIMELINE_LAYOUT_VERSION)) {
+        setTimelineLayoutVersion(Integer
+            .parseInt(properties.getProperty(HoodieTableConfig.HOODIE_TIMELINE_LAYOUT_VERSION)));
+      }
+      if (properties.containsKey(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP_NAME)) {
+        setBaseFileFormat(
+            properties.getProperty(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP_NAME));
+      }
+      if (properties.containsKey(HoodieTableConfig.HOODIE_BOOTSTRAP_INDEX_CLASS_PROP_NAME)) {
+        setBootstrapIndexClass(
+            properties.getProperty(HoodieTableConfig.HOODIE_BOOTSTRAP_INDEX_CLASS_PROP_NAME));
+      }
+      if (properties.containsKey(HoodieTableConfig.HOODIE_BOOTSTRAP_BASE_PATH)) {
+        setBootstrapBasePath(properties.getProperty(HoodieTableConfig.HOODIE_BOOTSTRAP_BASE_PATH));
+      }
+      if (properties.containsKey(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD)) {
+        setPreCombineField(properties.getProperty(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD));
+      }
+      return this;
+    }
+
+    public Properties build() {
+      ValidationUtils.checkArgument(tableType != null, "tableType is null");
+      ValidationUtils.checkArgument(tableName != null, "tableName is null");
+
+      Properties properties = new Properties();
+      properties.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, tableName);
+      properties.setProperty(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, tableType.name());
+      properties.setProperty(HoodieTableConfig.HOODIE_TABLE_VERSION_PROP_NAME,
+          String.valueOf(HoodieTableVersion.current().versionCode()));
+      if (tableType == HoodieTableType.MERGE_ON_READ && payloadClassName != null) {
+        properties.setProperty(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME, payloadClassName);
+      }
+
+      if (null != archiveLogFolder) {
+        properties.put(HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, archiveLogFolder);
+      }
+
+      if (null != timelineLayoutVersion) {
+        properties.put(HoodieTableConfig.HOODIE_TIMELINE_LAYOUT_VERSION,
+            String.valueOf(timelineLayoutVersion));
+      }
+
+      if (null != baseFileFormat) {
+        properties.setProperty(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP_NAME,
+            baseFileFormat.toUpperCase());
+      }
+
+      if (null != bootstrapIndexClass) {
+        properties
+            .put(HoodieTableConfig.HOODIE_BOOTSTRAP_INDEX_CLASS_PROP_NAME, bootstrapIndexClass);
+      }
+
+      if (null != bootstrapBasePath) {
+        properties.put(HoodieTableConfig.HOODIE_BOOTSTRAP_BASE_PATH, bootstrapBasePath);
+      }
+
+      if (null != preCombineField) {
+        properties.put(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD, preCombineField);
+      }
+      return properties;
+    }
+
+    /**
+     * Init Table with the properties built by this builder.
+     *
+     * @param configuration The hadoop config.
+     * @param basePath      The base path for hoodie table.
+ */ + public HoodieTableMetaClient initTable(Configuration configuration, String basePath) + throws IOException { + return HoodieTableMetaClient.initTableAndGetMetaClient(configuration, basePath, build()); + } + } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java index d80de8ee9..5c4c911e1 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java @@ -109,9 +109,11 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness { "Check the instants stream"); // Backwards compatibility testing for reading compaction plans - metaClient = HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), - metaClient.getBasePath(), metaClient.getTableType(), metaClient.getTableConfig().getTableName(), - metaClient.getArchivePath(), metaClient.getTableConfig().getPayloadClass(), VERSION_0); + metaClient = HoodieTableMetaClient.withPropertyBuilder() + .fromMetaClient(metaClient) + .setTimelineLayoutVersion(VERSION_0) + .initTable(metaClient.getHadoopConf(), metaClient.getBasePath()); + HoodieInstant instant6 = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "9"); byte[] dummy = new byte[5]; HoodieActiveTimeline oldTimeline = new HoodieActiveTimeline( diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java index d94f41f91..d5d5cb293 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java @@ -101,9 +101,12 @@ public class HoodieTestUtils { public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType, Properties properties) throws IOException { - properties.putIfAbsent(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, RAW_TRIPS_TEST_NAME); - properties.putIfAbsent(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, tableType.name()); - properties.putIfAbsent(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME, HoodieAvroPayload.class.getName()); + properties = HoodieTableMetaClient.withPropertyBuilder() + .setTableName(RAW_TRIPS_TEST_NAME) + .setTableType(tableType) + .setPayloadClass(HoodieAvroPayload.class) + .fromProperties(properties) + .build(); return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, properties); } diff --git a/hudi-examples/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java b/hudi-examples/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java index 1ee5d1a09..4d06e4d15 100644 --- a/hudi-examples/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java +++ b/hudi-examples/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java @@ -72,8 +72,11 @@ public class HoodieJavaWriteClientExample { Path path = new Path(tablePath); FileSystem fs = FSUtils.getFs(tablePath, hadoopConf); if (!fs.exists(path)) { - HoodieTableMetaClient.initTableType(hadoopConf, tablePath, HoodieTableType.valueOf(tableType), - tableName, HoodieAvroPayload.class.getName()); + HoodieTableMetaClient.withPropertyBuilder() + .setTableType(tableType) + .setTableName(tableName) + 
.setPayloadClassName(HoodieAvroPayload.class.getName()) + .initTable(hadoopConf, tablePath); } // Create the write client to write some records in diff --git a/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java b/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java index b606c527b..257519b46 100644 --- a/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java +++ b/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java @@ -85,8 +85,11 @@ public class HoodieWriteClientExample { Path path = new Path(tablePath); FileSystem fs = FSUtils.getFs(tablePath, jsc.hadoopConfiguration()); if (!fs.exists(path)) { - HoodieTableMetaClient.initTableType(jsc.hadoopConfiguration(), tablePath, HoodieTableType.valueOf(tableType), - tableName, HoodieAvroPayload.class.getName()); + HoodieTableMetaClient.withPropertyBuilder() + .setTableType(tableType) + .setTableName(tableName) + .setPayloadClass(HoodieAvroPayload.class) + .initTable(jsc.hadoopConfiguration(), tablePath); } // Create the write client to write some records in diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 81a234a1d..cc161ce8f 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -19,7 +19,6 @@ package org.apache.hudi.util; import org.apache.hudi.common.model.HoodieRecordLocation; -import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.TablePathUtils; import org.apache.hudi.exception.HoodieException; @@ -284,14 +283,13 @@ public class StreamerUtil { // Hadoop FileSystem try (FileSystem fs = FSUtils.getFs(basePath, hadoopConf)) { if (!fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))) { - HoodieTableMetaClient.initTableType( - hadoopConf, - basePath, - HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)), - conf.getString(FlinkOptions.TABLE_NAME), - DEFAULT_ARCHIVE_LOG_FOLDER, - conf.getString(FlinkOptions.PAYLOAD_CLASS), - 1); + HoodieTableMetaClient.withPropertyBuilder() + .setTableType(conf.getString(FlinkOptions.TABLE_TYPE)) + .setTableName(conf.getString(FlinkOptions.TABLE_NAME)) + .setPayloadClassName(conf.getString(FlinkOptions.PAYLOAD_CLASS)) + .setArchiveLogFolder(DEFAULT_ARCHIVE_LOG_FOLDER) + .setTimelineLayoutVersion(1) + .initTable(hadoopConf, basePath); LOG.info("Table initialized under base path {}", basePath); } else { LOG.info("Table [{}/{}] already exists, no need to initialize the table", diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java index aeb8748d0..00f4d1dff 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java @@ -21,7 +21,6 @@ package org.apache.hudi.integ.testsuite; import org.apache.hudi.DataSourceUtils; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.ReflectionUtils; import 
org.apache.hudi.exception.HoodieException; @@ -94,8 +93,11 @@ public class HoodieTestSuiteJob { this.keyGenerator = (BuiltinKeyGenerator) DataSourceUtils.createKeyGenerator(props); if (!fs.exists(new Path(cfg.targetBasePath))) { - HoodieTableMetaClient.initTableType(jsc.hadoopConfiguration(), cfg.targetBasePath, - HoodieTableType.valueOf(cfg.tableType), cfg.targetTableName, "archived"); + HoodieTableMetaClient.withPropertyBuilder() + .setTableType(cfg.tableType) + .setTableName(cfg.targetTableName) + .setArchiveLogFolder("archived") + .initTable(jsc.hadoopConfiguration(), cfg.targetBasePath); } if (cfg.cleanInput) { diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index ef2819101..1f1dc4d42 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -38,7 +38,7 @@ import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, B import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.exception.HoodieException import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool} -import org.apache.hudi.internal.{DataSourceInternalWriterHelper, HoodieDataSourceInternalWriter} +import org.apache.hudi.internal.DataSourceInternalWriterHelper import org.apache.hudi.sync.common.AbstractSyncTool import org.apache.log4j.LogManager import org.apache.spark.SPARK_VERSION @@ -111,9 +111,14 @@ private[hudi] object HoodieSparkSqlWriter { if (!tableExists) { val archiveLogFolder = parameters.getOrElse( HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived") - val tableMetaClient = HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get, - tableType, tblName, archiveLogFolder, parameters(PAYLOAD_CLASS_OPT_KEY), - null.asInstanceOf[String], parameters.getOrDefault(PRECOMBINE_FIELD_OPT_KEY, null)) + + val tableMetaClient = HoodieTableMetaClient.withPropertyBuilder() + .setTableType(tableType) + .setTableName(tblName) + .setArchiveLogFolder(archiveLogFolder) + .setPayloadClassName(parameters(PAYLOAD_CLASS_OPT_KEY)) + .setPreCombineField(parameters.getOrDefault(PRECOMBINE_FIELD_OPT_KEY, null)) + .initTable(sparkContext.hadoopConfiguration, path.get) tableConfig = tableMetaClient.getTableConfig } @@ -261,10 +266,15 @@ private[hudi] object HoodieSparkSqlWriter { if (!tableExists) { val archiveLogFolder = parameters.getOrElse( HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived") - HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration, path, - HoodieTableType.valueOf(tableType), tableName, archiveLogFolder, parameters(PAYLOAD_CLASS_OPT_KEY), - null, parameters.getOrDefault(PRECOMBINE_FIELD_OPT_KEY, null), - bootstrapIndexClass, bootstrapBasePath) + HoodieTableMetaClient.withPropertyBuilder() + .setTableType(HoodieTableType.valueOf(tableType)) + .setTableName(tableName) + .setArchiveLogFolder(archiveLogFolder) + .setPayloadClassName(parameters(PAYLOAD_CLASS_OPT_KEY)) + .setPreCombineField(parameters.getOrDefault(PRECOMBINE_FIELD_OPT_KEY, null)) + .setBootstrapIndexClass(bootstrapIndexClass) + .setBootstrapBasePath(bootstrapBasePath) + .initTable(sparkContext.hadoopConfiguration, path) } val jsc = new JavaSparkContext(sqlContext.sparkContext) diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala index a98152a5e..fbd0e7a1e 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala @@ -44,8 +44,11 @@ class TestStreamingSource extends StreamTest { test("test cow stream source") { withTempDir { inputDir => val tablePath = s"${inputDir.getCanonicalPath}/test_cow_stream" - HoodieTableMetaClient.initTableType(spark.sessionState.newHadoopConf(), tablePath, - COPY_ON_WRITE, getTableName(tablePath), DataSourceWriteOptions.DEFAULT_PAYLOAD_OPT_VAL) + HoodieTableMetaClient.withPropertyBuilder() + .setTableType(COPY_ON_WRITE) + .setTableName(getTableName(tablePath)) + .setPayloadClassName(DataSourceWriteOptions.DEFAULT_PAYLOAD_OPT_VAL) + .initTable(spark.sessionState.newHadoopConf(), tablePath) addData(tablePath, Seq(("1", "a1", "10", "000"))) val df = spark.readStream @@ -91,8 +94,11 @@ class TestStreamingSource extends StreamTest { test("test mor stream source") { withTempDir { inputDir => val tablePath = s"${inputDir.getCanonicalPath}/test_mor_stream" - HoodieTableMetaClient.initTableType(spark.sessionState.newHadoopConf(), tablePath, - MERGE_ON_READ, getTableName(tablePath), DataSourceWriteOptions.DEFAULT_PAYLOAD_OPT_VAL) + HoodieTableMetaClient.withPropertyBuilder() + .setTableType(MERGE_ON_READ) + .setTableName(getTableName(tablePath)) + .setPayloadClassName(DataSourceWriteOptions.DEFAULT_PAYLOAD_OPT_VAL) + .initTable(spark.sessionState.newHadoopConf(), tablePath) addData(tablePath, Seq(("1", "a1", "10", "000"))) val df = spark.readStream diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java index 09090532b..5feca25b1 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java @@ -126,8 +126,11 @@ public class HiveTestUtil { public static void clear() throws IOException { fileSystem.delete(new Path(hiveSyncConfig.basePath), true); - HoodieTableMetaClient.initTableType(configuration, hiveSyncConfig.basePath, HoodieTableType.COPY_ON_WRITE, - hiveSyncConfig.tableName, HoodieAvroPayload.class.getName()); + HoodieTableMetaClient.withPropertyBuilder() + .setTableType(HoodieTableType.COPY_ON_WRITE) + .setTableName(hiveSyncConfig.tableName) + .setPayloadClass(HoodieAvroPayload.class) + .initTable(configuration, hiveSyncConfig.basePath); HoodieHiveClient client = new HoodieHiveClient(hiveSyncConfig, hiveServer.getHiveConf(), fileSystem); for (String tableName : createdTablesSet) { @@ -161,8 +164,12 @@ public class HiveTestUtil { throws IOException, URISyntaxException { Path path = new Path(hiveSyncConfig.basePath); FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath)); - HoodieTableMetaClient.initTableType(configuration, hiveSyncConfig.basePath, HoodieTableType.COPY_ON_WRITE, - hiveSyncConfig.tableName, HoodieAvroPayload.class.getName()); + HoodieTableMetaClient.withPropertyBuilder() + .setTableType(HoodieTableType.COPY_ON_WRITE) + .setTableName(hiveSyncConfig.tableName) + .setPayloadClass(HoodieAvroPayload.class) + .initTable(configuration, 
hiveSyncConfig.basePath); + boolean result = fileSystem.mkdirs(path); checkResult(result); DateTime dateTime = DateTime.now(); @@ -177,8 +184,11 @@ public class HiveTestUtil { throws IOException, URISyntaxException, InterruptedException { Path path = new Path(hiveSyncConfig.basePath); FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath)); - HoodieTableMetaClient.initTableType(configuration, hiveSyncConfig.basePath, HoodieTableType.MERGE_ON_READ, - hiveSyncConfig.tableName, HoodieAvroPayload.class.getName()); + HoodieTableMetaClient.withPropertyBuilder() + .setTableType(HoodieTableType.MERGE_ON_READ) + .setTableName(hiveSyncConfig.tableName) + .setPayloadClass(HoodieAvroPayload.class) + .initTable(configuration, hiveSyncConfig.basePath); boolean result = fileSystem.mkdirs(path); checkResult(result); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java index 06245b98a..186479541 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java @@ -28,7 +28,6 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieIOException; @@ -135,9 +134,10 @@ public class HDFSParquetImporter implements Serializable { if (!fs.exists(new Path(cfg.targetPath))) { // Initialize target hoodie table. - Properties properties = new Properties(); - properties.put(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, cfg.tableName); - properties.put(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, cfg.tableType); + Properties properties = HoodieTableMetaClient.withPropertyBuilder() + .setTableName(cfg.tableName) + .setTableType(cfg.tableType) + .build(); HoodieTableMetaClient.initTableAndGetMetaClient(jsc.hadoopConfiguration(), cfg.targetPath, properties); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java index 5e34c20dc..bcf40259f 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java @@ -23,7 +23,6 @@ import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.config.TypedProperties; -import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.Option; @@ -170,10 +169,15 @@ public class BootstrapExecutor implements Serializable { throw new HoodieException("target base path already exists at " + cfg.targetBasePath + ". 
Cannot bootstrap data on top of an existing table"); } - - HoodieTableMetaClient.initTableTypeWithBootstrap(new Configuration(jssc.hadoopConfiguration()), - cfg.targetBasePath, HoodieTableType.valueOf(cfg.tableType), cfg.targetTableName, "archived", cfg.payloadClassName, - cfg.baseFileFormat, cfg.bootstrapIndexClass, bootstrapBasePath); + HoodieTableMetaClient.withPropertyBuilder() + .setTableType(cfg.tableType) + .setTableName(cfg.targetTableName) + .setArchiveLogFolder("archived") + .setPayloadClassName(cfg.payloadClassName) + .setBaseFileFormat(cfg.baseFileFormat) + .setBootstrapIndexClass(cfg.bootstrapIndexClass) + .setBootstrapBasePath(bootstrapBasePath) + .initTable(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath); } public HoodieWriteConfig getBootstrapConfig() { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 36e2c994f..17eee8ee5 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -31,7 +31,6 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -236,8 +235,14 @@ public class DeltaSync implements Serializable { } } else { this.commitTimelineOpt = Option.empty(); - HoodieTableMetaClient.initTableType(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath, - HoodieTableType.valueOf(cfg.tableType), cfg.targetTableName, "archived", cfg.payloadClassName, cfg.baseFileFormat); + HoodieTableMetaClient.withPropertyBuilder() + .setTableType(cfg.tableType) + .setTableName(cfg.targetTableName) + .setArchiveLogFolder("archived") + .setPayloadClassName(cfg.payloadClassName) + .setBaseFileFormat(cfg.baseFileFormat) + .initTable(new Configuration(jssc.hadoopConfiguration()), + cfg.targetBasePath); } } @@ -321,8 +326,13 @@ public class DeltaSync implements Serializable { } } } else { - HoodieTableMetaClient.initTableType(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath, - HoodieTableType.valueOf(cfg.tableType), cfg.targetTableName, "archived", cfg.payloadClassName, cfg.baseFileFormat); + HoodieTableMetaClient.withPropertyBuilder() + .setTableType(cfg.tableType) + .setTableName(cfg.targetTableName) + .setArchiveLogFolder("archived") + .setPayloadClassName(cfg.payloadClassName) + .setBaseFileFormat(cfg.baseFileFormat) + .initTable(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath); } if (!resumeCheckpointStr.isPresent() && cfg.checkpoint != null) { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotExporter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotExporter.java index 5f511740c..aefa49fa9 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotExporter.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotExporter.java @@ -79,9 +79,12 @@ public class TestHoodieSnapshotExporter extends FunctionalTestHarness { 
sourcePath = dfsBasePath() + "/source/"; targetPath = dfsBasePath() + "/target/"; dfs().mkdirs(new Path(sourcePath)); - HoodieTableMetaClient - .initTableType(jsc().hadoopConfiguration(), sourcePath, HoodieTableType.COPY_ON_WRITE, TABLE_NAME, - HoodieAvroPayload.class.getName()); + + HoodieTableMetaClient.withPropertyBuilder() + .setTableType(HoodieTableType.COPY_ON_WRITE) + .setTableName(TABLE_NAME) + .setPayloadClass(HoodieAvroPayload.class) + .initTable(jsc().hadoopConfiguration(), sourcePath); // Prepare data as source Hudi dataset HoodieWriteConfig cfg = getHoodieWriteConfig(sourcePath); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java index b83fa7890..946db1252 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java @@ -180,8 +180,11 @@ public class UtilitiesTestBase { // Create Dummy hive sync config HiveSyncConfig hiveSyncConfig = getHiveSyncConfig("/dummy", "dummy"); hiveConf.addResource(hiveServer.getHiveConf()); - HoodieTableMetaClient.initTableType(dfs.getConf(), hiveSyncConfig.basePath, HoodieTableType.COPY_ON_WRITE, - hiveSyncConfig.tableName, null); + HoodieTableMetaClient.withPropertyBuilder() + .setTableType(HoodieTableType.COPY_ON_WRITE) + .setTableName(hiveSyncConfig.tableName) + .initTable(dfs.getConf(), hiveSyncConfig.basePath); + HoodieHiveClient client = new HoodieHiveClient(hiveSyncConfig, hiveConf, dfs); client.updateHiveSQL("drop database if exists " + hiveSyncConfig.databaseName); client.updateHiveSQL("create database " + hiveSyncConfig.databaseName);
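
For reviewers who want to exercise the new API end to end, the snippet below mirrors the call sites updated in this patch (TestMultiFS, DeltaSync, FunctionalTestHarness, etc.). It is a minimal sketch and not part of the patch: the wrapper class name, base path, table name, and pre-combine field are illustrative values only.

    import java.io.IOException;
    import java.util.Properties;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.model.HoodieAvroPayload;
    import org.apache.hudi.common.model.HoodieTableType;
    import org.apache.hudi.common.table.HoodieTableMetaClient;

    // Hypothetical example class, not part of this patch.
    public class PropertyBuilderUsageExample {

      public static void main(String[] args) throws IOException {
        Configuration hadoopConf = new Configuration();
        String basePath = "/tmp/hudi/sample_table"; // illustrative path

        // One fluent chain replaces the removed positional initTableType(...)
        // overloads; properties that are never set simply stay out of
        // hoodie.properties instead of being passed around as nulls.
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.withPropertyBuilder()
            .setTableType(HoodieTableType.MERGE_ON_READ)
            .setTableName("sample_table")
            .setArchiveLogFolder("archived")
            .setPayloadClass(HoodieAvroPayload.class)
            .setPreCombineField("ts") // illustrative pre-combine field
            .initTable(hadoopConf, basePath);

        System.out.println("Initialized table: " + metaClient.getTableConfig().getTableName());

        // A builder can also be seeded the way FunctionalTestHarness now does:
        // defaults first, then fromProperties(props), so caller-supplied values
        // override the defaults set earlier in the chain.
        Properties props = new Properties();
        Properties tableProps = HoodieTableMetaClient.withPropertyBuilder()
            .setTableName("sample_table")
            .setTableType(HoodieTableType.COPY_ON_WRITE)
            .setPayloadClass(HoodieAvroPayload.class)
            .fromProperties(props)
            .build(); // build() validates that table type and name are present
      }
    }

Compared to the removed initTableType/initTableTypeWithBootstrap overloads, the builder keeps call sites free of positional null arguments, and fromMetaClient/fromProperties let an existing table or a Properties bag seed the builder before overrides are applied.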