1
0

[HUDI-2403] Add metadata table listing for flink query source (#3618)

This commit is contained in:
Danny Chan
2021-09-08 14:52:39 +08:00
committed by GitHub
parent 81acb4cafe
commit db2ab9a150
6 changed files with 234 additions and 14 deletions

View File

@@ -72,7 +72,7 @@ public class FlinkOptions extends HoodieConfig {
public static final ConfigOption<String> PARTITION_DEFAULT_NAME = ConfigOptions
.key("partition.default_name")
.stringType()
.defaultValue("__DEFAULT_PARTITION__")
.defaultValue("default") // keep sync with hoodie style
.withDescription("The default partition name in case the dynamic partition"
+ " column value is null/empty string");

View File

@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.source;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
 * A file index which supports listing files efficiently through the metadata table.
 *
 * <p>It caches the partition paths on first lookup to avoid redundant listings.
 */
public class FileIndex {

  // Table base path.
  private final Path path;
  // Metadata config built from the Flink options, controls metadata-table listing.
  private final HoodieMetadataConfig metadataConfig;
  // Lazy cache of relative partition paths; null until first lookup, cleared by #reset().
  private List<String> partitionPaths;

  private FileIndex(Path path, Configuration conf) {
    this.path = path;
    this.metadataConfig = metadataConfig(conf);
  }

  /**
   * Returns a new {@code FileIndex} for the given table base path and Flink configuration.
   */
  public static FileIndex instance(Path path, Configuration conf) {
    return new FileIndex(path, conf);
  }

  /**
   * Returns the partition path key and values as a list of map, each map item in the list
   * is a mapping of the partition key name to its actual partition value. For example, say
   * there is a file path with partition keys [key1, key2, key3]:
   *
   * <p><pre>
   *   -- file:/// ... key1=val1/key2=val2/key3=val3
   *   -- file:/// ... key1=val4/key2=val5/key3=val6
   * </pre>
   *
   * <p>The return list should be [{key1:val1, key2:val2, key3:val3}, {key1:val4, key2:val5, key3:val6}].
   *
   * <p>Partition values equal to {@code defaultParName} are mapped to {@code null}.
   *
   * @param partitionKeys  The partition key list
   * @param defaultParName The default partition name for nulls
   * @param hivePartition  Whether the partition path is in Hive style
   * @return the partition key-to-value mappings, empty for a non-partitioned table
   */
  public List<Map<String, String>> getPartitions(
      List<String> partitionKeys,
      String defaultParName,
      boolean hivePartition) {
    if (partitionKeys.isEmpty()) {
      // non partitioned table
      return Collections.emptyList();
    }
    List<String> partitionPaths = getOrBuildPartitionPaths();
    if (partitionPaths.size() == 1 && partitionPaths.get(0).isEmpty()) {
      // a single empty relative path also means the table is non-partitioned
      return Collections.emptyList();
    }
    List<Map<String, String>> partitions = new ArrayList<>(partitionPaths.size());
    for (String partitionPath : partitionPaths) {
      // Partition paths returned by FSUtils are Hadoop paths and always use '/' as the
      // separator, independent of platform. Splitting on File.separator would be wrong on
      // Windows ('\' is both the wrong delimiter and an invalid regex).
      String[] paths = partitionPath.split("/");
      Map<String, String> partitionMapping = new LinkedHashMap<>();
      if (hivePartition) {
        for (String p : paths) {
          // limit = 2 so that partition values containing '=' keep their full value
          String[] kv = p.split("=", 2);
          if (kv.length == 2) {
            partitionMapping.put(kv[0], defaultParName.equals(kv[1]) ? null : kv[1]);
          }
        }
      } else {
        // guard against partition paths shallower than the declared key list
        for (int i = 0; i < partitionKeys.size() && i < paths.length; i++) {
          partitionMapping.put(partitionKeys.get(i), defaultParName.equals(paths[i]) ? null : paths[i]);
        }
      }
      partitions.add(partitionMapping);
    }
    return partitions;
  }

  /**
   * Returns all the file statuses under the table base path.
   */
  public FileStatus[] getFilesInPartitions() {
    String[] partitions = getOrBuildPartitionPaths().stream()
        .map(p -> new Path(path, p).toString())
        .toArray(String[]::new);
    // "/tmp/" is the base path for the spillable map used during the metadata lookup
    return FSUtils.getFilesInPartitions(HoodieFlinkEngineContext.DEFAULT, metadataConfig, path.toString(),
            partitions, "/tmp/")
        .values().stream().flatMap(Arrays::stream).toArray(FileStatus[]::new);
  }

  /**
   * Reset the state of the file index.
   */
  @VisibleForTesting
  public void reset() {
    this.partitionPaths = null;
  }

  // -------------------------------------------------------------------------
  //  Utilities
  // -------------------------------------------------------------------------

  /**
   * Returns the relative partition paths, building and caching them on first access.
   */
  private List<String> getOrBuildPartitionPaths() {
    if (this.partitionPaths != null) {
      return this.partitionPaths;
    }
    this.partitionPaths = FSUtils.getAllPartitionPaths(HoodieFlinkEngineContext.DEFAULT,
        metadataConfig, path.toString());
    return this.partitionPaths;
  }

  /**
   * Builds the {@link HoodieMetadataConfig} from the Flink configuration.
   */
  private static HoodieMetadataConfig metadataConfig(Configuration conf) {
    Properties properties = new Properties();
    // set up metadata.enabled=true in table DDL to enable metadata listing
    properties.put(HoodieMetadataConfig.ENABLE, conf.getBoolean(FlinkOptions.METADATA_ENABLED));
    properties.put(HoodieMetadataConfig.SYNC_ENABLE, conf.getBoolean(FlinkOptions.METADATA_ENABLED));
    properties.put(HoodieMetadataConfig.VALIDATE_ENABLE, false);
    return HoodieMetadataConfig.newBuilder().fromProperties(properties).build();
  }
}

View File

@@ -30,6 +30,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.HoodieROTablePathFilter;
import org.apache.hudi.source.FileIndex;
import org.apache.hudi.source.StreamReadMonitoringFunction;
import org.apache.hudi.source.StreamReadOperator;
import org.apache.hudi.table.format.FilePathUtils;
@@ -116,6 +117,7 @@ public class HoodieTableSource implements
private final List<String> partitionKeys;
private final String defaultPartName;
private final Configuration conf;
private final FileIndex fileIndex;
private int[] requiredPos;
private long limit;
@@ -147,6 +149,7 @@ public class HoodieTableSource implements
this.partitionKeys = partitionKeys;
this.defaultPartName = defaultPartName;
this.conf = conf;
this.fileIndex = FileIndex.instance(this.path, this.conf);
this.requiredPartitions = requiredPartitions;
this.requiredPos = requiredPos == null
? IntStream.range(0, schema.getColumnCount()).toArray()
@@ -222,8 +225,8 @@ public class HoodieTableSource implements
@Override
public Optional<List<Map<String, String>>> listPartitions() {
List<Map<String, String>> partitions = FilePathUtils.getPartitions(path, hadoopConf,
partitionKeys, defaultPartName, conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING));
List<Map<String, String>> partitions = this.fileIndex.getPartitions(
this.partitionKeys, defaultPartName, conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING));
return Optional.of(partitions);
}
@@ -277,10 +280,7 @@ public class HoodieTableSource implements
if (paths.length == 0) {
return Collections.emptyList();
}
FileStatus[] fileStatuses = Arrays.stream(paths)
.flatMap(path ->
Arrays.stream(FilePathUtils.getFileStatusRecursively(path, 1, hadoopConf)))
.toArray(FileStatus[]::new);
FileStatus[] fileStatuses = fileIndex.getFilesInPartitions();
if (fileStatuses.length == 0) {
throw new HoodieException("No files found for reading in user provided path.");
}
@@ -492,6 +492,7 @@ public class HoodieTableSource implements
public void reset() {
this.metaClient.reloadActiveTimeline();
this.requiredPartitions = null;
this.fileIndex.reset();
}
/**

View File

@@ -142,6 +142,9 @@ public class StreamWriteFunctionWrapper<I> {
public void openFunction() throws Exception {
this.coordinator.start();
this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext));
if (conf.getBoolean(FlinkOptions.METADATA_ENABLED)) {
this.coordinator.setMetadataSyncExecutor(new MockCoordinatorExecutor(coordinatorContext));
}
toHoodieFunction = new RowDataToHoodieFunction<>(TestConfigurations.ROW_TYPE, conf);
toHoodieFunction.setRuntimeContext(runtimeContext);
toHoodieFunction.open(conf);

View File

@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.source;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
 * Test cases for {@link FileIndex}.
 */
public class TestFileIndex {
  @TempDir
  File tempFile;

  @ParameterizedTest
  @ValueSource(booleans = {true, false})
  void testFileListingUsingMetadata(boolean hiveStylePartitioning) throws Exception {
    final String basePath = tempFile.getAbsolutePath();
    Configuration conf = TestConfigurations.getDefaultConf(basePath);
    conf.setBoolean(FlinkOptions.METADATA_ENABLED, true);
    conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning);
    TestData.writeData(TestData.DATA_SET_INSERT, conf);

    FileIndex index = FileIndex.instance(new Path(basePath), conf);
    List<Map<String, String>> partitions =
        index.getPartitions(Collections.singletonList("partition"), "default", hiveStylePartitioning);
    // each partition mapping carries exactly the single 'partition' key
    assertTrue(partitions.stream().allMatch(mapping -> mapping.size() == 1));
    String joinedValues = partitions.stream()
        .flatMap(mapping -> mapping.values().stream())
        .sorted()
        .collect(Collectors.joining(","));
    assertThat("should have 4 partitions", joinedValues, is("par1,par2,par3,par4"));

    FileStatus[] statuses = index.getFilesInPartitions();
    assertThat(statuses.length, is(4));
    // every listed file should be a parquet base file
    assertTrue(Arrays.stream(statuses)
        .allMatch(status -> status.getPath().toString().endsWith(HoodieFileFormat.PARQUET.getFileExtension())));
  }
}

View File

@@ -21,7 +21,6 @@ package org.apache.hudi.table;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
@@ -37,14 +36,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -64,12 +61,10 @@ public class TestHoodieTableSource {
@TempDir
File tempFile;
void beforeEach() throws IOException {
void beforeEach() throws Exception {
final String path = tempFile.getAbsolutePath();
conf = TestConfigurations.getDefaultConf(path);
StreamerUtil.initTableIfNotExists(conf);
IntStream.range(1, 5)
.forEach(i -> new File(path + File.separator + "par" + i).mkdirs());
TestData.writeData(TestData.DATA_SET_INSERT, conf);
}
@Test