From 7e50f9a5a6be8d68b13a587a07a9af81819540ca Mon Sep 17 00:00:00 2001
From: pengzhiwei
Date: Thu, 24 Jun 2021 13:48:01 +0800
Subject: [PATCH] [HUDI-2061] Incorrect Schema Inference For Schema Evolved Table (#3137)

---
 .../scala/org/apache/hudi/DefaultSource.scala | 19 ++++++++++++++++++-
 1 file changed, 18 insertions(+), 1 deletion(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
index 32bd9a439..d9f64ba9b 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hadoop.HoodieROTablePathFilter
 import org.apache.log4j.LogManager
+import org.apache.spark.sql.avro.SchemaConverters
 import org.apache.spark.sql.execution.datasources.{DataSource, FileStatusCache, HadoopFsRelation}
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
@@ -213,11 +214,27 @@ class DefaultSource extends RelationProvider
       classOf[HoodieROTablePathFilter], classOf[org.apache.hadoop.fs.PathFilter])
 
+    val specifySchema = if (schema == null) {
+      // The caller did not supply a schema: resolve the latest commit schema
+      // so that a schema-evolved table is read with its newest (evolved)
+      // schema instead of an out-of-date one inferred from old files.
+      val tableSchemaResolver = new TableSchemaResolver(metaClient)
+      try {
+        Some(SchemaConverters.toSqlType(tableSchemaResolver.getTableAvroSchema)
+          .dataType.asInstanceOf[StructType])
+      } catch {
+        // Catch Exception (not Throwable) so fatal VM errors still propagate.
+        // If the table has no commit yet we cannot resolve a schema; fall
+        case _: Exception => None // back to None and let the source infer it.
+      }
+    } else {
+      Some(schema)
+    }
     // simply return as a regular relation
     DataSource.apply(
       sparkSession = sqlContext.sparkSession,
       paths = extraReadPaths,
-      userSpecifiedSchema = Option(schema),
+      userSpecifiedSchema = specifySchema,
       className = formatClassName,
       options = optParams)
       .resolveRelation()
   }