diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java index 195e430d0..ec53be8ca 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java @@ -19,6 +19,8 @@ package org.apache.hudi.sink; import org.apache.hudi.client.HoodieFlinkWriteClient; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.sink.utils.NonThrownExecutor; import org.apache.hudi.util.StreamerUtil; @@ -63,7 +65,13 @@ public class CleanFunction extends AbstractRichFunction // do not use the remote filesystem view because the async cleaning service // local timeline is very probably to fall behind with the remote one. this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext(), false); - this.executor = NonThrownExecutor.builder(LOG).build(); + this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build(); + + if (conf.getString(FlinkOptions.OPERATION).equals(WriteOperationType.INSERT_OVERWRITE_TABLE.value())) { + String instantTime = HoodieActiveTimeline.createNewInstantTime(); + LOG.info(String.format("exec sync clean with instant time %s...", instantTime)); + executor.execute(() -> writeClient.clean(instantTime), "wait for sync cleaning finish"); + } } } @@ -101,6 +109,10 @@ public class CleanFunction extends AbstractRichFunction @Override public void close() throws Exception { + if (executor != null) { + executor.close(); + } + if (this.writeClient != null) { this.writeClient.close(); }