diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index db686674d..372b3936b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -175,20 +175,45 @@ public class TableSchemaResolver { * @throws Exception */ public Schema getTableAvroSchemaWithoutMetadataFields() throws Exception { - Option schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(false); + HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); + Option schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(timeline.lastInstant().get(), false); return schemaFromCommitMetadata.isPresent() ? schemaFromCommitMetadata.get() : HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile()); } + /** + * Gets the user data schema for a hoodie table in Avro format as of the given instant. + * + * @param instant the instant whose commit metadata supplies the schema + * @return Avro user data schema + * @throws Exception + */ + public Schema getTableAvroSchemaWithoutMetadataFields(HoodieInstant instant) throws Exception { + Option schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(instant, false); + return schemaFromCommitMetadata.isPresent() ? schemaFromCommitMetadata.get() : + HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile()); + } + /** + * Gets the schema for a hoodie table in Avro format from the HoodieCommitMetadata of the last commit.
* * @return Avro schema for this table */ private Option getTableSchemaFromCommitMetadata(boolean includeMetadataFields) { + HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); + return getTableSchemaFromCommitMetadata(timeline.lastInstant().get(), includeMetadataFields); + } + + + /** + * Gets the schema for a hoodie table in Avro format from the HoodieCommitMetadata of the instant. + * + * @return Avro schema for this table + */ + private Option getTableSchemaFromCommitMetadata(HoodieInstant instant, boolean includeMetadataFields) { try { HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); - byte[] data = timeline.getInstantDetails(timeline.lastInstant().get()).get(); + byte[] data = timeline.getInstantDetails(instant).get(); HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class); String existingSchemaStr = metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY); diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala index e153e9249..d66fa9694 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -108,6 +108,15 @@ object DataSourceReadOptions { */ val END_INSTANTTIME_OPT_KEY = "hoodie.datasource.read.end.instanttime" + /** + * Whether to use the end instant's schema when incrementally fetching data.
+ * + * Default: false (use latest instant schema) + * + */ + val INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME_OPT_KEY = "hoodie.datasource.read.schema.use.end.instanttime" + val DEFAULT_INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME_OPT_VAL = "false" + /** * For use-cases like DeltaStreamer which reads from Hoodie Incremental table and applies opaque map functions, * filters appearing late in the sequence of transformations cannot be automatically pushed down. diff --git a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala index e113d4af5..ff68ef077 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala @@ -55,7 +55,7 @@ class IncrementalRelation(val sqlContext: SQLContext, private val log = LogManager.getLogger(classOf[IncrementalRelation]) val skeletonSchema: StructType = HoodieSparkUtils.getMetaSchema private val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true) @@ -76,6 +76,9 @@ class IncrementalRelation(val sqlContext: SQLContext, s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY}") } + val useEndInstantSchema = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME_OPT_KEY, + DataSourceReadOptions.DEFAULT_INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME_OPT_VAL).toBoolean + private val lastInstant = commitTimeline.lastInstant().get() private val commitsToReturn = commitTimeline.findInstantsInRange( @@ -83,11 +86,16 @@ optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, lastInstant.getTimestamp)) .getInstants.iterator().toList - // use schema from a file produced in the latest instant - val latestSchema: StructType = { + // use schema from a file produced in the end/latest instant + val usedSchema: StructType = {
log.info("Inferring schema..") val schemaResolver = new TableSchemaResolver(metaClient) - val tableSchema = schemaResolver.getTableAvroSchemaWithoutMetadataFields + val tableSchema = if (useEndInstantSchema) { + if (commitsToReturn.isEmpty) schemaResolver.getTableAvroSchemaWithoutMetadataFields() else + schemaResolver.getTableAvroSchemaWithoutMetadataFields(commitsToReturn.last) + } else { + schemaResolver.getTableAvroSchemaWithoutMetadataFields() + } val dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema) StructType(skeletonSchema.fields ++ dataSchema.fields) } @@ -104,7 +111,7 @@ class IncrementalRelation(val sqlContext: SQLContext, } } - override def schema: StructType = latestSchema + override def schema: StructType = usedSchema override def buildScan(): RDD[Row] = { val regularFileIdToFullPath = mutable.HashMap[String, String]() @@ -148,12 +155,12 @@ class IncrementalRelation(val sqlContext: SQLContext, } else { log.info("Additional Filters to be applied to incremental source are :" + filters) - var df: DataFrame = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], latestSchema) + var df: DataFrame = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema) if (metaBootstrapFileIdToFullPath.nonEmpty) { df = sqlContext.sparkSession.read .format("hudi") - .schema(latestSchema) + .schema(usedSchema) .option(DataSourceReadOptions.READ_PATHS_OPT_KEY, filteredMetaBootstrapFullPaths.mkString(",")) .load() } @@ -161,7 +168,7 @@ class IncrementalRelation(val sqlContext: SQLContext, if (regularFileIdToFullPath.nonEmpty) { df = df.union(sqlContext.read.options(sOpts) - .schema(latestSchema) + .schema(usedSchema) .parquet(filteredRegularFullPaths.toList: _*) .filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.head.getTimestamp))