[HUDI-3465] Add validation of column stats and bloom filters in HoodieMetadataTableValidator (#4878)

Y Ethan Guo
2022-02-28 18:49:30 -08:00
committed by GitHub
parent 44b8ab6048
commit 257052a94d
4 changed files with 424 additions and 79 deletions

hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java

@@ -20,29 +20,44 @@ package org.apache.hudi.utilities;
import org.apache.hudi.async.HoodieAsyncService;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.utilities.util.BloomFilterData;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import jline.internal.Log;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -54,10 +69,17 @@ import java.util.concurrent.Executors;
import java.util.stream.Collectors;
/**
* A validator with spark-submit to compare list partitions and list files between metadata table and filesystem.
*
* A validator with spark-submit to compare information, such as partitions, file listing, index, etc.,
* between metadata table and filesystem.
* <p>
* - Default : This validator will compare the result of listing partitions/listing files between metadata table and filesystem only once.
* There are five validation tasks, that can be enabled independently through the following CLI options:
* - `--validate-latest-file-slices`: validate latest file slices for all partitions.
* - `--validate-latest-base-files`: validate latest base files for all partitions.
* - `--validate-all-file-groups`: validate all file groups, and all file slices within file groups.
* - `--validate-all-column-stats`: validate column stats for all columns in the schema
* - `--validate-bloom-filters`: validate bloom filters of base files
* <p>
* - Default : This validator will compare the results between metadata table and filesystem only once.
* <p>
* Example command:
* ```
@@ -160,6 +182,12 @@ public class HoodieMetadataTableValidator implements Serializable {
@Parameter(names = {"--validate-all-file-groups"}, description = "Validate all file groups, and all file slices within file groups.", required = false)
public boolean validateAllFileGroups = false;
@Parameter(names = {"--validate-all-column-stats"}, description = "Validate column stats for all columns in the schema", required = false)
public boolean validateAllColumnStats = false;
@Parameter(names = {"--validate-bloom-filters"}, description = "Validate bloom filters of base files", required = false)
public boolean validateBloomFilters = false;
@Parameter(names = {"--min-validate-interval-seconds"},
description = "the min validate interval of each validate when set --continuous, default is 10 minutes.")
public Integer minValidateIntervalSeconds = 10 * 60;
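A quick sketch of how the new flags bind, assuming JCommander's standard arity-0 boolean behavior (the base path value is hypothetical):

```java
// Sketch: JCommander maps the CLI flags to the @Parameter fields above.
HoodieMetadataTableValidator.Config cfg = new HoodieMetadataTableValidator.Config();
new JCommander(cfg).parse(
    "--base-path", "s3a://bucket/hudi_table",      // hypothetical path
    "--validate-all-column-stats",
    "--validate-bloom-filters");
// Presence of an arity-0 boolean flag flips the field to true.
assert cfg.validateAllColumnStats && cfg.validateBloomFilters;
```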
@@ -199,6 +227,8 @@ public class HoodieMetadataTableValidator implements Serializable {
+ " --validate-latest-file-slices " + validateLatestFileSlices + ", \n"
+ " --validate-latest-base-files " + validateLatestBaseFiles + ", \n"
+ " --validate-all-file-groups " + validateAllFileGroups + ", \n"
+ " --validate-all-column-stats " + validateAllColumnStats + ", \n"
+ " --validate-bloom-filters " + validateBloomFilters + ", \n"
+ " --continuous " + continuous + ", \n"
+ " --ignore-failed " + ignoreFailed + ", \n"
+ " --min-validate-interval-seconds " + minValidateIntervalSeconds + ", \n"
@@ -225,6 +255,8 @@ public class HoodieMetadataTableValidator implements Serializable {
&& Objects.equals(validateLatestFileSlices, config.validateLatestFileSlices)
&& Objects.equals(validateLatestBaseFiles, config.validateLatestBaseFiles)
&& Objects.equals(validateAllFileGroups, config.validateAllFileGroups)
&& Objects.equals(validateAllColumnStats, config.validateAllColumnStats)
&& Objects.equals(validateBloomFilters, config.validateBloomFilters)
&& Objects.equals(minValidateIntervalSeconds, config.minValidateIntervalSeconds)
&& Objects.equals(parallelism, config.parallelism)
&& Objects.equals(ignoreFailed, config.ignoreFailed)
@@ -237,8 +269,9 @@ public class HoodieMetadataTableValidator implements Serializable {
@Override
public int hashCode() {
return Objects.hash(basePath, continuous, validateLatestFileSlices, validateLatestBaseFiles, validateAllFileGroups,
minValidateIntervalSeconds, parallelism, ignoreFailed, sparkMaster, sparkMemory, assumeDatePartitioning, propsFilePath, configs, help);
return Objects.hash(basePath, continuous, validateLatestFileSlices, validateLatestBaseFiles,
validateAllFileGroups, validateAllColumnStats, validateBloomFilters, minValidateIntervalSeconds,
parallelism, ignoreFailed, sparkMaster, sparkMemory, assumeDatePartitioning, propsFilePath, configs, help);
}
}
@@ -314,12 +347,14 @@ public class HoodieMetadataTableValidator implements Serializable {
String basePath = metaClient.getBasePath();
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
List<String> allPartitions = validatePartitions(engineContext, basePath);
HoodieTableFileSystemView metaFsView = createHoodieTableFileSystemView(engineContext, true);
HoodieTableFileSystemView fsView = createHoodieTableFileSystemView(engineContext, false);
HoodieMetadataValidationContext metadataTableBasedContext =
new HoodieMetadataValidationContext(engineContext, cfg, metaClient, true);
HoodieMetadataValidationContext fsBasedContext =
new HoodieMetadataValidationContext(engineContext, cfg, metaClient, false);
List<Boolean> result = engineContext.parallelize(allPartitions, allPartitions.size()).map(partitionPath -> {
try {
validateFilesInPartition(metaFsView, fsView, partitionPath);
validateFilesInPartition(metadataTableBasedContext, fsBasedContext, partitionPath);
LOG.info("Metadata table validation succeeded for " + partitionPath);
return true;
} catch (HoodieValidationException e) {
@@ -364,65 +399,73 @@ public class HoodieMetadataTableValidator implements Serializable {
}
/**
* Compare the listing files result between metadata table and fileSystem.
* For now, validate two kinds of apis:
* 1. getLatestFileSlices
* 2. getLatestBaseFiles
* 3. getAllFileGroups and getAllFileSlices
* @param metaFsView
* @param fsView
* @param partitionPath
* Compare the file listing and index data between metadata table and fileSystem.
* For now, validate five kinds of apis:
* 1. HoodieMetadataFileSystemView::getLatestFileSlices
* 2. HoodieMetadataFileSystemView::getLatestBaseFiles
* 3. HoodieMetadataFileSystemView::getAllFileGroups and HoodieMetadataFileSystemView::getAllFileSlices
* 4. HoodieBackedTableMetadata::getColumnStats
* 5. HoodieBackedTableMetadata::getBloomFilters
*
* @param metadataTableBasedContext Validation context containing information based on metadata table
* @param fsBasedContext Validation context containing information based on the file system
* @param partitionPath Partition path String
*/
private void validateFilesInPartition(HoodieTableFileSystemView metaFsView, HoodieTableFileSystemView fsView, String partitionPath) {
private void validateFilesInPartition(
HoodieMetadataValidationContext metadataTableBasedContext,
HoodieMetadataValidationContext fsBasedContext, String partitionPath) {
if (cfg.validateLatestFileSlices) {
validateLatestFileSlices(metaFsView, fsView, partitionPath);
validateLatestFileSlices(metadataTableBasedContext, fsBasedContext, partitionPath);
}
if (cfg.validateLatestBaseFiles) {
validateLatestBaseFiles(metaFsView, fsView, partitionPath);
validateLatestBaseFiles(metadataTableBasedContext, fsBasedContext, partitionPath);
}
if (cfg.validateAllFileGroups) {
validateAllFileGroups(metaFsView, fsView, partitionPath);
validateAllFileGroups(metadataTableBasedContext, fsBasedContext, partitionPath);
}
if (cfg.validateAllColumnStats) {
validateAllColumnStats(metadataTableBasedContext, fsBasedContext, partitionPath);
}
if (cfg.validateBloomFilters) {
validateBloomFilters(metadataTableBasedContext, fsBasedContext, partitionPath);
}
}
private void validateAllFileGroups(HoodieTableFileSystemView metaFsView, HoodieTableFileSystemView fsView, String partitionPath) {
List<HoodieFileGroup> fileGroupsFromMetadata = metaFsView.getAllFileGroups(partitionPath).sorted(new HoodieFileGroupCompactor()).collect(Collectors.toList());
List<HoodieFileGroup> fileGroupsFromFS = fsView.getAllFileGroups(partitionPath).sorted(new HoodieFileGroupCompactor()).collect(Collectors.toList());
private void validateAllFileGroups(
HoodieMetadataValidationContext metadataTableBasedContext,
HoodieMetadataValidationContext fsBasedContext, String partitionPath) {
List<FileSlice> allFileSlicesFromMeta = metadataTableBasedContext
.getSortedAllFileGroupList(partitionPath).stream()
.flatMap(HoodieFileGroup::getAllFileSlices).sorted(new FileSliceComparator())
.collect(Collectors.toList());
List<FileSlice> allFileSlicesFromFS = fsBasedContext
.getSortedAllFileGroupList(partitionPath).stream()
.flatMap(HoodieFileGroup::getAllFileSlices).sorted(new FileSliceComparator())
.collect(Collectors.toList());
List<FileSlice> allFileSlicesFromMeta = fileGroupsFromMetadata.stream().flatMap(HoodieFileGroup::getAllFileSlices).sorted(new FileSliceCompactor()).collect(Collectors.toList());
List<FileSlice> allFileSlicesFromFS = fileGroupsFromFS.stream().flatMap(HoodieFileGroup::getAllFileSlices).sorted(new FileSliceCompactor()).collect(Collectors.toList());
LOG.info("All file slices from metadata: " + allFileSlicesFromMeta + ". For partitions " + partitionPath);
LOG.info("All file slices from direct listing: " + allFileSlicesFromFS + ". For partitions " + partitionPath);
validateFileSlice(allFileSlicesFromMeta, allFileSlicesFromFS, partitionPath);
LOG.debug("All file slices from metadata: " + allFileSlicesFromMeta + ". For partitions " + partitionPath);
LOG.debug("All file slices from direct listing: " + allFileSlicesFromFS + ". For partitions " + partitionPath);
validate(allFileSlicesFromMeta, allFileSlicesFromFS, partitionPath, "file slices");
LOG.info("Validation of all file groups succeeded for partition " + partitionPath);
}
private void validateFileSlice(List<FileSlice> fileSlicesFromMeta, List<FileSlice> fileSlicesFromFS, String partitionPath) {
if (fileSlicesFromMeta.size() != fileSlicesFromFS.size() || !fileSlicesFromMeta.equals(fileSlicesFromFS)) {
String message = "Validation of metadata file slices for partition " + partitionPath + " failed. "
+ "File slices from metadata: " + fileSlicesFromMeta
+ "File slices from direct listing: " + fileSlicesFromFS;
LOG.error(message);
throw new HoodieValidationException(message);
} else {
LOG.info("Validation of file slices succeeded for partition " + partitionPath);
}
}
/**
* Compare getLatestBaseFiles between metadata table and fileSystem.
*/
private void validateLatestBaseFiles(HoodieTableFileSystemView metaFsView, HoodieTableFileSystemView fsView, String partitionPath) {
private void validateLatestBaseFiles(
HoodieMetadataValidationContext metadataTableBasedContext,
HoodieMetadataValidationContext fsBasedContext, String partitionPath) {
List<HoodieBaseFile> latestFilesFromMetadata = metaFsView.getLatestBaseFiles(partitionPath).sorted(new HoodieBaseFileCompactor()).collect(Collectors.toList());
List<HoodieBaseFile> latestFilesFromFS = fsView.getLatestBaseFiles(partitionPath).sorted(new HoodieBaseFileCompactor()).collect(Collectors.toList());
List<HoodieBaseFile> latestFilesFromMetadata = metadataTableBasedContext.getSortedLatestBaseFileList(partitionPath);
List<HoodieBaseFile> latestFilesFromFS = fsBasedContext.getSortedLatestBaseFileList(partitionPath);
LOG.info("Latest base file from metadata: " + latestFilesFromMetadata + ". For partitions " + partitionPath);
LOG.info("Latest base file from direct listing: " + latestFilesFromFS + ". For partitions " + partitionPath);
LOG.debug("Latest base file from metadata: " + latestFilesFromMetadata + ". For partitions " + partitionPath);
LOG.debug("Latest base file from direct listing: " + latestFilesFromFS + ". For partitions " + partitionPath);
if (latestFilesFromMetadata.size() != latestFilesFromFS.size()
|| !latestFilesFromMetadata.equals(latestFilesFromFS)) {
String message = "Validation of metadata get latest base file for partition " + partitionPath + " failed. "
@@ -438,27 +481,62 @@ public class HoodieMetadataTableValidator implements Serializable {
/**
* Compare getLatestFileSlices between metadata table and fileSystem.
*/
private void validateLatestFileSlices(HoodieTableFileSystemView metaFsView, HoodieTableFileSystemView fsView, String partitionPath) {
private void validateLatestFileSlices(
HoodieMetadataValidationContext metadataTableBasedContext,
HoodieMetadataValidationContext fsBasedContext, String partitionPath) {
List<FileSlice> latestFileSlicesFromMetadataTable = metaFsView.getLatestFileSlices(partitionPath).sorted(new FileSliceCompactor()).collect(Collectors.toList());
List<FileSlice> latestFileSlicesFromFS = fsView.getLatestFileSlices(partitionPath).sorted(new FileSliceCompactor()).collect(Collectors.toList());
List<FileSlice> latestFileSlicesFromMetadataTable = metadataTableBasedContext.getSortedLatestFileSliceList(partitionPath);
List<FileSlice> latestFileSlicesFromFS = fsBasedContext.getSortedLatestFileSliceList(partitionPath);
LOG.info("Latest file list from metadata: " + latestFileSlicesFromMetadataTable + ". For partition " + partitionPath);
LOG.info("Latest file list from direct listing: " + latestFileSlicesFromFS + ". For partition " + partitionPath);
LOG.debug("Latest file list from metadata: " + latestFileSlicesFromMetadataTable + ". For partition " + partitionPath);
LOG.debug("Latest file list from direct listing: " + latestFileSlicesFromFS + ". For partition " + partitionPath);
validateFileSlice(latestFileSlicesFromMetadataTable, latestFileSlicesFromFS, partitionPath);
validate(latestFileSlicesFromMetadataTable, latestFileSlicesFromFS, partitionPath, "file slices");
LOG.info("Validation of getLatestFileSlices succeeded for partition " + partitionPath);
}
private HoodieTableFileSystemView createHoodieTableFileSystemView(HoodieSparkEngineContext engineContext, boolean enableMetadataTable) {
private void validateAllColumnStats(
HoodieMetadataValidationContext metadataTableBasedContext,
HoodieMetadataValidationContext fsBasedContext, String partitionPath) {
List<String> latestBaseFilenameList = fsBasedContext.getSortedLatestBaseFileList(partitionPath)
.stream().map(BaseFile::getFileName).collect(Collectors.toList());
List<HoodieColumnRangeMetadata<String>> metadataBasedColStats = metadataTableBasedContext
.getSortedColumnStatsList(partitionPath, latestBaseFilenameList);
List<HoodieColumnRangeMetadata<String>> fsBasedColStats = fsBasedContext
.getSortedColumnStatsList(partitionPath, latestBaseFilenameList);
HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
.enable(enableMetadataTable)
.withAssumeDatePartitioning(cfg.assumeDatePartitioning)
.build();
validate(metadataBasedColStats, fsBasedColStats, partitionPath, "column stats");
return FileSystemViewManager.createInMemoryFileSystemView(engineContext,
metaClient, metadataConfig);
LOG.info("Validation of column stats succeeded for partition " + partitionPath);
}
private void validateBloomFilters(
HoodieMetadataValidationContext metadataTableBasedContext,
HoodieMetadataValidationContext fsBasedContext, String partitionPath) {
List<String> latestBaseFilenameList = fsBasedContext.getSortedLatestBaseFileList(partitionPath)
.stream().map(BaseFile::getFileName).collect(Collectors.toList());
List<BloomFilterData> metadataBasedBloomFilters = metadataTableBasedContext
.getSortedBloomFilterList(partitionPath, latestBaseFilenameList);
List<BloomFilterData> fsBasedBloomFilters = fsBasedContext
.getSortedBloomFilterList(partitionPath, latestBaseFilenameList);
validate(metadataBasedBloomFilters, fsBasedBloomFilters, partitionPath, "bloom filters");
LOG.info("Validation of bloom filters succeeded for partition " + partitionPath);
}
private <T> void validate(
List<T> infoListFromMetadataTable, List<T> infoListFromFS, String partitionPath, String label) {
if (infoListFromMetadataTable.size() != infoListFromFS.size()
|| !infoListFromMetadataTable.equals(infoListFromFS)) {
String message = String.format("Validation of %s for partition %s failed."
+ "\n%s from metadata: %s\n%s from file system and base files: %s",
label, partitionPath, label, infoListFromMetadataTable, label, infoListFromFS);
LOG.error(message);
throw new HoodieValidationException(message);
} else {
LOG.info(String.format("Validation of %s succeeded for partition %s", label, partitionPath));
}
}
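The generic `validate` helper replaces the earlier file-slice-specific check and works for any `List<T>` with a meaningful `equals`. A small illustration of its contract, with hypothetical values (assumes `java.util.Arrays` is imported and the call is made from within this class):

```java
List<String> fromMetadataTable = Arrays.asList("file1.parquet", "file2.parquet");
List<String> fromFileSystem = Arrays.asList("file1.parquet");
try {
  // Differing sizes or unequal elements raise a HoodieValidationException.
  validate(fromMetadataTable, fromFileSystem, "2022/02/27", "base files");
} catch (HoodieValidationException e) {
  // With --ignore-failed the caller logs and moves on; otherwise it fails fast.
}
```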
public class AsyncMetadataTableValidateService extends HoodieAsyncService {
@@ -491,7 +569,7 @@ public class HoodieMetadataTableValidator implements Serializable {
}
}
public static class FileSliceCompactor implements Comparator<FileSlice>, Serializable {
public static class FileSliceComparator implements Comparator<FileSlice>, Serializable {
@Override
public int compare(FileSlice o1, FileSlice o2) {
@@ -500,7 +578,7 @@ public class HoodieMetadataTableValidator implements Serializable {
}
}
public static class HoodieBaseFileCompactor implements Comparator<HoodieBaseFile>, Serializable {
public static class HoodieBaseFileComparator implements Comparator<HoodieBaseFile>, Serializable {
@Override
public int compare(HoodieBaseFile o1, HoodieBaseFile o2) {
@@ -508,11 +586,172 @@ public class HoodieMetadataTableValidator implements Serializable {
}
}
public static class HoodieFileGroupCompactor implements Comparator<HoodieFileGroup>, Serializable {
public static class HoodieFileGroupComparator implements Comparator<HoodieFileGroup>, Serializable {
@Override
public int compare(HoodieFileGroup o1, HoodieFileGroup o2) {
return o1.getFileGroupId().compareTo(o2.getFileGroupId());
}
}
}
public static class HoodieColumnRangeMetadataComparator
implements Comparator<HoodieColumnRangeMetadata<String>>, Serializable {
@Override
public int compare(HoodieColumnRangeMetadata<String> o1, HoodieColumnRangeMetadata<String> o2) {
return o1.toString().compareTo(o2.toString());
}
}
/**
* Class for storing relevant information for metadata table validation.
* <p>
* If metadata table is disabled, the APIs provide the information, e.g., file listing,
* index, from the file system and base files. If metadata table is enabled, the APIs
* provide the information from the metadata table. The same API is expected to return
* the same information regardless of whether metadata table is enabled, which is
* verified in the {@link HoodieMetadataTableValidator}.
*/
private static class HoodieMetadataValidationContext implements Serializable {
private HoodieTableMetaClient metaClient;
private HoodieTableFileSystemView fileSystemView;
private HoodieTableMetadata tableMetadata;
private boolean enableMetadataTable;
private List<String> allColumnNameList;
public HoodieMetadataValidationContext(
HoodieEngineContext engineContext, Config cfg, HoodieTableMetaClient metaClient,
boolean enableMetadataTable) {
this.metaClient = metaClient;
this.enableMetadataTable = enableMetadataTable;
HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
.enable(enableMetadataTable)
.withMetadataIndexBloomFilter(enableMetadataTable)
.withMetadataIndexColumnStats(enableMetadataTable)
.withMetadataIndexForAllColumns(enableMetadataTable)
.withAssumeDatePartitioning(cfg.assumeDatePartitioning)
.build();
this.fileSystemView = FileSystemViewManager.createInMemoryFileSystemView(engineContext,
metaClient, metadataConfig);
this.tableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, metaClient.getBasePath(),
FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue());
if (metaClient.getCommitsTimeline().filterCompletedInstants().countInstants() > 0) {
this.allColumnNameList = getAllColumnNames();
}
}
public List<HoodieBaseFile> getSortedLatestBaseFileList(String partitionPath) {
return fileSystemView.getLatestBaseFiles(partitionPath)
.sorted(new HoodieBaseFileComparator()).collect(Collectors.toList());
}
public List<FileSlice> getSortedLatestFileSliceList(String partitionPath) {
return fileSystemView.getLatestFileSlices(partitionPath)
.sorted(new FileSliceComparator()).collect(Collectors.toList());
}
public List<HoodieFileGroup> getSortedAllFileGroupList(String partitionPath) {
return fileSystemView.getAllFileGroups(partitionPath)
.sorted(new HoodieFileGroupComparator()).collect(Collectors.toList());
}
public List<HoodieColumnRangeMetadata<String>> getSortedColumnStatsList(
String partitionPath, List<String> baseFileNameList) {
LOG.info("All column names for getting column stats: " + allColumnNameList);
if (enableMetadataTable) {
List<Pair<String, String>> partitionFileNameList = baseFileNameList.stream()
.map(filename -> Pair.of(partitionPath, filename)).collect(Collectors.toList());
return allColumnNameList.stream()
.flatMap(columnName ->
tableMetadata.getColumnStats(partitionFileNameList, columnName).values().stream()
.map(stats -> new HoodieColumnRangeMetadata<>(
stats.getFileName(),
columnName,
stats.getMinValue(),
stats.getMaxValue(),
stats.getNullCount(),
stats.getValueCount(),
stats.getTotalSize(),
stats.getTotalUncompressedSize()))
.collect(Collectors.toList())
.stream())
.sorted(new HoodieColumnRangeMetadataComparator())
.collect(Collectors.toList());
} else {
return baseFileNameList.stream().flatMap(filename ->
new ParquetUtils().readRangeFromParquetMetadata(
metaClient.getHadoopConf(),
new Path(new Path(metaClient.getBasePath(), partitionPath), filename),
allColumnNameList).stream())
.map(rangeMetadata -> new HoodieColumnRangeMetadata<String>(
rangeMetadata.getFilePath(),
rangeMetadata.getColumnName(),
// Note: here we ignore the type in the validation,
// since column stats from metadata table store the min/max values as String
rangeMetadata.getMinValue().toString(),
rangeMetadata.getMaxValue().toString(),
rangeMetadata.getNullCount(),
rangeMetadata.getValueCount(),
rangeMetadata.getTotalSize(),
rangeMetadata.getTotalUncompressedSize()
))
.sorted(new HoodieColumnRangeMetadataComparator())
.collect(Collectors.toList());
}
}
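Both branches above normalize min/max values to `String`, since the column stats index stores them as strings while parquet footers carry typed values. A hypothetical sketch of the resulting comparable form (constructor arguments follow the usage in this diff; assumes `java.util.stream.Stream` is imported):

```java
HoodieColumnRangeMetadata<String> rider = new HoodieColumnRangeMetadata<>(
    "file1.parquet", "rider", "rider-A", "rider-Z", 0L, 100L, 2048L, 8192L);
HoodieColumnRangeMetadata<String> driver = new HoodieColumnRangeMetadata<>(
    "file1.parquet", "driver", "driver-A", "driver-Z", 0L, 100L, 2048L, 8192L);
// HoodieColumnRangeMetadataComparator orders by toString(), giving a
// deterministic order so both sides can be compared element-wise.
List<HoodieColumnRangeMetadata<String>> sorted = Stream.of(rider, driver)
    .sorted(new HoodieColumnRangeMetadataComparator())
    .collect(Collectors.toList());
```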
public List<BloomFilterData> getSortedBloomFilterList(
String partitionPath, List<String> baseFileNameList) {
if (enableMetadataTable) {
List<Pair<String, String>> partitionFileNameList = baseFileNameList.stream()
.map(filename -> Pair.of(partitionPath, filename)).collect(Collectors.toList());
return tableMetadata.getBloomFilters(partitionFileNameList).entrySet().stream()
.map(entry -> BloomFilterData.builder()
.setPartitionPath(entry.getKey().getKey())
.setFilename(entry.getKey().getValue())
.setBloomFilter(entry.getValue())
.build())
.sorted()
.collect(Collectors.toList());
} else {
return baseFileNameList.stream()
.map(filename -> readBloomFilterFromFile(partitionPath, filename))
.filter(Option::isPresent)
.map(Option::get)
.sorted()
.collect(Collectors.toList());
}
}
private List<String> getAllColumnNames() {
TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
try {
return schemaResolver.getTableAvroSchema().getFields().stream()
.map(entry -> entry.name()).collect(Collectors.toList());
} catch (Exception e) {
throw new HoodieException("Failed to get all column names for " + metaClient.getBasePath());
}
}
private Option<BloomFilterData> readBloomFilterFromFile(String partitionPath, String filename) {
Path path = new Path(new Path(metaClient.getBasePath(), partitionPath), filename);
HoodieFileReader<IndexedRecord> fileReader;
try {
fileReader = HoodieFileReaderFactory.getFileReader(metaClient.getHadoopConf(), path);
} catch (IOException e) {
Log.error("Failed to get file reader for " + path + " " + e.getMessage());
return Option.empty();
}
final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
if (fileBloomFilter == null) {
Log.error("Failed to read bloom filter for " + path);
return Option.empty();
}
return Option.of(BloomFilterData.builder()
.setPartitionPath(partitionPath)
.setFilename(filename)
.setBloomFilter(ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes()))
.build());
}
}
}

hudi-utilities/src/main/java/org/apache/hudi/utilities/util/BloomFilterData.java

@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.utilities.util;
import org.jetbrains.annotations.NotNull;
import java.nio.ByteBuffer;
import java.util.Objects;
/**
* Includes partition path, filename and bloom filter for validation
*/
public class BloomFilterData implements Comparable<BloomFilterData> {
private final String partitionPath;
private final String filename;
private final ByteBuffer bloomFilter;
private BloomFilterData(
String partitionPath, String filename, ByteBuffer bloomFilter) {
this.partitionPath = partitionPath;
this.filename = filename;
this.bloomFilter = bloomFilter;
}
public static Builder builder() {
return new Builder();
}
@Override
public int compareTo(@NotNull BloomFilterData o) {
return this.toString().compareTo(o.toString());
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
BloomFilterData that = (BloomFilterData) o;
return partitionPath.equals(that.partitionPath) && filename.equals(that.filename)
&& bloomFilter.equals(that.bloomFilter);
}
@Override
public int hashCode() {
return Objects.hash(partitionPath, filename, bloomFilter);
}
@Override
public String toString() {
String bloomFilterString = new String(bloomFilter.array());
return "BloomFilterData{"
+ "partitionPath='" + partitionPath + '\''
+ ", filename='" + filename + '\''
+ ", bloomFilter="
+ (bloomFilterString.length() > 50 ? bloomFilterString.substring(0, 50) + "..." : bloomFilterString)
+ '}';
}
public static class Builder {
private String partitionPath;
private String filename;
private ByteBuffer bloomFilter;
public Builder setPartitionPath(String partitionPath) {
this.partitionPath = partitionPath;
return this;
}
public Builder setFilename(String filename) {
this.filename = filename;
return this;
}
public Builder setBloomFilter(ByteBuffer bloomFilter) {
this.bloomFilter = bloomFilter;
return this;
}
public BloomFilterData build() {
return new BloomFilterData(partitionPath, filename, bloomFilter);
}
}
}
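For reference, a usage sketch of the builder and the natural ordering it backs, mirroring how `getSortedBloomFilterList` normalizes both sources before comparison (the filter bytes and file names are hypothetical; assumes `java.util.List` and `java.util.stream` imports):

```java
ByteBuffer bytes = ByteBuffer.wrap("serialized-bloom-filter".getBytes());
BloomFilterData first = BloomFilterData.builder()
    .setPartitionPath("2022/02/27")
    .setFilename("file1.parquet")
    .setBloomFilter(bytes)
    .build();
BloomFilterData second = BloomFilterData.builder()
    .setPartitionPath("2022/02/27")
    .setFilename("file2.parquet")
    .setBloomFilter(bytes)
    .build();
// compareTo delegates to toString(), so sorting is deterministic.
List<BloomFilterData> sorted = Stream.of(second, first)
    .sorted().collect(Collectors.toList());   // [first, second]
```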