diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactEventHandler.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactEventHandler.java new file mode 100644 index 000000000..04771c242 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactEventHandler.java @@ -0,0 +1,58 @@ +package org.apache.hudi.sink.compact; + +import org.apache.flink.configuration.Configuration; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.configuration.FlinkOptions; + +import java.io.Serializable; +import java.util.List; + +/** + * Do something when compaction finish + * + * @author ZhangJiacheng + * @date 2022-05-15 + */ +public interface CompactEventHandler extends Serializable { + /** + * This method will be called when compaction commit failure + * + * @param instant commit instant + */ + void failure(String instant); + /** + * This method will be called when compaction commit success + * + * @param instant commit instant + * @param statuses write status + * @param metadata commit data + */ + void success(String instant, List statuses, HoodieCommitMetadata metadata); + + /** + * This method will be called when compaction sink closed. + * + * @param message Any message want to say + * @param e Any exception caused by close + */ + void closed(String message, Exception e); + + static CompactEventHandler defaultHandler() { + return new CompactEventHandler() { + @Override + public void failure(String instant) { + } + + @Override + public void success(String instant, List statuses, HoodieCommitMetadata metadata) { + } + + @Override + public void closed(String message, Exception e) { + } + }; + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java index 8dadd2e2d..81e115013 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java @@ -24,6 +24,8 @@ import org.apache.hudi.common.data.HoodieListData; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.sink.CleanFunction; @@ -62,6 +64,11 @@ public class CompactionCommitSink extends CleanFunction { */ private final Configuration conf; + /** + * Do something when sink closed + */ + private CompactEventHandler eventHandler; + /** * Buffer to collect the event from each compact task {@code CompactFunction}. * @@ -85,6 +92,14 @@ public class CompactionCommitSink extends CleanFunction { public CompactionCommitSink(Configuration conf) { super(conf); this.conf = conf; + this.eventHandler = CompactEventHandler.defaultHandler(); + } + + public CompactionCommitSink(Configuration conf, CompactEventHandler eventHandler) { + this(conf); + if (eventHandler != null) { + this.eventHandler = eventHandler; + } } @Override @@ -132,6 +147,7 @@ public class CompactionCommitSink extends CleanFunction { try { // handle failure case CompactionUtil.rollbackCompaction(table, instant); + eventHandler.failure(instant); } finally { // remove commitBuffer to avoid obsolete metadata commit reset(instant); @@ -167,10 +183,23 @@ public class CompactionCommitSink extends CleanFunction { if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) { this.writeClient.clean(); } + + eventHandler.success(instant, statuses, metadata); } private void reset(String instant) { this.commitBuffer.remove(instant); this.compactionPlanCache.remove(instant); } + + @Override + public void close() throws Exception { + try { + super.close(); + eventHandler.closed("", null); + } catch (Exception e) { + eventHandler.closed(e.getMessage(), e); + throw e; + } + } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java index e2d2972a0..4728f717d 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java @@ -158,6 +158,11 @@ public class HoodieFlinkCompactor { */ private final ExecutorService executor; + /** + * Closed handler + */ + private CompactEventHandler eventHandler; + public AsyncCompactionService(FlinkCompactionConfig cfg, Configuration conf, StreamExecutionEnvironment env) throws Exception { this.cfg = cfg; this.conf = conf; @@ -179,6 +184,14 @@ public class HoodieFlinkCompactor { this.writeClient = StreamerUtil.createWriteClient(conf); this.writeConfig = writeClient.getConfig(); this.table = writeClient.getHoodieTable(); + this.eventHandler = CompactEventHandler.defaultHandler(); + } + + public AsyncCompactionService(FlinkCompactionConfig cfg, Configuration conf, StreamExecutionEnvironment env, CompactEventHandler eventHandler) throws Exception { + this(cfg, conf, env); + if (eventHandler != null) { + this.eventHandler = eventHandler; + } } @Override @@ -204,6 +217,10 @@ public class HoodieFlinkCompactor { }, executor), executor); } + private void compactClosed(String message, Exception e) { + eventHandler.closed(message, e); + } + private void compact() throws Exception { table.getMetaClient().reloadActiveTimeline(); @@ -215,6 +232,7 @@ public class HoodieFlinkCompactor { if (!scheduled) { // do nothing. LOG.info("No compaction plan for this job "); + compactClosed("No compaction plan for this job ", null); return; } table.getMetaClient().reloadActiveTimeline(); @@ -227,6 +245,7 @@ public class HoodieFlinkCompactor { if (requested.isEmpty()) { // do nothing. LOG.info("No compaction plan scheduled, turns on the compaction plan schedule with --schedule option"); + compactClosed("No compaction plan scheduled", null); return; } @@ -257,6 +276,7 @@ public class HoodieFlinkCompactor { if (compactionPlans.isEmpty()) { // No compaction plan, do nothing and return. LOG.info("No compaction plan for instant " + String.join(",", compactionInstantTimes)); + compactClosed("No compaction plan for instant " + String.join(",", compactionInstantTimes), null); return; } @@ -270,6 +290,8 @@ public class HoodieFlinkCompactor { LOG.warn("The compaction plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n" + "Clean the compaction plan in auxiliary path and cancels the compaction"); CompactionUtil.cleanInstant(table.getMetaClient(), instant); + compactClosed("The compaction plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n" + + "Clean the compaction plan in auxiliary path and cancels the compaction", null); return; } } @@ -295,7 +317,7 @@ public class HoodieFlinkCompactor { TypeInformation.of(CompactionCommitEvent.class), new ProcessOperator<>(new CompactFunction(conf))) .setParallelism(compactionParallelism) - .addSink(new CompactionCommitSink(conf)) + .addSink(new CompactionCommitSink(conf, eventHandler)) .name("compaction_commit") .uid("uid_compaction_commit") .setParallelism(1);