[HUDI-2383] Clean the marker files after compaction (#3576)
This commit is contained in:
@@ -301,6 +301,8 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
|
||||
finalizeWrite(table, compactionCommitTime, writeStats);
|
||||
LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata);
|
||||
SparkCompactHelpers.newInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
|
||||
WriteMarkersFactory.get(config.getMarkersType(), table, compactionCommitTime)
|
||||
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
|
||||
|
||||
if (compactionTimer != null) {
|
||||
long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
|
||||
|
||||
@@ -46,6 +46,7 @@ import org.apache.hudi.config.HoodieStorageConfig;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.index.HoodieIndex;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
import org.apache.hudi.table.marker.WriteMarkersFactory;
|
||||
import org.apache.hudi.testutils.HoodieClientTestBase;
|
||||
import org.apache.hudi.testutils.HoodieClientTestUtils;
|
||||
|
||||
@@ -175,6 +176,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
|
||||
HoodieWriteConfig cfg, int expectedNumRecs, boolean hasDeltaCommitAfterPendingCompaction) throws IOException {
|
||||
|
||||
client.compact(compactionInstantTime);
|
||||
assertFalse(WriteMarkersFactory.get(cfg.getMarkersType(), table, compactionInstantTime).doesMarkerDirExist());
|
||||
List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table);
|
||||
assertTrue(fileSliceList.stream().findAny().isPresent(), "Ensure latest file-slices are not empty");
|
||||
assertFalse(fileSliceList.stream()
|
||||
|
||||
@@ -26,6 +26,8 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.config.HoodieCompactionConfig;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.table.HoodieSparkTable;
|
||||
import org.apache.hudi.table.marker.WriteMarkersFactory;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
@@ -35,6 +37,7 @@ import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
|
||||
public class TestInlineCompaction extends CompactionTestBase {
|
||||
|
||||
@@ -84,6 +87,8 @@ public class TestInlineCompaction extends CompactionTestBase {
|
||||
metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
|
||||
assertEquals(4, metaClient.getActiveTimeline().getWriteTimeline().countInstants());
|
||||
assertEquals(HoodieTimeline.COMMIT_ACTION, metaClient.getActiveTimeline().lastInstant().get().getAction());
|
||||
String compactionTime = metaClient.getActiveTimeline().lastInstant().get().getTimestamp();
|
||||
assertFalse(WriteMarkersFactory.get(cfg.getMarkersType(), HoodieSparkTable.create(cfg, context), compactionTime).doesMarkerDirExist());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user