[HUDI-1301] use spark INCREMENTAL mode query hudi dataset support schema version. (#2125)
This commit is contained in:
@@ -175,7 +175,21 @@ public class TableSchemaResolver {
|
|||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
public Schema getTableAvroSchemaWithoutMetadataFields() throws Exception {
|
public Schema getTableAvroSchemaWithoutMetadataFields() throws Exception {
|
||||||
Option<Schema> schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(false);
|
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
|
||||||
|
Option<Schema> schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(timeline.lastInstant().get(), false);
|
||||||
|
return schemaFromCommitMetadata.isPresent() ? schemaFromCommitMetadata.get() :
|
||||||
|
HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets users data schema for a hoodie table in Avro format of the instant.
|
||||||
|
*
|
||||||
|
* @param instant will get the instant data schema
|
||||||
|
* @return Avro user data schema
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
public Schema getTableAvroSchemaWithoutMetadataFields(HoodieInstant instant) throws Exception {
|
||||||
|
Option<Schema> schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(instant, false);
|
||||||
return schemaFromCommitMetadata.isPresent() ? schemaFromCommitMetadata.get() :
|
return schemaFromCommitMetadata.isPresent() ? schemaFromCommitMetadata.get() :
|
||||||
HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
|
HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
|
||||||
}
|
}
|
||||||
@@ -186,9 +200,20 @@ public class TableSchemaResolver {
|
|||||||
* @return Avro schema for this table
|
* @return Avro schema for this table
|
||||||
*/
|
*/
|
||||||
private Option<Schema> getTableSchemaFromCommitMetadata(boolean includeMetadataFields) {
|
private Option<Schema> getTableSchemaFromCommitMetadata(boolean includeMetadataFields) {
|
||||||
|
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
|
||||||
|
return getTableSchemaFromCommitMetadata(timeline.lastInstant().get(), includeMetadataFields);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the schema for a hoodie table in Avro format from the HoodieCommitMetadata of the instant.
|
||||||
|
*
|
||||||
|
* @return Avro schema for this table
|
||||||
|
*/
|
||||||
|
private Option<Schema> getTableSchemaFromCommitMetadata(HoodieInstant instant, boolean includeMetadataFields) {
|
||||||
try {
|
try {
|
||||||
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
|
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
|
||||||
byte[] data = timeline.getInstantDetails(timeline.lastInstant().get()).get();
|
byte[] data = timeline.getInstantDetails(instant).get();
|
||||||
HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
|
HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
|
||||||
String existingSchemaStr = metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
|
String existingSchemaStr = metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
|
||||||
|
|
||||||
|
|||||||
@@ -108,6 +108,15 @@ object DataSourceReadOptions {
|
|||||||
*/
|
*/
|
||||||
val END_INSTANTTIME_OPT_KEY = "hoodie.datasource.read.end.instanttime"
|
val END_INSTANTTIME_OPT_KEY = "hoodie.datasource.read.end.instanttime"
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If use the end instant schema when incrementally fetched data to.
|
||||||
|
*
|
||||||
|
* Default: false (use latest instant schema)
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
val INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME_OPT_KEY = "hoodie.datasource.read.schema.use.end.instanttime"
|
||||||
|
val DEFAULT_INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME_OPT_VAL = "false"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* For use-cases like DeltaStreamer which reads from Hoodie Incremental table and applies opaque map functions,
|
* For use-cases like DeltaStreamer which reads from Hoodie Incremental table and applies opaque map functions,
|
||||||
* filters appearing late in the sequence of transformations cannot be automatically pushed down.
|
* filters appearing late in the sequence of transformations cannot be automatically pushed down.
|
||||||
|
|||||||
@@ -55,7 +55,6 @@ class IncrementalRelation(val sqlContext: SQLContext,
|
|||||||
|
|
||||||
private val log = LogManager.getLogger(classOf[IncrementalRelation])
|
private val log = LogManager.getLogger(classOf[IncrementalRelation])
|
||||||
|
|
||||||
|
|
||||||
val skeletonSchema: StructType = HoodieSparkUtils.getMetaSchema
|
val skeletonSchema: StructType = HoodieSparkUtils.getMetaSchema
|
||||||
private val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true)
|
private val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true)
|
||||||
|
|
||||||
@@ -76,6 +75,9 @@ class IncrementalRelation(val sqlContext: SQLContext,
|
|||||||
s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY}")
|
s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY}")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
val useEndInstantSchema = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME_OPT_KEY,
|
||||||
|
DataSourceReadOptions.DEFAULT_INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME_OPT_VAL).toBoolean
|
||||||
|
|
||||||
private val lastInstant = commitTimeline.lastInstant().get()
|
private val lastInstant = commitTimeline.lastInstant().get()
|
||||||
|
|
||||||
private val commitsToReturn = commitTimeline.findInstantsInRange(
|
private val commitsToReturn = commitTimeline.findInstantsInRange(
|
||||||
@@ -83,11 +85,16 @@ class IncrementalRelation(val sqlContext: SQLContext,
|
|||||||
optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, lastInstant.getTimestamp))
|
optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, lastInstant.getTimestamp))
|
||||||
.getInstants.iterator().toList
|
.getInstants.iterator().toList
|
||||||
|
|
||||||
// use schema from a file produced in the latest instant
|
// use schema from a file produced in the end/latest instant
|
||||||
val latestSchema: StructType = {
|
val usedSchema: StructType = {
|
||||||
log.info("Inferring schema..")
|
log.info("Inferring schema..")
|
||||||
val schemaResolver = new TableSchemaResolver(metaClient)
|
val schemaResolver = new TableSchemaResolver(metaClient)
|
||||||
val tableSchema = schemaResolver.getTableAvroSchemaWithoutMetadataFields
|
val tableSchema = if (useEndInstantSchema) {
|
||||||
|
if (commitsToReturn.isEmpty) schemaResolver.getTableAvroSchemaWithoutMetadataFields() else
|
||||||
|
schemaResolver.getTableAvroSchemaWithoutMetadataFields(commitsToReturn.last)
|
||||||
|
} else {
|
||||||
|
schemaResolver.getTableAvroSchemaWithoutMetadataFields()
|
||||||
|
}
|
||||||
val dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
|
val dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
|
||||||
StructType(skeletonSchema.fields ++ dataSchema.fields)
|
StructType(skeletonSchema.fields ++ dataSchema.fields)
|
||||||
}
|
}
|
||||||
@@ -104,7 +111,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override def schema: StructType = latestSchema
|
override def schema: StructType = usedSchema
|
||||||
|
|
||||||
override def buildScan(): RDD[Row] = {
|
override def buildScan(): RDD[Row] = {
|
||||||
val regularFileIdToFullPath = mutable.HashMap[String, String]()
|
val regularFileIdToFullPath = mutable.HashMap[String, String]()
|
||||||
@@ -148,12 +155,12 @@ class IncrementalRelation(val sqlContext: SQLContext,
|
|||||||
} else {
|
} else {
|
||||||
log.info("Additional Filters to be applied to incremental source are :" + filters)
|
log.info("Additional Filters to be applied to incremental source are :" + filters)
|
||||||
|
|
||||||
var df: DataFrame = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], latestSchema)
|
var df: DataFrame = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
|
||||||
|
|
||||||
if (metaBootstrapFileIdToFullPath.nonEmpty) {
|
if (metaBootstrapFileIdToFullPath.nonEmpty) {
|
||||||
df = sqlContext.sparkSession.read
|
df = sqlContext.sparkSession.read
|
||||||
.format("hudi")
|
.format("hudi")
|
||||||
.schema(latestSchema)
|
.schema(usedSchema)
|
||||||
.option(DataSourceReadOptions.READ_PATHS_OPT_KEY, filteredMetaBootstrapFullPaths.mkString(","))
|
.option(DataSourceReadOptions.READ_PATHS_OPT_KEY, filteredMetaBootstrapFullPaths.mkString(","))
|
||||||
.load()
|
.load()
|
||||||
}
|
}
|
||||||
@@ -161,7 +168,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
|
|||||||
if (regularFileIdToFullPath.nonEmpty)
|
if (regularFileIdToFullPath.nonEmpty)
|
||||||
{
|
{
|
||||||
df = df.union(sqlContext.read.options(sOpts)
|
df = df.union(sqlContext.read.options(sOpts)
|
||||||
.schema(latestSchema)
|
.schema(usedSchema)
|
||||||
.parquet(filteredRegularFullPaths.toList: _*)
|
.parquet(filteredRegularFullPaths.toList: _*)
|
||||||
.filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
|
.filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
|
||||||
commitsToReturn.head.getTimestamp))
|
commitsToReturn.head.getTimestamp))
|
||||||
|
|||||||
Reference in New Issue
Block a user