From 27bd7b538e46524d6863e36e334b4a6da665ed32 Mon Sep 17 00:00:00 2001 From: Raymond Xu <2701446+xushiyan@users.noreply.github.com> Date: Mon, 14 Feb 2022 18:15:06 -0800 Subject: [PATCH] [HUDI-1576] Make archiving an async service (#4795) --- .../commands/TestArchivedCommitsCommand.java | 6 +- .../hudi/cli/commands/TestCommitsCommand.java | 10 +-- .../cli/commands/TestCompactionCommand.java | 6 +- .../hudi/async/AsyncArchiveService.java | 85 +++++++++++++++++++ .../AsyncCleanerService.java | 43 +++++----- .../apache/hudi/async/HoodieAsyncService.java | 12 +-- .../hudi/client/BaseHoodieWriteClient.java | 62 +++++++++----- .../HoodieTimelineArchiver.java} | 22 ++--- .../hudi/config/HoodieCompactionConfig.java | 36 +++++--- .../apache/hudi/config/HoodieWriteConfig.java | 16 ++-- .../hudi/async/TestAsyncArchiveService.java | 75 ++++++++++++++++ .../hudi/client/HoodieFlinkWriteClient.java | 6 +- .../client/TestHoodieClientMultiWriter.java | 3 +- .../functional/TestHoodieMetadataBase.java | 6 +- ...g.java => TestHoodieTimelineArchiver.java} | 66 +++++++------- 15 files changed, 327 insertions(+), 127 deletions(-) create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncArchiveService.java rename hudi-client/hudi-client-common/src/main/java/org/apache/hudi/{client => async}/AsyncCleanerService.java (62%) rename hudi-client/hudi-client-common/src/main/java/org/apache/hudi/{table/HoodieTimelineArchiveLog.java => client/HoodieTimelineArchiver.java} (97%) create mode 100644 hudi-client/hudi-client-common/src/test/java/org/apache/hudi/async/TestAsyncArchiveService.java rename hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/{TestHoodieTimelineArchiveLog.java => TestHoodieTimelineArchiver.java} (95%) diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java index 091c4b417..ee105aa6b 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java @@ -33,7 +33,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieSparkTable; -import org.apache.hudi.table.HoodieTimelineArchiveLog; +import org.apache.hudi.client.HoodieTimelineArchiver; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; @@ -96,8 +96,8 @@ public class TestArchivedCommitsCommand extends CLIFunctionalTestHarness { // archive HoodieSparkTable table = HoodieSparkTable.create(cfg, context(), metaClient); - HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table); - archiveLog.archiveIfRequired(context()); + HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table); + archiver.archiveIfRequired(context()); } /** diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java index 9e2046df8..aefd209fc 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java @@ -40,7 +40,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieSparkTable; -import org.apache.hudi.table.HoodieTimelineArchiveLog; +import org.apache.hudi.client.HoodieTimelineArchiver; import org.apache.hadoop.fs.FileSystem; import org.junit.jupiter.api.BeforeEach; @@ -232,8 +232,8 @@ public class TestCommitsCommand extends CLIFunctionalTestHarness { // archive metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()); HoodieSparkTable table = HoodieSparkTable.create(cfg, context(), metaClient); - HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table); - archiveLog.archiveIfRequired(context()); + HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table); + archiver.archiveIfRequired(context()); CommandResult cr = shell().executeCommand(String.format("commits showarchived --startTs %s --endTs %s", "100", "104")); assertTrue(cr.isSuccess()); @@ -279,8 +279,8 @@ public class TestCommitsCommand extends CLIFunctionalTestHarness { HoodieSparkTable table = HoodieSparkTable.create(cfg, context(), metaClient); // need to create multi archive files - HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table); - archiveLog.archiveIfRequired(context()); + HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table); + archiver.archiveIfRequired(context()); } CommandResult cr = shell().executeCommand(String.format("commits showarchived --startTs %s --endTs %s", "160", "174")); diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java index 21841a576..c8bb94257 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java @@ -40,7 +40,7 @@ import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.table.HoodieSparkTable; -import org.apache.hudi.table.HoodieTimelineArchiveLog; +import org.apache.hudi.client.HoodieTimelineArchiver; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; @@ -167,8 +167,8 @@ public class TestCompactionCommand extends CLIFunctionalTestHarness { // archive HoodieTableMetaClient metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()); HoodieSparkTable table = HoodieSparkTable.create(cfg, context(), metaClient); - HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table); - archiveLog.archiveIfRequired(context()); + HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table); + archiver.archiveIfRequired(context()); } /** diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncArchiveService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncArchiveService.java new file mode 100644 index 000000000..980239f33 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncArchiveService.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.async; + +import org.apache.hudi.client.BaseHoodieWriteClient; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Async archive service to run concurrently with write operation. + */ +public class AsyncArchiveService extends HoodieAsyncService { + + private static final Logger LOG = LogManager.getLogger(AsyncArchiveService.class); + + private final BaseHoodieWriteClient writeClient; + private final transient ExecutorService executor = Executors.newSingleThreadExecutor(); + + protected AsyncArchiveService(BaseHoodieWriteClient writeClient) { + this.writeClient = writeClient; + } + + @Override + protected Pair startService() { + LOG.info("Starting async archive service..."); + return Pair.of(CompletableFuture.supplyAsync(() -> { + writeClient.archive(); + return true; + }, executor), executor); + } + + public static AsyncArchiveService startAsyncArchiveIfEnabled(BaseHoodieWriteClient writeClient) { + HoodieWriteConfig config = writeClient.getConfig(); + if (!config.isAutoArchive() || !config.isAsyncArchive()) { + LOG.info("The HoodieWriteClient is not configured to auto & async archive. Async archive service will not start."); + return null; + } + AsyncArchiveService asyncArchiveService = new AsyncArchiveService(writeClient); + asyncArchiveService.start(null); + return asyncArchiveService; + } + + public static void waitForCompletion(AsyncArchiveService asyncArchiveService) { + if (asyncArchiveService != null) { + LOG.info("Waiting for async archive service to finish"); + try { + asyncArchiveService.waitForShutdown(); + } catch (Exception e) { + throw new HoodieException("Error waiting for async archive service to finish", e); + } + } + } + + public static void forceShutdown(AsyncArchiveService asyncArchiveService) { + if (asyncArchiveService != null) { + LOG.info("Shutting down async archive service..."); + asyncArchiveService.shutdown(true); + } + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AsyncCleanerService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCleanerService.java similarity index 62% rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AsyncCleanerService.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCleanerService.java index a287ff4a6..e316f5dca 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AsyncCleanerService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCleanerService.java @@ -7,21 +7,24 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ -package org.apache.hudi.client; +package org.apache.hudi.async; -import org.apache.hudi.async.HoodieAsyncService; +import org.apache.hudi.client.BaseHoodieWriteClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -30,9 +33,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** - * Clean service running concurrently with write operation. + * Async clean service to run concurrently with write operation. */ -class AsyncCleanerService extends HoodieAsyncService { +public class AsyncCleanerService extends HoodieAsyncService { private static final Logger LOG = LogManager.getLogger(AsyncCleanerService.class); @@ -46,7 +49,7 @@ class AsyncCleanerService extends HoodieAsyncService { @Override protected Pair startService() { String instantTime = HoodieActiveTimeline.createNewInstantTime(); - LOG.info("Auto cleaning is enabled. Running cleaner async to write operation at instant time " + instantTime); + LOG.info(String.format("Starting async clean service with instant time %s...", instantTime)); return Pair.of(CompletableFuture.supplyAsync(() -> { writeClient.clean(instantTime); return true; @@ -54,30 +57,30 @@ class AsyncCleanerService extends HoodieAsyncService { } public static AsyncCleanerService startAsyncCleaningIfEnabled(BaseHoodieWriteClient writeClient) { - AsyncCleanerService asyncCleanerService = null; - if (writeClient.getConfig().isAutoClean() && writeClient.getConfig().isAsyncClean()) { - asyncCleanerService = new AsyncCleanerService(writeClient); - asyncCleanerService.start(null); - } else { - LOG.info("Async auto cleaning is not enabled. Not running cleaner now"); + HoodieWriteConfig config = writeClient.getConfig(); + if (!config.isAutoClean() || !config.isAsyncClean()) { + LOG.info("The HoodieWriteClient is not configured to auto & async clean. Async clean service will not start."); + return null; } + AsyncCleanerService asyncCleanerService = new AsyncCleanerService(writeClient); + asyncCleanerService.start(null); return asyncCleanerService; } public static void waitForCompletion(AsyncCleanerService asyncCleanerService) { if (asyncCleanerService != null) { - LOG.info("Waiting for async cleaner to finish"); + LOG.info("Waiting for async clean service to finish"); try { asyncCleanerService.waitForShutdown(); } catch (Exception e) { - throw new HoodieException("Error waiting for async cleaning to finish", e); + throw new HoodieException("Error waiting for async clean service to finish", e); } } } public static void forceShutdown(AsyncCleanerService asyncCleanerService) { if (asyncCleanerService != null) { - LOG.info("Shutting down async cleaner"); + LOG.info("Shutting down async clean service..."); asyncCleanerService.shutdown(true); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java index 9f41080d2..a85635a7f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java @@ -36,7 +36,7 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; /** - * Base Class for running clean/delta-sync/compaction/clustering in separate thread and controlling their life-cycle. + * Base Class for running archive/clean/delta-sync/compaction/clustering in separate thread and controlling their life-cycles. */ public abstract class HoodieAsyncService implements Serializable { @@ -70,11 +70,15 @@ public abstract class HoodieAsyncService implements Serializable { this.runInDaemonMode = runInDaemonMode; } - protected boolean isShutdownRequested() { + public boolean isStarted() { + return started; + } + + public boolean isShutdownRequested() { return shutdownRequested; } - protected boolean isShutdown() { + public boolean isShutdown() { return shutdown; } @@ -138,8 +142,6 @@ public abstract class HoodieAsyncService implements Serializable { /** * Service implementation. - * - * @return */ protected abstract Pair startService(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 14e71f1b0..55ccb540f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -18,6 +18,8 @@ package org.apache.hudi.client; +import org.apache.hudi.async.AsyncArchiveService; +import org.apache.hudi.async.AsyncCleanerService; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieClusteringPlan; @@ -67,7 +69,6 @@ import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.metrics.HoodieMetrics; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.table.HoodieTimelineArchiveLog; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.rollback.RollbackUtils; import org.apache.hudi.table.action.savepoint.SavepointHelpers; @@ -115,6 +116,7 @@ public abstract class BaseHoodieWriteClient>> lastCompletedTxnAndMetadata = Option.empty(); @@ -431,6 +433,11 @@ public abstract class BaseHoodieWriteClient table) { + if (!config.isAutoArchive()) { + return; + } + + if (config.isAsyncArchive()) { + LOG.info("Async archiver has been spawned. Waiting for it to finish"); + AsyncArchiveService.waitForCompletion(asyncArchiveService); + LOG.info("Async archiver has finished"); + } else { + LOG.info("Start to archive synchronously."); + archive(table); } } @@ -784,8 +801,8 @@ public abstract class BaseHoodieWriteClient table) { try { // We cannot have unbounded commit files. Archive commits if we have to archive - HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table); - archiveLog.archiveIfRequired(context); + HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(config, table); + archiver.archiveIfRequired(context); } catch (IOException ioe) { throw new HoodieIOException("Failed to archive", ioe); } @@ -1249,7 +1266,8 @@ public abstract class BaseHoodieWriteClient { +public class HoodieTimelineArchiver { - private static final Logger LOG = LogManager.getLogger(HoodieTimelineArchiveLog.class); + private static final Logger LOG = LogManager.getLogger(HoodieTimelineArchiver.class); private final Path archiveFilePath; private final HoodieWriteConfig config; @@ -91,7 +93,7 @@ public class HoodieTimelineArchiveLog { private final HoodieTable table; private final HoodieTableMetaClient metaClient; - public HoodieTimelineArchiveLog(HoodieWriteConfig config, HoodieTable table) { + public HoodieTimelineArchiver(HoodieWriteConfig config, HoodieTable table) { this.config = config; this.table = table; this.metaClient = table.getMetaClient(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java index ee900fb36..0d0984354 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java @@ -50,13 +50,6 @@ import java.util.stream.Collectors; + "cleaning (reclamation of older/unused file groups/slices).") public class HoodieCompactionConfig extends HoodieConfig { - public static final ConfigProperty AUTO_CLEAN = ConfigProperty - .key("hoodie.clean.automatic") - .defaultValue("true") - .withDocumentation("When enabled, the cleaner table service is invoked immediately after each commit," - + " to delete older file slices. It's recommended to enable this, to ensure metadata and data storage" - + " growth is bounded."); - public static final ConfigProperty AUTO_ARCHIVE = ConfigProperty .key("hoodie.archive.automatic") .defaultValue("true") @@ -64,6 +57,20 @@ public class HoodieCompactionConfig extends HoodieConfig { + " to archive commits if we cross a maximum value of commits." + " It's recommended to enable this, to ensure number of active commits is bounded."); + public static final ConfigProperty ASYNC_ARCHIVE = ConfigProperty + .key("hoodie.archive.async") + .defaultValue("false") + .sinceVersion("0.11.0") + .withDocumentation("Only applies when " + AUTO_ARCHIVE.key() + " is turned on. " + + "When turned on runs archiver async with writing, which can speed up overall write performance."); + + public static final ConfigProperty AUTO_CLEAN = ConfigProperty + .key("hoodie.clean.automatic") + .defaultValue("true") + .withDocumentation("When enabled, the cleaner table service is invoked immediately after each commit," + + " to delete older file slices. It's recommended to enable this, to ensure metadata and data storage" + + " growth is bounded."); + public static final ConfigProperty ASYNC_CLEAN = ConfigProperty .key("hoodie.clean.async") .defaultValue("false") @@ -522,6 +529,16 @@ public class HoodieCompactionConfig extends HoodieConfig { return this; } + public Builder withAutoArchive(Boolean autoArchive) { + compactionConfig.setValue(AUTO_ARCHIVE, String.valueOf(autoArchive)); + return this; + } + + public Builder withAsyncArchive(Boolean asyncArchive) { + compactionConfig.setValue(ASYNC_ARCHIVE, String.valueOf(asyncArchive)); + return this; + } + public Builder withAutoClean(Boolean autoClean) { compactionConfig.setValue(AUTO_CLEAN, String.valueOf(autoClean)); return this; @@ -532,11 +549,6 @@ public class HoodieCompactionConfig extends HoodieConfig { return this; } - public Builder withAutoArchive(Boolean autoArchive) { - compactionConfig.setValue(AUTO_ARCHIVE, String.valueOf(autoArchive)); - return this; - } - public Builder withIncrementalCleaningMode(Boolean incrementalCleaningMode) { compactionConfig.setValue(CLEANER_INCREMENTAL_MODE_ENABLE, String.valueOf(incrementalCleaningMode)); return this; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 1cdad6c56..dbc62494e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1112,10 +1112,6 @@ public class HoodieWriteConfig extends HoodieConfig { return getInt(HoodieCompactionConfig.CLEANER_PARALLELISM_VALUE); } - public boolean isAutoClean() { - return getBoolean(HoodieCompactionConfig.AUTO_CLEAN); - } - public boolean getArchiveMergeEnable() { return getBoolean(HoodieCompactionConfig.ARCHIVE_MERGE_ENABLE); } @@ -1128,6 +1124,14 @@ public class HoodieWriteConfig extends HoodieConfig { return getBoolean(HoodieCompactionConfig.AUTO_ARCHIVE); } + public boolean isAsyncArchive() { + return getBoolean(HoodieCompactionConfig.ASYNC_ARCHIVE); + } + + public boolean isAutoClean() { + return getBoolean(HoodieCompactionConfig.AUTO_CLEAN); + } + public boolean isAsyncClean() { return getBoolean(HoodieCompactionConfig.ASYNC_CLEAN); } @@ -1872,7 +1876,7 @@ public class HoodieWriteConfig extends HoodieConfig { * @return True if any table services are configured to run inline, false otherwise. */ public Boolean areAnyTableServicesExecutedInline() { - return inlineClusteringEnabled() || inlineCompactionEnabled() || isAutoClean(); + return inlineClusteringEnabled() || inlineCompactionEnabled() || isAutoClean() || isAutoArchive(); } /** @@ -1881,7 +1885,7 @@ public class HoodieWriteConfig extends HoodieConfig { * @return True if any table services are configured to run async, false otherwise. */ public Boolean areAnyTableServicesAsync() { - return isAsyncClusteringEnabled() || !inlineCompactionEnabled() || isAsyncClean(); + return isAsyncClusteringEnabled() || !inlineCompactionEnabled() || isAsyncClean() || isAsyncArchive(); } public Boolean areAnyTableServicesScheduledInline() { diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/async/TestAsyncArchiveService.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/async/TestAsyncArchiveService.java new file mode 100644 index 000000000..9dad8b802 --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/async/TestAsyncArchiveService.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.async; + +import org.apache.hudi.client.BaseHoodieWriteClient; +import org.apache.hudi.config.HoodieWriteConfig; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.concurrent.ExecutionException; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class TestAsyncArchiveService { + + @Mock + BaseHoodieWriteClient writeClient; + @Mock + HoodieWriteConfig config; + + @Test + void startAsyncArchiveReturnsNullWhenAutoArchiveDisabled() { + when(config.isAutoArchive()).thenReturn(false); + when(writeClient.getConfig()).thenReturn(config); + assertNull(AsyncArchiveService.startAsyncArchiveIfEnabled(writeClient)); + } + + @Test + void startAsyncArchiveReturnsNullWhenAsyncArchiveDisabled() { + when(config.isAutoArchive()).thenReturn(true); + when(config.isAsyncArchive()).thenReturn(false); + when(writeClient.getConfig()).thenReturn(config); + assertNull(AsyncArchiveService.startAsyncArchiveIfEnabled(writeClient)); + } + + @Test + void startAsyncArchiveIfEnabled() { + when(config.isAutoArchive()).thenReturn(true); + when(config.isAsyncArchive()).thenReturn(true); + when(writeClient.getConfig()).thenReturn(config); + assertNotNull(AsyncArchiveService.startAsyncArchiveIfEnabled(writeClient)); + } + + @Test + void startServiceShouldInvokeCallArchiveMethod() throws ExecutionException, InterruptedException { + AsyncArchiveService service = new AsyncArchiveService(writeClient); + assertEquals(true, service.startService().getLeft().get()); + verify(writeClient).archive(); + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index c3d977e88..1f5d14af7 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -18,6 +18,7 @@ package org.apache.hudi.client; +import org.apache.hudi.async.AsyncCleanerService; import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.common.data.HoodieList; import org.apache.hudi.common.engine.HoodieEngineContext; @@ -332,10 +333,7 @@ public class HoodieFlinkWriteClient extends // Delete the marker directory for the instant. WriteMarkersFactory.get(config.getMarkersType(), createTable(config, hadoopConf), instantTime) .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); - if (config.isAutoArchive()) { - // We cannot have unbounded commit files. Archive commits if we have to archive - archive(table); - } + autoArchiveOnCommit(table); } finally { this.heartbeatClient.stop(instantTime); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java index c3cde7416..f24c4279b 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java @@ -102,7 +102,8 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase { properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000"); HoodieWriteConfig writeConfig = getConfigBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder() - .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build()) + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY) + .withAutoArchive(false).withAutoClean(false).build()) .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) // Timeline-server-based markers are not used for multi-writer tests .withMarkersType(MarkerType.DIRECT.name()) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java index 97572de25..3141e1051 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java @@ -52,7 +52,7 @@ import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.table.HoodieTimelineArchiveLog; +import org.apache.hudi.client.HoodieTimelineArchiver; import org.apache.hudi.testutils.HoodieClientTestHarness; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -286,8 +286,8 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness { protected void archiveDataTable(HoodieWriteConfig writeConfig, HoodieTableMetaClient metaClient) throws IOException { HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); - HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(writeConfig, table); - archiveLog.archiveIfRequired(context); + HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, table); + archiver.archiveIfRequired(context); } protected void validateMetadata(HoodieTestTable testTable) throws IOException { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java similarity index 95% rename from hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java rename to hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java index 099deaadf..b2f0ef4ea 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java @@ -47,7 +47,7 @@ import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.table.HoodieTimelineArchiveLog; +import org.apache.hudi.client.HoodieTimelineArchiver; import org.apache.hudi.testutils.HoodieClientTestHarness; import org.apache.hadoop.conf.Configuration; @@ -77,9 +77,9 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness { +public class TestHoodieTimelineArchiver extends HoodieClientTestHarness { - private static final Logger LOG = LogManager.getLogger(TestHoodieTimelineArchiveLog.class); + private static final Logger LOG = LogManager.getLogger(TestHoodieTimelineArchiver.class); private Configuration hadoopConf; private HoodieWrapperFileSystem wrapperFs; @@ -172,8 +172,8 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness { .withParallelism(2, 2).forTable("test-trip-table").build(); metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); - HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table); - boolean result = archiveLog.archiveIfRequired(context); + HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table); + boolean result = archiver.archiveIfRequired(context); assertTrue(result); } @@ -224,14 +224,14 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness { // build a merge small archive plan with dummy content // this plan can not be deserialized. HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); - HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(writeConfig, table); + HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, table); FileStatus[] fsStatuses = metaClient.getFs().globStatus( new Path(metaClient.getArchivePath() + "/.commits_.archive*")); List candidateFiles = Arrays.stream(fsStatuses).map(fs -> fs.getPath().toString()).collect(Collectors.toList()); - archiveLog.reOpenWriter(); + archiver.reOpenWriter(); Path plan = new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME); - archiveLog.buildArchiveMergePlan(candidateFiles, plan, ".commits_.archive.3_1-0-1"); + archiver.buildArchiveMergePlan(candidateFiles, plan, ".commits_.archive.3_1-0-1"); String s = "Dummy Content"; // stain the current merge plan file. FileIOUtils.createFileInPath(metaClient.getFs(), plan, Option.of(s.getBytes())); @@ -274,15 +274,15 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness { // do a single merge small archive files HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); - HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(writeConfig, table); + HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, table); FileStatus[] fsStatuses = metaClient.getFs().globStatus( new Path(metaClient.getArchivePath() + "/.commits_.archive*")); List candidateFiles = Arrays.stream(fsStatuses).map(fs -> fs.getPath().toString()).collect(Collectors.toList()); - archiveLog.reOpenWriter(); + archiver.reOpenWriter(); - archiveLog.buildArchiveMergePlan(candidateFiles, new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME), ".commits_.archive.3_1-0-1"); - archiveLog.mergeArchiveFiles(Arrays.stream(fsStatuses).collect(Collectors.toList())); - HoodieLogFormat.Writer writer = archiveLog.reOpenWriter(); + archiver.buildArchiveMergePlan(candidateFiles, new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME), ".commits_.archive.3_1-0-1"); + archiver.mergeArchiveFiles(Arrays.stream(fsStatuses).collect(Collectors.toList())); + HoodieLogFormat.Writer writer = archiver.reOpenWriter(); // check loading archived and active timeline success HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false); @@ -327,16 +327,16 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness { // do a single merge small archive files HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); - HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(writeConfig, table); + HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, table); FileStatus[] fsStatuses = metaClient.getFs().globStatus( new Path(metaClient.getArchivePath() + "/.commits_.archive*")); List candidateFiles = Arrays.stream(fsStatuses).map(fs -> fs.getPath().toString()).collect(Collectors.toList()); - archiveLog.reOpenWriter(); + archiver.reOpenWriter(); - archiveLog.buildArchiveMergePlan(candidateFiles, new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME), ".commits_.archive.3_1-0-1"); - archiveLog.mergeArchiveFiles(Arrays.stream(fsStatuses).collect(Collectors.toList())); - archiveLog.reOpenWriter(); + archiver.buildArchiveMergePlan(candidateFiles, new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME), ".commits_.archive.3_1-0-1"); + archiver.mergeArchiveFiles(Arrays.stream(fsStatuses).collect(Collectors.toList())); + archiver.reOpenWriter(); // delete only one of the small archive file to simulate delete action failed. metaClient.getFs().delete(fsStatuses[0].getPath()); @@ -397,16 +397,16 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness { } HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); - HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(writeConfig, table); + HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, table); FileStatus[] fsStatuses = metaClient.getFs().globStatus( new Path(metaClient.getArchivePath() + "/.commits_.archive*")); List candidateFiles = Arrays.stream(fsStatuses).map(fs -> fs.getPath().toString()).collect(Collectors.toList()); - archiveLog.reOpenWriter(); + archiver.reOpenWriter(); - archiveLog.buildArchiveMergePlan(candidateFiles, new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME), ".commits_.archive.3_1-0-1"); - archiveLog.mergeArchiveFiles(Arrays.stream(fsStatuses).collect(Collectors.toList())); - HoodieLogFormat.Writer writer = archiveLog.reOpenWriter(); + archiver.buildArchiveMergePlan(candidateFiles, new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME), ".commits_.archive.3_1-0-1"); + archiver.mergeArchiveFiles(Arrays.stream(fsStatuses).collect(Collectors.toList())); + HoodieLogFormat.Writer writer = archiver.reOpenWriter(); String s = "Dummy Content"; // stain the current merged archive file. @@ -470,11 +470,11 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness { HoodieTestDataGenerator.createCommitFile(basePath, "104", wrapperFs.getConf()); HoodieTestDataGenerator.createCommitFile(basePath, "105", wrapperFs.getConf()); HoodieTable table = HoodieSparkTable.create(cfg, context); - HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table); + HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table); HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count should match"); - assertTrue(archiveLog.archiveIfRequired(context)); + assertTrue(archiver.archiveIfRequired(context)); timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(); assertEquals(5, timeline.countInstants(), "Since we have a savepoint at 101, we should never archive any commit after 101 (we only archive 100)"); @@ -620,8 +620,8 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness { HoodieTestDataGenerator.createCommitFile(basePath, "5", wrapperFs.getConf()); HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); - HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table); - boolean result = archiveLog.archiveIfRequired(context); + HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table); + boolean result = archiver.archiveIfRequired(context); assertTrue(result); HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline(); List archivedInstants = Arrays.asList(instant1, instant2, instant3); @@ -776,9 +776,9 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness { } HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); - HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table); + HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table); - archiveLog.archiveIfRequired(context); + archiver.archiveIfRequired(context); Stream currentInstants = metaClient.getActiveTimeline().reload().getInstants(); Map> actionInstantMap = currentInstants.collect(Collectors.groupingBy(HoodieInstant::getAction)); @@ -810,9 +810,9 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness { HoodieInstant notArchivedInstant3 = createCleanMetadata("14", true); HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); - HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table); + HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table); - archiveLog.archiveIfRequired(context); + archiver.archiveIfRequired(context); List notArchivedInstants = metaClient.getActiveTimeline().reload().getInstants().collect(Collectors.toList()); assertEquals(3, notArchivedInstants.size(), "Not archived instants should be 3"); @@ -894,8 +894,8 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness { HoodieTimeline timeline = metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants(); List originalCommits = timeline.getInstants().collect(Collectors.toList()); HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient); - HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(writeConfig, table); - archiveLog.archiveIfRequired(context); + HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, table); + archiver.archiveIfRequired(context); timeline = metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants(); List commitsAfterArchival = timeline.getInstants().collect(Collectors.toList()); return Pair.of(originalCommits, commitsAfterArchival);