[HUDI-3337] Fixing Parquet Column Range metadata extraction (#4705)
- Parquet Column Range metadata extraction utility was simplistically assuming that Decimal types are only represented by INT32, while their representation actually varies depending on precision. - More details can be found here: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#DECIMAL
This commit is contained in:
@@ -18,16 +18,25 @@
|
||||
|
||||
package org.apache.hudi.functional
|
||||
|
||||
import org.apache.hadoop.fs.{LocatedFileStatus, Path}
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}
|
||||
import org.apache.hudi.common.fs.FSUtils
|
||||
import org.apache.hudi.common.model.HoodieColumnRangeMetadata
|
||||
import org.apache.hudi.common.util.ParquetUtils
|
||||
import org.apache.hudi.index.columnstats.ColumnStatsIndexHelper
|
||||
import org.apache.hudi.testutils.HoodieClientTestBase
|
||||
import org.apache.spark.sql.expressions.UserDefinedFunction
|
||||
import org.apache.spark.sql.functions.typedLit
|
||||
import org.apache.spark.sql.types._
|
||||
import org.apache.spark.sql.{DataFrame, SparkSession}
|
||||
import org.junit.jupiter.api.Assertions.assertEquals
|
||||
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, RowFactory, SaveMode, SparkSession, functions}
|
||||
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue}
|
||||
import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test}
|
||||
|
||||
import scala.collection.JavaConversions._
|
||||
import java.math.BigInteger
|
||||
import java.nio.charset.StandardCharsets
|
||||
import java.sql.{Date, Timestamp}
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.util.{Random, Success}
|
||||
|
||||
class TestColumnStatsIndex extends HoodieClientTestBase {
|
||||
var spark: SparkSession = _
|
||||
@@ -58,15 +67,17 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
|
||||
}
|
||||
|
||||
@Test
|
||||
@Disabled
|
||||
def testColumnStatsTableComposition(): Unit = {
|
||||
def testZIndexTableComposition(): Unit = {
|
||||
val targetParquetTablePath = tempDir.resolve("index/zorder/input-table").toAbsolutePath.toString
|
||||
val sourceJSONTablePath = getClass.getClassLoader.getResource("index/zorder/input-table-json").toString
|
||||
|
||||
bootstrapParquetInputTableFromJSON(sourceJSONTablePath, targetParquetTablePath)
|
||||
|
||||
val inputDf =
|
||||
// NOTE: Schema here is provided for validation that the input date is in the appropriate format
|
||||
spark.read
|
||||
.schema(sourceTableSchema)
|
||||
.parquet(
|
||||
getClass.getClassLoader.getResource("index/zorder/input-table").toString
|
||||
)
|
||||
.parquet(targetParquetTablePath)
|
||||
|
||||
val zorderedCols = Seq("c1", "c2", "c3", "c5", "c6", "c7", "c8")
|
||||
val zorderedColsSchemaFields = inputDf.schema.fields.filter(f => zorderedCols.contains(f.name)).toSeq
|
||||
@@ -75,22 +86,18 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
|
||||
val newZIndexTableDf =
|
||||
ColumnStatsIndexHelper.buildColumnStatsTableFor(
|
||||
inputDf.sparkSession,
|
||||
inputDf.inputFiles.toSeq,
|
||||
zorderedColsSchemaFields
|
||||
inputDf.inputFiles.toSeq.asJava,
|
||||
zorderedColsSchemaFields.asJava
|
||||
)
|
||||
|
||||
val indexSchema =
|
||||
ColumnStatsIndexHelper.composeIndexSchema(
|
||||
sourceTableSchema.fields.filter(f => zorderedCols.contains(f.name)).toSeq
|
||||
sourceTableSchema.fields.filter(f => zorderedCols.contains(f.name)).toSeq.asJava
|
||||
)
|
||||
|
||||
// Collect Z-index stats manually (reading individual Parquet files)
|
||||
val manualZIndexTableDf =
|
||||
buildColumnStatsTableManually(
|
||||
getClass.getClassLoader.getResource("index/zorder/input-table").toString,
|
||||
zorderedCols,
|
||||
indexSchema
|
||||
)
|
||||
buildColumnStatsTableManually(targetParquetTablePath, zorderedCols, indexSchema)
|
||||
|
||||
// NOTE: Z-index is built against stats collected w/in Parquet footers, which will be
|
||||
// represented w/ corresponding Parquet schema (INT, INT64, INT96, etc).
|
||||
@@ -107,18 +114,23 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
|
||||
.schema(indexSchema)
|
||||
.json(getClass.getClassLoader.getResource("index/zorder/z-index-table.json").toString)
|
||||
|
||||
assertEquals(asJson(sort(expectedZIndexTableDf)), asJson(sort(newZIndexTableDf)))
|
||||
assertEquals(asJson(sort(expectedZIndexTableDf)), asJson(sort(replace(newZIndexTableDf))))
|
||||
}
|
||||
|
||||
@Test
|
||||
@Disabled
|
||||
def testColumnStatsTableMerge(): Unit = {
|
||||
def testZIndexTableMerge(): Unit = {
|
||||
val testZIndexPath = new Path(basePath, "zindex")
|
||||
|
||||
val firstParquetTablePath = tempDir.resolve("index/zorder/input-table").toAbsolutePath.toString
|
||||
val firstJSONTablePath = getClass.getClassLoader.getResource("index/zorder/input-table-json").toString
|
||||
|
||||
// Bootstrap FIRST source Parquet table
|
||||
bootstrapParquetInputTableFromJSON(firstJSONTablePath, firstParquetTablePath)
|
||||
|
||||
val zorderedCols = Seq("c1", "c2", "c3", "c5", "c6", "c7", "c8")
|
||||
val indexSchema =
|
||||
ColumnStatsIndexHelper.composeIndexSchema(
|
||||
sourceTableSchema.fields.filter(f => zorderedCols.contains(f.name)).toSeq
|
||||
sourceTableSchema.fields.filter(f => zorderedCols.contains(f.name)).toSeq.asJava
|
||||
)
|
||||
|
||||
//
|
||||
@@ -126,19 +138,16 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
|
||||
//
|
||||
|
||||
val firstCommitInstance = "0"
|
||||
val firstInputDf =
|
||||
spark.read.parquet(
|
||||
getClass.getClassLoader.getResource("index/zorder/input-table").toString
|
||||
)
|
||||
val firstInputDf = spark.read.parquet(firstParquetTablePath)
|
||||
|
||||
ColumnStatsIndexHelper.updateColumnStatsIndexFor(
|
||||
firstInputDf.sparkSession,
|
||||
sourceTableSchema,
|
||||
firstInputDf.inputFiles.toSeq,
|
||||
zorderedCols.toSeq,
|
||||
firstInputDf.inputFiles.toSeq.asJava,
|
||||
zorderedCols.asJava,
|
||||
testZIndexPath.toString,
|
||||
firstCommitInstance,
|
||||
Seq()
|
||||
Seq().asJava
|
||||
)
|
||||
|
||||
// NOTE: We don't need to provide schema upon reading from Parquet, since Spark will be able
|
||||
@@ -152,15 +161,19 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
|
||||
.schema(indexSchema)
|
||||
.json(getClass.getClassLoader.getResource("index/zorder/z-index-table.json").toString)
|
||||
|
||||
assertEquals(asJson(sort(expectedInitialZIndexTableDf)), asJson(sort(initialZIndexTable)))
|
||||
assertEquals(asJson(sort(expectedInitialZIndexTableDf)), asJson(sort(replace(initialZIndexTable))))
|
||||
|
||||
// Bootstrap SECOND source Parquet table
|
||||
val secondParquetTablePath = tempDir.resolve("index/zorder/another-input-table").toAbsolutePath.toString
|
||||
val secondJSONTablePath = getClass.getClassLoader.getResource("index/zorder/another-input-table-json").toString
|
||||
|
||||
bootstrapParquetInputTableFromJSON(secondJSONTablePath, secondParquetTablePath)
|
||||
|
||||
val secondCommitInstance = "1"
|
||||
val secondInputDf =
|
||||
spark.read
|
||||
.schema(sourceTableSchema)
|
||||
.parquet(
|
||||
getClass.getClassLoader.getResource("index/zorder/another-input-table").toString
|
||||
)
|
||||
.parquet(secondParquetTablePath)
|
||||
|
||||
//
|
||||
// Update Z-index table
|
||||
@@ -169,11 +182,11 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
|
||||
ColumnStatsIndexHelper.updateColumnStatsIndexFor(
|
||||
secondInputDf.sparkSession,
|
||||
sourceTableSchema,
|
||||
secondInputDf.inputFiles.toSeq,
|
||||
zorderedCols.toSeq,
|
||||
secondInputDf.inputFiles.toSeq.asJava,
|
||||
zorderedCols.asJava,
|
||||
testZIndexPath.toString,
|
||||
secondCommitInstance,
|
||||
Seq(firstCommitInstance)
|
||||
Seq(firstCommitInstance).asJava
|
||||
)
|
||||
|
||||
// NOTE: We don't need to provide schema upon reading from Parquet, since Spark will be able
|
||||
@@ -187,56 +200,96 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
|
||||
.schema(indexSchema)
|
||||
.json(getClass.getClassLoader.getResource("index/zorder/z-index-table-merged.json").toString)
|
||||
|
||||
assertEquals(asJson(sort(expectedMergedZIndexTableDf)), asJson(sort(mergedZIndexTable)))
|
||||
assertEquals(asJson(sort(expectedMergedZIndexTableDf)), asJson(sort(replace(mergedZIndexTable))))
|
||||
}
|
||||
|
||||
@Test
|
||||
@Disabled
|
||||
def testColumnStatsTablesGarbageCollection(): Unit = {
|
||||
val testZIndexPath = new Path(System.getProperty("java.io.tmpdir"), "zindex")
|
||||
val fs = testZIndexPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
|
||||
val targetParquetTablePath = tempDir.resolve("index/zorder/input-table").toAbsolutePath.toString
|
||||
val sourceJSONTablePath = getClass.getClassLoader.getResource("index/zorder/input-table-json").toString
|
||||
|
||||
val inputDf =
|
||||
spark.read.parquet(
|
||||
getClass.getClassLoader.getResource("index/zorder/input-table").toString
|
||||
)
|
||||
bootstrapParquetInputTableFromJSON(sourceJSONTablePath, targetParquetTablePath)
|
||||
|
||||
val inputDf = spark.read.parquet(targetParquetTablePath)
|
||||
|
||||
val testColumnStatsIndexPath = new Path(tempDir.resolve("zindex").toAbsolutePath.toString)
|
||||
val fs = testColumnStatsIndexPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
|
||||
|
||||
// Try to save statistics
|
||||
ColumnStatsIndexHelper.updateColumnStatsIndexFor(
|
||||
inputDf.sparkSession,
|
||||
sourceTableSchema,
|
||||
inputDf.inputFiles.toSeq,
|
||||
Seq("c1","c2","c3","c5","c6","c7","c8"),
|
||||
testZIndexPath.toString,
|
||||
inputDf.inputFiles.toSeq.asJava,
|
||||
Seq("c1","c2","c3","c5","c6","c7","c8").asJava,
|
||||
testColumnStatsIndexPath.toString,
|
||||
"2",
|
||||
Seq("0", "1")
|
||||
Seq("0", "1").asJava
|
||||
)
|
||||
|
||||
// Save again
|
||||
ColumnStatsIndexHelper.updateColumnStatsIndexFor(
|
||||
inputDf.sparkSession,
|
||||
sourceTableSchema,
|
||||
inputDf.inputFiles.toSeq,
|
||||
Seq("c1","c2","c3","c5","c6","c7","c8"),
|
||||
testZIndexPath.toString,
|
||||
inputDf.inputFiles.toSeq.asJava,
|
||||
Seq("c1","c2","c3","c5","c6","c7","c8").asJava,
|
||||
testColumnStatsIndexPath.toString,
|
||||
"3",
|
||||
Seq("0", "1", "2")
|
||||
Seq("0", "1", "2").asJava
|
||||
)
|
||||
|
||||
// Test old index table being cleaned up
|
||||
ColumnStatsIndexHelper.updateColumnStatsIndexFor(
|
||||
inputDf.sparkSession,
|
||||
sourceTableSchema,
|
||||
inputDf.inputFiles.toSeq,
|
||||
Seq("c1","c2","c3","c5","c6","c7","c8"),
|
||||
testZIndexPath.toString,
|
||||
inputDf.inputFiles.toSeq.asJava,
|
||||
Seq("c1","c2","c3","c5","c6","c7","c8").asJava,
|
||||
testColumnStatsIndexPath.toString,
|
||||
"4",
|
||||
Seq("0", "1", "3")
|
||||
Seq("0", "1", "3").asJava
|
||||
)
|
||||
|
||||
assertEquals(!fs.exists(new Path(testZIndexPath, "2")), true)
|
||||
assertEquals(!fs.exists(new Path(testZIndexPath, "3")), true)
|
||||
assertEquals(fs.exists(new Path(testZIndexPath, "4")), true)
|
||||
assertEquals(!fs.exists(new Path(testColumnStatsIndexPath, "2")), true)
|
||||
assertEquals(!fs.exists(new Path(testColumnStatsIndexPath, "3")), true)
|
||||
assertEquals(fs.exists(new Path(testColumnStatsIndexPath, "4")), true)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testParquetMetadataRangeExtraction(): Unit = {
|
||||
val df = generateRandomDataFrame(spark)
|
||||
|
||||
val pathStr = tempDir.resolve("min-max").toAbsolutePath.toString
|
||||
|
||||
df.write.format("parquet")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.save(pathStr)
|
||||
|
||||
val utils = new ParquetUtils
|
||||
|
||||
val conf = new Configuration()
|
||||
val path = new Path(pathStr)
|
||||
val fs = path.getFileSystem(conf)
|
||||
|
||||
val parquetFilePath = fs.listStatus(path).filter(fs => fs.getPath.getName.endsWith(".parquet")).toSeq.head.getPath
|
||||
|
||||
val ranges = utils.readRangeFromParquetMetadata(conf, parquetFilePath,
|
||||
Seq("c1", "c2", "c3a", "c3b", "c3c", "c4", "c5", "c6", "c7", "c8").asJava)
|
||||
|
||||
ranges.asScala.foreach(r => {
|
||||
// NOTE: Unfortunately Parquet can't compute statistics for Timestamp column, hence we
|
||||
// skip it in our assertions
|
||||
if (r.getColumnName.equals("c4")) {
|
||||
// scalastyle:off return
|
||||
return
|
||||
// scalastyle:on return
|
||||
}
|
||||
|
||||
val min = r.getMinValue
|
||||
val max = r.getMaxValue
|
||||
|
||||
assertNotNull(min)
|
||||
assertNotNull(max)
|
||||
assertTrue(r.getMinValue.asInstanceOf[Comparable[Object]].compareTo(r.getMaxValue.asInstanceOf[Object]) <= 0)
|
||||
})
|
||||
}
|
||||
|
||||
private def buildColumnStatsTableManually(tablePath: String, zorderedCols: Seq[String], indexSchema: StructType) = {
|
||||
@@ -268,11 +321,85 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
|
||||
|
||||
df.selectExpr(exprs: _*)
|
||||
.collect()
|
||||
}),
|
||||
}).asJava,
|
||||
indexSchema
|
||||
)
|
||||
}
|
||||
|
||||
def bootstrapParquetInputTableFromJSON(sourceJSONTablePath: String, targetParquetTablePath: String): Unit = {
|
||||
val jsonInputDF =
|
||||
// NOTE: Schema here is provided for validation that the input date is in the appropriate format
|
||||
spark.read
|
||||
.schema(sourceTableSchema)
|
||||
.json(sourceJSONTablePath)
|
||||
|
||||
jsonInputDF
|
||||
.sort("c1")
|
||||
.repartition(4, new Column("c1"))
|
||||
.write
|
||||
.format("parquet")
|
||||
.mode("overwrite")
|
||||
.save(targetParquetTablePath)
|
||||
|
||||
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
|
||||
// Have to cleanup additional artefacts of Spark write
|
||||
fs.delete(new Path(targetParquetTablePath, "_SUCCESS"), false)
|
||||
}
|
||||
|
||||
def replace(ds: Dataset[Row]): DataFrame = {
|
||||
val uuidRegexp = "[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}"
|
||||
|
||||
val uuids =
|
||||
ds.selectExpr(s"regexp_extract(file, '(${uuidRegexp})')")
|
||||
.distinct()
|
||||
.collect()
|
||||
.map(_.getString(0))
|
||||
.sorted
|
||||
|
||||
val uuidToIdx: UserDefinedFunction = functions.udf((fileName: String) => {
|
||||
val (uuid, idx) = uuids.zipWithIndex.find { case (uuid, _) => fileName.contains(uuid) }.get
|
||||
fileName.replace(uuid, idx.toString)
|
||||
})
|
||||
|
||||
ds.withColumn("file", uuidToIdx(ds("file")))
|
||||
}
|
||||
|
||||
private def generateRandomDataFrame(spark: SparkSession): DataFrame = {
|
||||
val sourceTableSchema =
|
||||
new StructType()
|
||||
.add("c1", IntegerType)
|
||||
.add("c2", StringType)
|
||||
// NOTE: We're testing different values for precision of the decimal to make sure
|
||||
// we execute paths bearing different underlying representations in Parquet
|
||||
// REF: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#DECIMAL
|
||||
.add("c3a", DecimalType(9,3))
|
||||
.add("c3b", DecimalType(10,3))
|
||||
.add("c3c", DecimalType(20,3))
|
||||
.add("c4", TimestampType)
|
||||
.add("c5", ShortType)
|
||||
.add("c6", DateType)
|
||||
.add("c7", BinaryType)
|
||||
.add("c8", ByteType)
|
||||
|
||||
val rdd = spark.sparkContext.parallelize(0 to 1000, 1).map { item =>
|
||||
val c1 = Integer.valueOf(item)
|
||||
val c2 = Random.nextString(10)
|
||||
val c3a = java.math.BigDecimal.valueOf(Random.nextInt() % (1 << 24), 3)
|
||||
val c3b = java.math.BigDecimal.valueOf(Random.nextLong() % (1L << 32), 3)
|
||||
// NOTE: We cap it at 2^64 to make sure we're not exceeding target decimal's range
|
||||
val c3c = new java.math.BigDecimal(new BigInteger(64, new java.util.Random()), 3)
|
||||
val c4 = new Timestamp(System.currentTimeMillis())
|
||||
val c5 = java.lang.Short.valueOf(s"${(item + 16) / 10}")
|
||||
val c6 = Date.valueOf(s"${2020}-${item % 11 + 1}-${item % 28 + 1}")
|
||||
val c7 = Array(item).map(_.toByte)
|
||||
val c8 = java.lang.Byte.valueOf("9")
|
||||
|
||||
RowFactory.create(c1, c2, c3a, c3b, c3c, c4, c5, c6, c7, c8)
|
||||
}
|
||||
|
||||
spark.createDataFrame(rdd, sourceTableSchema)
|
||||
}
|
||||
|
||||
private def asJson(df: DataFrame) =
|
||||
df.toJSON
|
||||
.select("value")
|
||||
@@ -281,7 +408,6 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
|
||||
.map(_.getString(0))
|
||||
.mkString("\n")
|
||||
|
||||
|
||||
private def sort(df: DataFrame): DataFrame = {
|
||||
// Since upon parsing JSON, Spark re-order columns in lexicographical order
|
||||
// of their names, we have to shuffle new Z-index table columns order to match
|
||||
|
||||
@@ -32,9 +32,7 @@ import org.junit.jupiter.params.ParameterizedTest
|
||||
import org.junit.jupiter.params.provider.Arguments.arguments
|
||||
import org.junit.jupiter.params.provider.{Arguments, MethodSource}
|
||||
|
||||
import java.sql.{Date, Timestamp}
|
||||
import scala.collection.JavaConversions._
|
||||
import scala.util.Random
|
||||
|
||||
@Tag("functional")
|
||||
class TestLayoutOptimization extends HoodieClientTestBase {
|
||||
@@ -151,22 +149,6 @@ class TestLayoutOptimization extends HoodieClientTestBase {
|
||||
val rows = one.count()
|
||||
assert(rows == other.count() && one.intersect(other).count() == rows)
|
||||
}
|
||||
|
||||
def createComplexDataFrame(spark: SparkSession): DataFrame = {
|
||||
val rdd = spark.sparkContext.parallelize(0 to 1000, 1).map { item =>
|
||||
val c1 = Integer.valueOf(item)
|
||||
val c2 = s" ${item}sdc"
|
||||
val c3 = new java.math.BigDecimal(s"${Random.nextInt(1000)}.${item}")
|
||||
val c4 = new Timestamp(System.currentTimeMillis())
|
||||
val c5 = java.lang.Short.valueOf(s"${(item + 16) /10}")
|
||||
val c6 = Date.valueOf(s"${2020}-${item % 11 + 1}-${item % 28 + 1}")
|
||||
val c7 = Array(item).map(_.toByte)
|
||||
val c8 = java.lang.Byte.valueOf("9")
|
||||
|
||||
RowFactory.create(c1, c2, c3, c4, c5, c6, c7, c8)
|
||||
}
|
||||
spark.createDataFrame(rdd, sourceTableSchema)
|
||||
}
|
||||
}
|
||||
|
||||
object TestLayoutOptimization {
|
||||
|
||||
Reference in New Issue
Block a user