[HUDI-1029] In inline compaction mode, previously failed compactions need to be retried before new compactions (#1857)
- Prevents failed compactions from causing issues with future commits
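For context, inline compaction is driven purely by write configuration. A rough sketch of a config that exercises this code path, using the same builder calls as the tests in this commit (the base path and the delta-commit threshold below are placeholder values, not part of the change):

// Sketch only: with inline compaction enabled, each commit will now first retry
// any pending (previously failed) compaction before scheduling/running a new one.
// The path and threshold are illustrative placeholders.
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hoodie/sample-table")
    .withSchema(TRIP_EXAMPLE_SCHEMA)
    .withCompactionConfig(HoodieCompactionConfig.newBuilder()
        .withInlineCompaction(true)
        .withMaxNumDeltaCommitsBeforeCompaction(2)
        .build())
    .build();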
@@ -341,6 +341,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo

    // Do an inline compaction if enabled
    if (config.isInlineCompaction()) {
+     runAnyPendingCompactions(table);
      metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true");
      inlineCompact(extraMetadata);
    } else {
@@ -355,6 +356,14 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    }
  }
+
+ private void runAnyPendingCompactions(HoodieTable<?> table) {
+   table.getActiveTimeline().getCommitsAndCompactionTimeline().filterPendingCompactionTimeline().getInstants()
+       .forEach(instant -> {
+         LOG.info("Running previously failed inflight compaction at instant " + instant);
+         compact(instant.getTimestamp(), true);
+       });
+ }

  /**
   * Handle auto clean during commit.
   * @param instantTime
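For orientation, a compaction is "pending" when it was scheduled (a requested instant on the timeline) and possibly moved to inflight, but never completed. The illustrative snippet below strings together the same timeline calls the tests in this commit use to simulate that state; it is not part of the change itself, "002" is a placeholder instant, and client/metaClient are assumed to be an open HoodieWriteClient and HoodieTableMetaClient:

// Illustrative only: leave a compaction pending, which is exactly the state
// that runAnyPendingCompactions() scans for and retries.
client.scheduleCompactionAtInstant("002", Option.empty());  // compaction requested
HoodieInstant requested = HoodieTimeline.getCompactionRequestedInstant("002");
metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(requested);
// If the writer dies at this point, "002" stays on the pending-compaction
// timeline and is retried on the next inline-compaction commit.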
@@ -65,7 +65,7 @@ public class ScheduleCompactionActionExecutor extends BaseActionExecutor<Option<
    int deltaCommitsSinceLastCompaction = table.getActiveTimeline().getDeltaCommitTimeline()
        .findInstantsAfter(deltaCommitsSinceTs, Integer.MAX_VALUE).countInstants();
    if (config.getInlineCompactDeltaCommitMax() > deltaCommitsSinceLastCompaction) {
-     LOG.info("Not running compaction as only " + deltaCommitsSinceLastCompaction
+     LOG.info("Not scheduling compaction as only " + deltaCommitsSinceLastCompaction
          + " delta commits was found since last compaction " + deltaCommitsSinceTs + ". Waiting for "
          + config.getInlineCompactDeltaCommitMax());
      return new HoodieCompactionPlan();
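This guard is what keeps inline compaction from being scheduled too eagerly: until getInlineCompactDeltaCommitMax() delta commits have accumulated since the last compaction, the executor returns an empty plan. A condensed, hypothetical restatement of the check (the method and the buildPlan() helper are invented for illustration; the real logic lives in ScheduleCompactionActionExecutor above):

// Hypothetical condensation of the scheduling guard, for illustration only.
static HoodieCompactionPlan maybeSchedulePlan(int maxDeltaCommits, int deltaCommitsSinceLastCompaction) {
  if (maxDeltaCommits > deltaCommitsSinceLastCompaction) {
    return new HoodieCompactionPlan();  // empty plan: nothing is scheduled yet
  }
  return buildPlan();  // invented placeholder for the real plan generation
}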
@@ -0,0 +1,243 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.table.action.compact;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.HoodieTestDataGenerator;
import org.apache.spark.api.java.JavaRDD;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.apache.hudi.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class CompactionTestBase extends HoodieClientTestBase {

  protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit) {
    return HoodieWriteConfig.newBuilder().withPath(basePath)
        .withSchema(TRIP_EXAMPLE_SCHEMA)
        .withParallelism(2, 2)
        .withAutoCommit(autoCommit).withAssumeDatePartitioning(true)
        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
        .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024 * 1024).build())
        .forTable("test-trip-table")
        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
        .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
            .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
  }

  /**
   * HELPER METHODS FOR TESTING.
   **/
  protected void validateDeltaCommit(String latestDeltaCommit, final Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> fgIdToCompactionOperation,
                                     HoodieWriteConfig cfg) {
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
    HoodieTable table = getHoodieTable(metaClient, cfg);
    List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table);
    fileSliceList.forEach(fileSlice -> {
      Pair<String, HoodieCompactionOperation> opPair = fgIdToCompactionOperation.get(fileSlice.getFileGroupId());
      if (opPair != null) {
        assertEquals(fileSlice.getBaseInstantTime(), opPair.getKey(), "Expect baseInstant to match compaction Instant");
        assertTrue(fileSlice.getLogFiles().count() > 0,
            "Expect atleast one log file to be present where the latest delta commit was written");
        assertFalse(fileSlice.getBaseFile().isPresent(), "Expect no data-file to be present");
      } else {
        assertTrue(fileSlice.getBaseInstantTime().compareTo(latestDeltaCommit) <= 0,
            "Expect baseInstant to be less than or equal to latestDeltaCommit");
      }
    });
  }

  protected List<HoodieRecord> runNextDeltaCommits(HoodieWriteClient client, final HoodieReadClient readClient, List<String> deltaInstants,
                                                   List<HoodieRecord> records, HoodieWriteConfig cfg, boolean insertFirst, List<String> expPendingCompactionInstants)
      throws Exception {

    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
    List<Pair<String, HoodieCompactionPlan>> pendingCompactions = readClient.getPendingCompactions();
    List<String> gotPendingCompactionInstants =
        pendingCompactions.stream().map(pc -> pc.getKey()).sorted().collect(Collectors.toList());
    assertEquals(expPendingCompactionInstants, gotPendingCompactionInstants);

    Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> fgIdToCompactionOperation =
        CompactionUtils.getAllPendingCompactionOperations(metaClient);

    if (insertFirst) {
      // Use first instant for inserting records
      String firstInstant = deltaInstants.get(0);
      deltaInstants = deltaInstants.subList(1, deltaInstants.size());
      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
      client.startCommitWithTime(firstInstant);
      JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, firstInstant);
      List<WriteStatus> statusList = statuses.collect();

      if (!cfg.shouldAutoCommit()) {
        client.commit(firstInstant, statuses);
      }
      assertNoWriteErrors(statusList);
      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
      HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
      List<HoodieBaseFile> dataFilesToRead = getCurrentLatestDataFiles(hoodieTable, cfg);
      assertTrue(dataFilesToRead.stream().findAny().isPresent(),
          "should list the parquet files we wrote in the delta commit");
      validateDeltaCommit(firstInstant, fgIdToCompactionOperation, cfg);
    }

    int numRecords = records.size();
    for (String instantTime : deltaInstants) {
      records = dataGen.generateUpdates(instantTime, numRecords);
      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
      createNextDeltaCommit(instantTime, records, client, metaClient, cfg, false);
      validateDeltaCommit(instantTime, fgIdToCompactionOperation, cfg);
    }
    return records;
  }

  protected void moveCompactionFromRequestedToInflight(String compactionInstantTime, HoodieWriteConfig cfg) {
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
    HoodieInstant compactionInstant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
    metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(compactionInstant);
    HoodieInstant instant = metaClient.getActiveTimeline().reload().filterPendingCompactionTimeline().getInstants()
        .filter(in -> in.getTimestamp().equals(compactionInstantTime)).findAny().get();
    assertTrue(instant.isInflight(), "Instant must be marked inflight");
  }

  protected void scheduleCompaction(String compactionInstantTime, HoodieWriteClient client, HoodieWriteConfig cfg) {
    client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
    HoodieInstant instant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().get();
    assertEquals(compactionInstantTime, instant.getTimestamp(), "Last compaction instant must be the one set");
  }

  protected void scheduleAndExecuteCompaction(String compactionInstantTime, HoodieWriteClient client, HoodieTable table,
                                              HoodieWriteConfig cfg, int expectedNumRecs, boolean hasDeltaCommitAfterPendingCompaction) throws IOException {
    scheduleCompaction(compactionInstantTime, client, cfg);
    executeCompaction(compactionInstantTime, client, table, cfg, expectedNumRecs, hasDeltaCommitAfterPendingCompaction);
  }

  protected void executeCompaction(String compactionInstantTime, HoodieWriteClient client, HoodieTable table,
                                   HoodieWriteConfig cfg, int expectedNumRecs, boolean hasDeltaCommitAfterPendingCompaction) throws IOException {

    client.compact(compactionInstantTime);
    List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table);
    assertTrue(fileSliceList.stream().findAny().isPresent(), "Ensure latest file-slices are not empty");
    assertFalse(fileSliceList.stream()
            .anyMatch(fs -> !fs.getBaseInstantTime().equals(compactionInstantTime)),
        "Verify all file-slices have base-instant same as compaction instant");
    assertFalse(fileSliceList.stream().anyMatch(fs -> !fs.getBaseFile().isPresent()),
        "Verify all file-slices have data-files");

    if (hasDeltaCommitAfterPendingCompaction) {
      assertFalse(fileSliceList.stream().anyMatch(fs -> fs.getLogFiles().count() == 0),
          "Verify all file-slices have atleast one log-file");
    } else {
      assertFalse(fileSliceList.stream().anyMatch(fs -> fs.getLogFiles().count() > 0),
          "Verify all file-slices have no log-files");
    }

    // verify that there is a commit
    table = getHoodieTable(new HoodieTableMetaClient(hadoopConf, cfg.getBasePath(), true), cfg);
    HoodieTimeline timeline = table.getMetaClient().getCommitTimeline().filterCompletedInstants();
    String latestCompactionCommitTime = timeline.lastInstant().get().getTimestamp();
    assertEquals(latestCompactionCommitTime, compactionInstantTime,
        "Expect compaction instant time to be the latest commit time");
    assertEquals(expectedNumRecs,
        HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, "000").count(),
        "Must contain expected records");
  }

  protected List<WriteStatus> createNextDeltaCommit(String instantTime, List<HoodieRecord> records, HoodieWriteClient client,
                                                    HoodieTableMetaClient metaClient, HoodieWriteConfig cfg, boolean skipCommit) {
    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);

    client.startCommitWithTime(instantTime);

    JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, instantTime);
    List<WriteStatus> statusList = statuses.collect();
    assertNoWriteErrors(statusList);
    if (!cfg.shouldAutoCommit() && !skipCommit) {
      client.commit(instantTime, statuses);
    }

    Option<HoodieInstant> deltaCommit =
        metaClient.getActiveTimeline().reload().getDeltaCommitTimeline().filterCompletedInstants().lastInstant();
    if (skipCommit && !cfg.shouldAutoCommit()) {
      assertTrue(deltaCommit.get().getTimestamp().compareTo(instantTime) < 0,
          "Delta commit should not be latest instant");
    } else {
      assertTrue(deltaCommit.isPresent());
      assertEquals(instantTime, deltaCommit.get().getTimestamp(), "Delta commit should be latest instant");
    }
    return statusList;
  }

  protected List<HoodieBaseFile> getCurrentLatestDataFiles(HoodieTable table, HoodieWriteConfig cfg) throws IOException {
    FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(table.getMetaClient().getFs(), cfg.getBasePath());
    HoodieTableFileSystemView view =
        getHoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allFiles);
    return view.getLatestBaseFiles().collect(Collectors.toList());
  }

  protected List<FileSlice> getCurrentLatestFileSlices(HoodieTable table) {
    HoodieTableFileSystemView view = new HoodieTableFileSystemView(table.getMetaClient(),
        table.getMetaClient().getActiveTimeline().reload().getCommitsAndCompactionTimeline());
    return Arrays.stream(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS)
        .flatMap(view::getLatestFileSlices).collect(Collectors.toList());
  }

  protected HoodieTableType getTableType() {
    return HoodieTableType.MERGE_ON_READ;
  }
}
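CompactionTestBase is meant to be subclassed; the two diffs that follow do exactly that. As a usage illustration, a hypothetical minimal subclass (instants, record counts and the class name are placeholders; it assumes the imports of the base-class file above plus java.util.ArrayList and org.junit.jupiter.api.Test, and getHoodieWriteClient/getHoodieReadClient from HoodieClientTestBase as in the tests below):

// Hypothetical example subclass; the flow simply strings together the helpers above.
public class ExampleCompactionFlowTest extends CompactionTestBase {

  @Test
  public void scheduleThenExecute() throws Exception {
    HoodieWriteConfig cfg = getConfigBuilder(true).build();  // auto-commit enabled
    try (HoodieWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
      List<HoodieRecord> records = dataGen.generateInserts("000", 10);
      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
      // Two delta commits: an initial insert, then one update pass.
      runNextDeltaCommits(writeClient, readClient, Arrays.asList("000", "001"),
          records, cfg, true, new ArrayList<>());
      HoodieTable table = getHoodieTable(new HoodieTableMetaClient(hadoopConf, cfg.getBasePath()), cfg);
      // Schedule compaction at "002" and execute it; still expect the 10 records.
      scheduleAndExecuteCompaction("002", writeClient, table, cfg, 10, false);
    }
  }
}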
@@ -18,77 +18,36 @@

package org.apache.hudi.table.action.compact;

import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.HoodieTestDataGenerator;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.apache.hudi.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
 * Test Cases for Async Compaction and Ingestion interaction.
 */
-public class TestAsyncCompaction extends HoodieClientTestBase {
+public class TestAsyncCompaction extends CompactionTestBase {

  private HoodieWriteConfig getConfig(Boolean autoCommit) {
    return getConfigBuilder(autoCommit).build();
  }

-  private HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit) {
-    return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
-        .withAutoCommit(autoCommit).withAssumeDatePartitioning(true)
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
-            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
-        .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024 * 1024).build())
-        .forTable("test-trip-table")
-        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
-        .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
-            .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
-  }
-
  @Test
  public void testRollbackForInflightCompaction() throws Exception {
    // Rollback inflight compaction
@@ -372,170 +331,4 @@ public class TestAsyncCompaction extends HoodieClientTestBase {
      executeCompaction(compactionInstantTime, client, hoodieTable, cfg, numRecs, true);
    }
  }
-
-  /**
-   * HELPER METHODS FOR TESTING.
-   **/
-
-  private void validateDeltaCommit(String latestDeltaCommit,
-      final Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> fgIdToCompactionOperation,
-      HoodieWriteConfig cfg) {
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
-    HoodieTable table = getHoodieTable(metaClient, cfg);
-    List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table);
-    fileSliceList.forEach(fileSlice -> {
-      Pair<String, HoodieCompactionOperation> opPair = fgIdToCompactionOperation.get(fileSlice.getFileGroupId());
-      if (opPair != null) {
-        assertEquals(fileSlice.getBaseInstantTime(), opPair.getKey(), "Expect baseInstant to match compaction Instant");
-        assertTrue(fileSlice.getLogFiles().count() > 0,
-            "Expect atleast one log file to be present where the latest delta commit was written");
-        assertFalse(fileSlice.getBaseFile().isPresent(), "Expect no data-file to be present");
-      } else {
-        assertTrue(fileSlice.getBaseInstantTime().compareTo(latestDeltaCommit) <= 0,
-            "Expect baseInstant to be less than or equal to latestDeltaCommit");
-      }
-    });
-  }
-
-  private List<HoodieRecord> runNextDeltaCommits(HoodieWriteClient client, final HoodieReadClient readClient, List<String> deltaInstants,
-      List<HoodieRecord> records, HoodieWriteConfig cfg, boolean insertFirst, List<String> expPendingCompactionInstants)
-      throws Exception {
-
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
-    List<Pair<String, HoodieCompactionPlan>> pendingCompactions = readClient.getPendingCompactions();
-    List<String> gotPendingCompactionInstants =
-        pendingCompactions.stream().map(pc -> pc.getKey()).sorted().collect(Collectors.toList());
-    assertEquals(expPendingCompactionInstants, gotPendingCompactionInstants);
-
-    Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> fgIdToCompactionOperation =
-        CompactionUtils.getAllPendingCompactionOperations(metaClient);
-
-    if (insertFirst) {
-      // Use first instant for inserting records
-      String firstInstant = deltaInstants.get(0);
-      deltaInstants = deltaInstants.subList(1, deltaInstants.size());
-      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
-      client.startCommitWithTime(firstInstant);
-      JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, firstInstant);
-      List<WriteStatus> statusList = statuses.collect();
-
-      if (!cfg.shouldAutoCommit()) {
-        client.commit(firstInstant, statuses);
-      }
-      assertNoWriteErrors(statusList);
-      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
-      HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
-      List<HoodieBaseFile> dataFilesToRead = getCurrentLatestDataFiles(hoodieTable, cfg);
-      assertTrue(dataFilesToRead.stream().findAny().isPresent(),
-          "should list the parquet files we wrote in the delta commit");
-      validateDeltaCommit(firstInstant, fgIdToCompactionOperation, cfg);
-    }
-
-    int numRecords = records.size();
-    for (String instantTime : deltaInstants) {
-      records = dataGen.generateUpdates(instantTime, numRecords);
-      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
-      createNextDeltaCommit(instantTime, records, client, metaClient, cfg, false);
-      validateDeltaCommit(instantTime, fgIdToCompactionOperation, cfg);
-    }
-    return records;
-  }
-
-  private void moveCompactionFromRequestedToInflight(String compactionInstantTime, HoodieWriteConfig cfg) {
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
-    HoodieInstant compactionInstant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
-    metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(compactionInstant);
-    HoodieInstant instant = metaClient.getActiveTimeline().reload().filterPendingCompactionTimeline().getInstants()
-        .filter(in -> in.getTimestamp().equals(compactionInstantTime)).findAny().get();
-    assertTrue(instant.isInflight(), "Instant must be marked inflight");
-  }
-
-  private void scheduleCompaction(String compactionInstantTime, HoodieWriteClient client, HoodieWriteConfig cfg)
-      throws IOException {
-    client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
-    HoodieInstant instant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().get();
-    assertEquals(compactionInstantTime, instant.getTimestamp(), "Last compaction instant must be the one set");
-  }
-
-  private void scheduleAndExecuteCompaction(String compactionInstantTime, HoodieWriteClient client, HoodieTable table,
-      HoodieWriteConfig cfg, int expectedNumRecs, boolean hasDeltaCommitAfterPendingCompaction) throws IOException {
-    scheduleCompaction(compactionInstantTime, client, cfg);
-    executeCompaction(compactionInstantTime, client, table, cfg, expectedNumRecs, hasDeltaCommitAfterPendingCompaction);
-  }
-
-  private void executeCompaction(String compactionInstantTime, HoodieWriteClient client, HoodieTable table,
-      HoodieWriteConfig cfg, int expectedNumRecs, boolean hasDeltaCommitAfterPendingCompaction) throws IOException {
-
-    client.compact(compactionInstantTime);
-    List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table);
-    assertTrue(fileSliceList.stream().findAny().isPresent(), "Ensure latest file-slices are not empty");
-    assertFalse(fileSliceList.stream()
-            .anyMatch(fs -> !fs.getBaseInstantTime().equals(compactionInstantTime)),
-        "Verify all file-slices have base-instant same as compaction instant");
-    assertFalse(fileSliceList.stream().anyMatch(fs -> !fs.getBaseFile().isPresent()),
-        "Verify all file-slices have data-files");
-
-    if (hasDeltaCommitAfterPendingCompaction) {
-      assertFalse(fileSliceList.stream().anyMatch(fs -> fs.getLogFiles().count() == 0),
-          "Verify all file-slices have atleast one log-file");
-    } else {
-      assertFalse(fileSliceList.stream().anyMatch(fs -> fs.getLogFiles().count() > 0),
-          "Verify all file-slices have no log-files");
-    }
-
-    // verify that there is a commit
-    table = getHoodieTable(new HoodieTableMetaClient(hadoopConf, cfg.getBasePath(), true), cfg);
-    HoodieTimeline timeline = table.getMetaClient().getCommitTimeline().filterCompletedInstants();
-    String latestCompactionCommitTime = timeline.lastInstant().get().getTimestamp();
-    assertEquals(latestCompactionCommitTime, compactionInstantTime,
-        "Expect compaction instant time to be the latest commit time");
-    assertEquals(expectedNumRecs,
-        HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, "000").count(),
-        "Must contain expected records");
-  }
-
-  private List<WriteStatus> createNextDeltaCommit(String instantTime, List<HoodieRecord> records,
-      HoodieWriteClient client, HoodieTableMetaClient metaClient, HoodieWriteConfig cfg, boolean skipCommit) {
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
-
-    client.startCommitWithTime(instantTime);
-
-    JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, instantTime);
-    List<WriteStatus> statusList = statuses.collect();
-    assertNoWriteErrors(statusList);
-    if (!cfg.shouldAutoCommit() && !skipCommit) {
-      client.commit(instantTime, statuses);
-    }
-
-    Option<HoodieInstant> deltaCommit =
-        metaClient.getActiveTimeline().reload().getDeltaCommitTimeline().filterCompletedInstants().lastInstant();
-    if (skipCommit && !cfg.shouldAutoCommit()) {
-      assertTrue(deltaCommit.get().getTimestamp().compareTo(instantTime) < 0,
-          "Delta commit should not be latest instant");
-    } else {
-      assertTrue(deltaCommit.isPresent());
-      assertEquals(instantTime, deltaCommit.get().getTimestamp(), "Delta commit should be latest instant");
-    }
-    return statusList;
-  }
-
-  private List<HoodieBaseFile> getCurrentLatestDataFiles(HoodieTable table, HoodieWriteConfig cfg) throws IOException {
-    FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(table.getMetaClient().getFs(), cfg.getBasePath());
-    HoodieTableFileSystemView view =
-        getHoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allFiles);
-    return view.getLatestBaseFiles().collect(Collectors.toList());
-  }
-
-  private List<FileSlice> getCurrentLatestFileSlices(HoodieTable table) {
-    HoodieTableFileSystemView view = new HoodieTableFileSystemView(table.getMetaClient(),
-        table.getMetaClient().getActiveTimeline().reload().getCommitsAndCompactionTimeline());
-    return Arrays.stream(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS)
-        .flatMap(view::getLatestFileSlices).collect(Collectors.toList());
-  }
-
-  protected HoodieTableType getTableType() {
-    return HoodieTableType.MERGE_ON_READ;
-  }
}
@@ -0,0 +1,116 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.table.action.compact;

import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class TestInlineCompaction extends CompactionTestBase {

  private HoodieWriteConfig getConfigForInlineCompaction(int maxDeltaCommits) {
    return getConfigBuilder(false)
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withInlineCompaction(true).withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommits).build())
        .build();
  }

  @Test
  public void testCompactionIsNotScheduledEarly() throws Exception {
    // Given: make two commits
    HoodieWriteConfig cfg = getConfigForInlineCompaction(3);
    try (HoodieWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
      List<HoodieRecord> records = dataGen.generateInserts("000", 100);
      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
      runNextDeltaCommits(writeClient, readClient, Arrays.asList("000", "001"), records, cfg, true, new ArrayList<>());
      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());

      // Then: ensure no compaction is executed, since there are only 2 delta commits
      assertEquals(2, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
    }
  }

  @Test
  public void testSuccessfulCompaction() throws Exception {
    // Given: make three commits
    HoodieWriteConfig cfg = getConfigForInlineCompaction(3);
    List<String> instants = IntStream.range(0, 2).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());

    try (HoodieWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
      List<HoodieRecord> records = dataGen.generateInserts(instants.get(0), 100);
      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
      runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());

      // Third commit, which will trigger compaction
      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
      String finalInstant = HoodieActiveTimeline.createNewInstantTime();
      createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 100), writeClient, metaClient, cfg, false);

      // Then: ensure the file slices are compacted as per policy
      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
      assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
      assertEquals(HoodieTimeline.COMMIT_ACTION, metaClient.getActiveTimeline().lastInstant().get().getAction());
    }
  }

  @Test
  public void testCompactionRetryOnFailure() throws Exception {
    // Given: two delta commits; compaction is scheduled and left in-flight (simulating a failure)
    HoodieWriteConfig cfg = getConfigBuilder(false)
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
        .build();
    List<String> instants = CollectionUtils.createImmutableList("000", "001");
    try (HoodieWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
      List<HoodieRecord> records = dataGen.generateInserts(instants.get(0), 100);
      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
      runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
      // Schedule compaction 002, make it in-flight (simulates inline compaction failing)
      scheduleCompaction("002", writeClient, cfg);
      moveCompactionFromRequestedToInflight("002", cfg);
    }

    // When: a third commit happens
    HoodieWriteConfig inlineCfg = getConfigForInlineCompaction(2);
    try (HoodieWriteClient<?> writeClient = getHoodieWriteClient(inlineCfg)) {
      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
      createNextDeltaCommit("003", dataGen.generateUpdates("003", 100), writeClient, metaClient, inlineCfg, false);
    }

    // Then: 1 delta commit is done, the failed compaction is retried
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
    assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
    assertEquals("002", metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
  }
}