From 91d2e61433e74abb44cb4d0ae236ee8f4a94e1f8 Mon Sep 17 00:00:00 2001 From: rmahindra123 <76502047+rmahindra123@users.noreply.github.com> Date: Thu, 2 Dec 2021 10:32:26 -0800 Subject: [PATCH] [HUDI-2904] Fix metadata table archival overstepping between regular writers and table services (#4186) - Co-authored-by: Rajesh Mahindra - Co-authored-by: Sivabalan Narayanan --- .../client/AbstractHoodieWriteClient.java | 33 +++++++++++++--- .../hudi/config/HoodieCompactionConfig.java | 12 ++++++ .../apache/hudi/config/HoodieWriteConfig.java | 4 ++ .../HoodieBackedTableMetadataWriter.java | 4 +- .../hudi/client/HoodieFlinkWriteClient.java | 11 ++---- .../FlinkHoodieBackedTableMetadataWriter.java | 1 + .../SparkHoodieBackedTableMetadataWriter.java | 1 + .../functional/TestHoodieBackedMetadata.java | 38 +++++++++++++++++++ 8 files changed, 91 insertions(+), 13 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 96d89fcc3..59acbb217 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -449,11 +449,9 @@ public abstract class AbstractHoodieWriteClient table) { + try { + // We cannot have unbounded commit files. Archive commits if we have to archive + HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table); + archiveLog.archiveIfRequired(context); + } catch (IOException ioe) { + throw new HoodieIOException("Failed to archive", ioe); + } + } + + /** + * Trigger archival for the table. This ensures that the number of commits do not explode + * and keep increasing unbounded over time. + */ + public void archive() { + // Create a Hoodie table which encapsulated the commits and files visible + HoodieTable table = createTable(config, hadoopConf); + archive(table); + } + /** * Provides a new commit time for a write operation (insert/update/delete). */ diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java index fbe31b029..640f0cb18 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java @@ -57,6 +57,13 @@ public class HoodieCompactionConfig extends HoodieConfig { + " to delete older file slices. It's recommended to enable this, to ensure metadata and data storage" + " growth is bounded."); + public static final ConfigProperty AUTO_ARCHIVE = ConfigProperty + .key("hoodie.archive.automatic") + .defaultValue("true") + .withDocumentation("When enabled, the archival table service is invoked immediately after each commit," + + " to archive commits if we cross a maximum value of commits." + + " It's recommended to enable this, to ensure number of active commits is bounded."); + public static final ConfigProperty ASYNC_CLEAN = ConfigProperty .key("hoodie.clean.async") .defaultValue("false") @@ -493,6 +500,11 @@ public class HoodieCompactionConfig extends HoodieConfig { return this; } + public Builder withAutoArchive(Boolean autoArchive) { + compactionConfig.setValue(AUTO_ARCHIVE, String.valueOf(autoArchive)); + return this; + } + public Builder withIncrementalCleaningMode(Boolean incrementalCleaningMode) { compactionConfig.setValue(CLEANER_INCREMENTAL_MODE_ENABLE, String.valueOf(incrementalCleaningMode)); return this; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index b49108f76..df4b3f6c3 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1101,6 +1101,10 @@ public class HoodieWriteConfig extends HoodieConfig { return getBoolean(HoodieCompactionConfig.AUTO_CLEAN); } + public boolean isAutoArchive() { + return getBoolean(HoodieCompactionConfig.AUTO_ARCHIVE); + } + public boolean isAsyncClean() { return getBoolean(HoodieCompactionConfig.ASYNC_CLEAN); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index f9486b1bc..54284fc48 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -204,7 +204,9 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta .archiveCommitsWith(minCommitsToKeep, maxCommitsToKeep) // we will trigger compaction manually, to control the instant times .withInlineCompaction(false) - .withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax()).build()) + .withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax()) + // we will trigger archive manually, to ensure only regular writer invokes it + .withAutoArchive(false).build()) .withParallelism(parallelism, parallelism) .withDeleteParallelism(parallelism) .withRollbackParallelism(parallelism) diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 36caa1b0e..374dd1226 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -40,7 +40,6 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCommitException; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.index.FlinkHoodieIndexFactory; import org.apache.hudi.index.HoodieIndex; @@ -57,7 +56,6 @@ import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.table.HoodieTimelineArchiveLog; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.compact.CompactHelpers; import org.apache.hudi.table.marker.WriteMarkersFactory; @@ -332,11 +330,10 @@ public class HoodieFlinkWriteClient extends // Delete the marker directory for the instant. WriteMarkersFactory.get(config.getMarkersType(), createTable(config, hadoopConf), instantTime) .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); - // We cannot have unbounded commit files. Archive commits if we have to archive - HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table); - archiveLog.archiveIfRequired(context); - } catch (IOException ioe) { - throw new HoodieIOException(ioe.getMessage(), ioe); + if (config.isAutoArchive()) { + // We cannot have unbounded commit files. Archive commits if we have to archive + archive(table); + } } finally { this.heartbeatClient.stop(instantTime); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java index 5e782c55a..0dcfcfc92 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java @@ -140,6 +140,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad if (canTriggerTableService) { compactIfNecessary(writeClient, instantTime); doClean(writeClient, instantTime); + writeClient.archive(); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java index ff8f556ea..65ade8287 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java @@ -155,6 +155,7 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad if (canTriggerTableService) { compactIfNecessary(writeClient, instantTime); doClean(writeClient, instantTime); + writeClient.archive(); } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 82bc8927e..73b78118b 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -107,6 +107,7 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import static java.util.Arrays.asList; @@ -250,6 +251,43 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { validateMetadata(testTable, emptyList(), true); } + @Test + public void testMetadataTableArchival() throws Exception { + init(COPY_ON_WRITE, false); + writeConfig = getWriteConfigBuilder(true, true, false) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .enable(true) + .enableFullScan(true) + .enableMetrics(false) + .withMaxNumDeltaCommitsBeforeCompaction(3) + .archiveCommitsWith(3, 4) + .retainCommits(1) + .build()) + .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 3).retainCommits(1).build()).build(); + initWriteConfigAndMetatableWriter(writeConfig, true); + + AtomicInteger commitTime = new AtomicInteger(1); + // trigger 2 regular writes(1 bootstrap commit). just 1 before archival can get triggered. + int i = 1; + for (; i <= 2; i++) { + doWriteOperation(testTable, "000000" + (commitTime.getAndIncrement()), INSERT); + } + // expected num commits = 1 (bootstrap) + 2 (writes) + 1 compaction. + HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build(); + HoodieActiveTimeline metadataTimeline = metadataMetaClient.reloadActiveTimeline(); + assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 4); + + // trigger a async table service, archival should not kick in, even though conditions are met. + doCluster(testTable, "000000" + commitTime.getAndIncrement()); + metadataTimeline = metadataMetaClient.reloadActiveTimeline(); + assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 5); + + // trigger a regular write operation. archival should kick in. + doWriteOperation(testTable, "000000" + (commitTime.getAndIncrement()), INSERT); + metadataTimeline = metadataMetaClient.reloadActiveTimeline(); + assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 3); + } + @ParameterizedTest @EnumSource(HoodieTableType.class) public void testMetadataInsertUpsertClean(HoodieTableType tableType) throws Exception {