1
0

[HUDI-2778] Optimize statistics collection related codes and add some docs for z-order add fix some bugs (#4013)

* [HUDI-2778] Optimize statistics collection related codes and add more docs for z-order.

* add test code for multi-thread parquet footer read
This commit is contained in:
xiarixiaoyao
2021-11-23 13:46:02 +08:00
committed by GitHub
parent c88c2af8bf
commit 9de9951348
6 changed files with 175 additions and 39 deletions

View File

@@ -142,6 +142,17 @@ public class HoodieClusteringConfig extends HoodieConfig {
.sinceVersion("0.9.0")
.withDocumentation("When rewriting data, preserves existing hoodie_commit_time");
/**
* Using space-filling curves to optimize the layout of table to boost query performance.
* The table data which sorted by space-filling curve has better aggregation;
* combine with min-max filtering, it can achieve good performance improvement.
*
* Notice:
* when we use this feature, we need specify the sort columns.
* The more columns involved in sorting, the worse the aggregation, and the smaller the query performance improvement.
* Choose the filter columns which commonly used in query sql as sort columns.
* It is recommend that 2 ~ 4 columns participate in sorting.
*/
public static final ConfigProperty LAYOUT_OPTIMIZE_ENABLE = ConfigProperty
.key(LAYOUT_OPTIMIZE_PARAM_PREFIX + "enable")
.defaultValue(false)

View File

@@ -230,18 +230,14 @@ public class ZCurveOptimizeHelper {
rows.add(currentColRangeMetaData.getMinValue());
rows.add(currentColRangeMetaData.getMaxValue());
} else if (colType instanceof StringType) {
String minString = new String(((Binary)currentColRangeMetaData.getMinValue()).getBytes());
String maxString = new String(((Binary)currentColRangeMetaData.getMaxValue()).getBytes());
rows.add(minString);
rows.add(maxString);
rows.add(currentColRangeMetaData.getMinValueAsString());
rows.add(currentColRangeMetaData.getMaxValueAsString());
} else if (colType instanceof DecimalType) {
Double minDecimal = Double.parseDouble(currentColRangeMetaData.getStringifier().stringify(Long.valueOf(currentColRangeMetaData.getMinValue().toString())));
Double maxDecimal = Double.parseDouble(currentColRangeMetaData.getStringifier().stringify(Long.valueOf(currentColRangeMetaData.getMaxValue().toString())));
rows.add(BigDecimal.valueOf(minDecimal));
rows.add(BigDecimal.valueOf(maxDecimal));
rows.add(new BigDecimal(currentColRangeMetaData.getMinValueAsString()));
rows.add(new BigDecimal(currentColRangeMetaData.getMaxValueAsString()));
} else if (colType instanceof DateType) {
rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getStringifier().stringify((int)currentColRangeMetaData.getMinValue())));
rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getStringifier().stringify((int)currentColRangeMetaData.getMaxValue())));
rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getMinValueAsString()));
rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getMaxValueAsString()));
} else if (colType instanceof LongType) {
rows.add(currentColRangeMetaData.getMinValue());
rows.add(currentColRangeMetaData.getMaxValue());
@@ -344,6 +340,8 @@ public class ZCurveOptimizeHelper {
List columns = Arrays.asList(statisticsDF.schema().fieldNames());
spark.sql(HoodieSparkUtils$
.MODULE$.createMergeSql(originalTable, updateTable, JavaConversions.asScalaBuffer(columns))).repartition(1).write().save(savePath.toString());
} else {
statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString());
}
} else {
statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString());

View File

@@ -18,8 +18,6 @@
package org.apache.hudi.common.model;
import org.apache.parquet.schema.PrimitiveStringifier;
import java.util.Objects;
/**
@@ -30,16 +28,28 @@ public class HoodieColumnRangeMetadata<T> {
private final String columnName;
private final T minValue;
private final T maxValue;
private final long numNulls;
private final PrimitiveStringifier stringifier;
private long numNulls;
// For Decimal Type/Date Type, minValue/maxValue cannot represent it's original value.
// eg: when parquet collects column information, the decimal type is collected as int/binary type.
// so we cannot use minValue and maxValue directly, use minValueAsString/maxValueAsString instead.
private final String minValueAsString;
private final String maxValueAsString;
public HoodieColumnRangeMetadata(final String filePath, final String columnName, final T minValue, final T maxValue, final long numNulls, final PrimitiveStringifier stringifier) {
public HoodieColumnRangeMetadata(
final String filePath,
final String columnName,
final T minValue,
final T maxValue,
long numNulls,
final String minValueAsString,
final String maxValueAsString) {
this.filePath = filePath;
this.columnName = columnName;
this.minValue = minValue;
this.maxValue = maxValue;
this.numNulls = numNulls;
this.stringifier = stringifier;
this.numNulls = numNulls == -1 ? 0 : numNulls;
this.minValueAsString = minValueAsString;
this.maxValueAsString = maxValueAsString;
}
public String getFilePath() {
@@ -58,8 +68,12 @@ public class HoodieColumnRangeMetadata<T> {
return this.maxValue;
}
public PrimitiveStringifier getStringifier() {
return stringifier;
public String getMaxValueAsString() {
return maxValueAsString;
}
public String getMinValueAsString() {
return minValueAsString;
}
public long getNumNulls() {
@@ -79,12 +93,14 @@ public class HoodieColumnRangeMetadata<T> {
&& Objects.equals(getColumnName(), that.getColumnName())
&& Objects.equals(getMinValue(), that.getMinValue())
&& Objects.equals(getMaxValue(), that.getMaxValue())
&& Objects.equals(getNumNulls(), that.getNumNulls());
&& Objects.equals(getNumNulls(), that.getNumNulls())
&& Objects.equals(getMinValueAsString(), that.getMinValueAsString())
&& Objects.equals(getMaxValueAsString(), that.getMaxValueAsString());
}
@Override
public int hashCode() {
return Objects.hash(getColumnName(), getMinValue(), getMaxValue(), getNumNulls());
return Objects.hash(getColumnName(), getMinValue(), getMaxValue(), getNumNulls(), getMinValueAsString(), getMaxValueAsString());
}
@Override
@@ -94,6 +110,8 @@ public class HoodieColumnRangeMetadata<T> {
+ "columnName='" + columnName + '\''
+ ", minValue=" + minValue
+ ", maxValue=" + maxValue
+ ", numNulls=" + numNulls + '}';
+ ", numNulls=" + numNulls
+ ", minValueAsString=" + minValueAsString
+ ", minValueAsString=" + maxValueAsString + '}';
}
}

View File

@@ -39,6 +39,7 @@ import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import java.io.IOException;
import java.util.ArrayList;
@@ -56,6 +57,8 @@ import java.util.stream.Collectors;
*/
public class ParquetUtils extends BaseFileUtils {
private static Object lock = new Object();
/**
* Read the rowKey list matching the given filter, from the given parquet file. If the filter is empty, then this will
* return all the rowkeys.
@@ -283,17 +286,38 @@ public class ParquetUtils extends BaseFileUtils {
/**
* Parse min/max statistics stored in parquet footers for all columns.
* ParquetRead.readFooter is not a thread safe method.
*
* @param conf hadoop conf.
* @param parquetFilePath file to be read.
* @param cols cols which need to collect statistics.
* @return a HoodieColumnRangeMetadata instance.
*/
public Collection<HoodieColumnRangeMetadata<Comparable>> readRangeFromParquetMetadata(Configuration conf, Path parquetFilePath, List<String> cols) {
public Collection<HoodieColumnRangeMetadata<Comparable>> readRangeFromParquetMetadata(
Configuration conf,
Path parquetFilePath,
List<String> cols) {
ParquetMetadata metadata = readMetadata(conf, parquetFilePath);
// collect stats from all parquet blocks
Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnToStatsListMap = metadata.getBlocks().stream().flatMap(blockMetaData -> {
return blockMetaData.getColumns().stream().filter(f -> cols.contains(f.getPath().toDotString())).map(columnChunkMetaData ->
new HoodieColumnRangeMetadata<>(parquetFilePath.getName(), columnChunkMetaData.getPath().toDotString(),
columnChunkMetaData.getStatistics().genericGetMin(),
columnChunkMetaData.getStatistics().genericGetMax(),
columnChunkMetaData.getStatistics().getNumNulls(),
columnChunkMetaData.getPrimitiveType().stringifier()));
return blockMetaData.getColumns().stream().filter(f -> cols.contains(f.getPath().toDotString())).map(columnChunkMetaData -> {
String minAsString;
String maxAsString;
if (columnChunkMetaData.getPrimitiveType().getOriginalType() == OriginalType.DATE) {
synchronized (lock) {
minAsString = columnChunkMetaData.getStatistics().minAsString();
maxAsString = columnChunkMetaData.getStatistics().maxAsString();
}
} else {
minAsString = columnChunkMetaData.getStatistics().minAsString();
maxAsString = columnChunkMetaData.getStatistics().maxAsString();
}
return new HoodieColumnRangeMetadata<>(parquetFilePath.getName(), columnChunkMetaData.getPath().toDotString(),
columnChunkMetaData.getStatistics().genericGetMin(),
columnChunkMetaData.getStatistics().genericGetMax(),
columnChunkMetaData.getStatistics().getNumNulls(),
minAsString, maxAsString);
});
}).collect(Collectors.groupingBy(e -> e.getColumnName()));
// we only intend to keep file level statistics.
@@ -316,23 +340,41 @@ public class ParquetUtils extends BaseFileUtils {
HoodieColumnRangeMetadata<Comparable> range2) {
final Comparable minValue;
final Comparable maxValue;
final String minValueAsString;
final String maxValueAsString;
if (range1.getMinValue() != null && range2.getMinValue() != null) {
minValue = range1.getMinValue().compareTo(range2.getMinValue()) < 0 ? range1.getMinValue() : range2.getMinValue();
if (range1.getMinValue().compareTo(range2.getMinValue()) < 0) {
minValue = range1.getMinValue();
minValueAsString = range1.getMinValueAsString();
} else {
minValue = range2.getMinValue();
minValueAsString = range2.getMinValueAsString();
}
} else if (range1.getMinValue() == null) {
minValue = range2.getMinValue();
minValueAsString = range2.getMinValueAsString();
} else {
minValue = range1.getMinValue();
minValueAsString = range1.getMinValueAsString();
}
if (range1.getMaxValue() != null && range2.getMaxValue() != null) {
maxValue = range1.getMaxValue().compareTo(range2.getMaxValue()) < 0 ? range2.getMaxValue() : range1.getMaxValue();
if (range1.getMaxValue().compareTo(range2.getMaxValue()) < 0) {
maxValue = range2.getMaxValue();
maxValueAsString = range2.getMaxValueAsString();
} else {
maxValue = range1.getMaxValue();
maxValueAsString = range1.getMaxValueAsString();
}
} else if (range1.getMaxValue() == null) {
maxValue = range2.getMaxValue();
maxValueAsString = range2.getMaxValueAsString();
} else {
maxValue = range1.getMaxValue();
maxValueAsString = range1.getMaxValueAsString();
}
return new HoodieColumnRangeMetadata<>(range1.getFilePath(),
range1.getColumnName(), minValue, maxValue, range1.getNumNulls() + range2.getNumNulls(), range1.getStringifier());
range1.getColumnName(), minValue, maxValue, range1.getNumNulls() + range2.getNumNulls(), minValueAsString, maxValueAsString);
}
}

View File

@@ -100,7 +100,7 @@ object DataSkippingUtils {
// query filter "colA >= b" convert it to "colA_maxValue >= b" for index table
case GreaterThanOrEqual(attribute: AttributeReference, right: Literal) =>
val colName = getTargetColNameParts(attribute)
GreaterThanOrEqual(maxValue(colName), right)
reWriteCondition(colName, GreaterThanOrEqual(maxValue(colName), right))
// query filter "b >= colA" convert it to "colA_minValue <= b" for index table
case GreaterThanOrEqual(value: Literal, attribute: AttributeReference) =>
val colName = getTargetColNameParts(attribute)
@@ -179,7 +179,7 @@ object DataSkippingUtils {
def getIndexFiles(conf: Configuration, indexPath: String): Seq[FileStatus] = {
val basePath = new Path(indexPath)
basePath.getFileSystem(conf)
.listStatus(basePath).filterNot(f => f.getPath.getName.endsWith(".parquet"))
.listStatus(basePath).filter(f => f.getPath.getName.endsWith(".parquet"))
}
/**

View File

@@ -21,9 +21,11 @@ package org.apache.hudi.functional
import java.sql.{Date, Timestamp}
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.model.HoodieFileFormat
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.common.util.{BaseFileUtils, ParquetUtils}
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.spark.ZCurveOptimizeHelper
import org.apache.spark.sql._
@@ -88,8 +90,23 @@ class TestOptimizeTable extends HoodieClientTestBase {
.save(basePath)
assertEquals(1000, spark.read.format("hudi").load(basePath).count())
assertEquals(1000,
spark.read.option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true").format("hudi").load(basePath).count())
// use unsorted col as filter.
assertEquals(spark.read
.format("hudi").load(basePath).where("end_lat >= 0 and rider != '1' and weight > 0.0").count(),
spark.read.option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true")
.format("hudi").load(basePath).where("end_lat >= 0 and rider != '1' and weight > 0.0").count())
// use sorted col as filter.
assertEquals(spark.read.format("hudi").load(basePath)
.where("begin_lat >= 0.49 and begin_lat < 0.51 and begin_lon >= 0.49 and begin_lon < 0.51").count(),
spark.read.option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true")
.format("hudi").load(basePath)
.where("begin_lat >= 0.49 and begin_lat < 0.51 and begin_lon >= 0.49 and begin_lon < 0.51").count())
// use sorted cols and unsorted cols as filter
assertEquals(spark.read.format("hudi").load(basePath)
.where("begin_lat >= 0.49 and begin_lat < 0.51 and end_lat > 0.56").count(),
spark.read.option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true")
.format("hudi").load(basePath)
.where("begin_lat >= 0.49 and begin_lat < 0.51 and end_lat > 0.56").count())
}
@Test
@@ -97,10 +114,13 @@ class TestOptimizeTable extends HoodieClientTestBase {
val testPath = new Path(System.getProperty("java.io.tmpdir"), "minMax")
val statisticPath = new Path(System.getProperty("java.io.tmpdir"), "stat")
val fs = testPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
val complexDataFrame = createComplexDataFrame(spark)
complexDataFrame.repartition(3).write.mode("overwrite").save(testPath.toString)
val df = spark.read.load(testPath.toString)
try {
val complexDataFrame = createComplexDataFrame(spark)
complexDataFrame.repartition(3).write.mode("overwrite").save(testPath.toString)
val df = spark.read.load(testPath.toString)
// test z-order sort for all primitive type, should not throw exception.
ZCurveOptimizeHelper.createZIndexedDataFrameByMapValue(df, "c1,c2,c3,c4,c5,c6,c7,c8", 20).show(1)
ZCurveOptimizeHelper.createZIndexedDataFrameBySample(df, "c1,c2,c3,c4,c5,c6,c7,c8", 20).show(1)
// do not support TimeStampType, so if we collect statistics for c4, should throw exception
val colDf = ZCurveOptimizeHelper.getMinMaxValue(df, "c1,c2,c3,c5,c6,c7,c8")
colDf.cache()
@@ -115,12 +135,59 @@ class TestOptimizeTable extends HoodieClientTestBase {
ZCurveOptimizeHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8", statisticPath.toString, "4", Seq("0", "1", "3"))
assertEquals(!fs.exists(new Path(statisticPath, "2")), true)
assertEquals(fs.exists(new Path(statisticPath, "3")), true)
// test to save different index, new index on ("c1,c6,c7,c8") should be successfully saved.
ZCurveOptimizeHelper.saveStatisticsInfo(df, "c1,c6,c7,c8", statisticPath.toString, "5", Seq("0", "1", "3", "4"))
assertEquals(fs.exists(new Path(statisticPath, "5")), true)
} finally {
if (fs.exists(testPath)) fs.delete(testPath)
if (fs.exists(statisticPath)) fs.delete(statisticPath)
}
}
// test collect min-max statistic info for DateType in the case of multithreading.
// parquet will give a wrong statistic result for DateType in the case of multithreading.
@Test
def testMultiThreadParquetFooterReadForDateType(): Unit = {
// create parquet file with DateType
val rdd = spark.sparkContext.parallelize(0 to 100, 1)
.map(item => RowFactory.create(Date.valueOf(s"${2020}-${item % 11 + 1}-${item % 28 + 1}")))
val df = spark.createDataFrame(rdd, new StructType().add("id", DateType))
val testPath = new Path(System.getProperty("java.io.tmpdir"), "testCollectDateType")
val conf = spark.sparkContext.hadoopConfiguration
val cols = new java.util.ArrayList[String]
cols.add("id")
try {
df.repartition(3).write.mode("overwrite").save(testPath.toString)
val inputFiles = spark.read.load(testPath.toString).inputFiles.sortBy(x => x)
val realResult = new Array[(String, String)](3)
inputFiles.zipWithIndex.foreach { case (f, index) =>
val fileUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).asInstanceOf[ParquetUtils]
val res = fileUtils.readRangeFromParquetMetadata(conf, new Path(f), cols).iterator().next()
realResult(index) = (res.getMinValueAsString, res.getMaxValueAsString)
}
// multi thread read with no lock
val resUseLock = new Array[(String, String)](3)
inputFiles.zipWithIndex.par.foreach { case (f, index) =>
val fileUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).asInstanceOf[ParquetUtils]
val res = fileUtils.readRangeFromParquetMetadata(conf, new Path(f), cols).iterator().next()
resUseLock(index) = (res.getMinValueAsString, res.getMaxValueAsString)
}
// check resUseNoLock,
// We can't guarantee that there must be problems in the case of multithreading.
// In order to make ut pass smoothly, we will not check resUseNoLock.
// check resUseLock
// should pass assert
realResult.zip(resUseLock).foreach { case (realValue, testValue) =>
assert(realValue == testValue, s" expect realValue: ${realValue} but find ${testValue}")
}
} finally {
if (fs.exists(testPath)) fs.delete(testPath)
}
}
def createComplexDataFrame(spark: SparkSession): DataFrame = {
val schema = new StructType()
.add("c1", IntegerType)