[HUDI-3825] Fixing Column Stats Index updating sequence (#5267)
@@ -1557,11 +1557,11 @@ public class HoodieWriteConfig extends HoodieConfig {
     return isMetadataTableEnabled() && getMetadataConfig().isColumnStatsIndexEnabled();
   }
 
-  public String getColumnsEnabledForColumnStatsIndex() {
+  public List<String> getColumnsEnabledForColumnStatsIndex() {
     return getMetadataConfig().getColumnsEnabledForColumnStatsIndex();
   }
 
-  public String getColumnsEnabledForBloomFilterIndex() {
+  public List<String> getColumnsEnabledForBloomFilterIndex() {
     return getMetadataConfig().getColumnsEnabledForBloomFilterIndex();
   }
 
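Note: both getters now hand back the configured columns as a List<String> instead of the raw comma-separated string, so callers no longer parse the value themselves; the splitting presumably moves down into HoodieMetadataConfig. A minimal standalone sketch of that kind of parsing (the helper name is hypothetical, not the actual Hudi API):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.stream.Collectors;

    public class ColumnListParsing {
      // Hypothetical helper: turns a comma-separated config value such as
      // "rider,driver" into a trimmed list; null or blank input yields an
      // empty list, which is what the isEmpty() checks below rely on.
      static List<String> toColumnList(String csv) {
        if (csv == null || csv.trim().isEmpty()) {
          return Collections.emptyList();
        }
        return Arrays.stream(csv.split(","))
            .map(String::trim)
            .filter(s -> !s.isEmpty())
            .collect(Collectors.toList());
      }

      public static void main(String[] args) {
        System.out.println(toColumnList(" rider, driver ,,fare ")); // [rider, driver, fare]
        System.out.println(toColumnList(null));                     // []
      }
    }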
@@ -54,7 +54,6 @@ import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.SizeEstimator;
-import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieAppendException;
 import org.apache.hudi.exception.HoodieException;
@@ -66,6 +65,7 @@ import org.apache.log4j.Logger;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -73,7 +73,6 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.collectColumnRangeMetadata;
 
@@ -348,15 +347,16 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends
 
     if (config.isMetadataColumnStatsIndexEnabled()) {
       final List<Schema.Field> fieldsToIndex;
-      if (StringUtils.isNullOrEmpty(config.getColumnsEnabledForColumnStatsIndex())) {
-        // If column stats index is enabled but columns not configured then we assume that all columns should be indexed
+      // If column stats index is enabled but columns not configured then we assume that
+      // all columns should be indexed
+      if (config.getColumnsEnabledForColumnStatsIndex().isEmpty()) {
        fieldsToIndex = writeSchemaWithMetaFields.getFields();
       } else {
-        Set<String> columnsToIndex = Stream.of(config.getColumnsEnabledForColumnStatsIndex().split(","))
-            .map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toSet());
+        Set<String> columnsToIndexSet = new HashSet<>(config.getColumnsEnabledForColumnStatsIndex());
 
         fieldsToIndex = writeSchemaWithMetaFields.getFields().stream()
-            .filter(field -> columnsToIndex.contains(field.name())).collect(Collectors.toList());
+            .filter(field -> columnsToIndexSet.contains(field.name()))
+            .collect(Collectors.toList());
       }
 
       Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangesMetadataMap =
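Note: with the config getter returning a List<String>, the handle no longer splits and trims the raw string itself; it only wraps the list in a HashSet so the per-field membership check runs in constant time instead of scanning a list for every schema field. A small self-contained sketch of the filtering step (field and column names invented for illustration):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;

    public class FieldFiltering {
      public static void main(String[] args) {
        // Stand-ins for writeSchemaWithMetaFields.getFields(); the real code
        // filters Schema.Field objects by field.name().
        List<String> allFields = Arrays.asList("_hoodie_commit_time", "rider", "driver", "fare");
        List<String> configured = Arrays.asList("rider", "fare");

        // Wrap the configured columns in a HashSet for O(1) lookups.
        Set<String> columnsToIndexSet = new HashSet<>(configured);

        List<String> fieldsToIndex = allFields.stream()
            .filter(columnsToIndexSet::contains)
            .collect(Collectors.toList());

        System.out.println(fieldsToIndex); // [rider, fare]
      }
    }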
@@ -18,6 +18,11 @@
 
 package org.apache.hudi.metadata;
 
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
 import org.apache.hudi.avro.model.HoodieInstantInfo;
@@ -55,7 +60,6 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCompactionConfig;
@@ -66,12 +70,6 @@ import org.apache.hudi.config.metrics.HoodieMetricsJmxConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.exception.HoodieMetadataException;
-
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -729,12 +727,14 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
 
   private MetadataRecordsGenerationParams getRecordsGenerationParams() {
     return new MetadataRecordsGenerationParams(
-        dataMetaClient, enabledPartitionTypes, dataWriteConfig.getBloomFilterType(),
+        dataMetaClient,
+        enabledPartitionTypes,
+        dataWriteConfig.getBloomFilterType(),
         dataWriteConfig.getMetadataBloomFilterIndexParallelism(),
         dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
         dataWriteConfig.getColumnStatsIndexParallelism(),
-        StringUtils.toList(dataWriteConfig.getColumnsEnabledForColumnStatsIndex()),
-        StringUtils.toList(dataWriteConfig.getColumnsEnabledForBloomFilterIndex()));
+        dataWriteConfig.getColumnsEnabledForColumnStatsIndex(),
+        dataWriteConfig.getColumnsEnabledForBloomFilterIndex());
   }
 
   /**
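Note: the StringUtils.toList(...) adapters become redundant once the write-config getters return lists directly; the values now flow into MetadataRecordsGenerationParams unchanged, which is also why the StringUtils import disappears from this file and from HoodieAppendHandle above.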
@@ -1021,6 +1021,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
         })
         .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
 
+    int totalDataFilesCount = partitionToFilesMap.values().stream().mapToInt(Map::size).sum();
     List<String> partitions = new ArrayList<>(partitionToFilesMap.keySet());
 
     if (partitionTypes.contains(MetadataPartitionType.FILES)) {
@@ -1031,19 +1032,19 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
       partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecords);
     }
 
-    if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS)) {
+    if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS) && totalDataFilesCount > 0) {
       final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
           engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams(), createInstantTime);
       partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, recordsRDD);
     }
 
-    if (partitionTypes.contains(MetadataPartitionType.COLUMN_STATS)) {
+    if (partitionTypes.contains(MetadataPartitionType.COLUMN_STATS) && totalDataFilesCount > 0) {
       final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
           engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams());
       partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, recordsRDD);
     }
 
-    LOG.info("Committing " + partitions.size() + " partitions and " + partitionToFilesMap.values().size() + " files to metadata");
+    LOG.info("Committing " + partitions.size() + " partitions and " + totalDataFilesCount + " files to metadata");
 
     commit(createInstantTime, partitionToRecordsMap, false);
   }
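Note: totalDataFilesCount serves two purposes here. Guarding the BLOOM_FILTERS and COLUMN_STATS branches with "> 0" skips record generation when the bootstrap scan found no data files at all, and the log line now reports an actual file count; the previous partitionToFilesMap.values().size() counted partitions (one inner map per partition), not files. A tiny runnable sketch of the two counts (map contents invented, inner value type assumed to be file sizes):

    import java.util.HashMap;
    import java.util.Map;

    public class FileCounting {
      public static void main(String[] args) {
        // partition -> (file name -> file size), a plausible shape for partitionToFilesMap.
        Map<String, Map<String, Long>> partitionToFilesMap = new HashMap<>();
        Map<String, Long> p1 = new HashMap<>();
        p1.put("f1.parquet", 10L);
        p1.put("f2.parquet", 20L);
        Map<String, Long> p2 = new HashMap<>();
        p2.put("f3.parquet", 30L);
        partitionToFilesMap.put("2022/04/01", p1);
        partitionToFilesMap.put("2022/04/02", p2);

        // Old log value: number of partitions, not files.
        int partitionCount = partitionToFilesMap.values().size();   // 2
        // New value: total files across all partitions.
        int totalDataFilesCount = partitionToFilesMap.values().stream()
            .mapToInt(Map::size).sum();                             // 3

        System.out.println(partitionCount + " partitions vs " + totalDataFilesCount + " files");
      }
    }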
@@ -18,6 +18,11 @@
 
 package org.apache.hudi.table;
 
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
@@ -60,7 +65,6 @@ import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
 import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
 import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -78,12 +82,6 @@ import org.apache.hudi.table.marker.WriteMarkers;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.table.storage.HoodieLayoutFactory;
 import org.apache.hudi.table.storage.HoodieStorageLayout;
-
-import org.apache.avro.Schema;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -888,7 +886,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
     return !HoodieTableMetadata.isMetadataTable(metaClient.getBasePath())
         && !config.isMetadataTableEnabled()
         && (!metaClient.getTableConfig().contains(TABLE_METADATA_PARTITIONS)
-        || !StringUtils.isNullOrEmpty(metaClient.getTableConfig().getMetadataPartitions()));
+        || !metaClient.getTableConfig().getMetadataPartitions().isEmpty());
   }
 
   /**
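Note: the last clause now asks the metadata-partitions value for isEmpty() directly instead of going through StringUtils.isNullOrEmpty, presumably because getMetadataPartitions() is guaranteed non-null after this change; that also removes this file's only use of StringUtils, matching the import dropped in the hunk above.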