[HUDI-2568] Simplify the view storage config properties (#3815)
This commit is contained in:
@@ -35,14 +35,14 @@ public class TimeWait {
|
|||||||
private final long timeout; // timeout in SECONDS
|
private final long timeout; // timeout in SECONDS
|
||||||
private final long interval; // interval in MILLISECONDS
|
private final long interval; // interval in MILLISECONDS
|
||||||
private final String action; // action to report error message
|
private final String action; // action to report error message
|
||||||
private final boolean throwsE; // whether to throw when timeout
|
private final boolean throwsT; // whether to throw when timeout
|
||||||
private long waitingTime = 0L;
|
private long waitingTime = 0L;
|
||||||
|
|
||||||
private TimeWait(long timeout, long interval, String action, boolean throwsE) {
|
private TimeWait(long timeout, long interval, String action, boolean throwsT) {
|
||||||
this.timeout = timeout;
|
this.timeout = timeout;
|
||||||
this.interval = interval;
|
this.interval = interval;
|
||||||
this.action = action;
|
this.action = action;
|
||||||
this.throwsE = throwsE;
|
this.throwsT = throwsT;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Builder builder() {
|
public static Builder builder() {
|
||||||
@@ -58,7 +58,7 @@ public class TimeWait {
|
|||||||
try {
|
try {
|
||||||
if (waitingTime > timeout) {
|
if (waitingTime > timeout) {
|
||||||
final String msg = "Timeout(" + waitingTime + "ms) while waiting for " + action;
|
final String msg = "Timeout(" + waitingTime + "ms) while waiting for " + action;
|
||||||
if (this.throwsE) {
|
if (this.throwsT) {
|
||||||
throw new HoodieException(msg);
|
throw new HoodieException(msg);
|
||||||
} else {
|
} else {
|
||||||
LOG.warn(msg);
|
LOG.warn(msg);
|
||||||
|
|||||||
@@ -392,7 +392,13 @@ public class StreamerUtil {
|
|||||||
public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throws IOException {
|
public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throws IOException {
|
||||||
HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true, false);
|
HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true, false);
|
||||||
// create the filesystem view storage properties for client
|
// create the filesystem view storage properties for client
|
||||||
ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), writeConfig.getViewStorageConfig());
|
FileSystemViewStorageConfig viewStorageConfig = writeConfig.getViewStorageConfig();
|
||||||
|
// rebuild the view storage config with simplified options.
|
||||||
|
FileSystemViewStorageConfig rebuilt = FileSystemViewStorageConfig.newBuilder()
|
||||||
|
.withStorageType(viewStorageConfig.getStorageType())
|
||||||
|
.withRemoteServerHost(viewStorageConfig.getRemoteViewServerHost())
|
||||||
|
.withRemoteServerPort(viewStorageConfig.getRemoteViewServerPort()).build();
|
||||||
|
ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), rebuilt);
|
||||||
return new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig);
|
return new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user