From 32a50d8ddbbf706bd350e9cf0762f51ab66afcc9 Mon Sep 17 00:00:00 2001
From: pengzhiwei
Date: Sun, 8 Aug 2021 07:07:22 +0800
Subject: [PATCH] [HUDI-2243] Support Time Travel Query For Hoodie Table
 (#3360)

---
 .../org/apache/hudi/DataSourceOptions.scala   |   7 +
 .../org/apache/hudi/HoodieFileIndex.scala     |  30 ++-
 .../hudi/MergeOnReadSnapshotRelation.scala    |  13 +-
 .../spark/sql/hudi/HoodieSqlUtils.scala       |  28 ++-
 .../hudi/functional/TestTimeTravelQuery.scala | 230 ++++++++++++++++++
 5 files changed, 296 insertions(+), 12 deletions(-)
 create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index b12361b61..5c67a68ad 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -116,6 +116,13 @@ object DataSourceReadOptions {
     .defaultValue("")
     .withDocumentation("For the use-cases like users only want to incremental pull from certain partitions " +
       "instead of the full table. This option allows using glob pattern to directly filter on path.")
+
+  val TIME_TRAVEL_AS_OF_INSTANT: ConfigProperty[String] = ConfigProperty
+    .key("as.of.instant")
+    .noDefaultValue()
+    .withDocumentation("The query instant for time travel. If this option is not specified," +
+      " we query the latest snapshot.")
+
 }

 /**
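A minimal usage sketch of the new option (illustrative, not part of the patch): `spark` is an active SparkSession and `basePath` is assumed to point at an existing Hudi table; the instant value here is made up.

    import org.apache.hudi.DataSourceReadOptions

    // Read the table as of a point in time instead of the latest snapshot.
    val asOfDf = spark.read.format("hudi")
      .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, "2021-08-08 00:00:00")
      .load(basePath)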
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 5543ea161..f94d228c3 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -18,14 +18,13 @@
 package org.apache.hudi

 import java.util.Properties
-
 import scala.collection.JavaConverters._
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
+import org.apache.hudi.common.model.FileSlice
 import org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.table.view.{FileSystemViewStorageConfig, HoodieTableFileSystemView}
@@ -37,6 +36,7 @@ import org.apache.spark.sql.avro.SchemaConverters
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate}
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, NoopCache, PartitionDirectory}
+import org.apache.spark.sql.hudi.HoodieSqlUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.types.UTF8String
@@ -81,6 +81,9 @@ case class HoodieFileIndex(

   private val tableType = metaClient.getTableType

+  private val specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key)
+    .map(HoodieSqlUtils.formatQueryInstant)
+
   /**
    * Get the schema of the table.
    */
@@ -214,15 +217,23 @@ case class HoodieFileIndex(
     metaClient.reloadActiveTimeline()
     val activeInstants = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
+    val latestInstant = activeInstants.lastInstant()
     fileSystemView = new HoodieTableFileSystemView(metaClient, activeInstants, allFiles)
+    val queryInstant = if (specifiedQueryInstant.isDefined) {
+      specifiedQueryInstant
+    } else if (latestInstant.isPresent) {
+      Some(latestInstant.get.getTimestamp)
+    } else {
+      None
+    }
     (tableType, queryType) match {
       case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL) =>
         // Fetch and store latest base and log files, and their sizes
         cachedAllInputFileSlices = partitionFiles.map(p => {
-          val latestSlices = if (activeInstants.lastInstant().isPresent) {
-            fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p._1.partitionPath,
-              activeInstants.lastInstant().get().getTimestamp).iterator().asScala.toSeq
+          val latestSlices = if (latestInstant.isPresent) {
+            fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p._1.partitionPath, queryInstant.get)
+              .iterator().asScala.toSeq
           } else {
             Seq()
           }
@@ -238,7 +249,12 @@ case class HoodieFileIndex(
       case (_, _) =>
         // Fetch and store latest base files and its sizes
         cachedAllInputFileSlices = partitionFiles.map(p => {
-          (p._1, fileSystemView.getLatestFileSlices(p._1.partitionPath).iterator().asScala.toSeq)
+          val fileSlices = specifiedQueryInstant
+            .map(instant =>
+              fileSystemView.getLatestFileSlicesBeforeOrOn(p._1.partitionPath, instant, true))
+            .getOrElse(fileSystemView.getLatestFileSlices(p._1.partitionPath))
+            .iterator().asScala.toSeq
+          (p._1, fileSlices)
         })
         cachedFileSize = cachedAllInputFileSlices.values.flatten.map(_.getBaseFile.get().getFileLen).sum
     }

     // If the partition value contains InternalRow.empty, we query it as a non-partitioned table.
     queryAsNonePartitionedTable = partitionFiles.keys.exists(p => p.values == InternalRow.empty)
     val flushSpend = System.currentTimeMillis() - startTime
-    logInfo(s"Refresh for table ${metaClient.getTableConfig.getTableName}," +
+    logInfo(s"Refresh table ${metaClient.getTableConfig.getTableName}," +
       s" spend: $flushSpend ms")
   }
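The file index falls back to the latest completed commit when no instant is specified, and otherwise lists only file slices committed on or before the requested instant (via HoodieTableFileSystemView.getLatestFileSlicesBeforeOrOn / getLatestMergedFileSlicesBeforeOrOn). A minimal sketch of that "before or on" selection over plain commit timestamps; the timestamps below are made up:

    val completedCommits = Seq("20210806101530", "20210807221005", "20210808070722")

    // Commit times in 'yyyyMMddHHmmss' form sort lexicographically, so string
    // comparison is enough to pick the newest commit at or before the instant.
    def latestCommitAsOf(queryInstant: String): Option[String] =
      completedCommits.filter(_ <= queryInstant).lastOption

    assert(latestCommitAsOf("20210807235959").contains("20210807221005"))
    assert(latestCommitAsOf("20210806000000").isEmpty) // before the first commit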
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index dd18c5a32..cf8296c9f 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.avro.SchemaConverters
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.hudi.HoodieSqlUtils
 import org.apache.spark.sql.{Row, SQLContext}
 import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
 import org.apache.spark.sql.types.StructType
@@ -97,6 +98,9 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,

   override def needConversion: Boolean = false

+  private val specifiedQueryInstant = optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key)
+    .map(HoodieSqlUtils.formatQueryInstant)
+
   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
     log.debug(s" buildScan requiredColumns = ${requiredColumns.mkString(",")}")
     log.debug(s" buildScan filters = ${filters.mkString(",")}")
@@ -159,7 +163,7 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
     if (!lastInstant.isPresent) { // Return empty list if the table has no commit
       List.empty
     } else {
-      val latestCommit = lastInstant.get().getTimestamp
+      val queryInstant = specifiedQueryInstant.getOrElse(lastInstant.get().getTimestamp)
       val baseAndLogsList = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, partitionPaths.asJava).asScala
       val fileSplits = baseAndLogsList.map(kv => {
         val baseFile = kv.getLeft
@@ -174,7 +178,7 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
         } else {
           None
         }
-        HoodieMergeOnReadFileSplit(baseDataPath, logPaths, latestCommit,
+        HoodieMergeOnReadFileSplit(baseDataPath, logPaths, queryInstant,
           metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType)
       }).toList
       fileSplits
@@ -203,8 +207,9 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
       List.empty[HoodieMergeOnReadFileSplit]
     } else {
       val fileSplits = fileSlices.values.flatten.map(fileSlice => {
-        val latestCommit = metaClient.getActiveTimeline.getCommitsTimeline
+        val latestInstant = metaClient.getActiveTimeline.getCommitsTimeline
           .filterCompletedInstants.lastInstant().get().getTimestamp
+        val queryInstant = specifiedQueryInstant.getOrElse(latestInstant)

         val partitionedFile = if (fileSlice.getBaseFile.isPresent) {
           val baseFile = fileSlice.getBaseFile.get()
@@ -217,7 +222,7 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
         val logPaths = fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala
           .map(logFile => MergeOnReadSnapshotRelation.getFilePath(logFile.getPath)).toList
         val logPathsOptional = if (logPaths.isEmpty) Option.empty else Option(logPaths)
-        HoodieMergeOnReadFileSplit(partitionedFile, logPathsOptional, latestCommit, metaClient.getBasePath,
+        HoodieMergeOnReadFileSplit(partitionedFile, logPathsOptional, queryInstant, metaClient.getBasePath,
           maxCompactionMemoryInBytes, mergeType)
       }).toList
       fileSplits
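For merge-on-read, the resolved instant replaces the latest commit time in each HoodieMergeOnReadFileSplit, so the log-record merge is capped at that instant rather than at the head of the timeline. A hedged sketch of a MOR snapshot read pinned to an instant (`spark`, `basePath`, and the instant are assumptions; snapshot is already the default query type, it is spelled out here only for clarity):

    import org.apache.hudi.DataSourceReadOptions

    val morAsOf = spark.read.format("hudi")
      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
      .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, "20210807221005")
      .load(basePath)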
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
index e2c622e51..da6c4824c 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
@@ -19,12 +19,13 @@ package org.apache.spark.sql.hudi

 import scala.collection.JavaConverters._
 import java.net.URI
-import java.util.Locale
+import java.util.{Date, Locale}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.SparkAdapterSupport
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.sql.avro.SchemaConverters
 import org.apache.spark.sql.{Column, DataFrame, SparkSession}
@@ -37,9 +38,12 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.types.{DataType, NullType, StringType, StructField, StructType}

+import java.text.SimpleDateFormat
 import scala.collection.immutable.Map

 object HoodieSqlUtils extends SparkAdapterSupport {
+  private val defaultDateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+  private val defaultDateFormat = new SimpleDateFormat("yyyy-MM-dd")

   def isHoodieTable(table: CatalogTable): Boolean = {
     table.provider.map(_.toLowerCase(Locale.ROOT)).orNull == "hudi"
@@ -224,4 +228,26 @@ object HoodieSqlUtils extends SparkAdapterSupport {

   def isEnableHive(sparkSession: SparkSession): Boolean =
     "hive" == sparkSession.sessionState.conf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION)
+
+  /**
+   * Convert different query instant time formats to the commit time format.
+   * Currently we support three instant time formats for the time travel query:
+   * 1. yyyy-MM-dd HH:mm:ss
+   * 2. yyyy-MM-dd
+   *    This will be converted to 'yyyyMMdd000000'.
+   * 3. yyyyMMddHHmmss
+   */
+  def formatQueryInstant(queryInstant: String): String = {
+    if (queryInstant.length == 19) { // for yyyy-MM-dd HH:mm:ss
+      HoodieActiveTimeline.COMMIT_FORMATTER.format(defaultDateTimeFormat.parse(queryInstant))
+    } else if (queryInstant.length == 14) { // for yyyyMMddHHmmss
+      HoodieActiveTimeline.COMMIT_FORMATTER.parse(queryInstant) // validate the format
+      queryInstant
+    } else if (queryInstant.length == 10) { // for yyyy-MM-dd
+      HoodieActiveTimeline.COMMIT_FORMATTER.format(defaultDateFormat.parse(queryInstant))
+    } else {
+      throw new IllegalArgumentException(s"Unsupported query instant time format: $queryInstant." +
+        s" Supported time formats are: 'yyyy-MM-dd HH:mm:ss', 'yyyy-MM-dd' and 'yyyyMMddHHmmss'")
+    }
+  }
 }
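To make the three accepted formats concrete, here is how formatQueryInstant normalizes each of them, assuming COMMIT_FORMATTER's 'yyyyMMddHHmmss' pattern (expected values traced from the code above, not output from a run):

    HoodieSqlUtils.formatQueryInstant("2021-08-08 07:07:22") // -> "20210808070722"
    HoodieSqlUtils.formatQueryInstant("2021-08-08")          // -> "20210808000000" (midnight of that day)
    HoodieSqlUtils.formatQueryInstant("20210808070722")      // -> "20210808070722" (validated, returned as-is)
    // Any other shape, e.g. "2021/08/08", throws IllegalArgumentException.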
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
new file mode 100644
index 000000000..96e9a17ac
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hudi.functional + +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions} +import org.apache.hudi.DataSourceWriteOptions._ +import org.apache.hudi.common.model.HoodieTableType +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator} +import org.apache.hudi.testutils.HoodieClientTestBase +import org.apache.spark.sql.{Row, SaveMode, SparkSession} +import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} +import org.junit.jupiter.api.{AfterEach, BeforeEach} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.EnumSource + +import java.text.SimpleDateFormat + +class TestTimeTravelQuery extends HoodieClientTestBase { + var spark: SparkSession =_ + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + "hoodie.bulkinsert.shuffle.parallelism" -> "2", + "hoodie.delete.shuffle.parallelism" -> "1", + DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key", + DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition", + DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp", + HoodieWriteConfig.TABLE_NAME.key -> "hoodie_test" + ) + + @BeforeEach override def setUp() { + initPath() + initSparkContexts() + spark = sqlContext.sparkSession + initTestDataGenerator() + initFileSystem() + } + + @AfterEach override def tearDown() = { + cleanupSparkContexts() + cleanupTestDataGenerator() + cleanupFileSystem() + } + + @ParameterizedTest + @EnumSource(value = classOf[HoodieTableType]) + def testTimeTravelQuery(tableType: HoodieTableType): Unit = { + initMetaClient(tableType) + val _spark = spark + import _spark.implicits._ + + // First write + val df1 = Seq((1, "a1", 10, 1000)).toDF("id", "name", "value", "version") + df1.write.format("hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name()) + .option(RECORDKEY_FIELD.key, "id") + .option(PRECOMBINE_FIELD.key, "version") + .option(PARTITIONPATH_FIELD.key, "") + .option(KEYGENERATOR_CLASS.key, classOf[NonpartitionedKeyGenerator].getName) + .mode(SaveMode.Overwrite) + .save(basePath) + + val firstCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp + + // Second write + val df2 = Seq((1, "a1", 12, 1001)).toDF("id", "name", "value", "version") + df2.write.format("hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name()) + .option(RECORDKEY_FIELD.key, "id") + .option(PRECOMBINE_FIELD.key, "version") + .option(PARTITIONPATH_FIELD.key, "") + .option(KEYGENERATOR_CLASS.key, classOf[NonpartitionedKeyGenerator].getName) + .mode(SaveMode.Append) + .save(basePath) + metaClient.reloadActiveTimeline() + val secondCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp + + // Third write + val df3 = Seq((1, "a1", 13, 1002)).toDF("id", "name", "value", "version") + df3.write.format("hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name()) + .option(RECORDKEY_FIELD.key, "id") + .option(PRECOMBINE_FIELD.key, "version") + .option(PARTITIONPATH_FIELD.key, "") + .option(KEYGENERATOR_CLASS.key, classOf[NonpartitionedKeyGenerator].getName) + .mode(SaveMode.Append) + .save(basePath) + metaClient.reloadActiveTimeline() + val thirdCommit = 
+
+    // Query as of firstCommitTime
+    val result1 = spark.read.format("hudi")
+      .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, firstCommit)
+      .load(basePath)
+      .select("id", "name", "value", "version")
+      .take(1)(0)
+    assertEquals(Row(1, "a1", 10, 1000), result1)
+
+    // Query as of secondCommitTime
+    val result2 = spark.read.format("hudi")
+      .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, secondCommit)
+      .load(basePath)
+      .select("id", "name", "value", "version")
+      .take(1)(0)
+    assertEquals(Row(1, "a1", 12, 1001), result2)
+
+    // Query as of thirdCommitTime
+    val result3 = spark.read.format("hudi")
+      .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, thirdCommit)
+      .load(basePath)
+      .select("id", "name", "value", "version")
+      .take(1)(0)
+    assertEquals(Row(1, "a1", 13, 1002), result3)
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testTimeTravelQueryForPartitionedTable(tableType: HoodieTableType): Unit = {
+    initMetaClient(tableType)
+    val _spark = spark
+    import _spark.implicits._
+
+    // First write
+    val df1 = Seq((1, "a1", 10, 1000, "2021-07-26")).toDF("id", "name", "value", "version", "dt")
+    df1.write.format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
+      .option(RECORDKEY_FIELD.key, "id")
+      .option(PRECOMBINE_FIELD.key, "version")
+      .option(PARTITIONPATH_FIELD.key, "dt")
+      .option(KEYGENERATOR_CLASS.key, classOf[ComplexKeyGenerator].getName)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val firstCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+
+    // Second write
+    val df2 = Seq((1, "a1", 12, 1001, "2021-07-26")).toDF("id", "name", "value", "version", "dt")
+    df2.write.format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
+      .option(RECORDKEY_FIELD.key, "id")
+      .option(PRECOMBINE_FIELD.key, "version")
+      .option(PARTITIONPATH_FIELD.key, "dt")
+      .option(KEYGENERATOR_CLASS.key, classOf[ComplexKeyGenerator].getName)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    metaClient.reloadActiveTimeline()
+    val secondCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+
+    // Third write
+    val df3 = Seq((1, "a1", 13, 1002, "2021-07-26")).toDF("id", "name", "value", "version", "dt")
+    df3.write.format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
+      .option(RECORDKEY_FIELD.key, "id")
+      .option(PRECOMBINE_FIELD.key, "version")
+      .option(PARTITIONPATH_FIELD.key, "dt")
+      .option(KEYGENERATOR_CLASS.key, classOf[ComplexKeyGenerator].getName)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    metaClient.reloadActiveTimeline()
+    val thirdCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+
+    // Query as of firstCommitTime (using the 'yyyy-MM-dd HH:mm:ss' format)
+    val result1 = spark.read.format("hudi")
+      .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, defaultDateTimeFormat(firstCommit))
+      .load(basePath)
+      .select("id", "name", "value", "version", "dt")
+      .take(1)(0)
+    assertEquals(Row(1, "a1", 10, 1000, "2021-07-26"), result1)
+
+    // Query as of secondCommitTime (using the 'yyyyMMddHHmmss' format)
+    val result2 = spark.read.format("hudi")
+      .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, secondCommit)
+      .load(basePath)
"value", "version", "dt") + .take(1)(0) + assertEquals(Row(1, "a1", 12, 1001, "2021-07-26"), result2) + + // query as of thirdCommitTime + val result3 = spark.read.format("hudi") + .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, thirdCommit) + .load(basePath) + .select("id", "name", "value", "version", "dt") + .take(1)(0) + assertEquals(Row(1, "a1", 13, 1002, "2021-07-26"), result3) + + // query by 'yyyy-MM-dd' format + val result4 = spark.read.format("hudi") + .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, defaultDateFormat(thirdCommit)) + .load(basePath) + .select("id", "name", "value", "version", "dt") + .collect() + // since there is no commit before the commit date, the query result should be empty. + assertTrue(result4.isEmpty) + } + + private def defaultDateTimeFormat(queryInstant: String): String = { + val date = HoodieActiveTimeline.COMMIT_FORMATTER.parse(queryInstant) + val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") + format.format(date) + } + + private def defaultDateFormat(queryInstant: String): String = { + val date = HoodieActiveTimeline.COMMIT_FORMATTER.parse(queryInstant) + val format = new SimpleDateFormat("yyyy-MM-dd") + format.format(date) + } +}