Add closed handler to HoodieFlinkCompactor
This commit is contained in:
@@ -0,0 +1,58 @@
|
||||
package org.apache.hudi.sink.compact;
|
||||
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.hudi.client.WriteStatus;
|
||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||
import org.apache.hudi.common.util.ReflectionUtils;
|
||||
import org.apache.hudi.common.util.StringUtils;
|
||||
import org.apache.hudi.configuration.FlinkOptions;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Do something when compaction finish
|
||||
*
|
||||
* @author ZhangJiacheng
|
||||
* @date 2022-05-15
|
||||
*/
|
||||
public interface CompactEventHandler extends Serializable {
|
||||
/**
|
||||
* This method will be called when compaction commit failure
|
||||
*
|
||||
* @param instant commit instant
|
||||
*/
|
||||
void failure(String instant);
|
||||
/**
|
||||
* This method will be called when compaction commit success
|
||||
*
|
||||
* @param instant commit instant
|
||||
* @param statuses write status
|
||||
* @param metadata commit data
|
||||
*/
|
||||
void success(String instant, List<WriteStatus> statuses, HoodieCommitMetadata metadata);
|
||||
|
||||
/**
|
||||
* This method will be called when compaction sink closed.
|
||||
*
|
||||
* @param message Any message want to say
|
||||
* @param e Any exception caused by close
|
||||
*/
|
||||
void closed(String message, Exception e);
|
||||
|
||||
static CompactEventHandler defaultHandler() {
|
||||
return new CompactEventHandler() {
|
||||
@Override
|
||||
public void failure(String instant) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void success(String instant, List<WriteStatus> statuses, HoodieCommitMetadata metadata) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void closed(String message, Exception e) {
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -24,6 +24,8 @@ import org.apache.hudi.common.data.HoodieListData;
|
||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||
import org.apache.hudi.common.util.CompactionUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.ReflectionUtils;
|
||||
import org.apache.hudi.common.util.StringUtils;
|
||||
import org.apache.hudi.configuration.FlinkOptions;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.sink.CleanFunction;
|
||||
@@ -62,6 +64,11 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
|
||||
*/
|
||||
private final Configuration conf;
|
||||
|
||||
/**
|
||||
* Do something when sink closed
|
||||
*/
|
||||
private CompactEventHandler eventHandler;
|
||||
|
||||
/**
|
||||
* Buffer to collect the event from each compact task {@code CompactFunction}.
|
||||
*
|
||||
@@ -85,6 +92,14 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
|
||||
public CompactionCommitSink(Configuration conf) {
|
||||
super(conf);
|
||||
this.conf = conf;
|
||||
this.eventHandler = CompactEventHandler.defaultHandler();
|
||||
}
|
||||
|
||||
public CompactionCommitSink(Configuration conf, CompactEventHandler eventHandler) {
|
||||
this(conf);
|
||||
if (eventHandler != null) {
|
||||
this.eventHandler = eventHandler;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -132,6 +147,7 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
|
||||
try {
|
||||
// handle failure case
|
||||
CompactionUtil.rollbackCompaction(table, instant);
|
||||
eventHandler.failure(instant);
|
||||
} finally {
|
||||
// remove commitBuffer to avoid obsolete metadata commit
|
||||
reset(instant);
|
||||
@@ -167,10 +183,23 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
|
||||
if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
|
||||
this.writeClient.clean();
|
||||
}
|
||||
|
||||
eventHandler.success(instant, statuses, metadata);
|
||||
}
|
||||
|
||||
private void reset(String instant) {
|
||||
this.commitBuffer.remove(instant);
|
||||
this.compactionPlanCache.remove(instant);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
try {
|
||||
super.close();
|
||||
eventHandler.closed("", null);
|
||||
} catch (Exception e) {
|
||||
eventHandler.closed(e.getMessage(), e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -158,6 +158,11 @@ public class HoodieFlinkCompactor {
|
||||
*/
|
||||
private final ExecutorService executor;
|
||||
|
||||
/**
|
||||
* Closed handler
|
||||
*/
|
||||
private CompactEventHandler eventHandler;
|
||||
|
||||
public AsyncCompactionService(FlinkCompactionConfig cfg, Configuration conf, StreamExecutionEnvironment env) throws Exception {
|
||||
this.cfg = cfg;
|
||||
this.conf = conf;
|
||||
@@ -179,6 +184,14 @@ public class HoodieFlinkCompactor {
|
||||
this.writeClient = StreamerUtil.createWriteClient(conf);
|
||||
this.writeConfig = writeClient.getConfig();
|
||||
this.table = writeClient.getHoodieTable();
|
||||
this.eventHandler = CompactEventHandler.defaultHandler();
|
||||
}
|
||||
|
||||
public AsyncCompactionService(FlinkCompactionConfig cfg, Configuration conf, StreamExecutionEnvironment env, CompactEventHandler eventHandler) throws Exception {
|
||||
this(cfg, conf, env);
|
||||
if (eventHandler != null) {
|
||||
this.eventHandler = eventHandler;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -204,6 +217,10 @@ public class HoodieFlinkCompactor {
|
||||
}, executor), executor);
|
||||
}
|
||||
|
||||
private void compactClosed(String message, Exception e) {
|
||||
eventHandler.closed(message, e);
|
||||
}
|
||||
|
||||
private void compact() throws Exception {
|
||||
table.getMetaClient().reloadActiveTimeline();
|
||||
|
||||
@@ -215,6 +232,7 @@ public class HoodieFlinkCompactor {
|
||||
if (!scheduled) {
|
||||
// do nothing.
|
||||
LOG.info("No compaction plan for this job ");
|
||||
compactClosed("No compaction plan for this job ", null);
|
||||
return;
|
||||
}
|
||||
table.getMetaClient().reloadActiveTimeline();
|
||||
@@ -227,6 +245,7 @@ public class HoodieFlinkCompactor {
|
||||
if (requested.isEmpty()) {
|
||||
// do nothing.
|
||||
LOG.info("No compaction plan scheduled, turns on the compaction plan schedule with --schedule option");
|
||||
compactClosed("No compaction plan scheduled", null);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -257,6 +276,7 @@ public class HoodieFlinkCompactor {
|
||||
if (compactionPlans.isEmpty()) {
|
||||
// No compaction plan, do nothing and return.
|
||||
LOG.info("No compaction plan for instant " + String.join(",", compactionInstantTimes));
|
||||
compactClosed("No compaction plan for instant " + String.join(",", compactionInstantTimes), null);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -270,6 +290,8 @@ public class HoodieFlinkCompactor {
|
||||
LOG.warn("The compaction plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n"
|
||||
+ "Clean the compaction plan in auxiliary path and cancels the compaction");
|
||||
CompactionUtil.cleanInstant(table.getMetaClient(), instant);
|
||||
compactClosed("The compaction plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n"
|
||||
+ "Clean the compaction plan in auxiliary path and cancels the compaction", null);
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -295,7 +317,7 @@ public class HoodieFlinkCompactor {
|
||||
TypeInformation.of(CompactionCommitEvent.class),
|
||||
new ProcessOperator<>(new CompactFunction(conf)))
|
||||
.setParallelism(compactionParallelism)
|
||||
.addSink(new CompactionCommitSink(conf))
|
||||
.addSink(new CompactionCommitSink(conf, eventHandler))
|
||||
.name("compaction_commit")
|
||||
.uid("uid_compaction_commit")
|
||||
.setParallelism(1);
|
||||
|
||||
Reference in New Issue
Block a user