1
0

[HUDI-2814] Make Z-index more generic Column-Stats Index (#4106)

This commit is contained in:
Alexey Kudinkin
2021-12-10 14:56:09 -08:00
committed by GitHub
parent 72901a33a1
commit 2d864f7524
23 changed files with 892 additions and 1244 deletions

View File

@@ -68,13 +68,13 @@ public class SparkSizeBasedClusteringPlanStrategy<T extends HoodieRecordPayload<
@Override
protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> fileSlices) {
HoodieWriteConfig writeConfig = getWriteConfig();
List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
List<FileSlice> currentGroup = new ArrayList<>();
long totalSizeSoFar = 0;
HoodieWriteConfig writeConfig = getWriteConfig();
for (FileSlice currentSlice : fileSlices) {
// assume each filegroup size is ~= parquet.max.file.size
totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
// check if max size is reached and create new group, if needed.
if (totalSizeSoFar >= writeConfig.getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {
int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
@@ -84,13 +84,13 @@ public class SparkSizeBasedClusteringPlanStrategy<T extends HoodieRecordPayload<
currentGroup = new ArrayList<>();
totalSizeSoFar = 0;
}
// Add to the current file-group
currentGroup.add(currentSlice);
// totalSizeSoFar could be 0 when new group was created in the previous conditional block.
// reset to the size of current slice, otherwise the number of output file group will become 0 even though current slice is present.
if (totalSizeSoFar == 0) {
totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
}
// assume each filegroup size is ~= parquet.max.file.size
totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
}
if (!currentGroup.isEmpty()) {
int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
LOG.info("Adding final clustering group " + totalSizeSoFar + " max bytes: "
@@ -98,11 +98,12 @@ public class SparkSizeBasedClusteringPlanStrategy<T extends HoodieRecordPayload<
fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
}
return fileSliceGroups.stream().map(fileSliceGroup -> HoodieClusteringGroup.newBuilder()
.setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
.setNumOutputFileGroups(fileSliceGroup.getRight())
.setMetrics(buildMetrics(fileSliceGroup.getLeft()))
.build());
return fileSliceGroups.stream().map(fileSliceGroup ->
HoodieClusteringGroup.newBuilder()
.setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
.setNumOutputFileGroups(fileSliceGroup.getRight())
.setMetrics(buildMetrics(fileSliceGroup.getLeft()))
.build());
}
@Override

View File

@@ -64,7 +64,7 @@ public class SparkSortAndSizeExecutionStrategy<T extends HoodieRecordPayload<T>>
props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), Boolean.FALSE.toString());
props.put(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
return (JavaRDD<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
false, getPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(preserveHoodieMetadata));
return (JavaRDD<WriteStatus>) SparkBulkInsertHelper.newInstance()
.bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, false, getPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(preserveHoodieMetadata));
}
}

View File

@@ -18,6 +18,8 @@
package org.apache.hudi.execution.bulkinsert;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -27,17 +29,20 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sort.SpaceCurveSortingHelper;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.OrderingIndexHelper;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
/**
* A partitioner that does spatial curve optimization sorting based on specified column values for each RDD partition.
* support z-curve optimization, hilbert will come soon.
@@ -74,25 +79,47 @@ public class RDDSpatialCurveOptimizationSortPartitioner<T extends HoodieRecordPa
private JavaRDD<GenericRecord> prepareGenericRecord(JavaRDD<HoodieRecord<T>> inputRecords, final int numOutputGroups, final Schema schema) {
SerializableSchema serializableSchema = new SerializableSchema(schema);
JavaRDD<GenericRecord> genericRecordJavaRDD = inputRecords.map(f -> (GenericRecord) f.getData().getInsertValue(serializableSchema.get()).get());
Dataset<Row> originDF = AvroConversionUtils.createDataFrame(genericRecordJavaRDD.rdd(), schema.toString(), sparkEngineContext.getSqlContext().sparkSession());
Dataset<Row> zDataFrame;
Dataset<Row> originDF =
AvroConversionUtils.createDataFrame(
genericRecordJavaRDD.rdd(),
schema.toString(),
sparkEngineContext.getSqlContext().sparkSession()
);
switch (config.getLayoutOptimizationCurveBuildMethod()) {
case DIRECT:
zDataFrame = OrderingIndexHelper
.createOptimizedDataFrameByMapValue(originDF, config.getClusteringSortColumns(), numOutputGroups, config.getLayoutOptimizationStrategy());
break;
case SAMPLE:
zDataFrame = OrderingIndexHelper
.createOptimizeDataFrameBySample(originDF, config.getClusteringSortColumns(), numOutputGroups, config.getLayoutOptimizationStrategy());
break;
default:
throw new HoodieException("Not a valid build curve method for doWriteOperation: ");
}
return HoodieSparkUtils.createRdd(zDataFrame, schema.getName(),
Dataset<Row> sortedDF = reorder(originDF, numOutputGroups);
return HoodieSparkUtils.createRdd(sortedDF, schema.getName(),
schema.getNamespace(), false, org.apache.hudi.common.util.Option.empty()).toJavaRDD();
}
private Dataset<Row> reorder(Dataset<Row> originDF, int numOutputGroups) {
String orderedColumnsListConfig = config.getClusteringSortColumns();
if (isNullOrEmpty(orderedColumnsListConfig) || numOutputGroups <= 0) {
// No-op
return originDF;
}
List<String> orderedCols =
Arrays.stream(orderedColumnsListConfig.split(","))
.map(String::trim)
.collect(Collectors.toList());
HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy =
HoodieClusteringConfig.LayoutOptimizationStrategy.fromValue(config.getLayoutOptimizationStrategy());
HoodieClusteringConfig.BuildCurveStrategyType curveBuildStrategyType = config.getLayoutOptimizationCurveBuildMethod();
switch (curveBuildStrategyType) {
case DIRECT:
return SpaceCurveSortingHelper.orderDataFrameByMappingValues(originDF, layoutOptStrategy, orderedCols, numOutputGroups);
case SAMPLE:
return SpaceCurveSortingHelper.orderDataFrameBySamplingValues(originDF, layoutOptStrategy, orderedCols, numOutputGroups);
default:
throw new UnsupportedOperationException(String.format("Unsupported space-curve curve building strategy (%s)", curveBuildStrategyType));
}
}
@Override
public boolean arePartitionRecordsSorted() {
return true;

View File

@@ -1,13 +1,12 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,21 +15,18 @@
* limitations under the License.
*/
package org.apache.hudi.index.zorder;
package org.apache.hudi.index.columnstats;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.optimize.ZOrderingUtil;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.io.api.Binary;
@@ -41,10 +37,7 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.hudi.execution.RangeSampleSort$;
import org.apache.spark.sql.hudi.execution.ZorderingBinarySort;
import org.apache.spark.sql.types.BinaryType;
import org.apache.spark.sql.types.BinaryType$;
import org.apache.spark.sql.types.BooleanType;
import org.apache.spark.sql.types.ByteType;
import org.apache.spark.sql.types.DataType;
@@ -64,10 +57,9 @@ import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.sql.types.TimestampType;
import org.apache.spark.util.SerializableConfiguration;
import scala.collection.JavaConversions;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
@@ -79,13 +71,11 @@ import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import scala.collection.JavaConversions;
import static org.apache.hudi.util.DataTypeUtils.areCompatible;
public class ZOrderingIndexHelper {
public class ColumnStatsIndexHelper {
private static final Logger LOG = LogManager.getLogger(ZOrderingIndexHelper.class);
private static final Logger LOG = LogManager.getLogger(ColumnStatsIndexHelper.class);
private static final String SPARK_JOB_DESCRIPTION = "spark.job.description";
@@ -108,106 +98,10 @@ public class ZOrderingIndexHelper {
}
/**
* Create z-order DataFrame directly
* first, map all base type data to byte[8], then create z-order DataFrame
* only support base type data. long,int,short,double,float,string,timestamp,decimal,date,byte
* this method is more effective than createZIndexDataFrameBySample
*
* @param df a spark DataFrame holds parquet files to be read.
* @param zCols z-sort cols
* @param fileNum spark partition num
* @return a dataFrame sorted by z-order.
*/
public static Dataset<Row> createZIndexedDataFrameByMapValue(Dataset<Row> df, List<String> zCols, int fileNum) {
Map<String, StructField> columnsMap = Arrays.stream(df.schema().fields()).collect(Collectors.toMap(e -> e.name(), e -> e));
int fieldNum = df.schema().fields().length;
List<String> checkCols = zCols.stream().filter(f -> columnsMap.containsKey(f)).collect(Collectors.toList());
if (zCols.size() != checkCols.size()) {
return df;
}
// only one col to sort, no need to use z-order
if (zCols.size() == 1) {
return df.repartitionByRange(fieldNum, org.apache.spark.sql.functions.col(zCols.get(0)));
}
Map<Integer, StructField> fieldMap = zCols
.stream().collect(Collectors.toMap(e -> Arrays.asList(df.schema().fields()).indexOf(columnsMap.get(e)), e -> columnsMap.get(e)));
// z-sort
JavaRDD<Row> sortedRdd = df.toJavaRDD().map(row -> {
List<byte[]> zBytesList = fieldMap.entrySet().stream().map(entry -> {
int index = entry.getKey();
StructField field = entry.getValue();
DataType dataType = field.dataType();
if (dataType instanceof LongType) {
return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index));
} else if (dataType instanceof DoubleType) {
return ZOrderingUtil.doubleTo8Byte(row.isNullAt(index) ? Double.MAX_VALUE : row.getDouble(index));
} else if (dataType instanceof IntegerType) {
return ZOrderingUtil.intTo8Byte(row.isNullAt(index) ? Integer.MAX_VALUE : row.getInt(index));
} else if (dataType instanceof FloatType) {
return ZOrderingUtil.doubleTo8Byte(row.isNullAt(index) ? Float.MAX_VALUE : row.getFloat(index));
} else if (dataType instanceof StringType) {
return ZOrderingUtil.utf8To8Byte(row.isNullAt(index) ? "" : row.getString(index));
} else if (dataType instanceof DateType) {
return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime());
} else if (dataType instanceof TimestampType) {
return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime());
} else if (dataType instanceof ByteType) {
return ZOrderingUtil.byteTo8Byte(row.isNullAt(index) ? Byte.MAX_VALUE : row.getByte(index));
} else if (dataType instanceof ShortType) {
return ZOrderingUtil.intTo8Byte(row.isNullAt(index) ? Short.MAX_VALUE : row.getShort(index));
} else if (dataType instanceof DecimalType) {
return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue());
} else if (dataType instanceof BooleanType) {
boolean value = row.isNullAt(index) ? false : row.getBoolean(index);
return ZOrderingUtil.intTo8Byte(value ? 1 : 0);
} else if (dataType instanceof BinaryType) {
return ZOrderingUtil.paddingTo8Byte(row.isNullAt(index) ? new byte[] {0} : (byte[]) row.get(index));
}
return null;
}).filter(f -> f != null).collect(Collectors.toList());
byte[][] zBytes = new byte[zBytesList.size()][];
for (int i = 0; i < zBytesList.size(); i++) {
zBytes[i] = zBytesList.get(i);
}
List<Object> zVaules = new ArrayList<>();
zVaules.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava());
zVaules.add(ZOrderingUtil.interleaving(zBytes, 8));
return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(zVaules));
}).sortBy(f -> new ZorderingBinarySort((byte[]) f.get(fieldNum)), true, fileNum);
// create new StructType
List<StructField> newFields = new ArrayList<>();
newFields.addAll(Arrays.asList(df.schema().fields()));
newFields.add(new StructField("zIndex", BinaryType$.MODULE$, true, Metadata.empty()));
// create new DataFrame
return df.sparkSession().createDataFrame(sortedRdd, StructType$.MODULE$.apply(newFields)).drop("zIndex");
}
public static Dataset<Row> createZIndexedDataFrameByMapValue(Dataset<Row> df, String zCols, int fileNum) {
if (zCols == null || zCols.isEmpty() || fileNum <= 0) {
return df;
}
return createZIndexedDataFrameByMapValue(df,
Arrays.stream(zCols.split(",")).map(f -> f.trim()).collect(Collectors.toList()), fileNum);
}
public static Dataset<Row> createZIndexedDataFrameBySample(Dataset<Row> df, List<String> zCols, int fileNum) {
return RangeSampleSort$.MODULE$.sortDataFrameBySample(df, JavaConversions.asScalaBuffer(zCols), fileNum,
HoodieClusteringConfig.BuildLayoutOptimizationStrategy.ZORDER.toCustomString());
}
public static Dataset<Row> createZIndexedDataFrameBySample(Dataset<Row> df, String zCols, int fileNum) {
if (zCols == null || zCols.isEmpty() || fileNum <= 0) {
return df;
}
return createZIndexedDataFrameBySample(df, Arrays.stream(zCols.split(",")).map(f -> f.trim()).collect(Collectors.toList()), fileNum);
}
/**
* Parse min/max statistics from Parquet footers for provided columns and composes Z-index
* table in the following format with 3 statistics denominated for each Z-ordered column.
* For ex, if original table contained Z-ordered column {@code A}:
* Parse min/max statistics from Parquet footers for provided columns and composes column-stats
* index table in the following format with 3 statistics denominated for each
* linear/Z-curve/Hilbert-curve-ordered column. For ex, if original table contained
* column {@code A}:
*
* <pre>
* +---------------------------+------------+------------+-------------+
@@ -225,15 +119,15 @@ public class ZOrderingIndexHelper {
* @VisibleForTesting
*
* @param sparkSession encompassing Spark session
* @param baseFilesPaths list of base-files paths to be sourced for Z-index
* @param zorderedColumnSchemas target Z-ordered columns
* @param baseFilesPaths list of base-files paths to be sourced for column-stats index
* @param orderedColumnSchemas target ordered columns
* @return Spark's {@link Dataset} holding an index table
*/
@Nonnull
public static Dataset<Row> buildZIndexTableFor(
public static Dataset<Row> buildColumnStatsTableFor(
@Nonnull SparkSession sparkSession,
@Nonnull List<String> baseFilesPaths,
@Nonnull List<StructField> zorderedColumnSchemas
@Nonnull List<StructField> orderedColumnSchemas
) {
SparkContext sc = sparkSession.sparkContext();
JavaSparkContext jsc = new JavaSparkContext(sc);
@@ -252,12 +146,12 @@ public class ZOrderingIndexHelper {
return StreamSupport.stream(iterable.spliterator(), false)
.flatMap(path ->
utils.readRangeFromParquetMetadata(
serializableConfiguration.value(),
new Path(path),
zorderedColumnSchemas.stream()
.map(StructField::name)
.collect(Collectors.toList())
)
serializableConfiguration.value(),
new Path(path),
orderedColumnSchemas.stream()
.map(StructField::name)
.collect(Collectors.toList())
)
.stream()
)
.iterator();
@@ -288,7 +182,7 @@ public class ZOrderingIndexHelper {
indexRow.add(filePath);
// For each column
zorderedColumnSchemas.forEach(colSchema -> {
orderedColumnSchemas.forEach(colSchema -> {
String colName = colSchema.name();
HoodieColumnRangeMetadata<Comparable> colMetadata =
@@ -313,66 +207,67 @@ public class ZOrderingIndexHelper {
})
.filter(Objects::nonNull);
StructType indexSchema = composeIndexSchema(zorderedColumnSchemas);
StructType indexSchema = composeIndexSchema(orderedColumnSchemas);
return sparkSession.createDataFrame(allMetaDataRDD, indexSchema);
}
/**
* <p/>
* Updates state of the Z-index by:
* Updates state of the column-stats index by:
* <ol>
* <li>Updating Z-index with statistics for {@code sourceBaseFiles}, collecting corresponding
* column statistics from Parquet footers</li>
* <li>Merging newly built Z-index table with the most recent one (if present and not preempted)</li>
* <li>Updating column-stats index with statistics for {@code sourceBaseFiles},
* collecting corresponding column statistics from Parquet footers</li>
* <li>Merging newly built column-stats index table with the most recent one (if present
* and not preempted)</li>
* <li>Cleans up any residual index tables, that weren't cleaned up before</li>
* </ol>
*
* @param sparkSession encompassing Spark session
* @param sourceTableSchema instance of {@link StructType} bearing source table's writer's schema
* @param sourceBaseFiles list of base-files to be indexed
* @param zorderedCols target Z-ordered columns
* @param zindexFolderPath Z-index folder path
* @param orderedCols target ordered columns
* @param indexFolderPath col-stats index folder path
* @param commitTime current operation commit instant
* @param completedCommits all previously completed commit instants
*/
public static void updateZIndexFor(
public static void updateColumnStatsIndexFor(
@Nonnull SparkSession sparkSession,
@Nonnull StructType sourceTableSchema,
@Nonnull List<String> sourceBaseFiles,
@Nonnull List<String> zorderedCols,
@Nonnull String zindexFolderPath,
@Nonnull List<String> orderedCols,
@Nonnull String indexFolderPath,
@Nonnull String commitTime,
@Nonnull List<String> completedCommits
) {
FileSystem fs = FSUtils.getFs(zindexFolderPath, sparkSession.sparkContext().hadoopConfiguration());
FileSystem fs = FSUtils.getFs(indexFolderPath, sparkSession.sparkContext().hadoopConfiguration());
// Compose new Z-index table for the given source base files
Dataset<Row> newZIndexDf =
buildZIndexTableFor(
// Compose new col-stats index table for the given source base files
Dataset<Row> newColStatsIndexDf =
buildColumnStatsTableFor(
sparkSession,
sourceBaseFiles,
zorderedCols.stream()
orderedCols.stream()
.map(col -> sourceTableSchema.fields()[sourceTableSchema.fieldIndex(col)])
.collect(Collectors.toList())
);
try {
//
// Z-Index has the following folder structure:
// Column Stats Index has the following folder structure:
//
// .hoodie/
// .zindex/
// .colstatsindex/
// <instant>/
// <part-...>.parquet
// ...
//
// If index is currently empty (no persisted tables), we simply create one
// using clustering operation's commit instance as it's name
Path newIndexTablePath = new Path(zindexFolderPath, commitTime);
Path newIndexTablePath = new Path(indexFolderPath, commitTime);
if (!fs.exists(new Path(zindexFolderPath))) {
newZIndexDf.repartition(1)
if (!fs.exists(new Path(indexFolderPath))) {
newColStatsIndexDf.repartition(1)
.write()
.format("parquet")
.mode("overwrite")
@@ -383,8 +278,8 @@ public class ZOrderingIndexHelper {
// Filter in all index tables (w/in {@code .zindex} folder)
List<String> allIndexTables =
Arrays.stream(
fs.listStatus(new Path(zindexFolderPath))
)
fs.listStatus(new Path(indexFolderPath))
)
.filter(FileStatus::isDirectory)
.map(f -> f.getPath().getName())
.collect(Collectors.toList());
@@ -402,23 +297,23 @@ public class ZOrderingIndexHelper {
.filter(f -> !completedCommits.contains(f))
.collect(Collectors.toList());
Dataset<Row> finalZIndexDf;
// Before writing out new version of the Z-index table we need to merge it
Dataset<Row> finalColStatsIndexDf;
// Before writing out new version of the col-stats-index table we need to merge it
// with the most recent one that were successfully persisted previously
if (validIndexTables.isEmpty()) {
finalZIndexDf = newZIndexDf;
finalColStatsIndexDf = newColStatsIndexDf;
} else {
// NOTE: That Parquet schema might deviate from the original table schema (for ex,
// by upcasting "short" to "integer" types, etc), and hence we need to re-adjust it
// prior to merging, since merging might fail otherwise due to schemas incompatibility
finalZIndexDf =
finalColStatsIndexDf =
tryMergeMostRecentIndexTableInto(
sparkSession,
newZIndexDf,
// Load current most recent Z-index table
newColStatsIndexDf,
// Load current most recent col-stats-index table
sparkSession.read().load(
new Path(zindexFolderPath, validIndexTables.get(validIndexTables.size() - 1)).toString()
new Path(indexFolderPath, validIndexTables.get(validIndexTables.size() - 1)).toString()
)
);
@@ -426,28 +321,28 @@ public class ZOrderingIndexHelper {
tablesToCleanup.addAll(validIndexTables);
}
// Persist new Z-index table
finalZIndexDf
.repartition(1)
.write()
.format("parquet")
.save(newIndexTablePath.toString());
// Persist new col-stats-index table
finalColStatsIndexDf
.repartition(1)
.write()
.format("parquet")
.save(newIndexTablePath.toString());
// Clean up residual Z-index tables that have might have been dangling since
// Clean up residual col-stats-index tables that have might have been dangling since
// previous iterations (due to intermittent failures during previous clean up)
tablesToCleanup.forEach(f -> {
try {
fs.delete(new Path(zindexFolderPath, f), true);
fs.delete(new Path(indexFolderPath, f), true);
} catch (IOException ie) {
// NOTE: Exception is deliberately swallowed to not affect overall clustering operation,
// since failing Z-index table will be attempted to be cleaned up upon subsequent
// since failing col-stats-index table will be attempted to be cleaned up upon subsequent
// clustering iteration
LOG.warn(String.format("Failed to cleanup residual Z-index table: %s", f), ie);
LOG.warn(String.format("Failed to cleanup residual col-stats-index table: %s", f), ie);
}
});
} catch (IOException e) {
LOG.error("Failed to build new Z-index table", e);
throw new HoodieException("Failed to build new Z-index table", e);
LOG.error("Failed to build new col-stats-index table", e);
throw new HoodieException("Failed to build new col-stats-index table", e);
}
}
@@ -457,7 +352,7 @@ public class ZOrderingIndexHelper {
@Nonnull Dataset<Row> newIndexTableDf,
@Nonnull Dataset<Row> existingIndexTableDf
) {
// NOTE: If new Z-index table schema is incompatible with that one of existing table
// NOTE: If new col-stats index table schema is incompatible with that one of existing table
// that is most likely due to changing settings of list of Z-ordered columns, that
// occurred since last index table have been persisted.
//
@@ -503,27 +398,6 @@ public class ZOrderingIndexHelper {
return new StructField(composeZIndexColName(col, statName), dataType, true, Metadata.empty());
}
@Nullable
private static String mapToSourceTableColumnName(StructField fieldStruct) {
String name = fieldStruct.name();
int maxStatSuffixIdx = name.lastIndexOf(String.format("_%s", Z_INDEX_MAX_VALUE_STAT_NAME));
if (maxStatSuffixIdx != -1) {
return name.substring(0, maxStatSuffixIdx);
}
int minStatSuffixIdx = name.lastIndexOf(String.format("_%s", Z_INDEX_MIN_VALUE_STAT_NAME));
if (minStatSuffixIdx != -1) {
return name.substring(0, minStatSuffixIdx);
}
int numNullsSuffixIdx = name.lastIndexOf(String.format("_%s", Z_INDEX_NUM_NULLS_STAT_NAME));
if (numNullsSuffixIdx != -1) {
return name.substring(0, numNullsSuffixIdx);
}
return null;
}
private static String composeZIndexColName(String col, String statName) {
// TODO add escaping for
return String.format("%s_%s", col, statName);
@@ -589,7 +463,7 @@ public class ZOrderingIndexHelper {
* @VisibleForTesting
*/
@Nonnull
public static String createIndexMergeSql(
static String createIndexMergeSql(
@Nonnull String originalIndexTable,
@Nonnull String newIndexTable,
@Nonnull List<String> columns

View File

@@ -0,0 +1,260 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sort;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.optimize.HilbertCurveUtils;
import org.apache.hudi.common.util.BinaryUtil;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.hudi.execution.RangeSampleSort$;
import org.apache.spark.sql.hudi.execution.ByteArraySorting;
import org.apache.spark.sql.types.BinaryType;
import org.apache.spark.sql.types.BinaryType$;
import org.apache.spark.sql.types.BooleanType;
import org.apache.spark.sql.types.ByteType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DateType;
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.DoubleType;
import org.apache.spark.sql.types.FloatType;
import org.apache.spark.sql.types.IntegerType;
import org.apache.spark.sql.types.LongType;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.ShortType;
import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.sql.types.TimestampType;
import org.davidmoten.hilbert.HilbertCurve;
import scala.collection.JavaConversions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
public class SpaceCurveSortingHelper {
private static final Logger LOG = LogManager.getLogger(SpaceCurveSortingHelper.class);
/**
* Orders provided {@link Dataset} by mapping values of the provided list of columns
* {@code orderByCols} onto a specified space curve (Z-curve, Hilbert, etc)
*
* <p/>
* NOTE: Only support base data-types: long,int,short,double,float,string,timestamp,decimal,date,byte.
* This method is more effective than {@link #orderDataFrameBySamplingValues} leveraging
* data sampling instead of direct mapping
*
* @param df Spark {@link Dataset} holding data to be ordered
* @param orderByCols list of columns to be ordered by
* @param targetPartitionCount target number of output partitions
* @param layoutOptStrategy target layout optimization strategy
* @return a {@link Dataset} holding data ordered by mapping tuple of values from provided columns
* onto a specified space-curve
*/
public static Dataset<Row> orderDataFrameByMappingValues(
    Dataset<Row> df,
    HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy,
    List<String> orderByCols,
    int targetPartitionCount
) {
  // Hoist the schema fields: the original recomputed df.schema().fields() three times
  StructField[] schemaFields = df.schema().fields();

  Map<String, StructField> columnsMap =
      Arrays.stream(schemaFields)
          .collect(Collectors.toMap(StructField::name, Function.identity()));

  // Validate that every requested ordering column is present in the schema;
  // if any is missing, skip re-ordering entirely rather than failing the write
  List<String> checkCols =
      orderByCols.stream()
          .filter(columnsMap::containsKey)
          .collect(Collectors.toList());

  if (orderByCols.size() != checkCols.size()) {
    LOG.error(String.format("Trying to order by column(s) not present in the schema (%s); skipping", CollectionUtils.diff(orderByCols, checkCols)));
    return df;
  }

  // In case when there's just one column to be ordered by, we can skip space-curve
  // ordering altogether (since it will match linear ordering anyway)
  if (orderByCols.size() == 1) {
    String orderByColName = orderByCols.get(0);
    LOG.debug(String.format("Single column to order by (%s), skipping space-curve ordering", orderByColName));

    // TODO validate if we need Spark to re-partition
    return df.repartitionByRange(targetPartitionCount, new Column(orderByColName));
  }

  int fieldNum = schemaFields.length;

  // Map each ordering column's ordinal position within the schema to its field
  Map<Integer, StructField> fieldMap =
      orderByCols.stream()
          .collect(
              Collectors.toMap(col -> Arrays.asList(schemaFields).indexOf(columnsMap.get(col)), columnsMap::get));

  JavaRDD<Row> sortedRDD;
  switch (layoutOptStrategy) {
    case ZORDER:
      sortedRDD = createZCurveSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, targetPartitionCount);
      break;
    case HILBERT:
      sortedRDD = createHilbertSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, targetPartitionCount);
      break;
    default:
      // Fixed ungrammatical message ("new only support ... but find")
      throw new IllegalArgumentException(String.format("Only z-order/hilbert layout optimization strategies are supported (got: %s)", layoutOptStrategy));
  }

  // Compose new {@code StructType} for ordered RDDs
  StructType newStructType = composeOrderedRDDStructType(df.schema());

  // The auxiliary "Index" column carrying the curve key is dropped after the sort
  return df.sparkSession()
      .createDataFrame(sortedRDD, newStructType)
      .drop("Index");
}
/**
 * Returns a copy of {@code schema} extended with the trailing auxiliary "Index"
 * column (nullable binary) that carries the space-curve sort key.
 */
private static StructType composeOrderedRDDStructType(StructType schema) {
  List<StructField> extendedFields = new ArrayList<>(Arrays.asList(schema.fields()));
  extendedFields.add(new StructField("Index", BinaryType$.MODULE$, true, Metadata.empty()));
  return StructType$.MODULE$.apply(extendedFields);
}
/**
 * Sorts {@code originRDD} along a Z-order (Morton) space-filling curve.
 *
 * <p>For every row, each ordering column's value is mapped onto a fixed 8-byte,
 * order-preserving binary representation; the per-column buffers are interleaved
 * into a single Z-order key, which is appended to the row as a trailing column
 * and then used as the sort key.
 *
 * @param originRDD rows to be re-ordered
 * @param fieldMap  ordinal position (within the row) -> field, for each ordering column
 * @param fieldNum  number of columns in the original schema; the appended key
 *                  therefore lives at index {@code fieldNum}
 * @param fileNum   target number of output partitions for the sort
 * @return RDD sorted by the interleaved Z-order key (key column still attached)
 */
private static JavaRDD<Row> createZCurveSortedRDD(JavaRDD<Row> originRDD, Map<Integer, StructField> fieldMap, int fieldNum, int fileNum) {
  return originRDD.map(row -> {
    // One 8-byte buffer per ordering column; nulls map to the type's MAX
    // sentinel (empty string / zero byte for string/binary) so they sort last
    List<byte[]> zBytesList = fieldMap.entrySet().stream().map(entry -> {
      int index = entry.getKey();
      StructField field = entry.getValue();
      DataType dataType = field.dataType();
      if (dataType instanceof LongType) {
        return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index));
      } else if (dataType instanceof DoubleType) {
        return BinaryUtil.doubleTo8Byte(row.isNullAt(index) ? Double.MAX_VALUE : row.getDouble(index));
      } else if (dataType instanceof IntegerType) {
        return BinaryUtil.intTo8Byte(row.isNullAt(index) ? Integer.MAX_VALUE : row.getInt(index));
      } else if (dataType instanceof FloatType) {
        // Floats are widened to double before encoding
        return BinaryUtil.doubleTo8Byte(row.isNullAt(index) ? Float.MAX_VALUE : row.getFloat(index));
      } else if (dataType instanceof StringType) {
        return BinaryUtil.utf8To8Byte(row.isNullAt(index) ? "" : row.getString(index));
      } else if (dataType instanceof DateType) {
        return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime());
      } else if (dataType instanceof TimestampType) {
        return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime());
      } else if (dataType instanceof ByteType) {
        return BinaryUtil.byteTo8Byte(row.isNullAt(index) ? Byte.MAX_VALUE : row.getByte(index));
      } else if (dataType instanceof ShortType) {
        return BinaryUtil.intTo8Byte(row.isNullAt(index) ? Short.MAX_VALUE : row.getShort(index));
      } else if (dataType instanceof DecimalType) {
        // NOTE(review): truncates the decimal to its integral part — ordering
        // among values sharing the same integral part is not preserved
        return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue());
      } else if (dataType instanceof BooleanType) {
        boolean value = row.isNullAt(index) ? false : row.getBoolean(index);
        return BinaryUtil.intTo8Byte(value ? 1 : 0);
      } else if (dataType instanceof BinaryType) {
        return BinaryUtil.paddingTo8Byte(row.isNullAt(index) ? new byte[] {0} : (byte[]) row.get(index));
      }
      // Unsupported types are silently skipped (filtered out below)
      return null;
    }).filter(f -> f != null).collect(Collectors.toList());
    byte[][] zBytes = new byte[zBytesList.size()][];
    for (int i = 0; i < zBytesList.size(); i++) {
      zBytes[i] = zBytesList.get(i);
    }
    // Rebuild the row with the interleaved Z-order key appended as the last value
    List<Object> zVaules = new ArrayList<>();
    zVaules.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava());
    zVaules.add(BinaryUtil.interleaving(zBytes, 8));
    return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(zVaules));
  })
  // Sort by the appended key (at index fieldNum) into fileNum partitions
  .sortBy(f -> new ByteArraySorting((byte[]) f.get(fieldNum)), true, fileNum);
}
/**
 * Sorts {@code originRDD} along a Hilbert space-filling curve.
 *
 * <p>For every row, each ordering column's value is mapped to a single long
 * coordinate; the coordinates are indexed onto a 63-bit Hilbert curve whose
 * dimensionality equals the number of ordering columns, and the resulting key
 * bytes are appended to the row and used as the sort key.
 *
 * @param originRDD rows to be re-ordered
 * @param fieldMap  ordinal position (within the row) -> field, for each ordering column
 * @param fieldNum  number of columns in the original schema; the appended key
 *                  therefore lives at index {@code fieldNum}
 * @param fileNum   target number of output partitions for the sort
 * @return RDD sorted by the Hilbert-curve key (key column still attached)
 */
private static JavaRDD<Row> createHilbertSortedRDD(JavaRDD<Row> originRDD, Map<Integer, StructField> fieldMap, int fieldNum, int fileNum) {
  return originRDD.mapPartitions(rows -> {
    // One curve instance per partition; dimensions = number of ordering columns
    HilbertCurve hilbertCurve = HilbertCurve.bits(63).dimensions(fieldMap.size());
    // Lazy iterator: rows are transformed one at a time as Spark consumes them
    return new Iterator<Row>() {

      @Override
      public boolean hasNext() {
        return rows.hasNext();
      }

      @Override
      public Row next() {
        Row row = rows.next();
        // One long coordinate per ordering column; nulls map to Long.MAX_VALUE
        List<Long> longList = fieldMap.entrySet().stream().map(entry -> {
          int index = entry.getKey();
          StructField field = entry.getValue();
          DataType dataType = field.dataType();
          if (dataType instanceof LongType) {
            return row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index);
          } else if (dataType instanceof DoubleType) {
            return row.isNullAt(index) ? Long.MAX_VALUE : Double.doubleToLongBits(row.getDouble(index));
          } else if (dataType instanceof IntegerType) {
            return row.isNullAt(index) ? Long.MAX_VALUE : (long)row.getInt(index);
          } else if (dataType instanceof FloatType) {
            return row.isNullAt(index) ? Long.MAX_VALUE : Double.doubleToLongBits((double) row.getFloat(index));
          } else if (dataType instanceof StringType) {
            return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtil.convertStringToLong(row.getString(index));
          } else if (dataType instanceof DateType) {
            return row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime();
          } else if (dataType instanceof TimestampType) {
            return row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime();
          } else if (dataType instanceof ByteType) {
            return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtil.convertBytesToLong(new byte[] {row.getByte(index)});
          } else if (dataType instanceof ShortType) {
            return row.isNullAt(index) ? Long.MAX_VALUE : (long)row.getShort(index);
          } else if (dataType instanceof DecimalType) {
            // NOTE(review): truncates the decimal to its integral part
            return row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue();
          } else if (dataType instanceof BooleanType) {
            boolean value = row.isNullAt(index) ? false : row.getBoolean(index);
            // NOTE(review): true maps to Long.MAX_VALUE, which collides with the
            // null sentinel and differs from the z-curve variant's 1/0 mapping —
            // confirm this asymmetry is intentional
            return value ? Long.MAX_VALUE : 0;
          } else if (dataType instanceof BinaryType) {
            return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtil.convertBytesToLong((byte[]) row.get(index));
          }
          // Unsupported types are silently skipped (filtered out below)
          return null;
        }).filter(f -> f != null).collect(Collectors.toList());

        // Index the coordinates onto the 63-bit Hilbert curve
        byte[] hilbertValue = HilbertCurveUtils.indexBytes(
            hilbertCurve, longList.stream().mapToLong(l -> l).toArray(), 63);
        // Rebuild the row with the curve key appended as the last value
        List<Object> values = new ArrayList<>();
        values.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava());
        values.add(hilbertValue);
        return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(values));
      }
    };
  // Sort by the appended key (at index fieldNum) into fileNum partitions
  }).sortBy(f -> new ByteArraySorting((byte[]) f.get(fieldNum)), true, fileNum);
}
/**
 * Orders the DataFrame via the sampling-based range sort: column value ranges
 * are estimated by sampling and rows are sorted against the sampled bounds.
 * Delegates to the Scala {@code RangeSampleSort} implementation.
 *
 * @param df                   DataFrame to be re-ordered
 * @param layoutOptStrategy    target layout optimization strategy
 * @param orderByCols          ordering columns for the curve
 * @param targetPartitionCount target number of output partitions
 * @return a {@link Dataset} ordered by the sampled value ranges
 */
public static Dataset<Row> orderDataFrameBySamplingValues(
    Dataset<Row> df,
    HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy,
    List<String> orderByCols,
    int targetPartitionCount
) {
  scala.collection.Seq<String> scalaOrderByCols = JavaConversions.asScalaBuffer(orderByCols);
  return RangeSampleSort$.MODULE$.sortDataFrameBySample(df, layoutOptStrategy, scalaOrderByCols, targetPartitionCount);
}
}

View File

@@ -47,7 +47,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.zorder.ZOrderingIndexHelper;
import org.apache.hudi.index.columnstats.ColumnStatsIndexHelper;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.HoodieSortedMergeHandle;
@@ -172,18 +172,17 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload>
@Override
public void updateMetadataIndexes(@Nonnull HoodieEngineContext context, @Nonnull List<HoodieWriteStat> stats, @Nonnull String instantTime) throws Exception {
// Updates Z-ordering Index
updateZIndex(context, stats, instantTime);
updateColumnsStatsIndex(context, stats, instantTime);
}
private void updateZIndex(
private void updateColumnsStatsIndex(
@Nonnull HoodieEngineContext context,
@Nonnull List<HoodieWriteStat> updatedFilesStats,
@Nonnull String instantTime
) throws Exception {
String sortColsList = config.getClusteringSortColumns();
String basePath = metaClient.getBasePath();
String indexPath = metaClient.getZindexPath();
String indexPath = metaClient.getColumnStatsIndexPath();
List<String> completedCommits =
metaClient.getCommitsTimeline()
@@ -201,7 +200,7 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload>
return;
}
LOG.info(String.format("Updating Z-index table (%s)", indexPath));
LOG.info(String.format("Updating column-statistics index table (%s)", indexPath));
List<String> sortCols = Arrays.stream(sortColsList.split(","))
.map(String::trim)
@@ -209,13 +208,13 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload>
HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext)context;
// Fetch table schema to appropriately construct Z-index schema
// Fetch table schema to appropriately construct col-stats index schema
Schema tableWriteSchema =
HoodieAvroUtils.createHoodieWriteSchema(
new TableSchemaResolver(metaClient).getTableAvroSchemaWithoutMetadataFields()
);
ZOrderingIndexHelper.updateZIndexFor(
ColumnStatsIndexHelper.updateColumnStatsIndexFor(
sparkEngineContext.getSqlContext().sparkSession(),
AvroConversionUtils.convertAvroSchemaToStructType(tableWriteSchema),
touchedFiles,
@@ -225,7 +224,7 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload>
completedCommits
);
LOG.info(String.format("Successfully updated Z-index at instant (%s)", instantTime));
LOG.info(String.format("Successfully updated column-statistics index at instant (%s)", instantTime));
}
@Override

View File

@@ -1,430 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.zorder.ZOrderingIndexHelper;
import org.apache.hudi.optimize.HilbertCurveUtils;
import org.apache.hudi.optimize.ZOrderingUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.io.api.Binary;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.hudi.execution.RangeSampleSort$;
import org.apache.spark.sql.hudi.execution.ZorderingBinarySort;
import org.apache.spark.sql.types.BinaryType;
import org.apache.spark.sql.types.BinaryType$;
import org.apache.spark.sql.types.BooleanType;
import org.apache.spark.sql.types.ByteType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DateType;
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.DoubleType;
import org.apache.spark.sql.types.FloatType;
import org.apache.spark.sql.types.IntegerType;
import org.apache.spark.sql.types.LongType;
import org.apache.spark.sql.types.LongType$;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.ShortType;
import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.sql.types.TimestampType;
import org.apache.spark.util.SerializableConfiguration;
import org.davidmoten.hilbert.HilbertCurve;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import scala.collection.JavaConversions;
public class OrderingIndexHelper {
private static final String SPARK_JOB_DESCRIPTION = "spark.job.description";
/**
* Create optimized DataFrame directly
* only support base type data. long,int,short,double,float,string,timestamp,decimal,date,byte
* this method is more effective than createOptimizeDataFrameBySample
*
* @param df a spark DataFrame holds parquet files to be read.
* @param sortCols ordering columns for the curve
* @param fileNum spark partition num
* @param sortMode layout optimization strategy
* @return a dataFrame ordered by the curve.
*/
public static Dataset<Row> createOptimizedDataFrameByMapValue(Dataset<Row> df, List<String> sortCols, int fileNum, String sortMode) {
Map<String, StructField> columnsMap = Arrays.stream(df.schema().fields()).collect(Collectors.toMap(e -> e.name(), e -> e));
int fieldNum = df.schema().fields().length;
List<String> checkCols = sortCols.stream().filter(f -> columnsMap.containsKey(f)).collect(Collectors.toList());
if (sortCols.size() != checkCols.size()) {
return df;
}
// only one col to sort, no need to use z-order
if (sortCols.size() == 1) {
return df.repartitionByRange(fileNum, org.apache.spark.sql.functions.col(sortCols.get(0)));
}
Map<Integer, StructField> fieldMap = sortCols
.stream().collect(Collectors.toMap(e -> Arrays.asList(df.schema().fields()).indexOf(columnsMap.get(e)), e -> columnsMap.get(e)));
// do optimize
JavaRDD<Row> sortedRDD = null;
switch (HoodieClusteringConfig.BuildLayoutOptimizationStrategy.fromValue(sortMode)) {
case ZORDER:
sortedRDD = createZCurveSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, fileNum);
break;
case HILBERT:
sortedRDD = createHilbertSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, fileNum);
break;
default:
throw new IllegalArgumentException(String.format("new only support z-order/hilbert optimize but find: %s", sortMode));
}
// create new StructType
List<StructField> newFields = new ArrayList<>();
newFields.addAll(Arrays.asList(df.schema().fields()));
newFields.add(new StructField("Index", BinaryType$.MODULE$, true, Metadata.empty()));
// create new DataFrame
return df.sparkSession().createDataFrame(sortedRDD, StructType$.MODULE$.apply(newFields)).drop("Index");
}
private static JavaRDD<Row> createZCurveSortedRDD(JavaRDD<Row> originRDD, Map<Integer, StructField> fieldMap, int fieldNum, int fileNum) {
return originRDD.map(row -> {
List<byte[]> zBytesList = fieldMap.entrySet().stream().map(entry -> {
int index = entry.getKey();
StructField field = entry.getValue();
DataType dataType = field.dataType();
if (dataType instanceof LongType) {
return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index));
} else if (dataType instanceof DoubleType) {
return ZOrderingUtil.doubleTo8Byte(row.isNullAt(index) ? Double.MAX_VALUE : row.getDouble(index));
} else if (dataType instanceof IntegerType) {
return ZOrderingUtil.intTo8Byte(row.isNullAt(index) ? Integer.MAX_VALUE : row.getInt(index));
} else if (dataType instanceof FloatType) {
return ZOrderingUtil.doubleTo8Byte(row.isNullAt(index) ? Float.MAX_VALUE : row.getFloat(index));
} else if (dataType instanceof StringType) {
return ZOrderingUtil.utf8To8Byte(row.isNullAt(index) ? "" : row.getString(index));
} else if (dataType instanceof DateType) {
return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime());
} else if (dataType instanceof TimestampType) {
return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime());
} else if (dataType instanceof ByteType) {
return ZOrderingUtil.byteTo8Byte(row.isNullAt(index) ? Byte.MAX_VALUE : row.getByte(index));
} else if (dataType instanceof ShortType) {
return ZOrderingUtil.intTo8Byte(row.isNullAt(index) ? Short.MAX_VALUE : row.getShort(index));
} else if (dataType instanceof DecimalType) {
return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue());
} else if (dataType instanceof BooleanType) {
boolean value = row.isNullAt(index) ? false : row.getBoolean(index);
return ZOrderingUtil.intTo8Byte(value ? 1 : 0);
} else if (dataType instanceof BinaryType) {
return ZOrderingUtil.paddingTo8Byte(row.isNullAt(index) ? new byte[] {0} : (byte[]) row.get(index));
}
return null;
}).filter(f -> f != null).collect(Collectors.toList());
byte[][] zBytes = new byte[zBytesList.size()][];
for (int i = 0; i < zBytesList.size(); i++) {
zBytes[i] = zBytesList.get(i);
}
List<Object> zVaules = new ArrayList<>();
zVaules.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava());
zVaules.add(ZOrderingUtil.interleaving(zBytes, 8));
return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(zVaules));
}).sortBy(f -> new ZorderingBinarySort((byte[]) f.get(fieldNum)), true, fileNum);
}
private static JavaRDD<Row> createHilbertSortedRDD(JavaRDD<Row> originRDD, Map<Integer, StructField> fieldMap, int fieldNum, int fileNum) {
return originRDD.mapPartitions(rows -> {
HilbertCurve hilbertCurve = HilbertCurve.bits(63).dimensions(fieldMap.size());
return new Iterator<Row>() {
@Override
public boolean hasNext() {
return rows.hasNext();
}
@Override
public Row next() {
Row row = rows.next();
List<Long> longList = fieldMap.entrySet().stream().map(entry -> {
int index = entry.getKey();
StructField field = entry.getValue();
DataType dataType = field.dataType();
if (dataType instanceof LongType) {
return row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index);
} else if (dataType instanceof DoubleType) {
return row.isNullAt(index) ? Long.MAX_VALUE : Double.doubleToLongBits(row.getDouble(index));
} else if (dataType instanceof IntegerType) {
return row.isNullAt(index) ? Long.MAX_VALUE : (long)row.getInt(index);
} else if (dataType instanceof FloatType) {
return row.isNullAt(index) ? Long.MAX_VALUE : Double.doubleToLongBits((double) row.getFloat(index));
} else if (dataType instanceof StringType) {
return row.isNullAt(index) ? Long.MAX_VALUE : ZOrderingUtil.convertStringToLong(row.getString(index));
} else if (dataType instanceof DateType) {
return row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime();
} else if (dataType instanceof TimestampType) {
return row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime();
} else if (dataType instanceof ByteType) {
return row.isNullAt(index) ? Long.MAX_VALUE : ZOrderingUtil.convertBytesToLong(new byte[] {row.getByte(index)});
} else if (dataType instanceof ShortType) {
return row.isNullAt(index) ? Long.MAX_VALUE : (long)row.getShort(index);
} else if (dataType instanceof DecimalType) {
return row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue();
} else if (dataType instanceof BooleanType) {
boolean value = row.isNullAt(index) ? false : row.getBoolean(index);
return value ? Long.MAX_VALUE : 0;
} else if (dataType instanceof BinaryType) {
return row.isNullAt(index) ? Long.MAX_VALUE : ZOrderingUtil.convertBytesToLong((byte[]) row.get(index));
}
return null;
}).filter(f -> f != null).collect(Collectors.toList());
byte[] hilbertValue = HilbertCurveUtils.indexBytes(
hilbertCurve, longList.stream().mapToLong(l -> l).toArray(), 63);
List<Object> values = new ArrayList<>();
values.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava());
values.add(hilbertValue);
return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(values));
}
};
}).sortBy(f -> new ZorderingBinarySort((byte[]) f.get(fieldNum)), true, fileNum);
}
public static Dataset<Row> createOptimizedDataFrameByMapValue(Dataset<Row> df, String sortCols, int fileNum, String sortMode) {
if (sortCols == null || sortCols.isEmpty() || fileNum <= 0) {
return df;
}
return createOptimizedDataFrameByMapValue(df,
Arrays.stream(sortCols.split(",")).map(f -> f.trim()).collect(Collectors.toList()), fileNum, sortMode);
}
public static Dataset<Row> createOptimizeDataFrameBySample(Dataset<Row> df, List<String> zCols, int fileNum, String sortMode) {
return RangeSampleSort$.MODULE$.sortDataFrameBySample(df, JavaConversions.asScalaBuffer(zCols), fileNum, sortMode);
}
public static Dataset<Row> createOptimizeDataFrameBySample(Dataset<Row> df, String zCols, int fileNum, String sortMode) {
if (zCols == null || zCols.isEmpty() || fileNum <= 0) {
return df;
}
return createOptimizeDataFrameBySample(df, Arrays.stream(zCols.split(",")).map(f -> f.trim()).collect(Collectors.toList()), fileNum, sortMode);
}
/**
* Parse min/max statistics stored in parquet footers for z-sort cols.
* no support collect statistics from timeStampType, since parquet file has not collect the statistics for timeStampType.
* to do adapt for rfc-27
*
* @param df a spark DataFrame holds parquet files to be read.
* @param cols z-sort cols
* @return a dataFrame holds all statistics info.
*/
public static Dataset<Row> getMinMaxValue(Dataset<Row> df, List<String> cols) {
Map<String, DataType> columnsMap = Arrays.stream(df.schema().fields()).collect(Collectors.toMap(e -> e.name(), e -> e.dataType()));
List<String> scanFiles = Arrays.asList(df.inputFiles());
SparkContext sc = df.sparkSession().sparkContext();
JavaSparkContext jsc = new JavaSparkContext(sc);
SerializableConfiguration serializableConfiguration = new SerializableConfiguration(sc.hadoopConfiguration());
int numParallelism = (scanFiles.size() / 3 + 1);
List<HoodieColumnRangeMetadata<Comparable>> colMinMaxInfos;
String previousJobDescription = sc.getLocalProperty(SPARK_JOB_DESCRIPTION);
try {
jsc.setJobDescription("Listing parquet column statistics");
colMinMaxInfos = jsc.parallelize(scanFiles, numParallelism).mapPartitions(paths -> {
Configuration conf = serializableConfiguration.value();
ParquetUtils parquetUtils = (ParquetUtils) BaseFileUtils.getInstance(HoodieFileFormat.PARQUET);
List<Collection<HoodieColumnRangeMetadata<Comparable>>> results = new ArrayList<>();
while (paths.hasNext()) {
String path = paths.next();
results.add(parquetUtils.readRangeFromParquetMetadata(conf, new Path(path), cols));
}
return results.stream().flatMap(f -> f.stream()).iterator();
}).collect();
} finally {
jsc.setJobDescription(previousJobDescription);
}
Map<String, List<HoodieColumnRangeMetadata<Comparable>>> fileToStatsListMap = colMinMaxInfos.stream().collect(Collectors.groupingBy(e -> e.getFilePath()));
JavaRDD<Row> allMetaDataRDD = jsc.parallelize(new ArrayList<>(fileToStatsListMap.values()), 1).map(f -> {
int colSize = f.size();
if (colSize == 0) {
return null;
} else {
List<Object> rows = new ArrayList<>();
rows.add(f.get(0).getFilePath());
cols.stream().forEach(col -> {
HoodieColumnRangeMetadata<Comparable> currentColRangeMetaData =
f.stream().filter(s -> s.getColumnName().trim().equalsIgnoreCase(col)).findFirst().orElse(null);
DataType colType = columnsMap.get(col);
if (currentColRangeMetaData == null || colType == null) {
throw new HoodieException(String.format("cannot collect min/max statistics for col: %s", col));
}
if (colType instanceof IntegerType) {
rows.add(currentColRangeMetaData.getMinValue());
rows.add(currentColRangeMetaData.getMaxValue());
} else if (colType instanceof DoubleType) {
rows.add(currentColRangeMetaData.getMinValue());
rows.add(currentColRangeMetaData.getMaxValue());
} else if (colType instanceof StringType) {
rows.add(currentColRangeMetaData.getMinValue().toString());
rows.add(currentColRangeMetaData.getMaxValue().toString());
} else if (colType instanceof DecimalType) {
rows.add(new BigDecimal(currentColRangeMetaData.getMinValue().toString()));
rows.add(new BigDecimal(currentColRangeMetaData.getMaxValue().toString()));
} else if (colType instanceof DateType) {
rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getMinValue().toString()));
rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getMaxValue().toString()));
} else if (colType instanceof LongType) {
rows.add(currentColRangeMetaData.getMinValue());
rows.add(currentColRangeMetaData.getMaxValue());
} else if (colType instanceof ShortType) {
rows.add(Short.parseShort(currentColRangeMetaData.getMinValue().toString()));
rows.add(Short.parseShort(currentColRangeMetaData.getMaxValue().toString()));
} else if (colType instanceof FloatType) {
rows.add(currentColRangeMetaData.getMinValue());
rows.add(currentColRangeMetaData.getMaxValue());
} else if (colType instanceof BinaryType) {
rows.add(((Binary)currentColRangeMetaData.getMinValue()).getBytes());
rows.add(((Binary)currentColRangeMetaData.getMaxValue()).getBytes());
} else if (colType instanceof BooleanType) {
rows.add(currentColRangeMetaData.getMinValue());
rows.add(currentColRangeMetaData.getMaxValue());
} else if (colType instanceof ByteType) {
rows.add(Byte.valueOf(currentColRangeMetaData.getMinValue().toString()));
rows.add(Byte.valueOf(currentColRangeMetaData.getMaxValue().toString()));
} else {
throw new HoodieException(String.format("Not support type: %s", colType));
}
rows.add(currentColRangeMetaData.getNumNulls());
});
return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(rows));
}
}).filter(f -> f != null);
List<StructField> allMetaDataSchema = new ArrayList<>();
allMetaDataSchema.add(new StructField("file", StringType$.MODULE$, true, Metadata.empty()));
cols.forEach(col -> {
allMetaDataSchema.add(new StructField(col + "_minValue", columnsMap.get(col), true, Metadata.empty()));
allMetaDataSchema.add(new StructField(col + "_maxValue", columnsMap.get(col), true, Metadata.empty()));
allMetaDataSchema.add(new StructField(col + "_num_nulls", LongType$.MODULE$, true, Metadata.empty()));
});
return df.sparkSession().createDataFrame(allMetaDataRDD, StructType$.MODULE$.apply(allMetaDataSchema));
}
public static Dataset<Row> getMinMaxValue(Dataset<Row> df, String cols) {
List<String> rawCols = Arrays.asList(cols.split(",")).stream().map(f -> f.trim()).collect(Collectors.toList());
return getMinMaxValue(df, rawCols);
}
/**
* Update statistics info.
* this method will update old index table by full out join,
* and save the updated table into a new index table based on commitTime.
* old index table will be cleaned also.
*
* @param df a spark DataFrame holds parquet files to be read.
* @param cols z-sort cols.
* @param indexPath index store path.
* @param commitTime current operation commitTime.
* @param validateCommits all validate commits for current table.
* @return
*/
public static void saveStatisticsInfo(Dataset<Row> df, String cols, String indexPath, String commitTime, List<String> validateCommits) {
Path savePath = new Path(indexPath, commitTime);
SparkSession spark = df.sparkSession();
FileSystem fs = FSUtils.getFs(indexPath, spark.sparkContext().hadoopConfiguration());
Dataset<Row> statisticsDF = OrderingIndexHelper.getMinMaxValue(df, cols);
// try to find last validate index table from index path
try {
// If there's currently no index, create one
if (!fs.exists(new Path(indexPath))) {
statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString());
return;
}
// Otherwise, clean up all indexes but the most recent one
List<String> allIndexTables = Arrays
.stream(fs.listStatus(new Path(indexPath))).filter(f -> f.isDirectory()).map(f -> f.getPath().getName()).collect(Collectors.toList());
List<String> candidateIndexTables = allIndexTables.stream().filter(f -> validateCommits.contains(f)).sorted().collect(Collectors.toList());
List<String> residualTables = allIndexTables.stream().filter(f -> !validateCommits.contains(f)).collect(Collectors.toList());
Option<Dataset> latestIndexData = Option.empty();
if (!candidateIndexTables.isEmpty()) {
latestIndexData = Option.of(spark.read().load(new Path(indexPath, candidateIndexTables.get(candidateIndexTables.size() - 1)).toString()));
// clean old index table, keep at most 1 index table.
candidateIndexTables.remove(candidateIndexTables.size() - 1);
candidateIndexTables.forEach(f -> {
try {
fs.delete(new Path(indexPath, f));
} catch (IOException ie) {
throw new HoodieException(ie);
}
});
}
// clean residualTables
// retried cluster operations at the same instant time is also considered,
// the residual files produced by retried are cleaned up before save statistics
// save statistics info to index table which named commitTime
residualTables.forEach(f -> {
try {
fs.delete(new Path(indexPath, f));
} catch (IOException ie) {
throw new HoodieException(ie);
}
});
if (latestIndexData.isPresent() && latestIndexData.get().schema().equals(statisticsDF.schema())) {
// update the statistics info
String originalTable = "indexTable_" + java.util.UUID.randomUUID().toString().replace("-", "");
String updateTable = "updateTable_" + java.util.UUID.randomUUID().toString().replace("-", "");
latestIndexData.get().registerTempTable(originalTable);
statisticsDF.registerTempTable(updateTable);
// update table by full out join
List columns = Arrays.asList(statisticsDF.schema().fieldNames());
spark.sql(ZOrderingIndexHelper.createIndexMergeSql(originalTable, updateTable, columns)).repartition(1).write().save(savePath.toString());
} else {
statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString());
}
} catch (IOException e) {
throw new HoodieException(e);
}
}
}

View File

@@ -18,8 +18,10 @@
package org.apache.spark.sql.hudi.execution
import org.apache.hudi.common.util.BinaryUtil
import org.apache.hudi.config.HoodieClusteringConfig
import org.apache.hudi.optimize.{HilbertCurveUtils, ZOrderingUtil}
import org.apache.hudi.config.HoodieClusteringConfig.LayoutOptimizationStrategy
import org.apache.hudi.optimize.HilbertCurveUtils
import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
@@ -235,15 +237,214 @@ class RawDecisionBound[K : Ordering : ClassTag](ordering: Ordering[K]) extends S
}
}
case class ZorderingBinarySort(b: Array[Byte]) extends Ordered[ZorderingBinarySort] with Serializable {
override def compare(that: ZorderingBinarySort): Int = {
case class ByteArraySorting(b: Array[Byte]) extends Ordered[ByteArraySorting] with Serializable {
override def compare(that: ByteArraySorting): Int = {
val len = this.b.length
ZOrderingUtil.compareTo(this.b, 0, len, that.b, 0, len)
BinaryUtil.compareTo(this.b, 0, len, that.b, 0, len)
}
}
object RangeSampleSort {
/**
 * Creates a layout-optimized [[DataFrame]] by sampling: the source data is first sampled
 * to estimate per-column value bounds for the ordering columns, each row's column values
 * are then replaced by their rank within those bounds, the ranks are folded into a single
 * binary space-filling-curve index (Z-order or Hilbert, per `layoutOptStrategy`), and the
 * rows are range-sorted by that index into `targetPartitionsCount` partitions.
 *
 * Supports all primitive column types; falls back to
 * [[sortDataFrameBySampleSupportAllTypes]] when a complex-typed ordering column is found.
 * This method needs more resources and costs more time than
 * `createOptimizedDataFrameByMapValue`.
 *
 * @param df                    source data-frame
 * @param layoutOptStrategy     space-filling curve to use (Z-order or Hilbert)
 * @param orderByCols           columns to optimize the layout by
 * @param targetPartitionsCount number of output partitions (~ number of output files)
 * @return data-frame sorted by the curve index (the auxiliary index column is dropped)
 */
def sortDataFrameBySample(df: DataFrame, layoutOptStrategy: LayoutOptimizationStrategy, orderByCols: Seq[String], targetPartitionsCount: Int): DataFrame = {
  val spark = df.sparkSession
  val columnsMap = df.schema.fields.map(item => (item.name, item)).toMap
  // Number of source columns; the binary curve-index column is appended at this ordinal
  val fieldNum = df.schema.fields.length
  // NOTE(review): Map.apply throws on an unknown column name, so this filter only ever
  // sees present columns; presumably columnsMap.get(col).isDefined was intended — confirm
  val checkCols = orderByCols.filter(col => columnsMap(col) != null)
  if (orderByCols.isEmpty || checkCols.isEmpty) {
    // Nothing to order by — return the input unchanged
    df
  } else {
    // Resolve each ordering column to (ordinal in schema, field); unsupported types -> (-1, null)
    val zFields = orderByCols.map { col =>
      val newCol = columnsMap(col)
      if (newCol == null) {
        (-1, null)
      } else {
        newCol.dataType match {
          case LongType | DoubleType | FloatType | StringType | IntegerType | DateType | TimestampType | ShortType | ByteType =>
            (df.schema.fields.indexOf(newCol), newCol)
          case d: DecimalType =>
            (df.schema.fields.indexOf(newCol), newCol)
          case _ =>
            (-1, null)
        }
      }
    }.filter(_._1 != -1)
    // Complex type found — fall back to the all-types sampling implementation
    if (zFields.length != orderByCols.length) {
      return sortDataFrameBySampleSupportAllTypes(df, orderByCols, targetPartitionsCount)
    }
    val rawRdd = df.rdd
    // Project every row onto the ordering columns, normalizing each value to a Long
    // (floating point via IEEE-754 bits, temporal via epoch millis); nulls map to
    // Long.MaxValue (or "" for strings) so they rank last
    val sampleRdd = rawRdd.map { row =>
      val values = zFields.map { case (index, field) =>
        field.dataType match {
          case LongType =>
            if (row.isNullAt(index)) Long.MaxValue else row.getLong(index)
          case DoubleType =>
            if (row.isNullAt(index)) Long.MaxValue else java.lang.Double.doubleToLongBits(row.getDouble(index))
          case IntegerType =>
            if (row.isNullAt(index)) Long.MaxValue else row.getInt(index).toLong
          case FloatType =>
            if (row.isNullAt(index)) Long.MaxValue else java.lang.Double.doubleToLongBits(row.getFloat(index).toDouble)
          case StringType =>
            if (row.isNullAt(index)) "" else row.getString(index)
          case DateType =>
            if (row.isNullAt(index)) Long.MaxValue else row.getDate(index).getTime
          case TimestampType =>
            if (row.isNullAt(index)) Long.MaxValue else row.getTimestamp(index).getTime
          case ByteType =>
            if (row.isNullAt(index)) Long.MaxValue else row.getByte(index).toLong
          case ShortType =>
            if (row.isNullAt(index)) Long.MaxValue else row.getShort(index).toLong
          case d: DecimalType =>
            if (row.isNullAt(index)) Long.MaxValue else row.getDecimal(index).longValue()
          case _ =>
            null
        }
      }.filter(v => v != null).toArray
      (values, null)
    }
    // Max number of range bounds to build per column (sample size for curve building)
    val zOrderBounds = df.sparkSession.sessionState.conf.getConfString(
      HoodieClusteringConfig.LAYOUT_OPTIMIZE_BUILD_CURVE_SAMPLE_SIZE.key,
      HoodieClusteringConfig.LAYOUT_OPTIMIZE_BUILD_CURVE_SAMPLE_SIZE.defaultValue.toString).toInt
    val sample = new RangeSample(zOrderBounds, sampleRdd)
    val rangeBounds = sample.getRangeBounds()
    // Determine, per ordering column, the sorted range bounds from the weighted sample
    val sampleBounds = {
      val candidateColNumber = rangeBounds.head._1.length
      (0 to candidateColNumber - 1).map { i =>
        val colRangeBound = rangeBounds.map(x => (x._1(i), x._2))
        if (colRangeBound.head._1.isInstanceOf[String]) {
          sample.determineBound(colRangeBound.asInstanceOf[ArrayBuffer[(String, Float)]], math.min(zOrderBounds, rangeBounds.length), Ordering[String])
        } else {
          sample.determineBound(colRangeBound.asInstanceOf[ArrayBuffer[(Long, Float)]], math.min(zOrderBounds, rangeBounds.length), Ordering[Long])
        }
      }
    }
    // expand bounds so every column has (roughly) the same number of buckets.
    // maybe it's better to use the value of "spark.zorder.bounds.number" as maxLength,
    // however this will lead to extra time costs when all zorder cols' distinct count values are less than "spark.zorder.bounds.number"
    val maxLength = sampleBounds.map(_.length).max
    val expandSampleBoundsWithFactor = sampleBounds.map { bound =>
      val fillFactor = maxLength / bound.size
      val newBound = new Array[Double](bound.length * fillFactor)
      if (bound.isInstanceOf[Array[Long]] && fillFactor > 1) {
        val longBound = bound.asInstanceOf[Array[Long]]
        for (i <- 0 to bound.length - 1) {
          for (j <- 0 to fillFactor - 1) {
            // sample factor should not be too large, so it's ok to use 1 / fillFactor as slice
            newBound(j + i*(fillFactor)) = longBound(i) + (j + 1) * (1 / fillFactor.toDouble)
          }
        }
        (newBound, fillFactor)
      } else {
        // factor 0 marks a column whose bounds were NOT expanded
        (bound, 0)
      }
    }
    val boundBroadCast = spark.sparkContext.broadcast(expandSampleBoundsWithFactor)
    val indexRdd = rawRdd.mapPartitions { iter =>
      val expandBoundsWithFactor = boundBroadCast.value
      val maxBoundNum = expandBoundsWithFactor.map(_._1.length).max
      val longDecisionBound = new RawDecisionBound(Ordering[Long])
      val doubleDecisionBound = new RawDecisionBound(Ordering[Double])
      val stringDecisionBound = new RawDecisionBound(Ordering[String])
      import java.util.concurrent.ThreadLocalRandom
      val threadLocalRandom = ThreadLocalRandom.current
      // Rank of `value` within the (possibly expanded) bounds of ordering column `rawIndex`;
      // nulls rank one past the end. For factor-expanded bounds a small random jitter
      // spreads ties across the synthetic sub-buckets.
      def getRank(rawIndex: Int, value: Long, isNull: Boolean): Int = {
        val (expandBound, factor) = expandBoundsWithFactor(rawIndex)
        if (isNull) {
          expandBound.length + 1
        } else {
          if (factor > 1) {
            doubleDecisionBound.getBound(value + (threadLocalRandom.nextInt(factor) + 1)*(1 / factor.toDouble), expandBound.asInstanceOf[Array[Double]])
          } else {
            longDecisionBound.getBound(value, expandBound.asInstanceOf[Array[Long]])
          }
        }
      }
      // Hilbert curve state is only materialized when the Hilbert strategy is requested
      val hilbertCurve = if (layoutOptStrategy == LayoutOptimizationStrategy.HILBERT)
        Some(HilbertCurve.bits(32).dimensions(zFields.length))
      else
        None
      iter.map { row =>
        // Per-row rank of every ordering-column value (same Long normalization as in sampling)
        val values = zFields.zipWithIndex.map { case ((index, field), rawIndex) =>
          field.dataType match {
            case LongType =>
              val isNull = row.isNullAt(index)
              getRank(rawIndex, if (isNull) 0 else row.getLong(index), isNull)
            case DoubleType =>
              val isNull = row.isNullAt(index)
              getRank(rawIndex, if (isNull) 0 else java.lang.Double.doubleToLongBits(row.getDouble(index)), isNull)
            case IntegerType =>
              val isNull = row.isNullAt(index)
              getRank(rawIndex, if (isNull) 0 else row.getInt(index).toLong, isNull)
            case FloatType =>
              val isNull = row.isNullAt(index)
              getRank(rawIndex, if (isNull) 0 else java.lang.Double.doubleToLongBits(row.getFloat(index).toDouble), isNull)
            case StringType =>
              // String bounds are never factor-expanded, so the rank is scaled here instead
              val factor = maxBoundNum.toDouble / expandBoundsWithFactor(rawIndex)._1.length
              if (row.isNullAt(index)) {
                maxBoundNum + 1
              } else {
                val currentRank = stringDecisionBound.getBound(row.getString(index), expandBoundsWithFactor(rawIndex)._1.asInstanceOf[Array[String]])
                if (factor > 1) {
                  (currentRank*factor).toInt + threadLocalRandom.nextInt(factor.toInt)
                } else {
                  currentRank
                }
              }
            case DateType =>
              val isNull = row.isNullAt(index)
              getRank(rawIndex, if (isNull) 0 else row.getDate(index).getTime, isNull)
            case TimestampType =>
              val isNull = row.isNullAt(index)
              getRank(rawIndex, if (isNull) 0 else row.getTimestamp(index).getTime, isNull)
            case ByteType =>
              val isNull = row.isNullAt(index)
              getRank(rawIndex, if (isNull) 0 else row.getByte(index).toLong, isNull)
            case ShortType =>
              val isNull = row.isNullAt(index)
              getRank(rawIndex, if (isNull) 0 else row.getShort(index).toLong, isNull)
            case d: DecimalType =>
              val isNull = row.isNullAt(index)
              getRank(rawIndex, if (isNull) 0 else row.getDecimal(index).longValue(), isNull)
            case _ =>
              -1
          }
        }.filter(v => v != -1)
        // Fold the per-column ranks into a single binary space-filling-curve index
        val mapValues = layoutOptStrategy match {
          case LayoutOptimizationStrategy.HILBERT =>
            HilbertCurveUtils.indexBytes(hilbertCurve.get, values.map(_.toLong).toArray, 32)
          case LayoutOptimizationStrategy.ZORDER =>
            BinaryUtil.interleaving(values.map(BinaryUtil.intTo8Byte(_)).toArray, 8)
        }
        // Append the curve index as an extra (binary) column
        Row.fromSeq(row.toSeq ++ Seq(mapValues))
      }
    }.sortBy(x => ByteArraySorting(x.getAs[Array[Byte]](fieldNum)), numPartitions = targetPartitionsCount)
    val newDF = df.sparkSession.createDataFrame(indexRdd, StructType(
      df.schema.fields ++ Seq(
        StructField(s"index",
          BinaryType, false))
    ))
    // The index column only exists to drive the sort — drop it from the result
    newDF.drop("index")
  }
}
/**
* create z-order DataFrame by sample
* support all col types
@@ -324,212 +525,15 @@ object RangeSampleSort {
decisionBound.getBound(row, bound.asInstanceOf[Array[InternalRow]])
}
}
}.toArray.map(ZOrderingUtil.intTo8Byte(_))
val zValues = ZOrderingUtil.interleaving(interleaveValues, 8)
}.toArray.map(BinaryUtil.intTo8Byte(_))
val zValues = BinaryUtil.interleaving(interleaveValues, 8)
val mutablePair = new MutablePair[InternalRow, Array[Byte]]()
mutablePair.update(unsafeRow, zValues)
}
}.sortBy(x => ZorderingBinarySort(x._2), numPartitions = fileNum).map(_._1)
}.sortBy(x => ByteArraySorting(x._2), numPartitions = fileNum).map(_._1)
spark.internalCreateDataFrame(indexRdd, schema)
}
}
/**
 * Creates a layout-optimized [[DataFrame]] by sampling: the source data is first sampled
 * to estimate per-column value bounds for the ordering columns, each row's column values
 * are then replaced by their rank within those bounds, the ranks are folded into a single
 * binary space-filling-curve index (Z-order or Hilbert, per `sortMode`), and the rows
 * are range-sorted by that index into `fileNum` partitions.
 *
 * Supports all primitive column types; falls back to
 * [[sortDataFrameBySampleSupportAllTypes]] when a complex-typed ordering column is found.
 * This method needs more resources and costs more time than
 * `createOptimizedDataFrameByMapValue`.
 *
 * @param df       source data-frame
 * @param zCols    columns to optimize the layout by
 * @param fileNum  number of output partitions (~ number of output files)
 * @param sortMode curve to use: "z-order" or "hilbert"
 * @return data-frame sorted by the curve index (the auxiliary index column is dropped)
 * @throws IllegalArgumentException if `sortMode` is neither "z-order" nor "hilbert"
 */
def sortDataFrameBySample(df: DataFrame, zCols: Seq[String], fileNum: Int, sortMode: String): DataFrame = {
  val spark = df.sparkSession
  val columnsMap = df.schema.fields.map(item => (item.name, item)).toMap
  // Number of source columns; the binary curve-index column is appended at this ordinal
  val fieldNum = df.schema.fields.length
  // NOTE(review): Map.apply throws on an unknown column name, so this filter only ever
  // sees present columns; presumably columnsMap.get(col).isDefined was intended — confirm
  val checkCols = zCols.filter(col => columnsMap(col) != null)
  // Resolve the requested curve; anything other than "z-order"/"hilbert" is rejected
  val useHilbert = sortMode match {
    case "hilbert" => true
    case "z-order" => false
    case other => throw new IllegalArgumentException(s"only z-order/hilbert layout optimization is supported, but found: ${other}")
  }
  if (zCols.isEmpty || checkCols.isEmpty) {
    // Nothing to order by — return the input unchanged
    df
  } else {
    // Resolve each ordering column to (ordinal in schema, field); unsupported types -> (-1, null)
    val zFields = zCols.map { col =>
      val newCol = columnsMap(col)
      if (newCol == null) {
        (-1, null)
      } else {
        newCol.dataType match {
          case LongType | DoubleType | FloatType | StringType | IntegerType | DateType | TimestampType | ShortType | ByteType =>
            (df.schema.fields.indexOf(newCol), newCol)
          case d: DecimalType =>
            (df.schema.fields.indexOf(newCol), newCol)
          case _ =>
            (-1, null)
        }
      }
    }.filter(_._1 != -1)
    // Complex type found — fall back to the all-types sampling implementation
    if (zFields.length != zCols.length) {
      return sortDataFrameBySampleSupportAllTypes(df, zCols, fileNum)
    }
    val rawRdd = df.rdd
    // Project every row onto the ordering columns, normalizing each value to a Long
    // (floating point via IEEE-754 bits, temporal via epoch millis); nulls map to
    // Long.MaxValue (or "" for strings) so they rank last
    val sampleRdd = rawRdd.map { row =>
      val values = zFields.map { case (index, field) =>
        field.dataType match {
          case LongType =>
            if (row.isNullAt(index)) Long.MaxValue else row.getLong(index)
          case DoubleType =>
            if (row.isNullAt(index)) Long.MaxValue else java.lang.Double.doubleToLongBits(row.getDouble(index))
          case IntegerType =>
            if (row.isNullAt(index)) Long.MaxValue else row.getInt(index).toLong
          case FloatType =>
            if (row.isNullAt(index)) Long.MaxValue else java.lang.Double.doubleToLongBits(row.getFloat(index).toDouble)
          case StringType =>
            if (row.isNullAt(index)) "" else row.getString(index)
          case DateType =>
            if (row.isNullAt(index)) Long.MaxValue else row.getDate(index).getTime
          case TimestampType =>
            if (row.isNullAt(index)) Long.MaxValue else row.getTimestamp(index).getTime
          case ByteType =>
            if (row.isNullAt(index)) Long.MaxValue else row.getByte(index).toLong
          case ShortType =>
            if (row.isNullAt(index)) Long.MaxValue else row.getShort(index).toLong
          case d: DecimalType =>
            if (row.isNullAt(index)) Long.MaxValue else row.getDecimal(index).longValue()
          case _ =>
            null
        }
      }.filter(v => v != null).toArray
      (values, null)
    }
    // Max number of range bounds to build per column (sample size for curve building)
    val zOrderBounds = df.sparkSession.sessionState.conf.getConfString(
      HoodieClusteringConfig.LAYOUT_OPTIMIZE_BUILD_CURVE_SAMPLE_SIZE.key,
      HoodieClusteringConfig.LAYOUT_OPTIMIZE_BUILD_CURVE_SAMPLE_SIZE.defaultValue.toString).toInt
    val sample = new RangeSample(zOrderBounds, sampleRdd)
    val rangeBounds = sample.getRangeBounds()
    // Determine, per ordering column, the sorted range bounds from the weighted sample
    val sampleBounds = {
      val candidateColNumber = rangeBounds.head._1.length
      (0 to candidateColNumber - 1).map { i =>
        val colRangeBound = rangeBounds.map(x => (x._1(i), x._2))
        if (colRangeBound.head._1.isInstanceOf[String]) {
          sample.determineBound(colRangeBound.asInstanceOf[ArrayBuffer[(String, Float)]], math.min(zOrderBounds, rangeBounds.length), Ordering[String])
        } else {
          sample.determineBound(colRangeBound.asInstanceOf[ArrayBuffer[(Long, Float)]], math.min(zOrderBounds, rangeBounds.length), Ordering[Long])
        }
      }
    }
    // expand bounds so every column has (roughly) the same number of buckets.
    // maybe it's better to use the value of "spark.zorder.bounds.number" as maxLength,
    // however this will lead to extra time costs when all zorder cols' distinct count values are less than "spark.zorder.bounds.number"
    val maxLength = sampleBounds.map(_.length).max
    val expandSampleBoundsWithFactor = sampleBounds.map { bound =>
      val fillFactor = maxLength / bound.size
      val newBound = new Array[Double](bound.length * fillFactor)
      if (bound.isInstanceOf[Array[Long]] && fillFactor > 1) {
        val longBound = bound.asInstanceOf[Array[Long]]
        for (i <- 0 to bound.length - 1) {
          for (j <- 0 to fillFactor - 1) {
            // sample factor should not be too large, so it's ok to use 1 / fillFactor as slice
            newBound(j + i*(fillFactor)) = longBound(i) + (j + 1) * (1 / fillFactor.toDouble)
          }
        }
        (newBound, fillFactor)
      } else {
        // factor 0 marks a column whose bounds were NOT expanded
        (bound, 0)
      }
    }
    val boundBroadCast = spark.sparkContext.broadcast(expandSampleBoundsWithFactor)
    val indexRdd = rawRdd.mapPartitions { iter =>
      // Hilbert curve state is only materialized when the Hilbert mode is requested
      val hilbertCurve = if (useHilbert) Some(HilbertCurve.bits(32).dimensions(zFields.length)) else None
      val expandBoundsWithFactor = boundBroadCast.value
      val maxBoundNum = expandBoundsWithFactor.map(_._1.length).max
      val longDecisionBound = new RawDecisionBound(Ordering[Long])
      val doubleDecisionBound = new RawDecisionBound(Ordering[Double])
      val stringDecisionBound = new RawDecisionBound(Ordering[String])
      import java.util.concurrent.ThreadLocalRandom
      val threadLocalRandom = ThreadLocalRandom.current
      // Rank of `value` within the (possibly expanded) bounds of ordering column `rawIndex`;
      // nulls rank one past the end. For factor-expanded bounds a small random jitter
      // spreads ties across the synthetic sub-buckets.
      def getRank(rawIndex: Int, value: Long, isNull: Boolean): Int = {
        val (expandBound, factor) = expandBoundsWithFactor(rawIndex)
        if (isNull) {
          expandBound.length + 1
        } else {
          if (factor > 1) {
            doubleDecisionBound.getBound(value + (threadLocalRandom.nextInt(factor) + 1)*(1 / factor.toDouble), expandBound.asInstanceOf[Array[Double]])
          } else {
            longDecisionBound.getBound(value, expandBound.asInstanceOf[Array[Long]])
          }
        }
      }
      iter.map { row =>
        // Per-row rank of every ordering-column value (same Long normalization as in sampling)
        val values = zFields.zipWithIndex.map { case ((index, field), rawIndex) =>
          field.dataType match {
            case LongType =>
              val isNull = row.isNullAt(index)
              getRank(rawIndex, if (isNull) 0 else row.getLong(index), isNull)
            case DoubleType =>
              val isNull = row.isNullAt(index)
              getRank(rawIndex, if (isNull) 0 else java.lang.Double.doubleToLongBits(row.getDouble(index)), isNull)
            case IntegerType =>
              val isNull = row.isNullAt(index)
              getRank(rawIndex, if (isNull) 0 else row.getInt(index).toLong, isNull)
            case FloatType =>
              val isNull = row.isNullAt(index)
              getRank(rawIndex, if (isNull) 0 else java.lang.Double.doubleToLongBits(row.getFloat(index).toDouble), isNull)
            case StringType =>
              // String bounds are never factor-expanded, so the rank is scaled here instead
              val factor = maxBoundNum.toDouble / expandBoundsWithFactor(rawIndex)._1.length
              if (row.isNullAt(index)) {
                maxBoundNum + 1
              } else {
                val currentRank = stringDecisionBound.getBound(row.getString(index), expandBoundsWithFactor(rawIndex)._1.asInstanceOf[Array[String]])
                if (factor > 1) {
                  (currentRank*factor).toInt + threadLocalRandom.nextInt(factor.toInt)
                } else {
                  currentRank
                }
              }
            case DateType =>
              val isNull = row.isNullAt(index)
              getRank(rawIndex, if (isNull) 0 else row.getDate(index).getTime, isNull)
            case TimestampType =>
              val isNull = row.isNullAt(index)
              getRank(rawIndex, if (isNull) 0 else row.getTimestamp(index).getTime, isNull)
            case ByteType =>
              val isNull = row.isNullAt(index)
              getRank(rawIndex, if (isNull) 0 else row.getByte(index).toLong, isNull)
            case ShortType =>
              val isNull = row.isNullAt(index)
              getRank(rawIndex, if (isNull) 0 else row.getShort(index).toLong, isNull)
            case d: DecimalType =>
              val isNull = row.isNullAt(index)
              getRank(rawIndex, if (isNull) 0 else row.getDecimal(index).longValue(), isNull)
            case _ =>
              -1
          }
        }.filter(v => v != -1)
        // Fold the per-column ranks into a single binary space-filling-curve index
        val mapValues = if (hilbertCurve.isDefined) {
          HilbertCurveUtils.indexBytes(hilbertCurve.get, values.map(_.toLong).toArray, 32)
        } else {
          ZOrderingUtil.interleaving(values.map(ZOrderingUtil.intTo8Byte(_)).toArray, 8)
        }
        // Append the curve index as an extra (binary) column
        Row.fromSeq(row.toSeq ++ Seq(mapValues))
      }
    }.sortBy(x => ZorderingBinarySort(x.getAs[Array[Byte]](fieldNum)), numPartitions = fileNum)
    val newDF = df.sparkSession.createDataFrame(indexRdd, StructType(
      df.schema.fields ++ Seq(
        StructField(s"index",
          BinaryType, false))
    ))
    // The index column only exists to drive the sort — drop it from the result
    newDF.drop("index")
  }
}
}