1
0

[HUDI-3624] Check all instants before starting a commit in metadata table (#5098)

This commit is contained in:
Y Ethan Guo
2022-03-24 17:13:58 -07:00
committed by GitHub
parent 4ddd094ba2
commit 9b3dd2e0b7
4 changed files with 221 additions and 106 deletions

View File

@@ -18,7 +18,6 @@
package org.apache.hudi.metadata; package org.apache.hudi.metadata;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.data.HoodieData;
@@ -33,6 +32,7 @@ import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
@@ -108,22 +108,28 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
List<HoodieRecord> preppedRecordList = HoodieList.getList(preppedRecords); List<HoodieRecord> preppedRecordList = HoodieList.getList(preppedRecords);
try (HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient(engineContext, metadataWriteConfig)) { try (HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient(engineContext, metadataWriteConfig)) {
if (!metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(instantTime)) { if (!metadataMetaClient.getActiveTimeline().containsInstant(instantTime)) {
// if this is a new commit being applied to metadata for the first time // if this is a new commit being applied to metadata for the first time
writeClient.startCommitWithTime(instantTime); writeClient.startCommitWithTime(instantTime);
metadataMetaClient.getActiveTimeline().transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instantTime); metadataMetaClient.getActiveTimeline().transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instantTime);
} else { } else {
// this code path refers to a re-attempted commit that got committed to metadata table, but failed in datatable. Option<HoodieInstant> alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime)).lastInstant();
// for eg, lets say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable. if (alreadyCompletedInstant.isPresent()) {
// when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes // this code path refers to a re-attempted commit that got committed to metadata table, but failed in datatable.
// are upserts to metadata table and so only a new delta commit will be created. // for eg, lets say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
// once rollback is complete, compaction will be retried again, which will eventually hit this code block where the respective commit is // when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
// already part of completed commit. So, we have to manually remove the completed instant and proceed. // are upserts to metadata table and so only a new delta commit will be created.
// and it is for the same reason we enabled withAllowMultiWriteOnSameInstant for metadata table. // once rollback is complete, compaction will be retried again, which will eventually hit this code block where the respective commit is
HoodieInstant alreadyCompletedInstant = // already part of completed commit. So, we have to manually remove the completed instant and proceed.
metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime)).lastInstant().get(); // and it is for the same reason we enabled withAllowMultiWriteOnSameInstant for metadata table.
HoodieActiveTimeline.deleteInstantFile(metadataMetaClient.getFs(), metadataMetaClient.getMetaPath(), alreadyCompletedInstant); HoodieActiveTimeline.deleteInstantFile(metadataMetaClient.getFs(), metadataMetaClient.getMetaPath(), alreadyCompletedInstant.get());
metadataMetaClient.reloadActiveTimeline(); metadataMetaClient.reloadActiveTimeline();
}
// If the alreadyCompletedInstant is empty, that means there is a requested or inflight
// instant with the same instant time. This happens for data table clean action which
// reuses the same instant time without rollback first. It is a no-op here as the
// clean plan is the same, so we don't need to delete the requested and inflight instant
// files in the active timeline.
} }
List<WriteStatus> statuses = preppedRecordList.size() > 0 List<WriteStatus> statuses = preppedRecordList.size() > 0

View File

@@ -136,21 +136,29 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
compactIfNecessary(writeClient, instantTime); compactIfNecessary(writeClient, instantTime);
} }
if (!metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(instantTime)) { if (!metadataMetaClient.getActiveTimeline().containsInstant(instantTime)) {
// if this is a new commit being applied to metadata for the first time // if this is a new commit being applied to metadata for the first time
writeClient.startCommitWithTime(instantTime); writeClient.startCommitWithTime(instantTime);
} else { } else {
// this code path refers to a re-attempted commit that got committed to metadata table, but failed in datatable. Option<HoodieInstant> alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime)).lastInstant();
// for eg, lets say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable. if (alreadyCompletedInstant.isPresent()) {
// when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes // this code path refers to a re-attempted commit that got committed to metadata table, but failed in datatable.
// are upserts to metadata table and so only a new delta commit will be created. // for eg, lets say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
// once rollback is complete, compaction will be retried again, which will eventually hit this code block where the respective commit is // when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
// already part of completed commit. So, we have to manually remove the completed instant and proceed. // are upserts to metadata table and so only a new delta commit will be created.
// and it is for the same reason we enabled withAllowMultiWriteOnSameInstant for metadata table. // once rollback is complete, compaction will be retried again, which will eventually hit this code block where the respective commit is
HoodieInstant alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime)).lastInstant().get(); // already part of completed commit. So, we have to manually remove the completed instant and proceed.
HoodieActiveTimeline.deleteInstantFile(metadataMetaClient.getFs(), metadataMetaClient.getMetaPath(), alreadyCompletedInstant); // and it is for the same reason we enabled withAllowMultiWriteOnSameInstant for metadata table.
metadataMetaClient.reloadActiveTimeline(); HoodieActiveTimeline.deleteInstantFile(metadataMetaClient.getFs(), metadataMetaClient.getMetaPath(), alreadyCompletedInstant.get());
metadataMetaClient.reloadActiveTimeline();
}
// If the alreadyCompletedInstant is empty, that means there is a requested or inflight
// instant with the same instant time. This happens for data table clean action which
// reuses the same instant time without rollback first. It is a no-op here as the
// clean plan is the same, so we don't need to delete the requested and inflight instant
// files in the active timeline.
} }
List<WriteStatus> statuses = writeClient.upsertPreppedRecords(preppedRecordRDD, instantTime).collect(); List<WriteStatus> statuses = writeClient.upsertPreppedRecords(preppedRecordRDD, instantTime).collect();
statuses.forEach(writeStatus -> { statuses.forEach(writeStatus -> {
if (writeStatus.hasErrors()) { if (writeStatus.hasErrors()) {

View File

@@ -75,6 +75,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.SparkHoodieIndexFactory; import org.apache.hudi.index.SparkHoodieIndexFactory;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.table.action.clean.CleanPlanner; import org.apache.hudi.table.action.clean.CleanPlanner;
@@ -627,19 +628,24 @@ public class TestCleaner extends HoodieClientTestBase {
* @param config HoodieWriteConfig * @param config HoodieWriteConfig
*/ */
protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config) throws IOException { protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config) throws IOException {
return runCleaner(config, false, 1, false); return runCleaner(config, false, false, 1, false);
} }
protected List<HoodieCleanStat> runCleanerWithInstantFormat(HoodieWriteConfig config, boolean needInstantInHudiFormat) throws IOException { protected List<HoodieCleanStat> runCleanerWithInstantFormat(HoodieWriteConfig config, boolean needInstantInHudiFormat) throws IOException {
return runCleaner(config, false, 1, needInstantInHudiFormat); return runCleaner(config, false, false, 1, needInstantInHudiFormat);
} }
protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, int firstCommitSequence, boolean needInstantInHudiFormat) throws IOException { protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, int firstCommitSequence, boolean needInstantInHudiFormat) throws IOException {
return runCleaner(config, false, firstCommitSequence, needInstantInHudiFormat); return runCleaner(config, false, false, firstCommitSequence, needInstantInHudiFormat);
} }
protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, boolean simulateRetryFailure) throws IOException { protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, boolean simulateRetryFailure) throws IOException {
return runCleaner(config, simulateRetryFailure, 1, false); return runCleaner(config, simulateRetryFailure, false, 1, false);
}
protected List<HoodieCleanStat> runCleaner(
HoodieWriteConfig config, boolean simulateRetryFailure, boolean simulateMetadataFailure) throws IOException {
return runCleaner(config, simulateRetryFailure, simulateMetadataFailure, 1, false);
} }
/** /**
@@ -647,7 +653,9 @@ public class TestCleaner extends HoodieClientTestBase {
* *
* @param config HoodieWriteConfig * @param config HoodieWriteConfig
*/ */
protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, boolean simulateRetryFailure, Integer firstCommitSequence, boolean needInstantInHudiFormat) throws IOException { protected List<HoodieCleanStat> runCleaner(
HoodieWriteConfig config, boolean simulateRetryFailure, boolean simulateMetadataFailure,
Integer firstCommitSequence, boolean needInstantInHudiFormat) throws IOException {
SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(config); SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(config);
String cleanInstantTs = needInstantInHudiFormat ? makeNewCommitTime(firstCommitSequence, "%014d") : makeNewCommitTime(firstCommitSequence, "%09d"); String cleanInstantTs = needInstantInHudiFormat ? makeNewCommitTime(firstCommitSequence, "%014d") : makeNewCommitTime(firstCommitSequence, "%09d");
HoodieCleanMetadata cleanMetadata1 = writeClient.clean(cleanInstantTs); HoodieCleanMetadata cleanMetadata1 = writeClient.clean(cleanInstantTs);
@@ -670,6 +678,17 @@ public class TestCleaner extends HoodieClientTestBase {
}); });
}); });
metaClient.reloadActiveTimeline().revertToInflight(completedCleanInstant); metaClient.reloadActiveTimeline().revertToInflight(completedCleanInstant);
if (config.isMetadataTableEnabled() && simulateMetadataFailure) {
// Simulate the failure of corresponding instant in the metadata table
HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder()
.setBasePath(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath()))
.setConf(metaClient.getHadoopConf())
.build();
HoodieInstant deltaCommit = new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, cleanInstantTs);
metadataMetaClient.reloadActiveTimeline().revertToInflight(deltaCommit);
}
// retry clean operation again // retry clean operation again
writeClient.clean(); writeClient.clean();
final HoodieCleanMetadata retriedCleanMetadata = CleanerUtils.getCleanerMetadata(HoodieTableMetaClient.reload(metaClient), completedCleanInstant); final HoodieCleanMetadata retriedCleanMetadata = CleanerUtils.getCleanerMetadata(HoodieTableMetaClient.reload(metaClient), completedCleanInstant);
@@ -1215,12 +1234,80 @@ public class TestCleaner extends HoodieClientTestBase {
assertEquals(0, cleanStats.size(), "Must not clean any files"); assertEquals(0, cleanStats.size(), "Must not clean any files");
} }
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testRerunFailedClean(boolean simulateMetadataFailure) throws Exception {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.withMaxNumDeltaCommitsBeforeCompaction(1)
.withAssumeDatePartitioning(true).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
.build();
HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context);
HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter);
String p0 = "2020/01/01";
String p1 = "2020/01/02";
// make 1 commit, with 1 file per partition
String file1P0C0 = UUID.randomUUID().toString();
String file1P1C0 = UUID.randomUUID().toString();
testTable.addInflightCommit("00000000000001").withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0);
HoodieCommitMetadata commitMetadata = generateCommitMetadata("00000000000001",
Collections.unmodifiableMap(new HashMap<String, List<String>>() {
{
put(p0, CollectionUtils.createImmutableList(file1P0C0));
put(p1, CollectionUtils.createImmutableList(file1P1C0));
}
})
);
metadataWriter.update(commitMetadata, "00000000000001", false);
metaClient.getActiveTimeline().saveAsComplete(
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000001"),
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
metaClient = HoodieTableMetaClient.reload(metaClient);
// make next replacecommit, with 1 clustering operation. logically delete p0. No change to p1
// notice that clustering generates empty inflight commit files
Map<String, String> partitionAndFileId002 = testTable.forReplaceCommit("00000000000002").getFileIdsWithBaseFilesInPartitions(p0);
String file2P0C1 = partitionAndFileId002.get(p0);
Pair<HoodieRequestedReplaceMetadata, HoodieReplaceCommitMetadata> replaceMetadata =
generateReplaceCommitMetadata("00000000000002", p0, file1P0C0, file2P0C1);
testTable.addReplaceCommit("00000000000002", Option.of(replaceMetadata.getKey()), Option.empty(), replaceMetadata.getValue());
// make next replacecommit, with 1 clustering operation. Replace data in p1. No change to p0
// notice that clustering generates empty inflight commit files
Map<String, String> partitionAndFileId003 = testTable.forReplaceCommit("00000000000003").getFileIdsWithBaseFilesInPartitions(p1);
String file3P1C2 = partitionAndFileId003.get(p1);
replaceMetadata = generateReplaceCommitMetadata("00000000000003", p1, file1P1C0, file3P1C2);
testTable.addReplaceCommit("00000000000003", Option.of(replaceMetadata.getKey()), Option.empty(), replaceMetadata.getValue());
// make next replacecommit, with 1 clustering operation. Replace data in p0 again
// notice that clustering generates empty inflight commit files
Map<String, String> partitionAndFileId004 = testTable.forReplaceCommit("00000000000004").getFileIdsWithBaseFilesInPartitions(p0);
String file4P0C3 = partitionAndFileId004.get(p0);
replaceMetadata = generateReplaceCommitMetadata("00000000000004", p0, file2P0C1, file4P0C3);
testTable.addReplaceCommit("00000000000004", Option.of(replaceMetadata.getKey()), Option.empty(), replaceMetadata.getValue());
// run cleaner with failures
List<HoodieCleanStat> hoodieCleanStats = runCleaner(config, true, simulateMetadataFailure, 5, true);
assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3));
assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2));
assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
//file1P1C0 still stays because its not replaced until 3 and its the only version available
assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
}
/** /**
* Test Helper for cleaning failed writes by versions logic from HoodieWriteClient API perspective. * Test Helper for cleaning failed writes by versions logic from HoodieWriteClient API perspective.
* *
* @param insertFn Insert API to be tested * @param insertFn Insert API to be tested
* @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during * @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during
* record generation to also tag the regards (de-dupe is implicit as we use unique record-gen APIs) * record generation to also tag the regards (de-dupe is implicit as we use unique record-gen APIs)
* @throws Exception in case of errors * @throws Exception in case of errors
*/ */
private void testInsertAndCleanFailedWritesByVersions( private void testInsertAndCleanFailedWritesByVersions(

View File

@@ -42,6 +42,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.table.TestCleaner; import org.apache.hudi.table.TestCleaner;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.Arguments;
@@ -65,9 +66,9 @@ import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertNull;
/** /**
* Tests covering different clean plan policies/strategies. * Tests covering different clean plan policies/strategies.
@@ -93,11 +94,12 @@ public class TestCleanPlanExecutor extends TestCleaner {
private static Stream<Arguments> argumentsForTestKeepLatestCommits() { private static Stream<Arguments> argumentsForTestKeepLatestCommits() {
return Stream.of( return Stream.of(
Arguments.of(false, false, false), Arguments.of(false, false, false, false),
Arguments.of(true, false, false), Arguments.of(true, false, false, false),
Arguments.of(false, true, false), Arguments.of(true, true, false, false),
Arguments.of(false, false, true) Arguments.of(false, false, true, false),
); Arguments.of(false, false, false, true)
);
} }
/** /**
@@ -105,17 +107,19 @@ public class TestCleanPlanExecutor extends TestCleaner {
*/ */
@ParameterizedTest @ParameterizedTest
@MethodSource("argumentsForTestKeepLatestCommits") @MethodSource("argumentsForTestKeepLatestCommits")
public void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIncrementalClean, boolean enableBootstrapSourceClean) throws Exception { public void testKeepLatestCommits(
boolean simulateFailureRetry, boolean simulateMetadataFailure,
boolean enableIncrementalClean, boolean enableBootstrapSourceClean) throws Exception {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build()) .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withIncrementalCleaningMode(enableIncrementalClean) .withIncrementalCleaningMode(enableIncrementalClean)
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER) .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
.withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean) .withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean)
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS) .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
.retainCommits(2) .retainCommits(2)
.withMaxCommitsBeforeCleaning(2).build()) .withMaxCommitsBeforeCleaning(2).build())
.build(); .build();
HoodieTestTable testTable = HoodieTestTable.of(metaClient); HoodieTestTable testTable = HoodieTestTable.of(metaClient);
String p0 = "2020/01/01"; String p0 = "2020/01/01";
@@ -130,20 +134,21 @@ public class TestCleanPlanExecutor extends TestCleaner {
testTable.addInflightCommit("00000000000001").withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0); testTable.addInflightCommit("00000000000001").withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0);
HoodieCommitMetadata commitMetadata = generateCommitMetadata("00000000000001", HoodieCommitMetadata commitMetadata = generateCommitMetadata("00000000000001",
Collections.unmodifiableMap(new HashMap<String, List<String>>() { Collections.unmodifiableMap(new HashMap<String, List<String>>() {
{ {
put(p0, CollectionUtils.createImmutableList(file1P0C0)); put(p0, CollectionUtils.createImmutableList(file1P0C0));
put(p1, CollectionUtils.createImmutableList(file1P1C0)); put(p1, CollectionUtils.createImmutableList(file1P1C0));
} }
}) })
); );
metaClient.getActiveTimeline().saveAsComplete( metaClient.getActiveTimeline().saveAsComplete(
new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000001"), new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000001"),
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
metaClient = HoodieTableMetaClient.reload(metaClient); metaClient = HoodieTableMetaClient.reload(metaClient);
List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config, simulateFailureRetry, 2, true); List<HoodieCleanStat> hoodieCleanStatsOne =
runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 2, true);
assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions and clean any files"); assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions and clean any files");
assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0)); assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
@@ -160,9 +165,10 @@ public class TestCleanPlanExecutor extends TestCleaner {
} }
}); });
metaClient.getActiveTimeline().saveAsComplete( metaClient.getActiveTimeline().saveAsComplete(
new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000003"), new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000003"),
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config, simulateFailureRetry, 4, true); List<HoodieCleanStat> hoodieCleanStatsTwo =
runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 4, true);
assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any partitions and clean any files"); assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any partitions and clean any files");
assertTrue(testTable.baseFileExists(p0, "00000000000003", file2P0C1)); assertTrue(testTable.baseFileExists(p0, "00000000000003", file2P0C1));
assertTrue(testTable.baseFileExists(p1, "00000000000003", file2P1C1)); assertTrue(testTable.baseFileExists(p1, "00000000000003", file2P1C1));
@@ -171,40 +177,42 @@ public class TestCleanPlanExecutor extends TestCleaner {
// make next commit, with 2 updates to existing files, and 1 insert // make next commit, with 2 updates to existing files, and 1 insert
String file3P0C2 = testTable.addInflightCommit("00000000000005") String file3P0C2 = testTable.addInflightCommit("00000000000005")
.withBaseFilesInPartition(p0, file1P0C0) .withBaseFilesInPartition(p0, file1P0C0)
.withBaseFilesInPartition(p0, file2P0C1) .withBaseFilesInPartition(p0, file2P0C1)
.getFileIdsWithBaseFilesInPartitions(p0).get(p0); .getFileIdsWithBaseFilesInPartitions(p0).get(p0);
commitMetadata = generateCommitMetadata("00000000000003", commitMetadata = generateCommitMetadata("00000000000003",
CollectionUtils.createImmutableMap( CollectionUtils.createImmutableMap(
p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file3P0C2))); p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file3P0C2)));
metaClient.getActiveTimeline().saveAsComplete( metaClient.getActiveTimeline().saveAsComplete(
new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000005"), new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000005"),
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config, simulateFailureRetry, 6, true); List<HoodieCleanStat> hoodieCleanStatsThree =
runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 6, true);
assertEquals(0, hoodieCleanStatsThree.size(), assertEquals(0, hoodieCleanStatsThree.size(),
"Must not clean any file. We have to keep 1 version before the latest commit time to keep"); "Must not clean any file. We have to keep 1 version before the latest commit time to keep");
assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
// make next commit, with 2 updates to existing files, and 1 insert // make next commit, with 2 updates to existing files, and 1 insert
String file4P0C3 = testTable.addInflightCommit("00000000000007") String file4P0C3 = testTable.addInflightCommit("00000000000007")
.withBaseFilesInPartition(p0, file1P0C0) .withBaseFilesInPartition(p0, file1P0C0)
.withBaseFilesInPartition(p0, file2P0C1) .withBaseFilesInPartition(p0, file2P0C1)
.getFileIdsWithBaseFilesInPartitions(p0).get(p0); .getFileIdsWithBaseFilesInPartitions(p0).get(p0);
commitMetadata = generateCommitMetadata("00000000000004", commitMetadata = generateCommitMetadata("00000000000004",
CollectionUtils.createImmutableMap( CollectionUtils.createImmutableMap(
p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file4P0C3))); p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file4P0C3)));
metaClient.getActiveTimeline().saveAsComplete( metaClient.getActiveTimeline().saveAsComplete(
new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000007"), new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000007"),
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config, simulateFailureRetry, 8, true); List<HoodieCleanStat> hoodieCleanStatsFour =
runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 8, true);
// enableBootstrapSourceClean would delete the bootstrap base file as the same time // enableBootstrapSourceClean would delete the bootstrap base file as the same time
HoodieCleanStat partitionCleanStat = getCleanStat(hoodieCleanStatsFour, p0); HoodieCleanStat partitionCleanStat = getCleanStat(hoodieCleanStatsFour, p0);
assertEquals(enableBootstrapSourceClean ? 2 : 1, partitionCleanStat.getSuccessDeleteFiles().size() assertEquals(enableBootstrapSourceClean ? 2 : 1, partitionCleanStat.getSuccessDeleteFiles().size()
+ (partitionCleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0 + (partitionCleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0
: partitionCleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least one old file"); : partitionCleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least one old file");
assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
assertTrue(testTable.baseFileExists(p0, "00000000000003", file1P0C0)); assertTrue(testTable.baseFileExists(p0, "00000000000003", file1P0C0));
assertTrue(testTable.baseFileExists(p0, "00000000000005", file1P0C0)); assertTrue(testTable.baseFileExists(p0, "00000000000005", file1P0C0));
@@ -220,19 +228,20 @@ public class TestCleanPlanExecutor extends TestCleaner {
metaClient = HoodieTableMetaClient.reload(metaClient); metaClient = HoodieTableMetaClient.reload(metaClient);
String file5P0C4 = testTable.addInflightCommit("00000000000009") String file5P0C4 = testTable.addInflightCommit("00000000000009")
.withBaseFilesInPartition(p0, file1P0C0) .withBaseFilesInPartition(p0, file1P0C0)
.withBaseFilesInPartition(p0, file2P0C1) .withBaseFilesInPartition(p0, file2P0C1)
.getFileIdsWithBaseFilesInPartitions(p0).get(p0); .getFileIdsWithBaseFilesInPartitions(p0).get(p0);
commitMetadata = generateCommitMetadata("00000000000009", CollectionUtils.createImmutableMap( commitMetadata = generateCommitMetadata("00000000000009", CollectionUtils.createImmutableMap(
p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file5P0C4))); p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file5P0C4)));
metaClient.getActiveTimeline().saveAsComplete( metaClient.getActiveTimeline().saveAsComplete(
new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000009"), new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000009"),
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
List<HoodieCleanStat> hoodieCleanStatsFive = runCleaner(config, simulateFailureRetry, 10, true); List<HoodieCleanStat> hoodieCleanStatsFive =
runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 10, true);
assertEquals(0, hoodieCleanStatsFive.size(), "Must not clean any files since at least 2 commits are needed from last clean operation before " assertEquals(0, hoodieCleanStatsFive.size(), "Must not clean any files since at least 2 commits are needed from last clean operation before "
+ "clean can be scheduled again"); + "clean can be scheduled again");
assertTrue(testTable.baseFileExists(p0, "00000000000003", file1P0C0)); assertTrue(testTable.baseFileExists(p0, "00000000000003", file1P0C0));
assertTrue(testTable.baseFileExists(p0, "00000000000005", file1P0C0)); assertTrue(testTable.baseFileExists(p0, "00000000000005", file1P0C0));
assertTrue(testTable.baseFileExists(p0, "00000000000003", file2P0C1)); assertTrue(testTable.baseFileExists(p0, "00000000000003", file2P0C1));
@@ -243,13 +252,14 @@ public class TestCleanPlanExecutor extends TestCleaner {
// No cleaning on partially written file, with no commit. // No cleaning on partially written file, with no commit.
testTable.forCommit("00000000000011").withBaseFilesInPartition(p0, file3P0C2); testTable.forCommit("00000000000011").withBaseFilesInPartition(p0, file3P0C2);
commitMetadata = generateCommitMetadata("00000000000011", CollectionUtils.createImmutableMap(p0, commitMetadata = generateCommitMetadata("00000000000011", CollectionUtils.createImmutableMap(p0,
CollectionUtils.createImmutableList(file3P0C2))); CollectionUtils.createImmutableList(file3P0C2)));
metaClient.getActiveTimeline().createNewInstant( metaClient.getActiveTimeline().createNewInstant(
new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "00000000000011")); new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "00000000000011"));
metaClient.getActiveTimeline().transitionRequestedToInflight( metaClient.getActiveTimeline().transitionRequestedToInflight(
new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "00000000000011"), new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "00000000000011"),
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
List<HoodieCleanStat> hoodieCleanStatsFive2 = runCleaner(config, simulateFailureRetry, 12, true); List<HoodieCleanStat> hoodieCleanStatsFive2 =
runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 12, true);
HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsFive2, p0); HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsFive2, p0);
assertNull(cleanStat, "Must not clean any files"); assertNull(cleanStat, "Must not clean any files");
assertTrue(testTable.baseFileExists(p0, "00000000000005", file3P0C2)); assertTrue(testTable.baseFileExists(p0, "00000000000005", file3P0C2));
@@ -454,15 +464,17 @@ public class TestCleanPlanExecutor extends TestCleaner {
*/ */
@ParameterizedTest @ParameterizedTest
@MethodSource("argumentsForTestKeepLatestCommits") @MethodSource("argumentsForTestKeepLatestCommits")
public void testKeepXHoursWithCleaning(boolean simulateFailureRetry, boolean enableIncrementalClean, boolean enableBootstrapSourceClean) throws Exception { public void testKeepXHoursWithCleaning(
boolean simulateFailureRetry, boolean simulateMetadataFailure,
boolean enableIncrementalClean, boolean enableBootstrapSourceClean) throws Exception {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build()) .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withIncrementalCleaningMode(enableIncrementalClean) .withIncrementalCleaningMode(enableIncrementalClean)
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER) .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
.withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean) .withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean)
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS).cleanerNumHoursRetained(2).build()) .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS).cleanerNumHoursRetained(2).build())
.build(); .build();
HoodieTestTable testTable = HoodieTestTable.of(metaClient); HoodieTestTable testTable = HoodieTestTable.of(metaClient);
String p0 = "2020/01/01"; String p0 = "2020/01/01";
@@ -488,12 +500,13 @@ public class TestCleanPlanExecutor extends TestCleaner {
}) })
); );
metaClient.getActiveTimeline().saveAsComplete( metaClient.getActiveTimeline().saveAsComplete(
new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, firstCommitTs), new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, firstCommitTs),
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
metaClient = HoodieTableMetaClient.reload(metaClient); metaClient = HoodieTableMetaClient.reload(metaClient);
List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config, simulateFailureRetry); List<HoodieCleanStat> hoodieCleanStatsOne =
runCleaner(config, simulateFailureRetry, simulateMetadataFailure);
assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions and clean any files"); assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions and clean any files");
assertTrue(testTable.baseFileExists(p0, firstCommitTs, file1P0C0)); assertTrue(testTable.baseFileExists(p0, firstCommitTs, file1P0C0));
assertTrue(testTable.baseFileExists(p1, firstCommitTs, file1P1C0)); assertTrue(testTable.baseFileExists(p1, firstCommitTs, file1P1C0));
@@ -512,9 +525,10 @@ public class TestCleanPlanExecutor extends TestCleaner {
} }
}); });
metaClient.getActiveTimeline().saveAsComplete( metaClient.getActiveTimeline().saveAsComplete(
new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, secondCommitTs), new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, secondCommitTs),
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config, simulateFailureRetry); List<HoodieCleanStat> hoodieCleanStatsTwo =
runCleaner(config, simulateFailureRetry, simulateMetadataFailure);
assertEquals(2, hoodieCleanStatsTwo.size(), "Should clean one file each from both the partitions"); assertEquals(2, hoodieCleanStatsTwo.size(), "Should clean one file each from both the partitions");
assertTrue(testTable.baseFileExists(p0, secondCommitTs, file2P0C1)); assertTrue(testTable.baseFileExists(p0, secondCommitTs, file2P0C1));
assertTrue(testTable.baseFileExists(p1, secondCommitTs, file2P1C1)); assertTrue(testTable.baseFileExists(p1, secondCommitTs, file2P1C1));