[HUDI-3093] fix spark-sql query table that write with TimestampBasedKeyGenerator (#4416)
This commit is contained in:
@@ -26,6 +26,8 @@ import org.apache.hudi.common.model.FileSlice
|
|||||||
import org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ
|
import org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ
|
||||||
import org.apache.hudi.common.table.view.{FileSystemViewStorageConfig, HoodieTableFileSystemView}
|
import org.apache.hudi.common.table.view.{FileSystemViewStorageConfig, HoodieTableFileSystemView}
|
||||||
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
|
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
|
||||||
|
import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
|
||||||
|
|
||||||
import org.apache.spark.api.java.JavaSparkContext
|
import org.apache.spark.api.java.JavaSparkContext
|
||||||
import org.apache.spark.internal.Logging
|
import org.apache.spark.internal.Logging
|
||||||
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate}
|
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate}
|
||||||
@@ -35,11 +37,12 @@ import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, N
|
|||||||
import org.apache.spark.sql.hudi.DataSkippingUtils.createColumnStatsIndexFilterExpr
|
import org.apache.spark.sql.hudi.DataSkippingUtils.createColumnStatsIndexFilterExpr
|
||||||
import org.apache.spark.sql.hudi.HoodieSqlUtils
|
import org.apache.spark.sql.hudi.HoodieSqlUtils
|
||||||
import org.apache.spark.sql.internal.SQLConf
|
import org.apache.spark.sql.internal.SQLConf
|
||||||
import org.apache.spark.sql.types.{StructField, StructType}
|
import org.apache.spark.sql.types.{StringType, StructField, StructType}
|
||||||
import org.apache.spark.sql.{AnalysisException, Column, SparkSession}
|
import org.apache.spark.sql.{AnalysisException, Column, SparkSession}
|
||||||
import org.apache.spark.unsafe.types.UTF8String
|
import org.apache.spark.unsafe.types.UTF8String
|
||||||
|
|
||||||
import java.util.Properties
|
import java.util.Properties
|
||||||
|
|
||||||
import scala.collection.JavaConversions._
|
import scala.collection.JavaConversions._
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
import scala.collection.mutable
|
import scala.collection.mutable
|
||||||
@@ -109,10 +112,16 @@ case class HoodieFileIndex(
|
|||||||
val nameFieldMap = generateNameFieldMap(Right(schema))
|
val nameFieldMap = generateNameFieldMap(Right(schema))
|
||||||
|
|
||||||
if (partitionColumns.isPresent) {
|
if (partitionColumns.isPresent) {
|
||||||
|
if (tableConfig.getKeyGeneratorClassName.equalsIgnoreCase(classOf[TimestampBasedKeyGenerator].getName)
|
||||||
|
|| tableConfig.getKeyGeneratorClassName.equalsIgnoreCase(classOf[TimestampBasedAvroKeyGenerator].getName)) {
|
||||||
|
val partitionFields = partitionColumns.get().map(column => StructField(column, StringType))
|
||||||
|
StructType(partitionFields)
|
||||||
|
} else {
|
||||||
val partitionFields = partitionColumns.get().map(column =>
|
val partitionFields = partitionColumns.get().map(column =>
|
||||||
nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column: '" +
|
nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column: '" +
|
||||||
s"$column' in the schema[${schema.fields.mkString(",")}]")))
|
s"$column' in the schema[${schema.fields.mkString(",")}]")))
|
||||||
new StructType(partitionFields)
|
StructType(partitionFields)
|
||||||
|
}
|
||||||
} else { // If the partition columns have not stored in hoodie.properties(the table that was
|
} else { // If the partition columns have not stored in hoodie.properties(the table that was
|
||||||
// created earlier), we trait it as a non-partitioned table.
|
// created earlier), we trait it as a non-partitioned table.
|
||||||
logWarning("No partition columns available from hoodie.properties." +
|
logWarning("No partition columns available from hoodie.properties." +
|
||||||
|
|||||||
Reference in New Issue
Block a user