From 2969fb3835b96dbd31fdbca536e8f370abb264c6 Mon Sep 17 00:00:00 2001
From: todd5167 <313328862@qq.com>
Date: Wed, 12 Jan 2022 13:34:09 +0800
Subject: [PATCH] [HUDI-3233] Make metadata commit synchronous for flink batch close apache/hudi#4561

---
 .../sink/StreamWriteOperatorCoordinator.java | 52 +++++++++++--------
 1 file changed, 31 insertions(+), 21 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index d72eb1206..447cfa420 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -233,7 +233,7 @@ public class StreamWriteOperatorCoordinator
             // start new instant.
             startInstant();
             // sync Hive if is enabled
-            syncHiveIfEnabled();
+            syncHiveAsync();
           }
         }, "commits the instant %s", this.instant
     );
@@ -246,21 +246,24 @@ public class StreamWriteOperatorCoordinator
 
   @Override
   public void handleEventFromOperator(int i, OperatorEvent operatorEvent) {
-    executor.execute(
-        () -> {
-          // no event to handle
-          ValidationUtils.checkState(operatorEvent instanceof WriteMetadataEvent,
-              "The coordinator can only handle WriteMetaEvent");
-          WriteMetadataEvent event = (WriteMetadataEvent) operatorEvent;
-          if (event.isBootstrap()) {
-            handleBootstrapEvent(event);
-          } else if (event.isEndInput()) {
-            handleEndInputEvent(event);
-          } else {
-            handleWriteMetaEvent(event);
-          }
-        }, "handle write metadata event for instant %s", this.instant
-    );
+    ValidationUtils.checkState(operatorEvent instanceof WriteMetadataEvent,
+        "The coordinator can only handle WriteMetaEvent");
+    WriteMetadataEvent event = (WriteMetadataEvent) operatorEvent;
+
+    if (event.isEndInput()) {
+      // handle end input event synchronously
+      handleEndInputEvent(event);
+    } else {
+      executor.execute(
+          () -> {
+            if (event.isBootstrap()) {
+              handleBootstrapEvent(event);
+            } else {
+              handleWriteMetaEvent(event);
+            }
+          }, "handle write metadata event for instant %s", this.instant
+      );
+    }
   }
 
   @Override
@@ -289,16 +292,23 @@ public class StreamWriteOperatorCoordinator
     this.hiveSyncContext = HiveSyncContext.create(conf);
   }
 
-  private void syncHiveIfEnabled() {
+  private void syncHiveAsync() {
     if (tableState.syncHive) {
-      this.hiveSyncExecutor.execute(this::syncHive, "sync hive metadata for instant %s", this.instant);
+      this.hiveSyncExecutor.execute(this::doSyncHive, "sync hive metadata for instant %s", this.instant);
+    }
+  }
+
+  private void syncHive() {
+    if (tableState.syncHive) {
+      doSyncHive();
+      LOG.info("Sync hive metadata for instant {} success!", this.instant);
     }
   }
 
   /**
    * Sync hoodie table metadata to Hive metastore.
    */
-  public void syncHive() {
+  public void doSyncHive() {
     hiveSyncContext.hiveSyncTool().syncHoodieTable();
   }
 
@@ -380,8 +390,8 @@ public class StreamWriteOperatorCoordinator
       // The executor thread inherits the classloader of the #handleEventFromOperator
       // caller, which is a AppClassLoader.
       Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
-      // sync Hive if is enabled in batch mode.
-      syncHiveIfEnabled();
+      // sync Hive synchronously if it is enabled in batch mode.
+      syncHive();
     }
   }
 