From 0dee8edc9741ee99e1e2bf98efd9673003fcb1e7 Mon Sep 17 00:00:00 2001 From: Prashant Wason Date: Mon, 21 Feb 2022 18:53:03 -0800 Subject: [PATCH] [HUDI-2925] Fix duplicate cleaning of same files when unfinished clean operations are present using a config. (#4212) Co-authored-by: sivabalan --- .../hudi/client/BaseHoodieWriteClient.java | 31 +++++---- .../hudi/config/HoodieCompactionConfig.java | 12 ++++ .../apache/hudi/config/HoodieWriteConfig.java | 4 ++ .../hudi/table/action/BaseActionExecutor.java | 2 +- .../action/clean/CleanActionExecutor.java | 8 ++- .../org/apache/hudi/table/TestCleaner.java | 68 +++++++++++++++++++ .../apache/hudi/common/util/CleanerUtils.java | 8 +++ 7 files changed, 118 insertions(+), 15 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index f3dc53b0f..7b67ff54a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -767,21 +767,28 @@ public abstract class BaseHoodieWriteClient rollbackFailedWrites(skipLocking)); - HoodieCleanMetadata metadata = createTable(config, hadoopConf).clean(context, cleanInstantTime, skipLocking); - if (timerContext != null && metadata != null) { - long durationMs = metrics.getDurationInMs(timerContext.stop()); - metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted()); - LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files" - + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain() - + " cleanerElapsedMs" + durationMs); + + HoodieCleanMetadata metadata = null; + HoodieTable table = createTable(config, hadoopConf); + if (config.allowMultipleCleans() || 
!table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent()) { + LOG.info("Cleaner started"); + // proceed only if multiple clean schedules are enabled or if there are no pending cleans. + if (scheduleInline) { + scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN); + table.getMetaClient().reloadActiveTimeline(); + } + + metadata = table.clean(context, cleanInstantTime, skipLocking); + if (timerContext != null && metadata != null) { + long durationMs = metrics.getDurationInMs(timerContext.stop()); + metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted()); + LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files" + + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain() + + " cleanerElapsedMs" + durationMs); + } } return metadata; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java index 130d379f2..0aac9308d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java @@ -278,6 +278,13 @@ public class HoodieCompactionConfig extends HoodieConfig { .withDocumentation("The average record size. If not explicitly specified, hudi will compute the " + "record size estimate compute dynamically based on commit metadata. " + " This is critical in computing the insert parallelism and bin-packing inserts into small files."); + + public static final ConfigProperty ALLOW_MULTIPLE_CLEANS = ConfigProperty + .key("hoodie.clean.allow.multiple") + .defaultValue(true) + .sinceVersion("0.11.0") + .withDocumentation("Allows scheduling/executing multiple cleans by enabling this config. 
If users prefer to strictly ensure clean requests should be mutually exclusive, " + + "i.e. a 2nd clean will not be scheduled if another clean is not yet completed to avoid repeat cleaning of same files, they might want to disable this config."); public static final ConfigProperty ARCHIVE_MERGE_FILES_BATCH_SIZE = ConfigProperty .key("hoodie.archive.merge.files.batch.size") @@ -642,6 +649,11 @@ public class HoodieCompactionConfig extends HoodieConfig { return this; } + public Builder allowMultipleCleans(boolean allowMultipleCleanSchedules) { + compactionConfig.setValue(ALLOW_MULTIPLE_CLEANS, String.valueOf(allowMultipleCleanSchedules)); + return this; + } + public Builder withCleanerParallelism(int cleanerParallelism) { compactionConfig.setValue(CLEANER_PARALLELISM_VALUE, String.valueOf(cleanerParallelism)); return this; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index b41da7f21..c7f2c45ea 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1117,6 +1117,10 @@ public class HoodieWriteConfig extends HoodieConfig { return getInt(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE); } + public boolean allowMultipleCleans() { + return getBoolean(HoodieCompactionConfig.ALLOW_MULTIPLE_CLEANS); + } + public boolean shouldAutoTuneInsertSplits() { return getBoolean(HoodieCompactionConfig.COPY_ON_WRITE_AUTO_SPLIT_INSERTS); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java index 221f970cb..f893b4ccd 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java 
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java @@ -65,7 +65,7 @@ public abstract class BaseActionExecutor w.update(metadata, instantTime)); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java index 9813b2b65..4ae8009c9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java @@ -208,7 +208,7 @@ public class CleanActionExecutor extends if (!skipLocking) { this.txnManager.beginTransaction(Option.empty(), Option.empty()); } - writeTableMetadata(metadata); + writeTableMetadata(metadata, inflightInstant.getTimestamp()); table.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant, TimelineMetadataUtils.serializeCleanMetadata(metadata)); LOG.info("Marked clean started on " + inflightInstant.getTimestamp() + " as complete"); @@ -240,9 +240,13 @@ public class CleanActionExecutor extends LOG.warn("Failed to perform previous clean operation, instant: " + hoodieInstant, e); } } + table.getMetaClient().reloadActiveTimeline(); + if (config.isMetadataTableEnabled()) { + table.getHoodieView().sync(); + } }); - table.getMetaClient().reloadActiveTimeline(); } + // return the last clean metadata for now // TODO (NA) : Clean only the earliest pending clean just like how we do for other table services // This requires the CleanActionExecutor to be refactored as BaseCommitActionExecutor diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java index bec0b08ed..f51a169dd 100644 --- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -122,6 +122,7 @@ import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -254,6 +255,73 @@ public class TestCleaner extends HoodieClientTestBase { SparkRDDWriteClient::upsertPreppedRecords, true); } + + /** + * Tests no more than 1 clean is scheduled/executed if HoodieCompactionConfig.allowMultipleCleanSchedule config is disabled. + */ + @Test + public void testMultiClean() { + HoodieWriteConfig writeConfig = getConfigBuilder() + .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder() + .withEnableBackupForRemoteFileSystemView(false).build()) + .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024) + .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1) + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER) + .allowMultipleCleans(false) + .withAutoClean(false).retainCommits(1).retainFileVersions(1).build()) + .withEmbeddedTimelineServerEnabled(false).build(); + + int index = 0; + String cleanInstantTime; + final String partition = "2015/03/16"; + try (SparkRDDWriteClient client = new SparkRDDWriteClient(context, writeConfig)) { + // Three writes so we can initiate a clean + for (; index < 3; ++index) { + String newCommitTime = "00" + index; + List records = dataGen.generateInsertsForPartition(newCommitTime, 1, partition); + client.startCommitWithTime(newCommitTime); + client.insert(jsc.parallelize(records, 1), newCommitTime).collect(); + 
} + } + + // mimic failed/leftover clean by scheduling a clean but not performing it + cleanInstantTime = "00" + index++; + HoodieTable table = HoodieSparkTable.create(writeConfig, context); + Option cleanPlan = table.scheduleCleaning(context, cleanInstantTime, Option.empty()); + assertEquals(cleanPlan.get().getFilePathsToBeDeletedPerPartition().get(partition).size(), 1); + assertEquals(metaClient.reloadActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().countInstants(), 1); + + try (SparkRDDWriteClient client = new SparkRDDWriteClient(context, writeConfig)) { + // Next commit. This is required so that there is an additional file version to clean. + String newCommitTime = "00" + index++; + List records = dataGen.generateInsertsForPartition(newCommitTime, 1, partition); + client.startCommitWithTime(newCommitTime); + client.insert(jsc.parallelize(records, 1), newCommitTime).collect(); + + // Initiate another clean. The previous leftover clean will be attempted first, followed by another clean + // due to the commit above. 
+ String newCleanInstantTime = "00" + index++; + HoodieCleanMetadata cleanMetadata = client.clean(newCleanInstantTime); + // subsequent clean should not be triggered since allowMultipleCleanSchedules is set to false + assertNull(cleanMetadata); + + // let the old clean complete + table = HoodieSparkTable.create(writeConfig, context); + cleanMetadata = table.clean(context, cleanInstantTime, false); + assertNotNull(cleanMetadata); + + // any new clean should go ahead + cleanMetadata = client.clean(newCleanInstantTime); + // this clean should be triggered now that the pending clean has completed + assertNotNull(cleanMetadata); + + // 1 file cleaned + assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getSuccessDeleteFiles().size(), 1); + assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getFailedDeleteFiles().size(), 0); + assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getDeletePathPatterns().size(), 1); + } + } + /** * Test Helper for Cleaning by versions logic from HoodieWriteClient API perspective. 
* diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java index 9f63bfa3d..a3a130566 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java @@ -35,6 +35,9 @@ import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataV1Mig import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataV2MigrationHandler; import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanMigrator; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + import java.io.IOException; import java.util.HashMap; import java.util.List; @@ -43,6 +46,9 @@ import java.util.Map; import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION; public class CleanerUtils { + + private static final Logger LOG = LogManager.getLogger(CleanerUtils.class); + public static final Integer CLEAN_METADATA_VERSION_1 = CleanMetadataV1MigrationHandler.VERSION; public static final Integer CLEAN_METADATA_VERSION_2 = CleanMetadataV2MigrationHandler.VERSION; public static final Integer LATEST_CLEAN_METADATA_VERSION = CLEAN_METADATA_VERSION_2; @@ -131,6 +137,7 @@ public class CleanerUtils { // No need to do any special cleanup for failed operations during clean return; } else if (cleaningPolicy.isLazy()) { + LOG.info("Cleaned failed attempts if any"); // Perform rollback of failed operations for all types of actions during clean rollbackFailedWritesFunc.apply(); return; @@ -140,6 +147,7 @@ public class CleanerUtils { case COMMIT_ACTION: // For any other actions, perform rollback of failed writes if (cleaningPolicy.isEager()) { + LOG.info("Cleaned failed attempts if any"); rollbackFailedWritesFunc.apply(); return; }