[HUDI-3168] Fixing null schema with empty commit in incremental relation (#4513)
This commit is contained in:
@@ -17,8 +17,9 @@
|
|||||||
|
|
||||||
package org.apache.hudi
|
package org.apache.hudi
|
||||||
|
|
||||||
import java.util.stream.Collectors
|
import org.apache.avro.Schema
|
||||||
|
|
||||||
|
import java.util.stream.Collectors
|
||||||
import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecord, HoodieReplaceCommitMetadata, HoodieTableType}
|
import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecord, HoodieReplaceCommitMetadata, HoodieTableType}
|
||||||
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
|
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
|
||||||
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
|
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
|
||||||
@@ -89,8 +90,13 @@ class IncrementalRelation(val sqlContext: SQLContext,
|
|||||||
} else {
|
} else {
|
||||||
schemaResolver.getTableAvroSchemaWithoutMetadataFields()
|
schemaResolver.getTableAvroSchemaWithoutMetadataFields()
|
||||||
}
|
}
|
||||||
val dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
|
if (tableSchema.getType == Schema.Type.NULL) {
|
||||||
StructType(skeletonSchema.fields ++ dataSchema.fields)
|
// if there is only one commit in the table and is an empty commit without schema, return empty RDD here
|
||||||
|
StructType(Nil)
|
||||||
|
} else {
|
||||||
|
val dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
|
||||||
|
StructType(skeletonSchema.fields ++ dataSchema.fields)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private val filters = optParams.getOrElse(DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS.key,
|
private val filters = optParams.getOrElse(DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS.key,
|
||||||
@@ -99,86 +105,90 @@ class IncrementalRelation(val sqlContext: SQLContext,
|
|||||||
override def schema: StructType = usedSchema
|
override def schema: StructType = usedSchema
|
||||||
|
|
||||||
override def buildScan(): RDD[Row] = {
|
override def buildScan(): RDD[Row] = {
|
||||||
val regularFileIdToFullPath = mutable.HashMap[String, String]()
|
if (usedSchema == StructType(Nil)) {
|
||||||
var metaBootstrapFileIdToFullPath = mutable.HashMap[String, String]()
|
// if first commit in a table is an empty commit without schema, return empty RDD here
|
||||||
|
|
||||||
// create Replaced file group
|
|
||||||
val replacedTimeline = commitsTimelineToReturn.getCompletedReplaceTimeline
|
|
||||||
val replacedFile = replacedTimeline.getInstants.collect(Collectors.toList[HoodieInstant]).flatMap { instant =>
|
|
||||||
val replaceMetadata = HoodieReplaceCommitMetadata.
|
|
||||||
fromBytes(metaClient.getActiveTimeline.getInstantDetails(instant).get, classOf[HoodieReplaceCommitMetadata])
|
|
||||||
replaceMetadata.getPartitionToReplaceFileIds.entrySet().flatMap { entry =>
|
|
||||||
entry.getValue.map { e =>
|
|
||||||
val fullPath = FSUtils.getPartitionPath(basePath, entry.getKey).toString
|
|
||||||
(e, fullPath)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}.toMap
|
|
||||||
|
|
||||||
for (commit <- commitsToReturn) {
|
|
||||||
val metadata: HoodieCommitMetadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit)
|
|
||||||
.get, classOf[HoodieCommitMetadata])
|
|
||||||
|
|
||||||
if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS == commit.getTimestamp) {
|
|
||||||
metaBootstrapFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot { case (k, v) =>
|
|
||||||
replacedFile.contains(k) && v.startsWith(replacedFile(k))
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
regularFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot { case (k, v) =>
|
|
||||||
replacedFile.contains(k) && v.startsWith(replacedFile(k))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (metaBootstrapFileIdToFullPath.nonEmpty) {
|
|
||||||
// filer out meta bootstrap files that have had more commits since metadata bootstrap
|
|
||||||
metaBootstrapFileIdToFullPath = metaBootstrapFileIdToFullPath
|
|
||||||
.filterNot(fileIdFullPath => regularFileIdToFullPath.contains(fileIdFullPath._1))
|
|
||||||
}
|
|
||||||
|
|
||||||
val pathGlobPattern = optParams.getOrElse(
|
|
||||||
DataSourceReadOptions.INCR_PATH_GLOB.key,
|
|
||||||
DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)
|
|
||||||
val (filteredRegularFullPaths, filteredMetaBootstrapFullPaths) = {
|
|
||||||
if(!pathGlobPattern.equals(DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)) {
|
|
||||||
val globMatcher = new GlobPattern("*" + pathGlobPattern)
|
|
||||||
(regularFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values,
|
|
||||||
metaBootstrapFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values)
|
|
||||||
} else {
|
|
||||||
(regularFileIdToFullPath.values, metaBootstrapFileIdToFullPath.values)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// unset the path filter, otherwise if end_instant_time is not the latest instant, path filter set for RO view
|
|
||||||
// will filter out all the files incorrectly.
|
|
||||||
sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class")
|
|
||||||
val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))
|
|
||||||
if (filteredRegularFullPaths.isEmpty && filteredMetaBootstrapFullPaths.isEmpty) {
|
|
||||||
sqlContext.sparkContext.emptyRDD[Row]
|
sqlContext.sparkContext.emptyRDD[Row]
|
||||||
} else {
|
} else {
|
||||||
log.info("Additional Filters to be applied to incremental source are :" + filters)
|
val regularFileIdToFullPath = mutable.HashMap[String, String]()
|
||||||
|
var metaBootstrapFileIdToFullPath = mutable.HashMap[String, String]()
|
||||||
|
|
||||||
var df: DataFrame = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
|
// create Replaced file group
|
||||||
|
val replacedTimeline = commitsTimelineToReturn.getCompletedReplaceTimeline
|
||||||
|
val replacedFile = replacedTimeline.getInstants.collect(Collectors.toList[HoodieInstant]).flatMap { instant =>
|
||||||
|
val replaceMetadata = HoodieReplaceCommitMetadata.
|
||||||
|
fromBytes(metaClient.getActiveTimeline.getInstantDetails(instant).get, classOf[HoodieReplaceCommitMetadata])
|
||||||
|
replaceMetadata.getPartitionToReplaceFileIds.entrySet().flatMap { entry =>
|
||||||
|
entry.getValue.map { e =>
|
||||||
|
val fullPath = FSUtils.getPartitionPath(basePath, entry.getKey).toString
|
||||||
|
(e, fullPath)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}.toMap
|
||||||
|
|
||||||
|
for (commit <- commitsToReturn) {
|
||||||
|
val metadata: HoodieCommitMetadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit)
|
||||||
|
.get, classOf[HoodieCommitMetadata])
|
||||||
|
|
||||||
|
if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS == commit.getTimestamp) {
|
||||||
|
metaBootstrapFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot { case (k, v) =>
|
||||||
|
replacedFile.contains(k) && v.startsWith(replacedFile(k))
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
regularFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot { case (k, v) =>
|
||||||
|
replacedFile.contains(k) && v.startsWith(replacedFile(k))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (metaBootstrapFileIdToFullPath.nonEmpty) {
|
if (metaBootstrapFileIdToFullPath.nonEmpty) {
|
||||||
df = sqlContext.sparkSession.read
|
// filer out meta bootstrap files that have had more commits since metadata bootstrap
|
||||||
.format("hudi")
|
metaBootstrapFileIdToFullPath = metaBootstrapFileIdToFullPath
|
||||||
.schema(usedSchema)
|
.filterNot(fileIdFullPath => regularFileIdToFullPath.contains(fileIdFullPath._1))
|
||||||
.option(DataSourceReadOptions.READ_PATHS.key, filteredMetaBootstrapFullPaths.mkString(","))
|
|
||||||
.load()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (regularFileIdToFullPath.nonEmpty)
|
val pathGlobPattern = optParams.getOrElse(
|
||||||
{
|
DataSourceReadOptions.INCR_PATH_GLOB.key,
|
||||||
df = df.union(sqlContext.read.options(sOpts)
|
DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)
|
||||||
.schema(usedSchema)
|
val (filteredRegularFullPaths, filteredMetaBootstrapFullPaths) = {
|
||||||
.parquet(filteredRegularFullPaths.toList: _*)
|
if (!pathGlobPattern.equals(DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)) {
|
||||||
.filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
|
val globMatcher = new GlobPattern("*" + pathGlobPattern)
|
||||||
commitsToReturn.head.getTimestamp))
|
(regularFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values,
|
||||||
.filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
|
metaBootstrapFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values)
|
||||||
commitsToReturn.last.getTimestamp)))
|
} else {
|
||||||
|
(regularFileIdToFullPath.values, metaBootstrapFileIdToFullPath.values)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
// unset the path filter, otherwise if end_instant_time is not the latest instant, path filter set for RO view
|
||||||
|
// will filter out all the files incorrectly.
|
||||||
|
sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class")
|
||||||
|
val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))
|
||||||
|
if (filteredRegularFullPaths.isEmpty && filteredMetaBootstrapFullPaths.isEmpty) {
|
||||||
|
sqlContext.sparkContext.emptyRDD[Row]
|
||||||
|
} else {
|
||||||
|
log.info("Additional Filters to be applied to incremental source are :" + filters)
|
||||||
|
|
||||||
filters.foldLeft(df)((e, f) => e.filter(f)).rdd
|
var df: DataFrame = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
|
||||||
|
|
||||||
|
if (metaBootstrapFileIdToFullPath.nonEmpty) {
|
||||||
|
df = sqlContext.sparkSession.read
|
||||||
|
.format("hudi")
|
||||||
|
.schema(usedSchema)
|
||||||
|
.option(DataSourceReadOptions.READ_PATHS.key, filteredMetaBootstrapFullPaths.mkString(","))
|
||||||
|
.load()
|
||||||
|
}
|
||||||
|
|
||||||
|
if (regularFileIdToFullPath.nonEmpty) {
|
||||||
|
df = df.union(sqlContext.read.options(sOpts)
|
||||||
|
.schema(usedSchema)
|
||||||
|
.parquet(filteredRegularFullPaths.toList: _*)
|
||||||
|
.filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
|
||||||
|
commitsToReturn.head.getTimestamp))
|
||||||
|
.filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
|
||||||
|
commitsToReturn.last.getTimestamp)))
|
||||||
|
}
|
||||||
|
|
||||||
|
filters.foldLeft(df)((e, f) => e.filter(f)).rdd
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -110,6 +110,10 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
|
|||||||
.option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), instantEndpts.getLeft())
|
.option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), instantEndpts.getLeft())
|
||||||
.option(DataSourceReadOptions.END_INSTANTTIME().key(), instantEndpts.getRight());
|
.option(DataSourceReadOptions.END_INSTANTTIME().key(), instantEndpts.getRight());
|
||||||
Dataset<Row> source = metaReader.load(srcPath);
|
Dataset<Row> source = metaReader.load(srcPath);
|
||||||
|
|
||||||
|
if (source.isEmpty()) {
|
||||||
|
return Pair.of(Option.empty(), instantEndpts.getRight());
|
||||||
|
}
|
||||||
|
|
||||||
String filter = "s3.object.size > 0";
|
String filter = "s3.object.size > 0";
|
||||||
if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_KEY_PREFIX))) {
|
if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_KEY_PREFIX))) {
|
||||||
|
|||||||
Reference in New Issue
Block a user