From 1f7afba5e4c13c317c5378c7088dddb13b71d558 Mon Sep 17 00:00:00 2001 From: Yann Byron Date: Tue, 28 Dec 2021 13:39:52 +0800 Subject: [PATCH] [HUDI-3093] fix spark-sql query table that write with TimestampBasedKeyGenerator (#4416) --- .../scala/org/apache/hudi/HoodieFileIndex.scala | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala index 0ed1b4852..1a34d18ff 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -26,6 +26,8 @@ import org.apache.hudi.common.model.FileSlice import org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ import org.apache.hudi.common.table.view.{FileSystemViewStorageConfig, HoodieTableFileSystemView} import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator} + import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate} @@ -35,11 +37,12 @@ import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, N import org.apache.spark.sql.hudi.DataSkippingUtils.createColumnStatsIndexFilterExpr import org.apache.spark.sql.hudi.HoodieSqlUtils import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{StructField, StructType} +import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{AnalysisException, Column, SparkSession} import org.apache.spark.unsafe.types.UTF8String import java.util.Properties + import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ import scala.collection.mutable @@ -109,10 +112,16 @@ case class HoodieFileIndex( val nameFieldMap = generateNameFieldMap(Right(schema)) if (partitionColumns.isPresent) { - val partitionFields = partitionColumns.get().map(column => - nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column: '" + + if (tableConfig.getKeyGeneratorClassName.equalsIgnoreCase(classOf[TimestampBasedKeyGenerator].getName) + || tableConfig.getKeyGeneratorClassName.equalsIgnoreCase(classOf[TimestampBasedAvroKeyGenerator].getName)) { + val partitionFields = partitionColumns.get().map(column => StructField(column, StringType)) + StructType(partitionFields) + } else { + val partitionFields = partitionColumns.get().map(column => + nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column: '" + s"$column' in the schema[${schema.fields.mkString(",")}]"))) - new StructType(partitionFields) + StructType(partitionFields) + } } else { // If the partition columns have not stored in hoodie.properties(the table that was // created earlier), we trait it as a non-partitioned table. logWarning("No partition columns available from hoodie.properties." +