[HUDI-3233] Make metadata commit synchronous for flink batch
close apache/hudi#4561
This commit is contained in:
@@ -233,7 +233,7 @@ public class StreamWriteOperatorCoordinator
|
||||
// start new instant.
|
||||
startInstant();
|
||||
// sync Hive if is enabled
|
||||
syncHiveIfEnabled();
|
||||
syncHiveAsync();
|
||||
}
|
||||
}, "commits the instant %s", this.instant
|
||||
);
|
||||
@@ -246,21 +246,24 @@ public class StreamWriteOperatorCoordinator
|
||||
|
||||
@Override
|
||||
public void handleEventFromOperator(int i, OperatorEvent operatorEvent) {
|
||||
executor.execute(
|
||||
() -> {
|
||||
// no event to handle
|
||||
ValidationUtils.checkState(operatorEvent instanceof WriteMetadataEvent,
|
||||
"The coordinator can only handle WriteMetaEvent");
|
||||
WriteMetadataEvent event = (WriteMetadataEvent) operatorEvent;
|
||||
if (event.isBootstrap()) {
|
||||
handleBootstrapEvent(event);
|
||||
} else if (event.isEndInput()) {
|
||||
handleEndInputEvent(event);
|
||||
} else {
|
||||
handleWriteMetaEvent(event);
|
||||
}
|
||||
}, "handle write metadata event for instant %s", this.instant
|
||||
);
|
||||
ValidationUtils.checkState(operatorEvent instanceof WriteMetadataEvent,
|
||||
"The coordinator can only handle WriteMetaEvent");
|
||||
WriteMetadataEvent event = (WriteMetadataEvent) operatorEvent;
|
||||
|
||||
if (event.isEndInput()) {
|
||||
// handle end input event synchronously
|
||||
handleEndInputEvent(event);
|
||||
} else {
|
||||
executor.execute(
|
||||
() -> {
|
||||
if (event.isBootstrap()) {
|
||||
handleBootstrapEvent(event);
|
||||
} else {
|
||||
handleWriteMetaEvent(event);
|
||||
}
|
||||
}, "handle write metadata event for instant %s", this.instant
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -289,16 +292,23 @@ public class StreamWriteOperatorCoordinator
|
||||
this.hiveSyncContext = HiveSyncContext.create(conf);
|
||||
}
|
||||
|
||||
private void syncHiveIfEnabled() {
|
||||
private void syncHiveAsync() {
|
||||
if (tableState.syncHive) {
|
||||
this.hiveSyncExecutor.execute(this::syncHive, "sync hive metadata for instant %s", this.instant);
|
||||
this.hiveSyncExecutor.execute(this::doSyncHive, "sync hive metadata for instant %s", this.instant);
|
||||
}
|
||||
}
|
||||
|
||||
private void syncHive() {
|
||||
if (tableState.syncHive) {
|
||||
doSyncHive();
|
||||
LOG.info("Sync hive metadata for instant {} success!", this.instant);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sync hoodie table metadata to Hive metastore.
|
||||
*/
|
||||
public void syncHive() {
|
||||
public void doSyncHive() {
|
||||
hiveSyncContext.hiveSyncTool().syncHoodieTable();
|
||||
}
|
||||
|
||||
@@ -380,8 +390,8 @@ public class StreamWriteOperatorCoordinator
|
||||
// The executor thread inherits the classloader of the #handleEventFromOperator
|
||||
// caller, which is a AppClassLoader.
|
||||
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
|
||||
// sync Hive if is enabled in batch mode.
|
||||
syncHiveIfEnabled();
|
||||
// sync Hive synchronously if it is enabled in batch mode.
|
||||
syncHive();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user