[HUDI-3418] Save timeout option for remote RemoteFileSystemView (#4809)
Co-authored-by: yuzhaojing <yuzhaojing@bytedance.com>
This commit is contained in:
@@ -112,8 +112,12 @@ public class EmbeddedTimelineService {
|
|||||||
FileSystemViewStorageType viewStorageType = writeConfig.getClientSpecifiedViewStorageConfig()
|
FileSystemViewStorageType viewStorageType = writeConfig.getClientSpecifiedViewStorageConfig()
|
||||||
.shouldEnableBackupForRemoteFileSystemView()
|
.shouldEnableBackupForRemoteFileSystemView()
|
||||||
? FileSystemViewStorageType.REMOTE_FIRST : FileSystemViewStorageType.REMOTE_ONLY;
|
? FileSystemViewStorageType.REMOTE_FIRST : FileSystemViewStorageType.REMOTE_ONLY;
|
||||||
return FileSystemViewStorageConfig.newBuilder().withStorageType(viewStorageType)
|
return FileSystemViewStorageConfig.newBuilder()
|
||||||
.withRemoteServerHost(hostAddr).withRemoteServerPort(serverPort).build();
|
.withStorageType(viewStorageType)
|
||||||
|
.withRemoteServerHost(hostAddr)
|
||||||
|
.withRemoteServerPort(serverPort)
|
||||||
|
.withRemoteTimelineClientTimeoutSecs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientTimeoutSecs())
|
||||||
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
public FileSystemViewManager getViewManager() {
|
public FileSystemViewManager getViewManager() {
|
||||||
|
|||||||
@@ -240,7 +240,7 @@ public class FileSystemViewStorageConfig extends HoodieConfig {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder withRemoteTimelineClientTimeoutSecs(Long timelineClientTimeoutSecs) {
|
public Builder withRemoteTimelineClientTimeoutSecs(Integer timelineClientTimeoutSecs) {
|
||||||
fileSystemViewStorageConfig.setValue(REMOTE_TIMEOUT_SECS, timelineClientTimeoutSecs.toString());
|
fileSystemViewStorageConfig.setValue(REMOTE_TIMEOUT_SECS, timelineClientTimeoutSecs.toString());
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -406,7 +406,9 @@ public class StreamerUtil {
|
|||||||
FileSystemViewStorageConfig rebuilt = FileSystemViewStorageConfig.newBuilder()
|
FileSystemViewStorageConfig rebuilt = FileSystemViewStorageConfig.newBuilder()
|
||||||
.withStorageType(viewStorageConfig.getStorageType())
|
.withStorageType(viewStorageConfig.getStorageType())
|
||||||
.withRemoteServerHost(viewStorageConfig.getRemoteViewServerHost())
|
.withRemoteServerHost(viewStorageConfig.getRemoteViewServerHost())
|
||||||
.withRemoteServerPort(viewStorageConfig.getRemoteViewServerPort()).build();
|
.withRemoteServerPort(viewStorageConfig.getRemoteViewServerPort())
|
||||||
|
.withRemoteTimelineClientTimeoutSecs(viewStorageConfig.getRemoteTimelineClientTimeoutSecs())
|
||||||
|
.build();
|
||||||
ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), rebuilt);
|
ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), rebuilt);
|
||||||
return writeClient;
|
return writeClient;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -397,6 +397,7 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testReuseEmbeddedServer() throws IOException {
|
public void testReuseEmbeddedServer() throws IOException {
|
||||||
|
conf.setInteger("hoodie.filesystem.view.remote.timeout.secs", 500);
|
||||||
HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf);
|
HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf);
|
||||||
FileSystemViewStorageConfig viewStorageConfig = writeClient.getConfig().getViewStorageConfig();
|
FileSystemViewStorageConfig viewStorageConfig = writeClient.getConfig().getViewStorageConfig();
|
||||||
|
|
||||||
@@ -406,6 +407,7 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
|
|||||||
writeClient = StreamerUtil.createWriteClient(conf);
|
writeClient = StreamerUtil.createWriteClient(conf);
|
||||||
assertSame(writeClient.getConfig().getViewStorageConfig().getStorageType(), FileSystemViewStorageType.REMOTE_FIRST);
|
assertSame(writeClient.getConfig().getViewStorageConfig().getStorageType(), FileSystemViewStorageType.REMOTE_FIRST);
|
||||||
assertEquals(viewStorageConfig.getRemoteViewServerPort(), writeClient.getConfig().getViewStorageConfig().getRemoteViewServerPort());
|
assertEquals(viewStorageConfig.getRemoteViewServerPort(), writeClient.getConfig().getViewStorageConfig().getRemoteViewServerPort());
|
||||||
|
assertEquals(viewStorageConfig.getRemoteTimelineClientTimeoutSecs(), 500);
|
||||||
}
|
}
|
||||||
|
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
|
|||||||
Reference in New Issue
Block a user