[HUDI-2685] Support scheduling online compaction plan when there are no commit data (#3928)
Co-authored-by: yuzhaojing <yuzhaojing@bytedance.com>
This commit is contained in:
@@ -109,11 +109,17 @@ public class ITTestHoodieFlinkCompactor {
|
||||
// infer changelog mode
|
||||
CompactionUtil.inferChangelogMode(conf, metaClient);
|
||||
|
||||
HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf);
|
||||
|
||||
boolean scheduled = false;
|
||||
// judge whether have operation
|
||||
// To compute the compaction instant time and do compaction.
|
||||
String compactionInstantTime = CompactionUtil.getCompactionInstantTime(metaClient);
|
||||
HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf);
|
||||
boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
|
||||
Option<String> compactionInstantTimeOption = CompactionUtil.getCompactionInstantTime(metaClient);
|
||||
if (compactionInstantTimeOption.isPresent()) {
|
||||
scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTimeOption.get(), Option.empty());
|
||||
}
|
||||
|
||||
String compactionInstantTime = compactionInstantTimeOption.get();
|
||||
|
||||
assertTrue(scheduled, "The compaction plan should be scheduled");
|
||||
|
||||
|
||||
@@ -223,6 +223,10 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
|
||||
stateInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId);
|
||||
}
|
||||
|
||||
public void endInput() {
|
||||
writeFunction.endInput();
|
||||
}
|
||||
|
||||
public void checkpointComplete(long checkpointId) {
|
||||
stateInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId);
|
||||
coordinator.notifyCheckpointComplete(checkpointId);
|
||||
@@ -248,6 +252,11 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
|
||||
public void close() throws Exception {
|
||||
coordinator.close();
|
||||
ioManager.close();
|
||||
bucketAssignerFunction.close();
|
||||
writeFunction.close();
|
||||
if (compactFunctionWrapper != null) {
|
||||
compactFunctionWrapper.close();
|
||||
}
|
||||
}
|
||||
|
||||
public StreamWriteOperatorCoordinator getCoordinator() {
|
||||
|
||||
@@ -20,11 +20,13 @@ package org.apache.hudi.utils;
|
||||
|
||||
import org.apache.hudi.avro.model.HoodieCompactionOperation;
|
||||
import org.apache.hudi.avro.model.HoodieCompactionPlan;
|
||||
import org.apache.hudi.client.HoodieFlinkWriteClient;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.configuration.FlinkOptions;
|
||||
import org.apache.hudi.exception.HoodieIOException;
|
||||
import org.apache.hudi.table.HoodieFlinkTable;
|
||||
@@ -33,14 +35,16 @@ import org.apache.hudi.util.FlinkTables;
|
||||
import org.apache.hudi.util.StreamerUtil;
|
||||
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.io.TempDir;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
@@ -60,16 +64,24 @@ public class TestCompactionUtil {
|
||||
@TempDir
|
||||
File tempFile;
|
||||
|
||||
@BeforeEach
|
||||
void beforeEach() throws IOException {
|
||||
beforeEach(Collections.emptyMap());
|
||||
}
|
||||
|
||||
void beforeEach(Map<String, String> options) throws IOException {
|
||||
this.conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
|
||||
conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
|
||||
options.forEach((k, v) -> conf.setString(k, v));
|
||||
|
||||
StreamerUtil.initTableIfNotExists(conf);
|
||||
|
||||
this.table = FlinkTables.createTable(conf);
|
||||
this.metaClient = table.getMetaClient();
|
||||
}
|
||||
|
||||
@Test
|
||||
void rollbackCompaction() {
|
||||
void rollbackCompaction() throws Exception {
|
||||
beforeEach();
|
||||
List<String> oriInstants = IntStream.range(0, 3)
|
||||
.mapToObj(i -> generateCompactionPlan()).collect(Collectors.toList());
|
||||
List<HoodieInstant> instants = metaClient.getActiveTimeline()
|
||||
@@ -88,7 +100,8 @@ public class TestCompactionUtil {
|
||||
}
|
||||
|
||||
@Test
|
||||
void rollbackEarliestCompaction() {
|
||||
void rollbackEarliestCompaction() throws Exception {
|
||||
beforeEach();
|
||||
conf.setInteger(FlinkOptions.COMPACTION_TIMEOUT_SECONDS, 0);
|
||||
List<String> oriInstants = IntStream.range(0, 3)
|
||||
.mapToObj(i -> generateCompactionPlan()).collect(Collectors.toList());
|
||||
@@ -109,6 +122,33 @@ public class TestCompactionUtil {
|
||||
assertThat(instantTime, is(oriInstants.get(0)));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testScheduleCompaction() throws Exception {
|
||||
Map<String, String> options = new HashMap<>();
|
||||
options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
|
||||
options.put(FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key(), FlinkOptions.TIME_ELAPSED);
|
||||
options.put(FlinkOptions.COMPACTION_DELTA_SECONDS.key(), "0");
|
||||
beforeEach(options);
|
||||
|
||||
// write a commit with data first
|
||||
TestData.writeDataAsBatch(TestData.DATA_SET_SINGLE_INSERT, conf);
|
||||
|
||||
HoodieFlinkWriteClient<?> writeClient = StreamerUtil.createWriteClient(conf);
|
||||
CompactionUtil.scheduleCompaction(metaClient, writeClient, true, true);
|
||||
|
||||
Option<HoodieInstant> pendingCompactionInstant = metaClient.reloadActiveTimeline().filterPendingCompactionTimeline().lastInstant();
|
||||
assertTrue(pendingCompactionInstant.isPresent(), "A compaction plan expects to be scheduled");
|
||||
|
||||
// write another commit with data and start a new instant
|
||||
TestData.writeDataAsBatch(TestData.DATA_SET_INSERT, conf);
|
||||
TimeUnit.SECONDS.sleep(3); // in case the instant time interval is too close
|
||||
writeClient.startCommit();
|
||||
|
||||
CompactionUtil.scheduleCompaction(metaClient, writeClient, true, false);
|
||||
int numCompactionCommits = metaClient.reloadActiveTimeline().filterPendingCompactionTimeline().countInstants();
|
||||
assertThat("Two compaction plan expects to be scheduled", numCompactionCommits, is(2));
|
||||
}
|
||||
|
||||
/**
|
||||
* Generates a compaction plan on the timeline and returns its instant time.
|
||||
*/
|
||||
|
||||
@@ -343,6 +343,37 @@ public class TestData {
|
||||
funcWrapper.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Write a list of row data with Hoodie format base on the given configuration.
|
||||
*
|
||||
* <p>The difference with {@link #writeData} is that it flush data using #endInput, and it
|
||||
* does not generate inflight instant.
|
||||
*
|
||||
* @param dataBuffer The data buffer to write
|
||||
* @param conf The flink configuration
|
||||
* @throws Exception if error occurs
|
||||
*/
|
||||
public static void writeDataAsBatch(
|
||||
List<RowData> dataBuffer,
|
||||
Configuration conf) throws Exception {
|
||||
StreamWriteFunctionWrapper<RowData> funcWrapper = new StreamWriteFunctionWrapper<>(
|
||||
conf.getString(FlinkOptions.PATH),
|
||||
conf);
|
||||
funcWrapper.openFunction();
|
||||
|
||||
for (RowData rowData : dataBuffer) {
|
||||
funcWrapper.invoke(rowData);
|
||||
}
|
||||
|
||||
// this triggers the data write and event send
|
||||
funcWrapper.endInput();
|
||||
|
||||
final OperatorEvent nextEvent = funcWrapper.getNextEvent();
|
||||
funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
|
||||
|
||||
funcWrapper.close();
|
||||
}
|
||||
|
||||
private static String toStringSafely(Object obj) {
|
||||
return obj == null ? "null" : obj.toString();
|
||||
}
|
||||
|
||||
@@ -80,7 +80,7 @@ public class TestStreamerUtil {
|
||||
void testMedianInstantTime() {
|
||||
String higher = "20210705125921";
|
||||
String lower = "20210705125806";
|
||||
String median1 = StreamerUtil.medianInstantTime(higher, lower);
|
||||
String median1 = StreamerUtil.medianInstantTime(higher, lower).get();
|
||||
assertThat(median1, is("20210705125843"));
|
||||
// test symmetry
|
||||
assertThrows(IllegalArgumentException.class,
|
||||
|
||||
Reference in New Issue
Block a user