[HUDI-3037] Add back remote view storage config for flink (#4338)
This commit is contained in:
@@ -239,6 +239,7 @@ public class PriorityBasedFileSystemView implements SyncableFileSystemView, Seri
|
|||||||
public void reset() {
|
public void reset() {
|
||||||
preferredView.reset();
|
preferredView.reset();
|
||||||
secondaryView.reset();
|
secondaryView.reset();
|
||||||
|
errorOnPreferredView = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -255,6 +256,7 @@ public class PriorityBasedFileSystemView implements SyncableFileSystemView, Seri
|
|||||||
public void sync() {
|
public void sync() {
|
||||||
preferredView.sync();
|
preferredView.sync();
|
||||||
secondaryView.sync();
|
secondaryView.sync();
|
||||||
|
errorOnPreferredView = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -21,7 +21,8 @@ package org.apache.hudi.common.util;
|
|||||||
import org.apache.hudi.exception.HoodieException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.ServerSocket;
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.Socket;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A utility class for network.
|
* A utility class for network.
|
||||||
@@ -29,10 +30,13 @@ import java.net.ServerSocket;
|
|||||||
public class NetworkUtils {
|
public class NetworkUtils {
|
||||||
|
|
||||||
public static synchronized String getHostname() {
|
public static synchronized String getHostname() {
|
||||||
ServerSocket s = null;
|
Socket s = null;
|
||||||
try {
|
try {
|
||||||
s = new ServerSocket(0);
|
s = new Socket();
|
||||||
return s.getInetAddress().getHostAddress();
|
// see https://stackoverflow.com/questions/9481865/getting-the-ip-address-of-the-current-machine-using-java
|
||||||
|
// for details.
|
||||||
|
s.connect(new InetSocketAddress("google.com", 80));
|
||||||
|
return s.getLocalAddress().getHostAddress();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new HoodieException("Unable to find server port", e);
|
throw new HoodieException("Unable to find server port", e);
|
||||||
} finally {
|
} finally {
|
||||||
|
|||||||
@@ -62,7 +62,7 @@ public class CleanFunction<T> extends AbstractRichFunction
|
|||||||
if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
|
if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
|
||||||
// do not use the remote filesystem view because the async cleaning service
|
// do not use the remote filesystem view because the async cleaning service
|
||||||
// local timeline is very probably to fall behind with the remote one.
|
// local timeline is very probably to fall behind with the remote one.
|
||||||
this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
|
this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext(), false);
|
||||||
this.executor = NonThrownExecutor.builder(LOG).build();
|
this.executor = NonThrownExecutor.builder(LOG).build();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -120,7 +120,7 @@ public class BootstrapOperator<I, O extends HoodieRecord<?>>
|
|||||||
}
|
}
|
||||||
|
|
||||||
this.hadoopConf = StreamerUtil.getHadoopConf();
|
this.hadoopConf = StreamerUtil.getHadoopConf();
|
||||||
this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
|
this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, true);
|
||||||
this.hoodieTable = FlinkTables.createTable(writeConfig, hadoopConf, getRuntimeContext());
|
this.hoodieTable = FlinkTables.createTable(writeConfig, hadoopConf, getRuntimeContext());
|
||||||
this.aggregateManager = getRuntimeContext().getGlobalAggregateManager();
|
this.aggregateManager = getRuntimeContext().getGlobalAggregateManager();
|
||||||
|
|
||||||
|
|||||||
@@ -113,7 +113,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
|
|||||||
@Override
|
@Override
|
||||||
public void open(Configuration parameters) throws Exception {
|
public void open(Configuration parameters) throws Exception {
|
||||||
super.open(parameters);
|
super.open(parameters);
|
||||||
HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
|
HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, true);
|
||||||
HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
|
HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
|
||||||
new SerializableConfiguration(StreamerUtil.getHadoopConf()),
|
new SerializableConfiguration(StreamerUtil.getHadoopConf()),
|
||||||
new FlinkTaskContextSupplier(getRuntimeContext()));
|
new FlinkTaskContextSupplier(getRuntimeContext()));
|
||||||
|
|||||||
@@ -46,7 +46,7 @@ public class FlinkTables {
|
|||||||
HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
|
HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
|
||||||
new SerializableConfiguration(getHadoopConf()),
|
new SerializableConfiguration(getHadoopConf()),
|
||||||
new FlinkTaskContextSupplier(runtimeContext));
|
new FlinkTaskContextSupplier(runtimeContext));
|
||||||
HoodieWriteConfig writeConfig = getHoodieClientConfig(conf);
|
HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true);
|
||||||
return HoodieFlinkTable.create(writeConfig, context);
|
return HoodieFlinkTable.create(writeConfig, context);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -71,7 +71,7 @@ public class FlinkTables {
|
|||||||
* <p>This expects to be used by driver.
|
* <p>This expects to be used by driver.
|
||||||
*/
|
*/
|
||||||
public static HoodieFlinkTable<?> createTable(Configuration conf) {
|
public static HoodieFlinkTable<?> createTable(Configuration conf) {
|
||||||
HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(conf);
|
HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(conf, true, false);
|
||||||
return HoodieFlinkTable.create(writeConfig, HoodieFlinkEngineContext.DEFAULT);
|
return HoodieFlinkTable.create(writeConfig, HoodieFlinkEngineContext.DEFAULT);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -33,6 +33,7 @@ import org.apache.hudi.common.table.log.HoodieLogFormat;
|
|||||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||||
|
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.common.util.ReflectionUtils;
|
import org.apache.hudi.common.util.ReflectionUtils;
|
||||||
import org.apache.hudi.common.util.ValidationUtils;
|
import org.apache.hudi.common.util.ValidationUtils;
|
||||||
@@ -144,7 +145,21 @@ public class StreamerUtil {
|
|||||||
return FlinkClientUtil.getHadoopConf();
|
return FlinkClientUtil.getHadoopConf();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Mainly used for tests.
|
||||||
|
*/
|
||||||
public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) {
|
public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) {
|
||||||
|
return getHoodieClientConfig(conf, false, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static HoodieWriteConfig getHoodieClientConfig(Configuration conf, boolean loadFsViewStorageConfig) {
|
||||||
|
return getHoodieClientConfig(conf, false, loadFsViewStorageConfig);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static HoodieWriteConfig getHoodieClientConfig(
|
||||||
|
Configuration conf,
|
||||||
|
boolean enableEmbeddedTimelineService,
|
||||||
|
boolean loadFsViewStorageConfig) {
|
||||||
HoodieWriteConfig.Builder builder =
|
HoodieWriteConfig.Builder builder =
|
||||||
HoodieWriteConfig.newBuilder()
|
HoodieWriteConfig.newBuilder()
|
||||||
.withEngineType(EngineType.FLINK)
|
.withEngineType(EngineType.FLINK)
|
||||||
@@ -189,13 +204,20 @@ public class StreamerUtil {
|
|||||||
.withPayloadOrderingField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
|
.withPayloadOrderingField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
|
||||||
.withPayloadEventTimeField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
|
.withPayloadEventTimeField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
|
||||||
.build())
|
.build())
|
||||||
|
.withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineService)
|
||||||
.withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton
|
.withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton
|
||||||
.withAutoCommit(false)
|
.withAutoCommit(false)
|
||||||
.withAllowOperationMetadataField(conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
|
.withAllowOperationMetadataField(conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
|
||||||
.withProps(flinkConf2TypedProperties(conf))
|
.withProps(flinkConf2TypedProperties(conf))
|
||||||
.withSchema(getSourceSchema(conf).toString());
|
.withSchema(getSourceSchema(conf).toString());
|
||||||
|
|
||||||
return builder.build();
|
HoodieWriteConfig writeConfig = builder.build();
|
||||||
|
if (loadFsViewStorageConfig) {
|
||||||
|
// do not use the builder to give a change for recovering the original fs view storage config
|
||||||
|
FileSystemViewStorageConfig viewStorageConfig = ViewStorageProperties.loadFromProperties(conf.getString(FlinkOptions.PATH));
|
||||||
|
writeConfig.setViewStorageConfig(viewStorageConfig);
|
||||||
|
}
|
||||||
|
return writeConfig;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -341,15 +363,28 @@ public class StreamerUtil {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates the Flink write client.
|
* Creates the Flink write client.
|
||||||
|
*
|
||||||
|
* <p>This expects to be used by client, the driver should start an embedded timeline server.
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("rawtypes")
|
@SuppressWarnings("rawtypes")
|
||||||
public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext) {
|
public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext) {
|
||||||
|
return createWriteClient(conf, runtimeContext, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates the Flink write client.
|
||||||
|
*
|
||||||
|
* <p>This expects to be used by client, set flag {@code loadFsViewStorageConfig} to use
|
||||||
|
* remote filesystem view storage config, or an in-memory filesystem view storage is used.
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext, boolean loadFsViewStorageConfig) {
|
||||||
HoodieFlinkEngineContext context =
|
HoodieFlinkEngineContext context =
|
||||||
new HoodieFlinkEngineContext(
|
new HoodieFlinkEngineContext(
|
||||||
new SerializableConfiguration(getHadoopConf()),
|
new SerializableConfiguration(getHadoopConf()),
|
||||||
new FlinkTaskContextSupplier(runtimeContext));
|
new FlinkTaskContextSupplier(runtimeContext));
|
||||||
|
|
||||||
HoodieWriteConfig writeConfig = getHoodieClientConfig(conf);
|
HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, loadFsViewStorageConfig);
|
||||||
return new HoodieFlinkWriteClient<>(context, writeConfig);
|
return new HoodieFlinkWriteClient<>(context, writeConfig);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -362,9 +397,18 @@ public class StreamerUtil {
|
|||||||
*/
|
*/
|
||||||
@SuppressWarnings("rawtypes")
|
@SuppressWarnings("rawtypes")
|
||||||
public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throws IOException {
|
public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throws IOException {
|
||||||
HoodieWriteConfig writeConfig = getHoodieClientConfig(conf);
|
HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true, false);
|
||||||
// build the write client to start the embedded timeline server
|
// build the write client to start the embedded timeline server
|
||||||
return new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig);
|
final HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig);
|
||||||
|
// create the filesystem view storage properties for client
|
||||||
|
final FileSystemViewStorageConfig viewStorageConfig = writeConfig.getViewStorageConfig();
|
||||||
|
// rebuild the view storage config with simplified options.
|
||||||
|
FileSystemViewStorageConfig rebuilt = FileSystemViewStorageConfig.newBuilder()
|
||||||
|
.withStorageType(viewStorageConfig.getStorageType())
|
||||||
|
.withRemoteServerHost(viewStorageConfig.getRemoteViewServerHost())
|
||||||
|
.withRemoteServerPort(viewStorageConfig.getRemoteViewServerPort()).build();
|
||||||
|
ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), rebuilt);
|
||||||
|
return writeClient;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -0,0 +1,83 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.util;
|
||||||
|
|
||||||
|
import org.apache.hudi.common.fs.FSUtils;
|
||||||
|
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
|
||||||
|
import org.apache.hudi.exception.HoodieIOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.Properties;
|
||||||
|
|
||||||
|
import static org.apache.hudi.common.table.HoodieTableMetaClient.AUXILIARYFOLDER_NAME;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper class to read/write {@link FileSystemViewStorageConfig}.
|
||||||
|
*/
|
||||||
|
public class ViewStorageProperties {
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(ViewStorageProperties.class);
|
||||||
|
|
||||||
|
private static final String FILE_NAME = "view_storage_conf.properties";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize the {@link #FILE_NAME} meta file.
|
||||||
|
*/
|
||||||
|
public static void createProperties(
|
||||||
|
String basePath,
|
||||||
|
FileSystemViewStorageConfig config) throws IOException {
|
||||||
|
Path propertyPath = getPropertiesFilePath(basePath);
|
||||||
|
FileSystem fs = FSUtils.getFs(basePath, StreamerUtil.getHadoopConf());
|
||||||
|
fs.delete(propertyPath, false);
|
||||||
|
try (FSDataOutputStream outputStream = fs.create(propertyPath)) {
|
||||||
|
config.getProps().store(outputStream,
|
||||||
|
"Filesystem view storage properties saved on " + new Date(System.currentTimeMillis()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read the {@link FileSystemViewStorageConfig} with given table base path.
|
||||||
|
*/
|
||||||
|
public static FileSystemViewStorageConfig loadFromProperties(String basePath) {
|
||||||
|
Path propertyPath = getPropertiesFilePath(basePath);
|
||||||
|
LOG.info("Loading filesystem view storage properties from " + propertyPath);
|
||||||
|
FileSystem fs = FSUtils.getFs(basePath, StreamerUtil.getHadoopConf());
|
||||||
|
Properties props = new Properties();
|
||||||
|
try {
|
||||||
|
try (FSDataInputStream inputStream = fs.open(propertyPath)) {
|
||||||
|
props.load(inputStream);
|
||||||
|
}
|
||||||
|
return FileSystemViewStorageConfig.newBuilder().fromProperties(props).build();
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new HoodieIOException("Could not load filesystem view storage properties from " + propertyPath, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Path getPropertiesFilePath(String basePath) {
|
||||||
|
String auxPath = basePath + Path.SEPARATOR + AUXILIARYFOLDER_NAME;
|
||||||
|
return new Path(auxPath, FILE_NAME);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -19,9 +19,12 @@
|
|||||||
package org.apache.hudi.utils;
|
package org.apache.hudi.utils;
|
||||||
|
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
|
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
|
||||||
|
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
|
||||||
import org.apache.hudi.common.util.FileIOUtils;
|
import org.apache.hudi.common.util.FileIOUtils;
|
||||||
import org.apache.hudi.configuration.FlinkOptions;
|
import org.apache.hudi.configuration.FlinkOptions;
|
||||||
import org.apache.hudi.util.StreamerUtil;
|
import org.apache.hudi.util.StreamerUtil;
|
||||||
|
import org.apache.hudi.util.ViewStorageProperties;
|
||||||
|
|
||||||
import org.apache.flink.configuration.Configuration;
|
import org.apache.flink.configuration.Configuration;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
@@ -98,5 +101,13 @@ public class TestStreamerUtil {
|
|||||||
long diff = StreamerUtil.instantTimeDiffSeconds(higher, lower);
|
long diff = StreamerUtil.instantTimeDiffSeconds(higher, lower);
|
||||||
assertThat(diff, is(75L));
|
assertThat(diff, is(75L));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testDumpRemoteViewStorageConfig() throws IOException {
|
||||||
|
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
|
||||||
|
StreamerUtil.createWriteClient(conf);
|
||||||
|
FileSystemViewStorageConfig storageConfig = ViewStorageProperties.loadFromProperties(conf.getString(FlinkOptions.PATH));
|
||||||
|
assertThat(storageConfig.getStorageType(), is(FileSystemViewStorageType.REMOTE_FIRST));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,57 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.utils;
|
||||||
|
|
||||||
|
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
|
||||||
|
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
|
||||||
|
import org.apache.hudi.util.ViewStorageProperties;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.io.TempDir;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import static org.hamcrest.CoreMatchers.is;
|
||||||
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test cases for {@link ViewStorageProperties}.
|
||||||
|
*/
|
||||||
|
public class TestViewStorageProperties {
|
||||||
|
@TempDir
|
||||||
|
File tempFile;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testReadWriteProperties() throws IOException {
|
||||||
|
String basePath = tempFile.getAbsolutePath();
|
||||||
|
FileSystemViewStorageConfig config = FileSystemViewStorageConfig.newBuilder()
|
||||||
|
.withStorageType(FileSystemViewStorageType.SPILLABLE_DISK)
|
||||||
|
.withRemoteServerHost("host1")
|
||||||
|
.withRemoteServerPort(1234).build();
|
||||||
|
ViewStorageProperties.createProperties(basePath, config);
|
||||||
|
ViewStorageProperties.createProperties(basePath, config);
|
||||||
|
ViewStorageProperties.createProperties(basePath, config);
|
||||||
|
|
||||||
|
FileSystemViewStorageConfig readConfig = ViewStorageProperties.loadFromProperties(basePath);
|
||||||
|
assertThat(readConfig.getStorageType(), is(FileSystemViewStorageType.SPILLABLE_DISK));
|
||||||
|
assertThat(readConfig.getRemoteViewServerHost(), is("host1"));
|
||||||
|
assertThat(readConfig.getRemoteViewServerPort(), is(1234));
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user