diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java index 13154b217..77d663004 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java @@ -62,7 +62,7 @@ public class CleanFunction extends AbstractRichFunction if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) { // do not use the remote filesystem view because the async cleaning service // local timeline is very probably to fall behind with the remote one. - this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext(), false); + this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext()); this.executor = NonThrownExecutor.builder(LOG).build(); } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java index f6055ba11..513179bc5 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java @@ -120,7 +120,7 @@ public class BootstrapOperator> } this.hadoopConf = StreamerUtil.getHadoopConf(); - this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, true); + this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf); this.hoodieTable = FlinkTables.createTable(writeConfig, hadoopConf, getRuntimeContext()); this.aggregateManager = getRuntimeContext().getGlobalAggregateManager(); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java index 14cad16df..67040c187 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java @@ -113,7 +113,7 @@ public class BucketAssignFunction> @Override public void open(Configuration parameters) throws Exception { super.open(parameters); - HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, false); + HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf); HoodieFlinkEngineContext context = new HoodieFlinkEngineContext( new SerializableConfiguration(StreamerUtil.getHadoopConf()), new FlinkTaskContextSupplier(getRuntimeContext())); diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/FlinkTables.java b/hudi-flink/src/main/java/org/apache/hudi/util/FlinkTables.java index 6918a06b1..9be47752b 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/FlinkTables.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/FlinkTables.java @@ -46,7 +46,7 @@ public class FlinkTables { HoodieFlinkEngineContext context = new HoodieFlinkEngineContext( new SerializableConfiguration(getHadoopConf()), new FlinkTaskContextSupplier(runtimeContext)); - HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true); + HoodieWriteConfig writeConfig = getHoodieClientConfig(conf); return HoodieFlinkTable.create(writeConfig, context); } @@ -71,7 +71,7 @@ public class FlinkTables { *

This expects to be used by driver. */ public static HoodieFlinkTable createTable(Configuration conf) { - HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(conf, true, false); + HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(conf); return HoodieFlinkTable.create(writeConfig, HoodieFlinkEngineContext.DEFAULT); } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 98df0bbcf..516c75b87 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -33,7 +33,6 @@ import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.ValidationUtils; @@ -145,21 +144,7 @@ public class StreamerUtil { return FlinkClientUtil.getHadoopConf(); } - /** - * Mainly used for tests. - */ public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) { - return getHoodieClientConfig(conf, false, false); - } - - public static HoodieWriteConfig getHoodieClientConfig(Configuration conf, boolean loadFsViewStorageConfig) { - return getHoodieClientConfig(conf, false, loadFsViewStorageConfig); - } - - public static HoodieWriteConfig getHoodieClientConfig( - Configuration conf, - boolean enableEmbeddedTimelineService, - boolean loadFsViewStorageConfig) { HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder() .withEngineType(EngineType.FLINK) @@ -204,20 +189,13 @@ public class StreamerUtil { .withPayloadOrderingField(conf.getString(FlinkOptions.PRECOMBINE_FIELD)) .withPayloadEventTimeField(conf.getString(FlinkOptions.PRECOMBINE_FIELD)) .build()) - .withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineService) .withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton .withAutoCommit(false) .withAllowOperationMetadataField(conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) .withProps(flinkConf2TypedProperties(conf)) .withSchema(getSourceSchema(conf).toString()); - HoodieWriteConfig writeConfig = builder.build(); - if (loadFsViewStorageConfig) { - // do not use the builder to give a change for recovering the original fs view storage config - FileSystemViewStorageConfig viewStorageConfig = ViewStorageProperties.loadFromProperties(conf.getString(FlinkOptions.PATH)); - writeConfig.setViewStorageConfig(viewStorageConfig); - } - return writeConfig; + return builder.build(); } /** @@ -363,28 +341,15 @@ public class StreamerUtil { /** * Creates the Flink write client. - * - *

This expects to be used by client, the driver should start an embedded timeline server. */ @SuppressWarnings("rawtypes") public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext) { - return createWriteClient(conf, runtimeContext, true); - } - - /** - * Creates the Flink write client. - * - *

This expects to be used by client, set flag {@code loadFsViewStorageConfig} to use - * remote filesystem view storage config, or an in-memory filesystem view storage is used. - */ - @SuppressWarnings("rawtypes") - public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext, boolean loadFsViewStorageConfig) { HoodieFlinkEngineContext context = new HoodieFlinkEngineContext( new SerializableConfiguration(getHadoopConf()), new FlinkTaskContextSupplier(runtimeContext)); - HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, loadFsViewStorageConfig); + HoodieWriteConfig writeConfig = getHoodieClientConfig(conf); return new HoodieFlinkWriteClient<>(context, writeConfig); } @@ -397,18 +362,9 @@ public class StreamerUtil { */ @SuppressWarnings("rawtypes") public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throws IOException { - HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true, false); + HoodieWriteConfig writeConfig = getHoodieClientConfig(conf); // build the write client to start the embedded timeline server - final HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig); - // create the filesystem view storage properties for client - final FileSystemViewStorageConfig viewStorageConfig = writeConfig.getViewStorageConfig(); - // rebuild the view storage config with simplified options. - FileSystemViewStorageConfig rebuilt = FileSystemViewStorageConfig.newBuilder() - .withStorageType(viewStorageConfig.getStorageType()) - .withRemoteServerHost(viewStorageConfig.getRemoteViewServerHost()) - .withRemoteServerPort(viewStorageConfig.getRemoteViewServerPort()).build(); - ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), rebuilt); - return writeClient; + return new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig); } /** diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/ViewStorageProperties.java b/hudi-flink/src/main/java/org/apache/hudi/util/ViewStorageProperties.java deleted file mode 100644 index da55e27f0..000000000 --- a/hudi-flink/src/main/java/org/apache/hudi/util/ViewStorageProperties.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.util; - -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; -import org.apache.hudi.exception.HoodieIOException; - -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Date; -import java.util.Properties; - -import static org.apache.hudi.common.table.HoodieTableMetaClient.AUXILIARYFOLDER_NAME; - -/** - * Helper class to read/write {@link FileSystemViewStorageConfig}. - */ -public class ViewStorageProperties { - private static final Logger LOG = LoggerFactory.getLogger(ViewStorageProperties.class); - - private static final String FILE_NAME = "view_storage_conf.properties"; - - /** - * Initialize the {@link #FILE_NAME} meta file. - */ - public static void createProperties( - String basePath, - FileSystemViewStorageConfig config) throws IOException { - Path propertyPath = getPropertiesFilePath(basePath); - FileSystem fs = FSUtils.getFs(basePath, StreamerUtil.getHadoopConf()); - fs.delete(propertyPath, false); - try (FSDataOutputStream outputStream = fs.create(propertyPath)) { - config.getProps().store(outputStream, - "Filesystem view storage properties saved on " + new Date(System.currentTimeMillis())); - } - } - - /** - * Read the {@link FileSystemViewStorageConfig} with given table base path. - */ - public static FileSystemViewStorageConfig loadFromProperties(String basePath) { - Path propertyPath = getPropertiesFilePath(basePath); - LOG.info("Loading filesystem view storage properties from " + propertyPath); - FileSystem fs = FSUtils.getFs(basePath, StreamerUtil.getHadoopConf()); - Properties props = new Properties(); - try { - try (FSDataInputStream inputStream = fs.open(propertyPath)) { - props.load(inputStream); - } - return FileSystemViewStorageConfig.newBuilder().fromProperties(props).build(); - } catch (IOException e) { - throw new HoodieIOException("Could not load filesystem view storage properties from " + propertyPath, e); - } - } - - private static Path getPropertiesFilePath(String basePath) { - String auxPath = basePath + Path.SEPARATOR + AUXILIARYFOLDER_NAME; - return new Path(auxPath, FILE_NAME); - } -} diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java index 57297c50e..e00fbfac5 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java @@ -19,12 +19,9 @@ package org.apache.hudi.utils; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; -import org.apache.hudi.common.table.view.FileSystemViewStorageType; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.util.StreamerUtil; -import org.apache.hudi.util.ViewStorageProperties; import org.apache.flink.configuration.Configuration; import org.junit.jupiter.api.Test; @@ -101,13 +98,5 @@ public class TestStreamerUtil { long diff = StreamerUtil.instantTimeDiffSeconds(higher, lower); assertThat(diff, is(75L)); } - - @Test - void testDumpRemoteViewStorageConfig() throws IOException { - Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); - StreamerUtil.createWriteClient(conf); - FileSystemViewStorageConfig storageConfig = ViewStorageProperties.loadFromProperties(conf.getString(FlinkOptions.PATH)); - assertThat(storageConfig.getStorageType(), is(FileSystemViewStorageType.REMOTE_FIRST)); - } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestViewStorageProperties.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestViewStorageProperties.java deleted file mode 100644 index f80760bf1..000000000 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestViewStorageProperties.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.utils; - -import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; -import org.apache.hudi.common.table.view.FileSystemViewStorageType; -import org.apache.hudi.util.ViewStorageProperties; - -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; - -import java.io.File; -import java.io.IOException; - -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; - -/** - * Test cases for {@link ViewStorageProperties}. - */ -public class TestViewStorageProperties { - @TempDir - File tempFile; - - @Test - void testReadWriteProperties() throws IOException { - String basePath = tempFile.getAbsolutePath(); - FileSystemViewStorageConfig config = FileSystemViewStorageConfig.newBuilder() - .withStorageType(FileSystemViewStorageType.SPILLABLE_DISK) - .withRemoteServerHost("host1") - .withRemoteServerPort(1234).build(); - ViewStorageProperties.createProperties(basePath, config); - ViewStorageProperties.createProperties(basePath, config); - ViewStorageProperties.createProperties(basePath, config); - - FileSystemViewStorageConfig readConfig = ViewStorageProperties.loadFromProperties(basePath); - assertThat(readConfig.getStorageType(), is(FileSystemViewStorageType.SPILLABLE_DISK)); - assertThat(readConfig.getRemoteViewServerHost(), is("host1")); - assertThat(readConfig.getRemoteViewServerPort(), is(1234)); - } -}