[HUDI-2788] Fixing issues w/ Z-order Layout Optimization (#4026)
* Simplyfying, tidying up * Fixed packaging for `TestOptimizeTable` * Cleaned up `HoodiFileIndex` file filtering seq; Removed optimization manually reading Parquet table circumventing Spark * Refactored `DataSkippingUtils`: - Fixed checks to validate all statistics cols are present - Fixed some predicates being constructed incorrectly - Rewrote comments for easier comprehension, added more notes - Tidying up * Tidying up tests * `lint` * Fixing compilation * `TestOptimizeTable` > `TestTableLayoutOptimization`; Added assertions to test data skipping paths * Fixed tests to properly hit data-skipping path * Fixed pruned files candidates lookup seq to conservatively included all non-indexed files * Added java-doc * Fixed compilation
This commit is contained in:
@@ -39,7 +39,7 @@ import org.apache.spark.sql.Dataset;
|
|||||||
import org.apache.spark.sql.Row;
|
import org.apache.spark.sql.Row;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A partitioner that does spartial curve optimization sorting based on specified column values for each RDD partition.
|
* A partitioner that does spatial curve optimization sorting based on specified column values for each RDD partition.
|
||||||
* support z-curve optimization, hilbert will come soon.
|
* support z-curve optimization, hilbert will come soon.
|
||||||
* @param <T> HoodieRecordPayload type
|
* @param <T> HoodieRecordPayload type
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -189,11 +189,10 @@ public class ZCurveOptimizeHelper {
|
|||||||
|
|
||||||
SerializableConfiguration serializableConfiguration = new SerializableConfiguration(sc.hadoopConfiguration());
|
SerializableConfiguration serializableConfiguration = new SerializableConfiguration(sc.hadoopConfiguration());
|
||||||
int numParallelism = (scanFiles.size() / 3 + 1);
|
int numParallelism = (scanFiles.size() / 3 + 1);
|
||||||
List<HoodieColumnRangeMetadata<Comparable>> colMinMaxInfos = new ArrayList<>();
|
List<HoodieColumnRangeMetadata<Comparable>> colMinMaxInfos;
|
||||||
String previousJobDescription = sc.getLocalProperty(SPARK_JOB_DESCRIPTION);
|
String previousJobDescription = sc.getLocalProperty(SPARK_JOB_DESCRIPTION);
|
||||||
try {
|
try {
|
||||||
String description = "Listing parquet column statistics";
|
jsc.setJobDescription("Listing parquet column statistics");
|
||||||
jsc.setJobDescription(description);
|
|
||||||
colMinMaxInfos = jsc.parallelize(scanFiles, numParallelism).mapPartitions(paths -> {
|
colMinMaxInfos = jsc.parallelize(scanFiles, numParallelism).mapPartitions(paths -> {
|
||||||
Configuration conf = serializableConfiguration.value();
|
Configuration conf = serializableConfiguration.value();
|
||||||
ParquetUtils parquetUtils = (ParquetUtils) BaseFileUtils.getInstance(HoodieFileFormat.PARQUET);
|
ParquetUtils parquetUtils = (ParquetUtils) BaseFileUtils.getInstance(HoodieFileFormat.PARQUET);
|
||||||
@@ -209,7 +208,7 @@ public class ZCurveOptimizeHelper {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Map<String, List<HoodieColumnRangeMetadata<Comparable>>> fileToStatsListMap = colMinMaxInfos.stream().collect(Collectors.groupingBy(e -> e.getFilePath()));
|
Map<String, List<HoodieColumnRangeMetadata<Comparable>>> fileToStatsListMap = colMinMaxInfos.stream().collect(Collectors.groupingBy(e -> e.getFilePath()));
|
||||||
JavaRDD<Row> allMetaDataRDD = jsc.parallelize(fileToStatsListMap.values().stream().collect(Collectors.toList()), 1).map(f -> {
|
JavaRDD<Row> allMetaDataRDD = jsc.parallelize(new ArrayList<>(fileToStatsListMap.values()), 1).map(f -> {
|
||||||
int colSize = f.size();
|
int colSize = f.size();
|
||||||
if (colSize == 0) {
|
if (colSize == 0) {
|
||||||
return null;
|
return null;
|
||||||
@@ -299,7 +298,14 @@ public class ZCurveOptimizeHelper {
|
|||||||
Dataset<Row> statisticsDF = ZCurveOptimizeHelper.getMinMaxValue(df, cols);
|
Dataset<Row> statisticsDF = ZCurveOptimizeHelper.getMinMaxValue(df, cols);
|
||||||
// try to find last validate index table from index path
|
// try to find last validate index table from index path
|
||||||
try {
|
try {
|
||||||
if (fs.exists(new Path(indexPath))) {
|
// If there's currently no index, create one
|
||||||
|
if (!fs.exists(new Path(indexPath))) {
|
||||||
|
statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Otherwise, clean up all indexes but the most recent one
|
||||||
|
|
||||||
List<String> allIndexTables = Arrays
|
List<String> allIndexTables = Arrays
|
||||||
.stream(fs.listStatus(new Path(indexPath))).filter(f -> f.isDirectory()).map(f -> f.getPath().getName()).collect(Collectors.toList());
|
.stream(fs.listStatus(new Path(indexPath))).filter(f -> f.isDirectory()).map(f -> f.getPath().getName()).collect(Collectors.toList());
|
||||||
List<String> candidateIndexTables = allIndexTables.stream().filter(f -> validateCommits.contains(f)).sorted().collect(Collectors.toList());
|
List<String> candidateIndexTables = allIndexTables.stream().filter(f -> validateCommits.contains(f)).sorted().collect(Collectors.toList());
|
||||||
@@ -343,9 +349,6 @@ public class ZCurveOptimizeHelper {
|
|||||||
} else {
|
} else {
|
||||||
statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString());
|
statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString());
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString());
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new HoodieException(e);
|
throw new HoodieException(e);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -298,8 +298,8 @@ object HoodieSparkUtils extends SparkAdapterSupport {
|
|||||||
*/
|
*/
|
||||||
def createMergeSql(leftTable: String, rightTable: String, cols: Seq[String]): String = {
|
def createMergeSql(leftTable: String, rightTable: String, cols: Seq[String]): String = {
|
||||||
var selectsql = ""
|
var selectsql = ""
|
||||||
for (i <- (0 to cols.size-1)) {
|
for (i <- cols.indices) {
|
||||||
selectsql = selectsql + s" if (${leftTable}.${cols(0)} is null, ${rightTable}.${cols(i)}, ${leftTable}.${cols(i)}) as ${cols(i)} ,"
|
selectsql = selectsql + s" if (${leftTable}.${cols(i)} is null, ${rightTable}.${cols(i)}, ${leftTable}.${cols(i)}) as ${cols(i)} ,"
|
||||||
}
|
}
|
||||||
"select " + selectsql.dropRight(1) + s" from ${leftTable} full join ${rightTable} on ${leftTable}.${cols(0)} = ${rightTable}.${cols(0)}"
|
"select " + selectsql.dropRight(1) + s" from ${leftTable} full join ${rightTable} on ${leftTable}.${cols(0)} = ${rightTable}.${cols(0)}"
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -160,41 +160,92 @@ case class HoodieFileIndex(
|
|||||||
spark.sessionState.conf.getConfString(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "false")).toBoolean
|
spark.sessionState.conf.getConfString(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "false")).toBoolean
|
||||||
}
|
}
|
||||||
|
|
||||||
private def filterFilesByDataSkippingIndex(dataFilters: Seq[Expression]): Set[String] = {
|
/**
|
||||||
var allFiles: Set[String] = Set.empty
|
* Computes pruned list of candidate base-files' names based on provided list of {@link dataFilters}
|
||||||
var candidateFiles: Set[String] = Set.empty
|
* conditions, by leveraging custom Z-order index (Z-index) bearing "min", "max", "num_nulls" statistic
|
||||||
|
* for all clustered columns
|
||||||
|
*
|
||||||
|
* NOTE: This method has to return complete set of candidate files, since only provided candidates will
|
||||||
|
* ultimately be scanned as part of query execution. Hence, this method has to maintain the
|
||||||
|
* invariant of conservatively including every base-file's name, that is NOT referenced in its index.
|
||||||
|
*
|
||||||
|
* @param dataFilters list of original data filters passed down from querying engine
|
||||||
|
* @return list of pruned (data-skipped) candidate base-files' names
|
||||||
|
*/
|
||||||
|
private def lookupCandidateFilesNamesInZIndex(dataFilters: Seq[Expression]): Option[Set[String]] = {
|
||||||
val indexPath = metaClient.getZindexPath
|
val indexPath = metaClient.getZindexPath
|
||||||
val fs = metaClient.getFs
|
val fs = metaClient.getFs
|
||||||
if (fs.exists(new Path(indexPath)) && dataFilters.nonEmpty) {
|
|
||||||
// try to load latest index table from index path
|
if (!enableDataSkipping() || !fs.exists(new Path(indexPath)) || dataFilters.isEmpty) {
|
||||||
val candidateIndexTables = fs.listStatus(new Path(indexPath)).filter(_.isDirectory)
|
// scalastyle:off return
|
||||||
.map(_.getPath.getName).filter(f => completedCommits.contains(f)).sortBy(x => x)
|
return Option.empty
|
||||||
if (candidateIndexTables.nonEmpty) {
|
// scalastyle:on return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Collect all index tables present in `.zindex` folder
|
||||||
|
val candidateIndexTables =
|
||||||
|
fs.listStatus(new Path(indexPath))
|
||||||
|
.filter(_.isDirectory)
|
||||||
|
.map(_.getPath.getName)
|
||||||
|
.filter(f => completedCommits.contains(f))
|
||||||
|
.sortBy(x => x)
|
||||||
|
|
||||||
|
if (candidateIndexTables.isEmpty) {
|
||||||
|
// scalastyle:off return
|
||||||
|
return Option.empty
|
||||||
|
// scalastyle:on return
|
||||||
|
}
|
||||||
|
|
||||||
val dataFrameOpt = try {
|
val dataFrameOpt = try {
|
||||||
Some(spark.read.load(new Path(indexPath, candidateIndexTables.last).toString))
|
Some(spark.read.load(new Path(indexPath, candidateIndexTables.last).toString))
|
||||||
} catch {
|
} catch {
|
||||||
case _: Throwable =>
|
case t: Throwable =>
|
||||||
logError("missing index skip data-skipping")
|
logError("Failed to read Z-index; skipping", t)
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dataFrameOpt.isDefined) {
|
dataFrameOpt.map(df => {
|
||||||
val indexSchema = dataFrameOpt.get.schema
|
val indexSchema = df.schema
|
||||||
val indexFiles = DataSkippingUtils.getIndexFiles(spark.sparkContext.hadoopConfiguration, new Path(indexPath, candidateIndexTables.last).toString)
|
val indexFilter =
|
||||||
val indexFilter = dataFilters.map(DataSkippingUtils.createZindexFilter(_, indexSchema)).reduce(And)
|
dataFilters.map(DataSkippingUtils.createZIndexLookupFilter(_, indexSchema))
|
||||||
logInfo(s"index filter condition: $indexFilter")
|
.reduce(And)
|
||||||
dataFrameOpt.get.persist()
|
|
||||||
if (indexFiles.size <= 4) {
|
logInfo(s"Index filter condition: $indexFilter")
|
||||||
allFiles = DataSkippingUtils.readParquetFile(spark, indexFiles)
|
|
||||||
} else {
|
df.persist()
|
||||||
allFiles = dataFrameOpt.get.select("file").collect().map(_.getString(0)).toSet
|
|
||||||
|
val allIndexedFileNames =
|
||||||
|
df.select("file")
|
||||||
|
.collect()
|
||||||
|
.map(_.getString(0))
|
||||||
|
.toSet
|
||||||
|
|
||||||
|
val prunedCandidateFileNames =
|
||||||
|
df.filter(new Column(indexFilter))
|
||||||
|
.select("file")
|
||||||
|
.collect()
|
||||||
|
.map(_.getString(0))
|
||||||
|
.toSet
|
||||||
|
|
||||||
|
df.unpersist()
|
||||||
|
|
||||||
|
// NOTE: Z-index isn't guaranteed to have complete set of statistics for every
|
||||||
|
// base-file: since it's bound to clustering, which could occur asynchronously
|
||||||
|
// at arbitrary point in time, and is not likely to touching all of the base files.
|
||||||
|
//
|
||||||
|
// To close that gap, we manually compute the difference b/w all indexed (Z-index)
|
||||||
|
// files and all outstanding base-files, and make sure that all base files not
|
||||||
|
// represented w/in Z-index are included in the output of this method
|
||||||
|
val notIndexedFileNames =
|
||||||
|
lookupFileNamesMissingFromIndex(allIndexedFileNames)
|
||||||
|
|
||||||
|
prunedCandidateFileNames ++ notIndexedFileNames
|
||||||
|
})
|
||||||
}
|
}
|
||||||
candidateFiles = dataFrameOpt.get.filter(new Column(indexFilter)).select("file").collect().map(_.getString(0)).toSet
|
|
||||||
dataFrameOpt.get.unpersist()
|
private def lookupFileNamesMissingFromIndex(allIndexedFileNames: Set[String]) = {
|
||||||
}
|
val allBaseFileNames = allFiles.map(f => f.getPath.getName).toSet
|
||||||
}
|
allBaseFileNames -- allIndexedFileNames
|
||||||
}
|
|
||||||
allFiles -- candidateFiles
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -206,18 +257,22 @@ case class HoodieFileIndex(
|
|||||||
*/
|
*/
|
||||||
override def listFiles(partitionFilters: Seq[Expression],
|
override def listFiles(partitionFilters: Seq[Expression],
|
||||||
dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
|
dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
|
||||||
// try to load filterFiles from index
|
// Look up candidate files names in the Z-index, if all of the following conditions are true
|
||||||
val filterFiles: Set[String] = if (enableDataSkipping()) {
|
// - Data-skipping is enabled
|
||||||
filterFilesByDataSkippingIndex(dataFilters)
|
// - Z-index is present
|
||||||
} else {
|
// - List of predicates (filters) is present
|
||||||
Set.empty
|
val candidateFilesNamesOpt: Option[Set[String]] = lookupCandidateFilesNamesInZIndex(dataFilters)
|
||||||
}
|
|
||||||
|
logDebug(s"Overlapping candidate files (from Z-index): ${candidateFilesNamesOpt.getOrElse(Set.empty)}")
|
||||||
|
|
||||||
if (queryAsNonePartitionedTable) { // Read as Non-Partitioned table.
|
if (queryAsNonePartitionedTable) { // Read as Non-Partitioned table.
|
||||||
val candidateFiles = if (!filterFiles.isEmpty) {
|
// Filter in candidate files based on the Z-index lookup
|
||||||
allFiles.filterNot(fileStatus => filterFiles.contains(fileStatus.getPath.getName))
|
val candidateFiles =
|
||||||
} else {
|
allFiles.filter(fileStatus =>
|
||||||
allFiles
|
// NOTE: This predicate is true when {@code Option} is empty
|
||||||
}
|
candidateFilesNamesOpt.forall(_.contains(fileStatus.getPath.getName))
|
||||||
|
)
|
||||||
|
|
||||||
logInfo(s"Total files : ${allFiles.size}," +
|
logInfo(s"Total files : ${allFiles.size}," +
|
||||||
s" candidate files after data skipping: ${candidateFiles.size} " +
|
s" candidate files after data skipping: ${candidateFiles.size} " +
|
||||||
s" skipping percent ${if (allFiles.length != 0) (allFiles.size - candidateFiles.size) / allFiles.size.toDouble else 0}")
|
s" skipping percent ${if (allFiles.length != 0) (allFiles.size - candidateFiles.size) / allFiles.size.toDouble else 0}")
|
||||||
@@ -236,11 +291,13 @@ case class HoodieFileIndex(
|
|||||||
null
|
null
|
||||||
}
|
}
|
||||||
}).filterNot(_ == null)
|
}).filterNot(_ == null)
|
||||||
val candidateFiles = if (!filterFiles.isEmpty) {
|
|
||||||
baseFileStatuses.filterNot(fileStatus => filterFiles.contains(fileStatus.getPath.getName))
|
// Filter in candidate files based on the Z-index lookup
|
||||||
} else {
|
val candidateFiles =
|
||||||
baseFileStatuses
|
baseFileStatuses.filter(fileStatus =>
|
||||||
}
|
// NOTE: This predicate is true when {@code Option} is empty
|
||||||
|
candidateFilesNamesOpt.forall(_.contains(fileStatus.getPath.getName)))
|
||||||
|
|
||||||
totalFileSize += baseFileStatuses.size
|
totalFileSize += baseFileStatuses.size
|
||||||
candidateFileSize += candidateFiles.size
|
candidateFileSize += candidateFiles.size
|
||||||
PartitionDirectory(partition.values, candidateFiles)
|
PartitionDirectory(partition.values, candidateFiles)
|
||||||
|
|||||||
@@ -36,120 +36,153 @@ import scala.collection.JavaConverters._
|
|||||||
object DataSkippingUtils {
|
object DataSkippingUtils {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* create z_index filter and push those filters to index table to filter all candidate scan files.
|
* Translates provided {@link filterExpr} into corresponding filter-expression for Z-index index table
|
||||||
* @param condition origin filter from query.
|
* to filter out candidate files that would hold records matching the original filter
|
||||||
* @param indexSchema schema from index table.
|
*
|
||||||
* @return filters for index table.
|
* @param filterExpr original filter from query
|
||||||
|
* @param indexSchema index table schema
|
||||||
|
* @return filter for Z-index table
|
||||||
*/
|
*/
|
||||||
def createZindexFilter(condition: Expression, indexSchema: StructType): Expression = {
|
def createZIndexLookupFilter(filterExpr: Expression, indexSchema: StructType): Expression = {
|
||||||
def buildExpressionInternal(colName: Seq[String], statisticValue: String): Expression = {
|
|
||||||
val appendColName = UnresolvedAttribute(colName).name + statisticValue
|
|
||||||
col(appendColName).expr
|
|
||||||
}
|
|
||||||
|
|
||||||
def reWriteCondition(colName: Seq[String], conditionExpress: Expression): Expression = {
|
def rewriteCondition(colName: Seq[String], conditionExpress: Expression): Expression = {
|
||||||
val appendColName = UnresolvedAttribute(colName).name + "_minValue"
|
val stats = Set.apply(
|
||||||
if (indexSchema.exists(p => p.name == appendColName)) {
|
UnresolvedAttribute(colName).name + "_minValue",
|
||||||
|
UnresolvedAttribute(colName).name + "_maxValue",
|
||||||
|
UnresolvedAttribute(colName).name + "_num_nulls"
|
||||||
|
)
|
||||||
|
|
||||||
|
if (stats.forall(stat => indexSchema.exists(_.name == stat))) {
|
||||||
conditionExpress
|
conditionExpress
|
||||||
} else {
|
} else {
|
||||||
Literal.TrueLiteral
|
Literal.TrueLiteral
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
val minValue = (colName: Seq[String]) => buildExpressionInternal(colName, "_minValue")
|
def refColExpr(colName: Seq[String], statisticValue: String): Expression =
|
||||||
val maxValue = (colName: Seq[String]) => buildExpressionInternal(colName, "_maxValue")
|
col(UnresolvedAttribute(colName).name + statisticValue).expr
|
||||||
val num_nulls = (colName: Seq[String]) => buildExpressionInternal(colName, "_num_nulls")
|
|
||||||
|
|
||||||
condition match {
|
def minValue(colName: Seq[String]) = refColExpr(colName, "_minValue")
|
||||||
// query filter "colA = b" convert it to "colA_minValue <= b and colA_maxValue >= b" for index table
|
def maxValue(colName: Seq[String]) = refColExpr(colName, "_maxValue")
|
||||||
|
def numNulls(colName: Seq[String]) = refColExpr(colName, "_num_nulls")
|
||||||
|
|
||||||
|
def colContainsValuesEqualToLiteral(colName: Seq[String], value: Literal) =
|
||||||
|
And(LessThanOrEqual(minValue(colName), value), GreaterThanOrEqual(maxValue(colName), value))
|
||||||
|
|
||||||
|
def colContainsValuesEqualToLiterals(colName: Seq[String], list: Seq[Literal]) =
|
||||||
|
list.map { lit => colContainsValuesEqualToLiteral(colName, lit) }.reduce(Or)
|
||||||
|
|
||||||
|
filterExpr match {
|
||||||
|
// Filter "colA = b"
|
||||||
|
// Translates to "colA_minValue <= b AND colA_maxValue >= b" condition for index lookup
|
||||||
case EqualTo(attribute: AttributeReference, value: Literal) =>
|
case EqualTo(attribute: AttributeReference, value: Literal) =>
|
||||||
val colName = getTargetColNameParts(attribute)
|
val colName = getTargetColNameParts(attribute)
|
||||||
reWriteCondition(colName, And(LessThanOrEqual(minValue(colName), value), GreaterThanOrEqual(maxValue(colName), value)))
|
rewriteCondition(colName, colContainsValuesEqualToLiteral(colName, value))
|
||||||
// query filter "b = colA" convert it to "colA_minValue <= b and colA_maxValue >= b" for index table
|
// Filter "b = colA"
|
||||||
|
// Translates to "colA_minValue <= b AND colA_maxValue >= b" condition for index lookup
|
||||||
case EqualTo(value: Literal, attribute: AttributeReference) =>
|
case EqualTo(value: Literal, attribute: AttributeReference) =>
|
||||||
val colName = getTargetColNameParts(attribute)
|
val colName = getTargetColNameParts(attribute)
|
||||||
reWriteCondition(colName, And(LessThanOrEqual(minValue(colName), value), GreaterThanOrEqual(maxValue(colName), value)))
|
rewriteCondition(colName, colContainsValuesEqualToLiteral(colName, value))
|
||||||
// query filter "colA = null" convert it to "colA_num_nulls = null" for index table
|
// Filter "colA = null"
|
||||||
|
// Translates to "colA_num_nulls = null" for index lookup
|
||||||
case equalNullSafe @ EqualNullSafe(_: AttributeReference, _ @ Literal(null, _)) =>
|
case equalNullSafe @ EqualNullSafe(_: AttributeReference, _ @ Literal(null, _)) =>
|
||||||
val colName = getTargetColNameParts(equalNullSafe.left)
|
val colName = getTargetColNameParts(equalNullSafe.left)
|
||||||
reWriteCondition(colName, EqualTo(num_nulls(colName), equalNullSafe.right))
|
rewriteCondition(colName, EqualTo(numNulls(colName), equalNullSafe.right))
|
||||||
// query filter "colA < b" convert it to "colA_minValue < b" for index table
|
// Filter "colA < b"
|
||||||
|
// Translates to "colA_minValue < b" for index lookup
|
||||||
case LessThan(attribute: AttributeReference, value: Literal) =>
|
case LessThan(attribute: AttributeReference, value: Literal) =>
|
||||||
val colName = getTargetColNameParts(attribute)
|
val colName = getTargetColNameParts(attribute)
|
||||||
reWriteCondition(colName,LessThan(minValue(colName), value))
|
rewriteCondition(colName, LessThan(minValue(colName), value))
|
||||||
// query filter "b < colA" convert it to "colA_maxValue > b" for index table
|
// Filter "b < colA"
|
||||||
|
// Translates to "b < colA_maxValue" for index lookup
|
||||||
case LessThan(value: Literal, attribute: AttributeReference) =>
|
case LessThan(value: Literal, attribute: AttributeReference) =>
|
||||||
val colName = getTargetColNameParts(attribute)
|
val colName = getTargetColNameParts(attribute)
|
||||||
reWriteCondition(colName, GreaterThan(maxValue(colName), value))
|
rewriteCondition(colName, GreaterThan(maxValue(colName), value))
|
||||||
// query filter "colA > b" convert it to "colA_maxValue > b" for index table
|
// Filter "colA > b"
|
||||||
|
// Translates to "colA_maxValue > b" for index lookup
|
||||||
case GreaterThan(attribute: AttributeReference, value: Literal) =>
|
case GreaterThan(attribute: AttributeReference, value: Literal) =>
|
||||||
val colName = getTargetColNameParts(attribute)
|
val colName = getTargetColNameParts(attribute)
|
||||||
reWriteCondition(colName, GreaterThan(maxValue(colName), value))
|
rewriteCondition(colName, GreaterThan(maxValue(colName), value))
|
||||||
// query filter "b > colA" convert it to "colA_minValue < b" for index table
|
// Filter "b > colA"
|
||||||
|
// Translates to "b > colA_minValue" for index lookup
|
||||||
case GreaterThan(value: Literal, attribute: AttributeReference) =>
|
case GreaterThan(value: Literal, attribute: AttributeReference) =>
|
||||||
val colName = getTargetColNameParts(attribute)
|
val colName = getTargetColNameParts(attribute)
|
||||||
reWriteCondition(colName, LessThan(minValue(colName), value))
|
rewriteCondition(colName, LessThan(minValue(colName), value))
|
||||||
// query filter "colA <= b" convert it to "colA_minValue <= b" for index table
|
// Filter "colA <= b"
|
||||||
|
// Translates to "colA_minValue <= b" for index lookup
|
||||||
case LessThanOrEqual(attribute: AttributeReference, value: Literal) =>
|
case LessThanOrEqual(attribute: AttributeReference, value: Literal) =>
|
||||||
val colName = getTargetColNameParts(attribute)
|
val colName = getTargetColNameParts(attribute)
|
||||||
reWriteCondition(colName, LessThanOrEqual(minValue(colName), value))
|
rewriteCondition(colName, LessThanOrEqual(minValue(colName), value))
|
||||||
// query filter "b <= colA" convert it to "colA_maxValue >= b" for index table
|
// Filter "b <= colA"
|
||||||
|
// Translates to "b <= colA_maxValue" for index lookup
|
||||||
case LessThanOrEqual(value: Literal, attribute: AttributeReference) =>
|
case LessThanOrEqual(value: Literal, attribute: AttributeReference) =>
|
||||||
val colName = getTargetColNameParts(attribute)
|
val colName = getTargetColNameParts(attribute)
|
||||||
reWriteCondition(colName, GreaterThanOrEqual(maxValue(colName), value))
|
rewriteCondition(colName, GreaterThanOrEqual(maxValue(colName), value))
|
||||||
// query filter "colA >= b" convert it to "colA_maxValue >= b" for index table
|
// Filter "colA >= b"
|
||||||
|
// Translates to "colA_maxValue >= b" for index lookup
|
||||||
case GreaterThanOrEqual(attribute: AttributeReference, right: Literal) =>
|
case GreaterThanOrEqual(attribute: AttributeReference, right: Literal) =>
|
||||||
val colName = getTargetColNameParts(attribute)
|
val colName = getTargetColNameParts(attribute)
|
||||||
reWriteCondition(colName, GreaterThanOrEqual(maxValue(colName), right))
|
rewriteCondition(colName, GreaterThanOrEqual(maxValue(colName), right))
|
||||||
// query filter "b >= colA" convert it to "colA_minValue <= b" for index table
|
// Filter "b >= colA"
|
||||||
|
// Translates to "b >= colA_minValue" for index lookup
|
||||||
case GreaterThanOrEqual(value: Literal, attribute: AttributeReference) =>
|
case GreaterThanOrEqual(value: Literal, attribute: AttributeReference) =>
|
||||||
val colName = getTargetColNameParts(attribute)
|
val colName = getTargetColNameParts(attribute)
|
||||||
reWriteCondition(colName, LessThanOrEqual(minValue(colName), value))
|
rewriteCondition(colName, LessThanOrEqual(minValue(colName), value))
|
||||||
// query filter "colA is null" convert it to "colA_num_nulls > 0" for index table
|
// Filter "colA is null"
|
||||||
|
// Translates to "colA_num_nulls > 0" for index lookup
|
||||||
case IsNull(attribute: AttributeReference) =>
|
case IsNull(attribute: AttributeReference) =>
|
||||||
val colName = getTargetColNameParts(attribute)
|
val colName = getTargetColNameParts(attribute)
|
||||||
reWriteCondition(colName, GreaterThan(num_nulls(colName), Literal(0)))
|
rewriteCondition(colName, GreaterThan(numNulls(colName), Literal(0)))
|
||||||
// query filter "colA is not null" convert it to "colA_num_nulls = 0" for index table
|
// Filter "colA is not null"
|
||||||
|
// Translates to "colA_num_nulls = 0" for index lookup
|
||||||
case IsNotNull(attribute: AttributeReference) =>
|
case IsNotNull(attribute: AttributeReference) =>
|
||||||
val colName = getTargetColNameParts(attribute)
|
val colName = getTargetColNameParts(attribute)
|
||||||
reWriteCondition(colName, EqualTo(num_nulls(colName), Literal(0)))
|
rewriteCondition(colName, EqualTo(numNulls(colName), Literal(0)))
|
||||||
// query filter "colA in (a,b)" convert it to " (colA_minValue <= a and colA_maxValue >= a) or (colA_minValue <= b and colA_maxValue >= b) " for index table
|
// Filter "colA in (a, b, ...)"
|
||||||
|
// Translates to "(colA_minValue <= a AND colA_maxValue >= a) OR (colA_minValue <= b AND colA_maxValue >= b)" for index lookup
|
||||||
case In(attribute: AttributeReference, list: Seq[Literal]) =>
|
case In(attribute: AttributeReference, list: Seq[Literal]) =>
|
||||||
val colName = getTargetColNameParts(attribute)
|
val colName = getTargetColNameParts(attribute)
|
||||||
reWriteCondition(colName, list.map { lit =>
|
rewriteCondition(colName, colContainsValuesEqualToLiterals(colName, list))
|
||||||
And(LessThanOrEqual(minValue(colName), lit), GreaterThanOrEqual(maxValue(colName), lit))
|
// Filter "colA like xxx"
|
||||||
}.reduce(Or))
|
// Translates to "colA_minValue <= xxx AND colA_maxValue >= xxx" for index lookup
|
||||||
// query filter "colA like xxx" convert it to " (colA_minValue <= xxx and colA_maxValue >= xxx) or (colA_min start with xxx or colA_max start with xxx) " for index table
|
// NOTE: That this operator only matches string prefixes, and this is
|
||||||
|
// essentially equivalent to "colA = b" expression
|
||||||
case StartsWith(attribute, v @ Literal(_: UTF8String, _)) =>
|
case StartsWith(attribute, v @ Literal(_: UTF8String, _)) =>
|
||||||
val colName = getTargetColNameParts(attribute)
|
val colName = getTargetColNameParts(attribute)
|
||||||
reWriteCondition(colName, Or(And(LessThanOrEqual(minValue(colName), v), GreaterThanOrEqual(maxValue(colName), v)) ,
|
rewriteCondition(colName, colContainsValuesEqualToLiteral(colName, v))
|
||||||
Or(StartsWith(minValue(colName), v), StartsWith(maxValue(colName), v))))
|
// Filter "colA not in (a, b, ...)"
|
||||||
// query filter "colA not in (a, b)" convert it to " (not( colA_minValue = a and colA_maxValue = a)) and (not( colA_minValue = b and colA_maxValue = b)) " for index table
|
// Translates to "(colA_minValue > a OR colA_maxValue < a) AND (colA_minValue > b OR colA_maxValue < b)" for index lookup
|
||||||
|
// NOTE: This is an inversion of `in (a, b, ...)` expr
|
||||||
case Not(In(attribute: AttributeReference, list: Seq[Literal])) =>
|
case Not(In(attribute: AttributeReference, list: Seq[Literal])) =>
|
||||||
val colName = getTargetColNameParts(attribute)
|
val colName = getTargetColNameParts(attribute)
|
||||||
reWriteCondition(colName, list.map { lit =>
|
rewriteCondition(colName, Not(colContainsValuesEqualToLiterals(colName, list)))
|
||||||
Not(And(EqualTo(minValue(colName), lit), EqualTo(maxValue(colName), lit)))
|
// Filter "colA != b"
|
||||||
}.reduce(And))
|
// Translates to "colA_minValue > b OR colA_maxValue < b" (which is an inversion of expr for "colA = b") for index lookup
|
||||||
// query filter "colA != b" convert it to "not ( colA_minValue = b and colA_maxValue = b )" for index table
|
// NOTE: This is an inversion of `colA = b` expr
|
||||||
case Not(EqualTo(attribute: AttributeReference, value: Literal)) =>
|
case Not(EqualTo(attribute: AttributeReference, value: Literal)) =>
|
||||||
val colName = getTargetColNameParts(attribute)
|
val colName = getTargetColNameParts(attribute)
|
||||||
reWriteCondition(colName, Not(And(EqualTo(minValue(colName), value), EqualTo(maxValue(colName), value))))
|
rewriteCondition(colName, Not(colContainsValuesEqualToLiteral(colName, value)))
|
||||||
// query filter "b != colA" convert it to "not ( colA_minValue = b and colA_maxValue = b )" for index table
|
// Filter "b != colA"
|
||||||
|
// Translates to "colA_minValue > b OR colA_maxValue < b" (which is an inversion of expr for "colA = b") for index lookup
|
||||||
|
// NOTE: This is an inversion of `colA != b` expr
|
||||||
case Not(EqualTo(value: Literal, attribute: AttributeReference)) =>
|
case Not(EqualTo(value: Literal, attribute: AttributeReference)) =>
|
||||||
val colName = getTargetColNameParts(attribute)
|
val colName = getTargetColNameParts(attribute)
|
||||||
reWriteCondition(colName, Not(And(EqualTo(minValue(colName), value), EqualTo(maxValue(colName), value))))
|
rewriteCondition(colName, Not(colContainsValuesEqualToLiteral(colName, value)))
|
||||||
// query filter "colA not like xxxx" convert it to "not ( colA_minValue startWith xxx and colA_maxValue startWith xxx)" for index table
|
// Filter "colA not like xxx"
|
||||||
|
// Translates to "!(colA_minValue <= xxx AND colA_maxValue >= xxx)" for index lookup
|
||||||
|
// NOTE: This is a inversion of "colA like xxx" assuming that colA is a string-based type
|
||||||
case Not(StartsWith(attribute, value @ Literal(_: UTF8String, _))) =>
|
case Not(StartsWith(attribute, value @ Literal(_: UTF8String, _))) =>
|
||||||
val colName = getTargetColNameParts(attribute)
|
val colName = getTargetColNameParts(attribute)
|
||||||
reWriteCondition(colName, Not(And(StartsWith(minValue(colName), value), StartsWith(maxValue(colName), value))))
|
rewriteCondition(colName, Not(colContainsValuesEqualToLiteral(colName, value)))
|
||||||
|
|
||||||
case or: Or =>
|
case or: Or =>
|
||||||
val resLeft = createZindexFilter(or.left, indexSchema)
|
val resLeft = createZIndexLookupFilter(or.left, indexSchema)
|
||||||
val resRight = createZindexFilter(or.right, indexSchema)
|
val resRight = createZIndexLookupFilter(or.right, indexSchema)
|
||||||
Or(resLeft, resRight)
|
Or(resLeft, resRight)
|
||||||
|
|
||||||
case and: And =>
|
case and: And =>
|
||||||
val resLeft = createZindexFilter(and.left, indexSchema)
|
val resLeft = createZIndexLookupFilter(and.left, indexSchema)
|
||||||
val resRight = createZindexFilter(and.right, indexSchema)
|
val resRight = createZIndexLookupFilter(and.right, indexSchema)
|
||||||
And(resLeft, resRight)
|
And(resLeft, resRight)
|
||||||
|
|
||||||
case expr: Expression =>
|
case expr: Expression =>
|
||||||
|
|||||||
@@ -18,28 +18,30 @@
|
|||||||
|
|
||||||
package org.apache.hudi.functional
|
package org.apache.hudi.functional
|
||||||
|
|
||||||
import java.sql.{Date, Timestamp}
|
|
||||||
|
|
||||||
import org.apache.hadoop.fs.Path
|
import org.apache.hadoop.fs.Path
|
||||||
import org.apache.hudi.common.model.HoodieFileFormat
|
import org.apache.hudi.common.model.HoodieFileFormat
|
||||||
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
|
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
|
||||||
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
|
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
|
||||||
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
|
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
|
||||||
import org.apache.hudi.common.util.{BaseFileUtils, ParquetUtils}
|
import org.apache.hudi.common.util.{BaseFileUtils, ParquetUtils}
|
||||||
|
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
|
||||||
import org.apache.hudi.testutils.HoodieClientTestBase
|
import org.apache.hudi.testutils.HoodieClientTestBase
|
||||||
|
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
|
||||||
import org.apache.spark.ZCurveOptimizeHelper
|
import org.apache.spark.ZCurveOptimizeHelper
|
||||||
import org.apache.spark.sql._
|
import org.apache.spark.sql._
|
||||||
import org.apache.spark.sql.types._
|
import org.apache.spark.sql.types._
|
||||||
import org.junit.jupiter.api.Assertions.assertEquals
|
import org.junit.jupiter.api.Assertions.assertEquals
|
||||||
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
|
import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag, Test}
|
||||||
import org.junit.jupiter.params.ParameterizedTest
|
import org.junit.jupiter.params.ParameterizedTest
|
||||||
import org.junit.jupiter.params.provider.ValueSource
|
import org.junit.jupiter.params.provider.ValueSource
|
||||||
|
|
||||||
|
import java.sql.{Date, Timestamp}
|
||||||
import scala.collection.JavaConversions._
|
import scala.collection.JavaConversions._
|
||||||
import scala.util.Random
|
import scala.util.Random
|
||||||
|
|
||||||
class TestOptimizeTable extends HoodieClientTestBase {
|
@Tag("functional")
|
||||||
var spark: SparkSession = null
|
class TestTableLayoutOptimization extends HoodieClientTestBase {
|
||||||
|
var spark: SparkSession = _
|
||||||
|
|
||||||
val commonOpts = Map(
|
val commonOpts = Map(
|
||||||
"hoodie.insert.shuffle.parallelism" -> "4",
|
"hoodie.insert.shuffle.parallelism" -> "4",
|
||||||
@@ -67,11 +69,13 @@ class TestOptimizeTable extends HoodieClientTestBase {
|
|||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
|
@ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
|
||||||
def testOptimizewithClustering(tableType: String): Unit = {
|
def testOptimizeWithClustering(tableType: String): Unit = {
|
||||||
|
val targetRecordsCount = 10000
|
||||||
// Bulk Insert Operation
|
// Bulk Insert Operation
|
||||||
val records1 = recordsToStrings(dataGen.generateInserts("001", 1000)).toList
|
val records = recordsToStrings(dataGen.generateInserts("001", targetRecordsCount)).toList
|
||||||
val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
|
val writeDf: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records, 2))
|
||||||
inputDF1.write.format("org.apache.hudi")
|
|
||||||
|
writeDf.write.format("org.apache.hudi")
|
||||||
.options(commonOpts)
|
.options(commonOpts)
|
||||||
.option("hoodie.compact.inline", "false")
|
.option("hoodie.compact.inline", "false")
|
||||||
.option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
|
.option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
|
||||||
@@ -83,30 +87,41 @@ class TestOptimizeTable extends HoodieClientTestBase {
|
|||||||
.option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824")
|
.option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824")
|
||||||
.option("hoodie.clustering.plan.strategy.small.file.limit", "629145600")
|
.option("hoodie.clustering.plan.strategy.small.file.limit", "629145600")
|
||||||
.option("hoodie.clustering.plan.strategy.max.bytes.per.group", Long.MaxValue.toString)
|
.option("hoodie.clustering.plan.strategy.max.bytes.per.group", Long.MaxValue.toString)
|
||||||
.option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(64 *1024 * 1024L))
|
.option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(64 * 1024 * 1024L))
|
||||||
.option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE.key, "true")
|
.option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE.key, "true")
|
||||||
.option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "begin_lat, begin_lon")
|
.option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "begin_lat, begin_lon")
|
||||||
.mode(SaveMode.Overwrite)
|
.mode(SaveMode.Overwrite)
|
||||||
.save(basePath)
|
.save(basePath)
|
||||||
|
|
||||||
assertEquals(1000, spark.read.format("hudi").load(basePath).count())
|
val readDf =
|
||||||
// use unsorted col as filter.
|
spark.read
|
||||||
assertEquals(spark.read
|
.format("hudi")
|
||||||
.format("hudi").load(basePath).where("end_lat >= 0 and rider != '1' and weight > 0.0").count(),
|
.load(basePath)
|
||||||
spark.read.option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true")
|
|
||||||
.format("hudi").load(basePath).where("end_lat >= 0 and rider != '1' and weight > 0.0").count())
|
val readDfSkip =
|
||||||
// use sorted col as filter.
|
spark.read
|
||||||
assertEquals(spark.read.format("hudi").load(basePath)
|
.option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true")
|
||||||
.where("begin_lat >= 0.49 and begin_lat < 0.51 and begin_lon >= 0.49 and begin_lon < 0.51").count(),
|
.format("hudi")
|
||||||
spark.read.option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true")
|
.load(basePath)
|
||||||
.format("hudi").load(basePath)
|
|
||||||
.where("begin_lat >= 0.49 and begin_lat < 0.51 and begin_lon >= 0.49 and begin_lon < 0.51").count())
|
assertEquals(targetRecordsCount, readDf.count())
|
||||||
// use sorted cols and unsorted cols as filter
|
assertEquals(targetRecordsCount, readDfSkip.count())
|
||||||
assertEquals(spark.read.format("hudi").load(basePath)
|
|
||||||
.where("begin_lat >= 0.49 and begin_lat < 0.51 and end_lat > 0.56").count(),
|
readDf.createOrReplaceTempView("hudi_snapshot_raw")
|
||||||
spark.read.option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true")
|
readDfSkip.createOrReplaceTempView("hudi_snapshot_skipping")
|
||||||
.format("hudi").load(basePath)
|
|
||||||
.where("begin_lat >= 0.49 and begin_lat < 0.51 and end_lat > 0.56").count())
|
def select(tableName: String) =
|
||||||
|
spark.sql(s"SELECT * FROM $tableName WHERE begin_lat >= 0.49 AND begin_lat < 0.51 AND begin_lon >= 0.49 AND begin_lon < 0.51")
|
||||||
|
|
||||||
|
assertRowsMatch(
|
||||||
|
select("hudi_snapshot_raw"),
|
||||||
|
select("hudi_snapshot_skipping")
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
def assertRowsMatch(one: DataFrame, other: DataFrame) = {
|
||||||
|
val rows = one.count()
|
||||||
|
assert(rows == other.count() && one.intersect(other).count() == rows)
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
Reference in New Issue
Block a user