[HUDI-1611] Added a configuration to allow specific directories to be filtered out during Metadata Table bootstrap. (#2565)
This commit is contained in:
@@ -318,6 +318,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
|
|||||||
Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>();
|
Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>();
|
||||||
final int fileListingParallelism = metadataWriteConfig.getFileListingParallelism();
|
final int fileListingParallelism = metadataWriteConfig.getFileListingParallelism();
|
||||||
SerializableConfiguration conf = new SerializableConfiguration(datasetMetaClient.getHadoopConf());
|
SerializableConfiguration conf = new SerializableConfiguration(datasetMetaClient.getHadoopConf());
|
||||||
|
final String dirFilterRegex = datasetWriteConfig.getMetadataConfig().getDirectoryFilterRegex();
|
||||||
|
|
||||||
while (!pathsToList.isEmpty()) {
|
while (!pathsToList.isEmpty()) {
|
||||||
int listingParallelism = Math.min(fileListingParallelism, pathsToList.size());
|
int listingParallelism = Math.min(fileListingParallelism, pathsToList.size());
|
||||||
@@ -331,6 +332,11 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
|
|||||||
// If the listing reveals a directory, add it to queue. If the listing reveals a hoodie partition, add it to
|
// If the listing reveals a directory, add it to queue. If the listing reveals a hoodie partition, add it to
|
||||||
// the results.
|
// the results.
|
||||||
dirToFileListing.forEach(p -> {
|
dirToFileListing.forEach(p -> {
|
||||||
|
if (!dirFilterRegex.isEmpty() && p.getLeft().getName().matches(dirFilterRegex)) {
|
||||||
|
LOG.info("Ignoring directory " + p.getLeft() + " which matches the filter regex " + dirFilterRegex);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
List<FileStatus> filesInDir = Arrays.stream(p.getRight()).parallel()
|
List<FileStatus> filesInDir = Arrays.stream(p.getRight()).parallel()
|
||||||
.filter(fs -> !fs.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))
|
.filter(fs -> !fs.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
|
|||||||
@@ -148,14 +148,22 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
|
|||||||
final String nonPartitionDirectory = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0] + "-nonpartition";
|
final String nonPartitionDirectory = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0] + "-nonpartition";
|
||||||
Files.createDirectories(Paths.get(basePath, nonPartitionDirectory));
|
Files.createDirectories(Paths.get(basePath, nonPartitionDirectory));
|
||||||
|
|
||||||
|
// Three directories which are partitions but will be ignored due to filter
|
||||||
|
final String filterDirRegex = ".*-filterDir\\d|\\..*";
|
||||||
|
final String filteredDirectoryOne = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0] + "-filterDir1";
|
||||||
|
final String filteredDirectoryTwo = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0] + "-filterDir2";
|
||||||
|
final String filteredDirectoryThree = ".backups";
|
||||||
|
|
||||||
// Create some commits
|
// Create some commits
|
||||||
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
|
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
|
||||||
testTable.withPartitionMetaFiles("p1", "p2")
|
testTable.withPartitionMetaFiles("p1", "p2", filteredDirectoryOne, filteredDirectoryTwo, filteredDirectoryThree)
|
||||||
.addCommit("001").withBaseFilesInPartition("p1", 10).withBaseFilesInPartition("p2", 10, 10)
|
.addCommit("001").withBaseFilesInPartition("p1", 10).withBaseFilesInPartition("p2", 10, 10)
|
||||||
.addCommit("002").withBaseFilesInPartition("p1", 10).withBaseFilesInPartition("p2", 10, 10, 10)
|
.addCommit("002").withBaseFilesInPartition("p1", 10).withBaseFilesInPartition("p2", 10, 10, 10)
|
||||||
.addInflightCommit("003").withBaseFilesInPartition("p1", 10).withBaseFilesInPartition("p2", 10);
|
.addInflightCommit("003").withBaseFilesInPartition("p1", 10).withBaseFilesInPartition("p2", 10);
|
||||||
|
|
||||||
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
|
final HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
|
||||||
|
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withDirectoryFilterRegex(filterDirRegex).build()).build();
|
||||||
|
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
|
||||||
client.startCommitWithTime("005");
|
client.startCommitWithTime("005");
|
||||||
|
|
||||||
List<String> partitions = metadataWriter(client).metadata().getAllPartitionPaths();
|
List<String> partitions = metadataWriter(client).metadata().getAllPartitionPaths();
|
||||||
@@ -164,6 +172,13 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
|
|||||||
assertTrue(partitions.contains("p1"), "Must contain partition p1");
|
assertTrue(partitions.contains("p1"), "Must contain partition p1");
|
||||||
assertTrue(partitions.contains("p2"), "Must contain partition p2");
|
assertTrue(partitions.contains("p2"), "Must contain partition p2");
|
||||||
|
|
||||||
|
assertFalse(partitions.contains(filteredDirectoryOne),
|
||||||
|
"Must not contain the filtered directory " + filteredDirectoryOne);
|
||||||
|
assertFalse(partitions.contains(filteredDirectoryTwo),
|
||||||
|
"Must not contain the filtered directory " + filteredDirectoryTwo);
|
||||||
|
assertFalse(partitions.contains(filteredDirectoryThree),
|
||||||
|
"Must not contain the filtered directory " + filteredDirectoryThree);
|
||||||
|
|
||||||
FileStatus[] statuses = metadata(client).getAllFilesInPartition(new Path(basePath, "p1"));
|
FileStatus[] statuses = metadata(client).getAllFilesInPartition(new Path(basePath, "p1"));
|
||||||
assertTrue(statuses.length == 2);
|
assertTrue(statuses.length == 2);
|
||||||
statuses = metadata(client).getAllFilesInPartition(new Path(basePath, "p2"));
|
statuses = metadata(client).getAllFilesInPartition(new Path(basePath, "p2"));
|
||||||
|
|||||||
@@ -75,6 +75,10 @@ public final class HoodieMetadataConfig extends DefaultHoodieConfig {
|
|||||||
public static final String ENABLE_FALLBACK_PROP = METADATA_PREFIX + ".fallback.enable";
|
public static final String ENABLE_FALLBACK_PROP = METADATA_PREFIX + ".fallback.enable";
|
||||||
public static final String DEFAULT_ENABLE_FALLBACK = "true";
|
public static final String DEFAULT_ENABLE_FALLBACK = "true";
|
||||||
|
|
||||||
|
// Regex to filter out matching directories during bootstrap
|
||||||
|
public static final String DIRECTORY_FILTER_REGEX = METADATA_PREFIX + ".dir.filter.regex";
|
||||||
|
public static final String DEFAULT_DIRECTORY_FILTER_REGEX = "";
|
||||||
|
|
||||||
public static final String HOODIE_ASSUME_DATE_PARTITIONING_PROP = "hoodie.assume.date.partitioning";
|
public static final String HOODIE_ASSUME_DATE_PARTITIONING_PROP = "hoodie.assume.date.partitioning";
|
||||||
public static final String DEFAULT_ASSUME_DATE_PARTITIONING = "false";
|
public static final String DEFAULT_ASSUME_DATE_PARTITIONING = "false";
|
||||||
|
|
||||||
@@ -117,6 +121,10 @@ public final class HoodieMetadataConfig extends DefaultHoodieConfig {
|
|||||||
return Boolean.parseBoolean(props.getProperty(METADATA_METRICS_ENABLE_PROP));
|
return Boolean.parseBoolean(props.getProperty(METADATA_METRICS_ENABLE_PROP));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getDirectoryFilterRegex() {
|
||||||
|
return props.getProperty(DIRECTORY_FILTER_REGEX);
|
||||||
|
}
|
||||||
|
|
||||||
public static class Builder {
|
public static class Builder {
|
||||||
|
|
||||||
private final Properties props = new Properties();
|
private final Properties props = new Properties();
|
||||||
@@ -194,6 +202,11 @@ public final class HoodieMetadataConfig extends DefaultHoodieConfig {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder withDirectoryFilterRegex(String regex) {
|
||||||
|
props.setProperty(DIRECTORY_FILTER_REGEX, regex);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public HoodieMetadataConfig build() {
|
public HoodieMetadataConfig build() {
|
||||||
HoodieMetadataConfig config = new HoodieMetadataConfig(props);
|
HoodieMetadataConfig config = new HoodieMetadataConfig(props);
|
||||||
setDefaultOnCondition(props, !props.containsKey(METADATA_ENABLE_PROP), METADATA_ENABLE_PROP,
|
setDefaultOnCondition(props, !props.containsKey(METADATA_ENABLE_PROP), METADATA_ENABLE_PROP,
|
||||||
@@ -222,6 +235,8 @@ public final class HoodieMetadataConfig extends DefaultHoodieConfig {
|
|||||||
DEFAULT_ENABLE_FALLBACK);
|
DEFAULT_ENABLE_FALLBACK);
|
||||||
setDefaultOnCondition(props, !props.containsKey(ENABLE_REUSE_PROP), ENABLE_REUSE_PROP,
|
setDefaultOnCondition(props, !props.containsKey(ENABLE_REUSE_PROP), ENABLE_REUSE_PROP,
|
||||||
DEFAULT_ENABLE_REUSE);
|
DEFAULT_ENABLE_REUSE);
|
||||||
|
setDefaultOnCondition(props, !props.containsKey(DIRECTORY_FILTER_REGEX), DIRECTORY_FILTER_REGEX,
|
||||||
|
DEFAULT_DIRECTORY_FILTER_REGEX);
|
||||||
return config;
|
return config;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user