[HUDI-2925] Fix duplicate cleaning of same files when unfinished clean operations are present using a config. (#4212)
Co-authored-by: sivabalan <n.siva.b@gmail.com>
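This change introduces a new write config, hoodie.clean.allow.multiple (default: true). As a rough sketch only (not part of this commit; basePath, schemaStr, and the engine context are assumed from the surrounding job setup), a writer that wants clean operations to stay mutually exclusive could disable the flag through the builder method added here:

    // Illustrative only: disable overlapping clean scheduling via the new config.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath(basePath)          // assumed table base path
        .withSchema(schemaStr)       // assumed writer schema
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .allowMultipleCleans(false)   // same as setting hoodie.clean.allow.multiple=false
            .withAutoClean(false)
            .retainCommits(1)
            .build())
        .build();

    SparkRDDWriteClient client = new SparkRDDWriteClient(context, writeConfig);
    // With the flag disabled, clean() returns null while an earlier clean is still
    // requested/inflight, instead of scheduling a second clean over the same files.
    HoodieCleanMetadata cleanMetadata = client.clean(cleanInstantTime);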
@@ -767,21 +767,28 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
     if (!tableServicesEnabled(config)) {
       return null;
     }
-    if (scheduleInline) {
-      scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
-    }
-    LOG.info("Cleaner started");
     final Timer.Context timerContext = metrics.getCleanCtx();
-    LOG.info("Cleaned failed attempts if any");
     CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
         HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(skipLocking));
-    HoodieCleanMetadata metadata = createTable(config, hadoopConf).clean(context, cleanInstantTime, skipLocking);
-    if (timerContext != null && metadata != null) {
-      long durationMs = metrics.getDurationInMs(timerContext.stop());
-      metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
-      LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
-          + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
-          + " cleanerElapsedMs" + durationMs);
+    HoodieCleanMetadata metadata = null;
+    HoodieTable table = createTable(config, hadoopConf);
+    if (config.allowMultipleCleans() || !table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent()) {
+      LOG.info("Cleaner started");
+      // proceed only if multiple clean schedules are enabled or if there are no pending cleans.
+      if (scheduleInline) {
+        scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
+        table.getMetaClient().reloadActiveTimeline();
+      }
+
+      metadata = table.clean(context, cleanInstantTime, skipLocking);
+      if (timerContext != null && metadata != null) {
+        long durationMs = metrics.getDurationInMs(timerContext.stop());
+        metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
+        LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
+            + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
+            + " cleanerElapsedMs" + durationMs);
+      }
     }
     return metadata;
   }
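For reference, the pending-clean check that gates the new path can be expressed on its own roughly as follows (an illustrative helper, hypothetical and not part of this commit; it assumes a HoodieTableMetaClient is at hand):

    // Illustrative helper: true if a clean is already requested or inflight on the
    // active timeline, i.e. the case in which a new clean is skipped when
    // hoodie.clean.allow.multiple is disabled.
    private static boolean hasPendingClean(HoodieTableMetaClient metaClient) {
      return metaClient.reloadActiveTimeline()
          .getCleanerTimeline()
          .filterInflightsAndRequested()
          .firstInstant()
          .isPresent();
    }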
@@ -279,6 +279,13 @@ public class HoodieCompactionConfig extends HoodieConfig {
           + "record size estimate compute dynamically based on commit metadata. "
           + " This is critical in computing the insert parallelism and bin-packing inserts into small files.");
 
+  public static final ConfigProperty<Boolean> ALLOW_MULTIPLE_CLEANS = ConfigProperty
+      .key("hoodie.clean.allow.multiple")
+      .defaultValue(true)
+      .sinceVersion("0.11.0")
+      .withDocumentation("Allows scheduling/executing multiple cleans by enabling this config. If users prefer to strictly ensure clean requests should be mutually exclusive, "
+          + ".i.e. a 2nd clean will not be scheduled if another clean is not yet completed to avoid repeat cleaning of same files, they might want to disable this config.");
+
   public static final ConfigProperty<Integer> ARCHIVE_MERGE_FILES_BATCH_SIZE = ConfigProperty
       .key("hoodie.archive.merge.files.batch.size")
       .defaultValue(10)
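Equivalently, the flag can be supplied like any other Hudi write config through raw properties rather than the typed builder; a minimal sketch (basePath is assumed from the surrounding job):

    // Illustrative only: set hoodie.clean.allow.multiple=false via properties.
    Properties props = new Properties();
    props.setProperty("hoodie.clean.allow.multiple", "false");  // default is true
    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withProps(props)
        .build();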
@@ -642,6 +649,11 @@ public class HoodieCompactionConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder allowMultipleCleans(boolean allowMultipleCleanSchedules) {
+      compactionConfig.setValue(ALLOW_MULTIPLE_CLEANS, String.valueOf(allowMultipleCleanSchedules));
+      return this;
+    }
+
     public Builder withCleanerParallelism(int cleanerParallelism) {
       compactionConfig.setValue(CLEANER_PARALLELISM_VALUE, String.valueOf(cleanerParallelism));
       return this;
@@ -1117,6 +1117,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getInt(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE);
   }
 
+  public boolean allowMultipleCleans() {
+    return getBoolean(HoodieCompactionConfig.ALLOW_MULTIPLE_CLEANS);
+  }
+
   public boolean shouldAutoTuneInsertSplits() {
     return getBoolean(HoodieCompactionConfig.COPY_ON_WRITE_AUTO_SPLIT_INSERTS);
   }
@@ -65,7 +65,7 @@ public abstract class BaseActionExecutor<T extends HoodieRecordPayload, I, K, O,
    * Writes clean metadata to table metadata.
    * @param metadata clean metadata of interest.
    */
-  protected final void writeTableMetadata(HoodieCleanMetadata metadata) {
+  protected final void writeTableMetadata(HoodieCleanMetadata metadata, String instantTime) {
     table.getMetadataWriter(instantTime).ifPresent(w -> w.update(metadata, instantTime));
   }
 
@@ -208,7 +208,7 @@ public class CleanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends
       if (!skipLocking) {
         this.txnManager.beginTransaction(Option.empty(), Option.empty());
       }
-      writeTableMetadata(metadata);
+      writeTableMetadata(metadata, inflightInstant.getTimestamp());
       table.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant,
           TimelineMetadataUtils.serializeCleanMetadata(metadata));
       LOG.info("Marked clean started on " + inflightInstant.getTimestamp() + " as complete");
@@ -240,9 +240,13 @@ public class CleanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends
           LOG.warn("Failed to perform previous clean operation, instant: " + hoodieInstant, e);
         }
       }
+      table.getMetaClient().reloadActiveTimeline();
+      if (config.isMetadataTableEnabled()) {
+        table.getHoodieView().sync();
+      }
     });
-    table.getMetaClient().reloadActiveTimeline();
   }
 
   // return the last clean metadata for now
   // TODO (NA) : Clean only the earliest pending clean just like how we do for other table services
   // This requires the CleanActionExecutor to be refactored as BaseCommitActionExecutor
@@ -122,6 +122,7 @@ import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.awaitility.Awaitility.await;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -254,6 +255,73 @@ public class TestCleaner extends HoodieClientTestBase {
     SparkRDDWriteClient::upsertPreppedRecords, true);
   }
 
+  /**
+   * Tests no more than 1 clean is scheduled/executed if HoodieCompactionConfig.allowMultipleCleanSchedule config is disabled.
+   */
+  @Test
+  public void testMultiClean() {
+    HoodieWriteConfig writeConfig = getConfigBuilder()
+        .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
+            .withEnableBackupForRemoteFileSystemView(false).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
+            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1)
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
+            .allowMultipleCleans(false)
+            .withAutoClean(false).retainCommits(1).retainFileVersions(1).build())
+        .withEmbeddedTimelineServerEnabled(false).build();
+
+    int index = 0;
+    String cleanInstantTime;
+    final String partition = "2015/03/16";
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(context, writeConfig)) {
+      // Three writes so we can initiate a clean
+      for (; index < 3; ++index) {
+        String newCommitTime = "00" + index;
+        List<HoodieRecord> records = dataGen.generateInsertsForPartition(newCommitTime, 1, partition);
+        client.startCommitWithTime(newCommitTime);
+        client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
+      }
+    }
+
+    // mimic failed/leftover clean by scheduling a clean but not performing it
+    cleanInstantTime = "00" + index++;
+    HoodieTable table = HoodieSparkTable.create(writeConfig, context);
+    Option<HoodieCleanerPlan> cleanPlan = table.scheduleCleaning(context, cleanInstantTime, Option.empty());
+    assertEquals(cleanPlan.get().getFilePathsToBeDeletedPerPartition().get(partition).size(), 1);
+    assertEquals(metaClient.reloadActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().countInstants(), 1);
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(context, writeConfig)) {
+      // Next commit. This is required so that there is an additional file version to clean.
+      String newCommitTime = "00" + index++;
+      List<HoodieRecord> records = dataGen.generateInsertsForPartition(newCommitTime, 1, partition);
+      client.startCommitWithTime(newCommitTime);
+      client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
+
+      // Initiate another clean. The previous leftover clean will be attempted first, followed by another clean
+      // due to the commit above.
+      String newCleanInstantTime = "00" + index++;
+      HoodieCleanMetadata cleanMetadata = client.clean(newCleanInstantTime);
+      // subsequent clean should not be triggered since allowMultipleCleanSchedules is set to false
+      assertNull(cleanMetadata);
+
+      // let the old clean complete
+      table = HoodieSparkTable.create(writeConfig, context);
+      cleanMetadata = table.clean(context, cleanInstantTime, false);
+      assertNotNull(cleanMetadata);
+
+      // any new clean should go ahead
+      cleanMetadata = client.clean(newCleanInstantTime);
+      // subsequent clean should not be triggered since allowMultipleCleanSchedules is set to false
+      assertNotNull(cleanMetadata);
+
+      // 1 file cleaned
+      assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getSuccessDeleteFiles().size(), 1);
+      assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getFailedDeleteFiles().size(), 0);
+      assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getDeletePathPatterns().size(), 1);
+    }
+  }
+
   /**
    * Test Helper for Cleaning by versions logic from HoodieWriteClient API perspective.
    *
@@ -35,6 +35,9 @@ import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataV1Mig
 import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataV2MigrationHandler;
 import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanMigrator;
 
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
@@ -43,6 +46,9 @@ import java.util.Map;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
 
 public class CleanerUtils {
+
+  private static final Logger LOG = LogManager.getLogger(CleanerUtils.class);
+
   public static final Integer CLEAN_METADATA_VERSION_1 = CleanMetadataV1MigrationHandler.VERSION;
   public static final Integer CLEAN_METADATA_VERSION_2 = CleanMetadataV2MigrationHandler.VERSION;
   public static final Integer LATEST_CLEAN_METADATA_VERSION = CLEAN_METADATA_VERSION_2;
@@ -131,6 +137,7 @@ public class CleanerUtils {
         // No need to do any special cleanup for failed operations during clean
         return;
       } else if (cleaningPolicy.isLazy()) {
+        LOG.info("Cleaned failed attempts if any");
         // Perform rollback of failed operations for all types of actions during clean
         rollbackFailedWritesFunc.apply();
         return;
@@ -140,6 +147,7 @@ public class CleanerUtils {
       case COMMIT_ACTION:
         // For any other actions, perform rollback of failed writes
         if (cleaningPolicy.isEager()) {
+          LOG.info("Cleaned failed attempts if any");
           rollbackFailedWritesFunc.apply();
           return;
         }