diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/TimeWait.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/TimeWait.java index 005b084bf..453c2314d 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/TimeWait.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/TimeWait.java @@ -35,14 +35,14 @@ public class TimeWait { private final long timeout; // timeout in SECONDS private final long interval; // interval in MILLISECONDS private final String action; // action to report error message - private final boolean throwsE; // whether to throw when timeout + private final boolean throwsT; // whether to throw when timeout private long waitingTime = 0L; - private TimeWait(long timeout, long interval, String action, boolean throwsE) { + private TimeWait(long timeout, long interval, String action, boolean throwsT) { this.timeout = timeout; this.interval = interval; this.action = action; - this.throwsE = throwsE; + this.throwsT = throwsT; } public static Builder builder() { @@ -58,7 +58,7 @@ public class TimeWait { try { if (waitingTime > timeout) { final String msg = "Timeout(" + waitingTime + "ms) while waiting for " + action; - if (this.throwsE) { + if (this.throwsT) { throw new HoodieException(msg); } else { LOG.warn(msg); diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index b77415a39..835bb49b4 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -392,7 +392,13 @@ public class StreamerUtil { public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throws IOException { HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true, false); // create the filesystem view storage properties for client - ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), writeConfig.getViewStorageConfig()); + FileSystemViewStorageConfig viewStorageConfig = writeConfig.getViewStorageConfig(); + // rebuild the view storage config with simplified options. + FileSystemViewStorageConfig rebuilt = FileSystemViewStorageConfig.newBuilder() + .withStorageType(viewStorageConfig.getStorageType()) + .withRemoteServerHost(viewStorageConfig.getRemoteViewServerHost()) + .withRemoteServerPort(viewStorageConfig.getRemoteViewServerPort()).build(); + ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), rebuilt); return new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig); }