1
0

[HUDI-2227] Only sync hive meta on successful commit for flink batch writer (#3351)

This commit is contained in:
Danny Chan
2021-07-27 20:10:08 +08:00
committed by GitHub
parent 59ff8423f9
commit 60758b36ea
3 changed files with 24 additions and 20 deletions

View File

@@ -172,9 +172,9 @@ public class StreamWriteOperatorCoordinator
if (executor != null) {
executor.close();
}
// sync Hive if is enabled in batch mode.
syncHiveIfEnabled();
if (hiveSyncExecutor != null) {
hiveSyncExecutor.close();
}
this.eventBuffer = null;
}
@@ -258,7 +258,7 @@ public class StreamWriteOperatorCoordinator
// -------------------------------------------------------------------------
private void initHiveSync() {
this.hiveSyncExecutor = new NonThrownExecutor(LOG);
this.hiveSyncExecutor = new NonThrownExecutor(LOG, true);
this.hiveSyncContext = HiveSyncContext.create(conf);
}
@@ -342,7 +342,8 @@ public class StreamWriteOperatorCoordinator
if (allEventsReceived()) {
// start to commit the instant.
commitInstant(this.instant);
// no compaction scheduling for batch mode
// sync Hive if is enabled in batch mode.
syncHiveIfEnabled();
}
}

View File

@@ -23,8 +23,6 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.slf4j.Logger;
import java.util.concurrent.TimeUnit;
/**
* Coordinator executor that executes the tasks asynchronously, it fails the job
* for any task exceptions.
@@ -37,7 +35,7 @@ public class CoordinatorExecutor extends NonThrownExecutor {
private final OperatorCoordinator.Context context;
public CoordinatorExecutor(OperatorCoordinator.Context context, Logger logger) {
super(logger);
super(logger, true);
this.context = context;
}
@@ -45,13 +43,4 @@ public class CoordinatorExecutor extends NonThrownExecutor {
protected void exceptionHook(String actionString, Throwable t) {
this.context.failJob(new HoodieException(actionString, t));
}
@Override
public void close() throws Exception {
// wait for the remaining tasks to finish.
executor.shutdown();
// We do not expect this to actually block for long. At this point, there should
// be very few task running in the executor, if any.
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
}
}

View File

@@ -35,11 +35,21 @@ public class NonThrownExecutor implements AutoCloseable {
/**
* A single-thread executor to handle all the asynchronous jobs.
*/
protected final ExecutorService executor;
private final ExecutorService executor;
public NonThrownExecutor(Logger logger) {
/**
* Flag saying whether to wait for the tasks finish on #close.
*/
private final boolean waitForTaskFinishOnClose;
public NonThrownExecutor(Logger logger, boolean waitForTaskFinishOnClose) {
this.executor = Executors.newSingleThreadExecutor();
this.logger = logger;
this.waitForTaskFinishOnClose = waitForTaskFinishOnClose;
}
public NonThrownExecutor(Logger logger) {
this(logger, false);
}
/**
@@ -75,7 +85,11 @@ public class NonThrownExecutor implements AutoCloseable {
@Override
public void close() throws Exception {
if (executor != null) {
executor.shutdownNow();
if (waitForTaskFinishOnClose) {
executor.shutdown();
} else {
executor.shutdownNow();
}
// We do not expect this to actually block for long. At this point, there should
// be very few task running in the executor, if any.
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);