From c9fcf964b2bae56a54cb72951c8d8999eb323ed6 Mon Sep 17 00:00:00 2001
From: Sivabalan Narayanan
Date: Fri, 19 Feb 2021 20:54:26 -0500
Subject: [PATCH] [HUDI-1315] Adding builder for HoodieTableMetaClient
 initialization (#2534)

---
 .../java/org/apache/hudi/cli/HoodieCLI.java        |  4 +-
 .../hudi/cli/commands/CommitsCommand.java          |  4 +-
 .../cli/commands/FileSystemViewCommand.java        |  2 +-
 .../apache/hudi/cli/commands/SparkMain.java        |  6 +-
 .../hudi/cli/commands/TableCommand.java            |  2 +-
 .../org/apache/hudi/cli/DedupeSparkJob.scala       |  4 +-
 .../hudi/client/AbstractHoodieClient.java          |  6 +-
 .../HoodieBackedTableMetadataWriter.java           |  2 +-
 .../apache/hudi/table/HoodieFlinkTable.java        | 11 +--
 .../apache/hudi/table/HoodieJavaTable.java         | 11 +--
 .../apache/hudi/client/HoodieReadClient.java       |  4 +-
 .../apache/hudi/table/HoodieSparkTable.java        | 11 +--
 .../client/TestCompactionAdminClient.java          | 12 +--
 .../TestHoodieClientOnCopyOnWriteStorage.java      | 12 +--
 .../org/apache/hudi/client/TestMultiFS.java        |  4 +-
 .../metadata/TestHoodieBackedMetadata.java         | 10 +--
 .../action/compact/CompactionTestBase.java         | 16 ++--
 .../action/compact/TestAsyncCompaction.java        | 26 +++---
 .../action/compact/TestInlineCompaction.java       | 36 ++++----
 .../hudi/testutils/HoodieClientTestBase.java       |  8 +-
 .../testutils/HoodieClientTestHarness.java         |  2 +-
 .../hudi/testutils/HoodieClientTestUtils.java      |  2 +-
 .../testutils/HoodieMergeOnReadTestUtils.java      |  2 +-
 .../common/table/HoodieTableMetaClient.java        | 83 +++++++++++++------
 .../log/AbstractHoodieLogRecordScanner.java        |  2 +-
 .../hudi/common/table/log/LogReaderUtils.java      |  2 +-
 .../table/view/FileSystemViewManager.java          |  2 +-
 .../hudi/metadata/BaseTableMetadata.java           |  2 +-
 .../metadata/HoodieBackedTableMetadata.java        |  2 +-
 .../timeline/TestHoodieActiveTimeline.java         |  7 +-
 .../table/view/TestIncrementalFSViewSync.java      | 10 +--
 .../common/testutils/CompactionTestUtils.java      |  2 +-
 .../common/testutils/FileCreateUtils.java          |  2 +-
 .../testutils/HoodieCommonTestHarness.java         |  2 +-
 .../hudi/common/util/TestCompactionUtils.java      |  4 +-
 .../hudi/hadoop/HoodieROTablePathFilter.java       |  2 +-
 .../AbstractRealtimeRecordReader.java              |  2 +-
 .../hadoop/utils/HoodieInputFormatUtils.java       |  2 +-
 .../testsuite/dag/nodes/CompactNode.java           |  5 +-
 .../testsuite/dag/nodes/RollbackNode.java          |  5 +-
 .../dag/nodes/ScheduleCompactNode.java             |  5 +-
 .../reader/DFSHoodieDatasetInputReader.java        |  3 +-
 .../testsuite/job/TestHoodieTestSuiteJob.java      |  8 +-
 .../DataSourceInternalWriterHelper.java            |  2 +-
 .../apache/hudi/HoodieDataSourceHelpers.java       |  2 +-
 .../scala/org/apache/hudi/DefaultSource.scala      |  8 +-
 .../apache/hudi/HoodieSparkSqlWriter.scala         |  3 +-
 .../org/apache/hudi/HoodieStreamingSink.scala      |  4 +-
 .../hudi/streaming/HoodieStreamSource.scala        |  2 +-
 .../src/test/java/HoodieJavaStreamingApp.java      |  4 +-
 .../hudi/functional/TestCOWDataSource.scala        | 12 ++-
 .../functional/TestStructuredStreaming.scala       |  9 +-
 .../sync/common/AbstractSyncHoodieClient.java      |  2 +-
 .../hudi/utilities/HiveIncrementalPuller.java      |  4 +-
 .../hudi/utilities/HoodieClusteringJob.java        |  2 +-
 .../utilities/HoodieCompactionAdminTool.java       |  2 +-
 .../hudi/utilities/HoodieSnapshotCopier.java       |  2 +-
 .../utilities/HoodieSnapshotExporter.java          |  4 +-
 ...ointFromAnotherHoodieTimelineProvider.java      |  2 +-
 .../utilities/deltastreamer/DeltaSync.java         |  3 +-
 .../deltastreamer/HoodieDeltaStreamer.java         |  4 +-
 .../utilities/perf/TimelineServerPerf.java         |  2 +-
 .../sources/helpers/IncrSourceHelper.java          |  2 +-
 .../functional/TestHoodieDeltaStreamer.java        | 20 ++---
 64 files changed, 241 insertions(+), 203 deletions(-)
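Every hunk below is the same mechanical migration: the telescoping HoodieTableMetaClient
constructors are replaced by the new fluent builder. A representative before/after, taken
verbatim from the hunks in this patch:

    // Before: positional constructor arguments (conf, basePath, loadActiveTimelineOnLoad)
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath, true);

    // After: named, chainable setters; unset options fall back to the old constructor defaults
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(hadoopConf)
        .setBasePath(basePath)
        .setLoadActiveTimelineOnLoad(true)
        .build();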
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java
index a4059e16b..04dedc5df 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java
@@ -85,8 +85,8 @@ public class HoodieCLI {
   }
 
   public static void refreshTableMetadata() {
-    setTableMetaClient(new HoodieTableMetaClient(HoodieCLI.conf, basePath, false, HoodieCLI.consistencyGuardConfig,
-        Option.of(layoutVersion)));
+    setTableMetaClient(HoodieTableMetaClient.builder().setConf(HoodieCLI.conf).setBasePath(basePath).setLoadActiveTimelineOnLoad(false).setConsistencyGuardConfig(HoodieCLI.consistencyGuardConfig)
+        .setLayoutVersion(Option.of(layoutVersion)).build());
   }
 
   public static void connectTo(String basePath, Integer layoutVersion) {
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
index 70e202953..3e216b4cb 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
@@ -401,7 +401,7 @@ public class CommitsCommand implements CommandMarker {
   public String compareCommits(@CliOption(key = {"path"}, help = "Path of the table to compare to") final String path) {
 
     HoodieTableMetaClient source = HoodieCLI.getTableMetaClient();
-    HoodieTableMetaClient target = new HoodieTableMetaClient(HoodieCLI.conf, path);
+    HoodieTableMetaClient target = HoodieTableMetaClient.builder().setConf(HoodieCLI.conf).setBasePath(path).build();
     HoodieTimeline targetTimeline = target.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
     HoodieTimeline sourceTimeline = source.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
     String targetLatestCommit =
@@ -426,7 +426,7 @@ public class CommitsCommand implements CommandMarker {
 
   @CliCommand(value = "commits sync", help = "Compare commits with another Hoodie table")
   public String syncCommits(@CliOption(key = {"path"}, help = "Path of the table to compare to") final String path) {
-    HoodieCLI.syncTableMetadata = new HoodieTableMetaClient(HoodieCLI.conf, path);
+    HoodieCLI.syncTableMetadata = HoodieTableMetaClient.builder().setConf(HoodieCLI.conf).setBasePath(path).build();
     HoodieCLI.state = HoodieCLI.CLIState.SYNC;
     return "Load sync state between " + HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " and "
         + HoodieCLI.syncTableMetadata.getTableConfig().getTableName();
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java
index ef76ee4e2..37bc6517f 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java
@@ -237,7 +237,7 @@ public class FileSystemViewCommand implements CommandMarker {
       boolean includeMaxInstant, boolean includeInflight, boolean excludeCompaction) throws IOException {
     HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
     HoodieTableMetaClient metaClient =
-        new HoodieTableMetaClient(client.getHadoopConf(), client.getBasePath(), true);
+        HoodieTableMetaClient.builder().setConf(client.getHadoopConf()).setBasePath(client.getBasePath()).setLoadActiveTimelineOnLoad(true).build();
     FileSystem fs = HoodieCLI.fs;
     String globPath = String.format("%s/%s/*", client.getBasePath(), globRegex);
     List<FileStatus> statuses = FSUtils.getGlobStatusExcludingMetaFolder(fs, new Path(globPath));
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index f715b16e0..0f0a85146 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -402,8 +402,10 @@ public class SparkMain {
    */
   protected static int upgradeOrDowngradeTable(JavaSparkContext jsc, String basePath, String toVersion) {
     HoodieWriteConfig config = getWriteConfig(basePath);
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), false,
-        config.getConsistencyGuardConfig(), Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())));
+    HoodieTableMetaClient metaClient =
+        HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(config.getBasePath())
+            .setLoadActiveTimelineOnLoad(false).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
+            .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
     try {
       new SparkUpgradeDowngrade(metaClient, config, new HoodieSparkEngineContext(jsc)).run(metaClient, HoodieTableVersion.valueOf(toVersion), config, new HoodieSparkEngineContext(jsc), null);
       LOG.info(String.format("Table at \"%s\" upgraded / downgraded to version \"%s\".", basePath, toVersion));
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
index 9c947e4d4..168de26e6 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
@@ -95,7 +95,7 @@ public class TableCommand implements CommandMarker {
 
     boolean existing = false;
     try {
-      new HoodieTableMetaClient(HoodieCLI.conf, path);
+      HoodieTableMetaClient.builder().setConf(HoodieCLI.conf).setBasePath(path).build();
       existing = true;
     } catch (TableNotFoundException dfe) {
       // expected
diff --git a/hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala b/hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala
index 96944c5c0..04c5f93e7 100644
--- a/hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala
+++ b/hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala
@@ -75,7 +75,7 @@ class DedupeSparkJob(basePath: String,
     val tmpTableName = s"htbl_${System.currentTimeMillis()}"
     val dedupeTblName = s"${tmpTableName}_dupeKeys"
 
-    val metadata = new HoodieTableMetaClient(fs.getConf, basePath)
+    val metadata = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(basePath).build()
 
     val allFiles = fs.listStatus(new org.apache.hadoop.fs.Path(s"$basePath/$duplicatedPartitionPath"))
     val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitTimeline.filterCompletedInstants(), allFiles)
@@ -184,7 +184,7 @@ class DedupeSparkJob(basePath: String,
   }
 
   def fixDuplicates(dryRun: Boolean = true) = {
-    val metadata = new HoodieTableMetaClient(fs.getConf, basePath)
+    val metadata = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(basePath).build()
 
     val allFiles = fs.listStatus(new Path(s"$basePath/$duplicatedPartitionPath"))
     val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitTimeline.filterCompletedInstants(),
       allFiles)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
index 765965fa5..0266a6539 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
@@ -128,9 +128,9 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseable {
   }
 
   protected HoodieTableMetaClient createMetaClient(boolean loadActiveTimelineOnLoad) {
-    return new HoodieTableMetaClient(hadoopConf, config.getBasePath(), loadActiveTimelineOnLoad,
-        config.getConsistencyGuardConfig(),
-        Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())));
+    return HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(config.getBasePath())
+        .setLoadActiveTimelineOnLoad(loadActiveTimelineOnLoad).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
+        .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
   }
 
   public Option<EmbeddedTimelineService> getTimelineServer() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 1f76a5e30..be1fc3a72 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -106,7 +106,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMetadataWriter {
       ValidationUtils.checkArgument(!this.metadataWriteConfig.useFileListingMetadata(), "File listing cannot be used for Metadata Table");
 
       initRegistry();
-      HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf, datasetWriteConfig.getBasePath());
+      HoodieTableMetaClient datasetMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(datasetWriteConfig.getBasePath()).build();
       initialize(engineContext, datasetMetaClient);
       if (enabled) {
         // This is always called even in case the table was created for the first time. This is because
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
index 6b7c4a69a..79a310639 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
@@ -43,13 +43,10 @@ public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
   }
 
   public static <T extends HoodieRecordPayload> HoodieFlinkTable<T> create(HoodieWriteConfig config, HoodieFlinkEngineContext context) {
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(
-        context.getHadoopConf().get(),
-        config.getBasePath(),
-        true,
-        config.getConsistencyGuardConfig(),
-        Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))
-    );
+    HoodieTableMetaClient metaClient =
+        HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath())
+            .setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
+            .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
     return HoodieFlinkTable.create(config, context, metaClient);
   }
 
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
index bd8f9547c..219dec4e2 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
@@ -42,13 +42,10 @@ public abstract class HoodieJavaTable<T extends HoodieRecordPayload>
   }
 
   public static <T extends HoodieRecordPayload> HoodieJavaTable<T> create(HoodieWriteConfig config, HoodieEngineContext context) {
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(
-        context.getHadoopConf().get(),
-        config.getBasePath(),
-        true,
-        config.getConsistencyGuardConfig(),
-        Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))
-    );
+    HoodieTableMetaClient metaClient =
+        HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath())
+            .setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
+            .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
     return HoodieJavaTable.create(config, (HoodieJavaEngineContext) context, metaClient);
   }
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
index 4fb9f221c..f8cc757eb 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
@@ -97,7 +97,7 @@ public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializable {
     this.hadoopConf = context.getHadoopConf().get();
     final String basePath = clientConfig.getBasePath();
     // Create a Hoodie table which encapsulated the commits and files visible
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath, true);
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
     this.hoodieTable = HoodieSparkTable.create(clientConfig, context, metaClient);
     this.index = SparkHoodieIndex.createIndex(clientConfig);
     this.sqlContextOpt = Option.empty();
@@ -199,7 +199,7 @@ public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializable {
    */
   public List<Pair<String, HoodieCompactionPlan>> getPendingCompactions() {
     HoodieTableMetaClient metaClient =
-        new HoodieTableMetaClient(hadoopConf, hoodieTable.getMetaClient().getBasePath(), true);
+        HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(hoodieTable.getMetaClient().getBasePath()).setLoadActiveTimelineOnLoad(true).build();
     return CompactionUtils.getAllPendingCompactionPlans(metaClient).stream()
         .map(
             instantWorkloadPair -> Pair.of(instantWorkloadPair.getKey().getTimestamp(), instantWorkloadPair.getValue()))
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
index dd8106f52..70a57b79e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
@@ -42,13 +42,10 @@ public abstract class HoodieSparkTable<T extends HoodieRecordPayload>
   }
 
   public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieWriteConfig config, HoodieEngineContext context) {
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(
-        context.getHadoopConf().get(),
-        config.getBasePath(),
-        true,
-        config.getConsistencyGuardConfig(),
-        Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))
-    );
+    HoodieTableMetaClient metaClient =
+        HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath())
+            .setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
+            .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
     return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient);
   }
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
index e59a95009..67d82578f 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
@@ -135,7 +135,7 @@ public class TestCompactionAdminClient extends HoodieClientTestBase {
       int expNumRepairs) throws Exception {
     List<Pair<HoodieLogFile, HoodieLogFile>> renameFiles =
         validateUnSchedulePlan(client, ingestionInstant, compactionInstant, numEntriesPerInstant, expNumRepairs, true);
-    metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
+    metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
     List<ValidationOpResult> result = client.validateCompactionPlan(metaClient, compactionInstant, 1);
     if (expNumRepairs > 0) {
       assertTrue(result.stream().anyMatch(r -> !r.isSuccess()), "Expect some failures in validation");
@@ -176,7 +176,7 @@ public class TestCompactionAdminClient extends HoodieClientTestBase {
    * @param compactionInstant Compaction Instant
    */
   private void ensureValidCompactionPlan(String compactionInstant) throws Exception {
-    metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
+    metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
     // Ensure compaction-plan is good to begin with
     List<ValidationOpResult> validationResults =
         client.validateCompactionPlan(metaClient, compactionInstant, 1);
     assertFalse(validationResults.stream().anyMatch(v -> !v.isSuccess()),
@@ -234,7 +234,7 @@ public class TestCompactionAdminClient extends HoodieClientTestBase {
     // Check suggested rename operations
     List<Pair<HoodieLogFile, HoodieLogFile>> renameFiles =
         client.getRenamingActionsForUnschedulingCompactionPlan(metaClient, compactionInstant, 1, Option.empty(), false);
-    metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
+    metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
 
     // Log files belonging to file-slices created because of compaction request must be renamed
@@ -270,7 +270,7 @@ public class TestCompactionAdminClient extends HoodieClientTestBase {
 
     client.unscheduleCompactionPlan(compactionInstant, false, 1, false);
 
-    metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
+    metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
     final HoodieTableFileSystemView newFsView =
         new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
     // Expect all file-slice whose base-commit is same as compaction commit to contain no new Log files
@@ -306,7 +306,7 @@ public class TestCompactionAdminClient extends HoodieClientTestBase {
     // Check suggested rename operations
     List<Pair<HoodieLogFile, HoodieLogFile>> renameFiles = client
        .getRenamingActionsForUnschedulingCompactionOperation(metaClient, compactionInstant, op, Option.empty(), false);
-    metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
+    metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
 
     // Log files belonging to file-slices created because of compaction request must be renamed
@@ -331,7 +331,7 @@ public class TestCompactionAdminClient extends HoodieClientTestBase {
     // Call the main unschedule API
     client.unscheduleCompactionFileId(op.getFileGroupId(), false, false);
 
-    metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
+    metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
     final HoodieTableFileSystemView newFsView =
         new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
     // Expect all file-slice whose base-commit is same as compaction commit to contain no new Log files
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index b4a392e64..c81aa114a 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -410,7 +410,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     final HoodieWriteConfig cfg = hoodieWriteConfig;
     final String instantTime = "007";
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build();
     String basePathStr = basePath;
     HoodieTable table = getHoodieTable(metaClient, cfg);
     jsc.parallelize(Arrays.asList(1)).map(e -> {
@@ -894,7 +894,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     assertNoWriteErrors(statuses);
     assertEquals(2, statuses.size(), "2 files needs to be committed.");
-    HoodieTableMetaClient metadata = new HoodieTableMetaClient(hadoopConf, basePath);
+    HoodieTableMetaClient metadata = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
 
     HoodieTable table = getHoodieTable(metadata, config);
     BaseFileOnlyView fileSystemView = table.getBaseFileOnlyView();
@@ -1001,7 +1001,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
             + readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(1).getStat().getPath())).size(),
         "file should contain 340 records");
 
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
     HoodieTable table = getHoodieTable(metaClient, config);
     List<HoodieBaseFile> files = table.getBaseFileOnlyView()
         .getLatestBaseFilesBeforeOrOn(testPartitionPath, commitTime3).collect(Collectors.toList());
@@ -1428,7 +1428,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
     try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
       HoodieSparkTable table = HoodieSparkTable.create(cfg, context, metaClient);
       String instantTime = "000";
@@ -1533,7 +1533,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testConsistencyCheckDuringFinalize(boolean enableOptimisticConsistencyGuard) throws Exception {
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
     String instantTime = "000";
     HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder()
         .withEnableOptimisticConsistencyGuard(enableOptimisticConsistencyGuard).build()).build();
@@ -1559,7 +1559,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   private void testRollbackAfterConsistencyCheckFailureUsingFileList(boolean rollbackUsingMarkers, boolean enableOptimisticConsistencyGuard) throws Exception {
     String instantTime = "000";
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
     HoodieWriteConfig cfg = !enableOptimisticConsistencyGuard ?
         getConfigBuilder().withRollbackUsingMarkers(rollbackUsingMarkers).withAutoCommit(false)
             .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true)
                 .withMaxConsistencyCheckIntervalMs(1).withInitialConsistencyCheckIntervalMs(1).withEnableOptimisticConsistencyGuard(enableOptimisticConsistencyGuard).build()).build() :
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
index 34daed76f..b0bd54f1f 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
@@ -94,7 +94,7 @@ public class TestMultiFS extends HoodieClientTestHarness {
     // Read from hdfs
     FileSystem fs = FSUtils.getFs(dfsBasePath, HoodieTestUtils.getDefaultHadoopConf());
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), dfsBasePath);
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(dfsBasePath).build();
     HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
     Dataset<Row> readRecords = HoodieClientTestUtils.readCommit(dfsBasePath, sqlContext, timeline, readCommitTime);
     assertEquals(readRecords.count(), records.size(), "Should contain 100 records");
@@ -112,7 +112,7 @@ public class TestMultiFS extends HoodieClientTestHarness {
 
     LOG.info("Reading from path: " + tablePath);
     fs = FSUtils.getFs(tablePath, HoodieTestUtils.getDefaultHadoopConf());
-    metaClient = new HoodieTableMetaClient(fs.getConf(), tablePath);
+    metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
     timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
     Dataset<Row> localReadRecords =
         HoodieClientTestUtils.readCommit(tablePath, sqlContext, timeline, writeCommitTime);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
index c238fc04b..f822e8504 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
@@ -118,13 +118,13 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
     // Metadata table should not exist until created for the first time
     assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not exist");
-    assertThrows(TableNotFoundException.class, () -> new HoodieTableMetaClient(hadoopConf, metadataTableBasePath));
+    assertThrows(TableNotFoundException.class, () -> HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build());
 
     // Metadata table is not created if disabled by config
     try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
       client.startCommitWithTime("001");
       assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not be created");
-      assertThrows(TableNotFoundException.class, () -> new HoodieTableMetaClient(hadoopConf, metadataTableBasePath));
+      assertThrows(TableNotFoundException.class, () -> HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build());
     }
     // Metadata table created when enabled by config & sync is called
@@ -565,8 +565,8 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
       }
     }
 
-    HoodieTableMetaClient metadataMetaClient = new HoodieTableMetaClient(hadoopConf, metadataTableBasePath);
-    HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf, config.getBasePath());
+    HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
+    HoodieTableMetaClient datasetMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(config.getBasePath()).build();
     HoodieActiveTimeline metadataTimeline = metadataMetaClient.getActiveTimeline();
     // check that there are compactions.
     assertTrue(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants() > 0);
@@ -869,7 +869,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
     // Metadata table should be in sync with the dataset
     assertTrue(metadata(client).isInSync());
-    HoodieTableMetaClient metadataMetaClient = new HoodieTableMetaClient(hadoopConf, metadataTableBasePath);
+    HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
 
     // Metadata table is MOR
     assertEquals(metadataMetaClient.getTableType(), HoodieTableType.MERGE_ON_READ, "Metadata Table should be MOR");
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
index 0f6a15074..b8bccbc36 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
@@ -88,7 +88,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
    **/
  protected void validateDeltaCommit(String latestDeltaCommit, final Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> fgIdToCompactionOperation, HoodieWriteConfig cfg) {
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
     HoodieTable table = getHoodieTable(metaClient, cfg);
     List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table);
     fileSliceList.forEach(fileSlice -> {
@@ -109,7 +109,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
       List<HoodieRecord> records, HoodieWriteConfig cfg, boolean insertFirst, List<String> expPendingCompactionInstants) throws Exception {
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
     List<Pair<String, HoodieCompactionPlan>> pendingCompactions = readClient.getPendingCompactions();
     List<String> gotPendingCompactionInstants =
         pendingCompactions.stream().map(pc -> pc.getKey()).sorted().collect(Collectors.toList());
@@ -131,7 +131,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
         client.commit(firstInstant, statuses);
       }
       assertNoWriteErrors(statusList);
-      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
       List<HoodieBaseFile> dataFilesToRead = getCurrentLatestBaseFiles(hoodieTable);
       assertTrue(dataFilesToRead.stream().findAny().isPresent(),
@@ -142,7 +142,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
     int numRecords = records.size();
     for (String instantTime : deltaInstants) {
       records = dataGen.generateUpdates(instantTime, numRecords);
-      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       createNextDeltaCommit(instantTime, records, client, metaClient, cfg, false);
       validateDeltaCommit(instantTime, fgIdToCompactionOperation, cfg);
     }
@@ -150,7 +150,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
   }
 
   protected void moveCompactionFromRequestedToInflight(String compactionInstantTime, HoodieWriteConfig cfg) {
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
     HoodieInstant compactionInstant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
     metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(compactionInstant);
     HoodieInstant instant = metaClient.getActiveTimeline().reload().filterPendingCompactionTimeline().getInstants()
@@ -160,7 +160,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
 
   protected void scheduleCompaction(String compactionInstantTime, SparkRDDWriteClient client, HoodieWriteConfig cfg) {
     client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
     HoodieInstant instant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().get();
     assertEquals(compactionInstantTime, instant.getTimestamp(), "Last compaction instant must be the one set");
   }
@@ -192,7 +192,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
     }
 
     // verify that there is a commit
-    table = getHoodieTable(new HoodieTableMetaClient(hadoopConf, cfg.getBasePath(), true), cfg);
+    table = getHoodieTable(HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).setLoadActiveTimelineOnLoad(true).build(), cfg);
     HoodieTimeline timeline = table.getMetaClient().getCommitTimeline().filterCompletedInstants();
     String latestCompactionCommitTime = timeline.lastInstant().get().getTimestamp();
     assertEquals(latestCompactionCommitTime, compactionInstantTime,
@@ -214,7 +214,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
         "Compacted files should not show up in latest slices");
 
     // verify that there is a commit
-    table = getHoodieTable(new HoodieTableMetaClient(hadoopConf, cfg.getBasePath(), true), cfg);
+    table = getHoodieTable(HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).setLoadActiveTimelineOnLoad(true).build(), cfg);
     HoodieTimeline timeline = table.getMetaClient().getCommitTimeline().filterCompletedInstants();
     // verify compaction commit is visible in timeline
     assertTrue(timeline.filterCompletedInstants().getInstants()
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
index 08f9283dd..6e8326c6a 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
@@ -75,7 +75,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
     // Schedule compaction but do not run them
     scheduleCompaction(compactionInstantTime, client, cfg);
 
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
 
     HoodieInstant pendingCompactionInstant =
         metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
@@ -86,12 +86,12 @@ public class TestAsyncCompaction extends CompactionTestBase {
     moveCompactionFromRequestedToInflight(compactionInstantTime, cfg);
 
     // Reload and rollback inflight compaction
-    metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
     HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient);
     client.rollbackInflightCompaction(
         new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime), hoodieTable);
 
-    metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
     pendingCompactionInstant = metaClient.getCommitsAndCompactionTimeline().filterPendingCompactionTimeline()
         .getInstants().findFirst().get();
     assertEquals("compaction", pendingCompactionInstant.getAction());
@@ -129,10 +129,10 @@ public class TestAsyncCompaction extends CompactionTestBase {
     // Schedule compaction but do not run them
     scheduleCompaction(compactionInstantTime, client, cfg);
 
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
     createNextDeltaCommit(inflightInstantTime, records, client, metaClient, cfg, true);
 
-    metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
     HoodieInstant pendingCompactionInstant =
         metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
     assertEquals(compactionInstantTime, pendingCompactionInstant.getTimestamp(),
@@ -145,7 +145,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
     client.startCommitWithTime(nextInflightInstantTime);
 
     // Validate
-    metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
     inflightInstant = metaClient.getActiveTimeline().filterPendingExcludingCompaction().firstInstant().get();
     assertEquals(inflightInstant.getTimestamp(), nextInflightInstantTime, "inflight instant has expected instant time");
     assertEquals(1, metaClient.getActiveTimeline()
@@ -177,7 +177,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
         new ArrayList<>());
 
     // Schedule and mark compaction instant as inflight
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
     HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
     scheduleCompaction(compactionInstantTime, client, cfg);
     moveCompactionFromRequestedToInflight(compactionInstantTime, cfg);
@@ -210,7 +210,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
     // Schedule compaction but do not run them
     scheduleCompaction(compactionInstantTime, client, cfg);
 
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
     HoodieInstant pendingCompactionInstant =
         metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
     assertEquals(compactionInstantTime, pendingCompactionInstant.getTimestamp(),
         "Pending Compaction instant has expected instant time");
@@ -239,10 +239,10 @@ public class TestAsyncCompaction extends CompactionTestBase {
     records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg,
         true, new ArrayList<>());
 
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
     createNextDeltaCommit(inflightInstantTime, records, client, metaClient, cfg, true);
 
-    metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
     HoodieInstant inflightInstant =
         metaClient.getActiveTimeline().filterPendingExcludingCompaction().firstInstant().get();
     assertEquals(inflightInstantTime, inflightInstant.getTimestamp(), "inflight instant has expected instant time");
@@ -304,7 +304,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
     runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
         new ArrayList<>());
 
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
     HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
     scheduleAndExecuteCompaction(compactionInstantTime, client, hoodieTable, cfg, numRecs, false);
   }
@@ -328,7 +328,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
     records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg,
         true, new ArrayList<>());
 
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
     HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
     scheduleCompaction(compactionInstantTime, client, cfg);
 
@@ -356,7 +356,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
     runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
         new ArrayList<>());
 
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
     HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
     scheduleCompaction(compactionInstantTime, client, cfg);
     metaClient.reloadActiveTimeline();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
index 80542edfa..97d287592 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
@@ -57,7 +57,7 @@ public class TestInlineCompaction extends CompactionTestBase {
       HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
       List<String> instants = IntStream.range(0, 2).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
       runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
 
       // Then: ensure no compaction is executedm since there are only 2 delta commits
       assertEquals(2, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
@@ -76,12 +76,12 @@ public class TestInlineCompaction extends CompactionTestBase {
       runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
 
       // third commit, that will trigger compaction
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       String finalInstant = HoodieActiveTimeline.createNewInstantTime();
       createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 100), writeClient, metaClient, cfg, false);
 
       // Then: ensure the file slices are compacted as per policy
-      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
       assertEquals(HoodieTimeline.COMMIT_ACTION, metaClient.getActiveTimeline().lastInstant().get().getAction());
     }
@@ -100,11 +100,11 @@ public class TestInlineCompaction extends CompactionTestBase {
 
       // after 10s, that will trigger compaction
       String finalInstant = HoodieActiveTimeline.createNewInstantTime(10000);
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 100), writeClient, metaClient, cfg, false);
 
       // Then: ensure the file slices are compacted as per policy
-      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       assertEquals(3, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
       assertEquals(HoodieTimeline.COMMIT_ACTION, metaClient.getActiveTimeline().lastInstant().get().getAction());
     }
@@ -121,17 +121,17 @@ public class TestInlineCompaction extends CompactionTestBase {
       runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
       // Then: trigger the compaction because reach 3 commits.
       String finalInstant = HoodieActiveTimeline.createNewInstantTime();
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
 
-      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
 
       // 4th commit, that will trigger compaction because reach the time elapsed
-      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       finalInstant = HoodieActiveTimeline.createNewInstantTime(20000);
       createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
 
-      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       assertEquals(6, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
     }
   }
@@ -145,16 +145,16 @@ public class TestInlineCompaction extends CompactionTestBase {
       HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
       List<String> instants = IntStream.range(0, 3).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
       runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
 
       // Then: ensure no compaction is executedm since there are only 3 delta commits
       assertEquals(3, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
 
       // 4th commit, that will trigger compaction
-      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       String finalInstant = HoodieActiveTimeline.createNewInstantTime(20000);
       createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
 
-      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       assertEquals(5, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
     }
   }
@@ -183,12 +183,12 @@ public class TestInlineCompaction extends CompactionTestBase {
     HoodieWriteConfig inlineCfg = getConfigForInlineCompaction(2, 60, CompactionTriggerStrategy.NUM_COMMITS);
     String instantTime3 = HoodieActiveTimeline.createNewInstantTime();
     try (SparkRDDWriteClient writeClient = getHoodieWriteClient(inlineCfg)) {
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       createNextDeltaCommit(instantTime3, dataGen.generateUpdates(instantTime3, 100), writeClient, metaClient, inlineCfg, false);
     }
 
     // Then: 1 delta commit is done, the failed compaction is retried
-    metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
     assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
     assertEquals(instantTime2, metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
   }
@@ -218,13 +218,13 @@ public class TestInlineCompaction extends CompactionTestBase {
     HoodieWriteConfig inlineCfg = getConfigForInlineCompaction(5, 10, CompactionTriggerStrategy.TIME_ELAPSED);
     String instantTime2;
     try (SparkRDDWriteClient writeClient = getHoodieWriteClient(inlineCfg)) {
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       instantTime2 = HoodieActiveTimeline.createNewInstantTime();
       createNextDeltaCommit(instantTime2, dataGen.generateUpdates(instantTime2, 10), writeClient, metaClient, inlineCfg, false);
     }
 
     // Then: 1 delta commit is done, the failed compaction is retried
-    metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
     assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
     assertEquals(instantTime, metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
   }
@@ -255,13 +255,13 @@ public class TestInlineCompaction extends CompactionTestBase {
     HoodieWriteConfig inlineCfg = getConfigForInlineCompaction(3, 20, CompactionTriggerStrategy.NUM_OR_TIME);
     String instantTime2;
     try (SparkRDDWriteClient writeClient = getHoodieWriteClient(inlineCfg)) {
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       instantTime2 = HoodieActiveTimeline.createNewInstantTime();
       createNextDeltaCommit(instantTime2, dataGen.generateUpdates(instantTime2, 10), writeClient, metaClient, inlineCfg, false);
     }
 
     // Then: 1 delta commit is done, the failed compaction is retried
-    metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
     assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
     assertEquals(instantTime, metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
   }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index 1caf9c04f..1104631a3 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -220,7 +220,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
     return (commit, numRecords) -> {
       final SparkHoodieIndex index = SparkHoodieIndex.createIndex(writeConfig);
       List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords);
-      final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath, true);
+      final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
       HoodieSparkTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
       JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), context, table);
       return taggedRecords.collect();
@@ -241,7 +241,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
     return (numRecords) -> {
       final SparkHoodieIndex index = SparkHoodieIndex.createIndex(writeConfig);
       List<HoodieKey> records = keyGenFunction.apply(numRecords);
-      final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath, true);
+      final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
       HoodieSparkTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
       JavaRDD<HoodieRecord> recordsToDelete = jsc.parallelize(records, 1)
           .map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload()));
@@ -438,7 +438,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
     assertPartitionMetadataForRecords(records, fs);
 
     // verify that there is a commit
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
     HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
 
     if (assertForCommit) {
@@ -506,7 +506,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
     assertPartitionMetadataForKeys(keysToDelete, fs);
 
     // verify that there is a commit
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
     HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
 
     if (assertForCommit) {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
index e6523af46..f3febaba0 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
@@ -346,7 +346,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
   }
 
   public HoodieTableMetaClient getHoodieMetaClient(Configuration conf, String basePath) {
-    metaClient = new HoodieTableMetaClient(conf, basePath);
+    metaClient = HoodieTableMetaClient.builder().setConf(conf).setBasePath(basePath).build();
     return metaClient;
   }
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
index c91b51b88..55c5aa72f 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java @@ -151,7 +151,7 @@ public class HoodieClientTestUtils { String... paths) { List latestFiles = new ArrayList<>(); try { - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true); + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build(); for (String path : paths) { BaseFileOnlyView fileSystemView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline().filterCompletedInstants(), fs.globStatus(new Path(path))); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java index 563355112..5b37b3b2d 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java @@ -66,7 +66,7 @@ public class HoodieMergeOnReadTestUtils { public static List getRecordsUsingInputFormat(Configuration conf, List inputPaths, String basePath, JobConf jobConf, boolean realtime, Schema rawSchema, String rawHiveColumnTypes, boolean projectCols, List projectedColumns) { - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(conf, basePath); + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(conf).setBasePath(basePath).build(); FileInputFormat inputFormat = HoodieInputFormatUtils.getInputFormat(metaClient.getTableConfig().getBaseFileFormat(), realtime, jobConf); Schema schema = HoodieAvroUtils.addMetadataFields(rawSchema); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 32ed7d94f..983678db1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -94,26 +94,7 @@ public class HoodieTableMetaClient implements Serializable { private HoodieArchivedTimeline archivedTimeline; private ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build(); - public HoodieTableMetaClient(Configuration conf, String basePath) { - // Do not load any timeline by default - this(conf, basePath, false); - } - - public HoodieTableMetaClient(Configuration conf, String basePath, String payloadClassName) { - this(conf, basePath, false, ConsistencyGuardConfig.newBuilder().build(), Option.of(TimelineLayoutVersion.CURR_LAYOUT_VERSION), - payloadClassName); - } - - public HoodieTableMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad, - ConsistencyGuardConfig consistencyGuardConfig, Option layoutVersion) { - this(conf, basePath, loadActiveTimelineOnLoad, consistencyGuardConfig, layoutVersion, null); - } - - public HoodieTableMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad) { - this(conf, basePath, loadActiveTimelineOnLoad, ConsistencyGuardConfig.newBuilder().build(), Option.of(TimelineLayoutVersion.CURR_LAYOUT_VERSION), null); - } - - public HoodieTableMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad, + private HoodieTableMetaClient(Configuration conf, String basePath, boolean 
loadActiveTimelineOnLoad, ConsistencyGuardConfig consistencyGuardConfig, Option layoutVersion, String payloadClassName) { LOG.info("Loading HoodieTableMetaClient from " + basePath); @@ -152,9 +133,8 @@ public class HoodieTableMetaClient implements Serializable { public HoodieTableMetaClient() {} public static HoodieTableMetaClient reload(HoodieTableMetaClient oldMetaClient) { - return new HoodieTableMetaClient(oldMetaClient.hadoopConf.get(), oldMetaClient.basePath, - oldMetaClient.loadActiveTimelineOnLoad, oldMetaClient.consistencyGuardConfig, - Option.of(oldMetaClient.timelineLayoutVersion), null); + return HoodieTableMetaClient.builder().setConf(oldMetaClient.hadoopConf.get()).setBasePath(oldMetaClient.basePath).setLoadActiveTimelineOnLoad(oldMetaClient.loadActiveTimelineOnLoad) + .setConsistencyGuardConfig(oldMetaClient.consistencyGuardConfig).setLayoutVersion(Option.of(oldMetaClient.timelineLayoutVersion)).setPayloadClassName(null).build(); } /** @@ -471,7 +451,7 @@ public class HoodieTableMetaClient implements Serializable { HoodieTableConfig.createHoodieProperties(fs, metaPathDir, props); // We should not use fs.getConf as this might be different from the original configuration // used to create the fs in unit tests - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath); + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build(); LOG.info("Finished initializing Table of type " + metaClient.getTableConfig().getTableType() + " from " + basePath); return metaClient; } @@ -645,4 +625,59 @@ public class HoodieTableMetaClient implements Serializable { public void setActiveTimeline(HoodieActiveTimeline activeTimeline) { this.activeTimeline = activeTimeline; } + + public static Builder builder() { + return new Builder(); + } + + /** + * Builder for {@link HoodieTableMetaClient}. 
+   */
+  public static class Builder {
+
+    private Configuration conf;
+    private String basePath;
+    private boolean loadActiveTimelineOnLoad = false;
+    private String payloadClassName = null;
+    private ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build();
+    private Option<TimelineLayoutVersion> layoutVersion = Option.of(TimelineLayoutVersion.CURR_LAYOUT_VERSION);
+
+    public Builder setConf(Configuration conf) {
+      this.conf = conf;
+      return this;
+    }
+
+    public Builder setBasePath(String basePath) {
+      this.basePath = basePath;
+      return this;
+    }
+
+    public Builder setLoadActiveTimelineOnLoad(boolean loadActiveTimelineOnLoad) {
+      this.loadActiveTimelineOnLoad = loadActiveTimelineOnLoad;
+      return this;
+    }
+
+    public Builder setPayloadClassName(String payloadClassName) {
+      this.payloadClassName = payloadClassName;
+      return this;
+    }
+
+    public Builder setConsistencyGuardConfig(ConsistencyGuardConfig consistencyGuardConfig) {
+      this.consistencyGuardConfig = consistencyGuardConfig;
+      return this;
+    }
+
+    public Builder setLayoutVersion(Option<TimelineLayoutVersion> layoutVersion) {
+      this.layoutVersion = layoutVersion;
+      return this;
+    }
+
+    public HoodieTableMetaClient build() {
+      ValidationUtils.checkArgument(conf != null, "Configuration needs to be set to init HoodieTableMetaClient");
+      ValidationUtils.checkArgument(basePath != null, "basePath needs to be set to init HoodieTableMetaClient");
+      return new HoodieTableMetaClient(conf, basePath,
+          loadActiveTimelineOnLoad, consistencyGuardConfig, layoutVersion, payloadClassName);
+    }
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
index 6fb0a0590..02874e6cb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
@@ -109,7 +109,7 @@ public abstract class AbstractHoodieLogRecordScanner {
       String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, int bufferSize) {
     this.readerSchema = readerSchema;
     this.latestInstantTime = latestInstantTime;
-    this.hoodieTableMetaClient = new HoodieTableMetaClient(fs.getConf(), basePath);
+    this.hoodieTableMetaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).build();
     // load class from the payload fully qualified class name
     this.payloadClassFQN = this.hoodieTableMetaClient.getTableConfig().getPayloadClass();
     this.totalLogFiles.addAndGet(logFilePaths.size());
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
index ffc4b8582..fe159df00 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
@@ -64,7 +64,7 @@ public class LogReaderUtils {
   public static Schema readLatestSchemaFromLogFiles(String basePath, List<String> deltaFilePaths, Configuration config)
       throws IOException {
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(config, basePath);
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(config).setBasePath(basePath).build();
     List<String> deltaPaths = deltaFilePaths.stream().map(s -> new HoodieLogFile(new Path(s)))
         .sorted(HoodieLogFile.getReverseLogFileComparator()).map(s ->
s.getPath().toString()) .collect(Collectors.toList()); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java index 6f0e7d547..f614df560 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java @@ -94,7 +94,7 @@ public class FileSystemViewManager { */ public SyncableFileSystemView getFileSystemView(String basePath) { return globalViewMap.computeIfAbsent(basePath, (path) -> { - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(conf.newCopy(), path); + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(conf.newCopy()).setBasePath(path).build(); return viewCreator.apply(metaClient, viewStorageConfig); }); } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java index c86b37e09..b4143f336 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java @@ -71,7 +71,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata { this.engineContext = engineContext; this.hadoopConf = new SerializableConfiguration(engineContext.getHadoopConf()); this.datasetBasePath = datasetBasePath; - this.datasetMetaClient = new HoodieTableMetaClient(hadoopConf.get(), datasetBasePath); + this.datasetMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf.get()).setBasePath(datasetBasePath).build(); this.spillableMapDirectory = spillableMapDirectory; this.metadataConfig = metadataConfig; diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index a34652c68..32856065a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -93,7 +93,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { if (enabled && this.metaClient == null) { this.metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(datasetBasePath); try { - this.metaClient = new HoodieTableMetaClient(hadoopConf.get(), metadataBasePath); + this.metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf.get()).setBasePath(metadataBasePath).build(); HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline()); latestFileSystemMetadataSlices = fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath()).collect(Collectors.toList()); } catch (TableNotFoundException e) { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java index d7e3bde8c..3ed111cc0 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java @@ -114,9 +114,10 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness { metaClient.getArchivePath(), metaClient.getTableConfig().getPayloadClass(), VERSION_0); HoodieInstant instant6 
= new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "9"); byte[] dummy = new byte[5]; - HoodieActiveTimeline oldTimeline = new HoodieActiveTimeline(new HoodieTableMetaClient(metaClient.getHadoopConf(), - metaClient.getBasePath(), true, metaClient.getConsistencyGuardConfig(), - Option.of(new TimelineLayoutVersion(VERSION_0)))); + HoodieActiveTimeline oldTimeline = new HoodieActiveTimeline( + HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePath()) + .setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(metaClient.getConsistencyGuardConfig()) + .setLayoutVersion(Option.of(new TimelineLayoutVersion(VERSION_0))).build()); // Old Timeline writes both to aux and timeline folder oldTimeline.saveToCompactionRequested(instant6, Option.of(dummy)); // Now use latest timeline version diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java index 146e0bb85..0bcebaf71 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java @@ -324,7 +324,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { instantsToFiles = testMultipleWriteSteps(view1, Collections.singletonList("11"), true, "11"); SyncableFileSystemView view2 = - getFileSystemView(new HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath())); + getFileSystemView(HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePath()).build()); // Run 2 more ingestion on MOR table. View1 is not yet synced but View2 is instantsToFiles.putAll(testMultipleWriteSteps(view2, Arrays.asList("12", "13"), true, "11")); @@ -334,7 +334,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { view2.sync(); SyncableFileSystemView view3 = - getFileSystemView(new HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath())); + getFileSystemView(HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePath()).build()); view3.sync(); areViewsConsistent(view1, view2, partitions.size() * fileIdsPerPartition.size()); @@ -346,7 +346,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { view1.sync(); areViewsConsistent(view1, view2, partitions.size() * fileIdsPerPartition.size()); SyncableFileSystemView view4 = - getFileSystemView(new HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath())); + getFileSystemView(HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePath()).build()); view4.sync(); /* @@ -360,7 +360,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { view1.sync(); areViewsConsistent(view1, view2, partitions.size() * fileIdsPerPartition.size() * 2); SyncableFileSystemView view5 = - getFileSystemView(new HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath())); + getFileSystemView(HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePath()).build()); view5.sync(); /* @@ -383,7 +383,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { view1.sync(); areViewsConsistent(view1, view2, partitions.size() * fileIdsPerPartition.size() * 2); SyncableFileSystemView 
view6 = - getFileSystemView(new HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath())); + getFileSystemView(HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePath()).build()); view6.sync(); /* diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/CompactionTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/CompactionTestUtils.java index 44e3da059..fb5f123e8 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/CompactionTestUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/CompactionTestUtils.java @@ -110,7 +110,7 @@ public class CompactionTestUtils { } }); - metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath(), true); + metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePath()).setLoadActiveTimelineOnLoad(true).build(); Map> pendingCompactionMap = CompactionUtils.getAllPendingCompactionOperations(metaClient); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java index bca91f800..2cca148ae 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java @@ -224,7 +224,7 @@ public class FileCreateUtils { public static Map getBaseFileCountsForPaths(String basePath, FileSystem fs, String... paths) { Map toReturn = new HashMap<>(); try { - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true); + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build(); for (String path : paths) { TableFileSystemView.BaseFileOnlyView fileSystemView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline().filterCompletedInstants(), fs.globStatus(new org.apache.hadoop.fs.Path(path))); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java index 25b2c8baf..973881696 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java @@ -83,7 +83,7 @@ public class HoodieCommonTestHarness { } protected void refreshFsView() throws IOException { - metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true); + metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build(); } protected SyncableFileSystemView getFileSystemView(HoodieTimeline timeline) throws IOException { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java index 35ff4cba0..92c40c739 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java @@ -188,7 +188,7 @@ public class TestCompactionUtils extends HoodieCommonTestHarness { // schedule similar plan again so that there will be duplicates 
plan1.getOperations().get(0).setDataFilePath("bla"); scheduleCompaction(metaClient, "005", plan1); - metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true); + metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build(); assertThrows(IllegalStateException.class, () -> { CompactionUtils.getAllPendingCompactionOperations(metaClient); }); @@ -203,7 +203,7 @@ public class TestCompactionUtils extends HoodieCommonTestHarness { scheduleCompaction(metaClient, "003", plan2); // schedule same plan again so that there will be duplicates. It should not fail as it is a full duplicate scheduleCompaction(metaClient, "005", plan1); - metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true); + metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build(); Map> res = CompactionUtils.getAllPendingCompactionOperations(metaClient); } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java index da45fa638..d94018b88 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java @@ -171,7 +171,7 @@ public class HoodieROTablePathFilter implements Configurable, PathFilter, Serial try { HoodieTableMetaClient metaClient = metaClientCache.get(baseDir.toString()); if (null == metaClient) { - metaClient = new HoodieTableMetaClient(fs.getConf(), baseDir.toString(), true); + metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(baseDir.toString()).setLoadActiveTimelineOnLoad(true).build(); metaClientCache.put(baseDir.toString(), metaClient); } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java index 050b91add..f378f4413 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java @@ -71,7 +71,7 @@ public abstract class AbstractRealtimeRecordReader { } private boolean usesCustomPayload() { - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jobConf, split.getBasePath()); + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jobConf).setBasePath(split.getBasePath()).build(); return !(metaClient.getTableConfig().getPayloadClass().contains(HoodieAvroPayload.class.getName()) || metaClient.getTableConfig().getPayloadClass().contains("org.apache.hudi.OverwriteWithLatestAvroPayload")); } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java index 9f9813654..eac9f4d6e 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java @@ -324,7 +324,7 @@ public class HoodieInputFormatUtils { } Path baseDir = HoodieHiveUtils.getNthParent(dataPath, levels); LOG.info("Reading hoodie metadata from path " + baseDir.toString()); - return new HoodieTableMetaClient(fs.getConf(), 
baseDir.toString()); + return HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(baseDir.toString()).build(); } public static FileStatus getFileStatus(HoodieBaseFile baseFile) throws IOException { diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CompactNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CompactNode.java index 7c9090d2b..80430661e 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CompactNode.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CompactNode.java @@ -45,8 +45,9 @@ public class CompactNode extends DagNode> { */ @Override public void execute(ExecutionContext executionContext, int curItrCount) throws Exception { - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(executionContext.getHoodieTestSuiteWriter().getConfiguration(), - executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath); + HoodieTableMetaClient metaClient = + HoodieTableMetaClient.builder().setConf(executionContext.getHoodieTestSuiteWriter().getConfiguration()).setBasePath(executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath) + .build(); Option<HoodieInstant> lastInstant = metaClient.getActiveTimeline() .getCommitsAndCompactionTimeline().filterPendingCompactionTimeline().lastInstant(); if (lastInstant.isPresent()) { diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java index 1824cb8d6..c8cb62846 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java @@ -49,8 +49,9 @@ public class RollbackNode extends DagNode> { log.info("Executing rollback node {}", this.getName()); // Can only be done with an instantiation of a new WriteClient hence cannot be done during DeltaStreamer // testing for now - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(executionContext.getHoodieTestSuiteWriter().getConfiguration(), - executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath); + HoodieTableMetaClient metaClient = + HoodieTableMetaClient.builder().setConf(executionContext.getHoodieTestSuiteWriter().getConfiguration()).setBasePath(executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath) + .build(); Option<HoodieInstant> lastInstant = metaClient.getActiveTimeline().getCommitsTimeline().lastInstant(); if (lastInstant.isPresent()) { log.info("Rolling back last instant {}", lastInstant.get()); diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ScheduleCompactNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ScheduleCompactNode.java index c54b25a3c..62bf9b09a 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ScheduleCompactNode.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ScheduleCompactNode.java @@ -41,8 +41,9 @@ public class ScheduleCompactNode extends DagNode> { // testing for now // Find the last commit and extract the extra metadata to be passed to the schedule compaction.
This is // done to ensure the CHECKPOINT is correctly passed from commit to commit - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(executionContext.getHoodieTestSuiteWriter().getConfiguration(), - executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath); + HoodieTableMetaClient metaClient = + HoodieTableMetaClient.builder().setConf(executionContext.getHoodieTestSuiteWriter().getConfiguration()).setBasePath(executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath) + .build(); Option lastInstant = metaClient.getActiveTimeline().getCommitsTimeline().lastInstant(); if (lastInstant.isPresent()) { HoodieCommitMetadata metadata = org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(metaClient diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java index 136aa2777..1acbc9071 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java @@ -37,7 +37,6 @@ import org.apache.hudi.config.HoodieMemoryConfig; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.parquet.avro.AvroParquetReader; import org.apache.parquet.avro.AvroReadSupport; @@ -80,7 +79,7 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader { public DFSHoodieDatasetInputReader(JavaSparkContext jsc, String basePath, String schemaStr) { this.jsc = jsc; this.schemaStr = schemaStr; - this.metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + this.metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build(); } protected List getPartitions(Option partitionsLimit) throws IOException { diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java index b14ef1808..6232b1dfc 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java @@ -173,7 +173,7 @@ public class TestHoodieTestSuiteJob extends UtilitiesTestBase { cfg.workloadDagGenerator = ComplexDagGenerator.class.getName(); HoodieTestSuiteJob hoodieTestSuiteJob = new HoodieTestSuiteJob(cfg, jsc); hoodieTestSuiteJob.runTestSuite(); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(new Configuration(), cfg.targetBasePath); + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(new Configuration()).setBasePath(cfg.targetBasePath).build(); assertEquals(metaClient.getActiveTimeline().getCommitsTimeline().getInstants().count(), 2); } @@ -192,7 +192,7 @@ public class TestHoodieTestSuiteJob extends UtilitiesTestBase { } HoodieTestSuiteJob hoodieTestSuiteJob = new HoodieTestSuiteJob(cfg, jsc); hoodieTestSuiteJob.runTestSuite(); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(new Configuration(), cfg.targetBasePath); + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(new Configuration()).setBasePath(cfg.targetBasePath).build(); 
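// For reference: the two-argument constructor used throughout these call sites maps onto the
// builder's two required fields, with everything else left at the Builder defaults
// (loadActiveTimelineOnLoad=false, null payload class, default ConsistencyGuardConfig,
// current timeline layout version). A minimal sketch of this most common migration,
// assuming a Hadoop Configuration `hadoopConf` and a table path `basePath` are in scope:
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
    .setConf(hadoopConf)   // required; build() rejects a null Configuration
    .setBasePath(basePath) // required; build() rejects a null base path
    .build();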
assertEquals(metaClient.getActiveTimeline().getCommitsTimeline().getInstants().count(), 1); } @@ -207,7 +207,7 @@ public class TestHoodieTestSuiteJob extends UtilitiesTestBase { cfg.workloadYamlPath = dfsBasePath + "/" + COW_DAG_FILE_NAME; HoodieTestSuiteJob hoodieTestSuiteJob = new HoodieTestSuiteJob(cfg, jsc); hoodieTestSuiteJob.runTestSuite(); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(new Configuration(), cfg.targetBasePath); + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(new Configuration()).setBasePath(cfg.targetBasePath).build(); //assertEquals(metaClient.getActiveTimeline().getCommitsTimeline().getInstants().count(), 5); } @@ -222,7 +222,7 @@ public class TestHoodieTestSuiteJob extends UtilitiesTestBase { cfg.workloadYamlPath = dfsBasePath + "/" + MOR_DAG_FILE_NAME; HoodieTestSuiteJob hoodieTestSuiteJob = new HoodieTestSuiteJob(cfg, jsc); hoodieTestSuiteJob.runTestSuite(); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(new Configuration(), cfg.targetBasePath); + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(new Configuration()).setBasePath(cfg.targetBasePath).build(); //assertEquals(metaClient.getActiveTimeline().getCommitsTimeline().getInstants().count(), 7); } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java index b40d36bea..7b04a6558 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java @@ -63,7 +63,7 @@ public class DataSourceInternalWriterHelper { this.writeClient = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), writeConfig, true); writeClient.setOperationType(operationType); writeClient.startCommitWithTime(instantTime); - this.metaClient = new HoodieTableMetaClient(configuration, writeConfig.getBasePath()); + this.metaClient = HoodieTableMetaClient.builder().setConf(configuration).setBasePath(writeConfig.getBasePath()).build(); this.hoodieTable = HoodieSparkTable.create(writeConfig, new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), metaClient); } diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java index 734e0c0ea..ce80b5232 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java +++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java @@ -70,7 +70,7 @@ public class HoodieDataSourceHelpers { */ @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE) public static HoodieTimeline allCompletedCommitsCompactions(FileSystem fs, String basePath) { - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true); + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build(); if (metaClient.getTableType().equals(HoodieTableType.MERGE_ON_READ)) { return metaClient.getActiveTimeline().getTimelineOfActions( CollectionUtils.createSet(HoodieActiveTimeline.COMMIT_ACTION, diff 
--git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala index ef9bf8cff..3299b8f25 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -84,7 +84,7 @@ class DefaultSource extends RelationProvider val tablePath = DataSourceUtils.getTablePath(fs, globPaths.toArray) log.info("Obtained hudi table path: " + tablePath) - val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath) + val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build() val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent log.info("Is bootstrapped table => " + isBootstrappedTable) @@ -104,7 +104,7 @@ class DefaultSource extends RelationProvider } else if(parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)) { getBaseFileOnlyView(sqlContext, parameters, schema, readPaths, isBootstrappedTable, globPaths, metaClient) } else if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_INCREMENTAL_OPT_VAL)) { - val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath) + val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build() if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) { new MergeOnReadIncrementalRelation(sqlContext, optParams, schema, metaClient) } else { @@ -202,8 +202,8 @@ class DefaultSource extends RelationProvider if (path.isEmpty || path.get == null) { throw new HoodieException(s"'path' must be specified.") } - val metaClient = new HoodieTableMetaClient( - sqlContext.sparkSession.sessionState.newHadoopConf(), path.get) + val metaClient = HoodieTableMetaClient.builder().setConf( + sqlContext.sparkSession.sessionState.newHadoopConf()).setBasePath(path.get).build() val schemaResolver = new TableSchemaResolver(metaClient) val sqlSchema = try { diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index ad185cb17..f5ba6c867 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -500,7 +500,8 @@ private[hudi] object HoodieSparkSqlWriter { hoodieTableConfigOpt: Option[HoodieTableConfig]): HoodieTableConfig = { if (tableExists) { hoodieTableConfigOpt.getOrElse( - new HoodieTableMetaClient(sparkContext.hadoopConfiguration, tablePath).getTableConfig) + HoodieTableMetaClient.builder().setConf(sparkContext.hadoopConfiguration).setBasePath(tablePath) + .build().getTableConfig) } else { null } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala index 846212d7e..f9a799e6a 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala @@ -175,8 +175,8 @@ class HoodieStreamingSink(sqlContext: SQLContext, })) // First time, scan .hoodie folder and get all pending compactions - val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, - 
client.getConfig.getBasePath) + val metaClient = HoodieTableMetaClient.builder().setConf(sqlContext.sparkContext.hadoopConfiguration) + .setBasePath(client.getConfig.getBasePath).build() val pendingInstants :java.util.List[HoodieInstant] = CompactionUtils.getPendingCompactionInstantTimes(metaClient) pendingInstants.foreach((h : HoodieInstant) => asyncCompactorService.enqueuePendingCompaction(h)) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala index c17598a58..b0e3163ca 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala @@ -59,7 +59,7 @@ class HoodieStreamSource( val fs = path.getFileSystem(hadoopConf) TablePathUtils.getTablePath(fs, path).get() } - private lazy val metaClient = new HoodieTableMetaClient(hadoopConf, tablePath.toString) + private lazy val metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(tablePath.toString).build() private lazy val tableType = metaClient.getTableType @transient private var lastOffset: HoodieSourceOffset = _ diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java index 1df12a350..3b55434f3 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java @@ -187,7 +187,7 @@ public class HoodieJavaStreamingApp { executor.shutdownNow(); } - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jssc.hadoopConfiguration(), tablePath); + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jssc.hadoopConfiguration()).setBasePath(tablePath).build(); if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) { // Ensure we have successfully completed one compaction commit ValidationUtils.checkArgument(metaClient.getActiveTimeline().getCommitTimeline().getInstants().count() == 1); @@ -249,7 +249,7 @@ public class HoodieJavaStreamingApp { if (timeline.countInstants() >= numCommits) { return; } - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), tablePath, true); + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build(); System.out.println("Instants :" + metaClient.getActiveTimeline().getInstants().collect(Collectors.toList())); } catch (TableNotFoundException te) { LOG.info("Got table not found exception. 
Retrying"); diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index 8fdb02dd8..856cc008d 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -210,7 +210,8 @@ class TestCOWDataSource extends HoodieClientTestBase { .mode(SaveMode.Append) .save(basePath) - val metaClient = new HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath, true) + val metaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath) + .setLoadActiveTimelineOnLoad(true).build(); val commits = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray .map(instant => (instant.asInstanceOf[HoodieInstant]).getAction) assertEquals(2, commits.size) @@ -235,7 +236,8 @@ class TestCOWDataSource extends HoodieClientTestBase { .mode(SaveMode.Overwrite) .save(basePath) - val metaClient = new HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath, true) + val metaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath) + .setLoadActiveTimelineOnLoad(true).build() val commits = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray .map(instant => (instant.asInstanceOf[HoodieInstant]).getAction) assertEquals(2, commits.size) @@ -289,7 +291,8 @@ class TestCOWDataSource extends HoodieClientTestBase { val filterSecondPartitionCount = recordsForPartitionColumn.filter(row => row.get(0).equals(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)).size assertEquals(7, filterSecondPartitionCount) - val metaClient = new HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath, true) + val metaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath) + .setLoadActiveTimelineOnLoad(true).build() val commits = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray .map(instant => instant.asInstanceOf[HoodieInstant].getAction) assertEquals(3, commits.size) @@ -339,7 +342,8 @@ class TestCOWDataSource extends HoodieClientTestBase { val filterSecondPartitionCount = recordsForPartitionColumn.filter(row => row.get(0).equals(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)).size assertEquals(7, filterSecondPartitionCount) - val metaClient = new HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath, true) + val metaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath) + .setLoadActiveTimelineOnLoad(true).build() val commits = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray .map(instant => instant.asInstanceOf[HoodieInstant].getAction) assertEquals(2, commits.size) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala index 08e1c82dc..49dff4d0d 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala @@ -177,7 +177,8 @@ class 
TestStructuredStreaming extends HoodieClientTestBase { numInstants = timeline.countInstants success = true } - val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath, true) + val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath) + .setLoadActiveTimelineOnLoad(true).build() } catch { case te: TableNotFoundException => log.info("Got table not found exception. Retrying") @@ -253,12 +254,14 @@ class TestStructuredStreaming extends HoodieClientTestBase { if (HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, destPath).getCompletedReplaceTimeline().countInstants() > 0) { assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size()) // check have at least one file group - this.metaClient = new HoodieTableMetaClient(fs.getConf, destPath, true) + this.metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(destPath) + .setLoadActiveTimelineOnLoad(true).build() assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 0) } else { assertEquals(currNumCommits, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size()) // check have more than one file group - this.metaClient = new HoodieTableMetaClient(fs.getConf, destPath, true) + this.metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(destPath) + .setLoadActiveTimelineOnLoad(true).build() assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 1) } diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java index 8d03252d4..8477ed6ce 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java @@ -53,7 +53,7 @@ public abstract class AbstractSyncHoodieClient { public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, boolean useFileListingFromMetadata, boolean verifyMetadataFileListing, FileSystem fs) { - this.metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true); + this.metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build(); this.tableType = metaClient.getTableType(); this.basePath = basePath; this.assumeDatePartitioning = assumeDatePartitioning; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java index 7d356a554..a3570b1f7 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java @@ -276,7 +276,7 @@ public class HiveIncrementalPuller { if (!fs.exists(new Path(targetDataPath)) || !fs.exists(new Path(targetDataPath + "/.hoodie"))) { return "0"; } - HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs.getConf(), targetDataPath); + HoodieTableMetaClient metadata = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(targetDataPath).build(); Option lastCommit = metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant(); @@ -309,7 +309,7 @@ public class HiveIncrementalPuller { } private String getLastCommitTimePulled(FileSystem fs, String sourceTableLocation) { - HoodieTableMetaClient 
metadata = new HoodieTableMetaClient(fs.getConf(), sourceTableLocation); + HoodieTableMetaClient metadata = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(sourceTableLocation).build(); List commitsToSync = metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants() .findInstantsAfter(config.fromCommitTime, config.maxCommits).getInstants().map(HoodieInstant::getTimestamp) .collect(Collectors.toList()); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java index 394771caf..44328d3d6 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java @@ -137,7 +137,7 @@ public class HoodieClusteringJob { } private String getSchemaFromLatestInstant() throws Exception { - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.basePath, true); + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(cfg.basePath).setLoadActiveTimelineOnLoad(true).build(); TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient); if (metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 0) { throw new HoodieException("Cannot run clustering without any completed commits"); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactionAdminTool.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactionAdminTool.java index d3e4dbab7..d7642c46f 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactionAdminTool.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactionAdminTool.java @@ -60,7 +60,7 @@ public class HoodieCompactionAdminTool { * Executes one of compaction admin operations. 
*/ public void run(JavaSparkContext jsc) throws Exception { - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.basePath); + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(cfg.basePath).build(); try (CompactionAdminClient admin = new CompactionAdminClient(new HoodieSparkEngineContext(jsc), cfg.basePath)) { final FileSystem fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration()); if (cfg.outputPath != null && fs.exists(new Path(cfg.outputPath))) { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java index ece9b8c93..72d1dbdea 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java @@ -83,7 +83,7 @@ public class HoodieSnapshotCopier implements Serializable { final boolean verifyMetadataFileListing) throws IOException { FileSystem fs = FSUtils.getFs(baseDir, jsc.hadoopConfiguration()); final SerializableConfiguration serConf = new SerializableConfiguration(jsc.hadoopConfiguration()); - final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), baseDir); + final HoodieTableMetaClient tableMetadata = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(baseDir).build(); final BaseFileOnlyView fsView = new HoodieTableFileSystemView(tableMetadata, tableMetadata.getActiveTimeline().getCommitsAndCompactionTimeline().filterCompletedInstants()); HoodieEngineContext context = new HoodieSparkEngineContext(jsc); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java index b9f32cbdc..8e792a0a5 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java @@ -148,7 +148,7 @@ public class HoodieSnapshotExporter { } private Option getLatestCommitTimestamp(FileSystem fs, Config cfg) { - final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), cfg.sourceBasePath); + final HoodieTableMetaClient tableMetadata = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(cfg.sourceBasePath).build(); Option latestCommit = tableMetadata.getActiveTimeline().getCommitsAndCompactionTimeline() .filterCompletedInstants().lastInstant(); return latestCommit.isPresent() ? 
Option.of(latestCommit.get().getTimestamp()) : Option.empty(); @@ -259,7 +259,7 @@ public class HoodieSnapshotExporter { private BaseFileOnlyView getBaseFileOnlyView(JavaSparkContext jsc, Config cfg) { FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration()); - HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), cfg.sourceBasePath); + HoodieTableMetaClient tableMetadata = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(cfg.sourceBasePath).build(); return new HoodieTableFileSystemView(tableMetadata, tableMetadata .getActiveTimeline().getCommitsAndCompactionTimeline().filterCompletedInstants()); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckpointFromAnotherHoodieTimelineProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckpointFromAnotherHoodieTimelineProvider.java index 17058da7f..e2554c4d4 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckpointFromAnotherHoodieTimelineProvider.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckpointFromAnotherHoodieTimelineProvider.java @@ -44,7 +44,7 @@ public class InitialCheckpointFromAnotherHoodieTimelineProvider extends InitialC @Override public void init(Configuration config) throws HoodieException { super.init(config); - this.anotherDsHoodieMetaclient = new HoodieTableMetaClient(config, path.toString()); + this.anotherDsHoodieMetaclient = HoodieTableMetaClient.builder().setConf(config).setBasePath(path.toString()).build(); } @Override diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 9eb5c9dca..4c494dc2e 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -223,8 +223,7 @@ public class DeltaSync implements Serializable { */ public void refreshTimeline() throws IOException { if (fs.exists(new Path(cfg.targetBasePath))) { - HoodieTableMetaClient meta = new HoodieTableMetaClient(new Configuration(fs.getConf()), cfg.targetBasePath, - cfg.payloadClassName); + HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf())).setBasePath(cfg.targetBasePath).setPayloadClassName(cfg.payloadClassName).build(); switch (meta.getTableType()) { case COPY_ON_WRITE: this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants()); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index 10d945369..6e3a024d0 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -528,7 +528,7 @@ public class HoodieDeltaStreamer implements Serializable { if (fs.exists(new Path(cfg.targetBasePath))) { HoodieTableMetaClient meta = - new HoodieTableMetaClient(new Configuration(fs.getConf()), cfg.targetBasePath, false); + HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf())).setBasePath(cfg.targetBasePath).setLoadActiveTimelineOnLoad(false).build(); tableType = meta.getTableType(); 
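// The DeltaSync hunk above shows the payload-class variant of the migration: what used to be
// new HoodieTableMetaClient(conf, basePath, payloadClassName) becomes a chain that sets the
// optional field explicitly. A sketch, assuming `conf`, `basePath` and `payloadClassName`
// are in scope (names illustrative):
HoodieTableMetaClient meta = HoodieTableMetaClient.builder()
    .setConf(conf)
    .setBasePath(basePath)
    .setPayloadClassName(payloadClassName) // optional; defaults to null when not set
    .build();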
// This will guarantee there is no surprise with table type ValidationUtils.checkArgument(tableType.equals(HoodieTableType.valueOf(cfg.tableType)), @@ -636,7 +636,7 @@ public class HoodieDeltaStreamer implements Serializable { asyncCompactService = Option.ofNullable(new SparkAsyncCompactService(new HoodieSparkEngineContext(jssc), writeClient)); // Enqueue existing pending compactions first HoodieTableMetaClient meta = - new HoodieTableMetaClient(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath, true); + HoodieTableMetaClient.builder().setConf(new Configuration(jssc.hadoopConfiguration())).setBasePath(cfg.targetBasePath).setLoadActiveTimelineOnLoad(true).build(); List pending = CompactionUtils.getPendingCompactionInstantTimes(meta); pending.forEach(hoodieInstant -> asyncCompactService.get().enqueuePendingCompaction(hoodieInstant)); asyncCompactService.get().start((error) -> { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java index d296f0e05..184d7c7e4 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java @@ -100,7 +100,7 @@ public class TimelineServerPerf implements Serializable { this.hostAddr = cfg.serverHost; } - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(timelineServer.getConf(), cfg.basePath, true); + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(timelineServer.getConf()).setBasePath(cfg.basePath).setLoadActiveTimelineOnLoad(true).build(); SyncableFileSystemView fsView = new RemoteHoodieTableFileSystemView(this.hostAddr, cfg.serverPort, metaClient); String reportDir = cfg.reportDir; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java index 96dc648f7..efe3a86f4 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java @@ -58,7 +58,7 @@ public class IncrSourceHelper { int numInstantsPerFetch, Option beginInstant, boolean readLatestOnMissingBeginInstant) { ValidationUtils.checkArgument(numInstantsPerFetch > 0, "Make sure the config hoodie.deltastreamer.source.hoodieincr.num_instants is set to a positive value"); - HoodieTableMetaClient srcMetaClient = new HoodieTableMetaClient(jssc.hadoopConfiguration(), srcBasePath, true); + HoodieTableMetaClient srcMetaClient = HoodieTableMetaClient.builder().setConf(jssc.hadoopConfiguration()).setBasePath(srcBasePath).setLoadActiveTimelineOnLoad(true).build(); final HoodieTimeline activeCommitTimeline = srcMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index eeef8ed79..616d039ed 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -363,7 +363,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { } static void 
assertAtleastNCompactionCommits(int minExpected, String tablePath, FileSystem fs) { - HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), tablePath); + HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build(); HoodieTimeline timeline = meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants(); LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList())); int numCompactionCommits = (int) timeline.getInstants().count(); @@ -371,7 +371,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { } static void assertAtleastNDeltaCommits(int minExpected, String tablePath, FileSystem fs) { - HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), tablePath); + HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build(); HoodieTimeline timeline = meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants(); LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList())); int numDeltaCommits = (int) timeline.getInstants().count(); @@ -380,7 +380,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { static String assertCommitMetadata(String expected, String tablePath, FileSystem fs, int totalCommits) throws IOException { - HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), tablePath); + HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build(); HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); HoodieInstant lastInstant = timeline.lastInstant().get(); HoodieCommitMetadata commitMetadata = @@ -408,7 +408,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { } static void assertAtLeastNCommits(int minExpected, String tablePath, FileSystem fs) { - HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), tablePath); + HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build(); HoodieTimeline timeline = meta.getActiveTimeline().filterCompletedInstants(); LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList())); int numDeltaCommits = (int) timeline.getInstants().count(); @@ -683,13 +683,13 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { cfg.configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP, "2")); HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); deltaStreamerTestRunner(ds, cfg, (r) -> { - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(this.dfs.getConf(), tableBasePath, true); + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(this.dfs.getConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build(); int pendingReplaceSize = metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().toArray().length; int completeReplaceSize = metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length; LOG.info("PendingReplaceSize=" + pendingReplaceSize + ",completeReplaceSize = " + completeReplaceSize); return completeReplaceSize > 0; }); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(this.dfs.getConf(), tableBasePath, true); + HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(this.dfs.getConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build(); assertEquals(1, metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length); } @@ -739,13 +739,13 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { } else { LOG.warn("Schedule clustering failed"); } - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(this.dfs.getConf(), tableBasePath, true); + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(this.dfs.getConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build(); int pendingReplaceSize = metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().toArray().length; int completeReplaceSize = metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length; System.out.println("PendingReplaceSize=" + pendingReplaceSize + ",completeReplaceSize = " + completeReplaceSize); return completeReplaceSize > 0; }); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(this.dfs.getConf(), tableBasePath, true); + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(this.dfs.getConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build(); assertEquals(1, metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length); } @@ -921,7 +921,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { assertEquals(1000, counts.get(1).getLong(1)); // Test with empty commits - HoodieTableMetaClient mClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), tableBasePath, true); + HoodieTableMetaClient mClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build(); HoodieInstant lastFinished = mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get(); HoodieDeltaStreamer.Config cfg2 = TestHelpers.makeDropAllConfig(tableBasePath, WriteOperationType.UPSERT); cfg2.filterDupes = false; @@ -930,7 +930,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { cfg2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP)); HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg2, jsc); ds2.sync(); - mClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), tableBasePath, true); + mClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build(); HoodieInstant newLastFinished = mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get(); assertTrue(HoodieTimeline.compareTimestamps(newLastFinished.getTimestamp(), HoodieTimeline.GREATER_THAN, lastFinished.getTimestamp() ));
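
For reviewers, a minimal sketch of the migration this patch applies throughout the codebase, using only the
builder methods visible in the hunks above (the wrapper class and method names here are hypothetical, for
illustration only):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.table.HoodieTableMetaClient;

    public class MetaClientBuilderExample {
      public static HoodieTableMetaClient load(Configuration conf, String basePath) {
        // Before this patch, callers passed positional constructor arguments, e.g.:
        //   new HoodieTableMetaClient(conf, basePath, true);
        // After this patch, each argument becomes a named builder setter:
        return HoodieTableMetaClient.builder()
            .setConf(conf)                       // Hadoop configuration
            .setBasePath(basePath)               // Hudi table base path
            .setLoadActiveTimelineOnLoad(true)   // eagerly load the active timeline
            .build();
      }
    }

The builder makes each boolean or optional argument self-describing at the call site (e.g.
setLoadActiveTimelineOnLoad, setPayloadClassName), which is why the mechanical replacements above read
more clearly than the old overloaded constructors.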