[HUDI-2061] Incorrect Schema Inference For Schema Evolved Table (#3137)
This commit is contained in:
@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
|
||||
import org.apache.hudi.exception.HoodieException
|
||||
import org.apache.hudi.hadoop.HoodieROTablePathFilter
|
||||
import org.apache.log4j.LogManager
|
||||
import org.apache.spark.sql.avro.SchemaConverters
|
||||
import org.apache.spark.sql.execution.datasources.{DataSource, FileStatusCache, HadoopFsRelation}
|
||||
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
|
||||
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
|
||||
@@ -213,11 +214,27 @@ class DefaultSource extends RelationProvider
|
||||
classOf[HoodieROTablePathFilter],
|
||||
classOf[org.apache.hadoop.fs.PathFilter])
|
||||
|
||||
val specifySchema = if (schema == null) {
|
||||
// Load the schema from the commit meta data.
|
||||
// Here we should specify the schema to the latest commit schema since
|
||||
// the table schema evolution.
|
||||
val tableSchemaResolver = new TableSchemaResolver(metaClient)
|
||||
try {
|
||||
Some(SchemaConverters.toSqlType(tableSchemaResolver.getTableAvroSchema)
|
||||
.dataType.asInstanceOf[StructType])
|
||||
} catch {
|
||||
case _: Throwable =>
|
||||
None // If there is no commit in the table, we can not get the schema
|
||||
// with tableSchemaResolver, return None here.
|
||||
}
|
||||
} else {
|
||||
Some(schema)
|
||||
}
|
||||
// simply return as a regular relation
|
||||
DataSource.apply(
|
||||
sparkSession = sqlContext.sparkSession,
|
||||
paths = extraReadPaths,
|
||||
userSpecifiedSchema = Option(schema),
|
||||
userSpecifiedSchema = specifySchema,
|
||||
className = formatClassName,
|
||||
options = optParams)
|
||||
.resolveRelation()
|
||||
|
||||
Reference in New Issue
Block a user