diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java index d8f96dbe3..4f3faadb9 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java @@ -102,6 +102,13 @@ public class FlinkCompactionConfig extends Configuration { + "2). LIFO: execute the latest plan first, by default LIFO", required = false) public String compactionSeq = SEQ_LIFO; + @Parameter(names = {"--service"}, description = "Flink Compaction runs in service mode, disable by default") + public Boolean serviceMode = false; + + @Parameter(names = {"--min-compaction-interval-seconds"}, + description = "Min compaction interval of async compaction service, default 10 minutes") + public Integer minCompactionIntervalSeconds = 600; + /** * Transforms a {@code HoodieFlinkCompaction.config} into {@code Configuration}. * The latter is more suitable for the table APIs. 
It reads all the properties diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java index a22bea9f3..a6161f2c8 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java @@ -18,6 +18,7 @@ package org.apache.hudi.sink.compact; +import org.apache.hudi.async.HoodieAsyncService; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -25,12 +26,15 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.util.CompactionUtil; import org.apache.hudi.util.StreamerUtil; import com.beust.jcommander.JCommander; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -38,6 +42,10 @@ import org.apache.flink.streaming.api.operators.ProcessOperator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + /** * Flink hudi compaction program that can be executed manually. 
*/ @@ -45,114 +53,252 @@ public class HoodieFlinkCompactor { protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkCompactor.class); + /** + * Service that schedules and executes the compaction. + */ + private final AsyncCompactionService compactionScheduleService; + + public HoodieFlinkCompactor(AsyncCompactionService service) { + this.compactionScheduleService = service; + } + public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + FlinkCompactionConfig cfg = getFlinkCompactionConfig(args); + Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg); + + AsyncCompactionService service = new AsyncCompactionService(cfg, conf, env); + + new HoodieFlinkCompactor(service).start(cfg.serviceMode); + } + + /** + * Main method to start compaction service. + */ + public void start(boolean serviceMode) throws Exception { + if (serviceMode) { + compactionScheduleService.start(null); + try { + compactionScheduleService.waitForShutdown(); + } catch (Exception e) { + throw new HoodieException(e.getMessage(), e); + } finally { + LOG.info("Shut down hoodie flink compactor"); + } + } else { + LOG.info("Hoodie Flink Compactor running only single round"); + try { + compactionScheduleService.compact(); + } catch (Exception e) { + LOG.error("Got error running delta sync once. 
Shutting down", e); + throw e; + } finally { + LOG.info("Shut down hoodie flink compactor"); + } + } + } + + public static FlinkCompactionConfig getFlinkCompactionConfig(String[] args) { FlinkCompactionConfig cfg = new FlinkCompactionConfig(); JCommander cmd = new JCommander(cfg, null, args); if (cfg.help || args.length == 0) { cmd.usage(); System.exit(1); } + return cfg; + } - Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg); + // ------------------------------------------------------------------------- + // Inner Class + // ------------------------------------------------------------------------- - // create metaClient - HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf); + /** + * Schedules compaction in service. + */ + public static class AsyncCompactionService extends HoodieAsyncService { + private static final long serialVersionUID = 1L; - // get the table name - conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName()); + /** + * Flink Compaction Config. + */ + private final FlinkCompactionConfig cfg; - // set table schema - CompactionUtil.setAvroSchema(conf, metaClient); + /** + * Flink Config. + */ + private final Configuration conf; - // infer changelog mode - CompactionUtil.inferChangelogMode(conf, metaClient); + /** + * Meta Client. + */ + private final HoodieTableMetaClient metaClient; - HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf); - HoodieFlinkTable table = writeClient.getHoodieTable(); + /** + * Write Client. + */ + private final HoodieFlinkWriteClient writeClient; - // judge whether have operation - // to compute the compaction instant time and do compaction. 
- if (cfg.schedule) { - Option compactionInstantTimeOption = CompactionUtil.getCompactionInstantTime(metaClient); - if (compactionInstantTimeOption.isPresent()) { - boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTimeOption.get(), Option.empty()); - if (!scheduled) { - // do nothing. - LOG.info("No compaction plan for this job "); - return; + /** + * The hoodie table. + */ + private final HoodieFlinkTable table; + + /** + * Flink Execution Environment. + */ + private final StreamExecutionEnvironment env; + + /** + * Executor Service. + */ + private final ExecutorService executor; + + public AsyncCompactionService(FlinkCompactionConfig cfg, Configuration conf, StreamExecutionEnvironment env) throws Exception { + this.cfg = cfg; + this.conf = conf; + this.env = env; + this.executor = Executors.newFixedThreadPool(1); + + // create metaClient + this.metaClient = StreamerUtil.createMetaClient(conf); + + // get the table name + conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName()); + + // set table schema + CompactionUtil.setAvroSchema(conf, metaClient); + + // infer changelog mode + CompactionUtil.inferChangelogMode(conf, metaClient); + + this.writeClient = StreamerUtil.createWriteClient(conf); + this.table = writeClient.getHoodieTable(); + } + + @Override + protected Pair startService() { + return Pair.of(CompletableFuture.supplyAsync(() -> { + boolean error = false; + + try { + while (!isShutdownRequested()) { + try { + compact(); + Thread.sleep(cfg.minCompactionIntervalSeconds * 1000); + } catch (Exception e) { + LOG.error("Shutting down compaction service due to exception", e); + error = true; + throw new HoodieException(e.getMessage(), e); + } + } + } finally { + shutdownAsyncService(error); } + return true; + }, executor), executor); + } + + private void compact() throws Exception { + table.getMetaClient().reloadActiveTimeline(); + + // checks the compaction plan and do compaction. 
+ if (cfg.schedule) { + Option compactionInstantTimeOption = CompactionUtil.getCompactionInstantTime(metaClient); + if (compactionInstantTimeOption.isPresent()) { + boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTimeOption.get(), Option.empty()); + if (!scheduled) { + // do nothing. + LOG.info("No compaction plan for this job "); + return; + } + table.getMetaClient().reloadActiveTimeline(); + } + } + + // fetch the instant based on the configured execution sequence + HoodieTimeline timeline = table.getActiveTimeline().filterPendingCompactionTimeline(); + Option requested = CompactionUtil.isLIFO(cfg.compactionSeq) ? timeline.lastInstant() : timeline.firstInstant(); + if (!requested.isPresent()) { + // do nothing. + LOG.info("No compaction plan scheduled, turns on the compaction plan schedule with --schedule option"); + return; + } + + String compactionInstantTime = requested.get().getTimestamp(); + + HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime); + if (timeline.containsInstant(inflightInstant)) { + LOG.info("Rollback inflight compaction instant: [" + compactionInstantTime + "]"); + table.rollbackInflightCompaction(inflightInstant); table.getMetaClient().reloadActiveTimeline(); } - } - // fetch the instant based on the configured execution sequence - HoodieTimeline timeline = table.getActiveTimeline().filterPendingCompactionTimeline(); - Option requested = CompactionUtil.isLIFO(cfg.compactionSeq) ? timeline.lastInstant() : timeline.firstInstant(); - if (!requested.isPresent()) { - // do nothing. 
- LOG.info("No compaction plan scheduled, turns on the compaction plan schedule with --schedule option"); - return; - } + // generate compaction plan + // should support configurable commit metadata + HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan( + table.getMetaClient(), compactionInstantTime); - String compactionInstantTime = requested.get().getTimestamp(); + if (compactionPlan == null || (compactionPlan.getOperations() == null) + || (compactionPlan.getOperations().isEmpty())) { + // No compaction plan, do nothing and return. + LOG.info("No compaction plan for instant " + compactionInstantTime); + return; + } - HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime); - if (timeline.containsInstant(inflightInstant)) { - LOG.info("Rollback inflight compaction instant: [" + compactionInstantTime + "]"); - table.rollbackInflightCompaction(inflightInstant); + HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime); + HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline(); + if (!pendingCompactionTimeline.containsInstant(instant)) { + // this means that the compaction plan was written to the auxiliary path(.tmp) + // but not the meta path(.hoodie); this usually happens when the job crashes + // unexpectedly. + + // clean the compaction plan in the auxiliary path and cancel the compaction. + + LOG.warn("The compaction plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n" + + "Clean the compaction plan in auxiliary path and cancels the compaction"); + CompactionUtil.cleanInstant(table.getMetaClient(), instant); + return; + } + + // get compactionParallelism. + int compactionParallelism = conf.getInteger(FlinkOptions.COMPACTION_TASKS) == -1 + ? 
compactionPlan.getOperations().size() : conf.getInteger(FlinkOptions.COMPACTION_TASKS); + + LOG.info("Start to compaction for instant " + compactionInstantTime); + + // Mark instant as compaction inflight + table.getActiveTimeline().transitionCompactionRequestedToInflight(instant); table.getMetaClient().reloadActiveTimeline(); + + env.addSource(new CompactionPlanSourceFunction(compactionPlan, compactionInstantTime)) + .name("compaction_source") + .uid("uid_compaction_source") + .rebalance() + .transform("compact_task", + TypeInformation.of(CompactionCommitEvent.class), + new ProcessOperator<>(new CompactFunction(conf))) + .setParallelism(compactionParallelism) + .addSink(new CompactionCommitSink(conf)) + .name("clean_commits") + .uid("uid_clean_commits") + .setParallelism(1); + + env.execute("flink_hudi_compaction_" + compactionInstantTime); } - // generate compaction plan - // should support configurable commit metadata - HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan( - table.getMetaClient(), compactionInstantTime); - - if (compactionPlan == null || (compactionPlan.getOperations() == null) - || (compactionPlan.getOperations().isEmpty())) { - // No compaction plan, do nothing and return. - LOG.info("No compaction plan for instant " + compactionInstantTime); - return; + /** + * Shuts down the compaction executor and closes the write client. + */ + public void shutdownAsyncService(boolean error) { + LOG.info("Gracefully shutting down compactor. Error ?" 
+ error); + executor.shutdown(); + writeClient.close(); } - HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime); - HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline(); - if (!pendingCompactionTimeline.containsInstant(instant)) { - // this means that the compaction plan was written to auxiliary path(.tmp) - // but not the meta path(.hoodie), this usually happens when the job crush - // exceptionally. - - // clean the compaction plan in auxiliary path and cancels the compaction. - - LOG.warn("The compaction plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n" - + "Clean the compaction plan in auxiliary path and cancels the compaction"); - CompactionUtil.cleanInstant(table.getMetaClient(), instant); - return; + @VisibleForTesting + public void shutDown() { + shutdownAsyncService(false); } - - // get compactionParallelism. - int compactionParallelism = conf.getInteger(FlinkOptions.COMPACTION_TASKS) == -1 - ? 
compactionPlan.getOperations().size() : conf.getInteger(FlinkOptions.COMPACTION_TASKS); - - // Mark instant as compaction inflight - table.getActiveTimeline().transitionCompactionRequestedToInflight(instant); - - env.addSource(new CompactionPlanSourceFunction(compactionPlan, compactionInstantTime)) - .name("compaction_source") - .uid("uid_compaction_source") - .rebalance() - .transform("compact_task", - TypeInformation.of(CompactionCommitEvent.class), - new ProcessOperator<>(new CompactFunction(conf))) - .setParallelism(compactionParallelism) - .addSink(new CompactionCommitSink(conf)) - .name("clean_commits") - .uid("uid_clean_commits") - .setParallelism(1); - - env.execute("flink_hudi_compaction"); - writeClient.close(); } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java b/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java index e0c574bd6..ebe9140ad 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java @@ -44,6 +44,8 @@ import org.apache.flink.table.api.internal.TableEnvironmentImpl; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.util.Arrays; @@ -58,13 +60,23 @@ import static org.junit.jupiter.api.Assertions.assertTrue; * IT cases for {@link org.apache.hudi.common.model.HoodieRecord}. 
*/ public class ITTestHoodieFlinkCompactor { - private static final Map> EXPECTED = new HashMap<>(); + + protected static final Logger LOG = LoggerFactory.getLogger(ITTestHoodieFlinkCompactor.class); + + private static final Map> EXPECTED1 = new HashMap<>(); + + private static final Map> EXPECTED2 = new HashMap<>(); static { - EXPECTED.put("par1", Arrays.asList("id1,par1,id1,Danny,23,1000,par1", "id2,par1,id2,Stephen,33,2000,par1")); - EXPECTED.put("par2", Arrays.asList("id3,par2,id3,Julian,53,3000,par2", "id4,par2,id4,Fabian,31,4000,par2")); - EXPECTED.put("par3", Arrays.asList("id5,par3,id5,Sophia,18,5000,par3", "id6,par3,id6,Emma,20,6000,par3")); - EXPECTED.put("par4", Arrays.asList("id7,par4,id7,Bob,44,7000,par4", "id8,par4,id8,Han,56,8000,par4")); + EXPECTED1.put("par1", Arrays.asList("id1,par1,id1,Danny,23,1000,par1", "id2,par1,id2,Stephen,33,2000,par1")); + EXPECTED1.put("par2", Arrays.asList("id3,par2,id3,Julian,53,3000,par2", "id4,par2,id4,Fabian,31,4000,par2")); + EXPECTED1.put("par3", Arrays.asList("id5,par3,id5,Sophia,18,5000,par3", "id6,par3,id6,Emma,20,6000,par3")); + EXPECTED1.put("par4", Arrays.asList("id7,par4,id7,Bob,44,7000,par4", "id8,par4,id8,Han,56,8000,par4")); + + EXPECTED2.put("par1", Arrays.asList("id1,par1,id1,Danny,24,1000,par1", "id2,par1,id2,Stephen,34,2000,par1")); + EXPECTED2.put("par2", Arrays.asList("id3,par2,id3,Julian,54,3000,par2", "id4,par2,id4,Fabian,32,4000,par2")); + EXPECTED2.put("par3", Arrays.asList("id5,par3,id5,Sophia,18,5000,par3", "id6,par3,id6,Emma,20,6000,par3", "id9,par3,id9,Jane,19,6000,par3")); + EXPECTED2.put("par4", Arrays.asList("id7,par4,id7,Bob,44,7000,par4", "id8,par4,id8,Han,56,8000,par4", "id10,par4,id10,Ella,38,7000,par4", "id11,par4,id11,Phoebe,52,8000,par4")); } @TempDir @@ -148,6 +160,48 @@ public class ITTestHoodieFlinkCompactor { env.execute("flink_hudi_compaction"); writeClient.close(); - TestData.checkWrittenFullData(tempFile, EXPECTED); + TestData.checkWrittenFullData(tempFile, EXPECTED1); + } + 
+ @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testHoodieFlinkCompactorService(boolean enableChangelog) throws Exception { + // Create the hoodie table and insert data into it. + EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); + TableEnvironment tableEnv = TableEnvironmentImpl.create(settings); + tableEnv.getConfig().getConfiguration() + .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); + Map options = new HashMap<>(); + options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false"); + options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); + options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ"); + options.put(FlinkOptions.CHANGELOG_ENABLED.key(), enableChangelog + ""); + String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options); + tableEnv.executeSql(hoodieTableDDL); + tableEnv.executeSql(TestSQL.INSERT_T1).await(); + + // wait for the asynchronous commit to finish + TimeUnit.SECONDS.sleep(5); + + // Build the compaction configuration for the async compaction service. + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + FlinkCompactionConfig cfg = new FlinkCompactionConfig(); + cfg.path = tempFile.getAbsolutePath(); + cfg.minCompactionIntervalSeconds = 3; + cfg.schedule = true; + Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg); + conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ"); + + HoodieFlinkCompactor.AsyncCompactionService asyncCompactionService = new HoodieFlinkCompactor.AsyncCompactionService(cfg, conf, env); + asyncCompactionService.start(null); + + tableEnv.executeSql(TestSQL.UPDATE_INSERT_T1).await(); + + // wait for the asynchronous commit and the next compaction round to finish + TimeUnit.SECONDS.sleep(5); + + asyncCompactionService.shutDown(); + + TestData.checkWrittenFullData(tempFile, EXPECTED2); } }