|
|
|
|
@@ -34,6 +34,7 @@ import org.apache.hudi.common.model.HoodieDeltaWriteStat;
|
|
|
|
|
import org.apache.hudi.common.model.HoodieFileFormat;
|
|
|
|
|
import org.apache.hudi.common.model.HoodieRecord;
|
|
|
|
|
import org.apache.hudi.common.model.HoodieWriteStat;
|
|
|
|
|
import org.apache.hudi.common.table.HoodieTableConfig;
|
|
|
|
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
|
|
|
|
import org.apache.hudi.common.table.TableSchemaResolver;
|
|
|
|
|
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
|
|
|
|
@@ -67,6 +68,7 @@ import java.io.IOException;
|
|
|
|
|
import java.nio.ByteBuffer;
|
|
|
|
|
import java.util.ArrayList;
|
|
|
|
|
import java.util.Arrays;
|
|
|
|
|
import java.util.Collection;
|
|
|
|
|
import java.util.Collections;
|
|
|
|
|
import java.util.Comparator;
|
|
|
|
|
import java.util.HashMap;
|
|
|
|
|
@@ -78,6 +80,7 @@ import java.util.function.BiFunction;
|
|
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
import java.util.stream.Stream;
|
|
|
|
|
|
|
|
|
|
import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
|
|
|
|
|
import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldValAsString;
|
|
|
|
|
import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.COLUMN_RANGE_MERGE_FUNCTION;
|
|
|
|
|
import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.MAX;
|
|
|
|
|
@@ -86,6 +89,7 @@ import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.NULL_
|
|
|
|
|
import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.TOTAL_SIZE;
|
|
|
|
|
import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.TOTAL_UNCOMPRESSED_SIZE;
|
|
|
|
|
import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.VALUE_COUNT;
|
|
|
|
|
import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
|
|
|
|
|
import static org.apache.hudi.metadata.HoodieTableMetadata.EMPTY_PARTITION_NAME;
|
|
|
|
|
import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;
|
|
|
|
|
|
|
|
|
|
@@ -379,15 +383,24 @@ public class HoodieTableMetadataUtil {
|
|
|
|
|
deletedFiles.forEach(entry -> deleteFileList.add(Pair.of(partition, entry)));
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
final List<String> columnsToIndex = getColumnsToIndex(recordsGenerationParams.getDataMetaClient(), recordsGenerationParams.isAllColumnStatsIndexEnabled());
|
|
|
|
|
final int parallelism = Math.max(Math.min(deleteFileList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
|
|
|
|
|
HoodieData<Pair<String, String>> deleteFileListRDD = engineContext.parallelize(deleteFileList, parallelism);
|
|
|
|
|
return deleteFileListRDD.flatMap(deleteFileInfoPair -> {
|
|
|
|
|
if (deleteFileInfoPair.getRight().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
|
|
|
|
|
return getColumnStats(deleteFileInfoPair.getLeft(), deleteFileInfoPair.getRight(), recordsGenerationParams.getDataMetaClient(), columnsToIndex, true).iterator();
|
|
|
|
|
}
|
|
|
|
|
return Collections.emptyListIterator();
|
|
|
|
|
});
|
|
|
|
|
HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient();
|
|
|
|
|
|
|
|
|
|
List<String> columnsToIndex = getColumnsToIndex(recordsGenerationParams,
|
|
|
|
|
dataTableMetaClient.getTableConfig(), tryResolveSchemaForTable(dataTableMetaClient));
|
|
|
|
|
|
|
|
|
|
if (columnsToIndex.isEmpty()) {
|
|
|
|
|
// In case there are no columns to index, bail
|
|
|
|
|
return engineContext.emptyHoodieData();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int parallelism = Math.max(Math.min(deleteFileList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
|
|
|
|
|
return engineContext.parallelize(deleteFileList, parallelism)
|
|
|
|
|
.flatMap(deleteFileInfoPair -> {
|
|
|
|
|
if (deleteFileInfoPair.getRight().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
|
|
|
|
|
return getColumnStats(deleteFileInfoPair.getLeft(), deleteFileInfoPair.getRight(), dataTableMetaClient, columnsToIndex, true).iterator();
|
|
|
|
|
}
|
|
|
|
|
return Collections.emptyListIterator();
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
@@ -698,7 +711,15 @@ public class HoodieTableMetadataUtil {
|
|
|
|
|
Map<String, Map<String, Long>> partitionToAppendedFiles,
|
|
|
|
|
MetadataRecordsGenerationParams recordsGenerationParams) {
|
|
|
|
|
HoodieData<HoodieRecord> allRecordsRDD = engineContext.emptyHoodieData();
|
|
|
|
|
final List<String> columnsToIndex = getColumnsToIndex(recordsGenerationParams.getDataMetaClient(), recordsGenerationParams.isAllColumnStatsIndexEnabled());
|
|
|
|
|
HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient();
|
|
|
|
|
|
|
|
|
|
final List<String> columnsToIndex = getColumnsToIndex(recordsGenerationParams,
|
|
|
|
|
dataTableMetaClient.getTableConfig(), tryResolveSchemaForTable(dataTableMetaClient));
|
|
|
|
|
|
|
|
|
|
if (columnsToIndex.isEmpty()) {
|
|
|
|
|
// In case there are no columns to index, bail
|
|
|
|
|
return engineContext.emptyHoodieData();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
final List<Pair<String, List<String>>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet()
|
|
|
|
|
.stream().map(e -> Pair.of(e.getKey(), e.getValue())).collect(Collectors.toList());
|
|
|
|
|
@@ -712,7 +733,7 @@ public class HoodieTableMetadataUtil {
|
|
|
|
|
|
|
|
|
|
return deletedFileList.stream().flatMap(deletedFile -> {
|
|
|
|
|
final String filePathWithPartition = partitionName + "/" + deletedFile;
|
|
|
|
|
return getColumnStats(partition, filePathWithPartition, recordsGenerationParams.getDataMetaClient(), columnsToIndex, true);
|
|
|
|
|
return getColumnStats(partition, filePathWithPartition, dataTableMetaClient, columnsToIndex, true);
|
|
|
|
|
}).iterator();
|
|
|
|
|
});
|
|
|
|
|
allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
|
|
|
|
|
@@ -733,7 +754,7 @@ public class HoodieTableMetadataUtil {
|
|
|
|
|
return Stream.empty();
|
|
|
|
|
}
|
|
|
|
|
final String filePathWithPartition = partitionName + "/" + appendedFileNameLengthEntry.getKey();
|
|
|
|
|
return getColumnStats(partition, filePathWithPartition, recordsGenerationParams.getDataMetaClient(), columnsToIndex, false);
|
|
|
|
|
return getColumnStats(partition, filePathWithPartition, dataTableMetaClient, columnsToIndex, false);
|
|
|
|
|
}).iterator();
|
|
|
|
|
|
|
|
|
|
});
|
|
|
|
|
@@ -838,55 +859,59 @@ public class HoodieTableMetadataUtil {
|
|
|
|
|
public static HoodieData<HoodieRecord> convertMetadataToColumnStatsRecords(HoodieCommitMetadata commitMetadata,
|
|
|
|
|
HoodieEngineContext engineContext,
|
|
|
|
|
MetadataRecordsGenerationParams recordsGenerationParams) {
|
|
|
|
|
try {
|
|
|
|
|
List<HoodieWriteStat> allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream()
|
|
|
|
|
.flatMap(entry -> entry.stream()).collect(Collectors.toList());
|
|
|
|
|
return HoodieTableMetadataUtil.createColumnStatsFromWriteStats(engineContext, allWriteStats, recordsGenerationParams);
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
throw new HoodieException("Failed to generate column stats records for metadata table ", e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
List<HoodieWriteStat> allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream()
|
|
|
|
|
.flatMap(Collection::stream).collect(Collectors.toList());
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Create column stats from write status.
|
|
|
|
|
*
|
|
|
|
|
* @param engineContext - Engine context
|
|
|
|
|
* @param allWriteStats - Write status to convert
|
|
|
|
|
* @param recordsGenerationParams - Parameters for columns stats record generation
|
|
|
|
|
*/
|
|
|
|
|
public static HoodieData<HoodieRecord> createColumnStatsFromWriteStats(HoodieEngineContext engineContext,
|
|
|
|
|
List<HoodieWriteStat> allWriteStats,
|
|
|
|
|
MetadataRecordsGenerationParams recordsGenerationParams) {
|
|
|
|
|
if (allWriteStats.isEmpty()) {
|
|
|
|
|
return engineContext.emptyHoodieData();
|
|
|
|
|
}
|
|
|
|
|
final List<String> columnsToIndex = getColumnsToIndex(recordsGenerationParams.getDataMetaClient(), recordsGenerationParams.isAllColumnStatsIndexEnabled());
|
|
|
|
|
final int parallelism = Math.max(Math.min(allWriteStats.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
|
|
|
|
|
HoodieData<HoodieWriteStat> allWriteStatsRDD = engineContext.parallelize(allWriteStats, parallelism);
|
|
|
|
|
return allWriteStatsRDD.flatMap(writeStat -> translateWriteStatToColumnStats(writeStat, recordsGenerationParams.getDataMetaClient(), columnsToIndex).iterator());
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
Option<Schema> writerSchema =
|
|
|
|
|
Option.ofNullable(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY))
|
|
|
|
|
.flatMap(writerSchemaStr ->
|
|
|
|
|
isNullOrEmpty(writerSchemaStr)
|
|
|
|
|
? Option.empty()
|
|
|
|
|
: Option.of(new Schema.Parser().parse(writerSchemaStr)));
|
|
|
|
|
|
|
|
|
|
HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient();
|
|
|
|
|
HoodieTableConfig tableConfig = dataTableMetaClient.getTableConfig();
|
|
|
|
|
|
|
|
|
|
// NOTE: Writer schema added to commit metadata will not contain Hudi's metadata fields
|
|
|
|
|
Option<Schema> tableSchema = writerSchema.map(schema ->
|
|
|
|
|
tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema);
|
|
|
|
|
|
|
|
|
|
List<String> columnsToIndex = getColumnsToIndex(recordsGenerationParams,
|
|
|
|
|
tableConfig, tableSchema);
|
|
|
|
|
|
|
|
|
|
if (columnsToIndex.isEmpty()) {
|
|
|
|
|
// In case there are no columns to index, bail
|
|
|
|
|
return engineContext.emptyHoodieData();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int parallelism = Math.max(Math.min(allWriteStats.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
|
|
|
|
|
return engineContext.parallelize(allWriteStats, parallelism)
|
|
|
|
|
.flatMap(writeStat ->
|
|
|
|
|
translateWriteStatToColumnStats(writeStat, dataTableMetaClient, columnsToIndex).iterator());
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
throw new HoodieException("Failed to generate column stats records for metadata table", e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Get the latest columns for the table for column stats indexing.
|
|
|
|
|
*
|
|
|
|
|
* @param datasetMetaClient - Data table meta client
|
|
|
|
|
* @param isMetaIndexColumnStatsForAllColumns - Is column stats indexing enabled for all columns
|
|
|
|
|
*/
|
|
|
|
|
private static List<String> getColumnsToIndex(HoodieTableMetaClient datasetMetaClient, boolean isMetaIndexColumnStatsForAllColumns) {
|
|
|
|
|
if (!isMetaIndexColumnStatsForAllColumns
|
|
|
|
|
|| datasetMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants() < 1) {
|
|
|
|
|
return Arrays.asList(datasetMetaClient.getTableConfig().getRecordKeyFieldProp().split(","));
|
|
|
|
|
private static List<String> getColumnsToIndex(MetadataRecordsGenerationParams recordsGenParams,
|
|
|
|
|
HoodieTableConfig tableConfig,
|
|
|
|
|
Option<Schema> writerSchemaOpt) {
|
|
|
|
|
if (recordsGenParams.isAllColumnStatsIndexEnabled() && writerSchemaOpt.isPresent()) {
|
|
|
|
|
return writerSchemaOpt.get().getFields().stream()
|
|
|
|
|
.map(Schema.Field::name).collect(Collectors.toList());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
TableSchemaResolver schemaResolver = new TableSchemaResolver(datasetMetaClient);
|
|
|
|
|
// consider nested fields as well. if column stats is enabled only for a subset of columns,
|
|
|
|
|
// directly use them instead of all columns from the latest table schema
|
|
|
|
|
try {
|
|
|
|
|
return schemaResolver.getTableAvroSchema().getFields().stream()
|
|
|
|
|
.map(entry -> entry.name()).collect(Collectors.toList());
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
throw new HoodieException("Failed to get latest columns for " + datasetMetaClient.getBasePath());
|
|
|
|
|
}
|
|
|
|
|
// In case no writer schema could be obtained we fall back to only index primary key
|
|
|
|
|
// columns
|
|
|
|
|
return Arrays.asList(tableConfig.getRecordKeyFields().get());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public static HoodieMetadataColumnStats mergeColumnStats(HoodieMetadataColumnStats oldColumnStats, HoodieMetadataColumnStats newColumnStats) {
|
|
|
|
|
@@ -914,7 +939,7 @@ public class HoodieTableMetadataUtil {
|
|
|
|
|
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = new ArrayList<>(columnRangeMap.values());
|
|
|
|
|
return HoodieMetadataPayload.createColumnStatsRecords(writeStat.getPartitionPath(), columnRangeMetadataList, false);
|
|
|
|
|
}
|
|
|
|
|
return getColumnStats(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient, columnsToIndex,false);
|
|
|
|
|
return getColumnStats(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient, columnsToIndex, false);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static Stream<HoodieRecord> getColumnStats(final String partitionPath, final String filePathWithPartition,
|
|
|
|
|
@@ -1023,7 +1048,7 @@ public class HoodieTableMetadataUtil {
|
|
|
|
|
columnStats.put(TOTAL_SIZE, Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()) + fieldSize);
|
|
|
|
|
columnStats.put(TOTAL_UNCOMPRESSED_SIZE, Long.parseLong(columnStats.getOrDefault(TOTAL_UNCOMPRESSED_SIZE, 0).toString()) + fieldSize);
|
|
|
|
|
|
|
|
|
|
if (!StringUtils.isNullOrEmpty(fieldVal)) {
|
|
|
|
|
if (!isNullOrEmpty(fieldVal)) {
|
|
|
|
|
// set the min value of the field
|
|
|
|
|
if (!columnStats.containsKey(MIN)) {
|
|
|
|
|
columnStats.put(MIN, fieldVal);
|
|
|
|
|
@@ -1043,4 +1068,17 @@ public class HoodieTableMetadataUtil {
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static Option<Schema> tryResolveSchemaForTable(HoodieTableMetaClient dataTableMetaClient) {
|
|
|
|
|
if (dataTableMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants() == 0) {
|
|
|
|
|
return Option.empty();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
TableSchemaResolver schemaResolver = new TableSchemaResolver(dataTableMetaClient);
|
|
|
|
|
try {
|
|
|
|
|
return Option.of(schemaResolver.getTableAvroSchema());
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
throw new HoodieException("Failed to get latest columns for " + dataTableMetaClient.getBasePath(), e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|