diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 2236415a7..e032c6f59 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -172,9 +172,9 @@ public class StreamWriteOperatorCoordinator if (executor != null) { executor.close(); } - // sync Hive if is enabled in batch mode. - syncHiveIfEnabled(); - + if (hiveSyncExecutor != null) { + hiveSyncExecutor.close(); + } this.eventBuffer = null; } @@ -258,7 +258,7 @@ public class StreamWriteOperatorCoordinator // ------------------------------------------------------------------------- private void initHiveSync() { - this.hiveSyncExecutor = new NonThrownExecutor(LOG); + this.hiveSyncExecutor = new NonThrownExecutor(LOG, true); this.hiveSyncContext = HiveSyncContext.create(conf); } @@ -342,7 +342,8 @@ public class StreamWriteOperatorCoordinator if (allEventsReceived()) { // start to commit the instant. commitInstant(this.instant); - // no compaction scheduling for batch mode + // sync Hive if is enabled in batch mode. + syncHiveIfEnabled(); } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/CoordinatorExecutor.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/CoordinatorExecutor.java index b5957679b..761d03d58 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/CoordinatorExecutor.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/CoordinatorExecutor.java @@ -23,8 +23,6 @@ import org.apache.hudi.exception.HoodieException; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.slf4j.Logger; -import java.util.concurrent.TimeUnit; - /** * Coordinator executor that executes the tasks asynchronously, it fails the job * for any task exceptions. @@ -37,7 +35,7 @@ public class CoordinatorExecutor extends NonThrownExecutor { private final OperatorCoordinator.Context context; public CoordinatorExecutor(OperatorCoordinator.Context context, Logger logger) { - super(logger); + super(logger, true); this.context = context; } @@ -45,13 +43,4 @@ public class CoordinatorExecutor extends NonThrownExecutor { protected void exceptionHook(String actionString, Throwable t) { this.context.failJob(new HoodieException(actionString, t)); } - - @Override - public void close() throws Exception { - // wait for the remaining tasks to finish. - executor.shutdown(); - // We do not expect this to actually block for long. At this point, there should - // be very few task running in the executor, if any. - executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); - } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java index 87c9c0179..446cb854c 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java @@ -35,11 +35,21 @@ public class NonThrownExecutor implements AutoCloseable { /** * A single-thread executor to handle all the asynchronous jobs. */ - protected final ExecutorService executor; + private final ExecutorService executor; - public NonThrownExecutor(Logger logger) { + /** + * Flag saying whether to wait for the tasks finish on #close. + */ + private final boolean waitForTaskFinishOnClose; + + public NonThrownExecutor(Logger logger, boolean waitForTaskFinishOnClose) { this.executor = Executors.newSingleThreadExecutor(); this.logger = logger; + this.waitForTaskFinishOnClose = waitForTaskFinishOnClose; + } + + public NonThrownExecutor(Logger logger) { + this(logger, false); } /** @@ -75,7 +85,11 @@ public class NonThrownExecutor implements AutoCloseable { @Override public void close() throws Exception { if (executor != null) { - executor.shutdownNow(); + if (waitForTaskFinishOnClose) { + executor.shutdown(); + } else { + executor.shutdownNow(); + } // We do not expect this to actually block for long. At this point, there should // be very few task running in the executor, if any. executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);