1
0

[HUDI-2547] Schedule Flink compaction in service (#4254)

Co-authored-by: yuzhaojing <yuzhaojing@bytedance.com>
This commit is contained in:
yuzhaojing
2021-12-22 15:08:47 +08:00
committed by GitHub
parent f1286c2c76
commit 15eb7e81fc
3 changed files with 294 additions and 87 deletions

View File

@@ -102,6 +102,13 @@ public class FlinkCompactionConfig extends Configuration {
+ "2). LIFO: execute the latest plan first, by default LIFO", required = false) + "2). LIFO: execute the latest plan first, by default LIFO", required = false)
public String compactionSeq = SEQ_LIFO; public String compactionSeq = SEQ_LIFO;
@Parameter(names = {"--service"}, description = "Flink Compaction runs in service mode, disable by default")
public Boolean serviceMode = false;
@Parameter(names = {"--min-compaction-interval-seconds"},
description = "Min compaction interval of async compaction service, default 10 minutes")
public Integer minCompactionIntervalSeconds = 600;
/** /**
* Transforms a {@code HoodieFlinkCompaction.config} into {@code Configuration}. * Transforms a {@code HoodieFlinkCompaction.config} into {@code Configuration}.
* The latter is more suitable for the table APIs. It reads all the properties * The latter is more suitable for the table APIs. It reads all the properties

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.sink.compact; package org.apache.hudi.sink.compact;
import org.apache.hudi.async.HoodieAsyncService;
import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -25,12 +26,15 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.CompactionUtil; import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.util.StreamerUtil;
import com.beust.jcommander.JCommander; import com.beust.jcommander.JCommander;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -38,6 +42,10 @@ import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/** /**
* Flink hudi compaction program that can be executed manually. * Flink hudi compaction program that can be executed manually.
*/ */
@@ -45,20 +53,115 @@ public class HoodieFlinkCompactor {
protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkCompactor.class); protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkCompactor.class);
/**
* Flink Execution Environment.
*/
private final AsyncCompactionService compactionScheduleService;
public HoodieFlinkCompactor(AsyncCompactionService service) {
this.compactionScheduleService = service;
}
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkCompactionConfig cfg = getFlinkCompactionConfig(args);
Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
AsyncCompactionService service = new AsyncCompactionService(cfg, conf, env);
new HoodieFlinkCompactor(service).start(cfg.serviceMode);
}
/**
* Main method to start compaction service.
*/
public void start(boolean serviceMode) throws Exception {
if (serviceMode) {
compactionScheduleService.start(null);
try {
compactionScheduleService.waitForShutdown();
} catch (Exception e) {
throw new HoodieException(e.getMessage(), e);
} finally {
LOG.info("Shut down hoodie flink compactor");
}
} else {
LOG.info("Hoodie Flink Compactor running only single round");
try {
compactionScheduleService.compact();
} catch (Exception e) {
LOG.error("Got error running delta sync once. Shutting down", e);
throw e;
} finally {
LOG.info("Shut down hoodie flink compactor");
}
}
}
public static FlinkCompactionConfig getFlinkCompactionConfig(String[] args) {
FlinkCompactionConfig cfg = new FlinkCompactionConfig(); FlinkCompactionConfig cfg = new FlinkCompactionConfig();
JCommander cmd = new JCommander(cfg, null, args); JCommander cmd = new JCommander(cfg, null, args);
if (cfg.help || args.length == 0) { if (cfg.help || args.length == 0) {
cmd.usage(); cmd.usage();
System.exit(1); System.exit(1);
} }
return cfg;
}
Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg); // -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
/**
* Schedules compaction in service.
*/
public static class AsyncCompactionService extends HoodieAsyncService {
private static final long serialVersionUID = 1L;
/**
* Flink Compaction Config.
*/
private final FlinkCompactionConfig cfg;
/**
* Flink Config.
*/
private final Configuration conf;
/**
* Meta Client.
*/
private final HoodieTableMetaClient metaClient;
/**
* Write Client.
*/
private final HoodieFlinkWriteClient<?> writeClient;
/**
* The hoodie table.
*/
private final HoodieFlinkTable<?> table;
/**
* Flink Execution Environment.
*/
private final StreamExecutionEnvironment env;
/**
* Executor Service.
*/
private final ExecutorService executor;
public AsyncCompactionService(FlinkCompactionConfig cfg, Configuration conf, StreamExecutionEnvironment env) throws Exception {
this.cfg = cfg;
this.conf = conf;
this.env = env;
this.executor = Executors.newFixedThreadPool(1);
// create metaClient // create metaClient
HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf); this.metaClient = StreamerUtil.createMetaClient(conf);
// get the table name // get the table name
conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName()); conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
@@ -69,11 +172,37 @@ public class HoodieFlinkCompactor {
// infer changelog mode // infer changelog mode
CompactionUtil.inferChangelogMode(conf, metaClient); CompactionUtil.inferChangelogMode(conf, metaClient);
HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf); this.writeClient = StreamerUtil.createWriteClient(conf);
HoodieFlinkTable<?> table = writeClient.getHoodieTable(); this.table = writeClient.getHoodieTable();
}
// judge whether have operation @Override
// to compute the compaction instant time and do compaction. protected Pair<CompletableFuture, ExecutorService> startService() {
return Pair.of(CompletableFuture.supplyAsync(() -> {
boolean error = false;
try {
while (!isShutdownRequested()) {
try {
compact();
Thread.sleep(cfg.minCompactionIntervalSeconds * 1000);
} catch (Exception e) {
LOG.error("Shutting down compaction service due to exception", e);
error = true;
throw new HoodieException(e.getMessage(), e);
}
}
} finally {
shutdownAsyncService(error);
}
return true;
}, executor), executor);
}
private void compact() throws Exception {
table.getMetaClient().reloadActiveTimeline();
// checks the compaction plan and do compaction.
if (cfg.schedule) { if (cfg.schedule) {
Option<String> compactionInstantTimeOption = CompactionUtil.getCompactionInstantTime(metaClient); Option<String> compactionInstantTimeOption = CompactionUtil.getCompactionInstantTime(metaClient);
if (compactionInstantTimeOption.isPresent()) { if (compactionInstantTimeOption.isPresent()) {
@@ -136,8 +265,11 @@ public class HoodieFlinkCompactor {
int compactionParallelism = conf.getInteger(FlinkOptions.COMPACTION_TASKS) == -1 int compactionParallelism = conf.getInteger(FlinkOptions.COMPACTION_TASKS) == -1
? compactionPlan.getOperations().size() : conf.getInteger(FlinkOptions.COMPACTION_TASKS); ? compactionPlan.getOperations().size() : conf.getInteger(FlinkOptions.COMPACTION_TASKS);
LOG.info("Start to compaction for instant " + compactionInstantTime);
// Mark instant as compaction inflight // Mark instant as compaction inflight
table.getActiveTimeline().transitionCompactionRequestedToInflight(instant); table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
table.getMetaClient().reloadActiveTimeline();
env.addSource(new CompactionPlanSourceFunction(compactionPlan, compactionInstantTime)) env.addSource(new CompactionPlanSourceFunction(compactionPlan, compactionInstantTime))
.name("compaction_source") .name("compaction_source")
@@ -152,7 +284,21 @@ public class HoodieFlinkCompactor {
.uid("uid_clean_commits") .uid("uid_clean_commits")
.setParallelism(1); .setParallelism(1);
env.execute("flink_hudi_compaction"); env.execute("flink_hudi_compaction_" + compactionInstantTime);
}
/**
* Shutdown async services like compaction/clustering as DeltaSync is shutdown.
*/
public void shutdownAsyncService(boolean error) {
LOG.info("Gracefully shutting down compactor. Error ?" + error);
executor.shutdown();
writeClient.close(); writeClient.close();
} }
@VisibleForTesting
public void shutDown() {
shutdownAsyncService(false);
}
}
} }

View File

@@ -44,6 +44,8 @@ import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource; import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File; import java.io.File;
import java.util.Arrays; import java.util.Arrays;
@@ -58,13 +60,23 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
* IT cases for {@link org.apache.hudi.common.model.HoodieRecord}. * IT cases for {@link org.apache.hudi.common.model.HoodieRecord}.
*/ */
public class ITTestHoodieFlinkCompactor { public class ITTestHoodieFlinkCompactor {
private static final Map<String, List<String>> EXPECTED = new HashMap<>();
protected static final Logger LOG = LoggerFactory.getLogger(ITTestHoodieFlinkCompactor.class);
private static final Map<String, List<String>> EXPECTED1 = new HashMap<>();
private static final Map<String, List<String>> EXPECTED2 = new HashMap<>();
static { static {
EXPECTED.put("par1", Arrays.asList("id1,par1,id1,Danny,23,1000,par1", "id2,par1,id2,Stephen,33,2000,par1")); EXPECTED1.put("par1", Arrays.asList("id1,par1,id1,Danny,23,1000,par1", "id2,par1,id2,Stephen,33,2000,par1"));
EXPECTED.put("par2", Arrays.asList("id3,par2,id3,Julian,53,3000,par2", "id4,par2,id4,Fabian,31,4000,par2")); EXPECTED1.put("par2", Arrays.asList("id3,par2,id3,Julian,53,3000,par2", "id4,par2,id4,Fabian,31,4000,par2"));
EXPECTED.put("par3", Arrays.asList("id5,par3,id5,Sophia,18,5000,par3", "id6,par3,id6,Emma,20,6000,par3")); EXPECTED1.put("par3", Arrays.asList("id5,par3,id5,Sophia,18,5000,par3", "id6,par3,id6,Emma,20,6000,par3"));
EXPECTED.put("par4", Arrays.asList("id7,par4,id7,Bob,44,7000,par4", "id8,par4,id8,Han,56,8000,par4")); EXPECTED1.put("par4", Arrays.asList("id7,par4,id7,Bob,44,7000,par4", "id8,par4,id8,Han,56,8000,par4"));
EXPECTED2.put("par1", Arrays.asList("id1,par1,id1,Danny,24,1000,par1", "id2,par1,id2,Stephen,34,2000,par1"));
EXPECTED2.put("par2", Arrays.asList("id3,par2,id3,Julian,54,3000,par2", "id4,par2,id4,Fabian,32,4000,par2"));
EXPECTED2.put("par3", Arrays.asList("id5,par3,id5,Sophia,18,5000,par3", "id6,par3,id6,Emma,20,6000,par3", "id9,par3,id9,Jane,19,6000,par3"));
EXPECTED2.put("par4", Arrays.asList("id7,par4,id7,Bob,44,7000,par4", "id8,par4,id8,Han,56,8000,par4", "id10,par4,id10,Ella,38,7000,par4", "id11,par4,id11,Phoebe,52,8000,par4"));
} }
@TempDir @TempDir
@@ -148,6 +160,48 @@ public class ITTestHoodieFlinkCompactor {
env.execute("flink_hudi_compaction"); env.execute("flink_hudi_compaction");
writeClient.close(); writeClient.close();
TestData.checkWrittenFullData(tempFile, EXPECTED); TestData.checkWrittenFullData(tempFile, EXPECTED1);
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testHoodieFlinkCompactorService(boolean enableChangelog) throws Exception {
// Create hoodie table and insert into data.
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
tableEnv.getConfig().getConfiguration()
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
options.put(FlinkOptions.CHANGELOG_ENABLED.key(), enableChangelog + "");
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
tableEnv.executeSql(hoodieTableDDL);
tableEnv.executeSql(TestSQL.INSERT_T1).await();
// wait for the asynchronous commit to finish
TimeUnit.SECONDS.sleep(5);
// Make configuration and setAvroSchema.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkCompactionConfig cfg = new FlinkCompactionConfig();
cfg.path = tempFile.getAbsolutePath();
cfg.minCompactionIntervalSeconds = 3;
cfg.schedule = true;
Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
HoodieFlinkCompactor.AsyncCompactionService asyncCompactionService = new HoodieFlinkCompactor.AsyncCompactionService(cfg, conf, env);
asyncCompactionService.start(null);
tableEnv.executeSql(TestSQL.UPDATE_INSERT_T1).await();
// wait for the asynchronous commit to finish
TimeUnit.SECONDS.sleep(5);
asyncCompactionService.shutDown();
TestData.checkWrittenFullData(tempFile, EXPECTED2);
} }
} }