From db2ab9a15092c1423ee308b1bba7dd7e2f5027a8 Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Wed, 8 Sep 2021 14:52:39 +0800 Subject: [PATCH] [HUDI-2403] Add metadata table listing for flink query source (#3618) --- .../hudi/configuration/FlinkOptions.java | 2 +- .../org/apache/hudi/source/FileIndex.java | 149 ++++++++++++++++++ .../apache/hudi/table/HoodieTableSource.java | 13 +- .../utils/StreamWriteFunctionWrapper.java | 3 + .../org/apache/hudi/source/TestFileIndex.java | 72 +++++++++ .../hudi/table/TestHoodieTableSource.java | 9 +- 6 files changed, 234 insertions(+), 14 deletions(-) create mode 100644 hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java create mode 100644 hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 8504f6aa7..6e0ff5207 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -72,7 +72,7 @@ public class FlinkOptions extends HoodieConfig { public static final ConfigOption PARTITION_DEFAULT_NAME = ConfigOptions .key("partition.default_name") .stringType() - .defaultValue("__DEFAULT_PARTITION__") + .defaultValue("default") // keep sync with hoodie style .withDescription("The default partition name in case the dynamic partition" + " column value is null/empty string"); diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java b/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java new file mode 100644 index 000000000..bf37f1204 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.source; + +import org.apache.hudi.client.common.HoodieFlinkEngineContext; +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.configuration.FlinkOptions; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * A file index which supports listing files efficiently through metadata table. + * + *

It caches the partition paths to avoid redundant look up. + */ +public class FileIndex { + private final Path path; + private final HoodieMetadataConfig metadataConfig; + private List partitionPaths; // cache of partition paths + + private FileIndex(Path path, Configuration conf) { + this.path = path; + this.metadataConfig = metadataConfig(conf); + } + + public static FileIndex instance(Path path, Configuration conf) { + return new FileIndex(path, conf); + } + + /** + * Returns the partition path key and values as a list of map, each map item in the list + * is a mapping of the partition key name to its actual partition value. For example, say + * there is a file path with partition keys [key1, key2, key3]: + * + *

+   *   -- file:/// ... key1=val1/key2=val2/key3=val3
+   *   -- file:/// ... key1=val4/key2=val5/key3=val6
+   * 
+ * + *

The return list should be [{key1:val1, key2:val2, key3:val3}, {key1:val4, key2:val5, key3:val6}]. + * + * @param partitionKeys The partition key list + * @param defaultParName The default partition name for nulls + * @param hivePartition Whether the partition path is in Hive style + */ + public List> getPartitions( + List partitionKeys, + String defaultParName, + boolean hivePartition) { + if (partitionKeys.size() == 0) { + // non partitioned table + return Collections.emptyList(); + } + List partitionPaths = getOrBuildPartitionPaths(); + if (partitionPaths.size() == 1 && partitionPaths.get(0).isEmpty()) { + return Collections.emptyList(); + } + List> partitions = new ArrayList<>(); + for (String partitionPath : partitionPaths) { + String[] paths = partitionPath.split(File.separator); + Map partitionMapping = new LinkedHashMap<>(); + if (hivePartition) { + Arrays.stream(paths).forEach(p -> { + String[] kv = p.split("="); + if (kv.length == 2) { + partitionMapping.put(kv[0], defaultParName.equals(kv[1]) ? null : kv[1]); + } + }); + } else { + for (int i = 0; i < partitionKeys.size(); i++) { + partitionMapping.put(partitionKeys.get(i), defaultParName.equals(paths[i]) ? null : paths[i]); + } + } + partitions.add(partitionMapping); + } + return partitions; + } + + /** + * Returns all the file statuses under the table base path. + */ + public FileStatus[] getFilesInPartitions() { + String[] partitions = getOrBuildPartitionPaths().stream().map(p -> new Path(path, p).toString()).toArray(String[]::new); + return FSUtils.getFilesInPartitions(HoodieFlinkEngineContext.DEFAULT, metadataConfig, path.toString(), + partitions, "/tmp/") + .values().stream().flatMap(Arrays::stream).toArray(FileStatus[]::new); + } + + /** + * Reset the state of the file index. + */ + @VisibleForTesting + public void reset() { + this.partitionPaths = null; + } + + // ------------------------------------------------------------------------- + // Utilities + // ------------------------------------------------------------------------- + + private List getOrBuildPartitionPaths() { + if (this.partitionPaths != null) { + return this.partitionPaths; + } + this.partitionPaths = FSUtils.getAllPartitionPaths(HoodieFlinkEngineContext.DEFAULT, + metadataConfig, path.toString()); + return this.partitionPaths; + } + + private static HoodieMetadataConfig metadataConfig(org.apache.flink.configuration.Configuration conf) { + Properties properties = new Properties(); + + // set up metadata.enabled=true in table DDL to enable metadata listing + properties.put(HoodieMetadataConfig.ENABLE, conf.getBoolean(FlinkOptions.METADATA_ENABLED)); + properties.put(HoodieMetadataConfig.SYNC_ENABLE, conf.getBoolean(FlinkOptions.METADATA_ENABLED)); + properties.put(HoodieMetadataConfig.VALIDATE_ENABLE, false); + + return HoodieMetadataConfig.newBuilder().fromProperties(properties).build(); + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index 78d1db6c6..43743fc64 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -30,6 +30,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.hadoop.HoodieROTablePathFilter; +import org.apache.hudi.source.FileIndex; import org.apache.hudi.source.StreamReadMonitoringFunction; import org.apache.hudi.source.StreamReadOperator; import org.apache.hudi.table.format.FilePathUtils; @@ -116,6 +117,7 @@ public class HoodieTableSource implements private final List partitionKeys; private final String defaultPartName; private final Configuration conf; + private final FileIndex fileIndex; private int[] requiredPos; private long limit; @@ -147,6 +149,7 @@ public class HoodieTableSource implements this.partitionKeys = partitionKeys; this.defaultPartName = defaultPartName; this.conf = conf; + this.fileIndex = FileIndex.instance(this.path, this.conf); this.requiredPartitions = requiredPartitions; this.requiredPos = requiredPos == null ? IntStream.range(0, schema.getColumnCount()).toArray() @@ -222,8 +225,8 @@ public class HoodieTableSource implements @Override public Optional>> listPartitions() { - List> partitions = FilePathUtils.getPartitions(path, hadoopConf, - partitionKeys, defaultPartName, conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING)); + List> partitions = this.fileIndex.getPartitions( + this.partitionKeys, defaultPartName, conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING)); return Optional.of(partitions); } @@ -277,10 +280,7 @@ public class HoodieTableSource implements if (paths.length == 0) { return Collections.emptyList(); } - FileStatus[] fileStatuses = Arrays.stream(paths) - .flatMap(path -> - Arrays.stream(FilePathUtils.getFileStatusRecursively(path, 1, hadoopConf))) - .toArray(FileStatus[]::new); + FileStatus[] fileStatuses = fileIndex.getFilesInPartitions(); if (fileStatuses.length == 0) { throw new HoodieException("No files found for reading in user provided path."); } @@ -492,6 +492,7 @@ public class HoodieTableSource implements public void reset() { this.metaClient.reloadActiveTimeline(); this.requiredPartitions = null; + this.fileIndex.reset(); } /** diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java index 6b6bedea5..c5d3ec5a2 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java @@ -142,6 +142,9 @@ public class StreamWriteFunctionWrapper { public void openFunction() throws Exception { this.coordinator.start(); this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext)); + if (conf.getBoolean(FlinkOptions.METADATA_ENABLED)) { + this.coordinator.setMetadataSyncExecutor(new MockCoordinatorExecutor(coordinatorContext)); + } toHoodieFunction = new RowDataToHoodieFunction<>(TestConfigurations.ROW_TYPE, conf); toHoodieFunction.setRuntimeContext(runtimeContext); toHoodieFunction.open(conf); diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java new file mode 100644 index 000000000..b1f442fa6 --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.source; + +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.utils.TestConfigurations; +import org.apache.hudi.utils.TestData; + +import org.apache.flink.configuration.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.io.File; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Test cases for {@link FileIndex}. + */ +public class TestFileIndex { + @TempDir + File tempFile; + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testFileListingUsingMetadata(boolean hiveStylePartitioning) throws Exception { + Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); + conf.setBoolean(FlinkOptions.METADATA_ENABLED, true); + conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning); + TestData.writeData(TestData.DATA_SET_INSERT, conf); + FileIndex fileIndex = FileIndex.instance(new Path(tempFile.getAbsolutePath()), conf); + List partitionKeys = Collections.singletonList("partition"); + List> partitions = fileIndex.getPartitions(partitionKeys, "default", hiveStylePartitioning); + assertTrue(partitions.stream().allMatch(m -> m.size() == 1)); + String partitionPaths = partitions.stream() + .map(Map::values).flatMap(Collection::stream).sorted().collect(Collectors.joining(",")); + assertThat("should have 4 partitions", partitionPaths, is("par1,par2,par3,par4")); + + FileStatus[] fileStatuses = fileIndex.getFilesInPartitions(); + assertThat(fileStatuses.length, is(4)); + assertTrue(Arrays.stream(fileStatuses) + .allMatch(fileStatus -> fileStatus.getPath().toString().endsWith(HoodieFileFormat.PARQUET.getFileExtension()))); + } +} diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java index 25742a7fa..d50a716cf 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java @@ -21,7 +21,6 @@ package org.apache.hudi.table; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.table.format.mor.MergeOnReadInputFormat; -import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestConfigurations; import org.apache.hudi.utils.TestData; @@ -37,14 +36,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; -import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.Map; import java.util.stream.Collectors; -import java.util.stream.IntStream; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; @@ -64,12 +61,10 @@ public class TestHoodieTableSource { @TempDir File tempFile; - void beforeEach() throws IOException { + void beforeEach() throws Exception { final String path = tempFile.getAbsolutePath(); conf = TestConfigurations.getDefaultConf(path); - StreamerUtil.initTableIfNotExists(conf); - IntStream.range(1, 5) - .forEach(i -> new File(path + File.separator + "par" + i).mkdirs()); + TestData.writeData(TestData.DATA_SET_INSERT, conf); } @Test