diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
index 32bd9a439..d9f64ba9b 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hadoop.HoodieROTablePathFilter
 import org.apache.log4j.LogManager
+import org.apache.spark.sql.avro.SchemaConverters
 import org.apache.spark.sql.execution.datasources.{DataSource, FileStatusCache, HadoopFsRelation}
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
@@ -213,11 +214,27 @@ class DefaultSource extends RelationProvider
       classOf[HoodieROTablePathFilter],
       classOf[org.apache.hadoop.fs.PathFilter])
 
+    // When the caller did not supply a schema, resolve the latest table schema
+    // from the commit metadata so reads reflect any schema evolution.
+    val specifySchema = if (schema == null) {
+      val tableSchemaResolver = new TableSchemaResolver(metaClient)
+      try {
+        Some(SchemaConverters.toSqlType(tableSchemaResolver.getTableAvroSchema)
+          .dataType.asInstanceOf[StructType])
+      } catch {
+        // Catch NonFatal only (not Throwable) so fatal errors still propagate.
+        // A failure here means the table has no commit yet, hence no schema
+        // can be resolved; fall back to None and let Spark infer it.
+        case scala.util.control.NonFatal(_) =>
+          None
+      }
+    } else {
+      Some(schema)
+    }
     // simply return as a regular relation
     DataSource.apply(
       sparkSession = sqlContext.sparkSession,
       paths = extraReadPaths,
-      userSpecifiedSchema = Option(schema),
+      userSpecifiedSchema = specifySchema,
       className = formatClassName,
       options = optParams)
       .resolveRelation()