1
0

[HUDI-2814] Addressing issues w/ Z-order Layout Optimization (#4060)

* `ZCurveOptimizeHelper` > `ZOrderingIndexHelper`;
Moved Z-index helper under `hudi.index.zorder` package

* Tidying up `ZOrderingIndexHelper`

* Fixing compilation

* Fixed index new/original table merging sequence to always prefer values from new index;
Cleaned up `HoodieSparkUtils`

* Added test for `mergeIndexSql`

* Abstracted Z-index name composition w/in `ZOrderingIndexHelper`;

* Fixed `DataSkippingUtils` to interrupt prunning in case data filter contains non-indexed column reference

* Properly handle exceptions origination during pruning in `HoodieFileIndex`

* Make sure no errors are logged upon encountering `AnalysisException`

* Cleaned up Z-index updating sequence;
Tidying up comments, java-docs;

* Fixed Z-index to properly handle changes of the list of clustered columns

* Tidying up

* `lint`

* Suppressing `JavaDocStyle` first sentence check

* Fixed compilation

* Fixing incorrect `DecimalType` conversion

* Refactored test `TestTableLayoutOptimization`
  - Added Z-index table composition test (against fixtures)
  - Separated out GC test;
Tidying up

* Fixed tests re-shuffling column order for Z-Index table `DataFrame` to align w/ the one by one loaded from JSON

* Scaffolded `DataTypeUtils` to do basic checks of Spark types;
Added proper compatibility checking b/w old/new index-tables

* Added test for Z-index tables merging

* Fixed import being shaded by creating internal `hudi.util` package

* Fixed packaging for `TestOptimizeTable`

* Revised `updateMetadataIndex` seq to provide Z-index updating process w/ source table schema

* Make sure existing Z-index table schema is sync'd to source table's one

* Fixed shaded refs

* Fixed tests

* Fixed type conversion of Parquet provided metadata values into Spark expected schemas

* Fixed `composeIndexSchema` utility to propose proper schema

* Added more tests for Z-index:
  - Checking that Z-index table is built correctly
  - Checking that Z-index tables are merged correctly (during update)

* Fixing source table

* Fixing tests to read from Parquet w/ proper schema

* Refactored `ParquetUtils` utility reading stats from Parquet footers

* Fixed incorrect handling of Decimals extracted from Parquet footers

* Worked around issues in javac failign to compile stream's collection

* Fixed handling of `Date` type

* Fixed handling of `DateType` to be parsed as `LocalDate`

* Updated fixture;
Make sure test loads Z-index fixture using proper schema

* Removed superfluous scheme adjusting when reading from Parquet, since Spark is actually able to perfectly restore schema (given Parquet was previously written by Spark as well)

* Fixing race-condition in Parquet's `DateStringifier` trying to share `SimpleDataFormat` object which is inherently not thread-safe

* Tidying up

* Make sure schema is used upon reading to validate input files are in the appropriate format;
Tidying up;

* Worked around javac (1.8) inability to infer expression type properly

* Updated fixtures;
Tidying up

* Fixing compilation after rebase

* Assert clustering have in Z-order layout optimization testing

* Tidying up exception messages

* XXX

* Added test validating Z-index lookup filter correctness

* Added more test-cases;
Tidying up

* Added tests for string expressions

* Fixed incorrect Z-index filter lookup translations

* Added more test-cases

* Added proper handling on complex negations of AND/OR expressions by pushing NOT operator down into inner expressions for appropriate handling

* Added `-target:jvm-1.8` for `hudi-spark` module

* Adding more tests

* Added tests for non-indexed columns

* Properly handle non-indexed columns by falling back to a re-write of containing expression as  `TrueLiteral` instead

* Fixed tests

* Removing the parquet test files and disabling corresponding tests

Co-authored-by: Vinoth Chandar <vinoth@apache.org>
This commit is contained in:
Alexey Kudinkin
2021-11-26 10:02:15 -08:00
committed by GitHub
parent 3d75aca40d
commit 5755ff25a4
28 changed files with 1955 additions and 932 deletions

View File

@@ -77,6 +77,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
@@ -245,14 +246,17 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
public abstract HoodieWriteMetadata<O> insertOverwriteTable(HoodieEngineContext context, String instantTime, I records);
/**
* update statistics info for current table.
* to do adaptation, once RFC-27 is finished.
* Updates Metadata Indexes (like Z-Index)
* TODO rebase onto metadata table (post RFC-27)
*
* @param context HoodieEngineContext
* @param instantTime Instant time for the replace action
* @param isOptimizeOperation whether current operation is OPTIMIZE type
* @param context instance of {@link HoodieEngineContext}
* @param instantTime instant of the carried operation triggering the update
*/
public abstract void updateStatistics(HoodieEngineContext context, List<HoodieWriteStat> stats, String instantTime, Boolean isOptimizeOperation);
public abstract void updateMetadataIndexes(
@Nonnull HoodieEngineContext context,
@Nonnull List<HoodieWriteStat> stats,
@Nonnull String instantTime
) throws Exception;
public HoodieWriteConfig getConfig() {
return config;

View File

@@ -66,6 +66,7 @@ import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
@@ -244,7 +245,7 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload>
}
@Override
public void updateStatistics(HoodieEngineContext context, List<HoodieWriteStat> stats, String instantTime, Boolean isOptimizeOperation) {
public void updateMetadataIndexes(@Nonnull HoodieEngineContext context, @Nonnull List<HoodieWriteStat> stats, @Nonnull String instantTime) {
throw new HoodieNotSupportedException("update statistics is not supported yet");
}

View File

@@ -71,6 +71,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import javax.annotation.Nonnull;
import java.util.List;
import java.util.Map;
@@ -170,7 +171,7 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload>
}
@Override
public void updateStatistics(HoodieEngineContext context, List<HoodieWriteStat> stats, String instantTime, Boolean isOptimizeOperation) {
public void updateMetadataIndexes(@Nonnull HoodieEngineContext context, @Nonnull List<HoodieWriteStat> stats, @Nonnull String instantTime) {
throw new HoodieNotSupportedException("update statistics is not supported yet");
}

View File

@@ -396,15 +396,16 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty());
finalizeWrite(table, clusteringCommitTime, writeStats);
writeTableMetadataForTableServices(table, metadata,clusteringInstant);
// try to save statistics info to hudi
if (config.isDataSkippingEnabled() && config.isLayoutOptimizationEnabled() && !config.getClusteringSortColumns().isEmpty()) {
table.updateStatistics(context, writeStats, clusteringCommitTime, true);
// Update outstanding metadata indexes
if (config.isLayoutOptimizationEnabled()
&& !config.getClusteringSortColumns().isEmpty()) {
table.updateMetadataIndexes(context, writeStats, clusteringCommitTime);
}
LOG.info("Committing Clustering " + clusteringCommitTime + ". Finished with result " + metadata);
table.getActiveTimeline().transitionReplaceInflightToComplete(
HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
} catch (IOException e) {
} catch (Exception e) {
throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, e);
} finally {
this.txnManager.endTransaction();

View File

@@ -33,7 +33,7 @@ import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.ZCurveOptimizeHelper;
import org.apache.hudi.index.zorder.ZOrderingIndexHelper;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -79,10 +79,10 @@ public class RDDSpatialCurveOptimizationSortPartitioner<T extends HoodieRecordPa
switch (config.getLayoutOptimizationCurveBuildMethod()) {
case DIRECT:
zDataFrame = ZCurveOptimizeHelper.createZIndexedDataFrameByMapValue(originDF, config.getClusteringSortColumns(), numOutputGroups);
zDataFrame = ZOrderingIndexHelper.createZIndexedDataFrameByMapValue(originDF, config.getClusteringSortColumns(), numOutputGroups);
break;
case SAMPLE:
zDataFrame = ZCurveOptimizeHelper.createZIndexedDataFrameBySample(originDF, config.getClusteringSortColumns(), numOutputGroups);
zDataFrame = ZOrderingIndexHelper.createZIndexedDataFrameBySample(originDF, config.getClusteringSortColumns(), numOutputGroups);
break;
default:
throw new HoodieException("Not a valid build curve method for doWriteOperation: ");

View File

@@ -0,0 +1,619 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.index.zorder;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.optimize.ZOrderingUtil;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.io.api.Binary;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.hudi.execution.RangeSampleSort$;
import org.apache.spark.sql.hudi.execution.ZorderingBinarySort;
import org.apache.spark.sql.types.BinaryType;
import org.apache.spark.sql.types.BinaryType$;
import org.apache.spark.sql.types.BooleanType;
import org.apache.spark.sql.types.ByteType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DateType;
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.DoubleType;
import org.apache.spark.sql.types.FloatType;
import org.apache.spark.sql.types.IntegerType;
import org.apache.spark.sql.types.LongType;
import org.apache.spark.sql.types.LongType$;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.ShortType;
import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.sql.types.TimestampType;
import org.apache.spark.util.SerializableConfiguration;
import scala.collection.JavaConversions;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static org.apache.hudi.util.DataTypeUtils.areCompatible;
public class ZOrderingIndexHelper {
private static final Logger LOG = LogManager.getLogger(ZOrderingIndexHelper.class);
private static final String SPARK_JOB_DESCRIPTION = "spark.job.description";
private static final String Z_INDEX_FILE_COLUMN_NAME = "file";
private static final String Z_INDEX_MIN_VALUE_STAT_NAME = "minValue";
private static final String Z_INDEX_MAX_VALUE_STAT_NAME = "maxValue";
private static final String Z_INDEX_NUM_NULLS_STAT_NAME = "num_nulls";
public static String getMinColumnNameFor(String colName) {
return composeZIndexColName(colName, Z_INDEX_MIN_VALUE_STAT_NAME);
}
public static String getMaxColumnNameFor(String colName) {
return composeZIndexColName(colName, Z_INDEX_MAX_VALUE_STAT_NAME);
}
public static String getNumNullsColumnNameFor(String colName) {
return composeZIndexColName(colName, Z_INDEX_NUM_NULLS_STAT_NAME);
}
/**
* Create z-order DataFrame directly
* first, map all base type data to byte[8], then create z-order DataFrame
* only support base type data. long,int,short,double,float,string,timestamp,decimal,date,byte
* this method is more effective than createZIndexDataFrameBySample
*
* @param df a spark DataFrame holds parquet files to be read.
* @param zCols z-sort cols
* @param fileNum spark partition num
* @return a dataFrame sorted by z-order.
*/
public static Dataset<Row> createZIndexedDataFrameByMapValue(Dataset<Row> df, List<String> zCols, int fileNum) {
Map<String, StructField> columnsMap = Arrays.stream(df.schema().fields()).collect(Collectors.toMap(e -> e.name(), e -> e));
int fieldNum = df.schema().fields().length;
List<String> checkCols = zCols.stream().filter(f -> columnsMap.containsKey(f)).collect(Collectors.toList());
if (zCols.size() != checkCols.size()) {
return df;
}
// only one col to sort, no need to use z-order
if (zCols.size() == 1) {
return df.repartitionByRange(fieldNum, org.apache.spark.sql.functions.col(zCols.get(0)));
}
Map<Integer, StructField> fieldMap = zCols
.stream().collect(Collectors.toMap(e -> Arrays.asList(df.schema().fields()).indexOf(columnsMap.get(e)), e -> columnsMap.get(e)));
// z-sort
JavaRDD<Row> sortedRdd = df.toJavaRDD().map(row -> {
List<byte[]> zBytesList = fieldMap.entrySet().stream().map(entry -> {
int index = entry.getKey();
StructField field = entry.getValue();
DataType dataType = field.dataType();
if (dataType instanceof LongType) {
return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index));
} else if (dataType instanceof DoubleType) {
return ZOrderingUtil.doubleTo8Byte(row.isNullAt(index) ? Double.MAX_VALUE : row.getDouble(index));
} else if (dataType instanceof IntegerType) {
return ZOrderingUtil.intTo8Byte(row.isNullAt(index) ? Integer.MAX_VALUE : row.getInt(index));
} else if (dataType instanceof FloatType) {
return ZOrderingUtil.doubleTo8Byte(row.isNullAt(index) ? Float.MAX_VALUE : row.getFloat(index));
} else if (dataType instanceof StringType) {
return ZOrderingUtil.utf8To8Byte(row.isNullAt(index) ? "" : row.getString(index));
} else if (dataType instanceof DateType) {
return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime());
} else if (dataType instanceof TimestampType) {
return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime());
} else if (dataType instanceof ByteType) {
return ZOrderingUtil.byteTo8Byte(row.isNullAt(index) ? Byte.MAX_VALUE : row.getByte(index));
} else if (dataType instanceof ShortType) {
return ZOrderingUtil.intTo8Byte(row.isNullAt(index) ? Short.MAX_VALUE : row.getShort(index));
} else if (dataType instanceof DecimalType) {
return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue());
} else if (dataType instanceof BooleanType) {
boolean value = row.isNullAt(index) ? false : row.getBoolean(index);
return ZOrderingUtil.intTo8Byte(value ? 1 : 0);
} else if (dataType instanceof BinaryType) {
return ZOrderingUtil.paddingTo8Byte(row.isNullAt(index) ? new byte[] {0} : (byte[]) row.get(index));
}
return null;
}).filter(f -> f != null).collect(Collectors.toList());
byte[][] zBytes = new byte[zBytesList.size()][];
for (int i = 0; i < zBytesList.size(); i++) {
zBytes[i] = zBytesList.get(i);
}
List<Object> zVaules = new ArrayList<>();
zVaules.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava());
zVaules.add(ZOrderingUtil.interleaving(zBytes, 8));
return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(zVaules));
}).sortBy(f -> new ZorderingBinarySort((byte[]) f.get(fieldNum)), true, fileNum);
// create new StructType
List<StructField> newFields = new ArrayList<>();
newFields.addAll(Arrays.asList(df.schema().fields()));
newFields.add(new StructField("zIndex", BinaryType$.MODULE$, true, Metadata.empty()));
// create new DataFrame
return df.sparkSession().createDataFrame(sortedRdd, StructType$.MODULE$.apply(newFields)).drop("zIndex");
}
public static Dataset<Row> createZIndexedDataFrameByMapValue(Dataset<Row> df, String zCols, int fileNum) {
if (zCols == null || zCols.isEmpty() || fileNum <= 0) {
return df;
}
return createZIndexedDataFrameByMapValue(df,
Arrays.stream(zCols.split(",")).map(f -> f.trim()).collect(Collectors.toList()), fileNum);
}
public static Dataset<Row> createZIndexedDataFrameBySample(Dataset<Row> df, List<String> zCols, int fileNum) {
return RangeSampleSort$.MODULE$.sortDataFrameBySample(df, JavaConversions.asScalaBuffer(zCols), fileNum);
}
public static Dataset<Row> createZIndexedDataFrameBySample(Dataset<Row> df, String zCols, int fileNum) {
if (zCols == null || zCols.isEmpty() || fileNum <= 0) {
return df;
}
return createZIndexedDataFrameBySample(df, Arrays.stream(zCols.split(",")).map(f -> f.trim()).collect(Collectors.toList()), fileNum);
}
/**
* Parse min/max statistics from Parquet footers for provided columns and composes Z-index
* table in the following format with 3 statistics denominated for each Z-ordered column.
* For ex, if original table contained Z-ordered column {@code A}:
*
* <pre>
* +---------------------------+------------+------------+-------------+
* | file | A_minValue | A_maxValue | A_num_nulls |
* +---------------------------+------------+------------+-------------+
* | one_base_file.parquet | 1 | 10 | 0 |
* | another_base_file.parquet | -10 | 0 | 5 |
* +---------------------------+------------+------------+-------------+
* </pre>
*
* NOTE: Currently {@link TimestampType} is not supported, since Parquet writer
* does not support statistics for it.
*
* TODO leverage metadata table after RFC-27 lands
* @VisibleForTesting
*
* @param sparkSession encompassing Spark session
* @param baseFilesPaths list of base-files paths to be sourced for Z-index
* @param zorderedColumnSchemas target Z-ordered columns
* @return Spark's {@link Dataset} holding an index table
*/
@Nonnull
public static Dataset<Row> buildZIndexTableFor(
@Nonnull SparkSession sparkSession,
@Nonnull List<String> baseFilesPaths,
@Nonnull List<StructField> zorderedColumnSchemas
) {
SparkContext sc = sparkSession.sparkContext();
JavaSparkContext jsc = new JavaSparkContext(sc);
SerializableConfiguration serializableConfiguration = new SerializableConfiguration(sc.hadoopConfiguration());
int numParallelism = (baseFilesPaths.size() / 3 + 1);
List<HoodieColumnRangeMetadata<Comparable>> colMinMaxInfos;
String previousJobDescription = sc.getLocalProperty(SPARK_JOB_DESCRIPTION);
try {
jsc.setJobDescription("Listing parquet column statistics");
colMinMaxInfos =
jsc.parallelize(baseFilesPaths, numParallelism)
.mapPartitions(paths -> {
ParquetUtils utils = (ParquetUtils) BaseFileUtils.getInstance(HoodieFileFormat.PARQUET);
Iterable<String> iterable = () -> paths;
return StreamSupport.stream(iterable.spliterator(), false)
.flatMap(path ->
utils.readRangeFromParquetMetadata(
serializableConfiguration.value(),
new Path(path),
zorderedColumnSchemas.stream()
.map(StructField::name)
.collect(Collectors.toList())
)
.stream()
)
.iterator();
})
.collect();
} finally {
jsc.setJobDescription(previousJobDescription);
}
// Group column's metadata by file-paths of the files it belongs to
Map<String, List<HoodieColumnRangeMetadata<Comparable>>> filePathToColumnMetadataMap =
colMinMaxInfos.stream()
.collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getFilePath));
JavaRDD<Row> allMetaDataRDD =
jsc.parallelize(new ArrayList<>(filePathToColumnMetadataMap.values()), 1)
.map(fileColumnsMetadata -> {
int colSize = fileColumnsMetadata.size();
if (colSize == 0) {
return null;
}
String filePath = fileColumnsMetadata.get(0).getFilePath();
List<Object> indexRow = new ArrayList<>();
// First columns of the Z-index's row is target file-path
indexRow.add(filePath);
// For each column
zorderedColumnSchemas.forEach(colSchema -> {
String colName = colSchema.name();
HoodieColumnRangeMetadata<Comparable> colMetadata =
fileColumnsMetadata.stream()
.filter(s -> s.getColumnName().trim().equalsIgnoreCase(colName))
.findFirst()
.orElse(null);
DataType colType = colSchema.dataType();
if (colMetadata == null || colType == null) {
throw new HoodieException(String.format("Cannot collect min/max statistics for column (%s)", colSchema));
}
Pair<Object, Object> minMaxValue = fetchMinMaxValues(colType, colMetadata);
indexRow.add(minMaxValue.getLeft()); // min
indexRow.add(minMaxValue.getRight()); // max
indexRow.add(colMetadata.getNumNulls());
});
return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(indexRow));
})
.filter(Objects::nonNull);
StructType indexSchema = composeIndexSchema(zorderedColumnSchemas);
return sparkSession.createDataFrame(allMetaDataRDD, indexSchema);
}
/**
* <p/>
* Updates state of the Z-index by:
* <ol>
* <li>Updating Z-index with statistics for {@code sourceBaseFiles}, collecting corresponding
* column statistics from Parquet footers</li>
* <li>Merging newly built Z-index table with the most recent one (if present and not preempted)</li>
* <li>Cleans up any residual index tables, that weren't cleaned up before</li>
* </ol>
*
* @param sparkSession encompassing Spark session
* @param sourceTableSchema instance of {@link StructType} bearing source table's writer's schema
* @param sourceBaseFiles list of base-files to be indexed
* @param zorderedCols target Z-ordered columns
* @param zindexFolderPath Z-index folder path
* @param commitTime current operation commit instant
* @param completedCommits all previously completed commit instants
*/
public static void updateZIndexFor(
@Nonnull SparkSession sparkSession,
@Nonnull StructType sourceTableSchema,
@Nonnull List<String> sourceBaseFiles,
@Nonnull List<String> zorderedCols,
@Nonnull String zindexFolderPath,
@Nonnull String commitTime,
@Nonnull List<String> completedCommits
) {
FileSystem fs = FSUtils.getFs(zindexFolderPath, sparkSession.sparkContext().hadoopConfiguration());
// Compose new Z-index table for the given source base files
Dataset<Row> newZIndexDf =
buildZIndexTableFor(
sparkSession,
sourceBaseFiles,
zorderedCols.stream()
.map(col -> sourceTableSchema.fields()[sourceTableSchema.fieldIndex(col)])
.collect(Collectors.toList())
);
try {
//
// Z-Index has the following folder structure:
//
// .hoodie/
// ├── .zindex/
// │ ├── <instant>/
// │ │ ├── <part-...>.parquet
// │ │ └── ...
//
// If index is currently empty (no persisted tables), we simply create one
// using clustering operation's commit instance as it's name
Path newIndexTablePath = new Path(zindexFolderPath, commitTime);
if (!fs.exists(new Path(zindexFolderPath))) {
newZIndexDf.repartition(1)
.write()
.format("parquet")
.mode("overwrite")
.save(newIndexTablePath.toString());
return;
}
// Filter in all index tables (w/in {@code .zindex} folder)
List<String> allIndexTables =
Arrays.stream(
fs.listStatus(new Path(zindexFolderPath))
)
.filter(FileStatus::isDirectory)
.map(f -> f.getPath().getName())
.collect(Collectors.toList());
// Compile list of valid index tables that were produced as part
// of previously successfully committed iterations
List<String> validIndexTables =
allIndexTables.stream()
.filter(completedCommits::contains)
.sorted()
.collect(Collectors.toList());
List<String> tablesToCleanup =
allIndexTables.stream()
.filter(f -> !completedCommits.contains(f))
.collect(Collectors.toList());
Dataset<Row> finalZIndexDf;
// Before writing out new version of the Z-index table we need to merge it
// with the most recent one that were successfully persisted previously
if (validIndexTables.isEmpty()) {
finalZIndexDf = newZIndexDf;
} else {
// NOTE: That Parquet schema might deviate from the original table schema (for ex,
// by upcasting "short" to "integer" types, etc), and hence we need to re-adjust it
// prior to merging, since merging might fail otherwise due to schemas incompatibility
finalZIndexDf =
tryMergeMostRecentIndexTableInto(
sparkSession,
newZIndexDf,
// Load current most recent Z-index table
sparkSession.read().load(
new Path(zindexFolderPath, validIndexTables.get(validIndexTables.size() - 1)).toString()
)
);
// Clean up all index tables (after creation of the new index)
tablesToCleanup.addAll(validIndexTables);
}
// Persist new Z-index table
finalZIndexDf
.repartition(1)
.write()
.format("parquet")
.save(newIndexTablePath.toString());
// Clean up residual Z-index tables that have might have been dangling since
// previous iterations (due to intermittent failures during previous clean up)
tablesToCleanup.forEach(f -> {
try {
fs.delete(new Path(zindexFolderPath, f), true);
} catch (IOException ie) {
// NOTE: Exception is deliberately swallowed to not affect overall clustering operation,
// since failing Z-index table will be attempted to be cleaned up upon subsequent
// clustering iteration
LOG.warn(String.format("Failed to cleanup residual Z-index table: %s", f), ie);
}
});
} catch (IOException e) {
LOG.error("Failed to build new Z-index table", e);
throw new HoodieException("Failed to build new Z-index table", e);
}
}
@Nonnull
private static Dataset<Row> tryMergeMostRecentIndexTableInto(
@Nonnull SparkSession sparkSession,
@Nonnull Dataset<Row> newIndexTableDf,
@Nonnull Dataset<Row> existingIndexTableDf
) {
// NOTE: If new Z-index table schema is incompatible with that one of existing table
// that is most likely due to changing settings of list of Z-ordered columns, that
// occurred since last index table have been persisted.
//
// In that case, we simply drop existing index table and just persist the new one;
//
// Also note that we're checking compatibility of _old_ index-table with new one and that
// COMPATIBILITY OPERATION DOES NOT COMMUTE (ie if A is compatible w/ B,
// B might not necessarily be compatible w/ A)
if (!areCompatible(existingIndexTableDf.schema(), newIndexTableDf.schema())) {
return newIndexTableDf;
}
String randomSuffix = UUID.randomUUID().toString().replace("-", "");
String existingIndexTempTableName = "existingIndexTable_" + randomSuffix;
String newIndexTempTableName = "newIndexTable_" + randomSuffix;
existingIndexTableDf.registerTempTable(existingIndexTempTableName);
newIndexTableDf.registerTempTable(newIndexTempTableName);
List<String> newTableColumns = Arrays.asList(newIndexTableDf.schema().fieldNames());
// Create merged table by doing full-out join
return sparkSession.sql(createIndexMergeSql(existingIndexTempTableName, newIndexTempTableName, newTableColumns));
}
/**
* @VisibleForTesting
*/
@Nonnull
public static StructType composeIndexSchema(@Nonnull List<StructField> zorderedColumnsSchemas) {
List<StructField> schema = new ArrayList<>();
schema.add(new StructField(Z_INDEX_FILE_COLUMN_NAME, StringType$.MODULE$, true, Metadata.empty()));
zorderedColumnsSchemas.forEach(colSchema -> {
schema.add(composeColumnStatStructType(colSchema.name(), Z_INDEX_MIN_VALUE_STAT_NAME, colSchema.dataType()));
schema.add(composeColumnStatStructType(colSchema.name(), Z_INDEX_MAX_VALUE_STAT_NAME, colSchema.dataType()));
schema.add(composeColumnStatStructType(colSchema.name(), Z_INDEX_NUM_NULLS_STAT_NAME, LongType$.MODULE$));
});
return StructType$.MODULE$.apply(schema);
}
private static StructField composeColumnStatStructType(String col, String statName, DataType dataType) {
return new StructField(composeZIndexColName(col, statName), dataType, true, Metadata.empty());
}
@Nullable
private static String mapToSourceTableColumnName(StructField fieldStruct) {
String name = fieldStruct.name();
int maxStatSuffixIdx = name.lastIndexOf(String.format("_%s", Z_INDEX_MAX_VALUE_STAT_NAME));
if (maxStatSuffixIdx != -1) {
return name.substring(0, maxStatSuffixIdx);
}
int minStatSuffixIdx = name.lastIndexOf(String.format("_%s", Z_INDEX_MIN_VALUE_STAT_NAME));
if (minStatSuffixIdx != -1) {
return name.substring(0, minStatSuffixIdx);
}
int numNullsSuffixIdx = name.lastIndexOf(String.format("_%s", Z_INDEX_NUM_NULLS_STAT_NAME));
if (numNullsSuffixIdx != -1) {
return name.substring(0, numNullsSuffixIdx);
}
return null;
}
private static String composeZIndexColName(String col, String statName) {
// TODO add escaping for
return String.format("%s_%s", col, statName);
}
private static Pair<Object, Object>
fetchMinMaxValues(
@Nonnull DataType colType,
@Nonnull HoodieColumnRangeMetadata<Comparable> colMetadata) {
if (colType instanceof IntegerType) {
return Pair.of(
new Integer(colMetadata.getMinValue().toString()),
new Integer(colMetadata.getMaxValue().toString())
);
} else if (colType instanceof DoubleType) {
return Pair.of(
new Double(colMetadata.getMinValue().toString()),
new Double(colMetadata.getMaxValue().toString())
);
} else if (colType instanceof StringType) {
return Pair.of(
new String(((Binary) colMetadata.getMinValue()).getBytes()),
new String(((Binary) colMetadata.getMaxValue()).getBytes())
);
} else if (colType instanceof DecimalType) {
return Pair.of(
new BigDecimal(colMetadata.getMinValue().toString()),
new BigDecimal(colMetadata.getMaxValue().toString()));
} else if (colType instanceof DateType) {
return Pair.of(
java.sql.Date.valueOf(colMetadata.getMinValue().toString()),
java.sql.Date.valueOf(colMetadata.getMaxValue().toString()));
} else if (colType instanceof LongType) {
return Pair.of(
new Long(colMetadata.getMinValue().toString()),
new Long(colMetadata.getMaxValue().toString()));
} else if (colType instanceof ShortType) {
return Pair.of(
new Short(colMetadata.getMinValue().toString()),
new Short(colMetadata.getMaxValue().toString()));
} else if (colType instanceof FloatType) {
return Pair.of(
new Float(colMetadata.getMinValue().toString()),
new Float(colMetadata.getMaxValue().toString()));
} else if (colType instanceof BinaryType) {
return Pair.of(
((Binary) colMetadata.getMinValue()).getBytes(),
((Binary) colMetadata.getMaxValue()).getBytes());
} else if (colType instanceof BooleanType) {
return Pair.of(
Boolean.valueOf(colMetadata.getMinValue().toString()),
Boolean.valueOf(colMetadata.getMaxValue().toString()));
} else if (colType instanceof ByteType) {
return Pair.of(
Byte.valueOf(colMetadata.getMinValue().toString()),
Byte.valueOf(colMetadata.getMaxValue().toString()));
} else {
throw new HoodieException(String.format("Not support type: %s", colType));
}
}
/**
* @VisibleForTesting
*/
@Nonnull
static String createIndexMergeSql(
@Nonnull String originalIndexTable,
@Nonnull String newIndexTable,
@Nonnull List<String> columns
) {
StringBuilder selectBody = new StringBuilder();
for (int i = 0; i < columns.size(); ++i) {
String col = columns.get(i);
String originalTableColumn = String.format("%s.%s", originalIndexTable, col);
String newTableColumn = String.format("%s.%s", newIndexTable, col);
selectBody.append(
// NOTE: We prefer values from the new index table, and fallback to the original one only
// in case it does not contain statistics for the given file path
String.format("if (%s is null, %s, %s) AS %s", newTableColumn, originalTableColumn, newTableColumn, col)
);
if (i < columns.size() - 1) {
selectBody.append(", ");
}
}
return String.format(
"SELECT %s FROM %s FULL JOIN %s ON %s = %s",
selectBody,
originalIndexTable,
newIndexTable,
String.format("%s.%s", originalIndexTable, columns.get(0)),
String.format("%s.%s", newIndexTable, columns.get(0))
);
}
}

View File

@@ -18,7 +18,10 @@
package org.apache.hudi.table;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
@@ -37,9 +40,11 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieNotSupportedException;
@@ -73,11 +78,12 @@ import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.ZCurveOptimizeHelper;
import org.apache.hudi.index.zorder.ZOrderingIndexHelper;
import org.apache.spark.api.java.JavaRDD;
import scala.collection.JavaConversions;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -163,29 +169,61 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload>
}
@Override
public void updateStatistics(HoodieEngineContext context, List<HoodieWriteStat> stats, String instantTime, Boolean isOptimizeOperation) {
// deal with z-order/hilbert statistic info
if (isOptimizeOperation) {
updateOptimizeOperationStatistics(context, stats, instantTime);
}
public void updateMetadataIndexes(@Nonnull HoodieEngineContext context, @Nonnull List<HoodieWriteStat> stats, @Nonnull String instantTime) throws Exception {
// Updates Z-ordering Index
updateZIndex(context, stats, instantTime);
}
private void updateOptimizeOperationStatistics(HoodieEngineContext context, List<HoodieWriteStat> stats, String instantTime) {
String cols = config.getClusteringSortColumns();
private void updateZIndex(
@Nonnull HoodieEngineContext context,
@Nonnull List<HoodieWriteStat> updatedFilesStats,
@Nonnull String instantTime
) throws Exception {
String sortColsList = config.getClusteringSortColumns();
String basePath = metaClient.getBasePath();
String indexPath = metaClient.getZindexPath();
List<String> validateCommits = metaClient.getCommitsTimeline()
.filterCompletedInstants().getInstants().map(f -> f.getTimestamp()).collect(Collectors.toList());
List<String> touchFiles = stats.stream().map(s -> new Path(basePath, s.getPath()).toString()).collect(Collectors.toList());
if (touchFiles.isEmpty() || cols.isEmpty() || indexPath.isEmpty()) {
LOG.warn("save nothing to index table");
List<String> completedCommits =
metaClient.getCommitsTimeline()
.filterCompletedInstants()
.getInstants()
.map(HoodieInstant::getTimestamp)
.collect(Collectors.toList());
List<String> touchedFiles =
updatedFilesStats.stream()
.map(s -> new Path(basePath, s.getPath()).toString())
.collect(Collectors.toList());
if (touchedFiles.isEmpty() || StringUtils.isNullOrEmpty(sortColsList) || StringUtils.isNullOrEmpty(indexPath)) {
return;
}
LOG.info(String.format("Updating Z-index table (%s)", indexPath));
List<String> sortCols = Arrays.stream(sortColsList.split(","))
.map(String::trim)
.collect(Collectors.toList());
HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext)context;
ZCurveOptimizeHelper.saveStatisticsInfo(sparkEngineContext
.getSqlContext().sparkSession().read().load(JavaConversions.asScalaBuffer(touchFiles)),
cols, indexPath, instantTime, validateCommits);
LOG.info(String.format("save statistic info sucessfully at commitTime: %s", instantTime));
// Fetch table schema to appropriately construct Z-index schema
Schema tableWriteSchema =
HoodieAvroUtils.createHoodieWriteSchema(
new TableSchemaResolver(metaClient).getTableAvroSchemaWithoutMetadataFields()
);
ZOrderingIndexHelper.updateZIndexFor(
sparkEngineContext.getSqlContext().sparkSession(),
AvroConversionUtils.convertAvroSchemaToStructType(tableWriteSchema),
touchedFiles,
sortCols,
indexPath,
instantTime,
completedCommits
);
LOG.info(String.format("Successfully updated Z-index at instant (%s)", instantTime));
}
@Override

View File

@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.util;
import org.apache.spark.sql.types.ByteType$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DoubleType$;
import org.apache.spark.sql.types.FloatType$;
import org.apache.spark.sql.types.IntegerType$;
import org.apache.spark.sql.types.LongType$;
import org.apache.spark.sql.types.ShortType$;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.VarcharType$;
import javax.annotation.Nonnull;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
public class DataTypeUtils {
private static Map<Class<?>, Set<Class<?>>> sparkPrimitiveTypesCompatibilityMap =
new HashMap<Class<?>, Set<Class<?>>>() {{
// Integral types
put(ShortType$.class,
newHashSet(ByteType$.class, ShortType$.class));
put(IntegerType$.class,
newHashSet(ByteType$.class, ShortType$.class, IntegerType$.class));
put(LongType$.class,
newHashSet(ByteType$.class, ShortType$.class, IntegerType$.class, LongType$.class));
// Float types
put(DoubleType$.class,
newHashSet(FloatType$.class, DoubleType$.class));
// String types
put(StringType$.class,
newHashSet(VarcharType$.class, StringType$.class));
}};
/**
* Validates whether one {@link StructType} is compatible w/ the other one.
* Compatibility rules are defined like following: types A and B are considered
* compatible iff
*
* <ol>
* <li>A and B are identical</li>
* <li>All values comprising A domain are contained w/in B domain (for ex, {@code ShortType}
* in this sense is compatible w/ {@code IntegerType})</li>
* </ol>
*
* @param left operand
* @param right operand
* @return true if {@code left} instance of {@link StructType} is compatible w/ the {@code right}
*/
public static boolean areCompatible(@Nonnull DataType left, @Nonnull DataType right) {
// First, check if types are equal
if (Objects.equals(left, right)) {
return true;
}
// If not, check whether both are instances of {@code StructType} that
// should be matched structurally
if (left instanceof StructType && right instanceof StructType) {
return areCompatible((StructType) left, (StructType) right);
}
// If not, simply check if those data-types constitute compatibility
// relationship outlined above; otherwise return false
return sparkPrimitiveTypesCompatibilityMap.getOrDefault(left.getClass(), Collections.emptySet())
.contains(right.getClass());
}
private static boolean areCompatible(@Nonnull StructType left, @Nonnull StructType right) {
StructField[] oneSchemaFields = left.fields();
StructField[] anotherSchemaFields = right.fields();
if (oneSchemaFields.length != anotherSchemaFields.length) {
return false;
}
for (int i = 0; i < oneSchemaFields.length; ++i) {
StructField oneField = oneSchemaFields[i];
StructField anotherField = anotherSchemaFields[i];
// NOTE: Metadata is deliberately omitted from comparison
if (!Objects.equals(oneField.name(), anotherField.name())
|| !areCompatible(oneField.dataType(), anotherField.dataType())
|| oneField.nullable() != anotherField.nullable()) {
return false;
}
}
return true;
}
private static <T> HashSet<T> newHashSet(T... ts) {
return new HashSet<>(Arrays.asList(ts));
}
}

View File

@@ -1,356 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark;
import scala.collection.JavaConversions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.HoodieSparkUtils$;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.optimize.ZOrderingUtil;
import org.apache.parquet.io.api.Binary;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.hudi.execution.RangeSampleSort$;
import org.apache.spark.sql.hudi.execution.ZorderingBinarySort;
import org.apache.spark.sql.types.BinaryType;
import org.apache.spark.sql.types.BinaryType$;
import org.apache.spark.sql.types.BooleanType;
import org.apache.spark.sql.types.ByteType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DateType;
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.DoubleType;
import org.apache.spark.sql.types.FloatType;
import org.apache.spark.sql.types.IntegerType;
import org.apache.spark.sql.types.LongType;
import org.apache.spark.sql.types.LongType$;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.ShortType;
import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.sql.types.TimestampType;
import org.apache.spark.util.SerializableConfiguration;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class ZCurveOptimizeHelper {
private static final String SPARK_JOB_DESCRIPTION = "spark.job.description";
/**
* Create z-order DataFrame directly
* first, map all base type data to byte[8], then create z-order DataFrame
* only support base type data. long,int,short,double,float,string,timestamp,decimal,date,byte
* this method is more effective than createZIndexDataFrameBySample
*
* @param df a spark DataFrame holds parquet files to be read.
* @param zCols z-sort cols
* @param fileNum spark partition num
* @return a dataFrame sorted by z-order.
*/
public static Dataset<Row> createZIndexedDataFrameByMapValue(Dataset<Row> df, List<String> zCols, int fileNum) {
Map<String, StructField> columnsMap = Arrays.stream(df.schema().fields()).collect(Collectors.toMap(e -> e.name(), e -> e));
int fieldNum = df.schema().fields().length;
List<String> checkCols = zCols.stream().filter(f -> columnsMap.containsKey(f)).collect(Collectors.toList());
if (zCols.size() != checkCols.size()) {
return df;
}
// only one col to sort, no need to use z-order
if (zCols.size() == 1) {
return df.repartitionByRange(fieldNum, org.apache.spark.sql.functions.col(zCols.get(0)));
}
Map<Integer, StructField> fieldMap = zCols
.stream().collect(Collectors.toMap(e -> Arrays.asList(df.schema().fields()).indexOf(columnsMap.get(e)), e -> columnsMap.get(e)));
// z-sort
JavaRDD<Row> sortedRdd = df.toJavaRDD().map(row -> {
List<byte[]> zBytesList = fieldMap.entrySet().stream().map(entry -> {
int index = entry.getKey();
StructField field = entry.getValue();
DataType dataType = field.dataType();
if (dataType instanceof LongType) {
return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index));
} else if (dataType instanceof DoubleType) {
return ZOrderingUtil.doubleTo8Byte(row.isNullAt(index) ? Double.MAX_VALUE : row.getDouble(index));
} else if (dataType instanceof IntegerType) {
return ZOrderingUtil.intTo8Byte(row.isNullAt(index) ? Integer.MAX_VALUE : row.getInt(index));
} else if (dataType instanceof FloatType) {
return ZOrderingUtil.doubleTo8Byte(row.isNullAt(index) ? Float.MAX_VALUE : row.getFloat(index));
} else if (dataType instanceof StringType) {
return ZOrderingUtil.utf8To8Byte(row.isNullAt(index) ? "" : row.getString(index));
} else if (dataType instanceof DateType) {
return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime());
} else if (dataType instanceof TimestampType) {
return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime());
} else if (dataType instanceof ByteType) {
return ZOrderingUtil.byteTo8Byte(row.isNullAt(index) ? Byte.MAX_VALUE : row.getByte(index));
} else if (dataType instanceof ShortType) {
return ZOrderingUtil.intTo8Byte(row.isNullAt(index) ? Short.MAX_VALUE : row.getShort(index));
} else if (dataType instanceof DecimalType) {
return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue());
} else if (dataType instanceof BooleanType) {
boolean value = row.isNullAt(index) ? false : row.getBoolean(index);
return ZOrderingUtil.intTo8Byte(value ? 1 : 0);
} else if (dataType instanceof BinaryType) {
return ZOrderingUtil.paddingTo8Byte(row.isNullAt(index) ? new byte[] {0} : (byte[]) row.get(index));
}
return null;
}).filter(f -> f != null).collect(Collectors.toList());
byte[][] zBytes = new byte[zBytesList.size()][];
for (int i = 0; i < zBytesList.size(); i++) {
zBytes[i] = zBytesList.get(i);
}
List<Object> zVaules = new ArrayList<>();
zVaules.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava());
zVaules.add(ZOrderingUtil.interleaving(zBytes, 8));
return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(zVaules));
}).sortBy(f -> new ZorderingBinarySort((byte[]) f.get(fieldNum)), true, fileNum);
// create new StructType
List<StructField> newFields = new ArrayList<>();
newFields.addAll(Arrays.asList(df.schema().fields()));
newFields.add(new StructField("zIndex", BinaryType$.MODULE$, true, Metadata.empty()));
// create new DataFrame
return df.sparkSession().createDataFrame(sortedRdd, StructType$.MODULE$.apply(newFields)).drop("zIndex");
}
public static Dataset<Row> createZIndexedDataFrameByMapValue(Dataset<Row> df, String zCols, int fileNum) {
if (zCols == null || zCols.isEmpty() || fileNum <= 0) {
return df;
}
return createZIndexedDataFrameByMapValue(df,
Arrays.stream(zCols.split(",")).map(f -> f.trim()).collect(Collectors.toList()), fileNum);
}
public static Dataset<Row> createZIndexedDataFrameBySample(Dataset<Row> df, List<String> zCols, int fileNum) {
return RangeSampleSort$.MODULE$.sortDataFrameBySample(df, JavaConversions.asScalaBuffer(zCols), fileNum);
}
public static Dataset<Row> createZIndexedDataFrameBySample(Dataset<Row> df, String zCols, int fileNum) {
if (zCols == null || zCols.isEmpty() || fileNum <= 0) {
return df;
}
return createZIndexedDataFrameBySample(df, Arrays.stream(zCols.split(",")).map(f -> f.trim()).collect(Collectors.toList()), fileNum);
}
/**
* Parse min/max statistics stored in parquet footers for z-sort cols.
* no support collect statistics from timeStampType, since parquet file has not collect the statistics for timeStampType.
* to do adapt for rfc-27
*
* @param df a spark DataFrame holds parquet files to be read.
* @param cols z-sort cols
* @return a dataFrame holds all statistics info.
*/
public static Dataset<Row> getMinMaxValue(Dataset<Row> df, List<String> cols) {
Map<String, DataType> columnsMap = Arrays.stream(df.schema().fields()).collect(Collectors.toMap(e -> e.name(), e -> e.dataType()));
List<String> scanFiles = Arrays.asList(df.inputFiles());
SparkContext sc = df.sparkSession().sparkContext();
JavaSparkContext jsc = new JavaSparkContext(sc);
SerializableConfiguration serializableConfiguration = new SerializableConfiguration(sc.hadoopConfiguration());
int numParallelism = (scanFiles.size() / 3 + 1);
List<HoodieColumnRangeMetadata<Comparable>> colMinMaxInfos;
String previousJobDescription = sc.getLocalProperty(SPARK_JOB_DESCRIPTION);
try {
jsc.setJobDescription("Listing parquet column statistics");
colMinMaxInfos = jsc.parallelize(scanFiles, numParallelism).mapPartitions(paths -> {
Configuration conf = serializableConfiguration.value();
ParquetUtils parquetUtils = (ParquetUtils) BaseFileUtils.getInstance(HoodieFileFormat.PARQUET);
List<Collection<HoodieColumnRangeMetadata<Comparable>>> results = new ArrayList<>();
while (paths.hasNext()) {
String path = paths.next();
results.add(parquetUtils.readRangeFromParquetMetadata(conf, new Path(path), cols));
}
return results.stream().flatMap(f -> f.stream()).iterator();
}).collect();
} finally {
jsc.setJobDescription(previousJobDescription);
}
Map<String, List<HoodieColumnRangeMetadata<Comparable>>> fileToStatsListMap = colMinMaxInfos.stream().collect(Collectors.groupingBy(e -> e.getFilePath()));
JavaRDD<Row> allMetaDataRDD = jsc.parallelize(new ArrayList<>(fileToStatsListMap.values()), 1).map(f -> {
int colSize = f.size();
if (colSize == 0) {
return null;
} else {
List<Object> rows = new ArrayList<>();
rows.add(f.get(0).getFilePath());
cols.stream().forEach(col -> {
HoodieColumnRangeMetadata<Comparable> currentColRangeMetaData =
f.stream().filter(s -> s.getColumnName().trim().equalsIgnoreCase(col)).findFirst().orElse(null);
DataType colType = columnsMap.get(col);
if (currentColRangeMetaData == null || colType == null) {
throw new HoodieException(String.format("cannot collect min/max statistics for col: %s", col));
}
if (colType instanceof IntegerType) {
rows.add(currentColRangeMetaData.getMinValue());
rows.add(currentColRangeMetaData.getMaxValue());
} else if (colType instanceof DoubleType) {
rows.add(currentColRangeMetaData.getMinValue());
rows.add(currentColRangeMetaData.getMaxValue());
} else if (colType instanceof StringType) {
rows.add(currentColRangeMetaData.getMinValueAsString());
rows.add(currentColRangeMetaData.getMaxValueAsString());
} else if (colType instanceof DecimalType) {
rows.add(new BigDecimal(currentColRangeMetaData.getMinValueAsString()));
rows.add(new BigDecimal(currentColRangeMetaData.getMaxValueAsString()));
} else if (colType instanceof DateType) {
rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getMinValueAsString()));
rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getMaxValueAsString()));
} else if (colType instanceof LongType) {
rows.add(currentColRangeMetaData.getMinValue());
rows.add(currentColRangeMetaData.getMaxValue());
} else if (colType instanceof ShortType) {
rows.add(Short.parseShort(currentColRangeMetaData.getMinValue().toString()));
rows.add(Short.parseShort(currentColRangeMetaData.getMaxValue().toString()));
} else if (colType instanceof FloatType) {
rows.add(currentColRangeMetaData.getMinValue());
rows.add(currentColRangeMetaData.getMaxValue());
} else if (colType instanceof BinaryType) {
rows.add(((Binary)currentColRangeMetaData.getMinValue()).getBytes());
rows.add(((Binary)currentColRangeMetaData.getMaxValue()).getBytes());
} else if (colType instanceof BooleanType) {
rows.add(currentColRangeMetaData.getMinValue());
rows.add(currentColRangeMetaData.getMaxValue());
} else if (colType instanceof ByteType) {
rows.add(Byte.valueOf(currentColRangeMetaData.getMinValue().toString()));
rows.add(Byte.valueOf(currentColRangeMetaData.getMaxValue().toString()));
} else {
throw new HoodieException(String.format("Not support type: %s", colType));
}
rows.add(currentColRangeMetaData.getNumNulls());
});
return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(rows));
}
}).filter(f -> f != null);
List<StructField> allMetaDataSchema = new ArrayList<>();
allMetaDataSchema.add(new StructField("file", StringType$.MODULE$, true, Metadata.empty()));
cols.forEach(col -> {
allMetaDataSchema.add(new StructField(col + "_minValue", columnsMap.get(col), true, Metadata.empty()));
allMetaDataSchema.add(new StructField(col + "_maxValue", columnsMap.get(col), true, Metadata.empty()));
allMetaDataSchema.add(new StructField(col + "_num_nulls", LongType$.MODULE$, true, Metadata.empty()));
});
return df.sparkSession().createDataFrame(allMetaDataRDD, StructType$.MODULE$.apply(allMetaDataSchema));
}
public static Dataset<Row> getMinMaxValue(Dataset<Row> df, String cols) {
List<String> rawCols = Arrays.asList(cols.split(",")).stream().map(f -> f.trim()).collect(Collectors.toList());
return getMinMaxValue(df, rawCols);
}
/**
* Update statistics info.
* this method will update old index table by full out join,
* and save the updated table into a new index table based on commitTime.
* old index table will be cleaned also.
*
* @param df a spark DataFrame holds parquet files to be read.
* @param cols z-sort cols.
* @param indexPath index store path.
* @param commitTime current operation commitTime.
* @param validateCommits all validate commits for current table.
* @return
*/
public static void saveStatisticsInfo(Dataset<Row> df, String cols, String indexPath, String commitTime, List<String> validateCommits) {
Path savePath = new Path(indexPath, commitTime);
SparkSession spark = df.sparkSession();
FileSystem fs = FSUtils.getFs(indexPath, spark.sparkContext().hadoopConfiguration());
Dataset<Row> statisticsDF = ZCurveOptimizeHelper.getMinMaxValue(df, cols);
// try to find last validate index table from index path
try {
// If there's currently no index, create one
if (!fs.exists(new Path(indexPath))) {
statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString());
return;
}
// Otherwise, clean up all indexes but the most recent one
List<String> allIndexTables = Arrays
.stream(fs.listStatus(new Path(indexPath))).filter(f -> f.isDirectory()).map(f -> f.getPath().getName()).collect(Collectors.toList());
List<String> candidateIndexTables = allIndexTables.stream().filter(f -> validateCommits.contains(f)).sorted().collect(Collectors.toList());
List<String> residualTables = allIndexTables.stream().filter(f -> !validateCommits.contains(f)).collect(Collectors.toList());
Option<Dataset> latestIndexData = Option.empty();
if (!candidateIndexTables.isEmpty()) {
latestIndexData = Option.of(spark.read().load(new Path(indexPath, candidateIndexTables.get(candidateIndexTables.size() - 1)).toString()));
// clean old index table, keep at most 1 index table.
candidateIndexTables.remove(candidateIndexTables.size() - 1);
candidateIndexTables.forEach(f -> {
try {
fs.delete(new Path(indexPath, f));
} catch (IOException ie) {
throw new HoodieException(ie);
}
});
}
// clean residualTables
// retried cluster operations at the same instant time is also considered,
// the residual files produced by retried are cleaned up before save statistics
// save statistics info to index table which named commitTime
residualTables.forEach(f -> {
try {
fs.delete(new Path(indexPath, f));
} catch (IOException ie) {
throw new HoodieException(ie);
}
});
if (latestIndexData.isPresent() && latestIndexData.get().schema().equals(statisticsDF.schema())) {
// update the statistics info
String originalTable = "indexTable_" + java.util.UUID.randomUUID().toString().replace("-", "");
String updateTable = "updateTable_" + java.util.UUID.randomUUID().toString().replace("-", "");
latestIndexData.get().registerTempTable(originalTable);
statisticsDF.registerTempTable(updateTable);
// update table by full out join
List columns = Arrays.asList(statisticsDF.schema().fieldNames());
spark.sql(HoodieSparkUtils$
.MODULE$.createMergeSql(originalTable, updateTable, JavaConversions.asScalaBuffer(columns))).repartition(1).write().save(savePath.toString());
} else {
statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString());
}
} catch (IOException e) {
throw new HoodieException(e);
}
}
}

View File

@@ -20,7 +20,6 @@ package org.apache.hudi
import java.nio.ByteBuffer
import java.sql.{Date, Timestamp}
import java.util
import org.apache.avro.Conversions.DecimalConversion
import org.apache.avro.LogicalTypes.{TimestampMicros, TimestampMillis}
@@ -318,7 +317,7 @@ object AvroConversionHelper {
} else {
val sourceArray = item.asInstanceOf[Seq[Any]]
val sourceArraySize = sourceArray.size
val targetList = new util.ArrayList[Any](sourceArraySize)
val targetList = new java.util.ArrayList[Any](sourceArraySize)
var idx = 0
while (idx < sourceArraySize) {
targetList.add(elementConverter(sourceArray(idx)))
@@ -336,7 +335,7 @@ object AvroConversionHelper {
if (item == null) {
null
} else {
val javaMap = new util.HashMap[String, Any]()
val javaMap = new java.util.HashMap[String, Any]()
item.asInstanceOf[Map[String, Any]].foreach { case (key, value) =>
javaMap.put(key, valueConverter(value))
}

View File

@@ -287,43 +287,4 @@ object HoodieSparkUtils extends SparkAdapterSupport {
s"${tableSchema.fieldNames.mkString(",")}")
AttributeReference(columnName, field.get.dataType, field.get.nullable)()
}
/**
* Create merge sql to merge leftTable and right table.
*
* @param leftTable table name.
* @param rightTable table name.
* @param cols merged cols.
* @return merge sql.
*/
def createMergeSql(leftTable: String, rightTable: String, cols: Seq[String]): String = {
var selectsql = ""
for (i <- cols.indices) {
selectsql = selectsql + s" if (${leftTable}.${cols(i)} is null, ${rightTable}.${cols(i)}, ${leftTable}.${cols(i)}) as ${cols(i)} ,"
}
"select " + selectsql.dropRight(1) + s" from ${leftTable} full join ${rightTable} on ${leftTable}.${cols(0)} = ${rightTable}.${cols(0)}"
}
/**
* Collect min/max statistics for candidate cols.
* support all col types.
*
* @param df dataFrame holds read files.
* @param cols candidate cols to collect statistics.
* @return
*/
def getMinMaxValueSpark(df: DataFrame, cols: Seq[String]): DataFrame = {
val sqlContext = df.sparkSession.sqlContext
import sqlContext.implicits._
val values = cols.flatMap(c => Seq( min(col(c)).as(c + "_minValue"), max(col(c)).as(c + "_maxValue"), count(c).as(c + "_noNullCount")))
val valueCounts = count("*").as("totalNum")
val projectValues = Seq(col("file")) ++ cols.flatMap(c =>
Seq(col(c + "_minValue"), col(c + "_maxValue"), expr(s"totalNum - ${c + "_noNullCount"}").as(c + "_num_nulls")))
val result = df.select(input_file_name() as "file", col("*"))
.groupBy($"file")
.agg(valueCounts, values: _*).select(projectValues:_*)
result
}
}

View File

@@ -18,8 +18,6 @@
package org.apache.spark.sql.hudi.execution
import java.util
import org.apache.hudi.config.HoodieClusteringConfig
import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, BoundReference, SortOrder, UnsafeProjection, UnsafeRow}
@@ -197,22 +195,22 @@ class RawDecisionBound[K : Ordering : ClassTag](ordering: Ordering[K]) extends S
// For primitive keys, we can use the natural ordering. Otherwise, use the Ordering comparator.
classTag[K] match {
case ClassTag.Float =>
(l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Float]], x.asInstanceOf[Float])
(l, x) => java.util.Arrays.binarySearch(l.asInstanceOf[Array[Float]], x.asInstanceOf[Float])
case ClassTag.Double =>
(l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Double]], x.asInstanceOf[Double])
(l, x) => java.util.Arrays.binarySearch(l.asInstanceOf[Array[Double]], x.asInstanceOf[Double])
case ClassTag.Byte =>
(l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Byte]], x.asInstanceOf[Byte])
(l, x) => java.util.Arrays.binarySearch(l.asInstanceOf[Array[Byte]], x.asInstanceOf[Byte])
case ClassTag.Char =>
(l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Char]], x.asInstanceOf[Char])
(l, x) => java.util.Arrays.binarySearch(l.asInstanceOf[Array[Char]], x.asInstanceOf[Char])
case ClassTag.Short =>
(l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Short]], x.asInstanceOf[Short])
(l, x) => java.util.Arrays.binarySearch(l.asInstanceOf[Array[Short]], x.asInstanceOf[Short])
case ClassTag.Int =>
(l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Int]], x.asInstanceOf[Int])
(l, x) => java.util.Arrays.binarySearch(l.asInstanceOf[Array[Int]], x.asInstanceOf[Int])
case ClassTag.Long =>
(l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Long]], x.asInstanceOf[Long])
(l, x) => java.util.Arrays.binarySearch(l.asInstanceOf[Array[Long]], x.asInstanceOf[Long])
case _ =>
val comparator = ordering.asInstanceOf[java.util.Comparator[Any]]
(l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[AnyRef]], x, comparator)
(l, x) => java.util.Arrays.binarySearch(l.asInstanceOf[Array[AnyRef]], x, comparator)
}
}

View File

@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.index.zorder;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class TestZOrderingIndexHelper {
@Test
public void testMergeSql() {
String q = ZOrderingIndexHelper.createIndexMergeSql("old", "new", Arrays.asList("file", "a", "b"));
assertEquals(
"SELECT "
+ "if (new.file is null, old.file, new.file) AS file, "
+ "if (new.a is null, old.a, new.a) AS a, "
+ "if (new.b is null, old.b, new.b) AS b "
+ "FROM old FULL JOIN new ON old.file = new.file", q);
}
}

View File

@@ -18,6 +18,8 @@
package org.apache.hudi.common.model;
import org.apache.parquet.schema.PrimitiveStringifier;
import java.util.Objects;
/**
@@ -28,28 +30,16 @@ public class HoodieColumnRangeMetadata<T> {
private final String columnName;
private final T minValue;
private final T maxValue;
private long numNulls;
// For Decimal Type/Date Type, minValue/maxValue cannot represent it's original value.
// eg: when parquet collects column information, the decimal type is collected as int/binary type.
// so we cannot use minValue and maxValue directly, use minValueAsString/maxValueAsString instead.
private final String minValueAsString;
private final String maxValueAsString;
private final long numNulls;
private final PrimitiveStringifier stringifier;
public HoodieColumnRangeMetadata(
final String filePath,
final String columnName,
final T minValue,
final T maxValue,
long numNulls,
final String minValueAsString,
final String maxValueAsString) {
public HoodieColumnRangeMetadata(final String filePath, final String columnName, final T minValue, final T maxValue, final long numNulls, final PrimitiveStringifier stringifier) {
this.filePath = filePath;
this.columnName = columnName;
this.minValue = minValue;
this.maxValue = maxValue;
this.numNulls = numNulls == -1 ? 0 : numNulls;
this.minValueAsString = minValueAsString;
this.maxValueAsString = maxValueAsString;
this.numNulls = numNulls;
this.stringifier = stringifier;
}
public String getFilePath() {
@@ -68,12 +58,8 @@ public class HoodieColumnRangeMetadata<T> {
return this.maxValue;
}
public String getMaxValueAsString() {
return maxValueAsString;
}
public String getMinValueAsString() {
return minValueAsString;
public PrimitiveStringifier getStringifier() {
return stringifier;
}
public long getNumNulls() {
@@ -93,14 +79,12 @@ public class HoodieColumnRangeMetadata<T> {
&& Objects.equals(getColumnName(), that.getColumnName())
&& Objects.equals(getMinValue(), that.getMinValue())
&& Objects.equals(getMaxValue(), that.getMaxValue())
&& Objects.equals(getNumNulls(), that.getNumNulls())
&& Objects.equals(getMinValueAsString(), that.getMinValueAsString())
&& Objects.equals(getMaxValueAsString(), that.getMaxValueAsString());
&& Objects.equals(getNumNulls(), that.getNumNulls());
}
@Override
public int hashCode() {
return Objects.hash(getColumnName(), getMinValue(), getMaxValue(), getNumNulls(), getMinValueAsString(), getMaxValueAsString());
return Objects.hash(getColumnName(), getMinValue(), getMaxValue(), getNumNulls());
}
@Override
@@ -110,8 +94,6 @@ public class HoodieColumnRangeMetadata<T> {
+ "columnName='" + columnName + '\''
+ ", minValue=" + minValue
+ ", maxValue=" + maxValue
+ ", numNulls=" + numNulls
+ ", minValueAsString=" + minValueAsString
+ ", minValueAsString=" + maxValueAsString + '}';
+ ", numNulls=" + numNulls + '}';
}
}

View File

@@ -18,6 +18,10 @@
package org.apache.hudi.common.util;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
@@ -26,11 +30,6 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.MetadataNotFoundException;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.avro.AvroSchemaConverter;
@@ -38,12 +37,15 @@ import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.DecimalMetadata;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -51,14 +53,13 @@ import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Utility functions involving with parquet.
*/
public class ParquetUtils extends BaseFileUtils {
private static Object lock = new Object();
/**
* Read the rowKey list matching the given filter, from the given parquet file. If the filter is empty, then this will
* return all the rowkeys.
@@ -286,95 +287,97 @@ public class ParquetUtils extends BaseFileUtils {
/**
* Parse min/max statistics stored in parquet footers for all columns.
* ParquetRead.readFooter is not a thread safe method.
*
* @param conf hadoop conf.
* @param parquetFilePath file to be read.
* @param cols cols which need to collect statistics.
* @return a HoodieColumnRangeMetadata instance.
*/
public Collection<HoodieColumnRangeMetadata<Comparable>> readRangeFromParquetMetadata(
Configuration conf,
Path parquetFilePath,
List<String> cols) {
public List<HoodieColumnRangeMetadata<Comparable>> readRangeFromParquetMetadata(
@Nonnull Configuration conf,
@Nonnull Path parquetFilePath,
@Nonnull List<String> cols
) {
ParquetMetadata metadata = readMetadata(conf, parquetFilePath);
// collect stats from all parquet blocks
Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnToStatsListMap = metadata.getBlocks().stream().flatMap(blockMetaData -> {
return blockMetaData.getColumns().stream().filter(f -> cols.contains(f.getPath().toDotString())).map(columnChunkMetaData -> {
String minAsString;
String maxAsString;
if (columnChunkMetaData.getPrimitiveType().getOriginalType() == OriginalType.DATE) {
synchronized (lock) {
minAsString = columnChunkMetaData.getStatistics().minAsString();
maxAsString = columnChunkMetaData.getStatistics().maxAsString();
}
} else {
minAsString = columnChunkMetaData.getStatistics().minAsString();
maxAsString = columnChunkMetaData.getStatistics().maxAsString();
}
return new HoodieColumnRangeMetadata<>(parquetFilePath.getName(), columnChunkMetaData.getPath().toDotString(),
columnChunkMetaData.getStatistics().genericGetMin(),
columnChunkMetaData.getStatistics().genericGetMax(),
columnChunkMetaData.getStatistics().getNumNulls(),
minAsString, maxAsString);
});
}).collect(Collectors.groupingBy(e -> e.getColumnName()));
// Collect stats from all individual Parquet blocks
Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnToStatsListMap = metadata.getBlocks().stream().sequential()
.flatMap(blockMetaData -> blockMetaData.getColumns().stream()
.filter(f -> cols.contains(f.getPath().toDotString()))
.map(columnChunkMetaData ->
new HoodieColumnRangeMetadata<Comparable>(
parquetFilePath.getName(),
columnChunkMetaData.getPath().toDotString(),
convertToNativeJavaType(
columnChunkMetaData.getPrimitiveType(),
columnChunkMetaData.getStatistics().genericGetMin()),
convertToNativeJavaType(
columnChunkMetaData.getPrimitiveType(),
columnChunkMetaData.getStatistics().genericGetMax()),
columnChunkMetaData.getStatistics().getNumNulls(),
columnChunkMetaData.getPrimitiveType().stringifier()))
).collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName));
// we only intend to keep file level statistics.
return new ArrayList<>(columnToStatsListMap.values().stream()
.map(blocks -> getColumnRangeInFile(blocks))
.collect(Collectors.toList()));
// Combine those into file-level statistics
// NOTE: Inlining this var makes javac (1.8) upset (due to its inability to infer
// expression type correctly)
Stream<HoodieColumnRangeMetadata<Comparable>> stream = columnToStatsListMap.values()
.stream()
.map(this::getColumnRangeInFile);
return stream.collect(Collectors.toList());
}
private HoodieColumnRangeMetadata<Comparable> getColumnRangeInFile(final List<HoodieColumnRangeMetadata<Comparable>> blockRanges) {
private <T extends Comparable<T>> HoodieColumnRangeMetadata<T> getColumnRangeInFile(
@Nonnull List<HoodieColumnRangeMetadata<T>> blockRanges
) {
if (blockRanges.size() == 1) {
// only one block in parquet file. we can just return that range.
return blockRanges.get(0);
} else {
// there are multiple blocks. Compute min(block_mins) and max(block_maxs)
return blockRanges.stream().reduce((b1, b2) -> combineRanges(b1, b2)).get();
}
// there are multiple blocks. Compute min(block_mins) and max(block_maxs)
return blockRanges.stream()
.sequential()
.reduce(this::combineRanges).get();
}
private HoodieColumnRangeMetadata<Comparable> combineRanges(HoodieColumnRangeMetadata<Comparable> range1,
HoodieColumnRangeMetadata<Comparable> range2) {
final Comparable minValue;
final Comparable maxValue;
final String minValueAsString;
final String maxValueAsString;
if (range1.getMinValue() != null && range2.getMinValue() != null) {
if (range1.getMinValue().compareTo(range2.getMinValue()) < 0) {
minValue = range1.getMinValue();
minValueAsString = range1.getMinValueAsString();
} else {
minValue = range2.getMinValue();
minValueAsString = range2.getMinValueAsString();
}
} else if (range1.getMinValue() == null) {
minValue = range2.getMinValue();
minValueAsString = range2.getMinValueAsString();
private <T extends Comparable<T>> HoodieColumnRangeMetadata<T> combineRanges(
HoodieColumnRangeMetadata<T> one,
HoodieColumnRangeMetadata<T> another
) {
final T minValue;
final T maxValue;
if (one.getMinValue() != null && another.getMinValue() != null) {
minValue = one.getMinValue().compareTo(another.getMinValue()) < 0 ? one.getMinValue() : another.getMinValue();
} else if (one.getMinValue() == null) {
minValue = another.getMinValue();
} else {
minValue = range1.getMinValue();
minValueAsString = range1.getMinValueAsString();
minValue = one.getMinValue();
}
if (range1.getMaxValue() != null && range2.getMaxValue() != null) {
if (range1.getMaxValue().compareTo(range2.getMaxValue()) < 0) {
maxValue = range2.getMaxValue();
maxValueAsString = range2.getMaxValueAsString();
} else {
maxValue = range1.getMaxValue();
maxValueAsString = range1.getMaxValueAsString();
}
} else if (range1.getMaxValue() == null) {
maxValue = range2.getMaxValue();
maxValueAsString = range2.getMaxValueAsString();
if (one.getMaxValue() != null && another.getMaxValue() != null) {
maxValue = one.getMaxValue().compareTo(another.getMaxValue()) < 0 ? another.getMaxValue() : one.getMaxValue();
} else if (one.getMaxValue() == null) {
maxValue = another.getMaxValue();
} else {
maxValue = range1.getMaxValue();
maxValueAsString = range1.getMaxValueAsString();
maxValue = one.getMaxValue();
}
return new HoodieColumnRangeMetadata<>(range1.getFilePath(),
range1.getColumnName(), minValue, maxValue, range1.getNumNulls() + range2.getNumNulls(), minValueAsString, maxValueAsString);
return new HoodieColumnRangeMetadata<T>(
one.getFilePath(),
one.getColumnName(), minValue, maxValue, one.getNumNulls() + another.getNumNulls(), one.getStringifier());
}
private static Comparable<?> convertToNativeJavaType(PrimitiveType primitiveType, Comparable val) {
if (primitiveType.getOriginalType() == OriginalType.DECIMAL) {
DecimalMetadata decimalMetadata = primitiveType.getDecimalMetadata();
return BigDecimal.valueOf((Integer) val, decimalMetadata.getScale());
} else if (primitiveType.getOriginalType() == OriginalType.DATE) {
// NOTE: This is a workaround to address race-condition in using
// {@code SimpleDataFormat} concurrently (w/in {@code DateStringifier})
// TODO cleanup after Parquet upgrade to 1.12
synchronized (primitiveType.stringifier()) {
return java.sql.Date.valueOf(
primitiveType.stringifier().stringify((Integer) val)
);
}
}
return val;
}
}

View File

@@ -48,6 +48,7 @@
<configuration>
<args>
<arg>-nobootcp</arg>
<arg>-target:jvm-1.8</arg>
</args>
<checkMultipleScalaVersions>false</checkMultipleScalaVersions>
</configuration>

View File

@@ -30,21 +30,24 @@ import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate}
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.catalyst.{InternalRow, expressions}
import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, NoopCache, PartitionDirectory}
import org.apache.spark.sql.hudi.{DataSkippingUtils, HoodieSqlUtils}
import org.apache.spark.sql.hudi.DataSkippingUtils.createZIndexLookupFilter
import org.apache.spark.sql.hudi.HoodieSqlUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{AnalysisException, Column, SparkSession}
import org.apache.spark.unsafe.types.UTF8String
import java.util.Properties
import scala.collection.JavaConverters._
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.{Failure, Success, Try}
/**
* A file index which support partition prune for hoodie snapshot and read-optimized query.
@@ -169,16 +172,16 @@ case class HoodieFileIndex(
* ultimately be scanned as part of query execution. Hence, this method has to maintain the
* invariant of conservatively including every base-file's name, that is NOT referenced in its index.
*
* @param dataFilters list of original data filters passed down from querying engine
* @param queryFilters list of original data filters passed down from querying engine
* @return list of pruned (data-skipped) candidate base-files' names
*/
private def lookupCandidateFilesNamesInZIndex(dataFilters: Seq[Expression]): Option[Set[String]] = {
private def lookupCandidateFilesInZIndex(queryFilters: Seq[Expression]): Try[Option[Set[String]]] = Try {
val indexPath = metaClient.getZindexPath
val fs = metaClient.getFs
if (!enableDataSkipping() || !fs.exists(new Path(indexPath)) || dataFilters.isEmpty) {
if (!enableDataSkipping() || !fs.exists(new Path(indexPath)) || queryFilters.isEmpty) {
// scalastyle:off return
return Option.empty
return Success(Option.empty)
// scalastyle:on return
}
@@ -192,7 +195,7 @@ case class HoodieFileIndex(
if (candidateIndexTables.isEmpty) {
// scalastyle:off return
return Option.empty
return Success(Option.empty)
// scalastyle:on return
}
@@ -207,7 +210,7 @@ case class HoodieFileIndex(
dataFrameOpt.map(df => {
val indexSchema = df.schema
val indexFilter =
dataFilters.map(DataSkippingUtils.createZIndexLookupFilter(_, indexSchema))
queryFilters.map(createZIndexLookupFilter(_, indexSchema))
.reduce(And)
logInfo(s"Index filter condition: $indexFilter")
@@ -221,7 +224,7 @@ case class HoodieFileIndex(
.toSet
val prunedCandidateFileNames =
df.filter(new Column(indexFilter))
df.where(new Column(indexFilter))
.select("file")
.collect()
.map(_.getString(0))
@@ -261,11 +264,22 @@ case class HoodieFileIndex(
// - Data-skipping is enabled
// - Z-index is present
// - List of predicates (filters) is present
val candidateFilesNamesOpt: Option[Set[String]] = lookupCandidateFilesNamesInZIndex(dataFilters)
val candidateFilesNamesOpt: Option[Set[String]] =
lookupCandidateFilesInZIndex(dataFilters) match {
case Success(opt) => opt
case Failure(e) =>
if (e.isInstanceOf[AnalysisException]) {
logDebug("Failed to relay provided data filters to Z-index lookup", e)
} else {
logError("Failed to lookup candidate files in Z-index", e)
}
Option.empty
}
logDebug(s"Overlapping candidate files (from Z-index): ${candidateFilesNamesOpt.getOrElse(Set.empty)}")
if (queryAsNonePartitionedTable) { // Read as Non-Partitioned table.
if (queryAsNonePartitionedTable) {
// Read as Non-Partitioned table
// Filter in candidate files based on the Z-index lookup
val candidateFiles =
allFiles.filter(fileStatus =>
@@ -273,9 +287,10 @@ case class HoodieFileIndex(
candidateFilesNamesOpt.forall(_.contains(fileStatus.getPath.getName))
)
logInfo(s"Total files : ${allFiles.size}," +
s" candidate files after data skipping: ${candidateFiles.size} " +
s" skipping percent ${if (allFiles.length != 0) (allFiles.size - candidateFiles.size) / allFiles.size.toDouble else 0}")
logInfo(s"Total files : ${allFiles.size}; " +
s"candidate files after data skipping: ${candidateFiles.size}; " +
s"skipping percent ${if (allFiles.nonEmpty) (allFiles.size - candidateFiles.size) / allFiles.size.toDouble else 0}")
Seq(PartitionDirectory(InternalRow.empty, candidateFiles))
} else {
// Prune the partition path by the partition filters
@@ -284,27 +299,27 @@ case class HoodieFileIndex(
var candidateFileSize = 0
val result = prunedPartitions.map { partition =>
val baseFileStatuses = cachedAllInputFileSlices(partition).map(fileSlice => {
if (fileSlice.getBaseFile.isPresent) {
fileSlice.getBaseFile.get().getFileStatus
} else {
null
}
}).filterNot(_ == null)
val baseFileStatuses: Seq[FileStatus] =
cachedAllInputFileSlices(partition)
.map(fs => fs.getBaseFile.orElse(null))
.filter(_ != null)
.map(_.getFileStatus)
// Filter in candidate files based on the Z-index lookup
val candidateFiles =
baseFileStatuses.filter(fileStatus =>
baseFileStatuses.filter(fs =>
// NOTE: This predicate is true when {@code Option} is empty
candidateFilesNamesOpt.forall(_.contains(fileStatus.getPath.getName)))
candidateFilesNamesOpt.forall(_.contains(fs.getPath.getName)))
totalFileSize += baseFileStatuses.size
candidateFileSize += candidateFiles.size
PartitionDirectory(partition.values, candidateFiles)
}
logInfo(s"Total files: ${totalFileSize}," +
s" Candidate files after data skipping : ${candidateFileSize} " +
s"skipping percent ${if (allFiles.length != 0) (totalFileSize - candidateFileSize) / totalFileSize.toDouble else 0}")
logInfo(s"Total base files: ${totalFileSize}; " +
s"candidate files after data skipping : ${candidateFileSize}; " +
s"skipping percent ${if (allFiles.nonEmpty) (totalFileSize - candidateFileSize) / totalFileSize.toDouble else 0}")
result
}
}

View File

@@ -54,7 +54,6 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SaveMode, SparkSession}
import org.apache.spark.{SPARK_VERSION, SparkContext}
import java.util
import java.util.Properties
import scala.collection.JavaConversions._
@@ -289,7 +288,7 @@ object HoodieSparkSqlWriter {
}
def generateSchemaWithoutPartitionColumns(partitionParam: String, schema: Schema): Schema = {
val fieldsToRemove = new util.ArrayList[String]()
val fieldsToRemove = new java.util.ArrayList[String]()
partitionParam.split(",").map(partitionField => partitionField.trim)
.filter(s => !s.isEmpty).map(field => fieldsToRemove.add(field))
HoodieAvroUtils.removeFields(schema, fieldsToRemove)
@@ -629,7 +628,7 @@ object HoodieSparkSqlWriter {
kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX.key)))
val commitSuccess =
client.commit(tableInstantInfo.instantTime, writeResult.getWriteStatuses,
common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))),
common.util.Option.of(new java.util.HashMap[String, String](mapAsJavaMap(metaMap))),
tableInstantInfo.commitActionType,
writeResult.getPartitionToReplaceFileIds)
@@ -643,7 +642,7 @@ object HoodieSparkSqlWriter {
val asyncCompactionEnabled = isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())
val compactionInstant: common.util.Option[java.lang.String] =
if (asyncCompactionEnabled) {
client.scheduleCompaction(common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
client.scheduleCompaction(common.util.Option.of(new java.util.HashMap[String, String](mapAsJavaMap(metaMap))))
} else {
common.util.Option.empty()
}
@@ -653,7 +652,7 @@ object HoodieSparkSqlWriter {
val asyncClusteringEnabled = isAsyncClusteringEnabled(client, parameters)
val clusteringInstant: common.util.Option[java.lang.String] =
if (asyncClusteringEnabled) {
client.scheduleClustering(common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
client.scheduleClustering(common.util.Option.of(new java.util.HashMap[String, String](mapAsJavaMap(metaMap))))
} else {
common.util.Option.empty()
}

View File

@@ -19,9 +19,11 @@ package org.apache.spark.sql.hudi
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.hudi.index.zorder.ZOrderingIndexHelper.{getMaxColumnNameFor, getMinColumnNameFor, getNumNullsColumnNameFor}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, AttributeReference, EqualNullSafe, EqualTo, Expression, ExtractValue, GetStructField, GreaterThan, GreaterThanOrEqual, In, IsNotNull, IsNull, LessThan, LessThanOrEqual, Literal, Not, Or, StartsWith}
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
@@ -29,181 +31,230 @@ import org.apache.spark.sql.functions.col
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.unsafe.types.UTF8String
import scala.collection.JavaConverters._
object DataSkippingUtils {
object DataSkippingUtils extends Logging {
/**
* Translates provided {@link filterExpr} into corresponding filter-expression for Z-index index table
* to filter out candidate files that would hold records matching the original filter
*
* @param filterExpr original filter from query
* @param sourceFilterExpr original filter from query
* @param indexSchema index table schema
* @return filter for Z-index table
*/
def createZIndexLookupFilter(filterExpr: Expression, indexSchema: StructType): Expression = {
def rewriteCondition(colName: Seq[String], conditionExpress: Expression): Expression = {
val stats = Set.apply(
UnresolvedAttribute(colName).name + "_minValue",
UnresolvedAttribute(colName).name + "_maxValue",
UnresolvedAttribute(colName).name + "_num_nulls"
)
if (stats.forall(stat => indexSchema.exists(_.name == stat))) {
conditionExpress
} else {
Literal.TrueLiteral
}
def createZIndexLookupFilter(sourceFilterExpr: Expression, indexSchema: StructType): Expression = {
// Try to transform original Source Table's filter expression into
// Column-Stats Index filter expression
tryComposeIndexFilterExpr(sourceFilterExpr, indexSchema) match {
case Some(e) => e
// NOTE: In case we can't transform source filter expression, we fallback
// to {@code TrueLiteral}, to essentially avoid pruning any indexed files from scanning
case None => TrueLiteral
}
}
def refColExpr(colName: Seq[String], statisticValue: String): Expression =
col(UnresolvedAttribute(colName).name + statisticValue).expr
private def tryComposeIndexFilterExpr(sourceExpr: Expression, indexSchema: StructType): Option[Expression] = {
def minValue(colName: String) = col(getMinColumnNameFor(colName)).expr
def maxValue(colName: String) = col(getMaxColumnNameFor(colName)).expr
def numNulls(colName: String) = col(getNumNullsColumnNameFor(colName)).expr
def minValue(colName: Seq[String]) = refColExpr(colName, "_minValue")
def maxValue(colName: Seq[String]) = refColExpr(colName, "_maxValue")
def numNulls(colName: Seq[String]) = refColExpr(colName, "_num_nulls")
def colContainsValuesEqualToLiteral(colName: Seq[String], value: Literal) =
def colContainsValuesEqualToLiteral(colName: String, value: Literal): Expression =
// Only case when column C contains value V is when min(C) <= V <= max(c)
And(LessThanOrEqual(minValue(colName), value), GreaterThanOrEqual(maxValue(colName), value))
def colContainsValuesEqualToLiterals(colName: Seq[String], list: Seq[Literal]) =
list.map { lit => colContainsValuesEqualToLiteral(colName, lit) }.reduce(Or)
def colContainsOnlyValuesEqualToLiteral(colName: String, value: Literal) =
// Only case when column C contains _only_ value V is when min(C) = V AND max(c) = V
And(EqualTo(minValue(colName), value), EqualTo(maxValue(colName), value))
filterExpr match {
sourceExpr match {
// Filter "colA = b"
// Translates to "colA_minValue <= b AND colA_maxValue >= b" condition for index lookup
case EqualTo(attribute: AttributeReference, value: Literal) =>
val colName = getTargetColNameParts(attribute)
rewriteCondition(colName, colContainsValuesEqualToLiteral(colName, value))
getTargetIndexedColName(attribute, indexSchema)
.map(colName => colContainsValuesEqualToLiteral(colName, value))
// Filter "b = colA"
// Translates to "colA_minValue <= b AND colA_maxValue >= b" condition for index lookup
case EqualTo(value: Literal, attribute: AttributeReference) =>
val colName = getTargetColNameParts(attribute)
rewriteCondition(colName, colContainsValuesEqualToLiteral(colName, value))
getTargetIndexedColName(attribute, indexSchema)
.map(colName => colContainsValuesEqualToLiteral(colName, value))
// Filter "colA != b"
// Translates to "NOT(colA_minValue = b AND colA_maxValue = b)"
// NOTE: This is NOT an inversion of `colA = b`
case Not(EqualTo(attribute: AttributeReference, value: Literal)) =>
getTargetIndexedColName(attribute, indexSchema)
.map(colName => Not(colContainsOnlyValuesEqualToLiteral(colName, value)))
// Filter "b != colA"
// Translates to "NOT(colA_minValue = b AND colA_maxValue = b)"
// NOTE: This is NOT an inversion of `colA = b`
case Not(EqualTo(value: Literal, attribute: AttributeReference)) =>
getTargetIndexedColName(attribute, indexSchema)
.map(colName => Not(colContainsOnlyValuesEqualToLiteral(colName, value)))
// Filter "colA = null"
// Translates to "colA_num_nulls = null" for index lookup
case equalNullSafe @ EqualNullSafe(_: AttributeReference, _ @ Literal(null, _)) =>
val colName = getTargetColNameParts(equalNullSafe.left)
rewriteCondition(colName, EqualTo(numNulls(colName), equalNullSafe.right))
getTargetIndexedColName(equalNullSafe.left, indexSchema)
.map(colName => EqualTo(numNulls(colName), equalNullSafe.right))
// Filter "colA < b"
// Translates to "colA_minValue < b" for index lookup
case LessThan(attribute: AttributeReference, value: Literal) =>
val colName = getTargetColNameParts(attribute)
rewriteCondition(colName, LessThan(minValue(colName), value))
// Filter "b < colA"
// Translates to "b < colA_maxValue" for index lookup
case LessThan(value: Literal, attribute: AttributeReference) =>
val colName = getTargetColNameParts(attribute)
rewriteCondition(colName, GreaterThan(maxValue(colName), value))
// Filter "colA > b"
// Translates to "colA_maxValue > b" for index lookup
case GreaterThan(attribute: AttributeReference, value: Literal) =>
val colName = getTargetColNameParts(attribute)
rewriteCondition(colName, GreaterThan(maxValue(colName), value))
getTargetIndexedColName(attribute, indexSchema)
.map(colName => LessThan(minValue(colName), value))
// Filter "b > colA"
// Translates to "b > colA_minValue" for index lookup
case GreaterThan(value: Literal, attribute: AttributeReference) =>
val colName = getTargetColNameParts(attribute)
rewriteCondition(colName, LessThan(minValue(colName), value))
getTargetIndexedColName(attribute, indexSchema)
.map(colName => LessThan(minValue(colName), value))
// Filter "b < colA"
// Translates to "b < colA_maxValue" for index lookup
case LessThan(value: Literal, attribute: AttributeReference) =>
getTargetIndexedColName(attribute, indexSchema)
.map(colName => GreaterThan(maxValue(colName), value))
// Filter "colA > b"
// Translates to "colA_maxValue > b" for index lookup
case GreaterThan(attribute: AttributeReference, value: Literal) =>
getTargetIndexedColName(attribute, indexSchema)
.map(colName => GreaterThan(maxValue(colName), value))
// Filter "colA <= b"
// Translates to "colA_minValue <= b" for index lookup
case LessThanOrEqual(attribute: AttributeReference, value: Literal) =>
val colName = getTargetColNameParts(attribute)
rewriteCondition(colName, LessThanOrEqual(minValue(colName), value))
// Filter "b <= colA"
// Translates to "b <= colA_maxValue" for index lookup
case LessThanOrEqual(value: Literal, attribute: AttributeReference) =>
val colName = getTargetColNameParts(attribute)
rewriteCondition(colName, GreaterThanOrEqual(maxValue(colName), value))
// Filter "colA >= b"
// Translates to "colA_maxValue >= b" for index lookup
case GreaterThanOrEqual(attribute: AttributeReference, right: Literal) =>
val colName = getTargetColNameParts(attribute)
rewriteCondition(colName, GreaterThanOrEqual(maxValue(colName), right))
getTargetIndexedColName(attribute, indexSchema)
.map(colName => LessThanOrEqual(minValue(colName), value))
// Filter "b >= colA"
// Translates to "b >= colA_minValue" for index lookup
case GreaterThanOrEqual(value: Literal, attribute: AttributeReference) =>
val colName = getTargetColNameParts(attribute)
rewriteCondition(colName, LessThanOrEqual(minValue(colName), value))
getTargetIndexedColName(attribute, indexSchema)
.map(colName => LessThanOrEqual(minValue(colName), value))
// Filter "b <= colA"
// Translates to "b <= colA_maxValue" for index lookup
case LessThanOrEqual(value: Literal, attribute: AttributeReference) =>
getTargetIndexedColName(attribute, indexSchema)
.map(colName => GreaterThanOrEqual(maxValue(colName), value))
// Filter "colA >= b"
// Translates to "colA_maxValue >= b" for index lookup
case GreaterThanOrEqual(attribute: AttributeReference, right: Literal) =>
getTargetIndexedColName(attribute, indexSchema)
.map(colName => GreaterThanOrEqual(maxValue(colName), right))
// Filter "colA is null"
// Translates to "colA_num_nulls > 0" for index lookup
case IsNull(attribute: AttributeReference) =>
val colName = getTargetColNameParts(attribute)
rewriteCondition(colName, GreaterThan(numNulls(colName), Literal(0)))
getTargetIndexedColName(attribute, indexSchema)
.map(colName => GreaterThan(numNulls(colName), Literal(0)))
// Filter "colA is not null"
// Translates to "colA_num_nulls = 0" for index lookup
case IsNotNull(attribute: AttributeReference) =>
val colName = getTargetColNameParts(attribute)
rewriteCondition(colName, EqualTo(numNulls(colName), Literal(0)))
getTargetIndexedColName(attribute, indexSchema)
.map(colName => EqualTo(numNulls(colName), Literal(0)))
// Filter "colA in (a, b, ...)"
// Translates to "(colA_minValue <= a AND colA_maxValue >= a) OR (colA_minValue <= b AND colA_maxValue >= b)" for index lookup
// NOTE: This is equivalent to "colA = a OR colA = b OR ..."
case In(attribute: AttributeReference, list: Seq[Literal]) =>
val colName = getTargetColNameParts(attribute)
rewriteCondition(colName, colContainsValuesEqualToLiterals(colName, list))
// Filter "colA like xxx"
getTargetIndexedColName(attribute, indexSchema)
.map(colName =>
list.map { lit => colContainsValuesEqualToLiteral(colName, lit) }.reduce(Or)
)
// Filter "colA not in (a, b, ...)"
// Translates to "NOT((colA_minValue = a AND colA_maxValue = a) OR (colA_minValue = b AND colA_maxValue = b))" for index lookup
// NOTE: This is NOT an inversion of `in (a, b, ...)` expr, this is equivalent to "colA != a AND colA != b AND ..."
case Not(In(attribute: AttributeReference, list: Seq[Literal])) =>
getTargetIndexedColName(attribute, indexSchema)
.map(colName =>
Not(
list.map { lit => colContainsOnlyValuesEqualToLiteral(colName, lit) }.reduce(Or)
)
)
// Filter "colA like 'xxx%'"
// Translates to "colA_minValue <= xxx AND colA_maxValue >= xxx" for index lookup
// NOTE: That this operator only matches string prefixes, and this is
// essentially equivalent to "colA = b" expression
case StartsWith(attribute, v @ Literal(_: UTF8String, _)) =>
val colName = getTargetColNameParts(attribute)
rewriteCondition(colName, colContainsValuesEqualToLiteral(colName, v))
// Filter "colA not in (a, b, ...)"
// Translates to "(colA_minValue > a OR colA_maxValue < a) AND (colA_minValue > b OR colA_maxValue < b)" for index lookup
// NOTE: This is an inversion of `in (a, b, ...)` expr
case Not(In(attribute: AttributeReference, list: Seq[Literal])) =>
val colName = getTargetColNameParts(attribute)
rewriteCondition(colName, Not(colContainsValuesEqualToLiterals(colName, list)))
// Filter "colA != b"
// Translates to "colA_minValue > b OR colA_maxValue < b" (which is an inversion of expr for "colA = b") for index lookup
// NOTE: This is an inversion of `colA = b` expr
case Not(EqualTo(attribute: AttributeReference, value: Literal)) =>
val colName = getTargetColNameParts(attribute)
rewriteCondition(colName, Not(colContainsValuesEqualToLiteral(colName, value)))
// Filter "b != colA"
// Translates to "colA_minValue > b OR colA_maxValue < b" (which is an inversion of expr for "colA = b") for index lookup
// NOTE: This is an inversion of `colA != b` expr
case Not(EqualTo(value: Literal, attribute: AttributeReference)) =>
val colName = getTargetColNameParts(attribute)
rewriteCondition(colName, Not(colContainsValuesEqualToLiteral(colName, value)))
// Filter "colA not like xxx"
// Translates to "!(colA_minValue <= xxx AND colA_maxValue >= xxx)" for index lookup
// NOTE: This is a inversion of "colA like xxx" assuming that colA is a string-based type
getTargetIndexedColName(attribute, indexSchema)
.map(colName => colContainsValuesEqualToLiteral(colName, v))
// Filter "colA not like 'xxx%'"
// Translates to "NOT(colA_minValue like 'xxx%' AND colA_maxValue like 'xxx%')" for index lookup
// NOTE: This is NOT an inversion of "colA like xxx"
case Not(StartsWith(attribute, value @ Literal(_: UTF8String, _))) =>
val colName = getTargetColNameParts(attribute)
rewriteCondition(colName, Not(colContainsValuesEqualToLiteral(colName, value)))
getTargetIndexedColName(attribute, indexSchema)
.map(colName =>
Not(And(StartsWith(minValue(colName), value), StartsWith(maxValue(colName), value)))
)
case or: Or =>
val resLeft = createZIndexLookupFilter(or.left, indexSchema)
val resRight = createZIndexLookupFilter(or.right, indexSchema)
Or(resLeft, resRight)
Option(Or(resLeft, resRight))
case and: And =>
val resLeft = createZIndexLookupFilter(and.left, indexSchema)
val resRight = createZIndexLookupFilter(and.right, indexSchema)
And(resLeft, resRight)
case expr: Expression =>
Literal.TrueLiteral
Option(And(resLeft, resRight))
//
// Pushing Logical NOT inside the AND/OR expressions
// NOTE: This is required to make sure we're properly handling negations in
// cases like {@code NOT(colA = 0)}, {@code NOT(colA in (a, b, ...)}
//
case Not(And(left: Expression, right: Expression)) =>
Option(createZIndexLookupFilter(Or(Not(left), Not(right)), indexSchema))
case Not(Or(left: Expression, right: Expression)) =>
Option(createZIndexLookupFilter(And(Not(left), Not(right)), indexSchema))
case _: Expression => None
}
}
/**
* Extracts name from a resolved expression referring to a nested or non-nested column.
*/
def getTargetColNameParts(resolvedTargetCol: Expression): Seq[String] = {
private def checkColIsIndexed(colName: String, indexSchema: StructType): Boolean = {
Set.apply(
getMinColumnNameFor(colName),
getMaxColumnNameFor(colName),
getNumNullsColumnNameFor(colName)
)
.forall(stat => indexSchema.exists(_.name == stat))
}
private def getTargetIndexedColName(resolvedExpr: Expression, indexSchema: StructType): Option[String] = {
val colName = UnresolvedAttribute(getTargetColNameParts(resolvedExpr)).name
// Verify that the column is indexed
if (checkColIsIndexed(colName, indexSchema)) {
Option.apply(colName)
} else {
None
}
}
private def getTargetColNameParts(resolvedTargetCol: Expression): Seq[String] = {
resolvedTargetCol match {
case attr: Attribute => Seq(attr.name)
case Alias(c, _) => getTargetColNameParts(c)
case GetStructField(c, _, Some(name)) => getTargetColNameParts(c) :+ name
case ex: ExtractValue =>
throw new AnalysisException(s"convert reference to name failed, Updating nested fields is only supported for StructType: ${ex}.")
case other =>
throw new AnalysisException(s"convert reference to name failed, Found unsupported expression ${other}")
}

View File

@@ -0,0 +1,8 @@
{"c1_maxValue":1000,"c1_minValue":3,"c1_num_nulls":0,"c2_maxValue":" 993sdc","c2_minValue":" 1000sdc","c2_num_nulls":0,"c3_maxValue":999.348,"c3_minValue":5.102,"c3_num_nulls":0,"c5_maxValue":101,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-27","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"/w==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00000-1c8226c2-f2a0-455d-aedd-c544003b0b3d-c000.snappy.parquet"}
{"c1_maxValue":1000,"c1_minValue":0,"c1_num_nulls":0,"c2_maxValue":" 996sdc","c2_minValue":" 0sdc","c2_num_nulls":0,"c3_maxValue":999.779,"c3_minValue":2.992,"c3_num_nulls":0,"c5_maxValue":101,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-28","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"/g==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00000-5034d84a-c4c8-4eba-85b5-a52f47e628a7-c000.snappy.parquet"}
{"c1_maxValue":998,"c1_minValue":2,"c1_num_nulls":0,"c2_maxValue":" 998sdc","c2_minValue":" 104sdc","c2_num_nulls":0,"c3_maxValue":997.905,"c3_minValue":0.876,"c3_num_nulls":0,"c5_maxValue":101,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-28","c6_minValue":"2020-01-02","c6_num_nulls":0,"c7_maxValue":"/w==","c7_minValue":"Ag==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00001-1c8226c2-f2a0-455d-aedd-c544003b0b3d-c000.snappy.parquet"}
{"c1_maxValue":997,"c1_minValue":3,"c1_num_nulls":0,"c2_maxValue":" 9sdc","c2_minValue":" 102sdc","c2_num_nulls":0,"c3_maxValue":990.531,"c3_minValue":2.336,"c3_num_nulls":0,"c5_maxValue":101,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-27","c6_minValue":"2020-01-02","c6_num_nulls":0,"c7_maxValue":"/w==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00001-5034d84a-c4c8-4eba-85b5-a52f47e628a7-c000.snappy.parquet"}
{"c1_maxValue":994,"c1_minValue":0,"c1_num_nulls":0,"c2_maxValue":" 9sdc","c2_minValue":" 0sdc","c2_num_nulls":0,"c3_maxValue":997.496,"c3_minValue":7.742,"c3_num_nulls":0,"c5_maxValue":101,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-28","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"/w==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00002-1c8226c2-f2a0-455d-aedd-c544003b0b3d-c000.snappy.parquet"}
{"c1_maxValue":999,"c1_minValue":1,"c1_num_nulls":0,"c2_maxValue":" 999sdc","c2_minValue":" 100sdc","c2_num_nulls":0,"c3_maxValue":980.676,"c3_minValue":0.120,"c3_num_nulls":0,"c5_maxValue":101,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-28","c6_minValue":"2020-01-03","c6_num_nulls":0,"c7_maxValue":"/w==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00002-5034d84a-c4c8-4eba-85b5-a52f47e628a7-c000.snappy.parquet"}
{"c1_maxValue":999,"c1_minValue":1,"c1_num_nulls":0,"c2_maxValue":" 99sdc","c2_minValue":" 10sdc","c2_num_nulls":0,"c3_maxValue":993.940,"c3_minValue":4.598,"c3_num_nulls":0,"c5_maxValue":101,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-28","c6_minValue":"2020-01-03","c6_num_nulls":0,"c7_maxValue":"/g==","c7_minValue":"AQ==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00003-1c8226c2-f2a0-455d-aedd-c544003b0b3d-c000.snappy.parquet"}
{"c1_maxValue":998,"c1_minValue":6,"c1_num_nulls":0,"c2_maxValue":" 99sdc","c2_minValue":" 111sdc","c2_num_nulls":0,"c3_maxValue":999.282,"c3_minValue":1.217,"c3_num_nulls":0,"c5_maxValue":101,"c5_minValue":2,"c5_num_nulls":0,"c6_maxValue":"2020-11-28","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"/w==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00003-5034d84a-c4c8-4eba-85b5-a52f47e628a7-c000.snappy.parquet"}

View File

@@ -0,0 +1,4 @@
{"c1_maxValue":1000,"c1_minValue":0,"c1_num_nulls":0,"c2_maxValue":" 996sdc","c2_minValue":" 0sdc","c2_num_nulls":0,"c3_maxValue":999.779,"c3_minValue":2.992,"c3_num_nulls":0,"c5_maxValue":101,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-28","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"/g==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00000-5034d84a-c4c8-4eba-85b5-a52f47e628a7-c000.snappy.parquet"}
{"c1_maxValue":997,"c1_minValue":3,"c1_num_nulls":0,"c2_maxValue":" 9sdc","c2_minValue":" 102sdc","c2_num_nulls":0,"c3_maxValue":990.531,"c3_minValue":2.336,"c3_num_nulls":0,"c5_maxValue":101,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-27","c6_minValue":"2020-01-02","c6_num_nulls":0,"c7_maxValue":"/w==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00001-5034d84a-c4c8-4eba-85b5-a52f47e628a7-c000.snappy.parquet"}
{"c1_maxValue":999,"c1_minValue":1,"c1_num_nulls":0,"c2_maxValue":" 999sdc","c2_minValue":" 100sdc","c2_num_nulls":0,"c3_maxValue":980.676,"c3_minValue":0.120,"c3_num_nulls":0,"c5_maxValue":101,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-28","c6_minValue":"2020-01-03","c6_num_nulls":0,"c7_maxValue":"/w==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00002-5034d84a-c4c8-4eba-85b5-a52f47e628a7-c000.snappy.parquet"}
{"c1_maxValue":998,"c1_minValue":6,"c1_num_nulls":0,"c2_maxValue":" 99sdc","c2_minValue":" 111sdc","c2_num_nulls":0,"c3_maxValue":999.282,"c3_minValue":1.217,"c3_num_nulls":0,"c5_maxValue":101,"c5_minValue":2,"c5_num_nulls":0,"c6_maxValue":"2020-11-28","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"/w==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00003-5034d84a-c4c8-4eba-85b5-a52f47e628a7-c000.snappy.parquet"}

View File

@@ -46,7 +46,6 @@ import org.mockito.Mockito.{spy, times, verify}
import org.scalatest.Matchers.{assertResult, be, convertToAnyShouldWrapper, intercept}
import java.time.Instant
import java.util
import java.util.{Collections, Date, UUID}
import scala.collection.JavaConversions._
@@ -147,7 +146,7 @@ class HoodieSparkSqlWriterSuite {
* @param inputList list of Row
* @return list of Seq
*/
def convertRowListToSeq(inputList: util.List[Row]): Seq[Row] =
def convertRowListToSeq(inputList: java.util.List[Row]): Seq[Row] =
JavaConverters.asScalaIteratorConverter(inputList.iterator).asScala.toSeq
/**

View File

@@ -0,0 +1,365 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi
import org.apache.hudi.index.zorder.ZOrderingIndexHelper
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{Expression, Not}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.hudi.DataSkippingUtils
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType, VarcharType}
import org.apache.spark.sql.{Column, SparkSession}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments.arguments
import org.junit.jupiter.params.provider.{Arguments, MethodSource}
import scala.collection.JavaConverters._
// NOTE: Only A, B columns are indexed
case class IndexRow(
file: String,
A_minValue: Long,
A_maxValue: Long,
A_num_nulls: Long,
B_minValue: String = null,
B_maxValue: String = null,
B_num_nulls: Long = -1
)
class TestDataSkippingUtils extends HoodieClientTestBase {
var spark: SparkSession = _
@BeforeEach
override def setUp(): Unit = {
initSparkContexts()
spark = sqlContext.sparkSession
}
val indexedCols = Seq("A", "B")
val sourceTableSchema =
StructType(
Seq(
StructField("A", LongType),
StructField("B", StringType),
StructField("C", VarcharType(32))
)
)
val indexSchema =
ZOrderingIndexHelper.composeIndexSchema(
sourceTableSchema.fields.toSeq
.filter(f => indexedCols.contains(f.name))
.asJava
)
@ParameterizedTest
@MethodSource(Array("testBaseLookupFilterExpressionsSource", "testAdvancedLookupFilterExpressionsSource"))
def testLookupFilterExpressions(sourceExpr: String, input: Seq[IndexRow], output: Seq[String]): Unit = {
val resolvedExpr: Expression = resolveFilterExpr(sourceExpr, sourceTableSchema)
val lookupFilter = DataSkippingUtils.createZIndexLookupFilter(resolvedExpr, indexSchema)
val spark2 = spark
import spark2.implicits._
val indexDf = spark.createDataset(input)
val rows = indexDf.where(new Column(lookupFilter))
.select("file")
.collect()
.map(_.getString(0))
.toSeq
assertEquals(output, rows)
}
@ParameterizedTest
@MethodSource(Array("testStringsLookupFilterExpressionsSource"))
def testStringsLookupFilterExpressions(sourceExpr: Expression, input: Seq[IndexRow], output: Seq[String]): Unit = {
val resolvedExpr = resolveFilterExpr(sourceExpr, sourceTableSchema)
val lookupFilter = DataSkippingUtils.createZIndexLookupFilter(resolvedExpr, indexSchema)
val spark2 = spark
import spark2.implicits._
val indexDf = spark.createDataset(input)
val rows = indexDf.where(new Column(lookupFilter))
.select("file")
.collect()
.map(_.getString(0))
.toSeq
assertEquals(output, rows)
}
private def resolveFilterExpr(exprString: String, tableSchema: StructType): Expression = {
val expr = spark.sessionState.sqlParser.parseExpression(exprString)
resolveFilterExpr(expr, tableSchema)
}
private def resolveFilterExpr(expr: Expression, tableSchema: StructType): Expression = {
val schemaFields = tableSchema.fields
val resolvedExpr = spark.sessionState.analyzer.ResolveReferences(
Filter(expr, LocalRelation(schemaFields.head, schemaFields.drop(1): _*))
)
.asInstanceOf[Filter].condition
checkForUnresolvedRefs(resolvedExpr)
}
def checkForUnresolvedRefs(resolvedExpr: Expression): Expression =
resolvedExpr match {
case UnresolvedAttribute(_) => throw new IllegalStateException("unresolved attribute")
case _ => resolvedExpr.mapChildren(e => checkForUnresolvedRefs(e))
}
}
object TestDataSkippingUtils {
def testStringsLookupFilterExpressionsSource(): java.util.stream.Stream[Arguments] = {
java.util.stream.Stream.of(
arguments(
col("B").startsWith("abc").expr,
Seq(
IndexRow("file_1", 0, 0, 0, "aba", "adf", 1), // may contain strings starting w/ "abc"
IndexRow("file_2", 0, 0, 0, "adf", "azy", 0),
IndexRow("file_3", 0, 0, 0, "aaa", "aba", 0)
),
Seq("file_1")),
arguments(
Not(col("B").startsWith("abc").expr),
Seq(
IndexRow("file_1", 0, 0, 0, "aba", "adf", 1), // may contain strings starting w/ "abc"
IndexRow("file_2", 0, 0, 0, "adf", "azy", 0),
IndexRow("file_3", 0, 0, 0, "aaa", "aba", 0),
IndexRow("file_4", 0, 0, 0, "abc123", "abc345", 0) // all strings start w/ "abc"
),
Seq("file_1", "file_2", "file_3"))
)
}
def testBaseLookupFilterExpressionsSource(): java.util.stream.Stream[Arguments] = {
java.util.stream.Stream.of(
// TODO cases
// A = null
arguments(
"A = 0",
Seq(
IndexRow("file_1", 1, 2, 0),
IndexRow("file_2", -1, 1, 0)
),
Seq("file_2")),
arguments(
"0 = A",
Seq(
IndexRow("file_1", 1, 2, 0),
IndexRow("file_2", -1, 1, 0)
),
Seq("file_2")),
arguments(
"A != 0",
Seq(
IndexRow("file_1", 1, 2, 0),
IndexRow("file_2", -1, 1, 0),
IndexRow("file_3", 0, 0, 0) // Contains only 0s
),
Seq("file_1", "file_2")),
arguments(
"0 != A",
Seq(
IndexRow("file_1", 1, 2, 0),
IndexRow("file_2", -1, 1, 0),
IndexRow("file_3", 0, 0, 0) // Contains only 0s
),
Seq("file_1", "file_2")),
arguments(
"A < 0",
Seq(
IndexRow("file_1", 1, 2, 0),
IndexRow("file_2", -1, 1, 0),
IndexRow("file_3", -2, -1, 0)
),
Seq("file_2", "file_3")),
arguments(
"0 > A",
Seq(
IndexRow("file_1", 1, 2, 0),
IndexRow("file_2", -1, 1, 0),
IndexRow("file_3", -2, -1, 0)
),
Seq("file_2", "file_3")),
arguments(
"A > 0",
Seq(
IndexRow("file_1", 1, 2, 0),
IndexRow("file_2", -1, 1, 0),
IndexRow("file_3", -2, -1, 0)
),
Seq("file_1", "file_2")),
arguments(
"0 < A",
Seq(
IndexRow("file_1", 1, 2, 0),
IndexRow("file_2", -1, 1, 0),
IndexRow("file_3", -2, -1, 0)
),
Seq("file_1", "file_2")),
arguments(
"A <= -1",
Seq(
IndexRow("file_1", 1, 2, 0),
IndexRow("file_2", -1, 1, 0),
IndexRow("file_3", -2, -1, 0)
),
Seq("file_2", "file_3")),
arguments(
"-1 >= A",
Seq(
IndexRow("file_1", 1, 2, 0),
IndexRow("file_2", -1, 1, 0),
IndexRow("file_3", -2, -1, 0)
),
Seq("file_2", "file_3")),
arguments(
"A >= 1",
Seq(
IndexRow("file_1", 1, 2, 0),
IndexRow("file_2", -1, 1, 0),
IndexRow("file_3", -2, -1, 0)
),
Seq("file_1", "file_2")),
arguments(
"1 <= A",
Seq(
IndexRow("file_1", 1, 2, 0),
IndexRow("file_2", -1, 1, 0),
IndexRow("file_3", -2, -1, 0)
),
Seq("file_1", "file_2")),
arguments(
"A is null",
Seq(
IndexRow("file_1", 1, 2, 0),
IndexRow("file_2", -1, 1, 1)
),
Seq("file_2")),
arguments(
"A is not null",
Seq(
IndexRow("file_1", 1, 2, 0),
IndexRow("file_2", -1, 1, 1)
),
Seq("file_1")),
arguments(
"A in (0, 1)",
Seq(
IndexRow("file_1", 1, 2, 0),
IndexRow("file_2", -1, 1, 0),
IndexRow("file_3", -2, -1, 0)
),
Seq("file_1", "file_2")),
arguments(
"A not in (0, 1)",
Seq(
IndexRow("file_1", 1, 2, 0),
IndexRow("file_2", -1, 1, 0),
IndexRow("file_3", -2, -1, 0),
IndexRow("file_4", 0, 0, 0), // only contains 0
IndexRow("file_5", 1, 1, 0) // only contains 1
),
Seq("file_1", "file_2", "file_3"))
)
}
def testAdvancedLookupFilterExpressionsSource(): java.util.stream.Stream[Arguments] = {
java.util.stream.Stream.of(
arguments(
// Filter out all rows that contain either A = 0 OR A = 1
"A != 0 AND A != 1",
Seq(
IndexRow("file_1", 1, 2, 0),
IndexRow("file_2", -1, 1, 0),
IndexRow("file_3", -2, -1, 0),
IndexRow("file_4", 0, 0, 0), // only contains 0
IndexRow("file_5", 1, 1, 0) // only contains 1
),
Seq("file_1", "file_2", "file_3")),
arguments(
// This is an equivalent to the above expression
"NOT(A = 0 OR A = 1)",
Seq(
IndexRow("file_1", 1, 2, 0),
IndexRow("file_2", -1, 1, 0),
IndexRow("file_3", -2, -1, 0),
IndexRow("file_4", 0, 0, 0), // only contains 0
IndexRow("file_5", 1, 1, 0) // only contains 1
),
Seq("file_1", "file_2", "file_3")),
arguments(
// Filter out all rows that contain A = 0 AND B = 'abc'
"A != 0 OR B != 'abc'",
Seq(
IndexRow("file_1", 1, 2, 0),
IndexRow("file_2", -1, 1, 0),
IndexRow("file_3", -2, -1, 0),
IndexRow("file_4", 0, 0, 0, "abc", "abc", 0), // only contains A = 0, B = 'abc'
IndexRow("file_5", 0, 0, 0, "abc", "abc", 0) // only contains A = 0, B = 'abc'
),
Seq("file_1", "file_2", "file_3")),
arguments(
// This is an equivalent to the above expression
"NOT(A = 0 AND B = 'abc')",
Seq(
IndexRow("file_1", 1, 2, 0),
IndexRow("file_2", -1, 1, 0),
IndexRow("file_3", -2, -1, 0),
IndexRow("file_4", 0, 0, 0, "abc", "abc", 0), // only contains A = 0, B = 'abc'
IndexRow("file_5", 0, 0, 0, "abc", "abc", 0) // only contains A = 0, B = 'abc'
),
Seq("file_1", "file_2", "file_3")),
arguments(
// Queries contains expression involving non-indexed column C
"A = 0 AND B = 'abc' AND C = '...'",
Seq(
IndexRow("file_1", 1, 2, 0),
IndexRow("file_2", -1, 1, 0),
IndexRow("file_3", -2, -1, 0),
IndexRow("file_4", 0, 0, 0, "aaa", "xyz", 0) // might contain A = 0 AND B = 'abc'
),
Seq("file_4")),
arguments(
// Queries contains expression involving non-indexed column C
"A = 0 OR B = 'abc' OR C = '...'",
Seq(
IndexRow("file_1", 1, 2, 0),
IndexRow("file_2", -1, 1, 0),
IndexRow("file_3", -2, -1, 0),
IndexRow("file_4", 0, 0, 0, "aaa", "xyz", 0) // might contain B = 'abc'
),
Seq("file_1", "file_2", "file_3", "file_4"))
)
}
}

View File

@@ -59,7 +59,8 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL
)
@BeforeEach override def setUp() {
@BeforeEach
override def setUp() {
setTableName("hoodie_test")
initPath()
initSparkContexts()

View File

@@ -19,20 +19,17 @@
package org.apache.hudi
import org.apache.avro.generic.GenericRecord
import java.io.File
import java.nio.file.Paths
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.testutils.DataSourceTestUtils
import org.apache.spark.sql.avro.IncompatibleSchemaException
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SparkSession}
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertNull, assertTrue, fail}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.io.TempDir
import java.util
import java.io.File
import java.nio.file.Paths
import scala.collection.JavaConverters
class TestHoodieSparkUtils {
@@ -235,6 +232,6 @@ class TestHoodieSparkUtils {
spark.stop()
}
def convertRowListToSeq(inputList: util.List[Row]): Seq[Row] =
def convertRowListToSeq(inputList: java.util.List[Row]): Seq[Row] =
JavaConverters.asScalaIteratorConverter(inputList.iterator).asScala.toSeq
}

View File

@@ -1,231 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.functional
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.model.HoodieFileFormat
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.common.util.{BaseFileUtils, ParquetUtils}
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
import org.apache.spark.ZCurveOptimizeHelper
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import java.sql.{Date, Timestamp}
import scala.collection.JavaConversions._
import scala.util.Random
@Tag("functional")
class TestTableLayoutOptimization extends HoodieClientTestBase {
var spark: SparkSession = _
val commonOpts = Map(
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4",
"hoodie.bulkinsert.shuffle.parallelism" -> "4",
DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "partition",
DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "timestamp",
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
)
@BeforeEach override def setUp() {
initPath()
initSparkContexts()
spark = sqlContext.sparkSession
initTestDataGenerator()
initFileSystem()
}
@AfterEach override def tearDown() = {
cleanupSparkContexts()
cleanupTestDataGenerator()
cleanupFileSystem()
}
@ParameterizedTest
@ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
def testOptimizeWithClustering(tableType: String): Unit = {
val targetRecordsCount = 10000
// Bulk Insert Operation
val records = recordsToStrings(dataGen.generateInserts("001", targetRecordsCount)).toList
val writeDf: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records, 2))
writeDf.write.format("org.apache.hudi")
.options(commonOpts)
.option("hoodie.compact.inline", "false")
.option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE.key(), tableType)
// option for clustering
.option("hoodie.parquet.small.file.limit", "0")
.option("hoodie.clustering.inline", "true")
.option("hoodie.clustering.inline.max.commits", "1")
.option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824")
.option("hoodie.clustering.plan.strategy.small.file.limit", "629145600")
.option("hoodie.clustering.plan.strategy.max.bytes.per.group", Long.MaxValue.toString)
.option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(64 * 1024 * 1024L))
.option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE.key, "true")
.option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "begin_lat, begin_lon")
.mode(SaveMode.Overwrite)
.save(basePath)
val readDf =
spark.read
.format("hudi")
.load(basePath)
val readDfSkip =
spark.read
.option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true")
.format("hudi")
.load(basePath)
assertEquals(targetRecordsCount, readDf.count())
assertEquals(targetRecordsCount, readDfSkip.count())
readDf.createOrReplaceTempView("hudi_snapshot_raw")
readDfSkip.createOrReplaceTempView("hudi_snapshot_skipping")
def select(tableName: String) =
spark.sql(s"SELECT * FROM $tableName WHERE begin_lat >= 0.49 AND begin_lat < 0.51 AND begin_lon >= 0.49 AND begin_lon < 0.51")
assertRowsMatch(
select("hudi_snapshot_raw"),
select("hudi_snapshot_skipping")
)
}
def assertRowsMatch(one: DataFrame, other: DataFrame) = {
val rows = one.count()
assert(rows == other.count() && one.intersect(other).count() == rows)
}
@Test
def testCollectMinMaxStatistics(): Unit = {
val testPath = new Path(System.getProperty("java.io.tmpdir"), "minMax")
val statisticPath = new Path(System.getProperty("java.io.tmpdir"), "stat")
val fs = testPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
val complexDataFrame = createComplexDataFrame(spark)
complexDataFrame.repartition(3).write.mode("overwrite").save(testPath.toString)
val df = spark.read.load(testPath.toString)
try {
// test z-order sort for all primitive type, should not throw exception.
ZCurveOptimizeHelper.createZIndexedDataFrameByMapValue(df, "c1,c2,c3,c4,c5,c6,c7,c8", 20).show(1)
ZCurveOptimizeHelper.createZIndexedDataFrameBySample(df, "c1,c2,c3,c4,c5,c6,c7,c8", 20).show(1)
// do not support TimeStampType, so if we collect statistics for c4, should throw exception
val colDf = ZCurveOptimizeHelper.getMinMaxValue(df, "c1,c2,c3,c5,c6,c7,c8")
colDf.cache()
assertEquals(colDf.count(), 3)
assertEquals(colDf.take(1)(0).length, 22)
colDf.unpersist()
// try to save statistics
ZCurveOptimizeHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8", statisticPath.toString, "2", Seq("0", "1"))
// save again
ZCurveOptimizeHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8", statisticPath.toString, "3", Seq("0", "1", "2"))
// test old index table clean
ZCurveOptimizeHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8", statisticPath.toString, "4", Seq("0", "1", "3"))
assertEquals(!fs.exists(new Path(statisticPath, "2")), true)
assertEquals(fs.exists(new Path(statisticPath, "3")), true)
// test to save different index, new index on ("c1,c6,c7,c8") should be successfully saved.
ZCurveOptimizeHelper.saveStatisticsInfo(df, "c1,c6,c7,c8", statisticPath.toString, "5", Seq("0", "1", "3", "4"))
assertEquals(fs.exists(new Path(statisticPath, "5")), true)
} finally {
if (fs.exists(testPath)) fs.delete(testPath)
if (fs.exists(statisticPath)) fs.delete(statisticPath)
}
}
// test collect min-max statistic info for DateType in the case of multithreading.
// parquet will give a wrong statistic result for DateType in the case of multithreading.
@Test
def testMultiThreadParquetFooterReadForDateType(): Unit = {
// create parquet file with DateType
val rdd = spark.sparkContext.parallelize(0 to 100, 1)
.map(item => RowFactory.create(Date.valueOf(s"${2020}-${item % 11 + 1}-${item % 28 + 1}")))
val df = spark.createDataFrame(rdd, new StructType().add("id", DateType))
val testPath = new Path(System.getProperty("java.io.tmpdir"), "testCollectDateType")
val conf = spark.sparkContext.hadoopConfiguration
val cols = new java.util.ArrayList[String]
cols.add("id")
try {
df.repartition(3).write.mode("overwrite").save(testPath.toString)
val inputFiles = spark.read.load(testPath.toString).inputFiles.sortBy(x => x)
val realResult = new Array[(String, String)](3)
inputFiles.zipWithIndex.foreach { case (f, index) =>
val fileUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).asInstanceOf[ParquetUtils]
val res = fileUtils.readRangeFromParquetMetadata(conf, new Path(f), cols).iterator().next()
realResult(index) = (res.getMinValueAsString, res.getMaxValueAsString)
}
// multi thread read with no lock
val resUseLock = new Array[(String, String)](3)
inputFiles.zipWithIndex.par.foreach { case (f, index) =>
val fileUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).asInstanceOf[ParquetUtils]
val res = fileUtils.readRangeFromParquetMetadata(conf, new Path(f), cols).iterator().next()
resUseLock(index) = (res.getMinValueAsString, res.getMaxValueAsString)
}
// check resUseNoLock,
// We can't guarantee that there must be problems in the case of multithreading.
// In order to make ut pass smoothly, we will not check resUseNoLock.
// check resUseLock
// should pass assert
realResult.zip(resUseLock).foreach { case (realValue, testValue) =>
assert(realValue == testValue, s" expect realValue: ${realValue} but find ${testValue}")
}
} finally {
if (fs.exists(testPath)) fs.delete(testPath)
}
}
def createComplexDataFrame(spark: SparkSession): DataFrame = {
val schema = new StructType()
.add("c1", IntegerType)
.add("c2", StringType)
.add("c3", DecimalType(9,3))
.add("c4", TimestampType)
.add("c5", ShortType)
.add("c6", DateType)
.add("c7", BinaryType)
.add("c8", ByteType)
val rdd = spark.sparkContext.parallelize(0 to 1000, 1).map { item =>
val c1 = Integer.valueOf(item)
val c2 = s" ${item}sdc"
val c3 = new java.math.BigDecimal(s"${Random.nextInt(1000)}.${item}")
val c4 = new Timestamp(System.currentTimeMillis())
val c5 = java.lang.Short.valueOf(s"${(item + 16) /10}")
val c6 = Date.valueOf(s"${2020}-${item % 11 + 1}-${item % 28 + 1}")
val c7 = Array(item).map(_.toByte)
val c8 = java.lang.Byte.valueOf("9")
RowFactory.create(c1, c2, c3, c4, c5, c6, c7, c8)
}
spark.createDataFrame(rdd, schema)
}
}

View File

@@ -0,0 +1,398 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.functional
import org.apache.hadoop.fs.{LocatedFileStatus, Path}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
import org.apache.hudi.index.zorder.ZOrderingIndexHelper
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
import org.apache.spark.sql._
import org.apache.spark.sql.functions.typedLit
import org.apache.spark.sql.types._
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Tag, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import java.sql.{Date, Timestamp}
import scala.collection.JavaConversions._
import scala.util.Random
@Tag("functional")
class TestZOrderLayoutOptimization extends HoodieClientTestBase {
var spark: SparkSession = _
val sourceTableSchema =
new StructType()
.add("c1", IntegerType)
.add("c2", StringType)
.add("c3", DecimalType(9,3))
.add("c4", TimestampType)
.add("c5", ShortType)
.add("c6", DateType)
.add("c7", BinaryType)
.add("c8", ByteType)
val commonOpts = Map(
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4",
"hoodie.bulkinsert.shuffle.parallelism" -> "4",
DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "partition",
DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "timestamp",
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
)
@BeforeEach
override def setUp() {
initPath()
initSparkContexts()
spark = sqlContext.sparkSession
initTestDataGenerator()
initFileSystem()
}
@AfterEach
override def tearDown() = {
cleanupSparkContexts()
cleanupTestDataGenerator()
cleanupFileSystem()
}
@ParameterizedTest
@ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
def testZOrderingLayoutClustering(tableType: String): Unit = {
val targetRecordsCount = 10000
// Bulk Insert Operation
val records = recordsToStrings(dataGen.generateInserts("001", targetRecordsCount)).toList
val writeDf: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records, 2))
writeDf.write.format("org.apache.hudi")
.options(commonOpts)
.option("hoodie.compact.inline", "false")
.option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE.key(), tableType)
// option for clustering
.option("hoodie.parquet.small.file.limit", "0")
.option("hoodie.clustering.inline", "true")
.option("hoodie.clustering.inline.max.commits", "1")
.option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824")
.option("hoodie.clustering.plan.strategy.small.file.limit", "629145600")
.option("hoodie.clustering.plan.strategy.max.bytes.per.group", Long.MaxValue.toString)
.option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(64 * 1024 * 1024L))
.option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE.key, "true")
.option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "begin_lat, begin_lon")
.mode(SaveMode.Overwrite)
.save(basePath)
val hudiMetaClient = HoodieTableMetaClient.builder
.setConf(hadoopConf)
.setBasePath(basePath)
.setLoadActiveTimelineOnLoad(true)
.build
val lastCommit = hudiMetaClient.getActiveTimeline.getAllCommitsTimeline.lastInstant().get()
assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, lastCommit.getAction)
assertEquals(HoodieInstant.State.COMPLETED, lastCommit.getState)
val readDf =
spark.read
.format("hudi")
.load(basePath)
val readDfSkip =
spark.read
.option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true")
.format("hudi")
.load(basePath)
assertEquals(targetRecordsCount, readDf.count())
assertEquals(targetRecordsCount, readDfSkip.count())
readDf.createOrReplaceTempView("hudi_snapshot_raw")
readDfSkip.createOrReplaceTempView("hudi_snapshot_skipping")
def select(tableName: String) =
spark.sql(s"SELECT * FROM $tableName WHERE begin_lat >= 0.49 AND begin_lat < 0.51 AND begin_lon >= 0.49 AND begin_lon < 0.51")
assertRowsMatch(
select("hudi_snapshot_raw"),
select("hudi_snapshot_skipping")
)
}
@Test
@Disabled
def testZIndexTableComposition(): Unit = {
val inputDf =
// NOTE: Schema here is provided for validation that the input date is in the appropriate format
spark.read
.schema(sourceTableSchema)
.parquet(
getClass.getClassLoader.getResource("index/zorder/input-table").toString
)
val zorderedCols = Seq("c1", "c2", "c3", "c5", "c6", "c7", "c8")
val zorderedColsSchemaFields = inputDf.schema.fields.filter(f => zorderedCols.contains(f.name)).toSeq
// {@link TimestampType} is not supported, and will throw -- hence skipping "c4"
val newZIndexTableDf =
ZOrderingIndexHelper.buildZIndexTableFor(
inputDf.sparkSession,
inputDf.inputFiles.toSeq,
zorderedColsSchemaFields
)
val indexSchema =
ZOrderingIndexHelper.composeIndexSchema(
sourceTableSchema.fields.filter(f => zorderedCols.contains(f.name)).toSeq
)
// Collect Z-index stats manually (reading individual Parquet files)
val manualZIndexTableDf =
buildZIndexTableManually(
getClass.getClassLoader.getResource("index/zorder/input-table").toString,
zorderedCols,
indexSchema
)
// NOTE: Z-index is built against stats collected w/in Parquet footers, which will be
// represented w/ corresponding Parquet schema (INT, INT64, INT96, etc).
//
// When stats are collected manually, produced Z-index table is inherently coerced into the
// schema of the original source Parquet base-file and therefore we have to similarly coerce newly
// built Z-index table (built off Parquet footers) into the canonical index schema (built off the
// original source file schema)
assertEquals(asJson(sort(manualZIndexTableDf)), asJson(sort(newZIndexTableDf)))
// Match against expected Z-index table
val expectedZIndexTableDf =
spark.read
.schema(indexSchema)
.json(getClass.getClassLoader.getResource("index/zorder/z-index-table.json").toString)
assertEquals(asJson(sort(expectedZIndexTableDf)), asJson(sort(newZIndexTableDf)))
}
@Test
@Disabled
def testZIndexTableMerge(): Unit = {
val testZIndexPath = new Path(basePath, "zindex")
val zorderedCols = Seq("c1", "c2", "c3", "c5", "c6", "c7", "c8")
val indexSchema =
ZOrderingIndexHelper.composeIndexSchema(
sourceTableSchema.fields.filter(f => zorderedCols.contains(f.name)).toSeq
)
//
// Bootstrap Z-index table
//
val firstCommitInstance = "0"
val firstInputDf =
spark.read.parquet(
getClass.getClassLoader.getResource("index/zorder/input-table").toString
)
ZOrderingIndexHelper.updateZIndexFor(
firstInputDf.sparkSession,
sourceTableSchema,
firstInputDf.inputFiles.toSeq,
zorderedCols.toSeq,
testZIndexPath.toString,
firstCommitInstance,
Seq()
)
// NOTE: We don't need to provide schema upon reading from Parquet, since Spark will be able
// to reliably retrieve it
val initialZIndexTable =
spark.read
.parquet(new Path(testZIndexPath, firstCommitInstance).toString)
val expectedInitialZIndexTableDf =
spark.read
.schema(indexSchema)
.json(getClass.getClassLoader.getResource("index/zorder/z-index-table.json").toString)
assertEquals(asJson(sort(expectedInitialZIndexTableDf)), asJson(sort(initialZIndexTable)))
val secondCommitInstance = "1"
val secondInputDf =
spark.read
.schema(sourceTableSchema)
.parquet(
getClass.getClassLoader.getResource("index/zorder/another-input-table").toString
)
//
// Update Z-index table
//
ZOrderingIndexHelper.updateZIndexFor(
secondInputDf.sparkSession,
sourceTableSchema,
secondInputDf.inputFiles.toSeq,
zorderedCols.toSeq,
testZIndexPath.toString,
secondCommitInstance,
Seq(firstCommitInstance)
)
// NOTE: We don't need to provide schema upon reading from Parquet, since Spark will be able
// to reliably retrieve it
val mergedZIndexTable =
spark.read
.parquet(new Path(testZIndexPath, secondCommitInstance).toString)
val expectedMergedZIndexTableDf =
spark.read
.schema(indexSchema)
.json(getClass.getClassLoader.getResource("index/zorder/z-index-table-merged.json").toString)
assertEquals(asJson(sort(expectedMergedZIndexTableDf)), asJson(sort(mergedZIndexTable)))
}
@Test
@Disabled
def testZIndexTablesGarbageCollection(): Unit = {
val testZIndexPath = new Path(System.getProperty("java.io.tmpdir"), "zindex")
val fs = testZIndexPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
val inputDf =
spark.read.parquet(
getClass.getClassLoader.getResource("index/zorder/input-table").toString
)
// Try to save statistics
ZOrderingIndexHelper.updateZIndexFor(
inputDf.sparkSession,
sourceTableSchema,
inputDf.inputFiles.toSeq,
Seq("c1","c2","c3","c5","c6","c7","c8"),
testZIndexPath.toString,
"2",
Seq("0", "1")
)
// Save again
ZOrderingIndexHelper.updateZIndexFor(
inputDf.sparkSession,
sourceTableSchema,
inputDf.inputFiles.toSeq,
Seq("c1","c2","c3","c5","c6","c7","c8"),
testZIndexPath.toString,
"3",
Seq("0", "1", "2")
)
// Test old index table being cleaned up
ZOrderingIndexHelper.updateZIndexFor(
inputDf.sparkSession,
sourceTableSchema,
inputDf.inputFiles.toSeq,
Seq("c1","c2","c3","c5","c6","c7","c8"),
testZIndexPath.toString,
"4",
Seq("0", "1", "3")
)
assertEquals(!fs.exists(new Path(testZIndexPath, "2")), true)
assertEquals(!fs.exists(new Path(testZIndexPath, "3")), true)
assertEquals(fs.exists(new Path(testZIndexPath, "4")), true)
}
private def buildZIndexTableManually(tablePath: String, zorderedCols: Seq[String], indexSchema: StructType) = {
val files = {
val it = fs.listFiles(new Path(tablePath), true)
var seq = Seq[LocatedFileStatus]()
while (it.hasNext) {
seq = seq :+ it.next()
}
seq
}
spark.createDataFrame(
files.flatMap(file => {
val df = spark.read.schema(sourceTableSchema).parquet(file.getPath.toString)
val exprs: Seq[String] =
s"'${typedLit(file.getPath.getName)}' AS file" +:
df.columns
.filter(col => zorderedCols.contains(col))
.flatMap(col => {
val minColName = s"${col}_minValue"
val maxColName = s"${col}_maxValue"
Seq(
s"min($col) AS $minColName",
s"max($col) AS $maxColName",
s"sum(cast(isnull($col) AS long)) AS ${col}_num_nulls"
)
})
df.selectExpr(exprs: _*)
.collect()
}),
indexSchema
)
}
private def asJson(df: DataFrame) =
df.toJSON
.select("value")
.collect()
.toSeq
.map(_.getString(0))
.mkString("\n")
private def assertRowsMatch(one: DataFrame, other: DataFrame) = {
val rows = one.count()
assert(rows == other.count() && one.intersect(other).count() == rows)
}
private def sort(df: DataFrame): DataFrame = {
// Since upon parsing JSON, Spark re-order columns in lexicographical order
// of their names, we have to shuffle new Z-index table columns order to match
// Rows are sorted by filename as well to avoid
val sortedCols = df.columns.sorted
df.select(sortedCols.head, sortedCols.tail: _*)
.sort("file")
}
def createComplexDataFrame(spark: SparkSession): DataFrame = {
val rdd = spark.sparkContext.parallelize(0 to 1000, 1).map { item =>
val c1 = Integer.valueOf(item)
val c2 = s" ${item}sdc"
val c3 = new java.math.BigDecimal(s"${Random.nextInt(1000)}.${item}")
val c4 = new Timestamp(System.currentTimeMillis())
val c5 = java.lang.Short.valueOf(s"${(item + 16) /10}")
val c6 = Date.valueOf(s"${2020}-${item % 11 + 1}-${item % 28 + 1}")
val c7 = Array(item).map(_.toByte)
val c8 = java.lang.Byte.valueOf("9")
RowFactory.create(c1, c2, c3, c4, c5, c6, c7, c8)
}
spark.createDataFrame(rdd, sourceTableSchema)
}
}

View File

@@ -275,7 +275,9 @@
<module name="EmptyStatement" />
<!-- Checks for Java Docs. -->
<module name="JavadocStyle"/>
<module name="JavadocStyle">
<property name="checkFirstSentence" value="false"/>
</module>
<module name="JavadocType">
<property name="severity" value="info"/>