1
0

[HUDI-3336][HUDI-FLINK] Support custom hadoop config for flink (#5528)

* [HUDI-3336][HUDI-FLINK]Support custom hadoop config for flink
This commit is contained in:
Bo Cui
2022-05-13 09:50:11 +08:00
committed by GitHub
parent 0cec955fa2
commit 701f8c039d
26 changed files with 126 additions and 87 deletions

View File

@@ -709,25 +709,17 @@ public class FlinkOptions extends HoodieConfig {
// Prefix for Hoodie specific properties.
private static final String PROPERTIES_PREFIX = "properties.";
/**
* Collects the config options that start with 'properties.' into a 'key'='value' list.
*/
public static Map<String, String> getHoodieProperties(Map<String, String> options) {
return getHoodiePropertiesWithPrefix(options, PROPERTIES_PREFIX);
}
/**
* Collects the config options that start with specified prefix {@code prefix} into a 'key'='value' list.
*/
public static Map<String, String> getHoodiePropertiesWithPrefix(Map<String, String> options, String prefix) {
public static Map<String, String> getPropertiesWithPrefix(Map<String, String> options, String prefix) {
final Map<String, String> hoodieProperties = new HashMap<>();
if (hasPropertyOptions(options)) {
if (hasPropertyOptions(options, prefix)) {
options.keySet().stream()
.filter(key -> key.startsWith(PROPERTIES_PREFIX))
.filter(key -> key.startsWith(prefix))
.forEach(key -> {
final String value = options.get(key);
final String subKey = key.substring((prefix).length());
final String subKey = key.substring(prefix.length());
hoodieProperties.put(subKey, value);
});
}
@@ -749,8 +741,8 @@ public class FlinkOptions extends HoodieConfig {
return fromMap(propsMap);
}
private static boolean hasPropertyOptions(Map<String, String> options) {
return options.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES_PREFIX));
private static boolean hasPropertyOptions(Map<String, String> options, String prefix) {
return options.keySet().stream().anyMatch(k -> k.startsWith(prefix));
}
/**

View File

@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.configuration;
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.util.FlinkClientUtil;
import java.util.Map;
/**
 * Utilities for building Hadoop {@code Configuration} instances from a Flink
 * {@link Configuration}, honoring user options carried under well-known prefixes.
 */
public class HadoopConfigurations {
  // Flink option keys starting with this prefix are forwarded into the Hadoop conf.
  private static final String HADOOP_PREFIX = "hadoop.";
  // Flink option keys starting with this prefix are forwarded as parquet.* settings.
  private static final String PARQUET_PREFIX = "parquet.";

  /**
   * Returns a copy of {@code hadoopConf} overlaid with every {@code parquet.}-prefixed
   * option found in the given Flink configuration (the prefix is re-applied on the
   * Hadoop side so the keys stay {@code parquet.*}).
   */
  public static org.apache.hadoop.conf.Configuration getParquetConf(
      org.apache.flink.configuration.Configuration options,
      org.apache.hadoop.conf.Configuration hadoopConf) {
    final org.apache.hadoop.conf.Configuration merged =
        new org.apache.hadoop.conf.Configuration(hadoopConf);
    FlinkOptions.getPropertiesWithPrefix(options.toMap(), PARQUET_PREFIX)
        .forEach((key, value) -> merged.set(PARQUET_PREFIX + key, value));
    return merged;
  }

  /**
   * Create a new hadoop configuration that is initialized with the given flink configuration:
   * starts from the cluster default conf and applies every {@code hadoop.}-prefixed option
   * (with the prefix stripped) on top of it.
   */
  public static org.apache.hadoop.conf.Configuration getHadoopConf(Configuration conf) {
    final org.apache.hadoop.conf.Configuration hadoopConf = FlinkClientUtil.getHadoopConf();
    FlinkOptions.getPropertiesWithPrefix(conf.toMap(), HADOOP_PREFIX)
        .forEach(hadoopConf::set);
    return hadoopConf;
  }
}

View File

@@ -21,6 +21,7 @@ package org.apache.hudi.schema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.util.StreamerUtil;
@@ -49,9 +50,10 @@ public class FilebasedSchemaProvider extends SchemaProvider {
private Schema targetSchema;
@Deprecated
public FilebasedSchemaProvider(TypedProperties props) {
StreamerUtil.checkRequiredProperties(props, Collections.singletonList(Config.SOURCE_SCHEMA_FILE_PROP));
FileSystem fs = FSUtils.getFs(props.getString(Config.SOURCE_SCHEMA_FILE_PROP), StreamerUtil.getHadoopConf());
FileSystem fs = FSUtils.getFs(props.getString(Config.SOURCE_SCHEMA_FILE_PROP), HadoopConfigurations.getHadoopConf(new Configuration()));
try {
this.sourceSchema = new Schema.Parser().parse(fs.open(new Path(props.getString(Config.SOURCE_SCHEMA_FILE_PROP))));
if (props.containsKey(Config.TARGET_SCHEMA_FILE_PROP)) {
@@ -65,7 +67,7 @@ public class FilebasedSchemaProvider extends SchemaProvider {
public FilebasedSchemaProvider(Configuration conf) {
final String sourceSchemaPath = conf.getString(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH);
final FileSystem fs = FSUtils.getFs(sourceSchemaPath, StreamerUtil.getHadoopConf());
final FileSystem fs = FSUtils.getFs(sourceSchemaPath, HadoopConfigurations.getHadoopConf(conf));
try {
this.sourceSchema = new Schema.Parser().parse(fs.open(new Path(sourceSchemaPath)));
} catch (IOException ioe) {

View File

@@ -34,6 +34,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction;
import org.apache.hudi.sink.meta.CkpMetadata;
@@ -122,7 +123,7 @@ public class BootstrapOperator<I, O extends HoodieRecord<?>>
}
}
this.hadoopConf = StreamerUtil.getHadoopConf();
this.hadoopConf = HadoopConfigurations.getHadoopConf(this.conf);
this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, true);
this.hoodieTable = FlinkTables.createTable(writeConfig, hadoopConf, getRuntimeContext());
this.ckpMetadata = CkpMetadata.getInstance(hoodieTable.getMetaClient().getFs(), this.writeConfig.getBasePath());

View File

@@ -113,7 +113,7 @@ public class BulkInsertWriteFunction<I>
public void open(Configuration parameters) throws IOException {
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
this.ckpMetadata = CkpMetadata.getInstance(config.getString(FlinkOptions.PATH));
this.ckpMetadata = CkpMetadata.getInstance(config);
this.initInstant = lastPendingInstant();
sendBootstrapEvent();
initWriterHelper();

View File

@@ -18,11 +18,13 @@
package org.apache.hudi.sink.meta;
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -70,8 +72,8 @@ public class CkpMetadata implements Serializable {
private List<CkpMessage> messages;
private List<String> instantCache;
private CkpMetadata(String basePath) {
this(FSUtils.getFs(basePath, StreamerUtil.getHadoopConf()), basePath);
private CkpMetadata(Configuration config) {
this(FSUtils.getFs(config.getString(FlinkOptions.PATH), HadoopConfigurations.getHadoopConf(config)), config.getString(FlinkOptions.PATH));
}
private CkpMetadata(FileSystem fs, String basePath) {
@@ -196,8 +198,8 @@ public class CkpMetadata implements Serializable {
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
public static CkpMetadata getInstance(String basePath) {
return new CkpMetadata(basePath);
public static CkpMetadata getInstance(Configuration config) {
return new CkpMetadata(config);
}
public static CkpMetadata getInstance(FileSystem fs, String basePath) {

View File

@@ -31,6 +31,7 @@ import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.sink.bootstrap.IndexRecord;
import org.apache.hudi.sink.utils.PayloadCreation;
import org.apache.hudi.table.action.commit.BucketInfo;
@@ -116,7 +117,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
super.open(parameters);
HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, true);
HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
new SerializableConfiguration(StreamerUtil.getHadoopConf()),
new SerializableConfiguration(HadoopConfigurations.getHadoopConf(this.conf)),
new FlinkTaskContextSupplier(getRuntimeContext()));
this.bucketAssigner = BucketAssigners.create(
getRuntimeContext().getIndexOfThisSubtask(),

View File

@@ -22,11 +22,11 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.hive.ddl.HiveSyncMode;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -60,7 +60,7 @@ public class HiveSyncContext {
public static HiveSyncContext create(Configuration conf) {
HiveSyncConfig syncConfig = buildSyncConfig(conf);
org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf();
org.apache.hadoop.conf.Configuration hadoopConf = HadoopConfigurations.getHadoopConf(conf);
String path = conf.getString(FlinkOptions.PATH);
FileSystem fs = FSUtils.getFs(path, hadoopConf);
HiveConf hiveConf = new HiveConf();

View File

@@ -22,6 +22,7 @@ import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
@@ -54,7 +55,7 @@ public class FileIndex {
private FileIndex(Path path, Configuration conf) {
this.path = path;
this.metadataConfig = metadataConfig(conf);
this.tableExists = StreamerUtil.tableExists(path.toString(), StreamerUtil.getHadoopConf());
this.tableExists = StreamerUtil.tableExists(path.toString(), HadoopConfigurations.getHadoopConf(conf));
}
public static FileIndex instance(Path path, Configuration conf) {

View File

@@ -21,6 +21,7 @@ package org.apache.hudi.source;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.StreamerUtil;
@@ -157,7 +158,7 @@ public class StreamReadMonitoringFunction
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.hadoopConf = StreamerUtil.getHadoopConf();
this.hadoopConf = HadoopConfigurations.getHadoopConf(parameters);
}
@Override

View File

@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.HoodieROTablePathFilter;
@@ -92,7 +93,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.apache.hudi.table.format.FormatUtils.getParquetConf;
import static org.apache.hudi.configuration.HadoopConfigurations.getParquetConf;
/**
* Hoodie batch table source that always read the latest snapshot of the underneath table.
@@ -155,7 +156,7 @@ public class HoodieTableSource implements
: requiredPos;
this.limit = limit == null ? NO_LIMIT_CONSTANT : limit;
this.filters = filters == null ? Collections.emptyList() : filters;
this.hadoopConf = StreamerUtil.getHadoopConf();
this.hadoopConf = HadoopConfigurations.getHadoopConf(conf);
this.metaClient = StreamerUtil.metaClientForReader(conf, hadoopConf);
this.maxCompactionMemoryInBytes = StreamerUtil.getMaxCompactionMemoryInBytes(conf);
}

View File

@@ -22,6 +22,7 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
@@ -93,7 +94,7 @@ public class HoodieCatalog extends AbstractCatalog {
public HoodieCatalog(String name, Configuration options) {
super(name, options.get(DEFAULT_DATABASE));
this.catalogPathStr = options.get(CATALOG_PATH);
this.hadoopConf = StreamerUtil.getHadoopConf();
this.hadoopConf = HadoopConfigurations.getHadoopConf(options);
this.tableCommonOptions = CatalogOptions.tableCommonOptions(options);
}

View File

@@ -30,7 +30,6 @@ import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer;
import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.StreamerUtil;
@@ -49,7 +48,6 @@ import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
/**
@@ -253,14 +251,4 @@ public class FormatUtils {
private static Boolean string2Boolean(String s) {
return "true".equals(s.toLowerCase(Locale.ROOT));
}
public static org.apache.hadoop.conf.Configuration getParquetConf(
org.apache.flink.configuration.Configuration options,
org.apache.hadoop.conf.Configuration hadoopConf) {
final String prefix = "parquet.";
org.apache.hadoop.conf.Configuration copy = new org.apache.hadoop.conf.Configuration(hadoopConf);
Map<String, String> parquetOptions = FlinkOptions.getHoodiePropertiesWithPrefix(options.toMap(), prefix);
parquetOptions.forEach((k, v) -> copy.set(prefix + k, v));
return copy;
}
}

View File

@@ -26,6 +26,7 @@ import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.keygen.KeyGenUtils;
@@ -36,7 +37,6 @@ import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitRea
import org.apache.hudi.util.AvroToRowDataConverters;
import org.apache.hudi.util.RowDataProjection;
import org.apache.hudi.util.RowDataToAvroConverters;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.util.StringToRowDataConverter;
import org.apache.avro.Schema;
@@ -167,7 +167,7 @@ public class MergeOnReadInputFormat
public void open(MergeOnReadInputSplit split) throws IOException {
this.currentReadCount = 0L;
this.closed = false;
this.hadoopConf = StreamerUtil.getHadoopConf();
this.hadoopConf = HadoopConfigurations.getHadoopConf(this.conf);
if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size() > 0)) {
if (split.getInstantRange() != null) {
// base file only with commit time filtering
@@ -306,7 +306,7 @@ public class MergeOnReadInputFormat
return ParquetSplitReaderUtil.genPartColumnarRowReader(
this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE),
true,
FormatUtils.getParquetConf(this.conf, hadoopConf),
HadoopConfigurations.getParquetConf(this.conf, hadoopConf),
fieldNames.toArray(new String[0]),
fieldTypes.toArray(new DataType[0]),
partObjects,

View File

@@ -27,7 +27,7 @@ import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import static org.apache.hudi.util.StreamerUtil.getHadoopConf;
import static org.apache.hudi.configuration.HadoopConfigurations.getHadoopConf;
import static org.apache.hudi.util.StreamerUtil.getHoodieClientConfig;
/**
@@ -44,7 +44,7 @@ public class FlinkTables {
*/
public static HoodieFlinkTable<?> createTable(Configuration conf, RuntimeContext runtimeContext) {
HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
new SerializableConfiguration(getHadoopConf()),
new SerializableConfiguration(getHadoopConf(conf)),
new FlinkTaskContextSupplier(runtimeContext));
HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true);
return HoodieFlinkTable.create(writeConfig, context);

View File

@@ -43,6 +43,7 @@ import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
@@ -101,7 +102,7 @@ public class StreamerUtil {
return new TypedProperties();
}
return readConfig(
getHadoopConf(),
HadoopConfigurations.getHadoopConf(cfg),
new Path(cfg.propsFilePath), cfg.configs).getProps();
}
@@ -140,11 +141,6 @@ public class StreamerUtil {
return conf;
}
// Keep the redundant to avoid too many modifications.
public static org.apache.hadoop.conf.Configuration getHadoopConf() {
return FlinkClientUtil.getHadoopConf();
}
/**
* Mainly used for tests.
*/
@@ -215,7 +211,7 @@ public class StreamerUtil {
HoodieWriteConfig writeConfig = builder.build();
if (loadFsViewStorageConfig) {
// do not use the builder to give a change for recovering the original fs view storage config
FileSystemViewStorageConfig viewStorageConfig = ViewStorageProperties.loadFromProperties(conf.getString(FlinkOptions.PATH));
FileSystemViewStorageConfig viewStorageConfig = ViewStorageProperties.loadFromProperties(conf.getString(FlinkOptions.PATH), conf);
writeConfig.setViewStorageConfig(viewStorageConfig);
}
return writeConfig;
@@ -255,7 +251,7 @@ public class StreamerUtil {
*/
public static HoodieTableMetaClient initTableIfNotExists(Configuration conf) throws IOException {
final String basePath = conf.getString(FlinkOptions.PATH);
final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf();
final org.apache.hadoop.conf.Configuration hadoopConf = HadoopConfigurations.getHadoopConf(conf);
if (!tableExists(basePath, hadoopConf)) {
HoodieTableMetaClient metaClient = HoodieTableMetaClient.withPropertyBuilder()
.setTableCreateSchema(conf.getString(FlinkOptions.SOURCE_AVRO_SCHEMA))
@@ -348,18 +344,11 @@ public class StreamerUtil {
return HoodieTableMetaClient.builder().setBasePath(basePath).setConf(hadoopConf).build();
}
/**
* Creates the meta client.
*/
public static HoodieTableMetaClient createMetaClient(String basePath) {
return createMetaClient(basePath, FlinkClientUtil.getHadoopConf());
}
/**
* Creates the meta client.
*/
public static HoodieTableMetaClient createMetaClient(Configuration conf) {
return createMetaClient(conf.getString(FlinkOptions.PATH));
return createMetaClient(conf.getString(FlinkOptions.PATH), HadoopConfigurations.getHadoopConf(conf));
}
/**
@@ -382,7 +371,7 @@ public class StreamerUtil {
public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext, boolean loadFsViewStorageConfig) {
HoodieFlinkEngineContext context =
new HoodieFlinkEngineContext(
new SerializableConfiguration(getHadoopConf()),
new SerializableConfiguration(HadoopConfigurations.getHadoopConf(conf)),
new FlinkTaskContextSupplier(runtimeContext));
HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, loadFsViewStorageConfig);
@@ -410,7 +399,7 @@ public class StreamerUtil {
.withRemoteServerPort(viewStorageConfig.getRemoteViewServerPort())
.withRemoteTimelineClientTimeoutSecs(viewStorageConfig.getRemoteTimelineClientTimeoutSecs())
.build();
ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), rebuilt);
ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), rebuilt, conf);
return writeClient;
}

View File

@@ -18,8 +18,10 @@
package org.apache.hudi.util;
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -48,9 +50,10 @@ public class ViewStorageProperties {
*/
public static void createProperties(
String basePath,
FileSystemViewStorageConfig config) throws IOException {
FileSystemViewStorageConfig config,
Configuration flinkConf) throws IOException {
Path propertyPath = getPropertiesFilePath(basePath);
FileSystem fs = FSUtils.getFs(basePath, StreamerUtil.getHadoopConf());
FileSystem fs = FSUtils.getFs(basePath, HadoopConfigurations.getHadoopConf(flinkConf));
fs.delete(propertyPath, false);
try (FSDataOutputStream outputStream = fs.create(propertyPath)) {
config.getProps().store(outputStream,
@@ -61,10 +64,10 @@ public class ViewStorageProperties {
/**
* Read the {@link FileSystemViewStorageConfig} with given table base path.
*/
public static FileSystemViewStorageConfig loadFromProperties(String basePath) {
public static FileSystemViewStorageConfig loadFromProperties(String basePath, Configuration conf) {
Path propertyPath = getPropertiesFilePath(basePath);
LOG.info("Loading filesystem view storage properties from " + propertyPath);
FileSystem fs = FSUtils.getFs(basePath, StreamerUtil.getHadoopConf());
FileSystem fs = FSUtils.getFs(basePath, HadoopConfigurations.getHadoopConf(conf));
Properties props = new Properties();
try {
try (FSDataInputStream inputStream = fs.open(propertyPath)) {