diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java index 876a5d8de..8c27e488d 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java @@ -72,6 +72,7 @@ import java.util.Map; import java.util.Properties; import java.util.Random; import java.util.UUID; +import java.util.stream.Collectors; import java.util.stream.Stream; import scala.Tuple2; @@ -258,19 +259,23 @@ public class TestHoodieIndex extends TestHoodieMetadataBase { metaClient = HoodieTableMetaClient.reload(metaClient); // Insert 200 records - JavaRDD writeStatues = writeClient.upsert(writeRecords, newCommitTime); - Assertions.assertNoWriteErrors(writeStatues.collect()); - List fileIds = writeStatues.map(WriteStatus::getFileId).collect(); - // commit this upsert - writeClient.commit(newCommitTime, writeStatues); + JavaRDD writeStatusesRDD = writeClient.upsert(writeRecords, newCommitTime); + // NOTE: This will trigger an actual write + List writeStatuses = writeStatusesRDD.collect(); + Assertions.assertNoWriteErrors(writeStatuses); + // Commit + writeClient.commit(newCommitTime, jsc.parallelize(writeStatuses)); + + List fileIds = writeStatuses.stream().map(WriteStatus::getFileId).collect(Collectors.toList()); + HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient); // Now tagLocation for these records, hbaseIndex should tag them JavaRDD javaRDD = tagLocation(index, writeRecords, hoodieTable); - assert (javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size() == totalRecords); + assertEquals(totalRecords, javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size()); // check tagged records are tagged with correct fileIds - assert 
(javaRDD.filter(record -> record.getCurrentLocation().getFileId() == null).collect().size() == 0); + assertEquals(0, javaRDD.filter(record -> record.getCurrentLocation().getFileId() == null).collect().size()); List taggedFileIds = javaRDD.map(record -> record.getCurrentLocation().getFileId()).distinct().collect(); Map recordKeyToPartitionPathMap = new HashMap(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index 077cc81ef..bc8a5c443 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -461,11 +461,9 @@ public class HoodieTableConfig extends HoodieConfig { } public Option getRecordKeyFields() { - if (contains(RECORDKEY_FIELDS)) { - return Option.of(Arrays.stream(getString(RECORDKEY_FIELDS).split(",")) - .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new String[] {})); - } - return Option.empty(); + String keyFieldsValue = getStringOrDefault(RECORDKEY_FIELDS, HoodieRecord.RECORD_KEY_METADATA_FIELD); + return Option.of(Arrays.stream(keyFieldsValue.split(",")) + .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new String[] {})); } public Option getPartitionFields() { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/Option.java b/hudi-common/src/main/java/org/apache/hudi/common/util/Option.java index 193bf5315..3d4bfcb6c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/Option.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/Option.java @@ -34,7 +34,7 @@ public final class Option implements Serializable { private static final long serialVersionUID = 0L; - private static final Option NULL_VAL = new Option<>(); + private static final Option EMPTY = new Option<>(); private final T val; @@ -67,8 +67,9 @@ public final class Option implements 
Serializable { this.val = val; } + @SuppressWarnings("unchecked") public static Option empty() { - return (Option) NULL_VAL; + return (Option) EMPTY; } public static Option of(T value) { @@ -108,6 +109,17 @@ public final class Option implements Serializable { } } + public Option flatMap(Function> mapper) { + if (null == mapper) { + throw new NullPointerException("mapper should not be null"); + } + if (!isPresent()) { + return empty(); + } else { + return Objects.requireNonNull(mapper.apply(val)); + } + } + /** * Returns this {@link Option} if not empty, otherwise evaluates the provided supplier * and returns the alternative diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index f0388cca1..4390e8766 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -34,6 +34,7 @@ import org.apache.hudi.common.model.HoodieDeltaWriteStat; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; @@ -67,6 +68,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -78,6 +80,7 @@ import java.util.function.BiFunction; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields; import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldValAsString; 
import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.COLUMN_RANGE_MERGE_FUNCTION; import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.MAX; @@ -86,6 +89,7 @@ import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.NULL_ import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.TOTAL_SIZE; import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.TOTAL_UNCOMPRESSED_SIZE; import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.VALUE_COUNT; +import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty; import static org.apache.hudi.metadata.HoodieTableMetadata.EMPTY_PARTITION_NAME; import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME; @@ -379,15 +383,24 @@ public class HoodieTableMetadataUtil { deletedFiles.forEach(entry -> deleteFileList.add(Pair.of(partition, entry))); }); - final List columnsToIndex = getColumnsToIndex(recordsGenerationParams.getDataMetaClient(), recordsGenerationParams.isAllColumnStatsIndexEnabled()); - final int parallelism = Math.max(Math.min(deleteFileList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1); - HoodieData> deleteFileListRDD = engineContext.parallelize(deleteFileList, parallelism); - return deleteFileListRDD.flatMap(deleteFileInfoPair -> { - if (deleteFileInfoPair.getRight().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { - return getColumnStats(deleteFileInfoPair.getLeft(), deleteFileInfoPair.getRight(), recordsGenerationParams.getDataMetaClient(), columnsToIndex, true).iterator(); - } - return Collections.emptyListIterator(); - }); + HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient(); + + List columnsToIndex = getColumnsToIndex(recordsGenerationParams, + dataTableMetaClient.getTableConfig(), tryResolveSchemaForTable(dataTableMetaClient)); + + if (columnsToIndex.isEmpty()) { + // In case there are no columns to index, bail + 
return engineContext.emptyHoodieData(); + } + + int parallelism = Math.max(Math.min(deleteFileList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1); + return engineContext.parallelize(deleteFileList, parallelism) + .flatMap(deleteFileInfoPair -> { + if (deleteFileInfoPair.getRight().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { + return getColumnStats(deleteFileInfoPair.getLeft(), deleteFileInfoPair.getRight(), dataTableMetaClient, columnsToIndex, true).iterator(); + } + return Collections.emptyListIterator(); + }); } /** @@ -698,7 +711,15 @@ public class HoodieTableMetadataUtil { Map> partitionToAppendedFiles, MetadataRecordsGenerationParams recordsGenerationParams) { HoodieData allRecordsRDD = engineContext.emptyHoodieData(); - final List columnsToIndex = getColumnsToIndex(recordsGenerationParams.getDataMetaClient(), recordsGenerationParams.isAllColumnStatsIndexEnabled()); + HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient(); + + final List columnsToIndex = getColumnsToIndex(recordsGenerationParams, + dataTableMetaClient.getTableConfig(), tryResolveSchemaForTable(dataTableMetaClient)); + + if (columnsToIndex.isEmpty()) { + // In case there are no columns to index, bail + return engineContext.emptyHoodieData(); + } final List>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet() .stream().map(e -> Pair.of(e.getKey(), e.getValue())).collect(Collectors.toList()); @@ -712,7 +733,7 @@ public class HoodieTableMetadataUtil { return deletedFileList.stream().flatMap(deletedFile -> { final String filePathWithPartition = partitionName + "/" + deletedFile; - return getColumnStats(partition, filePathWithPartition, recordsGenerationParams.getDataMetaClient(), columnsToIndex, true); + return getColumnStats(partition, filePathWithPartition, dataTableMetaClient, columnsToIndex, true); }).iterator(); }); allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD); @@ -733,7 +754,7 @@ public class 
HoodieTableMetadataUtil { return Stream.empty(); } final String filePathWithPartition = partitionName + "/" + appendedFileNameLengthEntry.getKey(); - return getColumnStats(partition, filePathWithPartition, recordsGenerationParams.getDataMetaClient(), columnsToIndex, false); + return getColumnStats(partition, filePathWithPartition, dataTableMetaClient, columnsToIndex, false); }).iterator(); }); @@ -838,55 +859,59 @@ public class HoodieTableMetadataUtil { public static HoodieData convertMetadataToColumnStatsRecords(HoodieCommitMetadata commitMetadata, HoodieEngineContext engineContext, MetadataRecordsGenerationParams recordsGenerationParams) { - try { - List allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream() - .flatMap(entry -> entry.stream()).collect(Collectors.toList()); - return HoodieTableMetadataUtil.createColumnStatsFromWriteStats(engineContext, allWriteStats, recordsGenerationParams); - } catch (Exception e) { - throw new HoodieException("Failed to generate column stats records for metadata table ", e); - } - } + List allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream() + .flatMap(Collection::stream).collect(Collectors.toList()); - /** - * Create column stats from write status. 
- * - * @param engineContext - Engine context - * @param allWriteStats - Write status to convert - * @param recordsGenerationParams - Parameters for columns stats record generation - */ - public static HoodieData createColumnStatsFromWriteStats(HoodieEngineContext engineContext, - List allWriteStats, - MetadataRecordsGenerationParams recordsGenerationParams) { if (allWriteStats.isEmpty()) { return engineContext.emptyHoodieData(); } - final List columnsToIndex = getColumnsToIndex(recordsGenerationParams.getDataMetaClient(), recordsGenerationParams.isAllColumnStatsIndexEnabled()); - final int parallelism = Math.max(Math.min(allWriteStats.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1); - HoodieData allWriteStatsRDD = engineContext.parallelize(allWriteStats, parallelism); - return allWriteStatsRDD.flatMap(writeStat -> translateWriteStatToColumnStats(writeStat, recordsGenerationParams.getDataMetaClient(), columnsToIndex).iterator()); + + try { + Option writerSchema = + Option.ofNullable(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY)) + .flatMap(writerSchemaStr -> + isNullOrEmpty(writerSchemaStr) + ? Option.empty() + : Option.of(new Schema.Parser().parse(writerSchemaStr))); + + HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient(); + HoodieTableConfig tableConfig = dataTableMetaClient.getTableConfig(); + + // NOTE: Writer schema added to commit metadata will not contain Hudi's metadata fields + Option tableSchema = writerSchema.map(schema -> + tableConfig.populateMetaFields() ? 
addMetadataFields(schema) : schema); + + List columnsToIndex = getColumnsToIndex(recordsGenerationParams, + tableConfig, tableSchema); + + if (columnsToIndex.isEmpty()) { + // In case there are no columns to index, bail + return engineContext.emptyHoodieData(); + } + + int parallelism = Math.max(Math.min(allWriteStats.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1); + return engineContext.parallelize(allWriteStats, parallelism) + .flatMap(writeStat -> + translateWriteStatToColumnStats(writeStat, dataTableMetaClient, columnsToIndex).iterator()); + } catch (Exception e) { + throw new HoodieException("Failed to generate column stats records for metadata table", e); + } } /** * Get the latest columns for the table for column stats indexing. - * - * @param datasetMetaClient - Data table meta client - * @param isMetaIndexColumnStatsForAllColumns - Is column stats indexing enabled for all columns */ - private static List getColumnsToIndex(HoodieTableMetaClient datasetMetaClient, boolean isMetaIndexColumnStatsForAllColumns) { - if (!isMetaIndexColumnStatsForAllColumns - || datasetMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants() < 1) { - return Arrays.asList(datasetMetaClient.getTableConfig().getRecordKeyFieldProp().split(",")); + private static List getColumnsToIndex(MetadataRecordsGenerationParams recordsGenParams, + HoodieTableConfig tableConfig, + Option writerSchemaOpt) { + if (recordsGenParams.isAllColumnStatsIndexEnabled() && writerSchemaOpt.isPresent()) { + return writerSchemaOpt.get().getFields().stream() + .map(Schema.Field::name).collect(Collectors.toList()); } - TableSchemaResolver schemaResolver = new TableSchemaResolver(datasetMetaClient); - // consider nested fields as well. 
if column stats is enabled only for a subset of columns, - // directly use them instead of all columns from the latest table schema - try { - return schemaResolver.getTableAvroSchema().getFields().stream() - .map(entry -> entry.name()).collect(Collectors.toList()); - } catch (Exception e) { - throw new HoodieException("Failed to get latest columns for " + datasetMetaClient.getBasePath()); - } + // In case no writer schema could be obtained, we fall back to indexing only the primary key + // columns + return Arrays.asList(tableConfig.getRecordKeyFields().get()); } public static HoodieMetadataColumnStats mergeColumnStats(HoodieMetadataColumnStats oldColumnStats, HoodieMetadataColumnStats newColumnStats) { @@ -914,7 +939,7 @@ public class HoodieTableMetadataUtil { List> columnRangeMetadataList = new ArrayList<>(columnRangeMap.values()); return HoodieMetadataPayload.createColumnStatsRecords(writeStat.getPartitionPath(), columnRangeMetadataList, false); } - return getColumnStats(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient, columnsToIndex,false); + return getColumnStats(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient, columnsToIndex, false); } private static Stream getColumnStats(final String partitionPath, final String filePathWithPartition, @@ -1023,7 +1048,7 @@ public class HoodieTableMetadataUtil { columnStats.put(TOTAL_SIZE, Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()) + fieldSize); columnStats.put(TOTAL_UNCOMPRESSED_SIZE, Long.parseLong(columnStats.getOrDefault(TOTAL_UNCOMPRESSED_SIZE, 0).toString()) + fieldSize); - if (!StringUtils.isNullOrEmpty(fieldVal)) { + if (!isNullOrEmpty(fieldVal)) { // set the min value of the field if (!columnStats.containsKey(MIN)) { columnStats.put(MIN, fieldVal); @@ -1043,4 +1068,17 @@ public class HoodieTableMetadataUtil { } }); } + + private static Option tryResolveSchemaForTable(HoodieTableMetaClient dataTableMetaClient) { + if
(dataTableMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants() == 0) { + return Option.empty(); + } + + TableSchemaResolver schemaResolver = new TableSchemaResolver(dataTableMetaClient); + try { + return Option.of(schemaResolver.getTableAvroSchema()); + } catch (Exception e) { + throw new HoodieException("Failed to get latest columns for " + dataTableMetaClient.getBasePath(), e); + } + } }