Merge pull request #4308 from harsh1231/HUDI-3008
[HUDI-3008] Fixing HoodieFileIndex partition column parsing for nested fields
This commit is contained in:
@@ -18,7 +18,6 @@
|
||||
package org.apache.hudi
|
||||
|
||||
import org.apache.hadoop.fs.{FileStatus, Path}
|
||||
|
||||
import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
|
||||
import org.apache.hudi.client.common.HoodieSparkEngineContext
|
||||
import org.apache.hudi.common.config.HoodieMetadataConfig
|
||||
@@ -27,7 +26,6 @@ import org.apache.hudi.common.model.FileSlice
|
||||
import org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ
|
||||
import org.apache.hudi.common.table.view.{FileSystemViewStorageConfig, HoodieTableFileSystemView}
|
||||
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
|
||||
|
||||
import org.apache.spark.api.java.JavaSparkContext
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate}
|
||||
@@ -37,7 +35,7 @@ import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, N
|
||||
import org.apache.spark.sql.hudi.DataSkippingUtils.createColumnStatsIndexFilterExpr
|
||||
import org.apache.spark.sql.hudi.HoodieSqlUtils
|
||||
import org.apache.spark.sql.internal.SQLConf
|
||||
import org.apache.spark.sql.types.StructType
|
||||
import org.apache.spark.sql.types.{StructField, StructType}
|
||||
import org.apache.spark.sql.{AnalysisException, Column, SparkSession}
|
||||
import org.apache.spark.unsafe.types.UTF8String
|
||||
|
||||
@@ -108,7 +106,7 @@ case class HoodieFileIndex(
|
||||
private lazy val _partitionSchemaFromProperties: StructType = {
|
||||
val tableConfig = metaClient.getTableConfig
|
||||
val partitionColumns = tableConfig.getPartitionFields
|
||||
val nameFieldMap = schema.fields.map(filed => filed.name -> filed).toMap
|
||||
val nameFieldMap = generateNameFieldMap(Right(schema))
|
||||
|
||||
if (partitionColumns.isPresent) {
|
||||
val partitionFields = partitionColumns.get().map(column =>
|
||||
@@ -123,6 +121,25 @@ case class HoodieFileIndex(
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method traverses StructType recursively to build map of columnName -> StructField
|
||||
* Note : If there is nesting of columns like ["a.b.c.d", "a.b.c.e"] -> final map will have keys corresponding
|
||||
* only to ["a.b.c.d", "a.b.c.e"] and not for subsets like ["a.b.c", "a.b"]
|
||||
* @param structField
|
||||
* @return map of ( columns names -> StructField )
|
||||
*/
|
||||
private def generateNameFieldMap(structField: Either[StructField, StructType]) : Map[String, StructField] = {
|
||||
structField match {
|
||||
case Right(field) => field.fields.map(f => generateNameFieldMap(Left(f))).flatten.toMap
|
||||
case Left(field) => field.dataType match {
|
||||
case struct: StructType => generateNameFieldMap(Right(struct)).map {
|
||||
case (key: String, sf: StructField) => (field.name + "." + key, sf)
|
||||
}
|
||||
case _ => Map(field.name -> field)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private lazy val engineContext = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
|
||||
|
||||
private lazy val configProperties = {
|
||||
|
||||
@@ -18,7 +18,6 @@
|
||||
package org.apache.hudi
|
||||
|
||||
import java.util.Properties
|
||||
|
||||
import org.apache.hudi.DataSourceWriteOptions._
|
||||
import org.apache.hudi.common.config.HoodieMetadataConfig
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient
|
||||
@@ -31,6 +30,7 @@ import org.apache.hudi.keygen.ComplexKeyGenerator
|
||||
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.{Config, TimestampType}
|
||||
import org.apache.hudi.testutils.HoodieClientTestBase
|
||||
import org.apache.spark.sql.catalyst.InternalRow
|
||||
import org.apache.spark.sql.functions.{lit, struct}
|
||||
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, GreaterThanOrEqual, LessThan, Literal}
|
||||
import org.apache.spark.sql.execution.datasources.PartitionDirectory
|
||||
import org.apache.spark.sql.types.StringType
|
||||
@@ -38,7 +38,7 @@ import org.apache.spark.sql.{SaveMode, SparkSession}
|
||||
import org.junit.jupiter.api.Assertions.assertEquals
|
||||
import org.junit.jupiter.api.BeforeEach
|
||||
import org.junit.jupiter.params.ParameterizedTest
|
||||
import org.junit.jupiter.params.provider.ValueSource
|
||||
import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
|
||||
|
||||
import scala.collection.JavaConversions._
|
||||
import scala.collection.JavaConverters._
|
||||
@@ -253,6 +253,29 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
|
||||
assertEquals(5, readDF2.filter("dt = '2021/03/01' and hh ='10'").count())
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@CsvSource(Array("true,a.b.c","false,a.b.c","true,c","false,c"))
|
||||
def testQueryPartitionPathsForNestedPartition(useMetaFileList:Boolean, partitionBy:String): Unit = {
|
||||
val inputDF = spark.range(100)
|
||||
.withColumn("c",lit("c"))
|
||||
.withColumn("b",struct("c"))
|
||||
.withColumn("a",struct("b"))
|
||||
inputDF.write.format("hudi")
|
||||
.options(commonOpts)
|
||||
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
|
||||
.option(RECORDKEY_FIELD.key, "id")
|
||||
.option(PRECOMBINE_FIELD.key, "id")
|
||||
.option(PARTITIONPATH_FIELD.key, partitionBy)
|
||||
.option(HoodieMetadataConfig.ENABLE.key(), useMetaFileList)
|
||||
.mode(SaveMode.Overwrite)
|
||||
.save(basePath)
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient)
|
||||
val fileIndex = HoodieFileIndex(spark, metaClient, None,
|
||||
queryOpts ++ Map(HoodieMetadataConfig.ENABLE.key -> useMetaFileList.toString))
|
||||
// test if table is partitioned on nested columns, getAllQueryPartitionPaths does not break
|
||||
assert(fileIndex.getAllQueryPartitionPaths.get(0).partitionPath.equals("c"))
|
||||
}
|
||||
|
||||
private def attribute(partition: String): AttributeReference = {
|
||||
AttributeReference(partition, StringType, true)()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user