
[HUDI-2217] Fix no value present in incremental query on MOR (#3340)

Gary Li
2021-07-27 17:30:01 +08:00
committed by GitHub
parent ab2e0d0ba2
commit 925873bb3c
3 changed files with 76 additions and 54 deletions
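
Summary: when an incremental query's instant range contains no completed commits, the MOR relation previously built its file index anyway and failed with a "No value present" error (most likely an Optional.get on an empty timeline inside buildFileIndex). The fix short-circuits the three places that assume a non-empty commitsToReturn: fileIndex becomes an empty List, unhandledFilters returns the filters unchanged, and buildScan returns an empty RDD[Row].

A minimal usage sketch of the query shape this fixes, mirroring the tests added below; the instant range "000" to "001" is assumed to contain no commits, and basePath is assumed to point at an existing Hudi table:

// Incremental read over an instant range with no completed commits.
// Before this fix: fails with "No value present"; after: returns 0 rows.
val emptyIncDF = spark.read.format("org.apache.hudi")
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
  .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY.key, "000")
  .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY.key, "001")
  .load(basePath)
assert(emptyIncDF.count() == 0)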

MergeOnReadIncrementalRelation.scala

@@ -17,7 +17,6 @@
 package org.apache.hudi
-import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
@@ -52,7 +51,6 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
   private val log = LogManager.getLogger(classOf[MergeOnReadIncrementalRelation])
   private val conf = sqlContext.sparkContext.hadoopConfiguration
   private val jobConf = new JobConf(conf)
-  private val fs = FSUtils.getFs(metaClient.getBasePath, conf)
   private val commitTimeline = metaClient.getCommitsAndCompactionTimeline.filterCompletedInstants()
   if (commitTimeline.empty()) {
     throw new HoodieException("No instants to incrementally pull")
@@ -76,7 +74,7 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
   private val tableAvroSchema = schemaUtil.getTableAvroSchema
   private val tableStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
   private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf)
-  private val fileIndex = buildFileIndex()
+  private val fileIndex = if (commitsToReturn.isEmpty) List() else buildFileIndex()
   private val preCombineField = {
     val preCombineFieldFromTableConfig = metaClient.getTableConfig.getPreCombineField
     if (preCombineFieldFromTableConfig != null) {
@@ -92,63 +90,71 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
   override def needConversion: Boolean = false
 
   override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
-    val isNotNullFilter = IsNotNull(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
-    val largerThanFilter = GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.head.getTimestamp)
-    val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.last.getTimestamp)
-    filters :+isNotNullFilter :+ largerThanFilter :+ lessThanFilter
+    if (fileIndex.isEmpty) {
+      filters
+    } else {
+      val isNotNullFilter = IsNotNull(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
+      val largerThanFilter = GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.head.getTimestamp)
+      val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.last.getTimestamp)
+      filters :+ isNotNullFilter :+ largerThanFilter :+ lessThanFilter
+    }
   }
 
   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
-    log.debug(s"buildScan requiredColumns = ${requiredColumns.mkString(",")}")
-    log.debug(s"buildScan filters = ${filters.mkString(",")}")
-    // config to ensure the push down filter for parquet will be applied.
-    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "true")
-    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled", "true")
-    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false")
-    val pushDownFilter = {
-      val isNotNullFilter = IsNotNull(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
-      val largerThanFilter = GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.head.getTimestamp)
-      val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.last.getTimestamp)
-      filters :+isNotNullFilter :+ largerThanFilter :+ lessThanFilter
-    }
-    val (requiredAvroSchema, requiredStructSchema) =
-      MergeOnReadSnapshotRelation.getRequiredSchema(tableAvroSchema, requiredColumns)
-    val hoodieTableState = HoodieMergeOnReadTableState(
-      tableStructSchema,
-      requiredStructSchema,
-      tableAvroSchema.toString,
-      requiredAvroSchema.toString,
-      fileIndex,
-      preCombineField
-    )
-    val fullSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(
-      sparkSession = sqlContext.sparkSession,
-      dataSchema = tableStructSchema,
-      partitionSchema = StructType(Nil),
-      requiredSchema = tableStructSchema,
-      filters = pushDownFilter,
-      options = optParams,
-      hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
-    )
-    val requiredSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(
-      sparkSession = sqlContext.sparkSession,
-      dataSchema = tableStructSchema,
-      partitionSchema = StructType(Nil),
-      requiredSchema = requiredStructSchema,
-      filters = pushDownFilter,
-      options = optParams,
-      hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
-    )
-    val rdd = new HoodieMergeOnReadRDD(
-      sqlContext.sparkContext,
-      jobConf,
-      fullSchemaParquetReader,
-      requiredSchemaParquetReader,
-      hoodieTableState
-    )
-    rdd.asInstanceOf[RDD[Row]]
+    if (fileIndex.isEmpty) {
+      sqlContext.sparkContext.emptyRDD[Row]
+    } else {
+      log.debug(s"buildScan requiredColumns = ${requiredColumns.mkString(",")}")
+      log.debug(s"buildScan filters = ${filters.mkString(",")}")
+      // config to ensure the push down filter for parquet will be applied.
+      sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "true")
+      sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled", "true")
+      sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false")
+      val pushDownFilter = {
+        val isNotNullFilter = IsNotNull(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
+        val largerThanFilter = GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.head.getTimestamp)
+        val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.last.getTimestamp)
+        filters :+ isNotNullFilter :+ largerThanFilter :+ lessThanFilter
+      }
+      val (requiredAvroSchema, requiredStructSchema) =
+        MergeOnReadSnapshotRelation.getRequiredSchema(tableAvroSchema, requiredColumns)
+      val hoodieTableState = HoodieMergeOnReadTableState(
+        tableStructSchema,
+        requiredStructSchema,
+        tableAvroSchema.toString,
+        requiredAvroSchema.toString,
+        fileIndex,
+        preCombineField
+      )
+      val fullSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(
+        sparkSession = sqlContext.sparkSession,
+        dataSchema = tableStructSchema,
+        partitionSchema = StructType(Nil),
+        requiredSchema = tableStructSchema,
+        filters = pushDownFilter,
+        options = optParams,
+        hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+      )
+      val requiredSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(
+        sparkSession = sqlContext.sparkSession,
+        dataSchema = tableStructSchema,
+        partitionSchema = StructType(Nil),
+        requiredSchema = requiredStructSchema,
+        filters = pushDownFilter,
+        options = optParams,
+        hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+      )
+      val rdd = new HoodieMergeOnReadRDD(
+        sqlContext.sparkContext,
+        jobConf,
+        fullSchemaParquetReader,
+        requiredSchemaParquetReader,
+        hoodieTableState
+      )
+      rdd.asInstanceOf[RDD[Row]]
+    }
   }
 
   def buildFileIndex(): List[HoodieMergeOnReadFileSplit] = {
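
The guards above are coupled: fileIndex is guaranteed empty whenever commitsToReturn is empty, so checking fileIndex.isEmpty in unhandledFilters and buildScan also protects the calls to commitsToReturn.head.getTimestamp and commitsToReturn.last.getTimestamp, which throw on an empty list. A tiny illustrative sketch of the pattern (names are placeholders, not Hudi API):

// Guard before head/last: on an empty List they throw NoSuchElementException.
val commitsToReturn: List[String] = Nil
val bounds =
  if (commitsToReturn.isEmpty) None
  else Some((commitsToReturn.head, commitsToReturn.last)) // safe: non-empty here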

TestCOWDataSource.scala

@@ -230,6 +230,14 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assertEquals(1, countsPerCommit.length)
     assertEquals(firstCommit, countsPerCommit(0).get(0))
 
+    // Test incremental query has no instant in range
+    val emptyIncDF = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY.key, "000")
+      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY.key, "001")
+      .load(basePath)
+    assertEquals(0, emptyIncDF.count())
+
     // Upsert an empty dataFrame
     val emptyRecords = recordsToStrings(dataGen.generateUpdates("002", 0)).toList
     val emptyDF = spark.read.json(spark.sparkContext.parallelize(emptyRecords, 1))

TestMORDataSource.scala

@@ -201,6 +201,14 @@ class TestMORDataSource extends HoodieClientTestBase {
     assertEquals(1, hudiIncDF3.select("_hoodie_commit_time").distinct().count())
     assertEquals(commit2Time, hudiIncDF3.select("_hoodie_commit_time").head().get(0).toString)
 
+    // Test incremental query has no instant in range
+    val emptyIncDF = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY.key, "000")
+      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY.key, "001")
+      .load(basePath)
+    assertEquals(0, emptyIncDF.count())
+
     // Unmerge
     val hudiSnapshotSkipMergeDF2 = spark.read.format("org.apache.hudi")
       .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)