From 3025f4d7961b221743fe3cc248e0bb2e8ee6411b Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Mon, 18 Oct 2021 14:42:33 +0800 Subject: [PATCH] [HUDI-2568] Simplify the view storage config properties (#3815) --- .../main/java/org/apache/hudi/sink/utils/TimeWait.java | 8 ++++---- .../src/main/java/org/apache/hudi/util/StreamerUtil.java | 8 +++++++- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/TimeWait.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/TimeWait.java index 005b084bf..453c2314d 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/TimeWait.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/TimeWait.java @@ -35,14 +35,14 @@ public class TimeWait { private final long timeout; // timeout in SECONDS private final long interval; // interval in MILLISECONDS private final String action; // action to report error message - private final boolean throwsE; // whether to throw when timeout + private final boolean throwsT; // whether to throw when timeout private long waitingTime = 0L; - private TimeWait(long timeout, long interval, String action, boolean throwsE) { + private TimeWait(long timeout, long interval, String action, boolean throwsT) { this.timeout = timeout; this.interval = interval; this.action = action; - this.throwsE = throwsE; + this.throwsT = throwsT; } public static Builder builder() { @@ -58,7 +58,7 @@ public class TimeWait { try { if (waitingTime > timeout) { final String msg = "Timeout(" + waitingTime + "ms) while waiting for " + action; - if (this.throwsE) { + if (this.throwsT) { throw new HoodieException(msg); } else { LOG.warn(msg); diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index b77415a39..835bb49b4 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -392,7 +392,13 @@ public class StreamerUtil { public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throws IOException { HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true, false); // create the filesystem view storage properties for client - ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), writeConfig.getViewStorageConfig()); + FileSystemViewStorageConfig viewStorageConfig = writeConfig.getViewStorageConfig(); + // rebuild the view storage config with simplified options. + FileSystemViewStorageConfig rebuilt = FileSystemViewStorageConfig.newBuilder() + .withStorageType(viewStorageConfig.getStorageType()) + .withRemoteServerHost(viewStorageConfig.getRemoteViewServerHost()) + .withRemoteServerPort(viewStorageConfig.getRemoteViewServerPort()).build(); + ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), rebuilt); return new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig); }