From 2d864f75247fd2feff6509235c7df84ec8707738 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 10 Dec 2021 14:56:09 -0800 Subject: [PATCH] [HUDI-2814] Make Z-index more generic Column-Stats Index (#4106) --- .../hudi/config/HoodieClusteringConfig.java | 51 ++- .../org/apache/hudi/table/HoodieTable.java | 2 +- .../PartitionAwareClusteringPlanStrategy.java | 18 +- .../SparkSizeBasedClusteringPlanStrategy.java | 27 +- .../SparkSortAndSizeExecutionStrategy.java | 4 +- ...atialCurveOptimizationSortPartitioner.java | 67 ++- .../ColumnStatsIndexHelper.java} | 274 +++-------- .../hudi/sort/SpaceCurveSortingHelper.java | 260 +++++++++++ .../table/HoodieSparkCopyOnWriteTable.java | 17 +- .../org/apache/spark/OrderingIndexHelper.java | 430 ------------------ .../sql/hudi/execution/RangeSample.scala | 412 ++++++++--------- .../TestColumnStatsIndexHelper.java} | 6 +- .../common/table/HoodieTableMetaClient.java | 8 +- .../apache/hudi/common/util/BinaryUtil.java | 4 +- .../hudi/common/util/CollectionUtils.java | 34 +- .../apache/hudi/common/util/TypeUtils.java | 42 ++ .../hudi/common/util/TestBinaryUtil.java | 24 +- .../org/apache/hudi/HoodieFileIndex.scala | 32 +- .../spark/sql/hudi/DataSkippingUtils.scala | 22 +- .../apache/hudi/TestDataSkippingUtils.scala | 8 +- ...ation.scala => TestColumnStatsIndex.scala} | 182 ++------ ...=> TestSpaceCurveLayoutOptimization.scala} | 145 ++---- .../SpaceCurveOptimizeBenchMark.scala | 67 +-- 23 files changed, 892 insertions(+), 1244 deletions(-) rename hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/{zorder/ZOrderingIndexHelper.java => columnstats/ColumnStatsIndexHelper.java} (62%) create mode 100644 hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/sort/SpaceCurveSortingHelper.java delete mode 100644 hudi-client/hudi-spark-client/src/main/java/org/apache/spark/OrderingIndexHelper.java rename hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/{zorder/TestZOrderingIndexHelper.java => columnstats/TestColumnStatsIndexHelper.java} (86%) rename hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/ZOrderingUtil.java => hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtil.java (98%) create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/util/TypeUtils.java rename hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestZOrderingUtil.java => hudi-common/src/test/java/org/apache/hudi/common/util/TestBinaryUtil.java (81%) rename hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/{TestZOrderLayoutOptimization.scala => TestColumnStatsIndex.scala} (55%) rename hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/{TestTableLayoutOptimization.scala => TestSpaceCurveLayoutOptimization.scala} (50%) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java index 676f2fff4..4f80b6608 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java @@ -24,13 +24,15 @@ import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.EngineType; +import org.apache.hudi.common.util.TypeUtils; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieNotSupportedException; +import javax.annotation.Nonnull; import java.io.File; import java.io.FileReader; import java.io.IOException; -import java.util.Locale; +import java.util.Map; import java.util.Properties; /** @@ -520,11 +522,15 @@ public class HoodieClusteringConfig extends HoodieConfig { } /** - * strategy types for build z-ordering/space-filling curves. + * Type of a strategy for building Z-order/Hilbert space-filling curves. */ public enum BuildCurveStrategyType { DIRECT("direct"), SAMPLE("sample"); + + private static final Map VALUE_TO_ENUM_MAP = + TypeUtils.getValueToEnumMap(BuildCurveStrategyType.class, e -> e.value); + private final String value; BuildCurveStrategyType(String value) { @@ -532,42 +538,39 @@ public class HoodieClusteringConfig extends HoodieConfig { } public static BuildCurveStrategyType fromValue(String value) { - switch (value.toLowerCase(Locale.ROOT)) { - case "direct": - return DIRECT; - case "sample": - return SAMPLE; - default: - throw new HoodieException("Invalid value of Type."); + BuildCurveStrategyType enumValue = VALUE_TO_ENUM_MAP.get(value); + if (enumValue == null) { + throw new HoodieException(String.format("Invalid value (%s)", value)); } + + return enumValue; } } /** - * strategy types for optimize layout for hudi data. + * Layout optimization strategies such as Z-order/Hilbert space-curves, etc */ - public enum BuildLayoutOptimizationStrategy { + public enum LayoutOptimizationStrategy { ZORDER("z-order"), HILBERT("hilbert"); + + private static final Map VALUE_TO_ENUM_MAP = + TypeUtils.getValueToEnumMap(LayoutOptimizationStrategy.class, e -> e.value); + private final String value; - BuildLayoutOptimizationStrategy(String value) { + LayoutOptimizationStrategy(String value) { this.value = value; } - public String toCustomString() { - return value; - } - - public static BuildLayoutOptimizationStrategy fromValue(String value) { - switch (value.toLowerCase(Locale.ROOT)) { - case "z-order": - return ZORDER; - case "hilbert": - return HILBERT; - default: - throw new HoodieException("Invalid value of Type."); + @Nonnull + public static LayoutOptimizationStrategy fromValue(String value) { + LayoutOptimizationStrategy enumValue = VALUE_TO_ENUM_MAP.get(value); + if (enumValue == null) { + throw new HoodieException(String.format("Invalid value (%s)", value)); } + + return enumValue; } } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index ca34f8cc1..3c794ef04 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -246,7 +246,7 @@ public abstract class HoodieTable implem public abstract HoodieWriteMetadata insertOverwriteTable(HoodieEngineContext context, String instantTime, I records); /** - * Updates Metadata Indexes (like Z-Index) + * Updates Metadata Indexes (like Column Stats index) * TODO rebase onto metadata table (post RFC-27) * * @param context instance of {@link HoodieEngineContext} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java index 16954e5bb..4d916362f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java @@ -74,13 +74,17 @@ public abstract class PartitionAwareClusteringPlanStrategy clusteringGroups = getEngineContext().flatMap(partitionPaths, - partitionPath -> { - List fileSlicesEligible = getFileSlicesEligibleForClustering(partitionPath).collect(Collectors.toList()); - return buildClusteringGroupsForPartition(partitionPath, fileSlicesEligible).limit(getWriteConfig().getClusteringMaxNumGroups()); - }, - partitionPaths.size()) - .stream().limit(getWriteConfig().getClusteringMaxNumGroups()).collect(Collectors.toList()); + List clusteringGroups = getEngineContext() + .flatMap( + partitionPaths, + partitionPath -> { + List fileSlicesEligible = getFileSlicesEligibleForClustering(partitionPath).collect(Collectors.toList()); + return buildClusteringGroupsForPartition(partitionPath, fileSlicesEligible).limit(getWriteConfig().getClusteringMaxNumGroups()); + }, + partitionPaths.size()) + .stream() + .limit(getWriteConfig().getClusteringMaxNumGroups()) + .collect(Collectors.toList()); if (clusteringGroups.isEmpty()) { LOG.info("No data available to cluster"); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java index c89ff2f2b..7295118ca 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java @@ -68,13 +68,13 @@ public class SparkSizeBasedClusteringPlanStrategy buildClusteringGroupsForPartition(String partitionPath, List fileSlices) { + HoodieWriteConfig writeConfig = getWriteConfig(); + List, Integer>> fileSliceGroups = new ArrayList<>(); List currentGroup = new ArrayList<>(); long totalSizeSoFar = 0; - HoodieWriteConfig writeConfig = getWriteConfig(); + for (FileSlice currentSlice : fileSlices) { - // assume each filegroup size is ~= parquet.max.file.size - totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize(); // check if max size is reached and create new group, if needed. if (totalSizeSoFar >= writeConfig.getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) { int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes()); @@ -84,13 +84,13 @@ public class SparkSizeBasedClusteringPlanStrategy(); totalSizeSoFar = 0; } + + // Add to the current file-group currentGroup.add(currentSlice); - // totalSizeSoFar could be 0 when new group was created in the previous conditional block. - // reset to the size of current slice, otherwise the number of output file group will become 0 even though current slice is present. - if (totalSizeSoFar == 0) { - totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize(); - } + // assume each filegroup size is ~= parquet.max.file.size + totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize(); } + if (!currentGroup.isEmpty()) { int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes()); LOG.info("Adding final clustering group " + totalSizeSoFar + " max bytes: " @@ -98,11 +98,12 @@ public class SparkSizeBasedClusteringPlanStrategy HoodieClusteringGroup.newBuilder() - .setSlices(getFileSliceInfo(fileSliceGroup.getLeft())) - .setNumOutputFileGroups(fileSliceGroup.getRight()) - .setMetrics(buildMetrics(fileSliceGroup.getLeft())) - .build()); + return fileSliceGroups.stream().map(fileSliceGroup -> + HoodieClusteringGroup.newBuilder() + .setSlices(getFileSliceInfo(fileSliceGroup.getLeft())) + .setNumOutputFileGroups(fileSliceGroup.getRight()) + .setMetrics(buildMetrics(fileSliceGroup.getLeft())) + .build()); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java index 3f89745ab..22d530021 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java @@ -64,7 +64,7 @@ public class SparkSortAndSizeExecutionStrategy> props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), Boolean.FALSE.toString()); props.put(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes())); HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build(); - return (JavaRDD) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, - false, getPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(preserveHoodieMetadata)); + return (JavaRDD) SparkBulkInsertHelper.newInstance() + .bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, false, getPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(preserveHoodieMetadata)); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java index 51526fc4d..896a2aaab 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java @@ -18,6 +18,8 @@ package org.apache.hudi.execution.bulkinsert; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; import org.apache.hudi.AvroConversionUtils; import org.apache.hudi.HoodieSparkUtils; import org.apache.hudi.client.common.HoodieSparkEngineContext; @@ -27,17 +29,20 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.sort.SpaceCurveSortingHelper; import org.apache.hudi.table.BulkInsertPartitioner; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.spark.OrderingIndexHelper; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty; + /** * A partitioner that does spatial curve optimization sorting based on specified column values for each RDD partition. * support z-curve optimization, hilbert will come soon. @@ -74,25 +79,47 @@ public class RDDSpatialCurveOptimizationSortPartitioner prepareGenericRecord(JavaRDD> inputRecords, final int numOutputGroups, final Schema schema) { SerializableSchema serializableSchema = new SerializableSchema(schema); JavaRDD genericRecordJavaRDD = inputRecords.map(f -> (GenericRecord) f.getData().getInsertValue(serializableSchema.get()).get()); - Dataset originDF = AvroConversionUtils.createDataFrame(genericRecordJavaRDD.rdd(), schema.toString(), sparkEngineContext.getSqlContext().sparkSession()); - Dataset zDataFrame; + Dataset originDF = + AvroConversionUtils.createDataFrame( + genericRecordJavaRDD.rdd(), + schema.toString(), + sparkEngineContext.getSqlContext().sparkSession() + ); - switch (config.getLayoutOptimizationCurveBuildMethod()) { - case DIRECT: - zDataFrame = OrderingIndexHelper - .createOptimizedDataFrameByMapValue(originDF, config.getClusteringSortColumns(), numOutputGroups, config.getLayoutOptimizationStrategy()); - break; - case SAMPLE: - zDataFrame = OrderingIndexHelper - .createOptimizeDataFrameBySample(originDF, config.getClusteringSortColumns(), numOutputGroups, config.getLayoutOptimizationStrategy()); - break; - default: - throw new HoodieException("Not a valid build curve method for doWriteOperation: "); - } - return HoodieSparkUtils.createRdd(zDataFrame, schema.getName(), + Dataset sortedDF = reorder(originDF, numOutputGroups); + + return HoodieSparkUtils.createRdd(sortedDF, schema.getName(), schema.getNamespace(), false, org.apache.hudi.common.util.Option.empty()).toJavaRDD(); } + private Dataset reorder(Dataset originDF, int numOutputGroups) { + String orderedColumnsListConfig = config.getClusteringSortColumns(); + + if (isNullOrEmpty(orderedColumnsListConfig) || numOutputGroups <= 0) { + // No-op + return originDF; + } + + List orderedCols = + Arrays.stream(orderedColumnsListConfig.split(",")) + .map(String::trim) + .collect(Collectors.toList()); + + HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy = + HoodieClusteringConfig.LayoutOptimizationStrategy.fromValue(config.getLayoutOptimizationStrategy()); + + HoodieClusteringConfig.BuildCurveStrategyType curveBuildStrategyType = config.getLayoutOptimizationCurveBuildMethod(); + + switch (curveBuildStrategyType) { + case DIRECT: + return SpaceCurveSortingHelper.orderDataFrameByMappingValues(originDF, layoutOptStrategy, orderedCols, numOutputGroups); + case SAMPLE: + return SpaceCurveSortingHelper.orderDataFrameBySamplingValues(originDF, layoutOptStrategy, orderedCols, numOutputGroups); + default: + throw new UnsupportedOperationException(String.format("Unsupported space-curve curve building strategy (%s)", curveBuildStrategyType)); + } + } + @Override public boolean arePartitionRecordsSorted() { return true; diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/zorder/ZOrderingIndexHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/columnstats/ColumnStatsIndexHelper.java similarity index 62% rename from hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/zorder/ZOrderingIndexHelper.java rename to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/columnstats/ColumnStatsIndexHelper.java index 934d1b9c8..d92bac4d8 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/zorder/ZOrderingIndexHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/columnstats/ColumnStatsIndexHelper.java @@ -1,13 +1,12 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,21 +15,18 @@ * limitations under the License. */ -package org.apache.hudi.index.zorder; +package org.apache.hudi.index.columnstats; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieColumnRangeMetadata; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.util.BaseFileUtils; import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.optimize.ZOrderingUtil; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.parquet.io.api.Binary; @@ -41,10 +37,7 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.Row$; import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.hudi.execution.RangeSampleSort$; -import org.apache.spark.sql.hudi.execution.ZorderingBinarySort; import org.apache.spark.sql.types.BinaryType; -import org.apache.spark.sql.types.BinaryType$; import org.apache.spark.sql.types.BooleanType; import org.apache.spark.sql.types.ByteType; import org.apache.spark.sql.types.DataType; @@ -64,10 +57,9 @@ import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.types.StructType$; import org.apache.spark.sql.types.TimestampType; import org.apache.spark.util.SerializableConfiguration; +import scala.collection.JavaConversions; import javax.annotation.Nonnull; -import javax.annotation.Nullable; - import java.io.IOException; import java.math.BigDecimal; import java.util.ArrayList; @@ -79,13 +71,11 @@ import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.StreamSupport; -import scala.collection.JavaConversions; - import static org.apache.hudi.util.DataTypeUtils.areCompatible; -public class ZOrderingIndexHelper { +public class ColumnStatsIndexHelper { - private static final Logger LOG = LogManager.getLogger(ZOrderingIndexHelper.class); + private static final Logger LOG = LogManager.getLogger(ColumnStatsIndexHelper.class); private static final String SPARK_JOB_DESCRIPTION = "spark.job.description"; @@ -108,106 +98,10 @@ public class ZOrderingIndexHelper { } /** - * Create z-order DataFrame directly - * first, map all base type data to byte[8], then create z-order DataFrame - * only support base type data. long,int,short,double,float,string,timestamp,decimal,date,byte - * this method is more effective than createZIndexDataFrameBySample - * - * @param df a spark DataFrame holds parquet files to be read. - * @param zCols z-sort cols - * @param fileNum spark partition num - * @return a dataFrame sorted by z-order. - */ - public static Dataset createZIndexedDataFrameByMapValue(Dataset df, List zCols, int fileNum) { - Map columnsMap = Arrays.stream(df.schema().fields()).collect(Collectors.toMap(e -> e.name(), e -> e)); - int fieldNum = df.schema().fields().length; - List checkCols = zCols.stream().filter(f -> columnsMap.containsKey(f)).collect(Collectors.toList()); - if (zCols.size() != checkCols.size()) { - return df; - } - // only one col to sort, no need to use z-order - if (zCols.size() == 1) { - return df.repartitionByRange(fieldNum, org.apache.spark.sql.functions.col(zCols.get(0))); - } - Map fieldMap = zCols - .stream().collect(Collectors.toMap(e -> Arrays.asList(df.schema().fields()).indexOf(columnsMap.get(e)), e -> columnsMap.get(e))); - // z-sort - JavaRDD sortedRdd = df.toJavaRDD().map(row -> { - List zBytesList = fieldMap.entrySet().stream().map(entry -> { - int index = entry.getKey(); - StructField field = entry.getValue(); - DataType dataType = field.dataType(); - if (dataType instanceof LongType) { - return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index)); - } else if (dataType instanceof DoubleType) { - return ZOrderingUtil.doubleTo8Byte(row.isNullAt(index) ? Double.MAX_VALUE : row.getDouble(index)); - } else if (dataType instanceof IntegerType) { - return ZOrderingUtil.intTo8Byte(row.isNullAt(index) ? Integer.MAX_VALUE : row.getInt(index)); - } else if (dataType instanceof FloatType) { - return ZOrderingUtil.doubleTo8Byte(row.isNullAt(index) ? Float.MAX_VALUE : row.getFloat(index)); - } else if (dataType instanceof StringType) { - return ZOrderingUtil.utf8To8Byte(row.isNullAt(index) ? "" : row.getString(index)); - } else if (dataType instanceof DateType) { - return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime()); - } else if (dataType instanceof TimestampType) { - return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime()); - } else if (dataType instanceof ByteType) { - return ZOrderingUtil.byteTo8Byte(row.isNullAt(index) ? Byte.MAX_VALUE : row.getByte(index)); - } else if (dataType instanceof ShortType) { - return ZOrderingUtil.intTo8Byte(row.isNullAt(index) ? Short.MAX_VALUE : row.getShort(index)); - } else if (dataType instanceof DecimalType) { - return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue()); - } else if (dataType instanceof BooleanType) { - boolean value = row.isNullAt(index) ? false : row.getBoolean(index); - return ZOrderingUtil.intTo8Byte(value ? 1 : 0); - } else if (dataType instanceof BinaryType) { - return ZOrderingUtil.paddingTo8Byte(row.isNullAt(index) ? new byte[] {0} : (byte[]) row.get(index)); - } - return null; - }).filter(f -> f != null).collect(Collectors.toList()); - byte[][] zBytes = new byte[zBytesList.size()][]; - for (int i = 0; i < zBytesList.size(); i++) { - zBytes[i] = zBytesList.get(i); - } - List zVaules = new ArrayList<>(); - zVaules.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava()); - zVaules.add(ZOrderingUtil.interleaving(zBytes, 8)); - return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(zVaules)); - }).sortBy(f -> new ZorderingBinarySort((byte[]) f.get(fieldNum)), true, fileNum); - - // create new StructType - List newFields = new ArrayList<>(); - newFields.addAll(Arrays.asList(df.schema().fields())); - newFields.add(new StructField("zIndex", BinaryType$.MODULE$, true, Metadata.empty())); - - // create new DataFrame - return df.sparkSession().createDataFrame(sortedRdd, StructType$.MODULE$.apply(newFields)).drop("zIndex"); - } - - public static Dataset createZIndexedDataFrameByMapValue(Dataset df, String zCols, int fileNum) { - if (zCols == null || zCols.isEmpty() || fileNum <= 0) { - return df; - } - return createZIndexedDataFrameByMapValue(df, - Arrays.stream(zCols.split(",")).map(f -> f.trim()).collect(Collectors.toList()), fileNum); - } - - public static Dataset createZIndexedDataFrameBySample(Dataset df, List zCols, int fileNum) { - return RangeSampleSort$.MODULE$.sortDataFrameBySample(df, JavaConversions.asScalaBuffer(zCols), fileNum, - HoodieClusteringConfig.BuildLayoutOptimizationStrategy.ZORDER.toCustomString()); - } - - public static Dataset createZIndexedDataFrameBySample(Dataset df, String zCols, int fileNum) { - if (zCols == null || zCols.isEmpty() || fileNum <= 0) { - return df; - } - return createZIndexedDataFrameBySample(df, Arrays.stream(zCols.split(",")).map(f -> f.trim()).collect(Collectors.toList()), fileNum); - } - - /** - * Parse min/max statistics from Parquet footers for provided columns and composes Z-index - * table in the following format with 3 statistics denominated for each Z-ordered column. - * For ex, if original table contained Z-ordered column {@code A}: + * Parse min/max statistics from Parquet footers for provided columns and composes column-stats + * index table in the following format with 3 statistics denominated for each + * linear/Z-curve/Hilbert-curve-ordered column. For ex, if original table contained + * column {@code A}: * *
    * +---------------------------+------------+------------+-------------+
@@ -225,15 +119,15 @@ public class ZOrderingIndexHelper {
    * @VisibleForTesting
    *
    * @param sparkSession encompassing Spark session
-   * @param baseFilesPaths list of base-files paths to be sourced for Z-index
-   * @param zorderedColumnSchemas target Z-ordered columns
+   * @param baseFilesPaths list of base-files paths to be sourced for column-stats index
+   * @param orderedColumnSchemas target ordered columns
    * @return Spark's {@link Dataset} holding an index table
    */
   @Nonnull
-  public static Dataset buildZIndexTableFor(
+  public static Dataset buildColumnStatsTableFor(
       @Nonnull SparkSession sparkSession,
       @Nonnull List baseFilesPaths,
-      @Nonnull List zorderedColumnSchemas
+      @Nonnull List orderedColumnSchemas
   ) {
     SparkContext sc = sparkSession.sparkContext();
     JavaSparkContext jsc = new JavaSparkContext(sc);
@@ -252,12 +146,12 @@ public class ZOrderingIndexHelper {
                 return StreamSupport.stream(iterable.spliterator(), false)
                     .flatMap(path ->
                         utils.readRangeFromParquetMetadata(
-                            serializableConfiguration.value(),
-                            new Path(path),
-                            zorderedColumnSchemas.stream()
-                                .map(StructField::name)
-                                .collect(Collectors.toList())
-                        )
+                                serializableConfiguration.value(),
+                                new Path(path),
+                                orderedColumnSchemas.stream()
+                                    .map(StructField::name)
+                                    .collect(Collectors.toList())
+                            )
                             .stream()
                     )
                     .iterator();
@@ -288,7 +182,7 @@ public class ZOrderingIndexHelper {
               indexRow.add(filePath);
 
               // For each column
-              zorderedColumnSchemas.forEach(colSchema -> {
+              orderedColumnSchemas.forEach(colSchema -> {
                 String colName = colSchema.name();
 
                 HoodieColumnRangeMetadata colMetadata =
@@ -313,66 +207,67 @@ public class ZOrderingIndexHelper {
             })
             .filter(Objects::nonNull);
 
-    StructType indexSchema = composeIndexSchema(zorderedColumnSchemas);
+    StructType indexSchema = composeIndexSchema(orderedColumnSchemas);
 
     return sparkSession.createDataFrame(allMetaDataRDD, indexSchema);
   }
 
   /**
    * 

- * Updates state of the Z-index by: + * Updates state of the column-stats index by: *

    - *
  1. Updating Z-index with statistics for {@code sourceBaseFiles}, collecting corresponding - * column statistics from Parquet footers
  2. - *
  3. Merging newly built Z-index table with the most recent one (if present and not preempted)
  4. + *
  5. Updating column-stats index with statistics for {@code sourceBaseFiles}, + * collecting corresponding column statistics from Parquet footers
  6. + *
  7. Merging newly built column-stats index table with the most recent one (if present + * and not preempted)
  8. *
  9. Cleans up any residual index tables, that weren't cleaned up before
  10. *
* * @param sparkSession encompassing Spark session * @param sourceTableSchema instance of {@link StructType} bearing source table's writer's schema * @param sourceBaseFiles list of base-files to be indexed - * @param zorderedCols target Z-ordered columns - * @param zindexFolderPath Z-index folder path + * @param orderedCols target ordered columns + * @param indexFolderPath col-stats index folder path * @param commitTime current operation commit instant * @param completedCommits all previously completed commit instants */ - public static void updateZIndexFor( + public static void updateColumnStatsIndexFor( @Nonnull SparkSession sparkSession, @Nonnull StructType sourceTableSchema, @Nonnull List sourceBaseFiles, - @Nonnull List zorderedCols, - @Nonnull String zindexFolderPath, + @Nonnull List orderedCols, + @Nonnull String indexFolderPath, @Nonnull String commitTime, @Nonnull List completedCommits ) { - FileSystem fs = FSUtils.getFs(zindexFolderPath, sparkSession.sparkContext().hadoopConfiguration()); + FileSystem fs = FSUtils.getFs(indexFolderPath, sparkSession.sparkContext().hadoopConfiguration()); - // Compose new Z-index table for the given source base files - Dataset newZIndexDf = - buildZIndexTableFor( + // Compose new col-stats index table for the given source base files + Dataset newColStatsIndexDf = + buildColumnStatsTableFor( sparkSession, sourceBaseFiles, - zorderedCols.stream() + orderedCols.stream() .map(col -> sourceTableSchema.fields()[sourceTableSchema.fieldIndex(col)]) .collect(Collectors.toList()) ); try { // - // Z-Index has the following folder structure: + // Column Stats Index has the following folder structure: // // .hoodie/ - // ├── .zindex/ + // ├── .colstatsindex/ // │ ├── / // │ │ ├── .parquet // │ │ └── ... // // If index is currently empty (no persisted tables), we simply create one // using clustering operation's commit instance as it's name - Path newIndexTablePath = new Path(zindexFolderPath, commitTime); + Path newIndexTablePath = new Path(indexFolderPath, commitTime); - if (!fs.exists(new Path(zindexFolderPath))) { - newZIndexDf.repartition(1) + if (!fs.exists(new Path(indexFolderPath))) { + newColStatsIndexDf.repartition(1) .write() .format("parquet") .mode("overwrite") @@ -383,8 +278,8 @@ public class ZOrderingIndexHelper { // Filter in all index tables (w/in {@code .zindex} folder) List allIndexTables = Arrays.stream( - fs.listStatus(new Path(zindexFolderPath)) - ) + fs.listStatus(new Path(indexFolderPath)) + ) .filter(FileStatus::isDirectory) .map(f -> f.getPath().getName()) .collect(Collectors.toList()); @@ -402,23 +297,23 @@ public class ZOrderingIndexHelper { .filter(f -> !completedCommits.contains(f)) .collect(Collectors.toList()); - Dataset finalZIndexDf; - - // Before writing out new version of the Z-index table we need to merge it + Dataset finalColStatsIndexDf; + + // Before writing out new version of the col-stats-index table we need to merge it // with the most recent one that were successfully persisted previously if (validIndexTables.isEmpty()) { - finalZIndexDf = newZIndexDf; + finalColStatsIndexDf = newColStatsIndexDf; } else { // NOTE: That Parquet schema might deviate from the original table schema (for ex, // by upcasting "short" to "integer" types, etc), and hence we need to re-adjust it // prior to merging, since merging might fail otherwise due to schemas incompatibility - finalZIndexDf = + finalColStatsIndexDf = tryMergeMostRecentIndexTableInto( sparkSession, - newZIndexDf, - // Load current most recent Z-index table + newColStatsIndexDf, + // Load current most recent col-stats-index table sparkSession.read().load( - new Path(zindexFolderPath, validIndexTables.get(validIndexTables.size() - 1)).toString() + new Path(indexFolderPath, validIndexTables.get(validIndexTables.size() - 1)).toString() ) ); @@ -426,28 +321,28 @@ public class ZOrderingIndexHelper { tablesToCleanup.addAll(validIndexTables); } - // Persist new Z-index table - finalZIndexDf - .repartition(1) - .write() - .format("parquet") - .save(newIndexTablePath.toString()); + // Persist new col-stats-index table + finalColStatsIndexDf + .repartition(1) + .write() + .format("parquet") + .save(newIndexTablePath.toString()); - // Clean up residual Z-index tables that have might have been dangling since + // Clean up residual col-stats-index tables that have might have been dangling since // previous iterations (due to intermittent failures during previous clean up) tablesToCleanup.forEach(f -> { try { - fs.delete(new Path(zindexFolderPath, f), true); + fs.delete(new Path(indexFolderPath, f), true); } catch (IOException ie) { // NOTE: Exception is deliberately swallowed to not affect overall clustering operation, - // since failing Z-index table will be attempted to be cleaned up upon subsequent + // since failing col-stats-index table will be attempted to be cleaned up upon subsequent // clustering iteration - LOG.warn(String.format("Failed to cleanup residual Z-index table: %s", f), ie); + LOG.warn(String.format("Failed to cleanup residual col-stats-index table: %s", f), ie); } }); } catch (IOException e) { - LOG.error("Failed to build new Z-index table", e); - throw new HoodieException("Failed to build new Z-index table", e); + LOG.error("Failed to build new col-stats-index table", e); + throw new HoodieException("Failed to build new col-stats-index table", e); } } @@ -457,7 +352,7 @@ public class ZOrderingIndexHelper { @Nonnull Dataset newIndexTableDf, @Nonnull Dataset existingIndexTableDf ) { - // NOTE: If new Z-index table schema is incompatible with that one of existing table + // NOTE: If new col-stats index table schema is incompatible with that one of existing table // that is most likely due to changing settings of list of Z-ordered columns, that // occurred since last index table have been persisted. // @@ -503,27 +398,6 @@ public class ZOrderingIndexHelper { return new StructField(composeZIndexColName(col, statName), dataType, true, Metadata.empty()); } - @Nullable - private static String mapToSourceTableColumnName(StructField fieldStruct) { - String name = fieldStruct.name(); - int maxStatSuffixIdx = name.lastIndexOf(String.format("_%s", Z_INDEX_MAX_VALUE_STAT_NAME)); - if (maxStatSuffixIdx != -1) { - return name.substring(0, maxStatSuffixIdx); - } - - int minStatSuffixIdx = name.lastIndexOf(String.format("_%s", Z_INDEX_MIN_VALUE_STAT_NAME)); - if (minStatSuffixIdx != -1) { - return name.substring(0, minStatSuffixIdx); - } - - int numNullsSuffixIdx = name.lastIndexOf(String.format("_%s", Z_INDEX_NUM_NULLS_STAT_NAME)); - if (numNullsSuffixIdx != -1) { - return name.substring(0, numNullsSuffixIdx); - } - - return null; - } - private static String composeZIndexColName(String col, String statName) { // TODO add escaping for return String.format("%s_%s", col, statName); @@ -589,7 +463,7 @@ public class ZOrderingIndexHelper { * @VisibleForTesting */ @Nonnull - public static String createIndexMergeSql( + static String createIndexMergeSql( @Nonnull String originalIndexTable, @Nonnull String newIndexTable, @Nonnull List columns diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/sort/SpaceCurveSortingHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/sort/SpaceCurveSortingHelper.java new file mode 100644 index 000000000..8ebc032a1 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/sort/SpaceCurveSortingHelper.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sort; + +import org.apache.hudi.common.util.CollectionUtils; +import org.apache.hudi.config.HoodieClusteringConfig; +import org.apache.hudi.optimize.HilbertCurveUtils; +import org.apache.hudi.common.util.BinaryUtil; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.Row$; +import org.apache.spark.sql.hudi.execution.RangeSampleSort$; +import org.apache.spark.sql.hudi.execution.ByteArraySorting; +import org.apache.spark.sql.types.BinaryType; +import org.apache.spark.sql.types.BinaryType$; +import org.apache.spark.sql.types.BooleanType; +import org.apache.spark.sql.types.ByteType; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DateType; +import org.apache.spark.sql.types.DecimalType; +import org.apache.spark.sql.types.DoubleType; +import org.apache.spark.sql.types.FloatType; +import org.apache.spark.sql.types.IntegerType; +import org.apache.spark.sql.types.LongType; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.ShortType; +import org.apache.spark.sql.types.StringType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.types.StructType$; +import org.apache.spark.sql.types.TimestampType; +import org.davidmoten.hilbert.HilbertCurve; +import scala.collection.JavaConversions; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class SpaceCurveSortingHelper { + + private static final Logger LOG = LogManager.getLogger(SpaceCurveSortingHelper.class); + + /** + * Orders provided {@link Dataset} by mapping values of the provided list of columns + * {@code orderByCols} onto a specified space curve (Z-curve, Hilbert, etc) + * + *

+ * NOTE: Only support base data-types: long,int,short,double,float,string,timestamp,decimal,date,byte. + * This method is more effective than {@link #orderDataFrameBySamplingValues} leveraging + * data sampling instead of direct mapping + * + * @param df Spark {@link Dataset} holding data to be ordered + * @param orderByCols list of columns to be ordered by + * @param targetPartitionCount target number of output partitions + * @param layoutOptStrategy target layout optimization strategy + * @return a {@link Dataset} holding data ordered by mapping tuple of values from provided columns + * onto a specified space-curve + */ + public static Dataset orderDataFrameByMappingValues( + Dataset df, + HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy, + List orderByCols, + int targetPartitionCount + ) { + Map columnsMap = + Arrays.stream(df.schema().fields()) + .collect(Collectors.toMap(StructField::name, Function.identity())); + + List checkCols = + orderByCols.stream() + .filter(columnsMap::containsKey) + .collect(Collectors.toList()); + + if (orderByCols.size() != checkCols.size()) { + LOG.error(String.format("Trying to ordering over a column(s) not present in the schema (%s); skipping", CollectionUtils.diff(orderByCols, checkCols))); + return df; + } + + // In case when there's just one column to be ordered by, we can skip space-curve + // ordering altogether (since it will match linear ordering anyway) + if (orderByCols.size() == 1) { + String orderByColName = orderByCols.get(0); + LOG.debug(String.format("Single column to order by (%s), skipping space-curve ordering", orderByColName)); + + // TODO validate if we need Spark to re-partition + return df.repartitionByRange(targetPartitionCount, new Column(orderByColName)); + } + + int fieldNum = df.schema().fields().length; + + Map fieldMap = + orderByCols.stream() + .collect( + Collectors.toMap(e -> Arrays.asList(df.schema().fields()).indexOf(columnsMap.get(e)), columnsMap::get)); + + JavaRDD sortedRDD; + switch (layoutOptStrategy) { + case ZORDER: + sortedRDD = createZCurveSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, targetPartitionCount); + break; + case HILBERT: + sortedRDD = createHilbertSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, targetPartitionCount); + break; + default: + throw new IllegalArgumentException(String.format("new only support z-order/hilbert optimize but find: %s", layoutOptStrategy)); + } + + // Compose new {@code StructType} for ordered RDDs + StructType newStructType = composeOrderedRDDStructType(df.schema()); + + return df.sparkSession() + .createDataFrame(sortedRDD, newStructType) + .drop("Index"); + } + + private static StructType composeOrderedRDDStructType(StructType schema) { + return StructType$.MODULE$.apply( + CollectionUtils.combine( + Arrays.asList(schema.fields()), + Arrays.asList(new StructField("Index", BinaryType$.MODULE$, true, Metadata.empty())) + ) + ); + } + + private static JavaRDD createZCurveSortedRDD(JavaRDD originRDD, Map fieldMap, int fieldNum, int fileNum) { + return originRDD.map(row -> { + List zBytesList = fieldMap.entrySet().stream().map(entry -> { + int index = entry.getKey(); + StructField field = entry.getValue(); + DataType dataType = field.dataType(); + if (dataType instanceof LongType) { + return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index)); + } else if (dataType instanceof DoubleType) { + return BinaryUtil.doubleTo8Byte(row.isNullAt(index) ? Double.MAX_VALUE : row.getDouble(index)); + } else if (dataType instanceof IntegerType) { + return BinaryUtil.intTo8Byte(row.isNullAt(index) ? Integer.MAX_VALUE : row.getInt(index)); + } else if (dataType instanceof FloatType) { + return BinaryUtil.doubleTo8Byte(row.isNullAt(index) ? Float.MAX_VALUE : row.getFloat(index)); + } else if (dataType instanceof StringType) { + return BinaryUtil.utf8To8Byte(row.isNullAt(index) ? "" : row.getString(index)); + } else if (dataType instanceof DateType) { + return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime()); + } else if (dataType instanceof TimestampType) { + return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime()); + } else if (dataType instanceof ByteType) { + return BinaryUtil.byteTo8Byte(row.isNullAt(index) ? Byte.MAX_VALUE : row.getByte(index)); + } else if (dataType instanceof ShortType) { + return BinaryUtil.intTo8Byte(row.isNullAt(index) ? Short.MAX_VALUE : row.getShort(index)); + } else if (dataType instanceof DecimalType) { + return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue()); + } else if (dataType instanceof BooleanType) { + boolean value = row.isNullAt(index) ? false : row.getBoolean(index); + return BinaryUtil.intTo8Byte(value ? 1 : 0); + } else if (dataType instanceof BinaryType) { + return BinaryUtil.paddingTo8Byte(row.isNullAt(index) ? new byte[] {0} : (byte[]) row.get(index)); + } + return null; + }).filter(f -> f != null).collect(Collectors.toList()); + byte[][] zBytes = new byte[zBytesList.size()][]; + for (int i = 0; i < zBytesList.size(); i++) { + zBytes[i] = zBytesList.get(i); + } + List zVaules = new ArrayList<>(); + zVaules.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava()); + zVaules.add(BinaryUtil.interleaving(zBytes, 8)); + return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(zVaules)); + }) + .sortBy(f -> new ByteArraySorting((byte[]) f.get(fieldNum)), true, fileNum); + } + + private static JavaRDD createHilbertSortedRDD(JavaRDD originRDD, Map fieldMap, int fieldNum, int fileNum) { + return originRDD.mapPartitions(rows -> { + HilbertCurve hilbertCurve = HilbertCurve.bits(63).dimensions(fieldMap.size()); + return new Iterator() { + + @Override + public boolean hasNext() { + return rows.hasNext(); + } + + @Override + public Row next() { + Row row = rows.next(); + List longList = fieldMap.entrySet().stream().map(entry -> { + int index = entry.getKey(); + StructField field = entry.getValue(); + DataType dataType = field.dataType(); + if (dataType instanceof LongType) { + return row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index); + } else if (dataType instanceof DoubleType) { + return row.isNullAt(index) ? Long.MAX_VALUE : Double.doubleToLongBits(row.getDouble(index)); + } else if (dataType instanceof IntegerType) { + return row.isNullAt(index) ? Long.MAX_VALUE : (long)row.getInt(index); + } else if (dataType instanceof FloatType) { + return row.isNullAt(index) ? Long.MAX_VALUE : Double.doubleToLongBits((double) row.getFloat(index)); + } else if (dataType instanceof StringType) { + return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtil.convertStringToLong(row.getString(index)); + } else if (dataType instanceof DateType) { + return row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime(); + } else if (dataType instanceof TimestampType) { + return row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime(); + } else if (dataType instanceof ByteType) { + return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtil.convertBytesToLong(new byte[] {row.getByte(index)}); + } else if (dataType instanceof ShortType) { + return row.isNullAt(index) ? Long.MAX_VALUE : (long)row.getShort(index); + } else if (dataType instanceof DecimalType) { + return row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue(); + } else if (dataType instanceof BooleanType) { + boolean value = row.isNullAt(index) ? false : row.getBoolean(index); + return value ? Long.MAX_VALUE : 0; + } else if (dataType instanceof BinaryType) { + return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtil.convertBytesToLong((byte[]) row.get(index)); + } + return null; + }).filter(f -> f != null).collect(Collectors.toList()); + + byte[] hilbertValue = HilbertCurveUtils.indexBytes( + hilbertCurve, longList.stream().mapToLong(l -> l).toArray(), 63); + List values = new ArrayList<>(); + values.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava()); + values.add(hilbertValue); + return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(values)); + } + }; + }).sortBy(f -> new ByteArraySorting((byte[]) f.get(fieldNum)), true, fileNum); + } + + public static Dataset orderDataFrameBySamplingValues( + Dataset df, + HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy, + List orderByCols, + int targetPartitionCount + ) { + return RangeSampleSort$.MODULE$.sortDataFrameBySample(df, layoutOptStrategy, JavaConversions.asScalaBuffer(orderByCols), targetPartitionCount); + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java index 7d2fbd32c..aa9a924ed 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java @@ -47,7 +47,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.exception.HoodieUpsertException; -import org.apache.hudi.index.zorder.ZOrderingIndexHelper; +import org.apache.hudi.index.columnstats.ColumnStatsIndexHelper; import org.apache.hudi.io.HoodieCreateHandle; import org.apache.hudi.io.HoodieMergeHandle; import org.apache.hudi.io.HoodieSortedMergeHandle; @@ -172,18 +172,17 @@ public class HoodieSparkCopyOnWriteTable @Override public void updateMetadataIndexes(@Nonnull HoodieEngineContext context, @Nonnull List stats, @Nonnull String instantTime) throws Exception { - // Updates Z-ordering Index - updateZIndex(context, stats, instantTime); + updateColumnsStatsIndex(context, stats, instantTime); } - private void updateZIndex( + private void updateColumnsStatsIndex( @Nonnull HoodieEngineContext context, @Nonnull List updatedFilesStats, @Nonnull String instantTime ) throws Exception { String sortColsList = config.getClusteringSortColumns(); String basePath = metaClient.getBasePath(); - String indexPath = metaClient.getZindexPath(); + String indexPath = metaClient.getColumnStatsIndexPath(); List completedCommits = metaClient.getCommitsTimeline() @@ -201,7 +200,7 @@ public class HoodieSparkCopyOnWriteTable return; } - LOG.info(String.format("Updating Z-index table (%s)", indexPath)); + LOG.info(String.format("Updating column-statistics index table (%s)", indexPath)); List sortCols = Arrays.stream(sortColsList.split(",")) .map(String::trim) @@ -209,13 +208,13 @@ public class HoodieSparkCopyOnWriteTable HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext)context; - // Fetch table schema to appropriately construct Z-index schema + // Fetch table schema to appropriately construct col-stats index schema Schema tableWriteSchema = HoodieAvroUtils.createHoodieWriteSchema( new TableSchemaResolver(metaClient).getTableAvroSchemaWithoutMetadataFields() ); - ZOrderingIndexHelper.updateZIndexFor( + ColumnStatsIndexHelper.updateColumnStatsIndexFor( sparkEngineContext.getSqlContext().sparkSession(), AvroConversionUtils.convertAvroSchemaToStructType(tableWriteSchema), touchedFiles, @@ -225,7 +224,7 @@ public class HoodieSparkCopyOnWriteTable completedCommits ); - LOG.info(String.format("Successfully updated Z-index at instant (%s)", instantTime)); + LOG.info(String.format("Successfully updated column-statistics index at instant (%s)", instantTime)); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/OrderingIndexHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/OrderingIndexHelper.java deleted file mode 100644 index 3d0138e9b..000000000 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/OrderingIndexHelper.java +++ /dev/null @@ -1,430 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark; - -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.HoodieColumnRangeMetadata; -import org.apache.hudi.common.model.HoodieFileFormat; -import org.apache.hudi.common.util.BaseFileUtils; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ParquetUtils; -import org.apache.hudi.config.HoodieClusteringConfig; -import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.index.zorder.ZOrderingIndexHelper; -import org.apache.hudi.optimize.HilbertCurveUtils; -import org.apache.hudi.optimize.ZOrderingUtil; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.parquet.io.api.Binary; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.Row$; -import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.hudi.execution.RangeSampleSort$; -import org.apache.spark.sql.hudi.execution.ZorderingBinarySort; -import org.apache.spark.sql.types.BinaryType; -import org.apache.spark.sql.types.BinaryType$; -import org.apache.spark.sql.types.BooleanType; -import org.apache.spark.sql.types.ByteType; -import org.apache.spark.sql.types.DataType; -import org.apache.spark.sql.types.DateType; -import org.apache.spark.sql.types.DecimalType; -import org.apache.spark.sql.types.DoubleType; -import org.apache.spark.sql.types.FloatType; -import org.apache.spark.sql.types.IntegerType; -import org.apache.spark.sql.types.LongType; -import org.apache.spark.sql.types.LongType$; -import org.apache.spark.sql.types.Metadata; -import org.apache.spark.sql.types.ShortType; -import org.apache.spark.sql.types.StringType; -import org.apache.spark.sql.types.StringType$; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.sql.types.StructType$; -import org.apache.spark.sql.types.TimestampType; -import org.apache.spark.util.SerializableConfiguration; -import org.davidmoten.hilbert.HilbertCurve; - -import java.io.IOException; -import java.math.BigDecimal; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -import scala.collection.JavaConversions; - -public class OrderingIndexHelper { - - private static final String SPARK_JOB_DESCRIPTION = "spark.job.description"; - - /** - * Create optimized DataFrame directly - * only support base type data. long,int,short,double,float,string,timestamp,decimal,date,byte - * this method is more effective than createOptimizeDataFrameBySample - * - * @param df a spark DataFrame holds parquet files to be read. - * @param sortCols ordering columns for the curve - * @param fileNum spark partition num - * @param sortMode layout optimization strategy - * @return a dataFrame ordered by the curve. - */ - public static Dataset createOptimizedDataFrameByMapValue(Dataset df, List sortCols, int fileNum, String sortMode) { - Map columnsMap = Arrays.stream(df.schema().fields()).collect(Collectors.toMap(e -> e.name(), e -> e)); - int fieldNum = df.schema().fields().length; - List checkCols = sortCols.stream().filter(f -> columnsMap.containsKey(f)).collect(Collectors.toList()); - if (sortCols.size() != checkCols.size()) { - return df; - } - // only one col to sort, no need to use z-order - if (sortCols.size() == 1) { - return df.repartitionByRange(fileNum, org.apache.spark.sql.functions.col(sortCols.get(0))); - } - Map fieldMap = sortCols - .stream().collect(Collectors.toMap(e -> Arrays.asList(df.schema().fields()).indexOf(columnsMap.get(e)), e -> columnsMap.get(e))); - // do optimize - JavaRDD sortedRDD = null; - switch (HoodieClusteringConfig.BuildLayoutOptimizationStrategy.fromValue(sortMode)) { - case ZORDER: - sortedRDD = createZCurveSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, fileNum); - break; - case HILBERT: - sortedRDD = createHilbertSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, fileNum); - break; - default: - throw new IllegalArgumentException(String.format("new only support z-order/hilbert optimize but find: %s", sortMode)); - } - // create new StructType - List newFields = new ArrayList<>(); - newFields.addAll(Arrays.asList(df.schema().fields())); - newFields.add(new StructField("Index", BinaryType$.MODULE$, true, Metadata.empty())); - - // create new DataFrame - return df.sparkSession().createDataFrame(sortedRDD, StructType$.MODULE$.apply(newFields)).drop("Index"); - } - - private static JavaRDD createZCurveSortedRDD(JavaRDD originRDD, Map fieldMap, int fieldNum, int fileNum) { - return originRDD.map(row -> { - List zBytesList = fieldMap.entrySet().stream().map(entry -> { - int index = entry.getKey(); - StructField field = entry.getValue(); - DataType dataType = field.dataType(); - if (dataType instanceof LongType) { - return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index)); - } else if (dataType instanceof DoubleType) { - return ZOrderingUtil.doubleTo8Byte(row.isNullAt(index) ? Double.MAX_VALUE : row.getDouble(index)); - } else if (dataType instanceof IntegerType) { - return ZOrderingUtil.intTo8Byte(row.isNullAt(index) ? Integer.MAX_VALUE : row.getInt(index)); - } else if (dataType instanceof FloatType) { - return ZOrderingUtil.doubleTo8Byte(row.isNullAt(index) ? Float.MAX_VALUE : row.getFloat(index)); - } else if (dataType instanceof StringType) { - return ZOrderingUtil.utf8To8Byte(row.isNullAt(index) ? "" : row.getString(index)); - } else if (dataType instanceof DateType) { - return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime()); - } else if (dataType instanceof TimestampType) { - return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime()); - } else if (dataType instanceof ByteType) { - return ZOrderingUtil.byteTo8Byte(row.isNullAt(index) ? Byte.MAX_VALUE : row.getByte(index)); - } else if (dataType instanceof ShortType) { - return ZOrderingUtil.intTo8Byte(row.isNullAt(index) ? Short.MAX_VALUE : row.getShort(index)); - } else if (dataType instanceof DecimalType) { - return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue()); - } else if (dataType instanceof BooleanType) { - boolean value = row.isNullAt(index) ? false : row.getBoolean(index); - return ZOrderingUtil.intTo8Byte(value ? 1 : 0); - } else if (dataType instanceof BinaryType) { - return ZOrderingUtil.paddingTo8Byte(row.isNullAt(index) ? new byte[] {0} : (byte[]) row.get(index)); - } - return null; - }).filter(f -> f != null).collect(Collectors.toList()); - byte[][] zBytes = new byte[zBytesList.size()][]; - for (int i = 0; i < zBytesList.size(); i++) { - zBytes[i] = zBytesList.get(i); - } - List zVaules = new ArrayList<>(); - zVaules.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava()); - zVaules.add(ZOrderingUtil.interleaving(zBytes, 8)); - return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(zVaules)); - }).sortBy(f -> new ZorderingBinarySort((byte[]) f.get(fieldNum)), true, fileNum); - } - - private static JavaRDD createHilbertSortedRDD(JavaRDD originRDD, Map fieldMap, int fieldNum, int fileNum) { - return originRDD.mapPartitions(rows -> { - HilbertCurve hilbertCurve = HilbertCurve.bits(63).dimensions(fieldMap.size()); - return new Iterator() { - - @Override - public boolean hasNext() { - return rows.hasNext(); - } - - @Override - public Row next() { - Row row = rows.next(); - List longList = fieldMap.entrySet().stream().map(entry -> { - int index = entry.getKey(); - StructField field = entry.getValue(); - DataType dataType = field.dataType(); - if (dataType instanceof LongType) { - return row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index); - } else if (dataType instanceof DoubleType) { - return row.isNullAt(index) ? Long.MAX_VALUE : Double.doubleToLongBits(row.getDouble(index)); - } else if (dataType instanceof IntegerType) { - return row.isNullAt(index) ? Long.MAX_VALUE : (long)row.getInt(index); - } else if (dataType instanceof FloatType) { - return row.isNullAt(index) ? Long.MAX_VALUE : Double.doubleToLongBits((double) row.getFloat(index)); - } else if (dataType instanceof StringType) { - return row.isNullAt(index) ? Long.MAX_VALUE : ZOrderingUtil.convertStringToLong(row.getString(index)); - } else if (dataType instanceof DateType) { - return row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime(); - } else if (dataType instanceof TimestampType) { - return row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime(); - } else if (dataType instanceof ByteType) { - return row.isNullAt(index) ? Long.MAX_VALUE : ZOrderingUtil.convertBytesToLong(new byte[] {row.getByte(index)}); - } else if (dataType instanceof ShortType) { - return row.isNullAt(index) ? Long.MAX_VALUE : (long)row.getShort(index); - } else if (dataType instanceof DecimalType) { - return row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue(); - } else if (dataType instanceof BooleanType) { - boolean value = row.isNullAt(index) ? false : row.getBoolean(index); - return value ? Long.MAX_VALUE : 0; - } else if (dataType instanceof BinaryType) { - return row.isNullAt(index) ? Long.MAX_VALUE : ZOrderingUtil.convertBytesToLong((byte[]) row.get(index)); - } - return null; - }).filter(f -> f != null).collect(Collectors.toList()); - - byte[] hilbertValue = HilbertCurveUtils.indexBytes( - hilbertCurve, longList.stream().mapToLong(l -> l).toArray(), 63); - List values = new ArrayList<>(); - values.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava()); - values.add(hilbertValue); - return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(values)); - } - }; - }).sortBy(f -> new ZorderingBinarySort((byte[]) f.get(fieldNum)), true, fileNum); - } - - public static Dataset createOptimizedDataFrameByMapValue(Dataset df, String sortCols, int fileNum, String sortMode) { - if (sortCols == null || sortCols.isEmpty() || fileNum <= 0) { - return df; - } - return createOptimizedDataFrameByMapValue(df, - Arrays.stream(sortCols.split(",")).map(f -> f.trim()).collect(Collectors.toList()), fileNum, sortMode); - } - - public static Dataset createOptimizeDataFrameBySample(Dataset df, List zCols, int fileNum, String sortMode) { - return RangeSampleSort$.MODULE$.sortDataFrameBySample(df, JavaConversions.asScalaBuffer(zCols), fileNum, sortMode); - } - - public static Dataset createOptimizeDataFrameBySample(Dataset df, String zCols, int fileNum, String sortMode) { - if (zCols == null || zCols.isEmpty() || fileNum <= 0) { - return df; - } - return createOptimizeDataFrameBySample(df, Arrays.stream(zCols.split(",")).map(f -> f.trim()).collect(Collectors.toList()), fileNum, sortMode); - } - - /** - * Parse min/max statistics stored in parquet footers for z-sort cols. - * no support collect statistics from timeStampType, since parquet file has not collect the statistics for timeStampType. - * to do adapt for rfc-27 - * - * @param df a spark DataFrame holds parquet files to be read. - * @param cols z-sort cols - * @return a dataFrame holds all statistics info. - */ - public static Dataset getMinMaxValue(Dataset df, List cols) { - Map columnsMap = Arrays.stream(df.schema().fields()).collect(Collectors.toMap(e -> e.name(), e -> e.dataType())); - - List scanFiles = Arrays.asList(df.inputFiles()); - SparkContext sc = df.sparkSession().sparkContext(); - JavaSparkContext jsc = new JavaSparkContext(sc); - - SerializableConfiguration serializableConfiguration = new SerializableConfiguration(sc.hadoopConfiguration()); - int numParallelism = (scanFiles.size() / 3 + 1); - List> colMinMaxInfos; - String previousJobDescription = sc.getLocalProperty(SPARK_JOB_DESCRIPTION); - try { - jsc.setJobDescription("Listing parquet column statistics"); - colMinMaxInfos = jsc.parallelize(scanFiles, numParallelism).mapPartitions(paths -> { - Configuration conf = serializableConfiguration.value(); - ParquetUtils parquetUtils = (ParquetUtils) BaseFileUtils.getInstance(HoodieFileFormat.PARQUET); - List>> results = new ArrayList<>(); - while (paths.hasNext()) { - String path = paths.next(); - results.add(parquetUtils.readRangeFromParquetMetadata(conf, new Path(path), cols)); - } - return results.stream().flatMap(f -> f.stream()).iterator(); - }).collect(); - } finally { - jsc.setJobDescription(previousJobDescription); - } - - Map>> fileToStatsListMap = colMinMaxInfos.stream().collect(Collectors.groupingBy(e -> e.getFilePath())); - JavaRDD allMetaDataRDD = jsc.parallelize(new ArrayList<>(fileToStatsListMap.values()), 1).map(f -> { - int colSize = f.size(); - if (colSize == 0) { - return null; - } else { - List rows = new ArrayList<>(); - rows.add(f.get(0).getFilePath()); - cols.stream().forEach(col -> { - HoodieColumnRangeMetadata currentColRangeMetaData = - f.stream().filter(s -> s.getColumnName().trim().equalsIgnoreCase(col)).findFirst().orElse(null); - DataType colType = columnsMap.get(col); - if (currentColRangeMetaData == null || colType == null) { - throw new HoodieException(String.format("cannot collect min/max statistics for col: %s", col)); - } - if (colType instanceof IntegerType) { - rows.add(currentColRangeMetaData.getMinValue()); - rows.add(currentColRangeMetaData.getMaxValue()); - } else if (colType instanceof DoubleType) { - rows.add(currentColRangeMetaData.getMinValue()); - rows.add(currentColRangeMetaData.getMaxValue()); - } else if (colType instanceof StringType) { - rows.add(currentColRangeMetaData.getMinValue().toString()); - rows.add(currentColRangeMetaData.getMaxValue().toString()); - } else if (colType instanceof DecimalType) { - rows.add(new BigDecimal(currentColRangeMetaData.getMinValue().toString())); - rows.add(new BigDecimal(currentColRangeMetaData.getMaxValue().toString())); - } else if (colType instanceof DateType) { - rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getMinValue().toString())); - rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getMaxValue().toString())); - } else if (colType instanceof LongType) { - rows.add(currentColRangeMetaData.getMinValue()); - rows.add(currentColRangeMetaData.getMaxValue()); - } else if (colType instanceof ShortType) { - rows.add(Short.parseShort(currentColRangeMetaData.getMinValue().toString())); - rows.add(Short.parseShort(currentColRangeMetaData.getMaxValue().toString())); - } else if (colType instanceof FloatType) { - rows.add(currentColRangeMetaData.getMinValue()); - rows.add(currentColRangeMetaData.getMaxValue()); - } else if (colType instanceof BinaryType) { - rows.add(((Binary)currentColRangeMetaData.getMinValue()).getBytes()); - rows.add(((Binary)currentColRangeMetaData.getMaxValue()).getBytes()); - } else if (colType instanceof BooleanType) { - rows.add(currentColRangeMetaData.getMinValue()); - rows.add(currentColRangeMetaData.getMaxValue()); - } else if (colType instanceof ByteType) { - rows.add(Byte.valueOf(currentColRangeMetaData.getMinValue().toString())); - rows.add(Byte.valueOf(currentColRangeMetaData.getMaxValue().toString())); - } else { - throw new HoodieException(String.format("Not support type: %s", colType)); - } - rows.add(currentColRangeMetaData.getNumNulls()); - }); - return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(rows)); - } - }).filter(f -> f != null); - List allMetaDataSchema = new ArrayList<>(); - allMetaDataSchema.add(new StructField("file", StringType$.MODULE$, true, Metadata.empty())); - cols.forEach(col -> { - allMetaDataSchema.add(new StructField(col + "_minValue", columnsMap.get(col), true, Metadata.empty())); - allMetaDataSchema.add(new StructField(col + "_maxValue", columnsMap.get(col), true, Metadata.empty())); - allMetaDataSchema.add(new StructField(col + "_num_nulls", LongType$.MODULE$, true, Metadata.empty())); - }); - return df.sparkSession().createDataFrame(allMetaDataRDD, StructType$.MODULE$.apply(allMetaDataSchema)); - } - - public static Dataset getMinMaxValue(Dataset df, String cols) { - List rawCols = Arrays.asList(cols.split(",")).stream().map(f -> f.trim()).collect(Collectors.toList()); - return getMinMaxValue(df, rawCols); - } - - /** - * Update statistics info. - * this method will update old index table by full out join, - * and save the updated table into a new index table based on commitTime. - * old index table will be cleaned also. - * - * @param df a spark DataFrame holds parquet files to be read. - * @param cols z-sort cols. - * @param indexPath index store path. - * @param commitTime current operation commitTime. - * @param validateCommits all validate commits for current table. - * @return - */ - public static void saveStatisticsInfo(Dataset df, String cols, String indexPath, String commitTime, List validateCommits) { - Path savePath = new Path(indexPath, commitTime); - SparkSession spark = df.sparkSession(); - FileSystem fs = FSUtils.getFs(indexPath, spark.sparkContext().hadoopConfiguration()); - Dataset statisticsDF = OrderingIndexHelper.getMinMaxValue(df, cols); - // try to find last validate index table from index path - try { - // If there's currently no index, create one - if (!fs.exists(new Path(indexPath))) { - statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString()); - return; - } - - // Otherwise, clean up all indexes but the most recent one - - List allIndexTables = Arrays - .stream(fs.listStatus(new Path(indexPath))).filter(f -> f.isDirectory()).map(f -> f.getPath().getName()).collect(Collectors.toList()); - List candidateIndexTables = allIndexTables.stream().filter(f -> validateCommits.contains(f)).sorted().collect(Collectors.toList()); - List residualTables = allIndexTables.stream().filter(f -> !validateCommits.contains(f)).collect(Collectors.toList()); - Option latestIndexData = Option.empty(); - if (!candidateIndexTables.isEmpty()) { - latestIndexData = Option.of(spark.read().load(new Path(indexPath, candidateIndexTables.get(candidateIndexTables.size() - 1)).toString())); - // clean old index table, keep at most 1 index table. - candidateIndexTables.remove(candidateIndexTables.size() - 1); - candidateIndexTables.forEach(f -> { - try { - fs.delete(new Path(indexPath, f)); - } catch (IOException ie) { - throw new HoodieException(ie); - } - }); - } - - // clean residualTables - // retried cluster operations at the same instant time is also considered, - // the residual files produced by retried are cleaned up before save statistics - // save statistics info to index table which named commitTime - residualTables.forEach(f -> { - try { - fs.delete(new Path(indexPath, f)); - } catch (IOException ie) { - throw new HoodieException(ie); - } - }); - - if (latestIndexData.isPresent() && latestIndexData.get().schema().equals(statisticsDF.schema())) { - // update the statistics info - String originalTable = "indexTable_" + java.util.UUID.randomUUID().toString().replace("-", ""); - String updateTable = "updateTable_" + java.util.UUID.randomUUID().toString().replace("-", ""); - latestIndexData.get().registerTempTable(originalTable); - statisticsDF.registerTempTable(updateTable); - // update table by full out join - List columns = Arrays.asList(statisticsDF.schema().fieldNames()); - spark.sql(ZOrderingIndexHelper.createIndexMergeSql(originalTable, updateTable, columns)).repartition(1).write().save(savePath.toString()); - } else { - statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString()); - } - } catch (IOException e) { - throw new HoodieException(e); - } - } -} diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala index a168e55b7..7c39ce254 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala @@ -18,8 +18,10 @@ package org.apache.spark.sql.hudi.execution +import org.apache.hudi.common.util.BinaryUtil import org.apache.hudi.config.HoodieClusteringConfig -import org.apache.hudi.optimize.{HilbertCurveUtils, ZOrderingUtil} +import org.apache.hudi.config.HoodieClusteringConfig.LayoutOptimizationStrategy +import org.apache.hudi.optimize.HilbertCurveUtils import org.apache.spark.rdd.{PartitionPruningRDD, RDD} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering @@ -235,15 +237,214 @@ class RawDecisionBound[K : Ordering : ClassTag](ordering: Ordering[K]) extends S } } -case class ZorderingBinarySort(b: Array[Byte]) extends Ordered[ZorderingBinarySort] with Serializable { - override def compare(that: ZorderingBinarySort): Int = { +case class ByteArraySorting(b: Array[Byte]) extends Ordered[ByteArraySorting] with Serializable { + override def compare(that: ByteArraySorting): Int = { val len = this.b.length - ZOrderingUtil.compareTo(this.b, 0, len, that.b, 0, len) + BinaryUtil.compareTo(this.b, 0, len, that.b, 0, len) } } object RangeSampleSort { + /** + * create optimize DataFrame by sample + * first, sample origin data to get order-cols bounds, then apply sort to produce DataFrame + * support all type data. + * this method need more resource and cost more time than createOptimizedDataFrameByMapValue + */ + def sortDataFrameBySample(df: DataFrame, layoutOptStrategy: LayoutOptimizationStrategy, orderByCols: Seq[String], targetPartitionsCount: Int): DataFrame = { + val spark = df.sparkSession + val columnsMap = df.schema.fields.map(item => (item.name, item)).toMap + val fieldNum = df.schema.fields.length + val checkCols = orderByCols.filter(col => columnsMap(col) != null) + + if (orderByCols.isEmpty || checkCols.isEmpty) { + df + } else { + val zFields = orderByCols.map { col => + val newCol = columnsMap(col) + if (newCol == null) { + (-1, null) + } else { + newCol.dataType match { + case LongType | DoubleType | FloatType | StringType | IntegerType | DateType | TimestampType | ShortType | ByteType => + (df.schema.fields.indexOf(newCol), newCol) + case d: DecimalType => + (df.schema.fields.indexOf(newCol), newCol) + case _ => + (-1, null) + } + } + }.filter(_._1 != -1) + // Complex type found, use createZIndexedDataFrameByRange + if (zFields.length != orderByCols.length) { + return sortDataFrameBySampleSupportAllTypes(df, orderByCols, targetPartitionsCount) + } + + val rawRdd = df.rdd + val sampleRdd = rawRdd.map { row => + val values = zFields.map { case (index, field) => + field.dataType match { + case LongType => + if (row.isNullAt(index)) Long.MaxValue else row.getLong(index) + case DoubleType => + if (row.isNullAt(index)) Long.MaxValue else java.lang.Double.doubleToLongBits(row.getDouble(index)) + case IntegerType => + if (row.isNullAt(index)) Long.MaxValue else row.getInt(index).toLong + case FloatType => + if (row.isNullAt(index)) Long.MaxValue else java.lang.Double.doubleToLongBits(row.getFloat(index).toDouble) + case StringType => + if (row.isNullAt(index)) "" else row.getString(index) + case DateType => + if (row.isNullAt(index)) Long.MaxValue else row.getDate(index).getTime + case TimestampType => + if (row.isNullAt(index)) Long.MaxValue else row.getTimestamp(index).getTime + case ByteType => + if (row.isNullAt(index)) Long.MaxValue else row.getByte(index).toLong + case ShortType => + if (row.isNullAt(index)) Long.MaxValue else row.getShort(index).toLong + case d: DecimalType => + if (row.isNullAt(index)) Long.MaxValue else row.getDecimal(index).longValue() + case _ => + null + } + }.filter(v => v != null).toArray + (values, null) + } + val zOrderBounds = df.sparkSession.sessionState.conf.getConfString( + HoodieClusteringConfig.LAYOUT_OPTIMIZE_BUILD_CURVE_SAMPLE_SIZE.key, + HoodieClusteringConfig.LAYOUT_OPTIMIZE_BUILD_CURVE_SAMPLE_SIZE.defaultValue.toString).toInt + val sample = new RangeSample(zOrderBounds, sampleRdd) + val rangeBounds = sample.getRangeBounds() + val sampleBounds = { + val candidateColNumber = rangeBounds.head._1.length + (0 to candidateColNumber - 1).map { i => + val colRangeBound = rangeBounds.map(x => (x._1(i), x._2)) + + if (colRangeBound.head._1.isInstanceOf[String]) { + sample.determineBound(colRangeBound.asInstanceOf[ArrayBuffer[(String, Float)]], math.min(zOrderBounds, rangeBounds.length), Ordering[String]) + } else { + sample.determineBound(colRangeBound.asInstanceOf[ArrayBuffer[(Long, Float)]], math.min(zOrderBounds, rangeBounds.length), Ordering[Long]) + } + } + } + + // expand bounds. + // maybe it's better to use the value of "spark.zorder.bounds.number" as maxLength, + // however this will lead to extra time costs when all zorder cols distinct count values are less then "spark.zorder.bounds.number" + val maxLength = sampleBounds.map(_.length).max + val expandSampleBoundsWithFactor = sampleBounds.map { bound => + val fillFactor = maxLength / bound.size + val newBound = new Array[Double](bound.length * fillFactor) + if (bound.isInstanceOf[Array[Long]] && fillFactor > 1) { + val longBound = bound.asInstanceOf[Array[Long]] + for (i <- 0 to bound.length - 1) { + for (j <- 0 to fillFactor - 1) { + // sample factor shoud not be too large, so it's ok to use 1 / fillfactor as slice + newBound(j + i*(fillFactor)) = longBound(i) + (j + 1) * (1 / fillFactor.toDouble) + } + } + (newBound, fillFactor) + } else { + (bound, 0) + } + } + + val boundBroadCast = spark.sparkContext.broadcast(expandSampleBoundsWithFactor) + + val indexRdd = rawRdd.mapPartitions { iter => + val expandBoundsWithFactor = boundBroadCast.value + val maxBoundNum = expandBoundsWithFactor.map(_._1.length).max + val longDecisionBound = new RawDecisionBound(Ordering[Long]) + val doubleDecisionBound = new RawDecisionBound(Ordering[Double]) + val stringDecisionBound = new RawDecisionBound(Ordering[String]) + import java.util.concurrent.ThreadLocalRandom + val threadLocalRandom = ThreadLocalRandom.current + + def getRank(rawIndex: Int, value: Long, isNull: Boolean): Int = { + val (expandBound, factor) = expandBoundsWithFactor(rawIndex) + if (isNull) { + expandBound.length + 1 + } else { + if (factor > 1) { + doubleDecisionBound.getBound(value + (threadLocalRandom.nextInt(factor) + 1)*(1 / factor.toDouble), expandBound.asInstanceOf[Array[Double]]) + } else { + longDecisionBound.getBound(value, expandBound.asInstanceOf[Array[Long]]) + } + } + } + + val hilbertCurve = if (layoutOptStrategy == LayoutOptimizationStrategy.HILBERT) + Some(HilbertCurve.bits(32).dimensions(zFields.length)) + else + None + + iter.map { row => + val values = zFields.zipWithIndex.map { case ((index, field), rawIndex) => + field.dataType match { + case LongType => + val isNull = row.isNullAt(index) + getRank(rawIndex, if (isNull) 0 else row.getLong(index), isNull) + case DoubleType => + val isNull = row.isNullAt(index) + getRank(rawIndex, if (isNull) 0 else java.lang.Double.doubleToLongBits(row.getDouble(index)), isNull) + case IntegerType => + val isNull = row.isNullAt(index) + getRank(rawIndex, if (isNull) 0 else row.getInt(index).toLong, isNull) + case FloatType => + val isNull = row.isNullAt(index) + getRank(rawIndex, if (isNull) 0 else java.lang.Double.doubleToLongBits(row.getFloat(index).toDouble), isNull) + case StringType => + val factor = maxBoundNum.toDouble / expandBoundsWithFactor(rawIndex)._1.length + if (row.isNullAt(index)) { + maxBoundNum + 1 + } else { + val currentRank = stringDecisionBound.getBound(row.getString(index), expandBoundsWithFactor(rawIndex)._1.asInstanceOf[Array[String]]) + if (factor > 1) { + (currentRank*factor).toInt + threadLocalRandom.nextInt(factor.toInt) + } else { + currentRank + } + } + case DateType => + val isNull = row.isNullAt(index) + getRank(rawIndex, if (isNull) 0 else row.getDate(index).getTime, isNull) + case TimestampType => + val isNull = row.isNullAt(index) + getRank(rawIndex, if (isNull) 0 else row.getTimestamp(index).getTime, isNull) + case ByteType => + val isNull = row.isNullAt(index) + getRank(rawIndex, if (isNull) 0 else row.getByte(index).toLong, isNull) + case ShortType => + val isNull = row.isNullAt(index) + getRank(rawIndex, if (isNull) 0 else row.getShort(index).toLong, isNull) + case d: DecimalType => + val isNull = row.isNullAt(index) + getRank(rawIndex, if (isNull) 0 else row.getDecimal(index).longValue(), isNull) + case _ => + -1 + } + }.filter(v => v != -1) + + val mapValues = layoutOptStrategy match { + case LayoutOptimizationStrategy.HILBERT => + HilbertCurveUtils.indexBytes(hilbertCurve.get, values.map(_.toLong).toArray, 32) + case LayoutOptimizationStrategy.ZORDER => + BinaryUtil.interleaving(values.map(BinaryUtil.intTo8Byte(_)).toArray, 8) + } + + Row.fromSeq(row.toSeq ++ Seq(mapValues)) + } + }.sortBy(x => ByteArraySorting(x.getAs[Array[Byte]](fieldNum)), numPartitions = targetPartitionsCount) + val newDF = df.sparkSession.createDataFrame(indexRdd, StructType( + df.schema.fields ++ Seq( + StructField(s"index", + BinaryType, false)) + )) + newDF.drop("index") + } + } + /** * create z-order DataFrame by sample * support all col types @@ -324,212 +525,15 @@ object RangeSampleSort { decisionBound.getBound(row, bound.asInstanceOf[Array[InternalRow]]) } } - }.toArray.map(ZOrderingUtil.intTo8Byte(_)) - val zValues = ZOrderingUtil.interleaving(interleaveValues, 8) + }.toArray.map(BinaryUtil.intTo8Byte(_)) + val zValues = BinaryUtil.interleaving(interleaveValues, 8) val mutablePair = new MutablePair[InternalRow, Array[Byte]]() mutablePair.update(unsafeRow, zValues) } - }.sortBy(x => ZorderingBinarySort(x._2), numPartitions = fileNum).map(_._1) + }.sortBy(x => ByteArraySorting(x._2), numPartitions = fileNum).map(_._1) spark.internalCreateDataFrame(indexRdd, schema) } } - - /** - * create optimize DataFrame by sample - * first, sample origin data to get order-cols bounds, then apply sort to produce DataFrame - * support all type data. - * this method need more resource and cost more time than createOptimizedDataFrameByMapValue - */ - def sortDataFrameBySample(df: DataFrame, zCols: Seq[String], fileNum: Int, sortMode: String): DataFrame = { - val spark = df.sparkSession - val columnsMap = df.schema.fields.map(item => (item.name, item)).toMap - val fieldNum = df.schema.fields.length - val checkCols = zCols.filter(col => columnsMap(col) != null) - val useHilbert = sortMode match { - case "hilbert" => true - case "z-order" => false - case other => throw new IllegalArgumentException(s"new only support z-order/hilbert optimize but find: ${other}") - } - - if (zCols.isEmpty || checkCols.isEmpty) { - df - } else { - val zFields = zCols.map { col => - val newCol = columnsMap(col) - if (newCol == null) { - (-1, null) - } else { - newCol.dataType match { - case LongType | DoubleType | FloatType | StringType | IntegerType | DateType | TimestampType | ShortType | ByteType => - (df.schema.fields.indexOf(newCol), newCol) - case d: DecimalType => - (df.schema.fields.indexOf(newCol), newCol) - case _ => - (-1, null) - } - } - }.filter(_._1 != -1) - // Complex type found, use createZIndexedDataFrameByRange - if (zFields.length != zCols.length) { - return sortDataFrameBySampleSupportAllTypes(df, zCols, fileNum) - } - - val rawRdd = df.rdd - val sampleRdd = rawRdd.map { row => - val values = zFields.map { case (index, field) => - field.dataType match { - case LongType => - if (row.isNullAt(index)) Long.MaxValue else row.getLong(index) - case DoubleType => - if (row.isNullAt(index)) Long.MaxValue else java.lang.Double.doubleToLongBits(row.getDouble(index)) - case IntegerType => - if (row.isNullAt(index)) Long.MaxValue else row.getInt(index).toLong - case FloatType => - if (row.isNullAt(index)) Long.MaxValue else java.lang.Double.doubleToLongBits(row.getFloat(index).toDouble) - case StringType => - if (row.isNullAt(index)) "" else row.getString(index) - case DateType => - if (row.isNullAt(index)) Long.MaxValue else row.getDate(index).getTime - case TimestampType => - if (row.isNullAt(index)) Long.MaxValue else row.getTimestamp(index).getTime - case ByteType => - if (row.isNullAt(index)) Long.MaxValue else row.getByte(index).toLong - case ShortType => - if (row.isNullAt(index)) Long.MaxValue else row.getShort(index).toLong - case d: DecimalType => - if (row.isNullAt(index)) Long.MaxValue else row.getDecimal(index).longValue() - case _ => - null - } - }.filter(v => v != null).toArray - (values, null) - } - val zOrderBounds = df.sparkSession.sessionState.conf.getConfString( - HoodieClusteringConfig.LAYOUT_OPTIMIZE_BUILD_CURVE_SAMPLE_SIZE.key, - HoodieClusteringConfig.LAYOUT_OPTIMIZE_BUILD_CURVE_SAMPLE_SIZE.defaultValue.toString).toInt - val sample = new RangeSample(zOrderBounds, sampleRdd) - val rangeBounds = sample.getRangeBounds() - val sampleBounds = { - val candidateColNumber = rangeBounds.head._1.length - (0 to candidateColNumber - 1).map { i => - val colRangeBound = rangeBounds.map(x => (x._1(i), x._2)) - - if (colRangeBound.head._1.isInstanceOf[String]) { - sample.determineBound(colRangeBound.asInstanceOf[ArrayBuffer[(String, Float)]], math.min(zOrderBounds, rangeBounds.length), Ordering[String]) - } else { - sample.determineBound(colRangeBound.asInstanceOf[ArrayBuffer[(Long, Float)]], math.min(zOrderBounds, rangeBounds.length), Ordering[Long]) - } - } - } - - // expand bounds. - // maybe it's better to use the value of "spark.zorder.bounds.number" as maxLength, - // however this will lead to extra time costs when all zorder cols distinct count values are less then "spark.zorder.bounds.number" - val maxLength = sampleBounds.map(_.length).max - val expandSampleBoundsWithFactor = sampleBounds.map { bound => - val fillFactor = maxLength / bound.size - val newBound = new Array[Double](bound.length * fillFactor) - if (bound.isInstanceOf[Array[Long]] && fillFactor > 1) { - val longBound = bound.asInstanceOf[Array[Long]] - for (i <- 0 to bound.length - 1) { - for (j <- 0 to fillFactor - 1) { - // sample factor shoud not be too large, so it's ok to use 1 / fillfactor as slice - newBound(j + i*(fillFactor)) = longBound(i) + (j + 1) * (1 / fillFactor.toDouble) - } - } - (newBound, fillFactor) - } else { - (bound, 0) - } - } - - val boundBroadCast = spark.sparkContext.broadcast(expandSampleBoundsWithFactor) - - val indexRdd = rawRdd.mapPartitions { iter => - val hilbertCurve = if (useHilbert) Some(HilbertCurve.bits(32).dimensions(zFields.length)) else None - val expandBoundsWithFactor = boundBroadCast.value - val maxBoundNum = expandBoundsWithFactor.map(_._1.length).max - val longDecisionBound = new RawDecisionBound(Ordering[Long]) - val doubleDecisionBound = new RawDecisionBound(Ordering[Double]) - val stringDecisionBound = new RawDecisionBound(Ordering[String]) - import java.util.concurrent.ThreadLocalRandom - val threadLocalRandom = ThreadLocalRandom.current - - def getRank(rawIndex: Int, value: Long, isNull: Boolean): Int = { - val (expandBound, factor) = expandBoundsWithFactor(rawIndex) - if (isNull) { - expandBound.length + 1 - } else { - if (factor > 1) { - doubleDecisionBound.getBound(value + (threadLocalRandom.nextInt(factor) + 1)*(1 / factor.toDouble), expandBound.asInstanceOf[Array[Double]]) - } else { - longDecisionBound.getBound(value, expandBound.asInstanceOf[Array[Long]]) - } - } - } - - iter.map { row => - val values = zFields.zipWithIndex.map { case ((index, field), rawIndex) => - field.dataType match { - case LongType => - val isNull = row.isNullAt(index) - getRank(rawIndex, if (isNull) 0 else row.getLong(index), isNull) - case DoubleType => - val isNull = row.isNullAt(index) - getRank(rawIndex, if (isNull) 0 else java.lang.Double.doubleToLongBits(row.getDouble(index)), isNull) - case IntegerType => - val isNull = row.isNullAt(index) - getRank(rawIndex, if (isNull) 0 else row.getInt(index).toLong, isNull) - case FloatType => - val isNull = row.isNullAt(index) - getRank(rawIndex, if (isNull) 0 else java.lang.Double.doubleToLongBits(row.getFloat(index).toDouble), isNull) - case StringType => - val factor = maxBoundNum.toDouble / expandBoundsWithFactor(rawIndex)._1.length - if (row.isNullAt(index)) { - maxBoundNum + 1 - } else { - val currentRank = stringDecisionBound.getBound(row.getString(index), expandBoundsWithFactor(rawIndex)._1.asInstanceOf[Array[String]]) - if (factor > 1) { - (currentRank*factor).toInt + threadLocalRandom.nextInt(factor.toInt) - } else { - currentRank - } - } - case DateType => - val isNull = row.isNullAt(index) - getRank(rawIndex, if (isNull) 0 else row.getDate(index).getTime, isNull) - case TimestampType => - val isNull = row.isNullAt(index) - getRank(rawIndex, if (isNull) 0 else row.getTimestamp(index).getTime, isNull) - case ByteType => - val isNull = row.isNullAt(index) - getRank(rawIndex, if (isNull) 0 else row.getByte(index).toLong, isNull) - case ShortType => - val isNull = row.isNullAt(index) - getRank(rawIndex, if (isNull) 0 else row.getShort(index).toLong, isNull) - case d: DecimalType => - val isNull = row.isNullAt(index) - getRank(rawIndex, if (isNull) 0 else row.getDecimal(index).longValue(), isNull) - case _ => - -1 - } - }.filter(v => v != -1) - val mapValues = if (hilbertCurve.isDefined) { - HilbertCurveUtils.indexBytes(hilbertCurve.get, values.map(_.toLong).toArray, 32) - } else { - ZOrderingUtil.interleaving(values.map(ZOrderingUtil.intTo8Byte(_)).toArray, 8) - } - Row.fromSeq(row.toSeq ++ Seq(mapValues)) - } - }.sortBy(x => ZorderingBinarySort(x.getAs[Array[Byte]](fieldNum)), numPartitions = fileNum) - val newDF = df.sparkSession.createDataFrame(indexRdd, StructType( - df.schema.fields ++ Seq( - StructField(s"index", - BinaryType, false)) - )) - newDF.drop("index") - } - } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/zorder/TestZOrderingIndexHelper.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/columnstats/TestColumnStatsIndexHelper.java similarity index 86% rename from hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/zorder/TestZOrderingIndexHelper.java rename to hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/columnstats/TestColumnStatsIndexHelper.java index 1fad1dc31..3901a9378 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/zorder/TestZOrderingIndexHelper.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/columnstats/TestColumnStatsIndexHelper.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.hudi.index.zorder; +package org.apache.hudi.index.columnstats; import org.junit.jupiter.api.Test; @@ -25,11 +25,11 @@ import java.util.Arrays; import static org.junit.jupiter.api.Assertions.assertEquals; -public class TestZOrderingIndexHelper { +public class TestColumnStatsIndexHelper { @Test public void testMergeSql() { - String q = ZOrderingIndexHelper.createIndexMergeSql("old", "new", Arrays.asList("file", "a", "b")); + String q = ColumnStatsIndexHelper.createIndexMergeSql("old", "new", Arrays.asList("file", "a", "b")); assertEquals( "SELECT " + "if (new.file is null, old.file, new.file) AS file, " diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 0b2a5b0b2..ec0271f6b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -79,7 +79,7 @@ public class HoodieTableMetaClient implements Serializable { public static final String AUXILIARYFOLDER_NAME = METAFOLDER_NAME + Path.SEPARATOR + ".aux"; public static final String BOOTSTRAP_INDEX_ROOT_FOLDER_PATH = AUXILIARYFOLDER_NAME + Path.SEPARATOR + ".bootstrap"; public static final String HEARTBEAT_FOLDER_NAME = METAFOLDER_NAME + Path.SEPARATOR + ".heartbeat"; - public static final String ZINDEX_NAME = ".zindex"; + public static final String COLUMN_STATISTICS_INDEX_NAME = ".colstatsindex"; public static final String BOOTSTRAP_INDEX_BY_PARTITION_FOLDER_PATH = BOOTSTRAP_INDEX_ROOT_FOLDER_PATH + Path.SEPARATOR + ".partitions"; public static final String BOOTSTRAP_INDEX_BY_FILE_ID_FOLDER_PATH = BOOTSTRAP_INDEX_ROOT_FOLDER_PATH + Path.SEPARATOR @@ -178,10 +178,10 @@ public class HoodieTableMetaClient implements Serializable { } /** - * @return z-index path + * @return Column Statistics index path */ - public String getZindexPath() { - return new Path(metaPath, ZINDEX_NAME).toString(); + public String getColumnStatsIndexPath() { + return new Path(metaPath, COLUMN_STATISTICS_INDEX_NAME).toString(); } /** diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/ZOrderingUtil.java b/hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtil.java similarity index 98% rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/ZOrderingUtil.java rename to hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtil.java index 50827cc2e..0c7e89895 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/ZOrderingUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtil.java @@ -16,11 +16,11 @@ * limitations under the License. */ -package org.apache.hudi.optimize; +package org.apache.hudi.common.util; import java.nio.charset.Charset; -public class ZOrderingUtil { +public class BinaryUtil { /** * Lexicographically compare two arrays. diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java index 265a601eb..cbcdbc404 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java @@ -18,15 +18,17 @@ package org.apache.hudi.common.util; -import java.util.Properties; import org.apache.hudi.common.util.collection.Pair; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Properties; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -35,6 +37,36 @@ public class CollectionUtils { public static final Properties EMPTY_PROPERTIES = new Properties(); + /** + * Combines provided {@link List}s into one + */ + public static List combine(List one, List another) { + ArrayList combined = new ArrayList<>(one); + combined.addAll(another); + return combined; + } + + /** + * Returns difference b/w {@code one} {@link Set} of elements and {@code another} + */ + public static Set diff(Set one, Set another) { + Set diff = new HashSet<>(one); + diff.removeAll(another); + return diff; + } + + /** + * Returns difference b/w {@code one} {@link List} of elements and {@code another} + * + * NOTE: This is less optimal counterpart to {@link #diff(Set, Set)}, accepting {@link List} + * as a holding collection to support duplicate elements use-cases + */ + public static List diff(List one, List another) { + List diff = new ArrayList<>(one); + diff.removeAll(another); + return diff; + } + /** * Determines whether two iterators contain equal elements in the same order. More specifically, * this method returns {@code true} if {@code iterator1} and {@code iterator2} contain the same diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/TypeUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/TypeUtils.java new file mode 100644 index 000000000..d713b183a --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/TypeUtils.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util; + +import javax.annotation.Nonnull; +import java.util.Arrays; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public final class TypeUtils { + + private TypeUtils() {} + + /** + * Maps values from the provided Enum's {@link Class} into corresponding values, + * extracted by provided {@code valueMapper} + */ + public static > Map getValueToEnumMap( + @Nonnull Class klass, + @Nonnull Function valueMapper + ) { + return Arrays.stream(klass.getEnumConstants()) + .collect(Collectors.toMap(valueMapper, Function.identity())); + } + +} diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestZOrderingUtil.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestBinaryUtil.java similarity index 81% rename from hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestZOrderingUtil.java rename to hudi-common/src/test/java/org/apache/hudi/common/util/TestBinaryUtil.java index a22485ff9..1efe5a068 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestZOrderingUtil.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestBinaryUtil.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.hudi.optimize; +package org.apache.hudi.common.util; import org.junit.jupiter.api.Test; @@ -27,7 +27,7 @@ import java.util.List; import static org.junit.jupiter.api.Assertions.assertEquals; -public class TestZOrderingUtil { +public class TestBinaryUtil { @Test public void testIntConvert() { @@ -37,12 +37,12 @@ public class TestZOrderingUtil { List> convertResultWrappers = new ArrayList<>(); for (int i = 0; i < testInt.length; i++) { valueWrappers.add(new OrginValueWrapper<>(i, testInt[i])); - convertResultWrappers.add(new ConvertResultWrapper<>(i, ZOrderingUtil.intTo8Byte(testInt[i]))); + convertResultWrappers.add(new ConvertResultWrapper<>(i, BinaryUtil.intTo8Byte(testInt[i]))); } Collections.sort(valueWrappers, ((o1, o2) -> o1.originValue.compareTo(o2.originValue))); - Collections.sort(convertResultWrappers, ((o1, o2) -> ZOrderingUtil.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length))); + Collections.sort(convertResultWrappers, ((o1, o2) -> BinaryUtil.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length))); for (int i = 0; i < testInt.length; i++) { assertEquals(valueWrappers.get(i).index, convertResultWrappers.get(i).index); @@ -57,12 +57,12 @@ public class TestZOrderingUtil { List> convertResultWrappers = new ArrayList<>(); for (int i = 0; i < testLong.length; i++) { valueWrappers.add(new OrginValueWrapper<>((long)i, testLong[i])); - convertResultWrappers.add(new ConvertResultWrapper<>((long)i, ZOrderingUtil.longTo8Byte(testLong[i]))); + convertResultWrappers.add(new ConvertResultWrapper<>((long)i, BinaryUtil.longTo8Byte(testLong[i]))); } Collections.sort(valueWrappers, ((o1, o2) -> o1.originValue.compareTo(o2.originValue))); - Collections.sort(convertResultWrappers, ((o1, o2) -> ZOrderingUtil.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length))); + Collections.sort(convertResultWrappers, ((o1, o2) -> BinaryUtil.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length))); for (int i = 0; i < testLong.length; i++) { assertEquals(valueWrappers.get(i).index, convertResultWrappers.get(i).index); @@ -77,12 +77,12 @@ public class TestZOrderingUtil { List> convertResultWrappers = new ArrayList<>(); for (int i = 0; i < testDouble.length; i++) { valueWrappers.add(new OrginValueWrapper<>((Double)(i * 1.0), testDouble[i])); - convertResultWrappers.add(new ConvertResultWrapper<>((Double)(i * 1.0), ZOrderingUtil.doubleTo8Byte(testDouble[i]))); + convertResultWrappers.add(new ConvertResultWrapper<>((Double)(i * 1.0), BinaryUtil.doubleTo8Byte(testDouble[i]))); } Collections.sort(valueWrappers, ((o1, o2) -> o1.originValue.compareTo(o2.originValue))); - Collections.sort(convertResultWrappers, ((o1, o2) -> ZOrderingUtil.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length))); + Collections.sort(convertResultWrappers, ((o1, o2) -> BinaryUtil.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length))); for (int i = 0; i < testDouble.length; i++) { assertEquals(valueWrappers.get(i).index, convertResultWrappers.get(i).index); @@ -97,12 +97,12 @@ public class TestZOrderingUtil { List> convertResultWrappers = new ArrayList<>(); for (int i = 0; i < testDouble.length; i++) { valueWrappers.add(new OrginValueWrapper<>((float)(i * 1.0), testDouble[i])); - convertResultWrappers.add(new ConvertResultWrapper<>((float)(i * 1.0), ZOrderingUtil.doubleTo8Byte((double) testDouble[i]))); + convertResultWrappers.add(new ConvertResultWrapper<>((float)(i * 1.0), BinaryUtil.doubleTo8Byte((double) testDouble[i]))); } Collections.sort(valueWrappers, ((o1, o2) -> o1.originValue.compareTo(o2.originValue))); - Collections.sort(convertResultWrappers, ((o1, o2) -> ZOrderingUtil.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length))); + Collections.sort(convertResultWrappers, ((o1, o2) -> BinaryUtil.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length))); for (int i = 0; i < testDouble.length; i++) { assertEquals(valueWrappers.get(i).index, convertResultWrappers.get(i).index); @@ -131,7 +131,7 @@ public class TestZOrderingUtil { public void testConvertBytesToLong() { long[] tests = new long[] {Long.MIN_VALUE, -1L, 0, 1L, Long.MAX_VALUE}; for (int i = 0; i < tests.length; i++) { - assertEquals(ZOrderingUtil.convertBytesToLong(convertLongToBytes(tests[i])), tests[i]); + assertEquals(BinaryUtil.convertBytesToLong(convertLongToBytes(tests[i])), tests[i]); } } @@ -140,7 +140,7 @@ public class TestZOrderingUtil { byte[] bytes = new byte[2]; bytes[0] = 2; bytes[1] = 127; - assertEquals(ZOrderingUtil.convertBytesToLong(bytes), 2 * 256 + 127); + assertEquals(BinaryUtil.convertBytesToLong(bytes), 2 * 256 + 127); } private byte[] convertLongToBytes(long num) { diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala index add7fabf6..9b8d9d0d4 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, Bound import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} import org.apache.spark.sql.catalyst.{InternalRow, expressions} import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, NoopCache, PartitionDirectory} -import org.apache.spark.sql.hudi.DataSkippingUtils.createZIndexLookupFilter +import org.apache.spark.sql.hudi.DataSkippingUtils.createColumnStatsIndexFilterExpr import org.apache.spark.sql.hudi.HoodieSqlUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType @@ -165,8 +165,8 @@ case class HoodieFileIndex( /** * Computes pruned list of candidate base-files' names based on provided list of {@link dataFilters} - * conditions, by leveraging custom Z-order index (Z-index) bearing "min", "max", "num_nulls" statistic - * for all clustered columns + * conditions, by leveraging custom Column Statistics index (col-stats-index) bearing "min", "max", + * "num_nulls" statistics for all clustered columns. * * NOTE: This method has to return complete set of candidate files, since only provided candidates will * ultimately be scanned as part of query execution. Hence, this method has to maintain the @@ -175,8 +175,8 @@ case class HoodieFileIndex( * @param queryFilters list of original data filters passed down from querying engine * @return list of pruned (data-skipped) candidate base-files' names */ - private def lookupCandidateFilesInZIndex(queryFilters: Seq[Expression]): Try[Option[Set[String]]] = Try { - val indexPath = metaClient.getZindexPath + private def lookupCandidateFilesInColStatsIndex(queryFilters: Seq[Expression]): Try[Option[Set[String]]] = Try { + val indexPath = metaClient.getColumnStatsIndexPath val fs = metaClient.getFs if (!enableDataSkipping() || !fs.exists(new Path(indexPath)) || queryFilters.isEmpty) { @@ -203,14 +203,14 @@ case class HoodieFileIndex( Some(spark.read.load(new Path(indexPath, candidateIndexTables.last).toString)) } catch { case t: Throwable => - logError("Failed to read Z-index; skipping", t) + logError("Failed to read col-stats index; skipping", t) None } dataFrameOpt.map(df => { val indexSchema = df.schema val indexFilter = - queryFilters.map(createZIndexLookupFilter(_, indexSchema)) + queryFilters.map(createColumnStatsIndexFilterExpr(_, indexSchema)) .reduce(And) logInfo(s"Index filter condition: $indexFilter") @@ -232,13 +232,13 @@ case class HoodieFileIndex( df.unpersist() - // NOTE: Z-index isn't guaranteed to have complete set of statistics for every + // NOTE: Col-Stats Index isn't guaranteed to have complete set of statistics for every // base-file: since it's bound to clustering, which could occur asynchronously - // at arbitrary point in time, and is not likely to touching all of the base files. + // at arbitrary point in time, and is not likely to be touching all of the base files. // - // To close that gap, we manually compute the difference b/w all indexed (Z-index) + // To close that gap, we manually compute the difference b/w all indexed (by col-stats-index) // files and all outstanding base-files, and make sure that all base files not - // represented w/in Z-index are included in the output of this method + // represented w/in the index are included in the output of this method val notIndexedFileNames = lookupFileNamesMissingFromIndex(allIndexedFileNames) @@ -260,12 +260,12 @@ case class HoodieFileIndex( */ override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { - // Look up candidate files names in the Z-index, if all of the following conditions are true + // Look up candidate files names in the col-stats index, if all of the following conditions are true // - Data-skipping is enabled - // - Z-index is present + // - Col-Stats Index is present // - List of predicates (filters) is present val candidateFilesNamesOpt: Option[Set[String]] = - lookupCandidateFilesInZIndex(dataFilters) match { + lookupCandidateFilesInColStatsIndex(dataFilters) match { case Success(opt) => opt case Failure(e) => if (e.isInstanceOf[AnalysisException]) { @@ -280,7 +280,7 @@ case class HoodieFileIndex( if (queryAsNonePartitionedTable) { // Read as Non-Partitioned table - // Filter in candidate files based on the Z-index lookup + // Filter in candidate files based on the col-stats index lookup val candidateFiles = allFiles.filter(fileStatus => // NOTE: This predicate is true when {@code Option} is empty @@ -305,7 +305,7 @@ case class HoodieFileIndex( .filter(_ != null) .map(_.getFileStatus) - // Filter in candidate files based on the Z-index lookup + // Filter in candidate files based on the col-stats index lookup val candidateFiles = baseFileStatuses.filter(fs => // NOTE: This predicate is true when {@code Option} is empty diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala index 5aebe93d2..e5d6f525b 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.hudi.index.zorder.ZOrderingIndexHelper.{getMaxColumnNameFor, getMinColumnNameFor, getNumNullsColumnNameFor} +import org.apache.hudi.index.columnstats.ColumnStatsIndexHelper.{getMaxColumnNameFor, getMinColumnNameFor, getNumNullsColumnNameFor} import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute @@ -39,14 +39,14 @@ import scala.collection.JavaConverters._ object DataSkippingUtils extends Logging { /** - * Translates provided {@link filterExpr} into corresponding filter-expression for Z-index index table + * Translates provided {@link filterExpr} into corresponding filter-expression for column-stats index index table * to filter out candidate files that would hold records matching the original filter * - * @param sourceFilterExpr original filter from query + * @param sourceFilterExpr source table's query's filter expression * @param indexSchema index table schema - * @return filter for Z-index table + * @return filter for column-stats index's table */ - def createZIndexLookupFilter(sourceFilterExpr: Expression, indexSchema: StructType): Expression = { + def createColumnStatsIndexFilterExpr(sourceFilterExpr: Expression, indexSchema: StructType): Expression = { // Try to transform original Source Table's filter expression into // Column-Stats Index filter expression tryComposeIndexFilterExpr(sourceFilterExpr, indexSchema) match { @@ -201,14 +201,14 @@ object DataSkippingUtils extends Logging { ) case or: Or => - val resLeft = createZIndexLookupFilter(or.left, indexSchema) - val resRight = createZIndexLookupFilter(or.right, indexSchema) + val resLeft = createColumnStatsIndexFilterExpr(or.left, indexSchema) + val resRight = createColumnStatsIndexFilterExpr(or.right, indexSchema) Option(Or(resLeft, resRight)) case and: And => - val resLeft = createZIndexLookupFilter(and.left, indexSchema) - val resRight = createZIndexLookupFilter(and.right, indexSchema) + val resLeft = createColumnStatsIndexFilterExpr(and.left, indexSchema) + val resRight = createColumnStatsIndexFilterExpr(and.right, indexSchema) Option(And(resLeft, resRight)) @@ -219,10 +219,10 @@ object DataSkippingUtils extends Logging { // case Not(And(left: Expression, right: Expression)) => - Option(createZIndexLookupFilter(Or(Not(left), Not(right)), indexSchema)) + Option(createColumnStatsIndexFilterExpr(Or(Not(left), Not(right)), indexSchema)) case Not(Or(left: Expression, right: Expression)) => - Option(createZIndexLookupFilter(And(Not(left), Not(right)), indexSchema)) + Option(createColumnStatsIndexFilterExpr(And(Not(left), Not(right)), indexSchema)) case _: Expression => None } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala index 75da45354..9e3572b56 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala @@ -17,7 +17,7 @@ package org.apache.hudi -import org.apache.hudi.index.zorder.ZOrderingIndexHelper +import org.apache.hudi.index.columnstats.ColumnStatsIndexHelper import org.apache.hudi.testutils.HoodieClientTestBase import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions.{Expression, Not} @@ -66,7 +66,7 @@ class TestDataSkippingUtils extends HoodieClientTestBase { ) val indexSchema = - ZOrderingIndexHelper.composeIndexSchema( + ColumnStatsIndexHelper.composeIndexSchema( sourceTableSchema.fields.toSeq .filter(f => indexedCols.contains(f.name)) .asJava @@ -77,7 +77,7 @@ class TestDataSkippingUtils extends HoodieClientTestBase { def testLookupFilterExpressions(sourceExpr: String, input: Seq[IndexRow], output: Seq[String]): Unit = { val resolvedExpr: Expression = resolveFilterExpr(sourceExpr, sourceTableSchema) - val lookupFilter = DataSkippingUtils.createZIndexLookupFilter(resolvedExpr, indexSchema) + val lookupFilter = DataSkippingUtils.createColumnStatsIndexFilterExpr(resolvedExpr, indexSchema) val spark2 = spark import spark2.implicits._ @@ -97,7 +97,7 @@ class TestDataSkippingUtils extends HoodieClientTestBase { @MethodSource(Array("testStringsLookupFilterExpressionsSource")) def testStringsLookupFilterExpressions(sourceExpr: Expression, input: Seq[IndexRow], output: Seq[String]): Unit = { val resolvedExpr = resolveFilterExpr(sourceExpr, sourceTableSchema) - val lookupFilter = DataSkippingUtils.createZIndexLookupFilter(resolvedExpr, indexSchema) + val lookupFilter = DataSkippingUtils.createColumnStatsIndexFilterExpr(resolvedExpr, indexSchema) val spark2 = spark import spark2.implicits._ diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestZOrderLayoutOptimization.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala similarity index 55% rename from hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestZOrderLayoutOptimization.scala rename to hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala index 657b4f16e..e79067041 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestZOrderLayoutOptimization.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala @@ -19,27 +19,17 @@ package org.apache.hudi.functional import org.apache.hadoop.fs.{LocatedFileStatus, Path} -import org.apache.hudi.common.table.HoodieTableMetaClient -import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} -import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings -import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig} -import org.apache.hudi.index.zorder.ZOrderingIndexHelper +import org.apache.hudi.index.columnstats.ColumnStatsIndexHelper import org.apache.hudi.testutils.HoodieClientTestBase -import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions} -import org.apache.spark.sql._ import org.apache.spark.sql.functions.typedLit import org.apache.spark.sql.types._ -import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} -import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Tag, Test} -import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.ValueSource +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test} -import java.sql.{Date, Timestamp} import scala.collection.JavaConversions._ -import scala.util.Random -@Tag("functional") -class TestZOrderLayoutOptimization extends HoodieClientTestBase { +class TestColumnStatsIndex extends HoodieClientTestBase { var spark: SparkSession = _ val sourceTableSchema = @@ -53,100 +43,25 @@ class TestZOrderLayoutOptimization extends HoodieClientTestBase { .add("c7", BinaryType) .add("c8", ByteType) - val commonOpts = Map( - "hoodie.insert.shuffle.parallelism" -> "4", - "hoodie.upsert.shuffle.parallelism" -> "4", - "hoodie.bulkinsert.shuffle.parallelism" -> "4", - DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key", - DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "partition", - DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "timestamp", - HoodieWriteConfig.TBL_NAME.key -> "hoodie_test" - ) - @BeforeEach override def setUp() { initPath() initSparkContexts() - spark = sqlContext.sparkSession - initTestDataGenerator() initFileSystem() + spark = sqlContext.sparkSession } @AfterEach override def tearDown() = { - cleanupSparkContexts() - cleanupTestDataGenerator() cleanupFileSystem() - } - - @ParameterizedTest - @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ")) - def testZOrderingLayoutClustering(tableType: String): Unit = { - val targetRecordsCount = 10000 - // Bulk Insert Operation - val records = recordsToStrings(dataGen.generateInserts("001", targetRecordsCount)).toList - val writeDf: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records, 2)) - - writeDf.write.format("org.apache.hudi") - .options(commonOpts) - .option("hoodie.compact.inline", "false") - .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) - .option(DataSourceWriteOptions.TABLE_TYPE.key(), tableType) - // option for clustering - .option("hoodie.parquet.small.file.limit", "0") - .option("hoodie.clustering.inline", "true") - .option("hoodie.clustering.inline.max.commits", "1") - .option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824") - .option("hoodie.clustering.plan.strategy.small.file.limit", "629145600") - .option("hoodie.clustering.plan.strategy.max.bytes.per.group", Long.MaxValue.toString) - .option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(64 * 1024 * 1024L)) - .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE.key, "true") - .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "begin_lat, begin_lon") - .mode(SaveMode.Overwrite) - .save(basePath) - - val hudiMetaClient = HoodieTableMetaClient.builder - .setConf(hadoopConf) - .setBasePath(basePath) - .setLoadActiveTimelineOnLoad(true) - .build - - val lastCommit = hudiMetaClient.getActiveTimeline.getAllCommitsTimeline.lastInstant().get() - - assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, lastCommit.getAction) - assertEquals(HoodieInstant.State.COMPLETED, lastCommit.getState) - - val readDf = - spark.read - .format("hudi") - .load(basePath) - - val readDfSkip = - spark.read - .option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true") - .format("hudi") - .load(basePath) - - assertEquals(targetRecordsCount, readDf.count()) - assertEquals(targetRecordsCount, readDfSkip.count()) - - readDf.createOrReplaceTempView("hudi_snapshot_raw") - readDfSkip.createOrReplaceTempView("hudi_snapshot_skipping") - - def select(tableName: String) = - spark.sql(s"SELECT * FROM $tableName WHERE begin_lat >= 0.49 AND begin_lat < 0.51 AND begin_lon >= 0.49 AND begin_lon < 0.51") - - assertRowsMatch( - select("hudi_snapshot_raw"), - select("hudi_snapshot_skipping") - ) + cleanupSparkContexts() } @Test @Disabled - def testZIndexTableComposition(): Unit = { + def testColumnStatsTableComposition(): Unit = { val inputDf = - // NOTE: Schema here is provided for validation that the input date is in the appropriate format + // NOTE: Schema here is provided for validation that the input date is in the appropriate format spark.read .schema(sourceTableSchema) .parquet( @@ -158,20 +73,20 @@ class TestZOrderLayoutOptimization extends HoodieClientTestBase { // {@link TimestampType} is not supported, and will throw -- hence skipping "c4" val newZIndexTableDf = - ZOrderingIndexHelper.buildZIndexTableFor( + ColumnStatsIndexHelper.buildColumnStatsTableFor( inputDf.sparkSession, inputDf.inputFiles.toSeq, zorderedColsSchemaFields ) val indexSchema = - ZOrderingIndexHelper.composeIndexSchema( + ColumnStatsIndexHelper.composeIndexSchema( sourceTableSchema.fields.filter(f => zorderedCols.contains(f.name)).toSeq ) // Collect Z-index stats manually (reading individual Parquet files) val manualZIndexTableDf = - buildZIndexTableManually( + buildColumnStatsTableManually( getClass.getClassLoader.getResource("index/zorder/input-table").toString, zorderedCols, indexSchema @@ -197,12 +112,12 @@ class TestZOrderLayoutOptimization extends HoodieClientTestBase { @Test @Disabled - def testZIndexTableMerge(): Unit = { + def testColumnStatsTableMerge(): Unit = { val testZIndexPath = new Path(basePath, "zindex") val zorderedCols = Seq("c1", "c2", "c3", "c5", "c6", "c7", "c8") val indexSchema = - ZOrderingIndexHelper.composeIndexSchema( + ColumnStatsIndexHelper.composeIndexSchema( sourceTableSchema.fields.filter(f => zorderedCols.contains(f.name)).toSeq ) @@ -216,7 +131,7 @@ class TestZOrderLayoutOptimization extends HoodieClientTestBase { getClass.getClassLoader.getResource("index/zorder/input-table").toString ) - ZOrderingIndexHelper.updateZIndexFor( + ColumnStatsIndexHelper.updateColumnStatsIndexFor( firstInputDf.sparkSession, sourceTableSchema, firstInputDf.inputFiles.toSeq, @@ -229,13 +144,13 @@ class TestZOrderLayoutOptimization extends HoodieClientTestBase { // NOTE: We don't need to provide schema upon reading from Parquet, since Spark will be able // to reliably retrieve it val initialZIndexTable = - spark.read - .parquet(new Path(testZIndexPath, firstCommitInstance).toString) + spark.read + .parquet(new Path(testZIndexPath, firstCommitInstance).toString) val expectedInitialZIndexTableDf = - spark.read - .schema(indexSchema) - .json(getClass.getClassLoader.getResource("index/zorder/z-index-table.json").toString) + spark.read + .schema(indexSchema) + .json(getClass.getClassLoader.getResource("index/zorder/z-index-table.json").toString) assertEquals(asJson(sort(expectedInitialZIndexTableDf)), asJson(sort(initialZIndexTable))) @@ -251,7 +166,7 @@ class TestZOrderLayoutOptimization extends HoodieClientTestBase { // Update Z-index table // - ZOrderingIndexHelper.updateZIndexFor( + ColumnStatsIndexHelper.updateColumnStatsIndexFor( secondInputDf.sparkSession, sourceTableSchema, secondInputDf.inputFiles.toSeq, @@ -264,8 +179,8 @@ class TestZOrderLayoutOptimization extends HoodieClientTestBase { // NOTE: We don't need to provide schema upon reading from Parquet, since Spark will be able // to reliably retrieve it val mergedZIndexTable = - spark.read - .parquet(new Path(testZIndexPath, secondCommitInstance).toString) + spark.read + .parquet(new Path(testZIndexPath, secondCommitInstance).toString) val expectedMergedZIndexTableDf = spark.read @@ -277,7 +192,7 @@ class TestZOrderLayoutOptimization extends HoodieClientTestBase { @Test @Disabled - def testZIndexTablesGarbageCollection(): Unit = { + def testColumnStatsTablesGarbageCollection(): Unit = { val testZIndexPath = new Path(System.getProperty("java.io.tmpdir"), "zindex") val fs = testZIndexPath.getFileSystem(spark.sparkContext.hadoopConfiguration) @@ -287,7 +202,7 @@ class TestZOrderLayoutOptimization extends HoodieClientTestBase { ) // Try to save statistics - ZOrderingIndexHelper.updateZIndexFor( + ColumnStatsIndexHelper.updateColumnStatsIndexFor( inputDf.sparkSession, sourceTableSchema, inputDf.inputFiles.toSeq, @@ -298,7 +213,7 @@ class TestZOrderLayoutOptimization extends HoodieClientTestBase { ) // Save again - ZOrderingIndexHelper.updateZIndexFor( + ColumnStatsIndexHelper.updateColumnStatsIndexFor( inputDf.sparkSession, sourceTableSchema, inputDf.inputFiles.toSeq, @@ -309,7 +224,7 @@ class TestZOrderLayoutOptimization extends HoodieClientTestBase { ) // Test old index table being cleaned up - ZOrderingIndexHelper.updateZIndexFor( + ColumnStatsIndexHelper.updateColumnStatsIndexFor( inputDf.sparkSession, sourceTableSchema, inputDf.inputFiles.toSeq, @@ -324,7 +239,7 @@ class TestZOrderLayoutOptimization extends HoodieClientTestBase { assertEquals(fs.exists(new Path(testZIndexPath, "4")), true) } - private def buildZIndexTableManually(tablePath: String, zorderedCols: Seq[String], indexSchema: StructType) = { + private def buildColumnStatsTableManually(tablePath: String, zorderedCols: Seq[String], indexSchema: StructType) = { val files = { val it = fs.listFiles(new Path(tablePath), true) var seq = Seq[LocatedFileStatus]() @@ -339,17 +254,17 @@ class TestZOrderLayoutOptimization extends HoodieClientTestBase { val df = spark.read.schema(sourceTableSchema).parquet(file.getPath.toString) val exprs: Seq[String] = s"'${typedLit(file.getPath.getName)}' AS file" +: - df.columns - .filter(col => zorderedCols.contains(col)) - .flatMap(col => { - val minColName = s"${col}_minValue" - val maxColName = s"${col}_maxValue" - Seq( - s"min($col) AS $minColName", - s"max($col) AS $maxColName", - s"sum(cast(isnull($col) AS long)) AS ${col}_num_nulls" - ) - }) + df.columns + .filter(col => zorderedCols.contains(col)) + .flatMap(col => { + val minColName = s"${col}_minValue" + val maxColName = s"${col}_maxValue" + Seq( + s"min($col) AS $minColName", + s"max($col) AS $maxColName", + s"sum(cast(isnull($col) AS long)) AS ${col}_num_nulls" + ) + }) df.selectExpr(exprs: _*) .collect() @@ -366,10 +281,6 @@ class TestZOrderLayoutOptimization extends HoodieClientTestBase { .map(_.getString(0)) .mkString("\n") - private def assertRowsMatch(one: DataFrame, other: DataFrame) = { - val rows = one.count() - assert(rows == other.count() && one.intersect(other).count() == rows) - } private def sort(df: DataFrame): DataFrame = { // Since upon parsing JSON, Spark re-order columns in lexicographical order @@ -380,19 +291,4 @@ class TestZOrderLayoutOptimization extends HoodieClientTestBase { .sort("file") } - def createComplexDataFrame(spark: SparkSession): DataFrame = { - val rdd = spark.sparkContext.parallelize(0 to 1000, 1).map { item => - val c1 = Integer.valueOf(item) - val c2 = s" ${item}sdc" - val c3 = new java.math.BigDecimal(s"${Random.nextInt(1000)}.${item}") - val c4 = new Timestamp(System.currentTimeMillis()) - val c5 = java.lang.Short.valueOf(s"${(item + 16) /10}") - val c6 = Date.valueOf(s"${2020}-${item % 11 + 1}-${item % 28 + 1}") - val c7 = Array(item).map(_.toByte) - val c8 = java.lang.Byte.valueOf("9") - - RowFactory.create(c1, c2, c3, c4, c5, c6, c7, c8) - } - spark.createDataFrame(rdd, sourceTableSchema) - } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSpaceCurveLayoutOptimization.scala similarity index 50% rename from hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala rename to hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSpaceCurveLayoutOptimization.scala index 4b7864f25..e453953ff 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSpaceCurveLayoutOptimization.scala @@ -18,18 +18,16 @@ package org.apache.hudi.functional -import org.apache.hadoop.fs.Path -import org.apache.hudi.common.model.HoodieFileFormat +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings -import org.apache.hudi.common.util.{BaseFileUtils, ParquetUtils} import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig} import org.apache.hudi.testutils.HoodieClientTestBase import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions} -import org.apache.spark.OrderingIndexHelper import org.apache.spark.sql._ import org.apache.spark.sql.types._ import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag, Test} +import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.Arguments.arguments import org.junit.jupiter.params.provider.{Arguments, MethodSource} @@ -39,9 +37,20 @@ import scala.collection.JavaConversions._ import scala.util.Random @Tag("functional") -class TestTableLayoutOptimization extends HoodieClientTestBase { +class TestSpaceCurveLayoutOptimization extends HoodieClientTestBase { var spark: SparkSession = _ + val sourceTableSchema = + new StructType() + .add("c1", IntegerType) + .add("c2", StringType) + .add("c3", DecimalType(9,3)) + .add("c4", TimestampType) + .add("c5", ShortType) + .add("c6", DateType) + .add("c7", BinaryType) + .add("c8", ByteType) + val commonOpts = Map( "hoodie.insert.shuffle.parallelism" -> "4", "hoodie.upsert.shuffle.parallelism" -> "4", @@ -52,7 +61,8 @@ class TestTableLayoutOptimization extends HoodieClientTestBase { HoodieWriteConfig.TBL_NAME.key -> "hoodie_test" ) - @BeforeEach override def setUp() { + @BeforeEach + override def setUp() { initPath() initSparkContexts() spark = sqlContext.sparkSession @@ -60,15 +70,16 @@ class TestTableLayoutOptimization extends HoodieClientTestBase { initFileSystem() } - @AfterEach override def tearDown() = { + @AfterEach + override def tearDown() = { cleanupSparkContexts() cleanupTestDataGenerator() cleanupFileSystem() } @ParameterizedTest - @MethodSource(Array("testLayOutParameter")) - def testOptimizewithClustering(tableType: String, optimizeMode: String): Unit = { + @MethodSource(Array("testLayoutOptimizationParameters")) + def testLayoutOptimizationFunctional(tableType: String): Unit = { val targetRecordsCount = 10000 // Bulk Insert Operation val records = recordsToStrings(dataGen.generateInserts("001", targetRecordsCount)).toList @@ -88,11 +99,21 @@ class TestTableLayoutOptimization extends HoodieClientTestBase { .option("hoodie.clustering.plan.strategy.max.bytes.per.group", Long.MaxValue.toString) .option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(64 * 1024 * 1024L)) .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE.key, "true") - .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY.key(), optimizeMode) .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "begin_lat, begin_lon") .mode(SaveMode.Overwrite) .save(basePath) + val hudiMetaClient = HoodieTableMetaClient.builder + .setConf(hadoopConf) + .setBasePath(basePath) + .setLoadActiveTimelineOnLoad(true) + .build + + val lastCommit = hudiMetaClient.getActiveTimeline.getAllCommitsTimeline.lastInstant().get() + + assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, lastCommit.getAction) + assertEquals(HoodieInstant.State.COMPLETED, lastCommit.getState) + val readDf = spark.read .format("hudi") @@ -119,106 +140,12 @@ class TestTableLayoutOptimization extends HoodieClientTestBase { ) } - def assertRowsMatch(one: DataFrame, other: DataFrame) = { + private def assertRowsMatch(one: DataFrame, other: DataFrame) = { val rows = one.count() assert(rows == other.count() && one.intersect(other).count() == rows) } - @Test - def testCollectMinMaxStatistics(): Unit = { - val testPath = new Path(System.getProperty("java.io.tmpdir"), "minMax") - val statisticPath = new Path(System.getProperty("java.io.tmpdir"), "stat") - val fs = testPath.getFileSystem(spark.sparkContext.hadoopConfiguration) - val complexDataFrame = createComplexDataFrame(spark) - complexDataFrame.repartition(3).write.mode("overwrite").save(testPath.toString) - val df = spark.read.load(testPath.toString) - try { - // test z-order/hilbert sort for all primitive type - // shoud not throw exception. - OrderingIndexHelper.createOptimizedDataFrameByMapValue(df, "c1,c2,c3,c5,c6,c7,c8", 20, "hilbert").show(1) - OrderingIndexHelper.createOptimizedDataFrameByMapValue(df, "c1,c2,c3,c5,c6,c7,c8", 20, "z-order").show(1) - OrderingIndexHelper.createOptimizeDataFrameBySample(df, "c1,c2,c3,c5,c6,c7,c8", 20, "hilbert").show(1) - OrderingIndexHelper.createOptimizeDataFrameBySample(df, "c1,c2,c3,c5,c6,c7,c8", 20, "z-order").show(1) - try { - // do not support TimeStampType, so if we collect statistics for c4, should throw exception - val colDf = OrderingIndexHelper.getMinMaxValue(df, "c1,c2,c3,c5,c6,c7,c8") - colDf.cache() - assertEquals(colDf.count(), 3) - assertEquals(colDf.take(1)(0).length, 22) - colDf.unpersist() - // try to save statistics - OrderingIndexHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8", statisticPath.toString, "2", Seq("0", "1")) - // save again - OrderingIndexHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8", statisticPath.toString, "3", Seq("0", "1", "2")) - // test old index table clean - OrderingIndexHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8", statisticPath.toString, "4", Seq("0", "1", "3")) - assertEquals(!fs.exists(new Path(statisticPath, "2")), true) - assertEquals(fs.exists(new Path(statisticPath, "3")), true) - // test to save different index, new index on ("c1,c6,c7,c8") should be successfully saved. - OrderingIndexHelper.saveStatisticsInfo(df, "c1,c6,c7,c8", statisticPath.toString, "5", Seq("0", "1", "3", "4")) - assertEquals(fs.exists(new Path(statisticPath, "5")), true) - } finally { - if (fs.exists(testPath)) fs.delete(testPath) - if (fs.exists(statisticPath)) fs.delete(statisticPath) - } - } - } - - // test collect min-max statistic info for DateType in the case of multithreading. - // parquet will give a wrong statistic result for DateType in the case of multithreading. - @Test - def testMultiThreadParquetFooterReadForDateType(): Unit = { - // create parquet file with DateType - val rdd = spark.sparkContext.parallelize(0 to 100, 1) - .map(item => RowFactory.create(Date.valueOf(s"${2020}-${item % 11 + 1}-${item % 28 + 1}"))) - val df = spark.createDataFrame(rdd, new StructType().add("id", DateType)) - val testPath = new Path(System.getProperty("java.io.tmpdir"), "testCollectDateType") - val conf = spark.sparkContext.hadoopConfiguration - val cols = new java.util.ArrayList[String] - cols.add("id") - try { - df.repartition(3).write.mode("overwrite").save(testPath.toString) - val inputFiles = spark.read.load(testPath.toString).inputFiles.sortBy(x => x) - - val realResult = new Array[(String, String)](3) - inputFiles.zipWithIndex.foreach { case (f, index) => - val fileUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).asInstanceOf[ParquetUtils] - val res = fileUtils.readRangeFromParquetMetadata(conf, new Path(f), cols).iterator().next() - realResult(index) = (res.getMinValue.toString, res.getMaxValue.toString) - } - - // multi thread read with no lock - val resUseLock = new Array[(String, String)](3) - inputFiles.zipWithIndex.par.foreach { case (f, index) => - val fileUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).asInstanceOf[ParquetUtils] - val res = fileUtils.readRangeFromParquetMetadata(conf, new Path(f), cols).iterator().next() - resUseLock(index) = (res.getMinValue.toString, res.getMaxValue.toString) - } - - // check resUseNoLock, - // We can't guarantee that there must be problems in the case of multithreading. - // In order to make ut pass smoothly, we will not check resUseNoLock. - // check resUseLock - // should pass assert - realResult.zip(resUseLock).foreach { case (realValue, testValue) => - assert(realValue == testValue, s" expect realValue: ${realValue} but find ${testValue}") - } - } finally { - if (fs.exists(testPath)) fs.delete(testPath) - } - } - def createComplexDataFrame(spark: SparkSession): DataFrame = { - val schema = new StructType() - .add("c1", IntegerType) - .add("c2", StringType) - .add("c3", DecimalType(9,3)) - .add("c4", TimestampType) - .add("c5", ShortType) - .add("c6", DateType) - .add("c7", BinaryType) - .add("c8", ByteType) - val rdd = spark.sparkContext.parallelize(0 to 1000, 1).map { item => val c1 = Integer.valueOf(item) val c2 = s" ${item}sdc" @@ -231,12 +158,12 @@ class TestTableLayoutOptimization extends HoodieClientTestBase { RowFactory.create(c1, c2, c3, c4, c5, c6, c7, c8) } - spark.createDataFrame(rdd, schema) + spark.createDataFrame(rdd, sourceTableSchema) } } -object TestTableLayoutOptimization { - def testLayOutParameter(): java.util.stream.Stream[Arguments] = { +object TestSpaceCurveLayoutOptimization { + def testLayoutOptimizationParameters(): java.util.stream.Stream[Arguments] = { java.util.stream.Stream.of( arguments("COPY_ON_WRITE", "hilbert"), arguments("COPY_ON_WRITE", "z-order"), diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchMark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchMark.scala index c8263b3a2..0b0599fb2 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchMark.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchMark.scala @@ -19,27 +19,38 @@ package org.apache.spark.sql.execution.benchmark import org.apache.hadoop.fs.Path -import org.apache.spark.OrderingIndexHelper +import org.apache.hudi.config.HoodieClusteringConfig.LayoutOptimizationStrategy +import org.apache.hudi.index.columnstats.ColumnStatsIndexHelper +import org.apache.hudi.sort.SpaceCurveSortingHelper import org.apache.spark.sql.DataFrame import org.apache.spark.sql.hudi.TestHoodieSqlBase +import org.apache.spark.sql.types.{IntegerType, StructField} +import org.junit.jupiter.api.{Disabled, Tag, Test} import scala.util.Random +import scala.collection.JavaConversions._ +@Tag("functional") object SpaceCurveOptimizeBenchMark extends TestHoodieSqlBase { - def getSkippingPercent(tableName: String, co1: String, co2: String, value1: Int, value2: Int): Unit= { - val minMax = OrderingIndexHelper - .getMinMaxValue(spark.sql(s"select * from ${tableName}"), s"${co1}, ${co2}") - .collect().map(f => (f.getInt(1), f.getInt(2), f.getInt(4), f.getInt(5))) - var c = 0 - for (elem <- minMax) { - if ((elem._1 <= value1 && elem._2 >= value1) || (elem._3 <= value2 && elem._4 >= value2)) { - c = c + 1 + def evalSkippingPercent(tableName: String, co1: String, co2: String, value1: Int, value2: Int): Unit= { + val sourceTableDF = spark.sql(s"select * from ${tableName}") + + val orderedColsTypes = Seq(StructField(co1, IntegerType), StructField(co2, IntegerType)) + val colStatsIndexTable = ColumnStatsIndexHelper + .buildColumnStatsTableFor(spark, sourceTableDF.inputFiles.toSeq, orderedColsTypes) + .collect() + .map(f => (f.getInt(1), f.getInt(2), f.getInt(4), f.getInt(5))) + + var hits = 0 + for (fileStatRow <- colStatsIndexTable) { + if ((fileStatRow._1 <= value1 && fileStatRow._2 >= value1) || (fileStatRow._3 <= value2 && fileStatRow._4 >= value2)) { + hits = hits + 1 } } - val p = c / minMax.size.toDouble - println(s"for table ${tableName} with query filter: ${co1} = ${value1} or ${co2} = ${value2} we can achieve skipping percent ${1.0 - p}") + val p = hits / colStatsIndexTable.size.toDouble + println(s"For table ${tableName} with query filter: ${co1} = ${value1} or ${co2} = ${value2} we can achieve skipping percent ${1.0 - p} (w/ total files ${colStatsIndexTable.size})") } /* @@ -48,6 +59,8 @@ object SpaceCurveOptimizeBenchMark extends TestHoodieSqlBase { for table table_hilbert_sort_byMap with query filter: c1_int = 500000 or c2_int = 500000 we can achieve skipping percent 0.855 for table table_hilbert_sort_bySample with query filter: c1_int = 500000 or c2_int = 500000 we can achieve skipping percent 0.83 */ + @Test + @Disabled def runNormalTableSkippingBenchMark(): Unit = { withTempDir { f => withTempTable("table_z_sort_byMap", "table_z_sort_bySample", "table_hilbert_sort_byMap", "table_hilbert_sort_bySample") { @@ -55,10 +68,10 @@ object SpaceCurveOptimizeBenchMark extends TestHoodieSqlBase { // choose median value as filter condition. // the median value of c1_int is 500000 // the median value of c2_int is 500000 - getSkippingPercent("table_z_sort_byMap", "c1_int", "c2_int", 500000, 500000) - getSkippingPercent("table_z_sort_bySample", "c1_int", "c2_int", 500000, 500000) - getSkippingPercent("table_hilbert_sort_byMap", "c1_int", "c2_int", 500000, 500000) - getSkippingPercent("table_hilbert_sort_bySample", "c1_int", "c2_int", 500000, 500000) + evalSkippingPercent("table_z_sort_byMap", "c1_int", "c2_int", 500000, 500000) + evalSkippingPercent("table_z_sort_bySample", "c1_int", "c2_int", 500000, 500000) + evalSkippingPercent("table_hilbert_sort_byMap", "c1_int", "c2_int", 500000, 500000) + evalSkippingPercent("table_hilbert_sort_bySample", "c1_int", "c2_int", 500000, 500000) } } } @@ -69,6 +82,8 @@ object SpaceCurveOptimizeBenchMark extends TestHoodieSqlBase { for table table_hilbert_sort_byMap_skew with query filter: c1_int = 5000 or c2_int = 500000 we can achieve skipping percent 0.05500000000000005 for table table_hilbert_sort_bySample_skew with query filter: c1_int = 5000 or c2_int = 500000 we can achieve skipping percent 0.84 */ + @Test + @Disabled def runSkewTableSkippingBenchMark(): Unit = { withTempDir { f => withTempTable("table_z_sort_byMap_skew", "table_z_sort_bySample_skew", "table_hilbert_sort_byMap_skew", "table_hilbert_sort_bySample_skew") { @@ -77,19 +92,14 @@ object SpaceCurveOptimizeBenchMark extends TestHoodieSqlBase { // choose median value as filter condition. // the median value of c1_int is 5000 // the median value of c2_int is 500000 - getSkippingPercent("table_z_sort_byMap_skew", "c1_int", "c2_int", 5000, 500000) - getSkippingPercent("table_z_sort_bySample_skew", "c1_int", "c2_int", 5000, 500000) - getSkippingPercent("table_hilbert_sort_byMap_skew", "c1_int", "c2_int", 5000, 500000) - getSkippingPercent("table_hilbert_sort_bySample_skew", "c1_int", "c2_int", 5000, 500000) + evalSkippingPercent("table_z_sort_byMap_skew", "c1_int", "c2_int", 5000, 500000) + evalSkippingPercent("table_z_sort_bySample_skew", "c1_int", "c2_int", 5000, 500000) + evalSkippingPercent("table_hilbert_sort_byMap_skew", "c1_int", "c2_int", 5000, 500000) + evalSkippingPercent("table_hilbert_sort_bySample_skew", "c1_int", "c2_int", 5000, 500000) } } } - def main(args: Array[String]): Unit = { - runNormalTableSkippingBenchMark() - runSkewTableSkippingBenchMark() - } - def withTempTable(tableNames: String*)(f: => Unit): Unit = { try f finally tableNames.foreach(spark.catalog.dropTempView) } @@ -97,11 +107,11 @@ object SpaceCurveOptimizeBenchMark extends TestHoodieSqlBase { def prepareInterTypeTable(tablePath: Path, numRows: Int, col1Range: Int = 1000000, col2Range: Int = 1000000, skewed: Boolean = false): Unit = { import spark.implicits._ val df = spark.range(numRows).map(_ => (Random.nextInt(col1Range), Random.nextInt(col2Range))).toDF("c1_int", "c2_int") - val dfOptimizeByMap = OrderingIndexHelper.createOptimizedDataFrameByMapValue(df, "c1_int, c2_int", 200, "z-order") - val dfOptimizeBySample = OrderingIndexHelper.createOptimizeDataFrameBySample(df, "c1_int, c2_int", 200, "z-order") + val dfOptimizeByMap = SpaceCurveSortingHelper.orderDataFrameByMappingValues(df, LayoutOptimizationStrategy.ZORDER, Seq("c1_int", "c2_int"), 200) + val dfOptimizeBySample = SpaceCurveSortingHelper.orderDataFrameBySamplingValues(df, LayoutOptimizationStrategy.ZORDER, Seq("c1_int", "c2_int"), 200) - val dfHilbertOptimizeByMap = OrderingIndexHelper.createOptimizedDataFrameByMapValue(df, "c1_int, c2_int", 200, "hilbert") - val dfHilbertOptimizeBySample = OrderingIndexHelper.createOptimizeDataFrameBySample(df, "c1_int, c2_int", 200, "hilbert") + val dfHilbertOptimizeByMap = SpaceCurveSortingHelper.orderDataFrameByMappingValues(df, LayoutOptimizationStrategy.HILBERT, Seq("c1_int", "c2_int"), 200) + val dfHilbertOptimizeBySample = SpaceCurveSortingHelper.orderDataFrameBySamplingValues(df, LayoutOptimizationStrategy.HILBERT, Seq("c1_int", "c2_int"), 200) saveAsTable(dfOptimizeByMap, tablePath, if (skewed) "z_sort_byMap_skew" else "z_sort_byMap") saveAsTable(dfOptimizeBySample, tablePath, if (skewed) "z_sort_bySample_skew" else "z_sort_bySample") @@ -110,7 +120,6 @@ object SpaceCurveOptimizeBenchMark extends TestHoodieSqlBase { } def saveAsTable(df: DataFrame, savePath: Path, suffix: String): Unit = { - df.write.mode("overwrite").save(new Path(savePath, suffix).toString) spark.read.parquet(new Path(savePath, suffix).toString).createOrReplaceTempView("table_" + suffix) }