From 8919be6a5d8038db7265bfd7459d72fbd545f133 Mon Sep 17 00:00:00 2001 From: Balaji Varadarajan Date: Sun, 28 Jun 2020 02:04:50 -0700 Subject: [PATCH] [HUDI-855] Run Cleaner async with writing (#1577) - Cleaner can now run concurrently with write operation - Configs to turn on/off Co-authored-by: Vinoth Chandar --- .../hudi/cli/commands/TestCleansCommand.java | 28 +++-- .../hudi/cli/integ/ITTestCleansCommand.java | 106 ------------------ .../HoodieTestCommitMetadataGenerator.java | 20 +++- .../hudi/async/AbstractAsyncService.java | 22 ++-- .../hudi/client/AsyncCleanerService.java | 85 ++++++++++++++ .../apache/hudi/client/HoodieWriteClient.java | 52 ++++++--- .../hudi/config/HoodieCompactionConfig.java | 10 ++ .../apache/hudi/config/HoodieWriteConfig.java | 4 + .../action/clean/CleanActionExecutor.java | 5 +- .../org/apache/hudi/table/TestCleaner.java | 50 ++++----- .../deltastreamer/HoodieDeltaStreamer.java | 5 +- 11 files changed, 208 insertions(+), 179 deletions(-) delete mode 100644 hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCleansCommand.java rename hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/AbstractDeltaStreamerService.java => hudi-client/src/main/java/org/apache/hudi/async/AbstractAsyncService.java (88%) create mode 100644 hudi-client/src/main/java/org/apache/hudi/client/AsyncCleanerService.java diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCleansCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCleansCommand.java index 69aa5b37a..c14cf0b2a 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCleansCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCleansCommand.java @@ -26,7 +26,6 @@ import org.apache.hudi.cli.TableHeader; import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest; import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator; import org.apache.hudi.common.model.HoodieCleaningPolicy; -import 
org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; @@ -36,6 +35,8 @@ import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hadoop.conf.Configuration; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.testutils.HoodieTestDataGenerator; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.shell.core.CommandResult; @@ -43,11 +44,10 @@ import org.springframework.shell.core.CommandResult; import java.io.File; import java.io.IOException; import java.net.URL; -import java.nio.file.Files; -import java.nio.file.Paths; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.UUID; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -77,6 +77,10 @@ public class TestCleansCommand extends AbstractShellIntegrationTest { Configuration conf = HoodieCLI.conf; metaClient = HoodieCLI.getTableMetaClient(); + String fileId1 = UUID.randomUUID().toString(); + String fileId2 = UUID.randomUUID().toString(); + HoodieTestDataGenerator.writePartitionMetadata(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, tablePath); + // Create four commits for (int i = 100; i < 104; i++) { String timestamp = String.valueOf(i); @@ -86,7 +90,8 @@ public class TestCleansCommand extends AbstractShellIntegrationTest { // Inflight Compaction HoodieTestCommitMetadataGenerator.createCompactionAuxiliaryMetadata(tablePath, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, timestamp), conf); - HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, timestamp, conf); + 
HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, timestamp, conf, fileId1, fileId2, + Option.empty(), Option.empty()); } metaClient = HoodieTableMetaClient.reload(metaClient); @@ -103,9 +108,6 @@ public class TestCleansCommand extends AbstractShellIntegrationTest { assertNotNull(propsFilePath, "Not found properties file"); // First, run clean - Files.createFile(Paths.get(tablePath, - HoodieTestCommitMetadataGenerator.DEFAULT_FIRST_PARTITION_PATH, - HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)); SparkMain.clean(jsc, HoodieCLI.basePath, propsFilePath.getPath(), new ArrayList<>()); assertEquals(1, metaClient.getActiveTimeline().reload().getCleanerTimeline().getInstants().count(), "Loaded 1 clean and the count should match"); @@ -125,7 +127,7 @@ public class TestCleansCommand extends AbstractShellIntegrationTest { // EarliestCommandRetained should be 102, since hoodie.cleaner.commits.retained=2 // Total Time Taken need read from metadata - rows.add(new Comparable[] {clean.getTimestamp(), "102", "0", getLatestCleanTimeTakenInMillis().toString()}); + rows.add(new Comparable[] {clean.getTimestamp(), "102", "2", getLatestCleanTimeTakenInMillis().toString()}); String expected = HoodiePrintHelper.print(header, new HashMap<>(), "", false, -1, false, rows); expected = removeNonWordAndStripSpace(expected); @@ -142,12 +144,6 @@ public class TestCleansCommand extends AbstractShellIntegrationTest { assertNotNull(propsFilePath, "Not found properties file"); // First, run clean with two partition - Files.createFile(Paths.get(tablePath, - HoodieTestCommitMetadataGenerator.DEFAULT_FIRST_PARTITION_PATH, - HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)); - Files.createFile(Paths.get(tablePath, - HoodieTestCommitMetadataGenerator.DEFAULT_SECOND_PARTITION_PATH, - HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)); SparkMain.clean(jsc, HoodieCLI.basePath, propsFilePath.toString(), new ArrayList<>()); assertEquals(1, 
metaClient.getActiveTimeline().reload().getCleanerTimeline().getInstants().count(), "Loaded 1 clean and the count should match"); @@ -165,9 +161,11 @@ public class TestCleansCommand extends AbstractShellIntegrationTest { // There should be two partition path List rows = new ArrayList<>(); rows.add(new Comparable[] {HoodieTestCommitMetadataGenerator.DEFAULT_SECOND_PARTITION_PATH, + HoodieCleaningPolicy.KEEP_LATEST_COMMITS, "1", "0"}); + rows.add(new Comparable[] {HoodieTestCommitMetadataGenerator.DEFAULT_THIRD_PARTITION_PATH, HoodieCleaningPolicy.KEEP_LATEST_COMMITS, "0", "0"}); rows.add(new Comparable[] {HoodieTestCommitMetadataGenerator.DEFAULT_FIRST_PARTITION_PATH, - HoodieCleaningPolicy.KEEP_LATEST_COMMITS, "0", "0"}); + HoodieCleaningPolicy.KEEP_LATEST_COMMITS, "1", "0"}); String expected = HoodiePrintHelper.print(header, new HashMap<>(), "", false, -1, false, rows); expected = removeNonWordAndStripSpace(expected); diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCleansCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCleansCommand.java deleted file mode 100644 index 1f6f6c7a3..000000000 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCleansCommand.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.cli.integ; - -import org.apache.hudi.cli.HoodieCLI; -import org.apache.hudi.cli.commands.TableCommand; -import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest; -import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator; -import org.apache.hudi.common.model.HoodiePartitionMetadata; -import org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; - -import org.apache.hadoop.conf.Configuration; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.springframework.shell.core.CommandResult; - -import java.io.File; -import java.io.IOException; -import java.net.URL; -import java.nio.file.Files; -import java.nio.file.Paths; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class ITTestCleansCommand extends AbstractShellIntegrationTest { - - private String tablePath; - private URL propsFilePath; - - @BeforeEach - public void init() throws IOException { - HoodieCLI.conf = jsc.hadoopConfiguration(); - - String tableName = "test_table"; - tablePath = basePath + File.separator + tableName; - propsFilePath = this.getClass().getClassLoader().getResource("clean.properties"); - - // Create table and connect - new TableCommand().createTable( - tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(), - "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload"); - - Configuration conf = HoodieCLI.conf; - - metaClient = HoodieCLI.getTableMetaClient(); - // Create four commits - for (int i = 100; i < 104; i++) { - String 
timestamp = String.valueOf(i); - // Requested Compaction - HoodieTestCommitMetadataGenerator.createCompactionAuxiliaryMetadata(tablePath, - new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, timestamp), conf); - // Inflight Compaction - HoodieTestCommitMetadataGenerator.createCompactionAuxiliaryMetadata(tablePath, - new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, timestamp), conf); - HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, timestamp, conf); - } - } - - /** - * Test case for cleans run. - */ - @Test - public void testRunClean() throws IOException { - // First, there should none of clean instant. - assertEquals(0, metaClient.getActiveTimeline().reload().getCleanerTimeline().getInstants().count()); - - // Check properties file exists. - assertNotNull(propsFilePath, "Not found properties file"); - - // Create partition metadata - Files.createFile(Paths.get(tablePath, - HoodieTestCommitMetadataGenerator.DEFAULT_FIRST_PARTITION_PATH, - HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)); - Files.createFile(Paths.get(tablePath, - HoodieTestCommitMetadataGenerator.DEFAULT_SECOND_PARTITION_PATH, - HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)); - - CommandResult cr = getShell().executeCommand("cleans run --sparkMaster local --propsFilePath " + propsFilePath.toString()); - assertTrue(cr.isSuccess()); - - // After run clean, there should have 1 clean instant - assertEquals(1, metaClient.getActiveTimeline().reload().getCleanerTimeline().getInstants().count(), - "Loaded 1 clean and the count should match"); - } -} diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java b/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java index bdf623ec4..94904c576 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java +++ 
b/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java @@ -18,6 +18,7 @@ package org.apache.hudi.cli.testutils; +import java.util.UUID; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieWriteStat; @@ -67,6 +68,12 @@ public class HoodieTestCommitMetadataGenerator extends HoodieTestDataGenerator { public static void createCommitFileWithMetadata(String basePath, String commitTime, Configuration configuration, Option writes, Option updates) { + createCommitFileWithMetadata(basePath, commitTime, configuration, UUID.randomUUID().toString(), + UUID.randomUUID().toString(), writes, updates); + } + + public static void createCommitFileWithMetadata(String basePath, String commitTime, Configuration configuration, + String fileId1, String fileId2, Option writes, Option updates) { Arrays.asList(HoodieTimeline.makeCommitFileName(commitTime), HoodieTimeline.makeInflightCommitFileName(commitTime), HoodieTimeline.makeRequestedCommitFileName(commitTime)) .forEach(f -> { @@ -77,7 +84,8 @@ public class HoodieTestCommitMetadataGenerator extends HoodieTestDataGenerator { FileSystem fs = FSUtils.getFs(basePath, configuration); os = fs.create(commitFile, true); // Generate commitMetadata - HoodieCommitMetadata commitMetadata = generateCommitMetadata(basePath, commitTime, writes, updates); + HoodieCommitMetadata commitMetadata = + generateCommitMetadata(basePath, commitTime, fileId1, fileId2, writes, updates); // Write empty commit metadata os.writeBytes(new String(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); } catch (IOException ioe) { @@ -103,8 +111,14 @@ public class HoodieTestCommitMetadataGenerator extends HoodieTestDataGenerator { public static HoodieCommitMetadata generateCommitMetadata(String basePath, String commitTime, Option writes, Option updates) throws IOException { - String file1P0C0 = 
HoodieTestUtils.createNewDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, commitTime); - String file1P1C0 = HoodieTestUtils.createNewDataFile(basePath, DEFAULT_SECOND_PARTITION_PATH, commitTime); + return generateCommitMetadata(basePath, commitTime, UUID.randomUUID().toString(), UUID.randomUUID().toString(), + writes, updates); + } + + public static HoodieCommitMetadata generateCommitMetadata(String basePath, String commitTime, String fileId1, + String fileId2, Option writes, Option updates) throws IOException { + String file1P0C0 = HoodieTestUtils.createDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, commitTime, fileId1); + String file1P1C0 = HoodieTestUtils.createDataFile(basePath, DEFAULT_SECOND_PARTITION_PATH, commitTime, fileId2); return generateCommitMetadata(new HashMap>() { { put(DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0)); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/AbstractDeltaStreamerService.java b/hudi-client/src/main/java/org/apache/hudi/async/AbstractAsyncService.java similarity index 88% rename from hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/AbstractDeltaStreamerService.java rename to hudi-client/src/main/java/org/apache/hudi/async/AbstractAsyncService.java index 8fe1a7134..7ac236d10 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/AbstractDeltaStreamerService.java +++ b/hudi-client/src/main/java/org/apache/hudi/async/AbstractAsyncService.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hudi.utilities.deltastreamer; +package org.apache.hudi.async; import org.apache.hudi.common.util.collection.Pair; @@ -32,11 +32,11 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; /** - * Base Class for running delta-sync/compaction in separate thread and controlling their life-cycle. 
+ * Base Class for running clean/delta-sync/compaction in separate thread and controlling their life-cycle. */ -public abstract class AbstractDeltaStreamerService implements Serializable { +public abstract class AbstractAsyncService implements Serializable { - private static final Logger LOG = LogManager.getLogger(AbstractDeltaStreamerService.class); + private static final Logger LOG = LogManager.getLogger(AbstractAsyncService.class); // Flag to track if the service is started. private boolean started; @@ -49,15 +49,15 @@ public abstract class AbstractDeltaStreamerService implements Serializable { // Future tracking delta-sync/compaction private transient CompletableFuture future; - AbstractDeltaStreamerService() { + protected AbstractAsyncService() { shutdownRequested = false; } - boolean isShutdownRequested() { + protected boolean isShutdownRequested() { return shutdownRequested; } - boolean isShutdown() { + protected boolean isShutdown() { return shutdown; } @@ -67,7 +67,7 @@ public abstract class AbstractDeltaStreamerService implements Serializable { * @throws ExecutionException * @throws InterruptedException */ - void waitForShutdown() throws ExecutionException, InterruptedException { + public void waitForShutdown() throws ExecutionException, InterruptedException { try { future.get(); } catch (ExecutionException ex) { @@ -82,7 +82,7 @@ public abstract class AbstractDeltaStreamerService implements Serializable { * * @param force Forcefully shutdown */ - void shutdown(boolean force) { + public void shutdown(boolean force) { if (!shutdownRequested || force) { shutdownRequested = true; if (executor != null) { @@ -145,7 +145,9 @@ public abstract class AbstractDeltaStreamerService implements Serializable { } finally { // Mark as shutdown shutdown = true; - onShutdownCallback.apply(error); + if (null != onShutdownCallback) { + onShutdownCallback.apply(error); + } } }); } diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AsyncCleanerService.java 
b/hudi-client/src/main/java/org/apache/hudi/client/AsyncCleanerService.java new file mode 100644 index 000000000..6367e7972 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/client/AsyncCleanerService.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client; + +import org.apache.hudi.async.AbstractAsyncService; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Clean service running concurrently with write operation. 
+ */
+class AsyncCleanerService extends AbstractAsyncService {
+
+  private static final Logger LOG = LogManager.getLogger(AsyncCleanerService.class);
+
+  private final HoodieWriteClient writeClient;
+  private final String cleanInstantTime;
+  private final transient ExecutorService executor = Executors.newSingleThreadExecutor();
+
+  protected AsyncCleanerService(HoodieWriteClient writeClient, String cleanInstantTime) {
+    this.writeClient = writeClient;
+    this.cleanInstantTime = cleanInstantTime;
+  }
+
+  @Override
+  protected Pair startService() {
+    return Pair.of(CompletableFuture.supplyAsync(() -> {
+      writeClient.clean(cleanInstantTime);
+      return true;
+    }, executor), executor); // run the clean on this service's own executor (the one handed to the base class), not the common pool
+  }
+
+  public static AsyncCleanerService startAsyncCleaningIfEnabled(HoodieWriteClient writeClient,
+                                                                String instantTime) {
+    AsyncCleanerService asyncCleanerService = null;
+    if (writeClient.getConfig().isAutoClean() && writeClient.getConfig().isAsyncClean()) {
+      LOG.info("Auto cleaning is enabled. Running cleaner async to write operation");
+      asyncCleanerService = new AsyncCleanerService(writeClient, instantTime);
+      asyncCleanerService.start(null);
+    } else {
+      LOG.info("Async auto cleaning is not enabled. Not running cleaner now");
+    }
+    return asyncCleanerService;
+  }
+
+  public static void waitForCompletion(AsyncCleanerService asyncCleanerService) {
+    if (asyncCleanerService != null) {
+      LOG.info("Waiting for async cleaner to finish");
+      try {
+        asyncCleanerService.waitForShutdown();
+      } catch (Exception e) {
+        throw new HoodieException("Error waiting for async cleaning to finish", e);
+      }
+    }
+  }
+
+  public static void forceShutdown(AsyncCleanerService asyncCleanerService) {
+    if (asyncCleanerService != null) {
+      LOG.info("Shutting down async cleaner");
+      asyncCleanerService.shutdown(true);
+    }
+  }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index 8f562eac7..a6d9d0da9 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -80,6 +80,7 @@ public class HoodieWriteClient extends AbstractHo
   private final boolean rollbackPending;
   private final transient HoodieMetrics metrics;
   private transient Timer.Context compactionTimer;
+  private transient AsyncCleanerService asyncCleanerService;
 
   /**
    * Create a write client, without cleaning up failed/inflight commits.
@@ -95,28 +96,28 @@ public class HoodieWriteClient extends AbstractHo
    * Create a write client, with new hudi index.
* * @param jsc Java Spark Context - * @param clientConfig instance of HoodieWriteConfig + * @param writeConfig instance of HoodieWriteConfig * @param rollbackPending whether need to cleanup pending commits */ - public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending) { - this(jsc, clientConfig, rollbackPending, HoodieIndex.createIndex(clientConfig)); + public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig writeConfig, boolean rollbackPending) { + this(jsc, writeConfig, rollbackPending, HoodieIndex.createIndex(writeConfig)); } - public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending, HoodieIndex index) { - this(jsc, clientConfig, rollbackPending, index, Option.empty()); + public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig writeConfig, boolean rollbackPending, HoodieIndex index) { + this(jsc, writeConfig, rollbackPending, index, Option.empty()); } /** * Create a write client, allows to specify all parameters. * * @param jsc Java Spark Context - * @param clientConfig instance of HoodieWriteConfig + * @param writeConfig instance of HoodieWriteConfig * @param rollbackPending whether need to cleanup pending commits * @param timelineService Timeline Service that runs as part of write client. 
*/ - public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending, + public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig writeConfig, boolean rollbackPending, HoodieIndex index, Option timelineService) { - super(jsc, index, clientConfig, timelineService); + super(jsc, index, writeConfig, timelineService); this.metrics = new HoodieMetrics(config, config.getTableName()); this.rollbackPending = rollbackPending; } @@ -158,6 +159,7 @@ public class HoodieWriteClient extends AbstractHo HoodieTable table = getTableAndInitCtx(WriteOperationType.UPSERT); table.validateUpsertSchema(); setOperationType(WriteOperationType.UPSERT); + this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime); HoodieWriteMetadata result = table.upsert(jsc, instantTime, records); if (result.getIndexLookupDuration().isPresent()) { metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis()); @@ -178,6 +180,7 @@ public class HoodieWriteClient extends AbstractHo HoodieTable table = getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED); table.validateUpsertSchema(); setOperationType(WriteOperationType.UPSERT_PREPPED); + this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime); HoodieWriteMetadata result = table.upsertPrepped(jsc,instantTime, preppedRecords); return postWrite(result, instantTime, table); } @@ -196,6 +199,7 @@ public class HoodieWriteClient extends AbstractHo HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT); table.validateInsertSchema(); setOperationType(WriteOperationType.INSERT); + this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime); HoodieWriteMetadata result = table.insert(jsc,instantTime, records); return postWrite(result, instantTime, table); } @@ -215,6 +219,7 @@ public class HoodieWriteClient extends AbstractHo HoodieTable table = 
getTableAndInitCtx(WriteOperationType.INSERT_PREPPED); table.validateInsertSchema(); setOperationType(WriteOperationType.INSERT_PREPPED); + this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime); HoodieWriteMetadata result = table.insertPrepped(jsc,instantTime, preppedRecords); return postWrite(result, instantTime, table); } @@ -254,6 +259,7 @@ public class HoodieWriteClient extends AbstractHo HoodieTable table = getTableAndInitCtx(WriteOperationType.BULK_INSERT); table.validateInsertSchema(); setOperationType(WriteOperationType.BULK_INSERT); + this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime); HoodieWriteMetadata result = table.bulkInsert(jsc,instantTime, records, bulkInsertPartitioner); return postWrite(result, instantTime, table); } @@ -279,6 +285,7 @@ public class HoodieWriteClient extends AbstractHo HoodieTable table = getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED); table.validateInsertSchema(); setOperationType(WriteOperationType.BULK_INSERT_PREPPED); + this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime); HoodieWriteMetadata result = table.bulkInsertPrepped(jsc,instantTime, preppedRecords, bulkInsertPartitioner); return postWrite(result, instantTime, table); } @@ -338,18 +345,30 @@ public class HoodieWriteClient extends AbstractHo // We cannot have unbounded commit files. Archive commits if we have to archive HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, createMetaClient(true)); archiveLog.archiveIfRequired(hadoopConf); - if (config.isAutoClean()) { - // Call clean to cleanup if there is anything to cleanup after the commit, - LOG.info("Auto cleaning is enabled. Running cleaner now"); - clean(instantTime); - } else { - LOG.info("Auto cleaning is not enabled. 
Not running cleaner now"); - } + autoCleanOnCommit(instantTime); } catch (IOException ioe) { throw new HoodieIOException(ioe.getMessage(), ioe); } } + /** + * Handle auto clean during commit. + * @param instantTime + */ + private void autoCleanOnCommit(String instantTime) { + if (config.isAutoClean()) { + // Call clean to cleanup if there is anything to cleanup after the commit, + if (config.isAsyncClean()) { + LOG.info("Cleaner has been spawned already. Waiting for it to finish"); + AsyncCleanerService.waitForCompletion(asyncCleanerService); + LOG.info("Cleaner has finished"); + } else { + LOG.info("Auto cleaning is enabled. Running cleaner now"); + clean(instantTime); + } + } + } + /** * Create a savepoint based on the latest commit action on the timeline. * @@ -477,7 +496,8 @@ public class HoodieWriteClient extends AbstractHo */ @Override public void close() { - // Stop timeline-server if running + AsyncCleanerService.forceShutdown(asyncCleanerService); + asyncCleanerService = null; super.close(); } diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java index 4ec048533..1b993b1f4 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java @@ -40,6 +40,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig { public static final String CLEANER_POLICY_PROP = "hoodie.cleaner.policy"; public static final String AUTO_CLEAN_PROP = "hoodie.clean.automatic"; + public static final String ASYNC_CLEAN_PROP = "hoodie.clean.async"; + // Turn on inline compaction - after fw delta commits a inline compaction will be run public static final String INLINE_COMPACT_PROP = "hoodie.compact.inline"; // Run a compaction every N delta commits @@ -101,6 +103,7 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig { public static final 
String DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED = "false"; private static final String DEFAULT_CLEANER_POLICY = HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name(); private static final String DEFAULT_AUTO_CLEAN = "true"; + private static final String DEFAULT_ASYNC_CLEAN = "false"; private static final String DEFAULT_INLINE_COMPACT = "false"; private static final String DEFAULT_INCREMENTAL_CLEANER = "true"; private static final String DEFAULT_INLINE_COMPACT_NUM_DELTA_COMMITS = "5"; @@ -143,6 +146,11 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig { return this; } + public Builder withAsyncClean(Boolean asyncClean) { + props.setProperty(ASYNC_CLEAN_PROP, String.valueOf(asyncClean)); + return this; + } + public Builder withIncrementalCleaningMode(Boolean incrementalCleaningMode) { props.setProperty(CLEANER_INCREMENTAL_MODE, String.valueOf(incrementalCleaningMode)); return this; @@ -247,6 +255,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig { public HoodieCompactionConfig build() { HoodieCompactionConfig config = new HoodieCompactionConfig(props); setDefaultOnCondition(props, !props.containsKey(AUTO_CLEAN_PROP), AUTO_CLEAN_PROP, DEFAULT_AUTO_CLEAN); + setDefaultOnCondition(props, !props.containsKey(ASYNC_CLEAN_PROP), ASYNC_CLEAN_PROP, + DEFAULT_ASYNC_CLEAN); setDefaultOnCondition(props, !props.containsKey(CLEANER_INCREMENTAL_MODE), CLEANER_INCREMENTAL_MODE, DEFAULT_INCREMENTAL_CLEANER); setDefaultOnCondition(props, !props.containsKey(INLINE_COMPACT_PROP), INLINE_COMPACT_PROP, diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 083d780e4..3b822f0a1 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -296,6 +296,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return 
Boolean.parseBoolean(props.getProperty(HoodieCompactionConfig.AUTO_CLEAN_PROP)); } + public boolean isAsyncClean() { + return Boolean.parseBoolean(props.getProperty(HoodieCompactionConfig.ASYNC_CLEAN_PROP)); + } + public boolean incrementalCleanerModeEnabled() { return Boolean.parseBoolean(props.getProperty(HoodieCompactionConfig.CLEANER_INCREMENTAL_MODE)); } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java index b3caa440a..57feebc51 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java @@ -185,8 +185,9 @@ public class CleanActionExecutor extends BaseActionExecutor Option requestClean(String startCleanTime) { final HoodieCleanerPlan cleanerPlan = requestClean(jsc); if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null) - && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) { - + && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty() + && cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0) { + // Only create cleaner plan which does some work final HoodieInstant cleanInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, startCleanTime); // Save to both aux and timeline folder try { diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java index 82f911da8..541f84f70 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -51,6 +51,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieCompactionConfig; import 
org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.testutils.HoodieClientTestBase; import org.apache.hudi.testutils.HoodieTestDataGenerator; @@ -83,6 +84,7 @@ import scala.Tuple3; import static org.apache.hudi.common.testutils.HoodieTestUtils.DEFAULT_PARTITION_PATHS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; /** @@ -128,15 +130,8 @@ public class TestCleaner extends HoodieClientTestBase { HoodieTable table = HoodieTable.create(metaClient, client.getConfig(), hadoopConf); assertFalse(table.getCompletedCommitsTimeline().empty()); - if (cleaningPolicy.equals(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)) { - // We no longer write empty cleaner plans when there are not enough commits present - assertTrue(table.getCompletedCleanTimeline().empty()); - } else { - String instantTime = table.getCompletedCommitsTimeline().getInstants().findFirst().get().getTimestamp(); - assertFalse(table.getCompletedCleanTimeline().empty()); - assertEquals(instantTime, table.getCompletedCleanTimeline().getInstants().findFirst().get().getTimestamp(), - "The clean instant should be the same as the commit instant"); - } + // We no longer write empty cleaner plans when there is nothing to be cleaned. 
+ assertTrue(table.getCompletedCleanTimeline().empty()); HoodieIndex index = HoodieIndex.createIndex(cfg); List taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table).collect(); @@ -439,17 +434,32 @@ public class TestCleaner extends HoodieClientTestBase { if (simulateRetryFailure) { HoodieInstant completedCleanInstant = new HoodieInstant(State.COMPLETED, HoodieTimeline.CLEAN_ACTION, cleanInstantTs); + HoodieCleanMetadata metadata = CleanerUtils.getCleanerMetadata(metaClient, completedCleanInstant); + metadata.getPartitionMetadata().values().forEach(p -> { + String dirPath = metaClient.getBasePath() + "/" + p.getPartitionPath(); + p.getSuccessDeleteFiles().forEach(p2 -> { + try { + metaClient.getFs().create(new Path(dirPath, p2), true); + } catch (IOException e) { + throw new HoodieIOException(e.getMessage(), e); + } + }); + }); metaClient.reloadActiveTimeline().revertToInflight(completedCleanInstant); - HoodieCleanMetadata cleanMetadata2 = writeClient.clean(getNextInstant()); + HoodieCleanMetadata newCleanMetadata = writeClient.clean(getNextInstant()); + // No new clean metadata would be created. 
Only the previous one will be retried + assertNull(newCleanMetadata); + HoodieCleanMetadata cleanMetadata2 = CleanerUtils.getCleanerMetadata(metaClient, completedCleanInstant); assertEquals(cleanMetadata1.getEarliestCommitToRetain(), cleanMetadata2.getEarliestCommitToRetain()); - assertEquals(new Integer(0), cleanMetadata2.getTotalFilesDeleted()); + assertEquals(cleanMetadata1.getTotalFilesDeleted(), cleanMetadata2.getTotalFilesDeleted()); assertEquals(cleanMetadata1.getPartitionMetadata().keySet(), cleanMetadata2.getPartitionMetadata().keySet()); final HoodieCleanMetadata retriedCleanMetadata = CleanerUtils.getCleanerMetadata(HoodieTableMetaClient.reload(metaClient), completedCleanInstant); cleanMetadata1.getPartitionMetadata().keySet().forEach(k -> { HoodieCleanPartitionMetadata p1 = cleanMetadata1.getPartitionMetadata().get(k); HoodieCleanPartitionMetadata p2 = retriedCleanMetadata.getPartitionMetadata().get(k); assertEquals(p1.getDeletePathPatterns(), p2.getDeletePathPatterns()); - assertEquals(p1.getSuccessDeleteFiles(), p2.getFailedDeleteFiles()); + assertEquals(p1.getSuccessDeleteFiles(), p2.getSuccessDeleteFiles()); + assertEquals(p1.getFailedDeleteFiles(), p2.getFailedDeleteFiles()); assertEquals(p1.getPartitionPath(), p2.getPartitionPath()); assertEquals(k, p1.getPartitionPath()); }); @@ -487,12 +497,7 @@ public class TestCleaner extends HoodieClientTestBase { metaClient = HoodieTableMetaClient.reload(metaClient); List hoodieCleanStatsOne = runCleaner(config); - assertEquals(0, - getCleanStat(hoodieCleanStatsOne, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles() - .size(), "Must not clean any files"); - assertEquals(0, - getCleanStat(hoodieCleanStatsOne, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles() - .size(), "Must not clean any files"); + assertEquals(0, hoodieCleanStatsOne.size(), "Must not clean any files"); assertTrue(HoodieTestUtils.doesDataFileExist(basePath, 
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", file1P0C0)); assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000", @@ -548,9 +553,7 @@ public class TestCleaner extends HoodieClientTestBase { // No cleaning on partially written file, with no commit. HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file3P0C2); // update List hoodieCleanStatsFour = runCleaner(config); - assertEquals(0, - getCleanStat(hoodieCleanStatsFour, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles() - .size(), "Must not clean any files"); + assertEquals(0, hoodieCleanStatsFour.size(), "Must not clean any files"); assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file3P0C2)); } @@ -819,11 +822,8 @@ public class TestCleaner extends HoodieClientTestBase { Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); List hoodieCleanStatsThree = runCleaner(config, simulateFailureRetry); - assertEquals(0, - getCleanStat(hoodieCleanStatsThree, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH) - .getSuccessDeleteFiles().size(), + assertEquals(0, hoodieCleanStatsThree.size(), "Must not clean any file. 
We have to keep 1 version before the latest commit time to keep"); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", file1P0C0)); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index a3d81fa38..ccd5c49e1 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -19,6 +19,7 @@ package org.apache.hudi.utilities.deltastreamer; import org.apache.hudi.client.HoodieWriteClient; +import org.apache.hudi.async.AbstractAsyncService; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieTableType; @@ -326,7 +327,7 @@ public class HoodieDeltaStreamer implements Serializable { /** * Syncs data either in single-run or in continuous mode. */ - public static class DeltaSyncService extends AbstractDeltaStreamerService { + public static class DeltaSyncService extends AbstractAsyncService { private static final long serialVersionUID = 1L; /** @@ -532,7 +533,7 @@ public class HoodieDeltaStreamer implements Serializable { /** * Async Compactor Service that runs in separate thread. Currently, only one compactor is allowed to run at any time. */ - public static class AsyncCompactService extends AbstractDeltaStreamerService { + public static class AsyncCompactService extends AbstractAsyncService { private static final long serialVersionUID = 1L; private final int maxConcurrentCompaction;