[HUDI-3494] Consider triggering condition of MOR compaction during archival (#4974)

Author: Y Ethan Guo
Date: 2022-03-16 22:28:11 -07:00
Committed by: GitHub
Parent: 95e6e53810
Commit: 5ba2d9ab2f
5 changed files with 400 additions and 38 deletions

File: HoodieTimelineArchiver.java

@@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieArchivedLogFile;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieLogFile;
+ import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
@@ -43,6 +44,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.CollectionUtils;
+ import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
@@ -52,6 +54,7 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.table.HoodieTable;
+ import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
@@ -76,6 +79,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+ import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
/**
@@ -395,6 +399,18 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
// made after the first savepoint present.
Option<HoodieInstant> firstSavepoint = table.getCompletedSavepointTimeline().firstInstant();
if (!commitTimeline.empty() && commitTimeline.countInstants() > maxInstantsToKeep) {
+ // For a Merge-On-Read table with inline or async compaction enabled, we need
+ // to make sure that there are enough delta commits in the active timeline
+ // to trigger compaction scheduling, when the compaction trigger strategy is
+ // NUM_COMMITS or NUM_AND_TIME.
+ Option<HoodieInstant> oldestInstantToRetainForCompaction =
+ (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ
+ && (config.getInlineCompactTriggerStrategy() == CompactionTriggerStrategy.NUM_COMMITS
+ || config.getInlineCompactTriggerStrategy() == CompactionTriggerStrategy.NUM_AND_TIME))
+ ? CompactionUtils.getOldestInstantToRetainForCompaction(
+ table.getActiveTimeline(), config.getInlineCompactDeltaCommitMax())
+ : Option.empty();
// Actually do the commits
Stream<HoodieInstant> instantToArchiveStream = commitTimeline.getInstants()
.filter(s -> {
@@ -405,14 +421,21 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
return oldestPendingCompactionAndReplaceInstant
.map(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
.orElse(true);
- });
- // We need this to ensure that when multiple writers are performing conflict resolution, eligible instants don't
- // get archived, i.e, instants after the oldestInflight are retained on the timeline
- if (config.getFailedWritesCleanPolicy() == HoodieFailedWritesCleaningPolicy.LAZY) {
- instantToArchiveStream = instantToArchiveStream.filter(s -> oldestInflightCommitInstant.map(instant ->
- HoodieTimeline.compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
- .orElse(true));
- }
+ }).filter(s -> {
+ // We need this to ensure that when multiple writers are performing conflict resolution, eligible instants don't
+ // get archived, i.e, instants after the oldestInflight are retained on the timeline
+ if (config.getFailedWritesCleanPolicy() == HoodieFailedWritesCleaningPolicy.LAZY) {
+ return oldestInflightCommitInstant.map(instant ->
+ HoodieTimeline.compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
+ .orElse(true);
+ }
+ return true;
+ }).filter(s ->
+ oldestInstantToRetainForCompaction.map(instantToRetain ->
+ HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
+ .orElse(true)
+ );
return instantToArchiveStream.limit(commitTimeline.countInstants() - minInstantsToKeep);
} else {
return Stream.empty();
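The guard above keys off CompactionUtils.getOldestInstantToRetainForCompaction, which this commit introduces; its hunk is among the five changed files but is not shown in this extract. The signature and arguments are confirmed by the call site above, while the body below is only a sketch of the retention rule inferred from the new tests further down, not the committed implementation:

// Sketch only: the earliest instant that archival must keep so that
// NUM_COMMITS / NUM_AND_TIME compaction scheduling can still observe
// maxDeltaCommits delta commits on the active timeline.
public static Option<HoodieInstant> getOldestInstantToRetainForCompaction(
    HoodieActiveTimeline activeTimeline, int maxDeltaCommits) {
  return getDeltaCommitsSinceLatestCompaction(activeTimeline).map(deltaCommitsInfo -> {
    HoodieTimeline deltaCommits = deltaCommitsInfo.getLeft();
    int numDeltaCommits = deltaCommits.countInstants();
    if (numDeltaCommits < maxDeltaCommits) {
      // Not enough delta commits yet: keep everything from the latest
      // completed compaction (or the first delta commit) onward.
      return deltaCommitsInfo.getRight();
    }
    // Enough delta commits have accumulated: only the most recent
    // maxDeltaCommits of them need to stay on the active timeline.
    return deltaCommits.getInstants()
        .skip(numDeltaCommits - maxDeltaCommits)
        .findFirst().get();
  });
}

Archival then keeps every instant at or after the returned one (the LESSER_THAN filter above), which is what guarantees that NUM_COMMITS scheduling never runs short of delta commits after aggressive archival.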

File: ScheduleCompactionActionExecutor.java

@@ -28,6 +28,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
+ import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
@@ -128,27 +129,25 @@ public class ScheduleCompactionActionExecutor<T extends HoodieRecordPayload, I,
return new HoodieCompactionPlan();
}
- private Pair<Integer, String> getLatestDeltaCommitInfo() {
- Option<HoodieInstant> lastCompaction = table.getActiveTimeline().getCommitTimeline()
- .filterCompletedInstants().lastInstant();
- HoodieTimeline deltaCommits = table.getActiveTimeline().getDeltaCommitTimeline();
- String latestInstantTs;
- final int deltaCommitsSinceLastCompaction;
- if (lastCompaction.isPresent()) {
- latestInstantTs = lastCompaction.get().getTimestamp();
- deltaCommitsSinceLastCompaction = deltaCommits.findInstantsAfter(latestInstantTs, Integer.MAX_VALUE).countInstants();
- } else {
- latestInstantTs = deltaCommits.firstInstant().get().getTimestamp();
- deltaCommitsSinceLastCompaction = deltaCommits.findInstantsAfterOrEquals(latestInstantTs, Integer.MAX_VALUE).countInstants();
+ private Option<Pair<Integer, String>> getLatestDeltaCommitInfo() {
+ Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfo =
+ CompactionUtils.getDeltaCommitsSinceLatestCompaction(table.getActiveTimeline());
+ if (deltaCommitsInfo.isPresent()) {
+ return Option.of(Pair.of(
+ deltaCommitsInfo.get().getLeft().countInstants(),
+ deltaCommitsInfo.get().getRight().getTimestamp()));
+ }
}
- return Pair.of(deltaCommitsSinceLastCompaction, latestInstantTs);
+ return Option.empty();
}
private boolean needCompact(CompactionTriggerStrategy compactionTriggerStrategy) {
boolean compactable;
// get deltaCommitsSinceLastCompaction and lastCompactionTs
- Pair<Integer, String> latestDeltaCommitInfo = getLatestDeltaCommitInfo();
+ Option<Pair<Integer, String>> latestDeltaCommitInfoOption = getLatestDeltaCommitInfo();
+ if (!latestDeltaCommitInfoOption.isPresent()) {
+ return false;
+ }
+ Pair<Integer, String> latestDeltaCommitInfo = latestDeltaCommitInfoOption.get();
int inlineCompactDeltaCommitMax = config.getInlineCompactDeltaCommitMax();
int inlineCompactDeltaSecondsMax = config.getInlineCompactDeltaSecondsMax();
switch (compactionTriggerStrategy) {
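The counting logic removed here moves into CompactionUtils.getDeltaCommitsSinceLatestCompaction (also added by this commit; its hunk is not shown in this extract), so archival and compaction scheduling share one definition. Based on the removed lines, a plausible sketch of the shared helper, returning a pair of (delta commits since the latest completed compaction, the instant counting starts from), rather than the committed code:

// Sketch: the counting logic removed from ScheduleCompactionActionExecutor,
// now returning Option.empty() when the timeline has no delta commits at all.
public static Option<Pair<HoodieTimeline, HoodieInstant>> getDeltaCommitsSinceLatestCompaction(
    HoodieActiveTimeline activeTimeline) {
  Option<HoodieInstant> lastCompaction =
      activeTimeline.getCommitTimeline().filterCompletedInstants().lastInstant();
  HoodieTimeline deltaCommits = activeTimeline.getDeltaCommitTimeline();
  if (lastCompaction.isPresent()) {
    // Count delta commits strictly after the latest completed compaction.
    return Option.of(Pair.of(
        deltaCommits.findInstantsAfter(lastCompaction.get().getTimestamp(), Integer.MAX_VALUE),
        lastCompaction.get()));
  }
  // No completed compaction yet: count from the first delta commit, inclusive.
  return deltaCommits.firstInstant().map(firstDeltaCommit -> Pair.of(
      deltaCommits.findInstantsAfterOrEquals(firstDeltaCommit.getTimestamp(), Integer.MAX_VALUE),
      firstDeltaCommit));
}

Note that the removed else branch called deltaCommits.firstInstant().get() without a presence check, which throws on a timeline with no delta commits; routing the result through Option lets needCompact simply return false in that case, as the new early return shows.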

File: TestHoodieTimelineArchiver.java

@@ -44,6 +44,7 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
+ import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.table.HoodieSparkTable;
@@ -71,6 +72,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+ import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.apache.hudi.common.testutils.HoodieTestUtils.createCompactionCommitInMetadataTable;
@@ -125,9 +127,20 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
int minArchivalCommits,
int maxArchivalCommits,
+ int maxDeltaCommits,
int maxDeltaCommitsMetadataTable,
HoodieTableType tableType) throws Exception {
- return initTestTableAndGetWriteConfig(enableMetadata, minArchivalCommits, maxArchivalCommits, maxDeltaCommitsMetadataTable, tableType, false, 10, 209715200);
+ return initTestTableAndGetWriteConfig(enableMetadata, minArchivalCommits, maxArchivalCommits,
+ maxDeltaCommits, maxDeltaCommitsMetadataTable, tableType, false, 10, 209715200);
}
+ private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
+ int minArchivalCommits,
+ int maxArchivalCommits,
+ int maxDeltaCommitsMetadataTable,
+ HoodieTableType tableType) throws Exception {
+ return initTestTableAndGetWriteConfig(enableMetadata, minArchivalCommits, maxArchivalCommits,
+ 5, maxDeltaCommitsMetadataTable, tableType, false, 10, 209715200);
+ }
private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
@@ -137,13 +150,14 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
boolean enableArchiveMerge,
int archiveFilesBatch,
long size) throws Exception {
- return initTestTableAndGetWriteConfig(enableMetadata, minArchivalCommits, maxArchivalCommits,
+ return initTestTableAndGetWriteConfig(enableMetadata, minArchivalCommits, maxArchivalCommits, 5,
maxDeltaCommitsMetadataTable, HoodieTableType.COPY_ON_WRITE, enableArchiveMerge, archiveFilesBatch, size);
}
private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
int minArchivalCommits,
int maxArchivalCommits,
+ int maxDeltaCommits,
int maxDeltaCommitsMetadataTable,
HoodieTableType tableType,
boolean enableArchiveMerge,
@@ -153,6 +167,7 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(basePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minArchivalCommits, maxArchivalCommits)
+ .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommits)
.withArchiveMergeEnable(enableArchiveMerge)
.withArchiveMergeFilesBatchSize(archiveFilesBatch)
.withArchiveMergeSmallFileLimit(size)
@@ -546,7 +561,7 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testNoArchivalWithInflightCompactionInMiddle(boolean enableMetadata) throws Exception {
- HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(enableMetadata, 2, 4, 2,
+ HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(enableMetadata, 2, 4, 2, 2,
HoodieTableType.MERGE_ON_READ);
// when max archival commits is set to 4, even after 7 commits, if there is an inflight compaction in the middle, archival should not kick in.
@@ -946,6 +961,152 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
}
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testArchivalWithMaxDeltaCommitsGuaranteeForCompaction(boolean enableMetadata) throws Exception {
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(
enableMetadata, 2, 4, 8, 1, HoodieTableType.MERGE_ON_READ);
// When max archival commits is set to 4, archival would normally kick in well
// before 8 commits accumulate. But since compaction scheduling requires 8 delta
// commits after the last compaction, all of the first 8 delta commits must be
// retained and archival should not kick in.
// The archival should only kick in after the 9th delta commit
// instant "00000001" to "00000009"
for (int i = 1; i < 10; i++) {
testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1
? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
// archival
Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = archiveAndGetCommitsList(writeConfig);
List<HoodieInstant> originalCommits = commitsList.getKey();
List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
if (i <= 8) {
assertEquals(originalCommits, commitsAfterArchival);
} else {
assertEquals(1, originalCommits.size() - commitsAfterArchival.size());
assertFalse(commitsAfterArchival.contains(
new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "00000001")));
IntStream.range(2, 10).forEach(j ->
assertTrue(commitsAfterArchival.contains(
new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j))));
}
}
testTable.doCompaction("00000010", Arrays.asList("p1", "p2"));
// instant "00000011" to "00000019"
for (int i = 1; i < 10; i++) {
testTable.doWriteOperation("0000001" + i, WriteOperationType.UPSERT, i == 1
? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
// archival
Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = archiveAndGetCommitsList(writeConfig);
List<HoodieInstant> originalCommits = commitsList.getKey();
List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
// first 9 delta commits before the completed compaction should be archived
IntStream.range(1, 10).forEach(j ->
assertFalse(commitsAfterArchival.contains(
new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j))));
if (i == 1) {
assertEquals(8, originalCommits.size() - commitsAfterArchival.size());
// instants from "00000010" onward should be in the active timeline
assertTrue(commitsAfterArchival.contains(
new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "00000010")));
assertTrue(commitsAfterArchival.contains(
new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "00000011")));
} else if (i < 8) {
assertEquals(originalCommits, commitsAfterArchival);
} else {
assertEquals(1, originalCommits.size() - commitsAfterArchival.size());
assertFalse(commitsAfterArchival.contains(
new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "00000010")));
// i == 8 -> ["00000011", "00000018"] should be in the active timeline
// i == 9 -> ["00000012", "00000019"] should be in the active timeline
if (i == 9) {
assertFalse(commitsAfterArchival.contains(
new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "00000011")));
}
IntStream.range(i - 7, i + 1).forEach(j ->
assertTrue(commitsAfterArchival.contains(
new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000001" + j))));
}
}
}
@Test
public void testArchivalAndCompactionInMetadataTable() throws Exception {
init(HoodieTableType.COPY_ON_WRITE);
// Test configs where the metadata table's archival configs are more aggressive than its compaction config
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(basePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.retainCommits(1).archiveCommitsWith(2, 4).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true)
.withMaxNumDeltaCommitsBeforeCompaction(8)
.retainCommits(1).archiveCommitsWith(2, 4).build())
.forTable("test-trip-table").build();
initWriteConfigAndMetatableWriter(writeConfig, true);
HoodieTableMetaClient metadataTableMetaClient = HoodieTableMetaClient.builder()
.setConf(metaClient.getHadoopConf())
.setBasePath(HoodieTableMetadata.getMetadataTableBasePath(basePath))
.setLoadActiveTimelineOnLoad(true).build();
for (int i = 1; i <= 16; i++) {
testTable.doWriteOperation("000000" + String.format("%02d", i), WriteOperationType.UPSERT,
i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
// archival
archiveAndGetCommitsList(writeConfig);
metadataTableMetaClient = HoodieTableMetaClient.reload(metadataTableMetaClient);
List<HoodieInstant> metadataTableInstants = metadataTableMetaClient.getActiveTimeline()
.getCommitsTimeline().filterCompletedInstants().getInstants()
.collect(Collectors.toList());
if (i <= 7) {
// In the metadata table timeline, the first delta commit is "00000000000000"
// from the metadata table initialization; delta commits "00000001" through
// "00000007" are then added without archival or compaction
assertEquals(i + 1, metadataTableInstants.size());
assertTrue(metadataTableInstants.contains(
new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "00000000000000")));
IntStream.range(1, i + 1).forEach(j ->
assertTrue(metadataTableInstants.contains(
new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j))));
} else if (i <= 14) {
// In the metadata table timeline, the first instant is now the compaction
// commit "00000007001"; after archival, delta commits "00000008" through
// "00000014" are then added without further archival or compaction
assertEquals(i - 6, metadataTableInstants.size());
assertTrue(metadataTableInstants.contains(
new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "00000007001")));
IntStream.range(8, i + 1).forEach(j ->
assertTrue(metadataTableInstants.contains(
new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION,
"000000" + String.format("%02d", j)))));
} else if (i == 15) {
// Only delta commits "00000008" till "00000015" are in the active timeline
assertEquals(8, metadataTableInstants.size());
assertFalse(metadataTableInstants.contains(
new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "00000007001")));
IntStream.range(8, 16).forEach(j ->
assertTrue(metadataTableInstants.contains(
new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION,
"000000" + String.format("%02d", j)))));
} else {
// i == 16
// Only commit "00000015001" and delta commit "00000016" are in the active timeline
assertEquals(2, metadataTableInstants.size());
assertTrue(metadataTableInstants.contains(
new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "00000015001")));
assertTrue(metadataTableInstants.contains(
new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "00000016")));
}
}
}
private Pair<List<HoodieInstant>, List<HoodieInstant>> archiveAndGetCommitsList(HoodieWriteConfig writeConfig) throws IOException {
metaClient.reloadActiveTimeline();
HoodieTimeline timeline = metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
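Putting the pieces together, testArchivalWithMaxDeltaCommitsGuaranteeForCompaction above exercises the guarantee end to end. A worked trace of its first loop under the test's configs (archiveCommitsWith(2, 4) and withMaxNumDeltaCommitsBeforeCompaction(8)), assuming the retention rule sketched earlier:

// maxInstantsToKeep = 4, minInstantsToKeep = 2, maxDeltaCommits = 8
//
// i = 1..4 : at most 4 completed instants, so archival is not considered.
// i = 5..8 : more than 4 instants, but no more than 8 delta commits since the
//            (nonexistent) last compaction, so the oldest instant to retain
//            stays pinned at "00000001" and nothing is archived.
// i = 9    : nine delta commits exist, retention shifts to "00000002", and
//            exactly "00000001" is archived, matching the assertions.
//
// Once compaction "00000010" completes, counting restarts from it: the delta
// commits "00000002".."00000009" before it become archivable in one sweep
// (8 instants), which is the i == 1 branch of the second loop.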