From 925873bb3cacbe7d2cd95d7bf0c0e5a40f56da12 Mon Sep 17 00:00:00 2001
From: Gary Li
Date: Tue, 27 Jul 2021 17:30:01 +0800
Subject: [PATCH] [HUDI-2217] Fix no value present in incremental query on MOR (#3340)

---
 .../hudi/MergeOnReadIncrementalRelation.scala | 114 +++++++++---------
 .../hudi/functional/TestCOWDataSource.scala   |   8 ++
 .../hudi/functional/TestMORDataSource.scala   |   8 ++
 3 files changed, 76 insertions(+), 54 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index b1fcf2fc6..68da5f7ae 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -17,7 +17,6 @@
 
 package org.apache.hudi
 
-import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
@@ -52,7 +51,6 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
   private val log = LogManager.getLogger(classOf[MergeOnReadIncrementalRelation])
   private val conf = sqlContext.sparkContext.hadoopConfiguration
   private val jobConf = new JobConf(conf)
-  private val fs = FSUtils.getFs(metaClient.getBasePath, conf)
   private val commitTimeline = metaClient.getCommitsAndCompactionTimeline.filterCompletedInstants()
   if (commitTimeline.empty()) {
     throw new HoodieException("No instants to incrementally pull")
@@ -76,7 +74,7 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
   private val tableAvroSchema = schemaUtil.getTableAvroSchema
   private val tableStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
   private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf)
-  private val fileIndex = buildFileIndex()
+  private val fileIndex = if (commitsToReturn.isEmpty) List() else buildFileIndex()
   private val preCombineField = {
     val preCombineFieldFromTableConfig = metaClient.getTableConfig.getPreCombineField
     if (preCombineFieldFromTableConfig != null) {
@@ -92,63 +90,71 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
   override def needConversion: Boolean = false
 
   override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
-    val isNotNullFilter = IsNotNull(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
-    val largerThanFilter = GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.head.getTimestamp)
-    val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.last.getTimestamp)
-    filters :+isNotNullFilter :+ largerThanFilter :+ lessThanFilter
-  }
-
-  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
-    log.debug(s"buildScan requiredColumns = ${requiredColumns.mkString(",")}")
-    log.debug(s"buildScan filters = ${filters.mkString(",")}")
-    // config to ensure the push down filter for parquet will be applied.
-    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "true")
-    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled", "true")
-    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false")
-    val pushDownFilter = {
+    if (fileIndex.isEmpty) {
+      filters
+    } else {
       val isNotNullFilter = IsNotNull(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
       val largerThanFilter = GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.head.getTimestamp)
       val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.last.getTimestamp)
-      filters :+isNotNullFilter :+ largerThanFilter :+ lessThanFilter
+      filters :+ isNotNullFilter :+ largerThanFilter :+ lessThanFilter
     }
-    val (requiredAvroSchema, requiredStructSchema) =
-      MergeOnReadSnapshotRelation.getRequiredSchema(tableAvroSchema, requiredColumns)
+  }
 
-    val hoodieTableState = HoodieMergeOnReadTableState(
-      tableStructSchema,
-      requiredStructSchema,
-      tableAvroSchema.toString,
-      requiredAvroSchema.toString,
-      fileIndex,
-      preCombineField
-    )
-    val fullSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(
-      sparkSession = sqlContext.sparkSession,
-      dataSchema = tableStructSchema,
-      partitionSchema = StructType(Nil),
-      requiredSchema = tableStructSchema,
-      filters = pushDownFilter,
-      options = optParams,
-      hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
-    )
-    val requiredSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(
-      sparkSession = sqlContext.sparkSession,
-      dataSchema = tableStructSchema,
-      partitionSchema = StructType(Nil),
-      requiredSchema = requiredStructSchema,
-      filters = pushDownFilter,
-      options = optParams,
-      hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
-    )
+  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+    if (fileIndex.isEmpty) {
+      sqlContext.sparkContext.emptyRDD[Row]
+    } else {
+      log.debug(s"buildScan requiredColumns = ${requiredColumns.mkString(",")}")
+      log.debug(s"buildScan filters = ${filters.mkString(",")}")
+      // config to ensure the push down filter for parquet will be applied.
+      sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "true")
+      sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled", "true")
+      sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false")
+      val pushDownFilter = {
+        val isNotNullFilter = IsNotNull(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
+        val largerThanFilter = GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.head.getTimestamp)
+        val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.last.getTimestamp)
+        filters :+ isNotNullFilter :+ largerThanFilter :+ lessThanFilter
+      }
+      val (requiredAvroSchema, requiredStructSchema) =
+        MergeOnReadSnapshotRelation.getRequiredSchema(tableAvroSchema, requiredColumns)
 
-    val rdd = new HoodieMergeOnReadRDD(
-      sqlContext.sparkContext,
-      jobConf,
-      fullSchemaParquetReader,
-      requiredSchemaParquetReader,
-      hoodieTableState
-    )
-    rdd.asInstanceOf[RDD[Row]]
+      val hoodieTableState = HoodieMergeOnReadTableState(
+        tableStructSchema,
+        requiredStructSchema,
+        tableAvroSchema.toString,
+        requiredAvroSchema.toString,
+        fileIndex,
+        preCombineField
+      )
+      val fullSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(
+        sparkSession = sqlContext.sparkSession,
+        dataSchema = tableStructSchema,
+        partitionSchema = StructType(Nil),
+        requiredSchema = tableStructSchema,
+        filters = pushDownFilter,
+        options = optParams,
+        hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+      )
+      val requiredSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(
+        sparkSession = sqlContext.sparkSession,
+        dataSchema = tableStructSchema,
+        partitionSchema = StructType(Nil),
+        requiredSchema = requiredStructSchema,
+        filters = pushDownFilter,
+        options = optParams,
+        hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+      )
+
+      val rdd = new HoodieMergeOnReadRDD(
+        sqlContext.sparkContext,
+        jobConf,
+        fullSchemaParquetReader,
+        requiredSchemaParquetReader,
+        hoodieTableState
+      )
+      rdd.asInstanceOf[RDD[Row]]
+    }
   }
 
   def buildFileIndex(): List[HoodieMergeOnReadFileSplit] = {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index bc81a3e64..fa5479c7b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -230,6 +230,14 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assertEquals(1, countsPerCommit.length)
     assertEquals(firstCommit, countsPerCommit(0).get(0))
 
+    // Test incremental query has no instant in range
+    val emptyIncDF = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY.key, "000")
+      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY.key, "001")
+      .load(basePath)
+    assertEquals(0, emptyIncDF.count())
+
     // Upsert an empty dataFrame
     val emptyRecords = recordsToStrings(dataGen.generateUpdates("002", 0)).toList
     val emptyDF = spark.read.json(spark.sparkContext.parallelize(emptyRecords, 1))
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 1ad35f8fd..aacc46ae8 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -201,6 +201,14 @@ class TestMORDataSource extends HoodieClientTestBase {
     assertEquals(1, hudiIncDF3.select("_hoodie_commit_time").distinct().count())
     assertEquals(commit2Time, hudiIncDF3.select("_hoodie_commit_time").head().get(0).toString)
 
+    // Test incremental query has no instant in range
+    val emptyIncDF = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY.key, "000")
+      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY.key, "001")
+      .load(basePath)
+    assertEquals(0, emptyIncDF.count())
+
     // Unmerge
     val hudiSnapshotSkipMergeDF2 = spark.read.format("org.apache.hudi")
       .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)