[HUDI-2951] Disable remote view storage config for flink (#4237)
This commit is contained in:
@@ -62,7 +62,7 @@ public class CleanFunction<T> extends AbstractRichFunction
|
||||
if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
|
||||
// do not use the remote filesystem view because the async cleaning service
|
||||
// local timeline is very likely to fall behind the remote one.
|
||||
this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext(), false);
|
||||
this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
|
||||
this.executor = NonThrownExecutor.builder(LOG).build();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -120,7 +120,7 @@ public class BootstrapOperator<I, O extends HoodieRecord<?>>
|
||||
}
|
||||
|
||||
this.hadoopConf = StreamerUtil.getHadoopConf();
|
||||
this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, true);
|
||||
this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
|
||||
this.hoodieTable = FlinkTables.createTable(writeConfig, hadoopConf, getRuntimeContext());
|
||||
this.aggregateManager = getRuntimeContext().getGlobalAggregateManager();
|
||||
|
||||
|
||||
@@ -113,7 +113,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
|
||||
@Override
|
||||
public void open(Configuration parameters) throws Exception {
|
||||
super.open(parameters);
|
||||
HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, false);
|
||||
HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
|
||||
HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
|
||||
new SerializableConfiguration(StreamerUtil.getHadoopConf()),
|
||||
new FlinkTaskContextSupplier(getRuntimeContext()));
|
||||
|
||||
@@ -46,7 +46,7 @@ public class FlinkTables {
|
||||
HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
|
||||
new SerializableConfiguration(getHadoopConf()),
|
||||
new FlinkTaskContextSupplier(runtimeContext));
|
||||
HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true);
|
||||
HoodieWriteConfig writeConfig = getHoodieClientConfig(conf);
|
||||
return HoodieFlinkTable.create(writeConfig, context);
|
||||
}
|
||||
|
||||
@@ -71,7 +71,7 @@ public class FlinkTables {
|
||||
* <p>This expects to be used by driver.
|
||||
*/
|
||||
public static HoodieFlinkTable<?> createTable(Configuration conf) {
|
||||
HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(conf, true, false);
|
||||
HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(conf);
|
||||
return HoodieFlinkTable.create(writeConfig, HoodieFlinkEngineContext.DEFAULT);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,7 +33,6 @@ import org.apache.hudi.common.table.log.HoodieLogFormat;
|
||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.ReflectionUtils;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
@@ -145,21 +144,7 @@ public class StreamerUtil {
|
||||
return FlinkClientUtil.getHadoopConf();
|
||||
}
|
||||
|
||||
/**
|
||||
* Mainly used for tests.
|
||||
*/
|
||||
public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) {
|
||||
return getHoodieClientConfig(conf, false, false);
|
||||
}
|
||||
|
||||
public static HoodieWriteConfig getHoodieClientConfig(Configuration conf, boolean loadFsViewStorageConfig) {
|
||||
return getHoodieClientConfig(conf, false, loadFsViewStorageConfig);
|
||||
}
|
||||
|
||||
public static HoodieWriteConfig getHoodieClientConfig(
|
||||
Configuration conf,
|
||||
boolean enableEmbeddedTimelineService,
|
||||
boolean loadFsViewStorageConfig) {
|
||||
HoodieWriteConfig.Builder builder =
|
||||
HoodieWriteConfig.newBuilder()
|
||||
.withEngineType(EngineType.FLINK)
|
||||
@@ -204,20 +189,13 @@ public class StreamerUtil {
|
||||
.withPayloadOrderingField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
|
||||
.withPayloadEventTimeField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
|
||||
.build())
|
||||
.withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineService)
|
||||
.withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton
|
||||
.withAutoCommit(false)
|
||||
.withAllowOperationMetadataField(conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
|
||||
.withProps(flinkConf2TypedProperties(conf))
|
||||
.withSchema(getSourceSchema(conf).toString());
|
||||
|
||||
HoodieWriteConfig writeConfig = builder.build();
|
||||
if (loadFsViewStorageConfig) {
|
||||
// do not use the builder, to give a chance for recovering the original fs view storage config
|
||||
FileSystemViewStorageConfig viewStorageConfig = ViewStorageProperties.loadFromProperties(conf.getString(FlinkOptions.PATH));
|
||||
writeConfig.setViewStorageConfig(viewStorageConfig);
|
||||
}
|
||||
return writeConfig;
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -363,28 +341,15 @@ public class StreamerUtil {
|
||||
|
||||
/**
|
||||
* Creates the Flink write client.
|
||||
*
|
||||
* <p>This expects to be used by client, the driver should start an embedded timeline server.
|
||||
*/
|
||||
@SuppressWarnings("rawtypes")
|
||||
public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext) {
|
||||
return createWriteClient(conf, runtimeContext, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates the Flink write client.
|
||||
*
|
||||
* <p>This expects to be used by client, set flag {@code loadFsViewStorageConfig} to use
|
||||
* remote filesystem view storage config, or an in-memory filesystem view storage is used.
|
||||
*/
|
||||
@SuppressWarnings("rawtypes")
|
||||
public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext, boolean loadFsViewStorageConfig) {
|
||||
HoodieFlinkEngineContext context =
|
||||
new HoodieFlinkEngineContext(
|
||||
new SerializableConfiguration(getHadoopConf()),
|
||||
new FlinkTaskContextSupplier(runtimeContext));
|
||||
|
||||
HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, loadFsViewStorageConfig);
|
||||
HoodieWriteConfig writeConfig = getHoodieClientConfig(conf);
|
||||
return new HoodieFlinkWriteClient<>(context, writeConfig);
|
||||
}
|
||||
|
||||
@@ -397,18 +362,9 @@ public class StreamerUtil {
|
||||
*/
|
||||
@SuppressWarnings("rawtypes")
|
||||
public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throws IOException {
|
||||
HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true, false);
|
||||
HoodieWriteConfig writeConfig = getHoodieClientConfig(conf);
|
||||
// build the write client to start the embedded timeline server
|
||||
final HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig);
|
||||
// create the filesystem view storage properties for client
|
||||
final FileSystemViewStorageConfig viewStorageConfig = writeConfig.getViewStorageConfig();
|
||||
// rebuild the view storage config with simplified options.
|
||||
FileSystemViewStorageConfig rebuilt = FileSystemViewStorageConfig.newBuilder()
|
||||
.withStorageType(viewStorageConfig.getStorageType())
|
||||
.withRemoteServerHost(viewStorageConfig.getRemoteViewServerHost())
|
||||
.withRemoteServerPort(viewStorageConfig.getRemoteViewServerPort()).build();
|
||||
ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), rebuilt);
|
||||
return writeClient;
|
||||
return new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -1,83 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.util;
|
||||
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
|
||||
import org.apache.hudi.exception.HoodieIOException;
|
||||
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Date;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.apache.hudi.common.table.HoodieTableMetaClient.AUXILIARYFOLDER_NAME;
|
||||
|
||||
/**
|
||||
* Helper class to read/write {@link FileSystemViewStorageConfig}.
|
||||
*/
|
||||
public class ViewStorageProperties {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ViewStorageProperties.class);
|
||||
|
||||
private static final String FILE_NAME = "view_storage_conf.properties";
|
||||
|
||||
/**
|
||||
* Initialize the {@link #FILE_NAME} meta file.
|
||||
*/
|
||||
public static void createProperties(
|
||||
String basePath,
|
||||
FileSystemViewStorageConfig config) throws IOException {
|
||||
Path propertyPath = getPropertiesFilePath(basePath);
|
||||
FileSystem fs = FSUtils.getFs(basePath, StreamerUtil.getHadoopConf());
|
||||
fs.delete(propertyPath, false);
|
||||
try (FSDataOutputStream outputStream = fs.create(propertyPath)) {
|
||||
config.getProps().store(outputStream,
|
||||
"Filesystem view storage properties saved on " + new Date(System.currentTimeMillis()));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Read the {@link FileSystemViewStorageConfig} with given table base path.
|
||||
*/
|
||||
public static FileSystemViewStorageConfig loadFromProperties(String basePath) {
|
||||
Path propertyPath = getPropertiesFilePath(basePath);
|
||||
LOG.info("Loading filesystem view storage properties from " + propertyPath);
|
||||
FileSystem fs = FSUtils.getFs(basePath, StreamerUtil.getHadoopConf());
|
||||
Properties props = new Properties();
|
||||
try {
|
||||
try (FSDataInputStream inputStream = fs.open(propertyPath)) {
|
||||
props.load(inputStream);
|
||||
}
|
||||
return FileSystemViewStorageConfig.newBuilder().fromProperties(props).build();
|
||||
} catch (IOException e) {
|
||||
throw new HoodieIOException("Could not load filesystem view storage properties from " + propertyPath, e);
|
||||
}
|
||||
}
|
||||
|
||||
private static Path getPropertiesFilePath(String basePath) {
|
||||
String auxPath = basePath + Path.SEPARATOR + AUXILIARYFOLDER_NAME;
|
||||
return new Path(auxPath, FILE_NAME);
|
||||
}
|
||||
}
|
||||
@@ -19,12 +19,9 @@
|
||||
package org.apache.hudi.utils;
|
||||
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
|
||||
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
|
||||
import org.apache.hudi.common.util.FileIOUtils;
|
||||
import org.apache.hudi.configuration.FlinkOptions;
|
||||
import org.apache.hudi.util.StreamerUtil;
|
||||
import org.apache.hudi.util.ViewStorageProperties;
|
||||
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.junit.jupiter.api.Test;
|
||||
@@ -101,13 +98,5 @@ public class TestStreamerUtil {
|
||||
long diff = StreamerUtil.instantTimeDiffSeconds(higher, lower);
|
||||
assertThat(diff, is(75L));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testDumpRemoteViewStorageConfig() throws IOException {
|
||||
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
|
||||
StreamerUtil.createWriteClient(conf);
|
||||
FileSystemViewStorageConfig storageConfig = ViewStorageProperties.loadFromProperties(conf.getString(FlinkOptions.PATH));
|
||||
assertThat(storageConfig.getStorageType(), is(FileSystemViewStorageType.REMOTE_FIRST));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,57 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.utils;
|
||||
|
||||
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
|
||||
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
|
||||
import org.apache.hudi.util.ViewStorageProperties;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.io.TempDir;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
|
||||
/**
|
||||
* Test cases for {@link ViewStorageProperties}.
|
||||
*/
|
||||
public class TestViewStorageProperties {
|
||||
@TempDir
|
||||
File tempFile;
|
||||
|
||||
@Test
|
||||
void testReadWriteProperties() throws IOException {
|
||||
String basePath = tempFile.getAbsolutePath();
|
||||
FileSystemViewStorageConfig config = FileSystemViewStorageConfig.newBuilder()
|
||||
.withStorageType(FileSystemViewStorageType.SPILLABLE_DISK)
|
||||
.withRemoteServerHost("host1")
|
||||
.withRemoteServerPort(1234).build();
|
||||
ViewStorageProperties.createProperties(basePath, config);
|
||||
ViewStorageProperties.createProperties(basePath, config);
|
||||
ViewStorageProperties.createProperties(basePath, config);
|
||||
|
||||
FileSystemViewStorageConfig readConfig = ViewStorageProperties.loadFromProperties(basePath);
|
||||
assertThat(readConfig.getStorageType(), is(FileSystemViewStorageType.SPILLABLE_DISK));
|
||||
assertThat(readConfig.getRemoteViewServerHost(), is("host1"));
|
||||
assertThat(readConfig.getRemoteViewServerPort(), is(1234));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user