diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 4496fa5f8..6053dcf3b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1492,6 +1492,10 @@ public class HoodieWriteConfig extends HoodieConfig { return isMetadataTableEnabled() && getMetadataConfig().isMetadataColumnStatsIndexForAllColumnsEnabled(); } + public int getColumnStatsIndexParallelism() { + return metadataConfig.getColumnStatsIndexParallelism(); + } + public int getBloomIndexKeysPerBucket() { return getInt(HoodieIndexConfig.BLOOM_INDEX_KEYS_PER_BUCKET); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index db9083f9e..a58e4d65d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.BaseFile; import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieColumnRangeMetadata; import org.apache.hudi.common.model.HoodieDeltaWriteStat; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieLogFile; @@ -71,6 +72,9 @@ import java.util.Properties; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.accumulateColumnRanges; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.aggregateColumnStats; + /** * IO Operation to 
append data onto an existing file. */ @@ -320,7 +324,7 @@ public class HoodieAppendHandle extends statuses.add(this.writeStatus); } - private void processAppendResult(AppendResult result) { + private void processAppendResult(AppendResult result, List recordList) { HoodieDeltaWriteStat stat = (HoodieDeltaWriteStat) this.writeStatus.getStat(); if (stat.getPath() == null) { @@ -339,6 +343,19 @@ public class HoodieAppendHandle extends updateWriteStatus(stat, result); } + if (config.isMetadataIndexColumnStatsForAllColumnsEnabled()) { + Map> columnRangeMap = stat.getRecordsStats().isPresent() + ? stat.getRecordsStats().get().getStats() : new HashMap<>(); + final String filePath = stat.getPath(); + // initialize map of column name to map of stats name to stats value + Map> columnToStats = new HashMap<>(); + writeSchemaWithMetaFields.getFields().forEach(field -> columnToStats.putIfAbsent(field.name(), new HashMap<>())); + // collect stats for columns at once per record and keep iterating through every record to eventually find col stats for all fields. 
+ recordList.forEach(record -> aggregateColumnStats(record, writeSchemaWithMetaFields, columnToStats, config.isConsistentLogicalTimestampEnabled())); + writeSchemaWithMetaFields.getFields().forEach(field -> accumulateColumnRanges(field, filePath, columnRangeMap, columnToStats)); + stat.setRecordsStats(new HoodieDeltaWriteStat.RecordsStats<>(columnRangeMap)); + } + resetWriteCounts(); assert stat.getRuntimeStats() != null; LOG.info(String.format("AppendHandle for partitionPath %s filePath %s, took %d ms.", partitionPath, @@ -376,7 +393,7 @@ public class HoodieAppendHandle extends if (blocks.size() > 0) { AppendResult appendResult = writer.appendBlocks(blocks); - processAppendResult(appendResult); + processAppendResult(appendResult, recordList); recordList.clear(); keysToDelete.clear(); } @@ -419,7 +436,7 @@ public class HoodieAppendHandle extends // update final size, once for all log files // TODO we can actually deduce file size purely from AppendResult (based on offset and size // of the appended block) - for (WriteStatus status: statuses) { + for (WriteStatus status : statuses) { long logFileSize = FSUtils.getFileSize(fs, new Path(config.getBasePath(), status.getStat().getPath())); status.getStat().setFileSizeInBytes(logFileSize); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java index 12d075e0c..bad822c8d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java @@ -19,14 +19,9 @@ package org.apache.hudi.io; import org.apache.hudi.common.bloom.BloomFilter; -import org.apache.hudi.common.bloom.BloomFilterTypeCode; -import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; 
import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.HoodieTimer; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIndexException; @@ -39,8 +34,6 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; @@ -53,27 +46,13 @@ public class HoodieKeyLookupHandle exten private final BloomFilter bloomFilter; private final List candidateRecordKeys; - private final boolean useMetadataTableIndex; - private Option fileName = Option.empty(); private long totalKeysChecked; public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable hoodieTable, Pair partitionPathFileIDPair) { - this(config, hoodieTable, partitionPathFileIDPair, Option.empty(), false); - } - - public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable hoodieTable, - Pair partitionPathFileIDPair, Option fileName, - boolean useMetadataTableIndex) { super(config, hoodieTable, partitionPathFileIDPair); this.candidateRecordKeys = new ArrayList<>(); this.totalKeysChecked = 0; - if (fileName.isPresent()) { - ValidationUtils.checkArgument(FSUtils.getFileId(fileName.get()).equals(getFileId()), - "File name '" + fileName.get() + "' doesn't match this lookup handle fileid '" + getFileId() + "'"); - this.fileName = fileName; - } - this.useMetadataTableIndex = useMetadataTableIndex; this.bloomFilter = getBloomFilter(); } @@ -81,25 +60,16 @@ public class HoodieKeyLookupHandle exten BloomFilter bloomFilter = null; HoodieTimer timer = new HoodieTimer().startTimer(); try { - if (this.useMetadataTableIndex) { - ValidationUtils.checkArgument(this.fileName.isPresent(), - "File name not available to fetch bloom filter from the 
metadata table index."); - Option bloomFilterByteBuffer = - hoodieTable.getMetadataTable().getBloomFilter(partitionPathFileIDPair.getLeft(), fileName.get()); - if (!bloomFilterByteBuffer.isPresent()) { - throw new HoodieIndexException("BloomFilter missing for " + partitionPathFileIDPair.getRight()); - } - bloomFilter = - new HoodieDynamicBoundedBloomFilter(StandardCharsets.UTF_8.decode(bloomFilterByteBuffer.get()).toString(), - BloomFilterTypeCode.DYNAMIC_V0); + if (config.isMetadataBloomFilterIndexEnabled()) { + bloomFilter = hoodieTable.getMetadataTable().getBloomFilter(partitionPathFileIDPair.getLeft(), partitionPathFileIDPair.getRight()) + .orElseThrow(() -> new HoodieIndexException("BloomFilter missing for " + partitionPathFileIDPair.getRight())); } else { try (HoodieFileReader reader = createNewFileReader()) { bloomFilter = reader.readBloomFilter(); } } } catch (IOException e) { - throw new HoodieIndexException(String.format("Error reading bloom filter from %s/%s - %s", - getPartitionPathFileIDPair().getLeft(), this.fileName, e)); + throw new HoodieIndexException(String.format("Error reading bloom filter from %s", getPartitionPathFileIDPair()), e); } LOG.info(String.format("Read bloom filter from %s in %d ms", partitionPathFileIDPair, timer.endTimer())); return bloomFilter; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 987583f3a..2f4bca81b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -55,6 +55,7 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; import 
org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.metrics.HoodieMetricsConfig; @@ -81,11 +82,11 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER; import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX; -import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME; import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP; /** @@ -121,7 +122,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta * @param hadoopConf - Hadoop configuration to use for the metadata writer * @param writeConfig - Writer config * @param engineContext - Engine context - * @param actionMetadata - Optional action metadata to help decide bootstrap operations + * @param actionMetadata - Optional action metadata to help decide initialize operations * @param - Action metadata types extending Avro generated SpecificRecordBase * @param inflightInstantTimestamp - Timestamp of any instant in progress */ @@ -203,7 +204,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta * @param metadataConfig - Table config * @param metaClient - Meta client for the metadata table * @param fsView - Metadata table filesystem view to use - * @param isBootstrapCompleted - Is metadata table bootstrap completed + * @param isBootstrapCompleted - Is metadata table initializing completed */ private void enablePartition(final MetadataPartitionType partitionType, final HoodieMetadataConfig metadataConfig, final Option metaClient, Option fsView, boolean isBootstrapCompleted) { @@ -319,13 +320,13 @@ 
public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta /** * Initialize the metadata table if it does not exist. - * - * If the metadata table does not exist, then file and partition listing is used to bootstrap the table. + *

+ * If the metadata table does not exist, then file and partition listing is used to initialize the table. * * @param engineContext - * @param actionMetadata Action metadata types extending Avro generated SpecificRecordBase + * @param actionMetadata Action metadata types extending Avro generated SpecificRecordBase * @param inflightInstantTimestamp Timestamp of an instant in progress on the dataset. This instant is ignored - * while deciding to bootstrap the metadata table. + * while deciding to initialize the metadata table. */ protected abstract void initialize(HoodieEngineContext engineContext, Option actionMetadata, @@ -345,27 +346,25 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta } /** - * Bootstrap the metadata table if needed. + * Initialize the metadata table if needed. * - * @param engineContext - Engine context - * @param dataMetaClient - Meta client for the data table - * @param actionMetadata - Optional action metadata - * @param - Action metadata types extending Avro generated SpecificRecordBase - * @param inflightInstantTimestamp - Timestamp of an instant in progress on the dataset. 
This instant is ignored + * @param dataMetaClient - meta client for the data table + * @param actionMetadata - optional action metadata + * @param inflightInstantTimestamp - timestamp of an instant in progress on the dataset + * @param - action metadata types extending Avro generated SpecificRecordBase * @throws IOException */ - protected void bootstrapIfNeeded(HoodieEngineContext engineContext, - HoodieTableMetaClient dataMetaClient, - Option actionMetadata, - Option inflightInstantTimestamp) throws IOException { + protected void initializeIfNeeded(HoodieTableMetaClient dataMetaClient, + Option actionMetadata, + Option inflightInstantTimestamp) throws IOException { HoodieTimer timer = new HoodieTimer().startTimer(); boolean exists = dataMetaClient.getFs().exists(new Path(metadataWriteConfig.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME)); - boolean rebootstrap = false; + boolean reInitialize = false; // If the un-synced instants have been archived, then - // the metadata table will need to be bootstrapped again. + // the metadata table will need to be initialized again. 
if (exists) { HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf.get()) .setBasePath(metadataWriteConfig.getBasePath()).build(); @@ -378,39 +377,39 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta final Option latestMetadataInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant(); - rebootstrap = isBootstrapNeeded(latestMetadataInstant, actionMetadata); + reInitialize = isBootstrapNeeded(latestMetadataInstant, actionMetadata); } - if (rebootstrap) { + if (reInitialize) { metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.REBOOTSTRAP_STR, 1)); - LOG.info("Deleting Metadata Table directory so that it can be re-bootstrapped"); + LOG.info("Deleting Metadata Table directory so that it can be re-initialized"); dataMetaClient.getFs().delete(new Path(metadataWriteConfig.getBasePath()), true); exists = false; } if (!exists) { // Initialize for the first time by listing partitions and files directly from the file system - if (bootstrapFromFilesystem(engineContext, dataMetaClient, inflightInstantTimestamp)) { + if (initializeFromFilesystem(dataMetaClient, inflightInstantTimestamp)) { metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer())); } } } /** - * Whether bootstrap operation needed for this metadata table. + * Whether initialize operation needed for this metadata table. *

* Rollback of the first commit would look like un-synced instants in the metadata table. - * Action metadata is needed to verify the instant time and avoid erroneous bootstrapping. + * Action metadata is needed to verify the instant time and avoid erroneous initialization. *

* TODO: Revisit this logic and validate that filtering for all * commits timeline is the right thing to do * - * @return True if the bootstrap is not needed, False otherwise + * @return True if the initialization is needed, False otherwise */ private boolean isBootstrapNeeded(Option latestMetadataInstant, Option actionMetadata) { if (!latestMetadataInstant.isPresent()) { - LOG.warn("Metadata Table will need to be re-bootstrapped as no instants were found"); + LOG.warn("Metadata Table will need to be re-initialized as no instants were found"); return true; } @@ -423,7 +422,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta if (dataMetaClient.getActiveTimeline().getAllCommitsTimeline().isBeforeTimelineStarts( latestMetadataInstant.get().getTimestamp()) && !isCommitRevertedByInFlightAction(actionMetadata, latestMetadataInstantTimestamp)) { - LOG.error("Metadata Table will need to be re-bootstrapped as un-synced instants have been archived." + LOG.error("Metadata Table will need to be re-initialized as un-synced instants have been archived."
+ " latestMetadataInstant=" + latestMetadataInstant.get().getTimestamp() + ", latestDataInstant=" + dataMetaClient.getActiveTimeline().firstInstant().get().getTimestamp()); return true; @@ -455,9 +454,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta case HoodieTimeline.ROLLBACK_ACTION: List rollbackedInstants = ((HoodieRollbackMetadata) actionMetadata.get()).getInstantsRollback(); - affectedInstantTimestamps = rollbackedInstants.stream().map(instant -> { - return instant.getCommitTime().toString(); - }).collect(Collectors.toList()); + affectedInstantTimestamps = rollbackedInstants.stream().map(HoodieInstantInfo::getCommitTime).collect(Collectors.toList()); if (affectedInstantTimestamps.contains(latestMetadataInstantTimestamp)) { return true; @@ -466,9 +463,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta case HoodieTimeline.RESTORE_ACTION: List restoredInstants = ((HoodieRestoreMetadata) actionMetadata.get()).getRestoreInstantInfo(); - affectedInstantTimestamps = restoredInstants.stream().map(instant -> { - return instant.getCommitTime().toString(); - }).collect(Collectors.toList()); + affectedInstantTimestamps = restoredInstants.stream().map(HoodieInstantInfo::getCommitTime).collect(Collectors.toList()); if (affectedInstantTimestamps.contains(latestMetadataInstantTimestamp)) { return true; @@ -484,14 +479,14 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta /** * Initialize the Metadata Table by listing files and partitions from the file system. * - * @param dataMetaClient {@code HoodieTableMetaClient} for the dataset. - * @param inflightInstantTimestamp + * @param dataMetaClient - {@code HoodieTableMetaClient} for the dataset. 
+ * @param inflightInstantTimestamp - Current action instant responsible for this initialization */ - private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, HoodieTableMetaClient dataMetaClient, - Option inflightInstantTimestamp) throws IOException { + private boolean initializeFromFilesystem(HoodieTableMetaClient dataMetaClient, + Option inflightInstantTimestamp) throws IOException { ValidationUtils.checkState(enabled, "Metadata table cannot be initialized as it is not enabled"); - // We can only bootstrap if there are no pending operations on the dataset + // We can only initialize if there are no pending operations on the dataset List pendingDataInstant = dataMetaClient.getActiveTimeline() .getInstants().filter(i -> !i.isCompleted()) .filter(i -> !inflightInstantTimestamp.isPresent() || !i.getTimestamp().equals(inflightInstantTimestamp.get())) @@ -499,7 +494,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta if (!pendingDataInstant.isEmpty()) { metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BOOTSTRAP_ERR_STR, 1)); - LOG.warn("Cannot bootstrap metadata table as operation(s) are in progress on the dataset: " + LOG.warn("Cannot initialize metadata table as operation(s) are in progress on the dataset: " + Arrays.toString(pendingDataInstant.toArray())); return false; } @@ -514,15 +509,10 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta initTableMetadata(); initializeEnabledFileGroups(dataMetaClient, createInstantTime); - // List all partitions in the basePath of the containing dataset - LOG.info("Initializing metadata table by using file listings in " + dataWriteConfig.getBasePath()); - engineContext.setJobStatus(this.getClass().getSimpleName(), "Bootstrap: initializing metadata table by listing files and partitions"); - List dirInfoList = listAllPartitions(dataMetaClient); - - // During bootstrap, the list of files to be committed can be huge. 
So creating a HoodieCommitMetadata out of these - // large number of files and calling the existing update(HoodieCommitMetadata) function does not scale well. - // Hence, we have a special commit just for the bootstrap scenario. - bootstrapCommit(dirInfoList, createInstantTime); + // During cold startup, the list of files to be committed can be huge. So creating a HoodieCommitMetadata out + // of these large number of files and calling the existing update(HoodieCommitMetadata) function does not scale + // well. Hence, we have a special commit just for the initialization scenario. + initialCommit(createInstantTime); return true; } @@ -651,6 +641,14 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta } } + private MetadataRecordsGenerationParams getRecordsGenerationParams() { + return new MetadataRecordsGenerationParams( + dataMetaClient, enabledPartitionTypes, dataWriteConfig.getBloomFilterType(), + dataWriteConfig.getBloomIndexParallelism(), + dataWriteConfig.isMetadataIndexColumnStatsForAllColumnsEnabled(), + dataWriteConfig.getColumnStatsIndexParallelism()); + } + /** * Interface to assist in converting commit metadata to List of HoodieRecords to be written to metadata table. * Updates of different commit metadata uses the same method to convert to HoodieRecords and hence. 
@@ -681,8 +679,8 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta */ @Override public void update(HoodieCommitMetadata commitMetadata, String instantTime, boolean isTableServiceAction) { - processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, enabledPartitionTypes, - commitMetadata, dataMetaClient, dataWriteConfig.isMetadataIndexColumnStatsForAllColumnsEnabled(), instantTime), !isTableServiceAction); + processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords( + engineContext, commitMetadata, instantTime, getRecordsGenerationParams()), !isTableServiceAction); } /** @@ -693,8 +691,8 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta */ @Override public void update(HoodieCleanMetadata cleanMetadata, String instantTime) { - processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, enabledPartitionTypes, - cleanMetadata, dataMetaClient, instantTime), false); + processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, + cleanMetadata, getRecordsGenerationParams(), instantTime), false); } /** @@ -706,7 +704,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta @Override public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) { processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, - enabledPartitionTypes, metadataMetaClient.getActiveTimeline(), restoreMetadata, dataMetaClient, instantTime, + metadataMetaClient.getActiveTimeline(), restoreMetadata, getRecordsGenerationParams(), instantTime, metadata.getSyncedInstantTime()), false); } @@ -732,8 +730,8 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta } Map> records = - HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, enabledPartitionTypes, - 
metadataMetaClient.getActiveTimeline(), rollbackMetadata, dataMetaClient, instantTime, + HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, metadataMetaClient.getActiveTimeline(), + rollbackMetadata, getRecordsGenerationParams(), instantTime, metadata.getSyncedInstantTime(), wasSynced); commit(instantTime, records, false); } @@ -845,20 +843,29 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta } /** - * This is invoked to bootstrap metadata table for a dataset. Bootstrap Commit has special handling mechanism due to its scale compared to + * This is invoked to initialize metadata table for a dataset. Bootstrap Commit has special handling mechanism due to its scale compared to * other regular commits. - * */ - protected void bootstrapCommit(List partitionInfoList, String createInstantTime) { - List partitions = partitionInfoList.stream().map(p -> - p.getRelativePath().isEmpty() ? NON_PARTITIONED_NAME : p.getRelativePath()).collect(Collectors.toList()); - final int totalFiles = partitionInfoList.stream().mapToInt(p -> p.getTotalFiles()).sum(); + private void initialCommit(String createInstantTime) { + // List all partitions in the basePath of the containing dataset + LOG.info("Initializing metadata table by using file listings in " + dataWriteConfig.getBasePath()); + engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing metadata table by listing files and partitions"); + + List partitionInfoList = listAllPartitions(dataMetaClient); + List partitions = new ArrayList<>(); + AtomicLong totalFiles = new AtomicLong(0); + Map> partitionToFilesMap = partitionInfoList.stream().map(p -> { + final String partitionName = HoodieTableMetadataUtil.getPartition(p.getRelativePath()); + partitions.add(partitionName); + totalFiles.addAndGet(p.getTotalFiles()); + return Pair.of(partitionName, p.getFileNameToSizeMap()); + }).collect(Collectors.toMap(Pair::getKey, Pair::getValue)); final Map> partitionToRecordsMap = new 
HashMap<>(); // Record which saves the list of all partitions HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions); if (partitions.isEmpty()) { - // in case of bootstrapping of a fresh table, there won't be any partitions, but we need to make a boostrap commit + // in case of initializing of a fresh table, there won't be any partitions, but we need to make a boostrap commit final HoodieData allPartitionRecordsRDD = engineContext.parallelize( Collections.singletonList(allPartitionRecord), 1); partitionToRecordsMap.put(MetadataPartitionType.FILES, allPartitionRecordsRDD); @@ -866,7 +873,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta return; } - HoodieData partitionRecords = engineContext.parallelize(Arrays.asList(allPartitionRecord), 1); + HoodieData filesPartitionRecords = engineContext.parallelize(Arrays.asList(allPartitionRecord), 1); if (!partitionInfoList.isEmpty()) { HoodieData fileListRecords = engineContext.parallelize(partitionInfoList, partitionInfoList.size()).map(partitionInfo -> { Map fileNameToSizeMap = partitionInfo.getFileNameToSizeMap(); @@ -878,29 +885,41 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta // Record which saves files within a partition return HoodieMetadataPayload.createPartitionFilesRecord( - partitionInfo.getRelativePath().isEmpty() ? 
NON_PARTITIONED_NAME : partitionInfo.getRelativePath(), Option.of(validFileNameToSizeMap), Option.empty()); + HoodieTableMetadataUtil.getPartition(partitionInfo.getRelativePath()), Option.of(validFileNameToSizeMap), Option.empty()); }); - partitionRecords = partitionRecords.union(fileListRecords); + filesPartitionRecords = filesPartitionRecords.union(fileListRecords); + } + ValidationUtils.checkState(filesPartitionRecords.count() == (partitions.size() + 1)); + partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecords); + + if (enabledPartitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS)) { + final HoodieData recordsRDD = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords( + engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams(), createInstantTime); + partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, recordsRDD); + } + + if (enabledPartitionTypes.contains(MetadataPartitionType.COLUMN_STATS)) { + final HoodieData recordsRDD = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords( + engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams()); + partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, recordsRDD); } LOG.info("Committing " + partitions.size() + " partitions and " + totalFiles + " files to metadata"); - ValidationUtils.checkState(partitionRecords.count() == (partitions.size() + 1)); - partitionToRecordsMap.put(MetadataPartitionType.FILES, partitionRecords); commit(createInstantTime, partitionToRecordsMap, false); } /** * A class which represents a directory and the files and directories inside it. - * + *

* A {@code PartitionFileInfo} object saves the name of the partition and various properties requires of each file - * required for bootstrapping the metadata table. Saving limited properties reduces the total memory footprint when - * a very large number of files are present in the dataset being bootstrapped. + * required for initializing the metadata table. Saving limited properties reduces the total memory footprint when + * a very large number of files are present in the dataset being initialized. */ static class DirectoryInfo implements Serializable { // Relative path of the directory (relative to the base directory) private final String relativePath; // Map of filenames within this partition to their respective sizes - private HashMap filenameToSizeMap; + private final HashMap filenameToSizeMap; // List of directories within this partition private final List subDirectories = new ArrayList<>(); // Is this a hoodie partition diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java index 3488a1365..007ad290a 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java @@ -97,7 +97,7 @@ public class HoodieWriteableTestTable extends HoodieMetadataTestTable { return (HoodieWriteableTestTable) super.forCommit(instantTime); } - public HoodieWriteableTestTable withInserts(String partition, String fileId, List records, TaskContextSupplier contextSupplier) throws Exception { + public Path withInserts(String partition, String fileId, List records, TaskContextSupplier contextSupplier) throws Exception { FileCreateUtils.createPartitionMetaFile(basePath, partition); String fileName = baseFileName(currentInstantTime, fileId); @@ -151,7 +151,7 @@ public class 
HoodieWriteableTestTable extends HoodieMetadataTestTable { } } - return this; + return baseFilePath; } public Map> withLogAppends(List records) throws Exception { diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java index 275ab4f5e..aeb546b0c 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java @@ -91,7 +91,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad Option inflightInstantTimestamp) { try { if (enabled) { - bootstrapIfNeeded(engineContext, dataMetaClient, actionMetadata, inflightInstantTimestamp); + initializeIfNeeded(dataMetaClient, actionMetadata, inflightInstantTimestamp); } } catch (IOException e) { LOG.error("Failed to initialize metadata table. 
Disabling the writer.", e); diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java index de9552085..29f0e03ae 100644 --- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java +++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java @@ -108,7 +108,8 @@ public class HoodieFlinkWriteableTestTable extends HoodieWriteableTestTable { } public HoodieFlinkWriteableTestTable withInserts(String partition, String fileId, List records) throws Exception { - return (HoodieFlinkWriteableTestTable) withInserts(partition, fileId, records, new org.apache.hudi.client.FlinkTaskContextSupplier(null)); + withInserts(partition, fileId, records, new org.apache.hudi.client.FlinkTaskContextSupplier(null)); + return this; } public Map> withLogAppends(List records) throws Exception { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomIndexCheckFunction.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomIndexCheckFunction.java index 32bca5509..8a2958eab 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomIndexCheckFunction.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomIndexCheckFunction.java @@ -20,8 +20,7 @@ package org.apache.hudi.index.bloom; import org.apache.hadoop.fs.Path; import org.apache.hudi.client.utils.LazyIterableIterator; -import org.apache.hudi.common.bloom.BloomFilterTypeCode; -import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter; +import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; import 
org.apache.hudi.common.model.HoodieKey; @@ -37,8 +36,6 @@ import org.apache.log4j.Logger; import org.apache.spark.api.java.function.Function2; import scala.Tuple2; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -113,7 +110,7 @@ public class HoodieMetadataBloomIndexCheckFunction implements } List> partitionNameFileNameList = new ArrayList<>(fileToKeysMap.keySet()); - Map, ByteBuffer> fileToBloomFilterMap = + Map, BloomFilter> fileToBloomFilterMap = hoodieTable.getMetadataTable().getBloomFilters(partitionNameFileNameList); final AtomicInteger totalKeys = new AtomicInteger(0); @@ -126,11 +123,7 @@ public class HoodieMetadataBloomIndexCheckFunction implements if (!fileToBloomFilterMap.containsKey(partitionPathFileNamePair)) { throw new HoodieIndexException("Failed to get the bloom filter for " + partitionPathFileNamePair); } - final ByteBuffer fileBloomFilterByteBuffer = fileToBloomFilterMap.get(partitionPathFileNamePair); - - HoodieDynamicBoundedBloomFilter fileBloomFilter = - new HoodieDynamicBoundedBloomFilter(StandardCharsets.UTF_8.decode(fileBloomFilterByteBuffer).toString(), - BloomFilterTypeCode.DYNAMIC_V0); + final BloomFilter fileBloomFilter = fileToBloomFilterMap.get(partitionPathFileNamePair); List candidateRecordKeys = new ArrayList<>(); hoodieKeyList.forEach(hoodieKey -> { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java index c905f92c2..80b94edf7 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java @@ -113,7 +113,7 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad }); if (enabled) { - bootstrapIfNeeded(engineContext, dataMetaClient, actionMetadata, inflightInstantTimestamp); + initializeIfNeeded(dataMetaClient, actionMetadata, inflightInstantTimestamp); } } catch (IOException e) { LOG.error("Failed to initialize metadata table. Disabling the writer.", e); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java index 2ff67c3c9..876a5d8de 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java @@ -19,6 +19,7 @@ package org.apache.hudi.client.functional; import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.model.EmptyHoodieRecordPayload; import org.apache.hudi.common.model.HoodieAvroRecord; @@ -47,7 +48,6 @@ import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner; import org.apache.hudi.testutils.Assertions; -import org.apache.hudi.testutils.HoodieClientTestHarness; import org.apache.hudi.testutils.HoodieSparkWriteableTestTable; import org.apache.hudi.testutils.MetadataMergeWriteStatus; @@ -63,6 +63,7 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -82,17 +83,24 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @Tag("functional") -public class TestHoodieIndex extends HoodieClientTestHarness { 
+public class TestHoodieIndex extends TestHoodieMetadataBase { private static Stream indexTypeParams() { + // IndexType, populateMetaFields, enableMetadataIndex Object[][] data = new Object[][] { - {IndexType.BLOOM, true}, - {IndexType.GLOBAL_BLOOM, true}, - {IndexType.SIMPLE, true}, - {IndexType.GLOBAL_SIMPLE, true}, - {IndexType.SIMPLE, false}, - {IndexType.GLOBAL_SIMPLE, false}, - {IndexType.BUCKET, false} + {IndexType.BLOOM, true, true}, + {IndexType.BLOOM, true, false}, + {IndexType.GLOBAL_BLOOM, true, true}, + {IndexType.GLOBAL_BLOOM, true, false}, + {IndexType.SIMPLE, true, true}, + {IndexType.SIMPLE, true, false}, + {IndexType.SIMPLE, false, true}, + {IndexType.SIMPLE, false, false}, + {IndexType.GLOBAL_SIMPLE, true, true}, + {IndexType.GLOBAL_SIMPLE, false, true}, + {IndexType.GLOBAL_SIMPLE, false, false}, + {IndexType.BUCKET, false, true}, + {IndexType.BUCKET, false, false} }; return Stream.of(data).map(Arguments::of); } @@ -103,11 +111,11 @@ public class TestHoodieIndex extends HoodieClientTestHarness { private HoodieIndex index; private HoodieWriteConfig config; - private void setUp(IndexType indexType, boolean populateMetaFields) throws Exception { - setUp(indexType, populateMetaFields, true); + private void setUp(IndexType indexType, boolean populateMetaFields, boolean enableMetadataIndex) throws Exception { + setUp(indexType, populateMetaFields, true, enableMetadataIndex); } - private void setUp(IndexType indexType, boolean populateMetaFields, boolean rollbackUsingMarkers) throws Exception { + private void setUp(IndexType indexType, boolean populateMetaFields, boolean rollbackUsingMarkers, boolean enableMetadataIndex) throws Exception { this.indexType = indexType; initPath(); initSparkContexts(); @@ -123,8 +131,13 @@ public class TestHoodieIndex extends HoodieClientTestHarness { .withRollbackUsingMarkers(rollbackUsingMarkers) .withIndexConfig(indexBuilder.build()) .withAutoCommit(false) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + 
.withMetadataIndexBloomFilter(enableMetadataIndex) + .withMetadataIndexColumnStats(enableMetadataIndex) + .build()) .withLayoutConfig(HoodieLayoutConfig.newBuilder().fromProperties(indexBuilder.build().getProps()) - .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build()).build(); + .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build()) + .build(); writeClient = getHoodieWriteClient(config); this.index = writeClient.getIndex(); } @@ -136,8 +149,8 @@ public class TestHoodieIndex extends HoodieClientTestHarness { @ParameterizedTest @MethodSource("indexTypeParams") - public void testSimpleTagLocationAndUpdate(IndexType indexType, boolean populateMetaFields) throws Exception { - setUp(indexType, populateMetaFields); + public void testSimpleTagLocationAndUpdate(IndexType indexType, boolean populateMetaFields, boolean enableMetadataIndex) throws Exception { + setUp(indexType, populateMetaFields, enableMetadataIndex); String newCommitTime = "001"; int totalRecords = 10 + random.nextInt(20); List records = dataGen.generateInserts(newCommitTime, totalRecords); @@ -186,8 +199,8 @@ public class TestHoodieIndex extends HoodieClientTestHarness { @ParameterizedTest @MethodSource("indexTypeParams") - public void testTagLocationAndDuplicateUpdate(IndexType indexType, boolean populateMetaFields) throws Exception { - setUp(indexType, populateMetaFields); + public void testTagLocationAndDuplicateUpdate(IndexType indexType, boolean populateMetaFields, boolean enableMetadataIndex) throws Exception { + setUp(indexType, populateMetaFields, enableMetadataIndex); String newCommitTime = "001"; int totalRecords = 10 + random.nextInt(20); List records = dataGen.generateInserts(newCommitTime, totalRecords); @@ -236,8 +249,8 @@ public class TestHoodieIndex extends HoodieClientTestHarness { @ParameterizedTest @MethodSource("indexTypeParams") - public void testSimpleTagLocationAndUpdateWithRollback(IndexType indexType, boolean populateMetaFields) throws 
Exception { - setUp(indexType, populateMetaFields, false); + public void testSimpleTagLocationAndUpdateWithRollback(IndexType indexType, boolean populateMetaFields, boolean enableMetadataIndex) throws Exception { + setUp(indexType, populateMetaFields, false, enableMetadataIndex); String newCommitTime = writeClient.startCommit(); int totalRecords = 20 + random.nextInt(20); List records = dataGen.generateInserts(newCommitTime, totalRecords); @@ -286,17 +299,21 @@ public class TestHoodieIndex extends HoodieClientTestHarness { } private static Stream regularIndexTypeParams() { + // IndexType, populateMetaFields, enableMetadataIndex Object[][] data = new Object[][] { - {IndexType.BLOOM, true}, - {IndexType.SIMPLE, true} + // TODO (codope): Enabling metadata index is flaky. Both bloom_filter and col_stats get generated but loading column ranges from the index is failing. + // {IndexType.BLOOM, true, true}, + {IndexType.BLOOM, true, false}, + {IndexType.SIMPLE, true, true}, + {IndexType.SIMPLE, true, false} }; return Stream.of(data).map(Arguments::of); } @ParameterizedTest @MethodSource("regularIndexTypeParams") - public void testTagLocationAndFetchRecordLocations(IndexType indexType, boolean populateMetaFields) throws Exception { - setUp(indexType, populateMetaFields); + public void testTagLocationAndFetchRecordLocations(IndexType indexType, boolean populateMetaFields, boolean enableMetadataIndex) throws Exception { + setUp(indexType, populateMetaFields, enableMetadataIndex); String p1 = "2016/01/31"; String p2 = "2015/01/31"; String rowKey1 = UUID.randomUUID().toString(); @@ -320,7 +337,9 @@ public class TestHoodieIndex extends HoodieClientTestHarness { HoodieRecord record4 = new HoodieAvroRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), rowChange4); JavaRDD recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record4)); - + String newCommitTime = writeClient.startCommit(); + metaClient = 
HoodieTableMetaClient.reload(metaClient); + writeClient.upsert(recordRDD, newCommitTime); HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient); JavaRDD taggedRecordRDD = tagLocation(index, recordRDD, hoodieTable); @@ -330,20 +349,42 @@ public class TestHoodieIndex extends HoodieClientTestHarness { assertFalse(record.isCurrentLocationKnown()); } - // We create three parquet file, each having one record. (two different partitions) - HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(hoodieTable, SCHEMA); - String fileId1 = testTable.addCommit("001").getFileIdWithInserts(p1, record1); - String fileId2 = testTable.addCommit("002").getFileIdWithInserts(p1, record2); - String fileId3 = testTable.addCommit("003").getFileIdWithInserts(p2, record4); + // We create three parquet files, each having one record (two different partitions) + HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter); + final String fileId1 = "fileID1"; + final String fileId2 = "fileID2"; + final String fileId3 = "fileID3"; + + Map>> partitionToFilesNameLengthMap = new HashMap<>(); + Path baseFilePath = testTable.forCommit("0000001").withInserts(p1, fileId1, Collections.singletonList(record1)); + long baseFileLength = fs.getFileStatus(baseFilePath).getLen(); + partitionToFilesNameLengthMap.computeIfAbsent(p1, k -> new ArrayList<>()).add(Pair.of(fileId1, Integer.valueOf((int) baseFileLength))); + testTable.doWriteOperation("0000001", WriteOperationType.UPSERT, Arrays.asList(p1, p2), + partitionToFilesNameLengthMap, false, false); + + partitionToFilesNameLengthMap.clear(); + baseFilePath = testTable.forCommit("0000002").withInserts(p1, fileId2, Collections.singletonList(record2)); + baseFileLength = fs.getFileStatus(baseFilePath).getLen(); + partitionToFilesNameLengthMap.computeIfAbsent(p1, k -> new ArrayList<>()).add(Pair.of(fileId2, Integer.valueOf((int) baseFileLength))); + 
testTable.doWriteOperation("0000002", WriteOperationType.UPSERT, Arrays.asList(p1, p2), + partitionToFilesNameLengthMap, false, false); + + partitionToFilesNameLengthMap.clear(); + baseFilePath = testTable.forCommit("0000003").withInserts(p2, fileId3, Collections.singletonList(record4)); + baseFileLength = fs.getFileStatus(baseFilePath).getLen(); + partitionToFilesNameLengthMap.computeIfAbsent(p2, k -> new ArrayList<>()).add(Pair.of(fileId3, Integer.valueOf((int) baseFileLength))); + testTable.doWriteOperation("0000003", WriteOperationType.UPSERT, Arrays.asList(p1, p2), + partitionToFilesNameLengthMap, false, false); // We do the tag again metaClient = HoodieTableMetaClient.reload(metaClient); hoodieTable = HoodieSparkTable.create(config, context, metaClient); taggedRecordRDD = tagLocation(index, recordRDD, hoodieTable); + List records = taggedRecordRDD.collect(); // Check results - for (HoodieRecord record : taggedRecordRDD.collect()) { + for (HoodieRecord record : records) { if (record.getRecordKey().equals(rowKey1)) { if (record.getPartitionPath().equals(p2)) { assertEquals(record.getCurrentLocation().getFileId(), fileId3); @@ -378,12 +419,17 @@ public class TestHoodieIndex extends HoodieClientTestHarness { @Test public void testSimpleGlobalIndexTagLocationWhenShouldUpdatePartitionPath() throws Exception { - setUp(IndexType.GLOBAL_SIMPLE, true); + setUp(IndexType.GLOBAL_SIMPLE, true, true); config = getConfigBuilder() .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType) .withGlobalSimpleIndexUpdatePartitionPath(true) .withBloomIndexUpdatePartitionPath(true) .build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .enable(true) + .withMetadataIndexBloomFilter(true) + .withMetadataIndexColumnStats(true) + .build()) .build(); writeClient = getHoodieWriteClient(config); index = writeClient.getIndex(); @@ -432,7 +478,10 @@ public class TestHoodieIndex extends HoodieClientTestHarness { final String file1P1C0 = 
UUID.randomUUID().toString(); Map>> c1PartitionToFilesNameLengthMap = new HashMap<>(); - c1PartitionToFilesNameLengthMap.put(p1, Collections.singletonList(Pair.of(file1P1C0, 100))); + // We have some records to be tagged (two different partitions) + Path baseFilePath = testTable.forCommit("1000").withInserts(p1, file1P1C0, Collections.singletonList(originalRecord)); + long baseFileLength = fs.getFileStatus(baseFilePath).getLen(); + c1PartitionToFilesNameLengthMap.put(p1, Collections.singletonList(Pair.of(file1P1C0, Integer.valueOf((int) baseFileLength)))); testTable.doWriteOperation("1000", WriteOperationType.INSERT, Arrays.asList(p1), c1PartitionToFilesNameLengthMap, false, false); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java index 3141e1051..f00a0b8d1 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java @@ -18,8 +18,8 @@ package org.apache.hudi.client.functional; -import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.model.HoodieMetadataRecord; +import org.apache.hudi.client.HoodieTimelineArchiver; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.model.HoodieCleaningPolicy; @@ -52,8 +52,9 @@ import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.client.HoodieTimelineArchiver; import org.apache.hudi.testutils.HoodieClientTestHarness; + +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import 
org.apache.log4j.Logger; import org.junit.jupiter.api.AfterEach; @@ -437,5 +438,4 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness { } return builder.build(); } - } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java index e61d6057c..4421bd4d6 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java @@ -18,8 +18,6 @@ package org.apache.hudi.index.bloom; -import org.apache.avro.Schema; -import org.apache.hadoop.fs.Path; import org.apache.hudi.client.functional.TestHoodieMetadataBase; import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.bloom.BloomFilterFactory; @@ -28,6 +26,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.testutils.RawTripTestPayload; import org.apache.hudi.common.util.Option; @@ -37,10 +36,14 @@ import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.data.HoodieJavaPairRDD; import org.apache.hudi.data.HoodieJavaRDD; +import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.HoodieIndexUtils; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.testutils.HoodieSparkWriteableTestTable; + +import org.apache.avro.Schema; +import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import 
org.junit.jupiter.api.AfterEach; @@ -49,10 +52,11 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import scala.Tuple2; import java.nio.file.Paths; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -61,6 +65,8 @@ import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.Stream; +import scala.Tuple2; + import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -75,8 +81,13 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase { private static final String TEST_NAME_WITH_PARAMS = "[{index}] Test with rangePruning={0}, treeFiltering={1}, bucketizedChecking={2}"; public static Stream configParams() { - Object[][] data = - new Object[][]{{true, true, true}, {false, true, true}, {true, true, false}, {true, false, true}}; + // rangePruning, treeFiltering, bucketizedChecking + Object[][] data = new Object[][] { + {true, true, true}, + {false, true, true}, + {true, true, false}, + {true, false, true} + }; return Stream.of(data).map(Arguments::of); } @@ -87,6 +98,11 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase { initFileSystem(); // We have some records to be tagged (two different partitions) initMetaClient(); + HoodieIndexConfig.Builder indexBuilder = HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM); + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) + .withIndexConfig(indexBuilder.build()) + .build(); + writeClient = getHoodieWriteClient(config); } @AfterEach @@ -112,7 +128,7 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase { 
HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking); HoodieBloomIndex index = new HoodieBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance()); HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient); - HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(hoodieTable, SCHEMA); + HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter); // Create some partitions, and put some files // "2016/01/21": 0 file @@ -142,10 +158,40 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase { // Still 0, as no valid commit assertEquals(0, filesList.size()); - testTable.addCommit("20160401010101").withInserts("2016/04/01", "2"); - testTable.addCommit("20150312101010").withInserts("2015/03/12", "1") - .withInserts("2015/03/12", "3", record1) - .withInserts("2015/03/12", "4", record2, record3, record4); + final String fileId1 = "1"; + final String fileId2 = "2"; + final String fileId3 = "3"; + final String fileId4 = "4"; + final Map>> partitionToFilesNameLengthMap = new HashMap<>(); + + String commitTime = "20160401010101"; + Path baseFilePath = testTable.forCommit(commitTime).withInserts(partitions.get(1), fileId2, Collections.emptyList()); + long baseFileLength = fs.getFileStatus(baseFilePath).getLen(); + partitionToFilesNameLengthMap.computeIfAbsent(partitions.get(1), + k -> new ArrayList<>()).add(Pair.of(fileId2, Integer.valueOf((int) baseFileLength))); + testTable.doWriteOperation(commitTime, WriteOperationType.UPSERT, Arrays.asList(partitions.get(1)), + partitionToFilesNameLengthMap, false, false); + + commitTime = "20150312101010"; + partitionToFilesNameLengthMap.clear(); + testTable.forCommit(commitTime); + baseFilePath = testTable.withInserts(partitions.get(2), fileId1, Collections.emptyList()); + baseFileLength = fs.getFileStatus(baseFilePath).getLen(); + partitionToFilesNameLengthMap.computeIfAbsent(partitions.get(2), + 
k -> new ArrayList<>()).add(Pair.of(fileId1, Integer.valueOf((int) baseFileLength))); + + baseFilePath = testTable.withInserts(partitions.get(2), fileId3, Collections.singletonList(record1)); + baseFileLength = fs.getFileStatus(baseFilePath).getLen(); + partitionToFilesNameLengthMap.computeIfAbsent(partitions.get(2), + k -> new ArrayList<>()).add(Pair.of(fileId3, Integer.valueOf((int) baseFileLength))); + + baseFilePath = testTable.withInserts(partitions.get(2), fileId4, Arrays.asList(record2, record3, record4)); + baseFileLength = fs.getFileStatus(baseFilePath).getLen(); + partitionToFilesNameLengthMap.computeIfAbsent(partitions.get(2), + k -> new ArrayList<>()).add(Pair.of(fileId4, Integer.valueOf((int) baseFileLength))); + + testTable.doWriteOperation(commitTime, WriteOperationType.UPSERT, Arrays.asList(partitions.get(2)), + partitionToFilesNameLengthMap, false, false); filesList = index.loadColumnRangesFromFiles(partitions, context, hoodieTable); assertEquals(4, filesList.size()); @@ -229,9 +275,20 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase { // record2, record3). 
BloomFilter filter = BloomFilterFactory.createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name()); filter.add(record3.getRecordKey()); - HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, filter); - String fileId = testTable.addCommit("000").getFileIdWithInserts(partition, record1, record2); - String filename = testTable.getBaseFileNameById(fileId); + HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, filter, metadataWriter); + + final Map>> partitionToFilesNameLengthMap = new HashMap<>(); + final String commitTime = "0000001"; + final String fileId = UUID.randomUUID().toString(); + + Path baseFilePath = testTable.forCommit(commitTime) + .withInserts(partition, fileId, Arrays.asList(record1, record2)); + long baseFileLength = fs.getFileStatus(baseFilePath).getLen(); + partitionToFilesNameLengthMap.computeIfAbsent(partition, + k -> new ArrayList<>()).add(Pair.of(fileId, Integer.valueOf((int) baseFileLength))); + testTable.doWriteOperation(commitTime, WriteOperationType.UPSERT, Collections.singletonList(partition), + partitionToFilesNameLengthMap, false, false); + final String filename = testTable.getBaseFileNameById(fileId); // The bloom filter contains 3 records assertTrue(filter.mightContain(record1.getRecordKey())); @@ -305,7 +362,7 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase { // Also create the metadata and config HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking); HoodieSparkTable hoodieTable = HoodieSparkTable.create(config, context, metaClient); - HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(hoodieTable, SCHEMA); + HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter); // Let's tag HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance()); @@ -316,10 +373,39 @@ public 
class TestHoodieBloomIndex extends TestHoodieMetadataBase { assertFalse(record.isCurrentLocationKnown()); } + final Map>> partitionToFilesNameLengthMap = new HashMap<>(); + final String partition1 = "2016/01/31"; + final String partition2 = "2015/01/31"; + // We create three parquet file, each having one record. (two different partitions) - String fileId1 = testTable.addCommit("001").getFileIdWithInserts("2016/01/31", record1); - String fileId2 = testTable.addCommit("002").getFileIdWithInserts("2016/01/31", record2); - String fileId3 = testTable.addCommit("003").getFileIdWithInserts("2015/01/31", record4); + final String fileId1 = UUID.randomUUID().toString(); + final String commit1 = "0000001"; + Path baseFilePath = testTable.forCommit(commit1).withInserts(partition1, fileId1, Collections.singletonList(record1)); + long baseFileLength = fs.getFileStatus(baseFilePath).getLen(); + partitionToFilesNameLengthMap.computeIfAbsent(partition1, + k -> new ArrayList<>()).add(Pair.of(fileId1, Integer.valueOf((int) baseFileLength))); + testTable.doWriteOperation(commit1, WriteOperationType.UPSERT, Collections.singletonList(partition1), + partitionToFilesNameLengthMap, false, false); + + final String fileId2 = UUID.randomUUID().toString(); + final String commit2 = "0000002"; + baseFilePath = testTable.forCommit(commit2).withInserts(partition1, fileId2, Collections.singletonList(record2)); + baseFileLength = fs.getFileStatus(baseFilePath).getLen(); + partitionToFilesNameLengthMap.clear(); + partitionToFilesNameLengthMap.computeIfAbsent(partition1, + k -> new ArrayList<>()).add(Pair.of(fileId2, Integer.valueOf((int) baseFileLength))); + testTable.doWriteOperation(commit2, WriteOperationType.UPSERT, Collections.singletonList(partition1), + partitionToFilesNameLengthMap, false, false); + + final String fileId3 = UUID.randomUUID().toString(); + final String commit3 = "0000003"; + baseFilePath = testTable.forCommit(commit3).withInserts(partition2, fileId3, 
Collections.singletonList(record4)); + baseFileLength = fs.getFileStatus(baseFilePath).getLen(); + partitionToFilesNameLengthMap.clear(); + partitionToFilesNameLengthMap.computeIfAbsent(partition2, + k -> new ArrayList<>()).add(Pair.of(fileId3, Integer.valueOf((int) baseFileLength))); + testTable.doWriteOperation(commit3, WriteOperationType.UPSERT, Collections.singletonList(partition2), + partitionToFilesNameLengthMap, false, false); // We do the tag again taggedRecordRDD = tagLocation(bloomIndex, recordRDD, HoodieSparkTable.create(config, context, metaClient)); @@ -327,7 +413,7 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase { // Check results for (HoodieRecord record : taggedRecordRDD.collect()) { if (record.getRecordKey().equals(rowKey1)) { - if (record.getPartitionPath().equals("2015/01/31")) { + if (record.getPartitionPath().equals(partition2)) { assertEquals(record.getCurrentLocation().getFileId(), fileId3); } else { assertEquals(record.getCurrentLocation().getFileId(), fileId1); @@ -370,7 +456,7 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase { // Also create the metadata and config HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking); HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient); - HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(hoodieTable, SCHEMA); + HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter); // Let's tag HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance()); @@ -387,10 +473,38 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase { assertTrue(!record._2.isPresent()); } + final String partition1 = "2016/01/31"; + final String partition2 = "2015/01/31"; + final String fileId1 = UUID.randomUUID().toString(); + final String fileId2 = UUID.randomUUID().toString(); + final String fileId3 = 
UUID.randomUUID().toString(); + final Map>> partitionToFilesNameLengthMap = new HashMap<>(); // We create three parquet file, each having one record. (two different partitions) - String fileId1 = testTable.addCommit("001").getFileIdWithInserts("2016/01/31", record1); - String fileId2 = testTable.addCommit("002").getFileIdWithInserts("2016/01/31", record2); - String fileId3 = testTable.addCommit("003").getFileIdWithInserts("2015/01/31", record4); + final String commit1 = "0000001"; + Path baseFilePath = testTable.forCommit(commit1).withInserts(partition1, fileId1, Collections.singletonList(record1)); + long baseFileLength = fs.getFileStatus(baseFilePath).getLen(); + partitionToFilesNameLengthMap.computeIfAbsent(partition1, + k -> new ArrayList<>()).add(Pair.of(fileId1, Integer.valueOf((int) baseFileLength))); + testTable.doWriteOperation(commit1, WriteOperationType.UPSERT, Collections.singletonList(partition1), + partitionToFilesNameLengthMap, false, false); + + final String commit2 = "0000002"; + partitionToFilesNameLengthMap.clear(); + baseFilePath = testTable.forCommit(commit2).withInserts(partition1, fileId2, Collections.singletonList(record2)); + baseFileLength = fs.getFileStatus(baseFilePath).getLen(); + partitionToFilesNameLengthMap.computeIfAbsent(partition1, + k -> new ArrayList<>()).add(Pair.of(fileId2, Integer.valueOf((int) baseFileLength))); + testTable.doWriteOperation(commit2, WriteOperationType.UPSERT, Collections.singletonList(partition1), + partitionToFilesNameLengthMap, false, false); + + final String commit3 = "0000003"; + partitionToFilesNameLengthMap.clear(); + baseFilePath = testTable.forCommit(commit3).withInserts(partition2, fileId3, Collections.singletonList(record4)); + baseFileLength = fs.getFileStatus(baseFilePath).getLen(); + partitionToFilesNameLengthMap.computeIfAbsent(partition2, + k -> new ArrayList<>()).add(Pair.of(fileId3, Integer.valueOf((int) baseFileLength))); + testTable.doWriteOperation(commit3, WriteOperationType.UPSERT, 
Collections.singletonList(partition2), + partitionToFilesNameLengthMap, false, false); // We do the tag again metaClient = HoodieTableMetaClient.reload(metaClient); @@ -409,7 +523,7 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase { assertEquals(fileId1, record._2.get().getRight()); } else if (record._1.getRecordKey().equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0")) { assertTrue(record._2.isPresent()); - if (record._1.getPartitionPath().equals("2015/01/31")) { + if (record._1.getPartitionPath().equals(partition2)) { assertEquals(fileId3, record._2.get().getRight()); } else { assertEquals(fileId2, record._2.get().getRight()); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java index 9d25907b4..3ad8952fe 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java @@ -18,22 +18,25 @@ package org.apache.hudi.index.bloom; +import org.apache.hudi.client.functional.TestHoodieMetadataBase; import org.apache.hudi.common.model.EmptyHoodieRecordPayload; import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.testutils.RawTripTestPayload; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.data.HoodieJavaPairRDD; import org.apache.hudi.data.HoodieJavaRDD; +import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; -import 
org.apache.hudi.testutils.HoodieClientTestHarness; import org.apache.hudi.testutils.HoodieSparkWriteableTestTable; import org.apache.avro.Schema; +import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.AfterEach; @@ -41,12 +44,14 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.stream.Collectors; import scala.Tuple2; @@ -59,7 +64,7 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; -public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness { +public class TestHoodieGlobalBloomIndex extends TestHoodieMetadataBase { private static final Schema SCHEMA = getSchemaFromResource(TestHoodieGlobalBloomIndex.class, "/exampleSchema.avsc", true); @@ -67,7 +72,13 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness { public void setUp() throws Exception { initSparkContexts(); initPath(); + initFileSystem(); initMetaClient(); + HoodieIndexConfig.Builder indexBuilder = HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.GLOBAL_BLOOM); + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) + .withIndexConfig(indexBuilder.build()) + .build(); + writeClient = getHoodieWriteClient(config); } @AfterEach @@ -81,13 +92,15 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness { HoodieGlobalBloomIndex index = new HoodieGlobalBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance()); HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient); - HoodieSparkWriteableTestTable 
testTable = HoodieSparkWriteableTestTable.of(hoodieTable, SCHEMA); + HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter); // Create some partitions, and put some files, along with the meta file // "2016/01/21": 0 file // "2016/04/01": 1 file (2_0_20160401010101.parquet) // "2015/03/12": 3 files (1_0_20150312101010.parquet, 3_0_20150312101010.parquet, 4_0_20150312101010.parquet) - testTable.withPartitionMetaFiles("2016/01/21", "2016/04/01", "2015/03/12"); + final String p1 = "2016/01/21"; + final String p2 = "2016/04/01"; + final String p3 = "2015/03/12"; RawTripTestPayload rowChange1 = new RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"); @@ -107,16 +120,46 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness { new HoodieAvroRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), rowChange4); // intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up - List partitions = Arrays.asList("2016/01/21", "2016/04/01"); + List partitions = Arrays.asList(p1, p2); // partitions will NOT be respected by this loadInvolvedFiles(...) 
call List> filesList = index.loadColumnRangesFromFiles(partitions, context, hoodieTable); // Still 0, as no valid commit assertEquals(0, filesList.size()); - testTable.addCommit("20160401010101").withInserts("2016/04/01", "2"); - testTable.addCommit("20150312101010").withInserts("2015/03/12", "1") - .withInserts("2015/03/12", "3", record1) - .withInserts("2015/03/12", "4", record2, record3, record4); + final String fileId1 = "1"; + final String fileId2 = "2"; + final String fileId3 = "3"; + final String fileId4 = "4"; + final Map>> partitionToFilesNameLengthMap = new HashMap<>(); + + final String c1 = "20160401010101"; + Path baseFilePath = testTable.forCommit(c1).withInserts(p2, fileId2, Collections.emptyList()); + long baseFileLength = fs.getFileStatus(baseFilePath).getLen(); + partitionToFilesNameLengthMap.computeIfAbsent(p2, + k -> new ArrayList<>()).add(Pair.of(fileId2, Integer.valueOf((int) baseFileLength))); + testTable.doWriteOperation(c1, WriteOperationType.UPSERT, Collections.singletonList(p2), + partitionToFilesNameLengthMap, false, false); + + final String c2 = "20150312101010"; + testTable.forCommit(c2); + baseFilePath = testTable.withInserts(p3, fileId1, Collections.emptyList()); + baseFileLength = fs.getFileStatus(baseFilePath).getLen(); + partitionToFilesNameLengthMap.clear(); + partitionToFilesNameLengthMap.computeIfAbsent(p3, + k -> new ArrayList<>()).add(Pair.of(fileId1, Integer.valueOf((int) baseFileLength))); + + baseFilePath = testTable.withInserts(p3, fileId3, Collections.singletonList(record1)); + baseFileLength = fs.getFileStatus(baseFilePath).getLen(); + partitionToFilesNameLengthMap.computeIfAbsent(p3, + k -> new ArrayList<>()).add(Pair.of(fileId3, Integer.valueOf((int) baseFileLength))); + + baseFilePath = testTable.withInserts(p3, fileId4, Arrays.asList(record2, record3, record4)); + baseFileLength = fs.getFileStatus(baseFilePath).getLen(); + partitionToFilesNameLengthMap.computeIfAbsent(p3, + k -> new 
ArrayList<>()).add(Pair.of(fileId4, Integer.valueOf((int) baseFileLength))); + + testTable.doWriteOperation(c2, WriteOperationType.UPSERT, Collections.singletonList(p3), + partitionToFilesNameLengthMap, false, false); filesList = index.loadColumnRangesFromFiles(partitions, context, hoodieTable); assertEquals(4, filesList.size()); @@ -185,17 +228,21 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness { @Test public void testTagLocation() throws Exception { HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) - .withIndexConfig(HoodieIndexConfig.newBuilder().withBloomIndexUpdatePartitionPath(false).build()).build(); - HoodieGlobalBloomIndex index = - new HoodieGlobalBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance()); + .withIndexConfig(HoodieIndexConfig.newBuilder() + .withIndexType(HoodieIndex.IndexType.GLOBAL_BLOOM) + .withBloomIndexUpdatePartitionPath(false) + .build()) + .build(); + HoodieGlobalBloomIndex index = new HoodieGlobalBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance()); HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient); - HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(hoodieTable, SCHEMA); + HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter); // Create some partitions, and put some files, along with the meta file // "2016/01/21": 0 file // "2016/04/01": 1 file (2_0_20160401010101.parquet) // "2015/03/12": 3 files (1_0_20150312101010.parquet, 3_0_20150312101010.parquet, 4_0_20150312101010.parquet) - testTable.withPartitionMetaFiles("2016/01/21", "2016/04/01", "2015/03/12"); + final String partition2 = "2016/04/01"; + final String partition3 = "2015/03/12"; RawTripTestPayload rowChange1 = new RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"); @@ -223,13 +270,49 @@ public class TestHoodieGlobalBloomIndex extends 
HoodieClientTestHarness { HoodieRecord record5 = new HoodieAvroRecord(new HoodieKey(rowChange5.getRowKey(), rowChange5.getPartitionPath()), rowChange5); - JavaRDD recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record5)); + final String fileId1 = UUID.randomUUID().toString(); + final String fileId2 = UUID.randomUUID().toString(); + final String fileId3 = UUID.randomUUID().toString(); + final String fileId4 = UUID.randomUUID().toString(); + final Map>> partitionToFilesNameLengthMap = new HashMap<>(); // intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up - String fileId1 = testTable.addCommit("1000").getFileIdWithInserts("2016/04/01", record1); - String fileId2 = testTable.addCommit("2000").getFileIdWithInserts("2015/03/12"); - String fileId3 = testTable.addCommit("3000").getFileIdWithInserts("2015/03/12", record2); - String fileId4 = testTable.addCommit("4000").getFileIdWithInserts("2015/03/12", record4); + String commitTime = "0000001"; + Path baseFilePath = testTable.forCommit(commitTime).withInserts(partition2, fileId1, Collections.singletonList(record1)); + long baseFileLength = fs.getFileStatus(baseFilePath).getLen(); + partitionToFilesNameLengthMap.computeIfAbsent(partition2, + k -> new ArrayList<>()).add(Pair.of(fileId1, Integer.valueOf((int) baseFileLength))); + testTable.doWriteOperation(commitTime, WriteOperationType.UPSERT, Collections.singletonList(partition2), + partitionToFilesNameLengthMap, false, false); + + commitTime = "0000002"; + baseFilePath = testTable.forCommit(commitTime).withInserts(partition3, fileId2, Collections.emptyList()); + baseFileLength = fs.getFileStatus(baseFilePath).getLen(); + partitionToFilesNameLengthMap.clear(); + partitionToFilesNameLengthMap.computeIfAbsent(partition3, + k -> new ArrayList<>()).add(Pair.of(fileId2, Integer.valueOf((int) baseFileLength))); + testTable.doWriteOperation(commitTime, WriteOperationType.UPSERT, Collections.singletonList(partition3), 
+ partitionToFilesNameLengthMap, false, false); + + commitTime = "0000003"; + baseFilePath = testTable.forCommit(commitTime).withInserts(partition3, fileId3, Collections.singletonList(record2)); + baseFileLength = fs.getFileStatus(baseFilePath).getLen(); + partitionToFilesNameLengthMap.clear(); + partitionToFilesNameLengthMap.computeIfAbsent(partition3, + k -> new ArrayList<>()).add(Pair.of(fileId3, Integer.valueOf((int) baseFileLength))); + testTable.doWriteOperation(commitTime, WriteOperationType.UPSERT, Collections.singletonList(partition3), + partitionToFilesNameLengthMap, false, false); + + commitTime = "0000004"; + baseFilePath = testTable.forCommit(commitTime).withInserts(partition3, fileId4, Collections.singletonList(record4)); + baseFileLength = fs.getFileStatus(baseFilePath).getLen(); + partitionToFilesNameLengthMap.clear(); + partitionToFilesNameLengthMap.computeIfAbsent(partition3, + k -> new ArrayList<>()).add(Pair.of(fileId4, Integer.valueOf((int) baseFileLength))); + testTable.doWriteOperation(commitTime, WriteOperationType.UPSERT, Collections.singletonList(partition3), + partitionToFilesNameLengthMap, false, false); + + JavaRDD recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record5)); // partitions will NOT be respected by this loadInvolvedFiles(...) 
call JavaRDD taggedRecordRDD = tagLocation(index, recordRDD, hoodieTable); @@ -266,12 +349,15 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness { public void testTagLocationWhenShouldUpdatePartitionPath() throws Exception { HoodieWriteConfig config = HoodieWriteConfig.newBuilder() .withPath(basePath) - .withIndexConfig(HoodieIndexConfig.newBuilder().withBloomIndexUpdatePartitionPath(true).build()) + .withIndexConfig(HoodieIndexConfig.newBuilder() + .withIndexType(HoodieIndex.IndexType.GLOBAL_BLOOM) + .withBloomIndexUpdatePartitionPath(true) + .build()) .build(); HoodieGlobalBloomIndex index = new HoodieGlobalBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance()); HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient); - HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(hoodieTable, SCHEMA); + HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter); final String p1 = "2016/01/31"; final String p2 = "2016/02/28"; @@ -309,7 +395,16 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness { new HoodieKey(incomingPayloadSamePartition.getRowKey(), incomingPayloadSamePartition.getPartitionPath()), incomingPayloadSamePartition); - testTable.addCommit("1000").getFileIdWithInserts(p1, originalRecord); + final String fileId1 = UUID.randomUUID().toString(); + final Map>> partitionToFilesNameLengthMap = new HashMap<>(); + + final String commitTime = "0000001"; + Path baseFilePath = testTable.forCommit(commitTime).withInserts(p1, fileId1, Collections.singletonList(originalRecord)); + long baseFileLength = fs.getFileStatus(baseFilePath).getLen(); + partitionToFilesNameLengthMap.computeIfAbsent(p1, + k -> new ArrayList<>()).add(Pair.of(fileId1, Integer.valueOf((int) baseFileLength))); + testTable.doWriteOperation(commitTime, WriteOperationType.UPSERT, Arrays.asList(p1), + partitionToFilesNameLengthMap, false, false); // test 
against incoming record with a different partition JavaRDD recordRDD = jsc.parallelize(Collections.singletonList(incomingRecord)); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkWriteableTestTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkWriteableTestTable.java index ca7bb4e01..894022392 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkWriteableTestTable.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkWriteableTestTable.java @@ -30,6 +30,7 @@ import org.apache.hudi.table.HoodieTable; import org.apache.avro.Schema; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -65,7 +66,7 @@ public class HoodieSparkWriteableTestTable extends HoodieWriteableTestTable { public static HoodieSparkWriteableTestTable of(HoodieTableMetaClient metaClient, Schema schema, HoodieTableMetadataWriter metadataWriter) { BloomFilter filter = BloomFilterFactory - .createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name()); + .createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.DYNAMIC_V0.name()); return of(metaClient, schema, filter, metadataWriter); } @@ -108,11 +109,11 @@ public class HoodieSparkWriteableTestTable extends HoodieWriteableTestTable { } public HoodieSparkWriteableTestTable withInserts(String partition, String fileId, HoodieRecord... 
records) throws Exception { - return withInserts(partition, fileId, Arrays.asList(records)); - } - - public HoodieSparkWriteableTestTable withInserts(String partition, String fileId, List records) throws Exception { - super.withInserts(partition, fileId, records, new SparkTaskContextSupplier()); + withInserts(partition, fileId, Arrays.asList(records)); return this; } + + public Path withInserts(String partition, String fileId, List records) throws Exception { + return super.withInserts(partition, fileId, records, new SparkTaskContextSupplier()); + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java index 86ff64177..7d964f358 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java @@ -169,6 +169,12 @@ public final class HoodieMetadataConfig extends HoodieConfig { + "store the column ranges and will be used for pruning files during the index lookups. 
" + "Only applies if " + ENABLE_METADATA_INDEX_COLUMN_STATS.key() + " is enabled."); + public static final ConfigProperty COLUMN_STATS_INDEX_PARALLELISM = ConfigProperty + .key(METADATA_PREFIX + ".index.column.stats.parallelism") + .defaultValue(10) + .sinceVersion("0.11.0") + .withDocumentation("Parallelism to use, when generating column stats index."); + public static final ConfigProperty POPULATE_META_FIELDS = ConfigProperty .key(METADATA_PREFIX + ".populate.meta.fields") .defaultValue(false) @@ -223,6 +229,10 @@ public final class HoodieMetadataConfig extends HoodieConfig { return getIntOrDefault(METADATA_INDEX_COLUMN_STATS_FILE_GROUP_COUNT); } + public int getColumnStatsIndexParallelism() { + return getIntOrDefault(COLUMN_STATS_INDEX_PARALLELISM); + } + public boolean enableMetrics() { return getBoolean(METRICS_ENABLE); } @@ -285,6 +295,11 @@ public final class HoodieMetadataConfig extends HoodieConfig { return this; } + public Builder withColumnStatsIndexParallelism(int parallelism) { + metadataConfig.setValue(COLUMN_STATS_INDEX_PARALLELISM, String.valueOf(parallelism)); + return this; + } + public Builder withMetadataIndexForAllColumns(boolean enable) { metadataConfig.setValue(ENABLE_METADATA_INDEX_COLUMN_STATS_FOR_ALL_COLUMNS, String.valueOf(enable)); return this; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java index acf5b2298..d098c4ff7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java @@ -18,12 +18,16 @@ package org.apache.hudi.common.model; +import java.io.Serializable; +import java.util.Arrays; +import java.util.Comparator; import java.util.Objects; +import java.util.function.BiFunction; /** * Hoodie Range metadata. 
*/ -public class HoodieColumnRangeMetadata { +public class HoodieColumnRangeMetadata implements Serializable { private final String filePath; private final String columnName; private final T minValue; @@ -33,6 +37,20 @@ public class HoodieColumnRangeMetadata { private final long totalSize; private final long totalUncompressedSize; + public static final BiFunction, HoodieColumnRangeMetadata, HoodieColumnRangeMetadata> COLUMN_RANGE_MERGE_FUNCTION = + (oldColumnRange, newColumnRange) -> new HoodieColumnRangeMetadata<>( + newColumnRange.getFilePath(), + newColumnRange.getColumnName(), + (Comparable) Arrays.asList(oldColumnRange.getMinValue(), newColumnRange.getMinValue()) + .stream().filter(Objects::nonNull).min(Comparator.naturalOrder()).orElse(null), + (Comparable) Arrays.asList(oldColumnRange.getMinValue(), newColumnRange.getMinValue()) + .stream().filter(Objects::nonNull).max(Comparator.naturalOrder()).orElse(null), + oldColumnRange.getNullCount() + newColumnRange.getNullCount(), + oldColumnRange.getValueCount() + newColumnRange.getValueCount(), + oldColumnRange.getTotalSize() + newColumnRange.getTotalSize(), + oldColumnRange.getTotalUncompressedSize() + newColumnRange.getTotalUncompressedSize() + ); + public HoodieColumnRangeMetadata(final String filePath, final String columnName, final T minValue, final T maxValue, final long nullCount, long valueCount, long totalSize, long totalUncompressedSize) { this.filePath = filePath; @@ -114,4 +132,18 @@ public class HoodieColumnRangeMetadata { + ", totalUncompressedSize=" + totalUncompressedSize + '}'; } + + /** + * Statistics that is collected in {@link org.apache.hudi.metadata.MetadataPartitionType#COLUMN_STATS} index. 
+ */ + public static final class Stats { + public static final String VALUE_COUNT = "value_count"; + public static final String NULL_COUNT = "null_count"; + public static final String MIN = "min"; + public static final String MAX = "max"; + public static final String TOTAL_SIZE = "total_size"; + public static final String TOTAL_UNCOMPRESSED_SIZE = "total_uncompressed_size"; + + private Stats() { } + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieDeltaWriteStat.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieDeltaWriteStat.java index c97743f4d..cf3bb5226 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieDeltaWriteStat.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieDeltaWriteStat.java @@ -19,9 +19,12 @@ package org.apache.hudi.common.model; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.hudi.common.util.Option; +import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import java.util.Map; /** * Statistics about a single Hoodie delta log operation. 
@@ -33,6 +36,7 @@ public class HoodieDeltaWriteStat extends HoodieWriteStat { private long logOffset; private String baseFile; private List logFiles = new ArrayList<>(); + private Option> recordsStats = Option.empty(); public void setLogVersion(int logVersion) { this.logVersion = logVersion; @@ -69,4 +73,24 @@ public class HoodieDeltaWriteStat extends HoodieWriteStat { public List getLogFiles() { return logFiles; } + + public void setRecordsStats(RecordsStats stats) { + recordsStats = Option.of(stats); + } + + public Option> getRecordsStats() { + return recordsStats; + } + + public static class RecordsStats implements Serializable { + private final T recordsStats; + + public RecordsStats(T recordsStats) { + this.recordsStats = recordsStats; + } + + public T getStats() { + return recordsStats; + } + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java index 3c648f38d..2dce66e70 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java @@ -19,10 +19,10 @@ package org.apache.hudi.metadata; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.model.HoodieMetadataBloomFilter; import org.apache.hudi.avro.model.HoodieMetadataColumnStats; +import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.common.bloom.BloomFilterFactory; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.engine.HoodieEngineContext; @@ -40,11 +40,15 @@ import org.apache.hudi.common.util.hash.ColumnIndexID; import org.apache.hudi.common.util.hash.FileIndexID; import org.apache.hudi.common.util.hash.PartitionIndexID; import org.apache.hudi.exception.HoodieMetadataException; + +import org.apache.hadoop.fs.FileStatus; 
+import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -143,9 +147,8 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata { throws IOException { if (isMetadataTableEnabled) { try { - List partitionPaths = partitions.stream().map(entry -> new Path(entry)).collect(Collectors.toList()); - Map partitionsFilesMap = fetchAllFilesInPartitionPaths(partitionPaths); - return partitionsFilesMap; + List partitionPaths = partitions.stream().map(Path::new).collect(Collectors.toList()); + return fetchAllFilesInPartitionPaths(partitionPaths); } catch (Exception e) { throw new HoodieMetadataException("Failed to retrieve files in partition from metadata", e); } @@ -156,7 +159,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata { } @Override - public Option getBloomFilter(final String partitionName, final String fileName) + public Option getBloomFilter(final String partitionName, final String fileName) throws HoodieMetadataException { if (!isBloomFilterIndexEnabled) { LOG.error("Metadata bloom filter index is disabled!"); @@ -164,7 +167,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata { } final Pair partitionFileName = Pair.of(partitionName, fileName); - Map, ByteBuffer> bloomFilters = getBloomFilters(Collections.singletonList(partitionFileName)); + Map, BloomFilter> bloomFilters = getBloomFilters(Collections.singletonList(partitionFileName)); if (bloomFilters.isEmpty()) { LOG.error("Meta index: missing bloom filter for partition: " + partitionName + ", file: " + fileName); return Option.empty(); @@ -175,7 +178,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata { } @Override - public Map, ByteBuffer> getBloomFilters(final List> partitionNameFileNameList) + 
public Map, BloomFilter> getBloomFilters(final List> partitionNameFileNameList) throws HoodieMetadataException { if (!isBloomFilterIndexEnabled) { LOG.error("Metadata bloom filter index is disabled!"); @@ -202,7 +205,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata { metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR, (timer.endTimer() / partitionIDFileIDStrings.size()))); - Map, ByteBuffer> partitionFileToBloomFilterMap = new HashMap<>(); + Map, BloomFilter> partitionFileToBloomFilterMap = new HashMap<>(); for (final Pair>> entry : hoodieRecordList) { if (entry.getRight().isPresent()) { final Option bloomFilterMetadata = @@ -210,7 +213,11 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata { if (bloomFilterMetadata.isPresent()) { if (!bloomFilterMetadata.get().getIsDeleted()) { ValidationUtils.checkState(fileToKeyMap.containsKey(entry.getLeft())); - partitionFileToBloomFilterMap.put(fileToKeyMap.get(entry.getLeft()), bloomFilterMetadata.get().getBloomFilter()); + final ByteBuffer bloomFilterByteBuffer = bloomFilterMetadata.get().getBloomFilter(); + final String bloomFilterType = bloomFilterMetadata.get().getType(); + final BloomFilter bloomFilter = BloomFilterFactory.fromString( + StandardCharsets.UTF_8.decode(bloomFilterByteBuffer).toString(), bloomFilterType); + partitionFileToBloomFilterMap.put(fileToKeyMap.get(entry.getLeft()), bloomFilter); } } else { LOG.error("Meta index bloom filter missing for: " + fileToKeyMap.get(entry.getLeft())); @@ -269,7 +276,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata { /** * Returns a list of all partitions. 
*/ - protected List fetchAllPartitionPaths() throws IOException { + protected List fetchAllPartitionPaths() { HoodieTimer timer = new HoodieTimer().startTimer(); Option> hoodieRecord = getRecordByKey(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.getPartitionPath()); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java index a4e5ea353..1bb18bad1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java @@ -19,6 +19,7 @@ package org.apache.hudi.metadata; import org.apache.hudi.avro.model.HoodieMetadataColumnStats; +import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; @@ -33,7 +34,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hudi.exception.HoodieMetadataException; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -143,13 +143,13 @@ public class FileSystemBackedTableMetadata implements HoodieTableMetadata { // no-op } - public Option getBloomFilter(final String partitionName, final String fileName) + public Option getBloomFilter(final String partitionName, final String fileName) throws HoodieMetadataException { throw new HoodieMetadataException("Unsupported operation: getBloomFilter for " + fileName); } @Override - public Map, ByteBuffer> getBloomFilters(final List> partitionNameFileNameList) + public Map, BloomFilter> getBloomFilters(final List> partitionNameFileNameList) throws HoodieMetadataException { throw new HoodieMetadataException("Unsupported operation: getBloomFilters!"); } diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java index 75f83b7c6..548fbb95d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java @@ -18,18 +18,10 @@ package org.apache.hudi.metadata; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.model.HoodieMetadataBloomFilter; import org.apache.hudi.avro.model.HoodieMetadataColumnStats; import org.apache.hudi.avro.model.HoodieMetadataFileInfo; import org.apache.hudi.avro.model.HoodieMetadataRecord; -import org.apache.hudi.common.bloom.BloomFilterTypeCode; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieColumnRangeMetadata; @@ -37,13 +29,20 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.hash.ColumnIndexID; import org.apache.hudi.common.util.hash.FileIndexID; import org.apache.hudi.common.util.hash.PartitionIndexID; import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.io.storage.HoodieHFileReader; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + 
import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; @@ -114,7 +113,7 @@ public class HoodieMetadataPayload implements HoodieRecordPayload createBloomFilterMetadataRecord(final String partitionName, final String baseFileName, final String timestamp, + final String bloomFilterType, final ByteBuffer bloomFilter, final boolean isDeleted) { - ValidationUtils.checkArgument(!baseFileName.contains(Path.SEPARATOR) + checkArgument(!baseFileName.contains(Path.SEPARATOR) && FSUtils.isBaseFile(new Path(baseFileName)), "Invalid base file '" + baseFileName + "' for MetaIndexBloomFilter!"); final String bloomFilterIndexKey = new PartitionIndexID(partitionName).asBase64EncodedString() .concat(new FileIndexID(baseFileName).asBase64EncodedString()); HoodieKey key = new HoodieKey(bloomFilterIndexKey, MetadataPartitionType.BLOOM_FILTERS.getPartitionPath()); - // TODO: HUDI-3203 Get the bloom filter type from the file HoodieMetadataBloomFilter metadataBloomFilter = - new HoodieMetadataBloomFilter(BloomFilterTypeCode.DYNAMIC_V0.name(), - timestamp, bloomFilter, isDeleted); - HoodieMetadataPayload metadataPayload = new HoodieMetadataPayload(key.getRecordKey(), - metadataBloomFilter); + new HoodieMetadataBloomFilter(bloomFilterType, timestamp, bloomFilter, isDeleted); + HoodieMetadataPayload metadataPayload = new HoodieMetadataPayload(key.getRecordKey(), metadataBloomFilter); return new HoodieAvroRecord<>(key, metadataPayload); } @Override public HoodieMetadataPayload preCombine(HoodieMetadataPayload previousRecord) { - ValidationUtils.checkArgument(previousRecord.type == type, + checkArgument(previousRecord.type == type, "Cannot combine " + previousRecord.type + " with " + type); switch (type) { @@ -314,11 +311,16 @@ public class HoodieMetadataPayload implements HoodieRecordPayload getFilenames() { - return filterFileInfoEntries(false).map(e -> e.getKey()).sorted().collect(Collectors.toList()); + return 
filterFileInfoEntries(false).map(Map.Entry::getKey).sorted().collect(Collectors.toList()); } /** @@ -518,8 +520,6 @@ public class HoodieMetadataPayload implements HoodieRecordPayload(key, payload); }); - - } @Override @@ -532,9 +532,9 @@ public class HoodieMetadataPayload implements HoodieRecordPayload getBloomFilter(final String partitionName, final String fileName) + Option getBloomFilter(final String partitionName, final String fileName) throws HoodieMetadataException; /** * Get bloom filters for files from the metadata table index. * * @param partitionNameFileNameList - List of partition and file name pair for which bloom filters need to be retrieved - * @return Map of partition file name pair to its bloom filter byte buffer + * @return Map of partition file name pair to its bloom filter * @throws HoodieMetadataException */ - Map, ByteBuffer> getBloomFilters(final List> partitionNameFileNameList) + Map, BloomFilter> getBloomFilters(final List> partitionNameFileNameList) throws HoodieMetadataException; /** diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 8b37d0359..1a3739df2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -19,6 +19,7 @@ package org.apache.hudi.metadata; import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.avro.model.HoodieMetadataColumnStats; import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.common.bloom.BloomFilter; @@ -43,32 +44,48 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ParquetUtils; +import 
org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import javax.annotation.Nonnull; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.function.BiFunction; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldValAsString; +import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.COLUMN_RANGE_MERGE_FUNCTION; +import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.MAX; +import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.MIN; +import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.NULL_COUNT; +import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.TOTAL_SIZE; +import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.TOTAL_UNCOMPRESSED_SIZE; +import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.VALUE_COUNT; import static org.apache.hudi.metadata.HoodieTableMetadata.EMPTY_PARTITION_NAME; import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME; @@ 
-104,37 +121,27 @@ public class HoodieTableMetadataUtil { /** * Convert commit action to metadata records for the enabled partition types. * - * @param commitMetadata - Commit action metadata - * @param dataMetaClient - Meta client for the data table - * @param isMetaIndexColumnStatsForAllColumns - Do all columns need meta indexing? - * @param instantTime - Action instant time + * @param commitMetadata - Commit action metadata + * @param instantTime - Action instant time + * @param recordsGenerationParams - Parameters for the record generation * @return Map of partition to metadata records for the commit action */ public static Map> convertMetadataToRecords( - HoodieEngineContext context, List enabledPartitionTypes, - HoodieCommitMetadata commitMetadata, HoodieTableMetaClient dataMetaClient, - boolean isMetaIndexColumnStatsForAllColumns, String instantTime) { + HoodieEngineContext context, HoodieCommitMetadata commitMetadata, String instantTime, + MetadataRecordsGenerationParams recordsGenerationParams) { final Map> partitionToRecordsMap = new HashMap<>(); final HoodieData filesPartitionRecordsRDD = context.parallelize( convertMetadataToFilesPartitionRecords(commitMetadata, instantTime), 1); partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecordsRDD); - if (enabledPartitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS)) { - final List metadataBloomFilterRecords = convertMetadataToBloomFilterRecords(commitMetadata, - dataMetaClient, instantTime); - if (!metadataBloomFilterRecords.isEmpty()) { - final HoodieData metadataBloomFilterRecordsRDD = context.parallelize(metadataBloomFilterRecords, 1); - partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, metadataBloomFilterRecordsRDD); - } + if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.BLOOM_FILTERS)) { + final HoodieData metadataBloomFilterRecords = convertMetadataToBloomFilterRecords(context, commitMetadata, instantTime, 
recordsGenerationParams); + partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, metadataBloomFilterRecords); } - if (enabledPartitionTypes.contains(MetadataPartitionType.COLUMN_STATS)) { - final List metadataColumnStats = convertMetadataToColumnStatsRecords(commitMetadata, context, - dataMetaClient, isMetaIndexColumnStatsForAllColumns, instantTime); - if (!metadataColumnStats.isEmpty()) { - final HoodieData metadataColumnStatsRDD = context.parallelize(metadataColumnStats, 1); - partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, metadataColumnStatsRDD); - } + if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.COLUMN_STATS)) { + final HoodieData metadataColumnStatsRDD = convertMetadataToColumnStatsRecords(commitMetadata, context, recordsGenerationParams); + partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, metadataColumnStatsRDD); } return partitionToRecordsMap; } @@ -163,7 +170,7 @@ public class HoodieTableMetadataUtil { String partitionStatName = entry.getKey(); List writeStats = entry.getValue(); - String partition = partitionStatName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionStatName; + String partition = getPartition(partitionStatName); HashMap updatedFilesToSizesMapping = writeStats.stream().reduce(new HashMap<>(writeStats.size()), @@ -206,94 +213,90 @@ public class HoodieTableMetadataUtil { /** * Convert commit action metadata to bloom filter records. 
* - * @param commitMetadata - Commit action metadata - * @param dataMetaClient - Meta client for the data table - * @param instantTime - Action instant time - * @return List of metadata table records + * @param context - Engine context to use + * @param commitMetadata - Commit action metadata + * @param instantTime - Action instant time + * @param recordsGenerationParams - Parameters for bloom filter record generation + * @return HoodieData of metadata table records */ - public static List convertMetadataToBloomFilterRecords(HoodieCommitMetadata commitMetadata, - HoodieTableMetaClient dataMetaClient, - String instantTime) { - List records = new LinkedList<>(); - commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, writeStats) -> { - final String partition = partitionStatName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionStatName; - Map newFiles = new HashMap<>(writeStats.size()); - writeStats.forEach(hoodieWriteStat -> { - // No action for delta logs - if (hoodieWriteStat instanceof HoodieDeltaWriteStat) { - return; - } + public static HoodieData convertMetadataToBloomFilterRecords( + HoodieEngineContext context, HoodieCommitMetadata commitMetadata, + String instantTime, MetadataRecordsGenerationParams recordsGenerationParams) { + final List allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream() + .flatMap(entry -> entry.stream()).collect(Collectors.toList()); + if (allWriteStats.isEmpty()) { + return context.emptyHoodieData(); + } - String pathWithPartition = hoodieWriteStat.getPath(); - if (pathWithPartition == null) { - // Empty partition - LOG.error("Failed to find path in write stat to update metadata table " + hoodieWriteStat); - return; - } - int offset = partition.equals(NON_PARTITIONED_NAME) ? (pathWithPartition.startsWith("/") ? 
1 : 0) : - partition.length() + 1; + final int parallelism = Math.max(Math.min(allWriteStats.size(), recordsGenerationParams.getBloomIndexParallelism()), 1); + HoodieData allWriteStatsRDD = context.parallelize(allWriteStats, parallelism); + return allWriteStatsRDD.flatMap(hoodieWriteStat -> { + final String partition = hoodieWriteStat.getPartitionPath(); - final String fileName = pathWithPartition.substring(offset); - if (!FSUtils.isBaseFile(new Path(fileName))) { - return; - } - ValidationUtils.checkState(!newFiles.containsKey(fileName), "Duplicate files in HoodieCommitMetadata"); + // For bloom filter index, delta writes do not change the base file bloom filter entries + if (hoodieWriteStat instanceof HoodieDeltaWriteStat) { + return Collections.emptyListIterator(); + } - final Path writeFilePath = new Path(dataMetaClient.getBasePath(), pathWithPartition); + String pathWithPartition = hoodieWriteStat.getPath(); + if (pathWithPartition == null) { + // Empty partition + LOG.error("Failed to find path in write stat to update metadata table " + hoodieWriteStat); + return Collections.emptyListIterator(); + } + int offset = partition.equals(NON_PARTITIONED_NAME) ? (pathWithPartition.startsWith("/") ? 
1 : 0) : + partition.length() + 1; + + final String fileName = pathWithPartition.substring(offset); + if (!FSUtils.isBaseFile(new Path(fileName))) { + return Collections.emptyListIterator(); + } + + final Path writeFilePath = new Path(recordsGenerationParams.getDataMetaClient().getBasePath(), pathWithPartition); + try (HoodieFileReader fileReader = + HoodieFileReaderFactory.getFileReader(recordsGenerationParams.getDataMetaClient().getHadoopConf(), writeFilePath)) { try { - HoodieFileReader fileReader = - HoodieFileReaderFactory.getFileReader(dataMetaClient.getHadoopConf(), writeFilePath); - try { - final BloomFilter fileBloomFilter = fileReader.readBloomFilter(); - if (fileBloomFilter == null) { - LOG.error("Failed to read bloom filter for " + writeFilePath); - return; - } - ByteBuffer bloomByteBuffer = ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes()); - HoodieRecord record = HoodieMetadataPayload.createBloomFilterMetadataRecord( - partition, fileName, instantTime, bloomByteBuffer, false); - records.add(record); - } catch (Exception e) { + final BloomFilter fileBloomFilter = fileReader.readBloomFilter(); + if (fileBloomFilter == null) { LOG.error("Failed to read bloom filter for " + writeFilePath); - return; + return Collections.emptyListIterator(); } + ByteBuffer bloomByteBuffer = ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes()); + HoodieRecord record = HoodieMetadataPayload.createBloomFilterMetadataRecord( + partition, fileName, instantTime, recordsGenerationParams.getBloomFilterType(), bloomByteBuffer, false); + return Collections.singletonList(record).iterator(); + } catch (Exception e) { + LOG.error("Failed to read bloom filter for " + writeFilePath); + return Collections.emptyListIterator(); + } finally { fileReader.close(); - } catch (IOException e) { - LOG.error("Failed to get bloom filter for file: " + writeFilePath + ", write stat: " + hoodieWriteStat); } - }); + } catch (IOException e) { + LOG.error("Failed to get bloom 
filter for file: " + writeFilePath + ", write stat: " + hoodieWriteStat); + } + return Collections.emptyListIterator(); }); - - return records; } /** * Convert the clean action to metadata records. */ public static Map> convertMetadataToRecords( - HoodieEngineContext engineContext, List enabledPartitionTypes, - HoodieCleanMetadata cleanMetadata, HoodieTableMetaClient dataMetaClient, String instantTime) { + HoodieEngineContext engineContext, HoodieCleanMetadata cleanMetadata, + MetadataRecordsGenerationParams recordsGenerationParams, String instantTime) { final Map> partitionToRecordsMap = new HashMap<>(); final HoodieData filesPartitionRecordsRDD = engineContext.parallelize( convertMetadataToFilesPartitionRecords(cleanMetadata, instantTime), 1); partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecordsRDD); - if (enabledPartitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS)) { - final List metadataBloomFilterRecords = convertMetadataToBloomFilterRecords(cleanMetadata, - engineContext, instantTime); - if (!metadataBloomFilterRecords.isEmpty()) { - final HoodieData metadataBloomFilterRecordsRDD = engineContext.parallelize(metadataBloomFilterRecords, 1); - partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, metadataBloomFilterRecordsRDD); - } + if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.BLOOM_FILTERS)) { + final HoodieData metadataBloomFilterRecordsRDD = convertMetadataToBloomFilterRecords(cleanMetadata, engineContext, instantTime, recordsGenerationParams); + partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, metadataBloomFilterRecordsRDD); } - if (enabledPartitionTypes.contains(MetadataPartitionType.COLUMN_STATS)) { - final List metadataColumnStats = convertMetadataToColumnStatsRecords(cleanMetadata, engineContext, - dataMetaClient); - if (!metadataColumnStats.isEmpty()) { - final HoodieData metadataColumnStatsRDD = engineContext.parallelize(metadataColumnStats, 1); - 
partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, metadataColumnStatsRDD); - } + if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.COLUMN_STATS)) { + final HoodieData metadataColumnStatsRDD = convertMetadataToColumnStatsRecords(cleanMetadata, engineContext, recordsGenerationParams); + partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, metadataColumnStatsRDD); } return partitionToRecordsMap; @@ -311,7 +314,7 @@ public class HoodieTableMetadataUtil { List records = new LinkedList<>(); int[] fileDeleteCount = {0}; cleanMetadata.getPartitionMetadata().forEach((partitionName, partitionMetadata) -> { - final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName; + final String partition = getPartition(partitionName); // Files deleted from a partition List deletedFiles = partitionMetadata.getDeletePathPatterns(); HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(), @@ -329,14 +332,16 @@ public class HoodieTableMetadataUtil { /** * Convert clean metadata to bloom filter index records. 
* - * @param cleanMetadata - Clean action metadata - * @param engineContext - Engine context - * @param instantTime - Clean action instant time + * @param cleanMetadata - Clean action metadata + * @param engineContext - Engine context + * @param instantTime - Clean action instant time + * @param recordsGenerationParams - Parameters for bloom filter record generation * @return List of bloom filter index records for the clean metadata */ - public static List convertMetadataToBloomFilterRecords(HoodieCleanMetadata cleanMetadata, - HoodieEngineContext engineContext, - String instantTime) { + public static HoodieData convertMetadataToBloomFilterRecords(HoodieCleanMetadata cleanMetadata, + HoodieEngineContext engineContext, + String instantTime, + MetadataRecordsGenerationParams recordsGenerationParams) { List> deleteFileList = new ArrayList<>(); cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> { // Files deleted from a partition @@ -349,23 +354,24 @@ public class HoodieTableMetadataUtil { }); }); - return engineContext.map(deleteFileList, deleteFileInfo -> { - return HoodieMetadataPayload.createBloomFilterMetadataRecord( - deleteFileInfo.getLeft(), deleteFileInfo.getRight(), instantTime, ByteBuffer.allocate(0), true); - }, 1).stream().collect(Collectors.toList()); + final int parallelism = Math.max(Math.min(deleteFileList.size(), recordsGenerationParams.getBloomIndexParallelism()), 1); + HoodieData> deleteFileListRDD = engineContext.parallelize(deleteFileList, parallelism); + return deleteFileListRDD.map(deleteFileInfoPair -> HoodieMetadataPayload.createBloomFilterMetadataRecord( + deleteFileInfoPair.getLeft(), deleteFileInfoPair.getRight(), instantTime, StringUtils.EMPTY_STRING, + ByteBuffer.allocate(0), true)); } /** * Convert clean metadata to column stats index records. 
* - * @param cleanMetadata - Clean action metadata - * @param engineContext - Engine context - * @param datasetMetaClient - data table meta client + * @param cleanMetadata - Clean action metadata + * @param engineContext - Engine context + * @param recordsGenerationParams - Parameters for bloom filter record generation * @return List of column stats index records for the clean metadata */ - public static List convertMetadataToColumnStatsRecords(HoodieCleanMetadata cleanMetadata, - HoodieEngineContext engineContext, - HoodieTableMetaClient datasetMetaClient) { + public static HoodieData convertMetadataToColumnStatsRecords(HoodieCleanMetadata cleanMetadata, + HoodieEngineContext engineContext, + MetadataRecordsGenerationParams recordsGenerationParams) { List> deleteFileList = new ArrayList<>(); cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> { // Files deleted from a partition @@ -373,54 +379,42 @@ public class HoodieTableMetadataUtil { deletedFiles.forEach(entry -> deleteFileList.add(Pair.of(partition, entry))); }); - List latestColumns = getLatestColumns(datasetMetaClient); - return engineContext.flatMap(deleteFileList, - deleteFileInfo -> { - if (deleteFileInfo.getRight().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { - return getColumnStats(deleteFileInfo.getKey(), deleteFileInfo.getValue(), datasetMetaClient, - latestColumns, true); - } - return Stream.empty(); - }, 1).stream().collect(Collectors.toList()); + final List columnsToIndex = getColumnsToIndex(recordsGenerationParams.getDataMetaClient(), recordsGenerationParams.isAllColumnStatsIndexEnabled()); + final int parallelism = Math.max(Math.min(deleteFileList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1); + HoodieData> deleteFileListRDD = engineContext.parallelize(deleteFileList, parallelism); + return deleteFileListRDD.flatMap(deleteFileInfoPair -> { + if (deleteFileInfoPair.getRight().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { + 
return getColumnStats(deleteFileInfoPair.getLeft(), deleteFileInfoPair.getRight(), recordsGenerationParams.getDataMetaClient(), columnsToIndex, true).iterator(); + } + return Collections.emptyListIterator(); + }); } /** * Convert restore action metadata to metadata table records. */ public static Map> convertMetadataToRecords( - HoodieEngineContext engineContext, List enabledPartitionTypes, - HoodieActiveTimeline metadataTableTimeline, HoodieRestoreMetadata restoreMetadata, - HoodieTableMetaClient dataMetaClient, String instantTime, Option lastSyncTs) { + HoodieEngineContext engineContext, HoodieActiveTimeline metadataTableTimeline, HoodieRestoreMetadata restoreMetadata, + MetadataRecordsGenerationParams recordsGenerationParams, String instantTime, Option lastSyncTs) { final Map> partitionToRecordsMap = new HashMap<>(); final Map> partitionToAppendedFiles = new HashMap<>(); final Map> partitionToDeletedFiles = new HashMap<>(); - processRestoreMetadata(metadataTableTimeline, restoreMetadata, - partitionToAppendedFiles, partitionToDeletedFiles, lastSyncTs); - - final HoodieData filesPartitionRecordsRDD = engineContext.parallelize( - convertFilesToFilesPartitionRecords(partitionToDeletedFiles, - partitionToAppendedFiles, instantTime, "Restore"), 1); + processRestoreMetadata(metadataTableTimeline, restoreMetadata, partitionToAppendedFiles, partitionToDeletedFiles, lastSyncTs); + final HoodieData filesPartitionRecordsRDD = + engineContext.parallelize(convertFilesToFilesPartitionRecords(partitionToDeletedFiles, partitionToAppendedFiles, instantTime, "Restore"), 1); partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecordsRDD); - if (enabledPartitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS)) { - final List metadataBloomFilterRecords = convertFilesToBloomFilterRecords( - engineContext, dataMetaClient, partitionToDeletedFiles, partitionToAppendedFiles, instantTime); - if (!metadataBloomFilterRecords.isEmpty()) { - final HoodieData 
metadataBloomFilterRecordsRDD = engineContext.parallelize(metadataBloomFilterRecords, 1); - partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, metadataBloomFilterRecordsRDD); - } + if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.BLOOM_FILTERS)) { + final HoodieData metadataBloomFilterRecordsRDD = + convertFilesToBloomFilterRecords(engineContext, partitionToDeletedFiles, partitionToAppendedFiles, recordsGenerationParams, instantTime); + partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, metadataBloomFilterRecordsRDD); } - if (enabledPartitionTypes.contains(MetadataPartitionType.COLUMN_STATS)) { - final List metadataColumnStats = convertFilesToColumnStatsRecords( - engineContext, dataMetaClient, partitionToDeletedFiles, partitionToAppendedFiles, instantTime); - if (!metadataColumnStats.isEmpty()) { - final HoodieData metadataColumnStatsRDD = engineContext.parallelize(metadataColumnStats, 1); - partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, metadataColumnStatsRDD); - } + if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.COLUMN_STATS)) { + final HoodieData metadataColumnStatsRDD = convertFilesToColumnStatsRecords(engineContext, partitionToDeletedFiles, partitionToAppendedFiles, recordsGenerationParams); + partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, metadataColumnStatsRDD); } - return partitionToRecordsMap; } @@ -436,44 +430,35 @@ public class HoodieTableMetadataUtil { Map> partitionToAppendedFiles, Map> partitionToDeletedFiles, Option lastSyncTs) { - restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> { - rms.forEach(rm -> processRollbackMetadata(metadataTableTimeline, rm, - partitionToDeletedFiles, partitionToAppendedFiles, lastSyncTs)); - }); + restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> rms.forEach(rm -> processRollbackMetadata(metadataTableTimeline, rm, + partitionToDeletedFiles, 
partitionToAppendedFiles, lastSyncTs))); } /** * Convert rollback action metadata to metadata table records. */ public static Map> convertMetadataToRecords( - HoodieEngineContext engineContext, List enabledPartitionTypes, - HoodieActiveTimeline metadataTableTimeline, HoodieRollbackMetadata rollbackMetadata, - HoodieTableMetaClient dataMetaClient, String instantTime, Option lastSyncTs, boolean wasSynced) { + HoodieEngineContext engineContext, HoodieActiveTimeline metadataTableTimeline, + HoodieRollbackMetadata rollbackMetadata, MetadataRecordsGenerationParams recordsGenerationParams, + String instantTime, Option lastSyncTs, boolean wasSynced) { final Map> partitionToRecordsMap = new HashMap<>(); - Map> partitionToDeletedFiles = new HashMap<>(); Map> partitionToAppendedFiles = new HashMap<>(); - List filesPartitionRecords = convertMetadataToRollbackRecords(metadataTableTimeline, rollbackMetadata, - partitionToDeletedFiles, partitionToAppendedFiles, instantTime, lastSyncTs, wasSynced); + + List filesPartitionRecords = + convertMetadataToRollbackRecords(metadataTableTimeline, rollbackMetadata, partitionToDeletedFiles, partitionToAppendedFiles, instantTime, lastSyncTs, wasSynced); final HoodieData rollbackRecordsRDD = engineContext.parallelize(filesPartitionRecords, 1); partitionToRecordsMap.put(MetadataPartitionType.FILES, rollbackRecordsRDD); - if (enabledPartitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS)) { - final List metadataBloomFilterRecords = convertFilesToBloomFilterRecords( - engineContext, dataMetaClient, partitionToDeletedFiles, partitionToAppendedFiles, instantTime); - if (!metadataBloomFilterRecords.isEmpty()) { - final HoodieData metadataBloomFilterRecordsRDD = engineContext.parallelize(metadataBloomFilterRecords, 1); - partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, metadataBloomFilterRecordsRDD); - } + if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.BLOOM_FILTERS)) { + final HoodieData 
metadataBloomFilterRecordsRDD = + convertFilesToBloomFilterRecords(engineContext, partitionToDeletedFiles, partitionToAppendedFiles, recordsGenerationParams, instantTime); + partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, metadataBloomFilterRecordsRDD); } - if (enabledPartitionTypes.contains(MetadataPartitionType.COLUMN_STATS)) { - final List metadataColumnStats = convertFilesToColumnStatsRecords( - engineContext, dataMetaClient, partitionToDeletedFiles, partitionToAppendedFiles, instantTime); - if (!metadataColumnStats.isEmpty()) { - final HoodieData metadataColumnStatsRDD = engineContext.parallelize(metadataColumnStats, 1); - partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, metadataColumnStatsRDD); - } + if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.COLUMN_STATS)) { + final HoodieData metadataColumnStatsRDD = convertFilesToColumnStatsRecords(engineContext, partitionToDeletedFiles, partitionToAppendedFiles, recordsGenerationParams); + partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, metadataColumnStatsRDD); } return partitionToRecordsMap; @@ -594,7 +579,7 @@ public class HoodieTableMetadataUtil { partitionToDeletedFiles.forEach((partitionName, deletedFiles) -> { fileChangeCount[0] += deletedFiles.size(); - final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName; + final String partition = getPartition(partitionName); Option> filesAdded = Option.empty(); if (partitionToAppendedFiles.containsKey(partitionName)) { @@ -607,7 +592,7 @@ public class HoodieTableMetadataUtil { }); partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> { - final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? 
NON_PARTITIONED_NAME : partitionName; + final String partition = getPartition(partitionName); fileChangeCount[1] += appendedFileMap.size(); // Validate that no appended file has been deleted @@ -628,82 +613,133 @@ public class HoodieTableMetadataUtil { } /** - * Convert rollback action metadata to bloom filter index records. + * Returns partition name for the given path. + * + * @param path + * @return */ - private static List convertFilesToBloomFilterRecords(HoodieEngineContext engineContext, - HoodieTableMetaClient dataMetaClient, - Map> partitionToDeletedFiles, - Map> partitionToAppendedFiles, - String instantTime) { - List records = new LinkedList<>(); - partitionToDeletedFiles.forEach((partitionName, deletedFileList) -> deletedFileList.forEach(deletedFile -> { - if (!FSUtils.isBaseFile(new Path(deletedFile))) { - return; - } - - final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName; - records.add(HoodieMetadataPayload.createBloomFilterMetadataRecord( - partition, deletedFile, instantTime, ByteBuffer.allocate(0), true)); - })); - - partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> { - final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? 
NON_PARTITIONED_NAME : partitionName; - appendedFileMap.forEach((appendedFile, length) -> { - if (!FSUtils.isBaseFile(new Path(appendedFile))) { - return; - } - final String pathWithPartition = partitionName + "/" + appendedFile; - final Path appendedFilePath = new Path(dataMetaClient.getBasePath(), pathWithPartition); - try { - HoodieFileReader fileReader = - HoodieFileReaderFactory.getFileReader(dataMetaClient.getHadoopConf(), appendedFilePath); - final BloomFilter fileBloomFilter = fileReader.readBloomFilter(); - if (fileBloomFilter == null) { - LOG.error("Failed to read bloom filter for " + appendedFilePath); - return; - } - ByteBuffer bloomByteBuffer = ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes()); - HoodieRecord record = HoodieMetadataPayload.createBloomFilterMetadataRecord( - partition, appendedFile, instantTime, bloomByteBuffer, false); - records.add(record); - fileReader.close(); - } catch (IOException e) { - LOG.error("Failed to get bloom filter for file: " + appendedFilePath); - } - }); - }); - return records; + static String getPartition(@Nonnull String path) { + return EMPTY_PARTITION_NAME.equals(path) ? NON_PARTITIONED_NAME : path; } /** - * Convert rollback action metadata to column stats index records. + * Convert added and deleted files metadata to bloom filter index records. */ - private static List convertFilesToColumnStatsRecords(HoodieEngineContext engineContext, - HoodieTableMetaClient datasetMetaClient, - Map> partitionToDeletedFiles, - Map> partitionToAppendedFiles, - String instantTime) { - List records = new LinkedList<>(); - List latestColumns = getLatestColumns(datasetMetaClient); - partitionToDeletedFiles.forEach((partitionName, deletedFileList) -> deletedFileList.forEach(deletedFile -> { - final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? 
NON_PARTITIONED_NAME : partitionName; - if (deletedFile.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { - final String filePathWithPartition = partitionName + "/" + deletedFile; - records.addAll(getColumnStats(partition, filePathWithPartition, datasetMetaClient, - latestColumns, true).collect(Collectors.toList())); - } - })); + public static HoodieData convertFilesToBloomFilterRecords(HoodieEngineContext engineContext, + Map> partitionToDeletedFiles, + Map> partitionToAppendedFiles, + MetadataRecordsGenerationParams recordsGenerationParams, + String instantTime) { + HoodieData allRecordsRDD = engineContext.emptyHoodieData(); - partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> appendedFileMap.forEach( - (appendedFile, size) -> { - final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName; - if (appendedFile.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { - final String filePathWithPartition = partitionName + "/" + appendedFile; - records.addAll(getColumnStats(partition, filePathWithPartition, datasetMetaClient, - latestColumns, false).collect(Collectors.toList())); + List>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet() + .stream().map(e -> Pair.of(e.getKey(), e.getValue())).collect(Collectors.toList()); + int parallelism = Math.max(Math.min(partitionToDeletedFilesList.size(), recordsGenerationParams.getBloomIndexParallelism()), 1); + HoodieData>> partitionToDeletedFilesRDD = engineContext.parallelize(partitionToDeletedFilesList, parallelism); + + HoodieData deletedFilesRecordsRDD = partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesPair -> { + final String partitionName = partitionToDeletedFilesPair.getLeft(); + final List deletedFileList = partitionToDeletedFilesPair.getRight(); + return deletedFileList.stream().flatMap(deletedFile -> { + if (!FSUtils.isBaseFile(new Path(deletedFile))) { + return Stream.empty(); + } + + final String partition = 
getPartition(partitionName); + return Stream.of(HoodieMetadataPayload.createBloomFilterMetadataRecord( + partition, deletedFile, instantTime, StringUtils.EMPTY_STRING, ByteBuffer.allocate(0), true)); + }).iterator(); + }); + allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD); + + List>> partitionToAppendedFilesList = partitionToAppendedFiles.entrySet() + .stream().map(entry -> Pair.of(entry.getKey(), entry.getValue())).collect(Collectors.toList()); + parallelism = Math.max(Math.min(partitionToAppendedFilesList.size(), recordsGenerationParams.getBloomIndexParallelism()), 1); + HoodieData>> partitionToAppendedFilesRDD = engineContext.parallelize(partitionToAppendedFilesList, parallelism); + + HoodieData appendedFilesRecordsRDD = partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesPair -> { + final String partitionName = partitionToAppendedFilesPair.getLeft(); + final Map appendedFileMap = partitionToAppendedFilesPair.getRight(); + final String partition = getPartition(partitionName); + return appendedFileMap.entrySet().stream().flatMap(appendedFileLengthPairEntry -> { + final String appendedFile = appendedFileLengthPairEntry.getKey(); + if (!FSUtils.isBaseFile(new Path(appendedFile))) { + return Stream.empty(); + } + final String pathWithPartition = partitionName + "/" + appendedFile; + final Path appendedFilePath = new Path(recordsGenerationParams.getDataMetaClient().getBasePath(), pathWithPartition); + try (HoodieFileReader fileReader = + HoodieFileReaderFactory.getFileReader(recordsGenerationParams.getDataMetaClient().getHadoopConf(), appendedFilePath)) { + final BloomFilter fileBloomFilter = fileReader.readBloomFilter(); + if (fileBloomFilter == null) { + LOG.error("Failed to read bloom filter for " + appendedFilePath); + return Stream.empty(); } - })); - return records; + ByteBuffer bloomByteBuffer = ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes()); + HoodieRecord record = HoodieMetadataPayload.createBloomFilterMetadataRecord( + 
partition, appendedFile, instantTime, recordsGenerationParams.getBloomFilterType(), bloomByteBuffer, false); + return Stream.of(record); + } catch (IOException e) { + LOG.error("Failed to get bloom filter for file: " + appendedFilePath); + } + return Stream.empty(); + }).iterator(); + }); + allRecordsRDD = allRecordsRDD.union(appendedFilesRecordsRDD); + + return allRecordsRDD; + } + + /** + * Convert added and deleted action metadata to column stats index records. + */ + public static HoodieData convertFilesToColumnStatsRecords(HoodieEngineContext engineContext, + Map> partitionToDeletedFiles, + Map> partitionToAppendedFiles, + MetadataRecordsGenerationParams recordsGenerationParams) { + HoodieData allRecordsRDD = engineContext.emptyHoodieData(); + final List columnsToIndex = getColumnsToIndex(recordsGenerationParams.getDataMetaClient(), recordsGenerationParams.isAllColumnStatsIndexEnabled()); + + final List>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet() + .stream().map(e -> Pair.of(e.getKey(), e.getValue())).collect(Collectors.toList()); + int parallelism = Math.max(Math.min(partitionToDeletedFilesList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1); + final HoodieData>> partitionToDeletedFilesRDD = engineContext.parallelize(partitionToDeletedFilesList, parallelism); + + HoodieData deletedFilesRecordsRDD = partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesPair -> { + final String partitionName = partitionToDeletedFilesPair.getLeft(); + final String partition = getPartition(partitionName); + final List deletedFileList = partitionToDeletedFilesPair.getRight(); + + return deletedFileList.stream().flatMap(deletedFile -> { + final String filePathWithPartition = partitionName + "/" + deletedFile; + return getColumnStats(partition, filePathWithPartition, recordsGenerationParams.getDataMetaClient(), columnsToIndex, true); + }).iterator(); + }); + allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD); + + final List>> 
partitionToAppendedFilesList = partitionToAppendedFiles.entrySet() + .stream().map(entry -> Pair.of(entry.getKey(), entry.getValue())).collect(Collectors.toList()); + parallelism = Math.max(Math.min(partitionToAppendedFilesList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1); + final HoodieData>> partitionToAppendedFilesRDD = engineContext.parallelize(partitionToAppendedFilesList, parallelism); + + HoodieData appendedFilesRecordsRDD = partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesPair -> { + final String partitionName = partitionToAppendedFilesPair.getLeft(); + final String partition = getPartition(partitionName); + final Map appendedFileMap = partitionToAppendedFilesPair.getRight(); + + return appendedFileMap.entrySet().stream().flatMap(appendedFileNameLengthEntry -> { + if (!FSUtils.isBaseFile(new Path(appendedFileNameLengthEntry.getKey())) + || !appendedFileNameLengthEntry.getKey().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { + return Stream.empty(); + } + final String filePathWithPartition = partitionName + "/" + appendedFileNameLengthEntry.getKey(); + return getColumnStats(partition, filePathWithPartition, recordsGenerationParams.getDataMetaClient(), columnsToIndex, false); + }).iterator(); + + }); + allRecordsRDD = allRecordsRDD.union(appendedFilesRecordsRDD); + + return allRecordsRDD; } /** @@ -768,7 +804,7 @@ public class HoodieTableMetadataUtil { if (timeline.empty()) { final HoodieInstant instant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieActiveTimeline.createNewInstantTime()); - timeline = new HoodieDefaultTimeline(Arrays.asList(instant).stream(), metaClient.getActiveTimeline()::getInstantDetails); + timeline = new HoodieDefaultTimeline(Stream.of(instant), metaClient.getActiveTimeline()::getInstantDetails); } return new HoodieTableFileSystemView(metaClient, timeline); } @@ -796,20 +832,16 @@ public class HoodieTableMetadataUtil { } else { fileSliceStream = 
fsView.getLatestFileSlices(partition); } - return fileSliceStream.sorted((s1, s2) -> s1.getFileId().compareTo(s2.getFileId())).collect(Collectors.toList()); + return fileSliceStream.sorted(Comparator.comparing(FileSlice::getFileId)).collect(Collectors.toList()); } - public static List convertMetadataToColumnStatsRecords(HoodieCommitMetadata commitMetadata, - HoodieEngineContext engineContext, - HoodieTableMetaClient dataMetaClient, - boolean isMetaIndexColumnStatsForAllColumns, - String instantTime) { - + public static HoodieData convertMetadataToColumnStatsRecords(HoodieCommitMetadata commitMetadata, + HoodieEngineContext engineContext, + MetadataRecordsGenerationParams recordsGenerationParams) { try { List allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream() .flatMap(entry -> entry.stream()).collect(Collectors.toList()); - return HoodieTableMetadataUtil.createColumnStatsFromWriteStats(engineContext, dataMetaClient, allWriteStats, - isMetaIndexColumnStatsForAllColumns); + return HoodieTableMetadataUtil.createColumnStatsFromWriteStats(engineContext, allWriteStats, recordsGenerationParams); } catch (Exception e) { throw new HoodieException("Failed to generate column stats records for metadata table ", e); } @@ -818,30 +850,20 @@ public class HoodieTableMetadataUtil { /** * Create column stats from write status. 
* - * @param engineContext - Engine context - * @param datasetMetaClient - Dataset meta client - * @param allWriteStats - Write status to convert - * @param isMetaIndexColumnStatsForAllColumns - Are all columns enabled for indexing + * @param engineContext - Engine context + * @param allWriteStats - Write status to convert + * @param recordsGenerationParams - Parameters for columns stats record generation */ - public static List createColumnStatsFromWriteStats(HoodieEngineContext engineContext, - HoodieTableMetaClient datasetMetaClient, - List allWriteStats, - boolean isMetaIndexColumnStatsForAllColumns) throws Exception { + public static HoodieData createColumnStatsFromWriteStats(HoodieEngineContext engineContext, + List allWriteStats, + MetadataRecordsGenerationParams recordsGenerationParams) { if (allWriteStats.isEmpty()) { - return Collections.emptyList(); + return engineContext.emptyHoodieData(); } - - List prunedWriteStats = allWriteStats.stream().filter(writeStat -> { - return !(writeStat instanceof HoodieDeltaWriteStat); - }).collect(Collectors.toList()); - if (prunedWriteStats.isEmpty()) { - return Collections.emptyList(); - } - - return engineContext.flatMap(prunedWriteStats, - writeStat -> translateWriteStatToColumnStats(writeStat, datasetMetaClient, - getLatestColumns(datasetMetaClient, isMetaIndexColumnStatsForAllColumns)), - prunedWriteStats.size()); + final List columnsToIndex = getColumnsToIndex(recordsGenerationParams.getDataMetaClient(), recordsGenerationParams.isAllColumnStatsIndexEnabled()); + final int parallelism = Math.max(Math.min(allWriteStats.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1); + HoodieData allWriteStatsRDD = engineContext.parallelize(allWriteStats, parallelism); + return allWriteStatsRDD.flatMap(writeStat -> translateWriteStatToColumnStats(writeStat, recordsGenerationParams.getDataMetaClient(), columnsToIndex).iterator()); } /** @@ -850,10 +872,10 @@ public class HoodieTableMetadataUtil { * @param 
datasetMetaClient - Data table meta client * @param isMetaIndexColumnStatsForAllColumns - Is column stats indexing enabled for all columns */ - private static List getLatestColumns(HoodieTableMetaClient datasetMetaClient, boolean isMetaIndexColumnStatsForAllColumns) { + private static List getColumnsToIndex(HoodieTableMetaClient datasetMetaClient, boolean isMetaIndexColumnStatsForAllColumns) { if (!isMetaIndexColumnStatsForAllColumns || datasetMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants() < 1) { - return Collections.singletonList(datasetMetaClient.getTableConfig().getRecordKeyFieldProp()); + return Arrays.asList(datasetMetaClient.getTableConfig().getRecordKeyFieldProp().split(",")); } TableSchemaResolver schemaResolver = new TableSchemaResolver(datasetMetaClient); @@ -867,27 +889,42 @@ public class HoodieTableMetadataUtil { } } - private static List getLatestColumns(HoodieTableMetaClient datasetMetaClient) { - return getLatestColumns(datasetMetaClient, false); + public static HoodieMetadataColumnStats mergeColumnStats(HoodieMetadataColumnStats oldColumnStats, HoodieMetadataColumnStats newColumnStats) { + ValidationUtils.checkArgument(oldColumnStats.getFileName().equals(newColumnStats.getFileName())); + if (newColumnStats.getIsDeleted()) { + return newColumnStats; + } + return HoodieMetadataColumnStats.newBuilder() + .setFileName(newColumnStats.getFileName()) + .setMinValue(Stream.of(oldColumnStats.getMinValue(), newColumnStats.getMinValue()).filter(Objects::nonNull).min(Comparator.naturalOrder()).orElse(null)) + .setMaxValue(Stream.of(oldColumnStats.getMaxValue(), newColumnStats.getMaxValue()).filter(Objects::nonNull).max(Comparator.naturalOrder()).orElse(null)) + .setValueCount(oldColumnStats.getValueCount() + newColumnStats.getValueCount()) + .setNullCount(oldColumnStats.getNullCount() + newColumnStats.getNullCount()) + .setTotalSize(oldColumnStats.getTotalSize() + newColumnStats.getTotalSize()) +
.setTotalUncompressedSize(oldColumnStats.getTotalUncompressedSize() + newColumnStats.getTotalUncompressedSize()) + .setIsDeleted(newColumnStats.getIsDeleted()) + .build(); } public static Stream translateWriteStatToColumnStats(HoodieWriteStat writeStat, HoodieTableMetaClient datasetMetaClient, - List latestColumns) { - return getColumnStats(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient, latestColumns, false); - + List columnsToIndex) { + if (writeStat instanceof HoodieDeltaWriteStat && ((HoodieDeltaWriteStat) writeStat).getRecordsStats().isPresent()) { + Map> columnRangeMap = ((HoodieDeltaWriteStat) writeStat).getRecordsStats().get().getStats(); + List> columnRangeMetadataList = new ArrayList<>(columnRangeMap.values()); + return HoodieMetadataPayload.createColumnStatsRecords(writeStat.getPartitionPath(), columnRangeMetadataList, false); + } + return getColumnStats(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient, columnsToIndex,false); } private static Stream getColumnStats(final String partitionPath, final String filePathWithPartition, HoodieTableMetaClient datasetMetaClient, - List columns, boolean isDeleted) { - final String partition = partitionPath.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionPath; + List columnsToIndex, + boolean isDeleted) { + final String partition = getPartition(partitionPath); final int offset = partition.equals(NON_PARTITIONED_NAME) ? (filePathWithPartition.startsWith("/") ? 
1 : 0) : partition.length() + 1; final String fileName = filePathWithPartition.substring(offset); - if (!FSUtils.isBaseFile(new Path(fileName))) { - return Stream.empty(); - } if (filePathWithPartition.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { List> columnRangeMetadataList = new ArrayList<>(); @@ -895,13 +932,13 @@ public class HoodieTableMetadataUtil { if (!isDeleted) { try { columnRangeMetadataList = new ParquetUtils().readRangeFromParquetMetadata( - datasetMetaClient.getHadoopConf(), fullFilePath, columns); + datasetMetaClient.getHadoopConf(), fullFilePath, columnsToIndex); } catch (Exception e) { LOG.error("Failed to read column stats for " + fullFilePath, e); } } else { columnRangeMetadataList = - columns.stream().map(entry -> new HoodieColumnRangeMetadata(fileName, + columnsToIndex.stream().map(entry -> new HoodieColumnRangeMetadata(fileName, entry, null, null, 0, 0, 0, 0)) .collect(Collectors.toList()); } @@ -941,4 +978,72 @@ public class HoodieTableMetadataUtil { } } + /** + * Accumulates column range metadata for the given field and updates the column range map. 
+ * + * @param field - column for which statistics will be computed + * @param filePath - data file path + * @param columnRangeMap - old column range statistics, which will be merged in this computation + * @param columnToStats - map of column to map of each stat and its value + */ + public static void accumulateColumnRanges(Schema.Field field, String filePath, + Map> columnRangeMap, + Map> columnToStats) { + Map columnStats = columnToStats.get(field.name()); + HoodieColumnRangeMetadata columnRangeMetadata = new HoodieColumnRangeMetadata<>( + filePath, + field.name(), + String.valueOf(columnStats.get(MIN)), + String.valueOf(columnStats.get(MAX)), + Long.parseLong(columnStats.getOrDefault(NULL_COUNT, 0).toString()), + Long.parseLong(columnStats.getOrDefault(VALUE_COUNT, 0).toString()), + Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()), + Long.parseLong(columnStats.getOrDefault(TOTAL_UNCOMPRESSED_SIZE, 0).toString()) + ); + columnRangeMap.merge(field.name(), columnRangeMetadata, COLUMN_RANGE_MERGE_FUNCTION); + } + + /** + * Aggregates column stats for each field. + * + * @param record - current record + * @param schema - write schema + * @param columnToStats - map of column to map of each stat and its value which gets updates in this method + * @param consistentLogicalTimestampEnabled - flag to deal with logical timestamp type when getting column value + */ + public static void aggregateColumnStats(IndexedRecord record, Schema schema, + Map> columnToStats, + boolean consistentLogicalTimestampEnabled) { + if (!(record instanceof GenericRecord)) { + throw new HoodieIOException("Record is not a generic type to get column range metadata!"); + } + + schema.getFields().forEach(field -> { + Map columnStats = columnToStats.getOrDefault(field.name(), new HashMap<>()); + final String fieldVal = getNestedFieldValAsString((GenericRecord) record, field.name(), true, consistentLogicalTimestampEnabled); + // update stats + final int fieldSize = fieldVal == null ? 
0 : fieldVal.length(); + columnStats.put(TOTAL_SIZE, Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()) + fieldSize); + columnStats.put(TOTAL_UNCOMPRESSED_SIZE, Long.parseLong(columnStats.getOrDefault(TOTAL_UNCOMPRESSED_SIZE, 0).toString()) + fieldSize); + + if (!StringUtils.isNullOrEmpty(fieldVal)) { + // set the min value of the field + if (!columnStats.containsKey(MIN)) { + columnStats.put(MIN, fieldVal); + } + if (fieldVal.compareTo(String.valueOf(columnStats.get(MIN))) < 0) { + columnStats.put(MIN, fieldVal); + } + // set the max value of the field + if (fieldVal.compareTo(String.valueOf(columnStats.getOrDefault(MAX, ""))) > 0) { + columnStats.put(MAX, fieldVal); + } + // increment non-null value count + columnStats.put(VALUE_COUNT, Long.parseLong(columnStats.getOrDefault(VALUE_COUNT, 0).toString()) + 1); + } else { + // increment null value count + columnStats.put(NULL_COUNT, Long.parseLong(columnStats.getOrDefault(NULL_COUNT, 0).toString()) + 1); + } + }); + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataRecordsGenerationParams.java b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataRecordsGenerationParams.java new file mode 100644 index 000000000..21d5b173b --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataRecordsGenerationParams.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata; + +import org.apache.hudi.common.table.HoodieTableMetaClient; + +import java.io.Serializable; +import java.util.List; + +/** + * Encapsulates all parameters required to generate metadata index for enabled index types. + */ +public class MetadataRecordsGenerationParams implements Serializable { + + private final HoodieTableMetaClient dataMetaClient; + private final List enabledPartitionTypes; + private final String bloomFilterType; + private final int bloomIndexParallelism; + private final boolean isAllColumnStatsIndexEnabled; + private final int columnStatsIndexParallelism; + + MetadataRecordsGenerationParams(HoodieTableMetaClient dataMetaClient, List enabledPartitionTypes, String bloomFilterType, int bloomIndexParallelism, + boolean isAllColumnStatsIndexEnabled, int columnStatsIndexParallelism) { + this.dataMetaClient = dataMetaClient; + this.enabledPartitionTypes = enabledPartitionTypes; + this.bloomFilterType = bloomFilterType; + this.bloomIndexParallelism = bloomIndexParallelism; + this.isAllColumnStatsIndexEnabled = isAllColumnStatsIndexEnabled; + this.columnStatsIndexParallelism = columnStatsIndexParallelism; + } + + public HoodieTableMetaClient getDataMetaClient() { + return dataMetaClient; + } + + public List getEnabledPartitionTypes() { + return enabledPartitionTypes; + } + + public String getBloomFilterType() { + return bloomFilterType; + } + + public boolean isAllColumnStatsIndexEnabled() { + return isAllColumnStatsIndexEnabled; + } + + public int getBloomIndexParallelism() { + return 
bloomIndexParallelism; + } + + public int getColumnStatsIndexParallelism() { + return columnStatsIndexParallelism; + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java index f9b0e1a86..be0044f27 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java @@ -709,7 +709,7 @@ public class HoodieMetadataTableValidator implements Serializable { .map(entry -> BloomFilterData.builder() .setPartitionPath(entry.getKey().getKey()) .setFilename(entry.getKey().getValue()) - .setBloomFilter(entry.getValue()) + .setBloomFilter(ByteBuffer.wrap(entry.getValue().serializeToString().getBytes())) .build()) .sorted() .collect(Collectors.toList());