[HUDI-2654] Schedules the compaction from earliest for flink (#3891)
This commit is contained in:
@@ -492,6 +492,12 @@ public class FlinkOptions extends HoodieConfig {
|
||||
.defaultValue(3600) // default 1 hour
|
||||
.withDescription("Max delta seconds time needed to trigger compaction, default 1 hour");
|
||||
|
||||
public static final ConfigOption<Integer> COMPACTION_TIMEOUT_SECONDS = ConfigOptions
|
||||
.key("compaction.timeout.seconds")
|
||||
.intType()
|
||||
.defaultValue(1200) // default 20 minutes
|
||||
.withDescription("Max timeout time in seconds for online compaction to rollback, default 20 minutes");
|
||||
|
||||
public static final ConfigOption<Integer> COMPACTION_MAX_MEMORY = ConfigOptions
|
||||
.key("compaction.max_memory")
|
||||
.intType()
|
||||
|
||||
@@ -18,9 +18,6 @@
|
||||
|
||||
package org.apache.hudi.sink.bootstrap;
|
||||
|
||||
import org.apache.hudi.client.FlinkTaskContextSupplier;
|
||||
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
|
||||
import org.apache.hudi.common.config.SerializableConfiguration;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.model.FileSlice;
|
||||
import org.apache.hudi.common.model.HoodieKey;
|
||||
@@ -37,9 +34,9 @@ import org.apache.hudi.common.util.StringUtils;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.configuration.FlinkOptions;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.table.HoodieFlinkTable;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
import org.apache.hudi.table.format.FormatUtils;
|
||||
import org.apache.hudi.util.FlinkTables;
|
||||
import org.apache.hudi.util.StreamerUtil;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
@@ -119,7 +116,7 @@ public class BootstrapOperator<I, O extends HoodieRecord>
|
||||
|
||||
this.hadoopConf = StreamerUtil.getHadoopConf();
|
||||
this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, true);
|
||||
this.hoodieTable = getTable();
|
||||
this.hoodieTable = FlinkTables.createTable(writeConfig, hadoopConf, getRuntimeContext());
|
||||
|
||||
preLoadIndexRecords();
|
||||
}
|
||||
@@ -146,13 +143,6 @@ public class BootstrapOperator<I, O extends HoodieRecord>
|
||||
output.collect((StreamRecord<O>) element);
|
||||
}
|
||||
|
||||
private HoodieFlinkTable getTable() {
|
||||
HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
|
||||
new SerializableConfiguration(this.hadoopConf),
|
||||
new FlinkTaskContextSupplier(getRuntimeContext()));
|
||||
return HoodieFlinkTable.create(this.writeConfig, context);
|
||||
}
|
||||
|
||||
/**
|
||||
* Loads all the indices of the given partition path into the backup state.
|
||||
*
|
||||
|
||||
@@ -19,7 +19,6 @@
|
||||
package org.apache.hudi.sink.compact;
|
||||
|
||||
import org.apache.hudi.avro.model.HoodieCompactionPlan;
|
||||
import org.apache.hudi.client.HoodieFlinkWriteClient;
|
||||
import org.apache.hudi.common.model.CompactionOperation;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
@@ -27,7 +26,7 @@ import org.apache.hudi.common.util.CompactionUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.table.HoodieFlinkTable;
|
||||
import org.apache.hudi.util.CompactionUtil;
|
||||
import org.apache.hudi.util.StreamerUtil;
|
||||
import org.apache.hudi.util.FlinkTables;
|
||||
|
||||
import org.apache.flink.annotation.VisibleForTesting;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
@@ -54,14 +53,10 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
|
||||
*/
|
||||
private final Configuration conf;
|
||||
|
||||
/**
|
||||
* Write Client.
|
||||
*/
|
||||
private transient HoodieFlinkWriteClient writeClient;
|
||||
|
||||
/**
|
||||
* Meta Client.
|
||||
*/
|
||||
@SuppressWarnings("rawtypes")
|
||||
private transient HoodieFlinkTable table;
|
||||
|
||||
public CompactionPlanOperator(Configuration conf) {
|
||||
@@ -71,8 +66,11 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
|
||||
@Override
|
||||
public void open() throws Exception {
|
||||
super.open();
|
||||
this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
|
||||
this.table = writeClient.getHoodieTable();
|
||||
this.table = FlinkTables.createTable(conf, getRuntimeContext());
|
||||
// when starting up, rolls back the first inflight compaction instant if one exists,
|
||||
// the instant is the next one to schedule for scheduling task because the compaction instants are
|
||||
// scheduled from earliest(FIFO sequence).
|
||||
CompactionUtil.rollbackEarliestCompaction(this.table);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -84,6 +82,11 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
|
||||
public void notifyCheckpointComplete(long checkpointId) {
|
||||
try {
|
||||
table.getMetaClient().reloadActiveTimeline();
|
||||
// There is no good way to infer whether the compaction task for an instant crashed
|
||||
// or is still undergoing. So we use a configured timeout threshold to control the rollback:
|
||||
// {@code FlinkOptions.COMPACTION_TIMEOUT_SECONDS},
|
||||
// when the threshold hits, but an instant is still in pending(inflight) state, assumes it has failed
|
||||
// already and just roll it back.
|
||||
CompactionUtil.rollbackCompaction(table, conf);
|
||||
scheduleCompaction(table, checkpointId);
|
||||
} catch (Throwable throwable) {
|
||||
@@ -94,15 +97,15 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
|
||||
|
||||
private void scheduleCompaction(HoodieFlinkTable<?> table, long checkpointId) throws IOException {
|
||||
// the last instant takes the highest priority.
|
||||
Option<HoodieInstant> lastRequested = table.getActiveTimeline().filterPendingCompactionTimeline()
|
||||
.filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).lastInstant();
|
||||
if (!lastRequested.isPresent()) {
|
||||
Option<HoodieInstant> firstRequested = table.getActiveTimeline().filterPendingCompactionTimeline()
|
||||
.filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).firstInstant();
|
||||
if (!firstRequested.isPresent()) {
|
||||
// do nothing.
|
||||
LOG.info("No compaction plan for checkpoint " + checkpointId);
|
||||
return;
|
||||
}
|
||||
|
||||
String compactionInstantTime = lastRequested.get().getTimestamp();
|
||||
String compactionInstantTime = firstRequested.get().getTimestamp();
|
||||
|
||||
// generate compaction plan
|
||||
// should support configurable commit metadata
|
||||
|
||||
@@ -112,19 +112,34 @@ public class CompactionUtil {
|
||||
|
||||
public static void rollbackCompaction(HoodieFlinkTable<?> table, Configuration conf) {
|
||||
String curInstantTime = HoodieActiveTimeline.createNewInstantTime();
|
||||
int deltaSeconds = conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS);
|
||||
int deltaSeconds = conf.getInteger(FlinkOptions.COMPACTION_TIMEOUT_SECONDS);
|
||||
HoodieTimeline inflightCompactionTimeline = table.getActiveTimeline()
|
||||
.filterPendingCompactionTimeline()
|
||||
.filter(instant ->
|
||||
instant.getState() == HoodieInstant.State.INFLIGHT
|
||||
&& StreamerUtil.instantTimeDiffSeconds(curInstantTime, instant.getTimestamp()) >= deltaSeconds);
|
||||
inflightCompactionTimeline.getInstants().forEach(inflightInstant -> {
|
||||
LOG.info("Rollback the pending compaction instant: " + inflightInstant);
|
||||
LOG.info("Rollback the inflight compaction instant: " + inflightInstant + " for timeout(" + deltaSeconds + "s)");
|
||||
table.rollbackInflightCompaction(inflightInstant);
|
||||
table.getMetaClient().reloadActiveTimeline();
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Rolls back the earliest compaction if there exists.
|
||||
*/
|
||||
public static void rollbackEarliestCompaction(HoodieFlinkTable<?> table) {
|
||||
Option<HoodieInstant> earliestInflight = table.getActiveTimeline()
|
||||
.filterPendingCompactionTimeline()
|
||||
.filter(instant ->
|
||||
instant.getState() == HoodieInstant.State.INFLIGHT).firstInstant();
|
||||
if (earliestInflight.isPresent()) {
|
||||
LOG.info("Rollback the inflight compaction instant: " + earliestInflight.get() + " for failover");
|
||||
table.rollbackInflightCompaction(earliestInflight.get());
|
||||
table.getMetaClient().reloadActiveTimeline();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether the execution sequence is LIFO.
|
||||
*/
|
||||
|
||||
@@ -0,0 +1,77 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.util;
|
||||
|
||||
import org.apache.hudi.client.FlinkTaskContextSupplier;
|
||||
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
|
||||
import org.apache.hudi.common.config.SerializableConfiguration;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.table.HoodieFlinkTable;
|
||||
|
||||
import org.apache.flink.api.common.functions.RuntimeContext;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
|
||||
import static org.apache.hudi.util.StreamerUtil.getHadoopConf;
|
||||
import static org.apache.hudi.util.StreamerUtil.getHoodieClientConfig;
|
||||
|
||||
/**
|
||||
* Utilities for {@link org.apache.hudi.table.HoodieFlinkTable}.
|
||||
*/
|
||||
public class FlinkTables {
|
||||
private FlinkTables() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates the hoodie flink table.
|
||||
*
|
||||
* <p>This expects to be used by client.
|
||||
*/
|
||||
public static HoodieFlinkTable<?> createTable(Configuration conf, RuntimeContext runtimeContext) {
|
||||
HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
|
||||
new SerializableConfiguration(getHadoopConf()),
|
||||
new FlinkTaskContextSupplier(runtimeContext));
|
||||
HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true);
|
||||
return HoodieFlinkTable.create(writeConfig, context);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates the hoodie flink table.
|
||||
*
|
||||
* <p>This expects to be used by client.
|
||||
*/
|
||||
public static HoodieFlinkTable<?> createTable(
|
||||
HoodieWriteConfig writeConfig,
|
||||
org.apache.hadoop.conf.Configuration hadoopConf,
|
||||
RuntimeContext runtimeContext) {
|
||||
HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
|
||||
new SerializableConfiguration(hadoopConf),
|
||||
new FlinkTaskContextSupplier(runtimeContext));
|
||||
return HoodieFlinkTable.create(writeConfig, context);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates the hoodie flink table.
|
||||
*
|
||||
* <p>This expects to be used by driver.
|
||||
*/
|
||||
public static HoodieFlinkTable<?> createTable(Configuration conf) {
|
||||
HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(conf, true, false);
|
||||
return HoodieFlinkTable.create(writeConfig, HoodieFlinkEngineContext.DEFAULT);
|
||||
}
|
||||
}
|
||||
@@ -20,7 +20,6 @@ package org.apache.hudi.utils;
|
||||
|
||||
import org.apache.hudi.avro.model.HoodieCompactionOperation;
|
||||
import org.apache.hudi.avro.model.HoodieCompactionPlan;
|
||||
import org.apache.hudi.client.HoodieFlinkWriteClient;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
@@ -30,38 +29,90 @@ import org.apache.hudi.configuration.FlinkOptions;
|
||||
import org.apache.hudi.exception.HoodieIOException;
|
||||
import org.apache.hudi.table.HoodieFlinkTable;
|
||||
import org.apache.hudi.util.CompactionUtil;
|
||||
import org.apache.hudi.util.FlinkTables;
|
||||
import org.apache.hudi.util.StreamerUtil;
|
||||
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.io.TempDir;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
/**
|
||||
* Test cases for {@link org.apache.hudi.util.CompactionUtil}.
|
||||
*/
|
||||
public class TestCompactionUtil {
|
||||
|
||||
private HoodieFlinkTable<?> table;
|
||||
private HoodieTableMetaClient metaClient;
|
||||
private Configuration conf;
|
||||
|
||||
@TempDir
|
||||
File tempFile;
|
||||
|
||||
@Test
|
||||
void rollbackCompaction() throws IOException {
|
||||
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
|
||||
conf.setInteger(FlinkOptions.COMPACTION_DELTA_SECONDS, 0);
|
||||
|
||||
@BeforeEach
|
||||
void beforeEach() throws IOException {
|
||||
this.conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
|
||||
StreamerUtil.initTableIfNotExists(conf);
|
||||
this.table = FlinkTables.createTable(conf);
|
||||
this.metaClient = table.getMetaClient();
|
||||
}
|
||||
|
||||
HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf);
|
||||
HoodieFlinkTable table = writeClient.getHoodieTable();
|
||||
HoodieTableMetaClient metaClient = table.getMetaClient();
|
||||
@Test
|
||||
void rollbackCompaction() {
|
||||
conf.setInteger(FlinkOptions.COMPACTION_TIMEOUT_SECONDS, 0);
|
||||
List<String> oriInstants = IntStream.range(0, 3)
|
||||
.mapToObj(i -> generateCompactionPlan()).collect(Collectors.toList());
|
||||
List<HoodieInstant> instants = metaClient.getActiveTimeline()
|
||||
.filterPendingCompactionTimeline()
|
||||
.filter(instant -> instant.getState() == HoodieInstant.State.INFLIGHT)
|
||||
.getInstants()
|
||||
.collect(Collectors.toList());
|
||||
assertThat("all the instants should be in pending state", instants.size(), is(3));
|
||||
CompactionUtil.rollbackCompaction(table, conf);
|
||||
boolean allRolledBack = metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants()
|
||||
.allMatch(instant -> instant.getState() == HoodieInstant.State.REQUESTED);
|
||||
assertTrue(allRolledBack, "all the instants should be rolled back");
|
||||
List<String> actualInstants = metaClient.getActiveTimeline()
|
||||
.filterPendingCompactionTimeline().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
|
||||
assertThat(actualInstants, is(oriInstants));
|
||||
}
|
||||
|
||||
@Test
|
||||
void rollbackEarliestCompaction() {
|
||||
List<String> oriInstants = IntStream.range(0, 3)
|
||||
.mapToObj(i -> generateCompactionPlan()).collect(Collectors.toList());
|
||||
List<HoodieInstant> instants = metaClient.getActiveTimeline()
|
||||
.filterPendingCompactionTimeline()
|
||||
.filter(instant -> instant.getState() == HoodieInstant.State.INFLIGHT)
|
||||
.getInstants()
|
||||
.collect(Collectors.toList());
|
||||
assertThat("all the instants should be in pending state", instants.size(), is(3));
|
||||
CompactionUtil.rollbackEarliestCompaction(table);
|
||||
long requestedCnt = metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants()
|
||||
.filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).count();
|
||||
assertThat("Only the first instant expects to be rolled back", requestedCnt, is(1L));
|
||||
|
||||
String instantTime = metaClient.getActiveTimeline()
|
||||
.filterPendingCompactionTimeline().filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED)
|
||||
.firstInstant().get().getTimestamp();
|
||||
assertThat(instantTime, is(oriInstants.get(0)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Generates a compaction plan on the timeline and returns its instant time.
|
||||
*/
|
||||
private String generateCompactionPlan() {
|
||||
HoodieCompactionOperation operation = new HoodieCompactionOperation();
|
||||
HoodieCompactionPlan plan = new HoodieCompactionPlan(Collections.singletonList(operation), Collections.emptyMap(), 1);
|
||||
String instantTime = HoodieActiveTimeline.createNewInstantTime();
|
||||
@@ -75,13 +126,7 @@ public class TestCompactionUtil {
|
||||
throw new HoodieIOException("Exception scheduling compaction", ioe);
|
||||
}
|
||||
metaClient.reloadActiveTimeline();
|
||||
HoodieInstant instant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().orElse(null);
|
||||
assertThat(instant.getTimestamp(), is(instantTime));
|
||||
|
||||
CompactionUtil.rollbackCompaction(table, conf);
|
||||
HoodieInstant rollbackInstant = table.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().get();
|
||||
assertThat(rollbackInstant.getState(), is(HoodieInstant.State.REQUESTED));
|
||||
assertThat(rollbackInstant.getTimestamp(), is(instantTime));
|
||||
return instantTime;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user