[HUDI-2243] Support Time Travel Query For Hoodie Table (#3360)
This commit is contained in:
@@ -116,6 +116,13 @@ object DataSourceReadOptions {
|
|||||||
.defaultValue("")
|
.defaultValue("")
|
||||||
.withDocumentation("For the use-cases like users only want to incremental pull from certain partitions "
|
.withDocumentation("For the use-cases like users only want to incremental pull from certain partitions "
|
||||||
+ "instead of the full table. This option allows using glob pattern to directly filter on path.")
|
+ "instead of the full table. This option allows using glob pattern to directly filter on path.")
|
||||||
|
|
||||||
|
/**
 * Read option naming the instant a time travel query should be evaluated at.
 *
 * The option key is `as.of.instant` and it has no default value; readers look the
 * value up by this key and fall back to the latest completed instant when it is absent.
 */
val TIME_TRAVEL_AS_OF_INSTANT: ConfigProperty[String] =
  ConfigProperty
    .key("as.of.instant")
    .noDefaultValue()
    .withDocumentation(
      "The query instant for time travel. Without specified this option," +
        " we query the latest snapshot.")
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -18,14 +18,13 @@
|
|||||||
package org.apache.hudi
|
package org.apache.hudi
|
||||||
|
|
||||||
import java.util.Properties
|
import java.util.Properties
|
||||||
|
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
import org.apache.hadoop.fs.{FileStatus, Path}
|
import org.apache.hadoop.fs.{FileStatus, Path}
|
||||||
import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
|
import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
|
||||||
import org.apache.hudi.client.common.HoodieSparkEngineContext
|
import org.apache.hudi.client.common.HoodieSparkEngineContext
|
||||||
import org.apache.hudi.common.config.HoodieMetadataConfig
|
import org.apache.hudi.common.config.HoodieMetadataConfig
|
||||||
import org.apache.hudi.common.fs.FSUtils
|
import org.apache.hudi.common.fs.FSUtils
|
||||||
import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
|
import org.apache.hudi.common.model.FileSlice
|
||||||
import org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ
|
import org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ
|
||||||
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
|
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
|
||||||
import org.apache.hudi.common.table.view.{FileSystemViewStorageConfig, HoodieTableFileSystemView}
|
import org.apache.hudi.common.table.view.{FileSystemViewStorageConfig, HoodieTableFileSystemView}
|
||||||
@@ -37,6 +36,7 @@ import org.apache.spark.sql.avro.SchemaConverters
|
|||||||
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate}
|
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate}
|
||||||
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
|
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
|
||||||
import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, NoopCache, PartitionDirectory}
|
import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, NoopCache, PartitionDirectory}
|
||||||
|
import org.apache.spark.sql.hudi.HoodieSqlUtils
|
||||||
import org.apache.spark.sql.internal.SQLConf
|
import org.apache.spark.sql.internal.SQLConf
|
||||||
import org.apache.spark.sql.types.StructType
|
import org.apache.spark.sql.types.StructType
|
||||||
import org.apache.spark.unsafe.types.UTF8String
|
import org.apache.spark.unsafe.types.UTF8String
|
||||||
@@ -81,6 +81,9 @@ case class HoodieFileIndex(
|
|||||||
|
|
||||||
private val tableType = metaClient.getTableType
|
private val tableType = metaClient.getTableType
|
||||||
|
|
||||||
|
private val specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key)
|
||||||
|
.map(HoodieSqlUtils.formatQueryInstant)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the schema of the table.
|
* Get the schema of the table.
|
||||||
*/
|
*/
|
||||||
@@ -214,15 +217,23 @@ case class HoodieFileIndex(
|
|||||||
|
|
||||||
metaClient.reloadActiveTimeline()
|
metaClient.reloadActiveTimeline()
|
||||||
val activeInstants = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
|
val activeInstants = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
|
||||||
|
val latestInstant = activeInstants.lastInstant()
|
||||||
fileSystemView = new HoodieTableFileSystemView(metaClient, activeInstants, allFiles)
|
fileSystemView = new HoodieTableFileSystemView(metaClient, activeInstants, allFiles)
|
||||||
|
val queryInstant = if (specifiedQueryInstant.isDefined) {
|
||||||
|
specifiedQueryInstant
|
||||||
|
} else if (latestInstant.isPresent) {
|
||||||
|
Some(latestInstant.get.getTimestamp)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
(tableType, queryType) match {
|
(tableType, queryType) match {
|
||||||
case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL) =>
|
case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL) =>
|
||||||
// Fetch and store latest base and log files, and their sizes
|
// Fetch and store latest base and log files, and their sizes
|
||||||
cachedAllInputFileSlices = partitionFiles.map(p => {
|
cachedAllInputFileSlices = partitionFiles.map(p => {
|
||||||
val latestSlices = if (activeInstants.lastInstant().isPresent) {
|
val latestSlices = if (latestInstant.isPresent) {
|
||||||
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p._1.partitionPath,
|
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p._1.partitionPath, queryInstant.get)
|
||||||
activeInstants.lastInstant().get().getTimestamp).iterator().asScala.toSeq
|
.iterator().asScala.toSeq
|
||||||
} else {
|
} else {
|
||||||
Seq()
|
Seq()
|
||||||
}
|
}
|
||||||
@@ -238,7 +249,12 @@ case class HoodieFileIndex(
|
|||||||
case (_, _) =>
|
case (_, _) =>
|
||||||
// Fetch and store latest base files and its sizes
|
// Fetch and store latest base files and its sizes
|
||||||
cachedAllInputFileSlices = partitionFiles.map(p => {
|
cachedAllInputFileSlices = partitionFiles.map(p => {
|
||||||
(p._1, fileSystemView.getLatestFileSlices(p._1.partitionPath).iterator().asScala.toSeq)
|
val fileSlices = specifiedQueryInstant
|
||||||
|
.map(instant =>
|
||||||
|
fileSystemView.getLatestFileSlicesBeforeOrOn(p._1.partitionPath, instant, true))
|
||||||
|
.getOrElse(fileSystemView.getLatestFileSlices(p._1.partitionPath))
|
||||||
|
.iterator().asScala.toSeq
|
||||||
|
(p._1, fileSlices)
|
||||||
})
|
})
|
||||||
cachedFileSize = cachedAllInputFileSlices.values.flatten.map(_.getBaseFile.get().getFileLen).sum
|
cachedFileSize = cachedAllInputFileSlices.values.flatten.map(_.getBaseFile.get().getFileLen).sum
|
||||||
}
|
}
|
||||||
@@ -246,7 +262,7 @@ case class HoodieFileIndex(
|
|||||||
// If the partition value contains InternalRow.empty, we query it as a non-partitioned table.
|
// If the partition value contains InternalRow.empty, we query it as a non-partitioned table.
|
||||||
queryAsNonePartitionedTable = partitionFiles.keys.exists(p => p.values == InternalRow.empty)
|
queryAsNonePartitionedTable = partitionFiles.keys.exists(p => p.values == InternalRow.empty)
|
||||||
val flushSpend = System.currentTimeMillis() - startTime
|
val flushSpend = System.currentTimeMillis() - startTime
|
||||||
logInfo(s"Refresh for table ${metaClient.getTableConfig.getTableName}," +
|
logInfo(s"Refresh table ${metaClient.getTableConfig.getTableName}," +
|
||||||
s" spend: $flushSpend ms")
|
s" spend: $flushSpend ms")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -32,6 +32,7 @@ import org.apache.spark.sql.avro.SchemaConverters
|
|||||||
import org.apache.spark.sql.catalyst.InternalRow
|
import org.apache.spark.sql.catalyst.InternalRow
|
||||||
import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile}
|
import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile}
|
||||||
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
|
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
|
||||||
|
import org.apache.spark.sql.hudi.HoodieSqlUtils
|
||||||
import org.apache.spark.sql.{Row, SQLContext}
|
import org.apache.spark.sql.{Row, SQLContext}
|
||||||
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
|
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
|
||||||
import org.apache.spark.sql.types.StructType
|
import org.apache.spark.sql.types.StructType
|
||||||
@@ -97,6 +98,9 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
|
|||||||
|
|
||||||
override def needConversion: Boolean = false
|
override def needConversion: Boolean = false
|
||||||
|
|
||||||
|
private val specifiedQueryInstant = optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key)
|
||||||
|
.map(HoodieSqlUtils.formatQueryInstant)
|
||||||
|
|
||||||
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
|
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
|
||||||
log.debug(s" buildScan requiredColumns = ${requiredColumns.mkString(",")}")
|
log.debug(s" buildScan requiredColumns = ${requiredColumns.mkString(",")}")
|
||||||
log.debug(s" buildScan filters = ${filters.mkString(",")}")
|
log.debug(s" buildScan filters = ${filters.mkString(",")}")
|
||||||
@@ -159,7 +163,7 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
|
|||||||
if (!lastInstant.isPresent) { // Return empty list if the table has no commit
|
if (!lastInstant.isPresent) { // Return empty list if the table has no commit
|
||||||
List.empty
|
List.empty
|
||||||
} else {
|
} else {
|
||||||
val latestCommit = lastInstant.get().getTimestamp
|
val queryInstant = specifiedQueryInstant.getOrElse(lastInstant.get().getTimestamp)
|
||||||
val baseAndLogsList = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, partitionPaths.asJava).asScala
|
val baseAndLogsList = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, partitionPaths.asJava).asScala
|
||||||
val fileSplits = baseAndLogsList.map(kv => {
|
val fileSplits = baseAndLogsList.map(kv => {
|
||||||
val baseFile = kv.getLeft
|
val baseFile = kv.getLeft
|
||||||
@@ -174,7 +178,7 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
|
|||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
HoodieMergeOnReadFileSplit(baseDataPath, logPaths, latestCommit,
|
HoodieMergeOnReadFileSplit(baseDataPath, logPaths, queryInstant,
|
||||||
metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType)
|
metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType)
|
||||||
}).toList
|
}).toList
|
||||||
fileSplits
|
fileSplits
|
||||||
@@ -203,8 +207,9 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
|
|||||||
List.empty[HoodieMergeOnReadFileSplit]
|
List.empty[HoodieMergeOnReadFileSplit]
|
||||||
} else {
|
} else {
|
||||||
val fileSplits = fileSlices.values.flatten.map(fileSlice => {
|
val fileSplits = fileSlices.values.flatten.map(fileSlice => {
|
||||||
val latestCommit = metaClient.getActiveTimeline.getCommitsTimeline
|
val latestInstant = metaClient.getActiveTimeline.getCommitsTimeline
|
||||||
.filterCompletedInstants.lastInstant().get().getTimestamp
|
.filterCompletedInstants.lastInstant().get().getTimestamp
|
||||||
|
val queryInstant = specifiedQueryInstant.getOrElse(latestInstant)
|
||||||
|
|
||||||
val partitionedFile = if (fileSlice.getBaseFile.isPresent) {
|
val partitionedFile = if (fileSlice.getBaseFile.isPresent) {
|
||||||
val baseFile = fileSlice.getBaseFile.get()
|
val baseFile = fileSlice.getBaseFile.get()
|
||||||
@@ -217,7 +222,7 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
|
|||||||
val logPaths = fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala
|
val logPaths = fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala
|
||||||
.map(logFile => MergeOnReadSnapshotRelation.getFilePath(logFile.getPath)).toList
|
.map(logFile => MergeOnReadSnapshotRelation.getFilePath(logFile.getPath)).toList
|
||||||
val logPathsOptional = if (logPaths.isEmpty) Option.empty else Option(logPaths)
|
val logPathsOptional = if (logPaths.isEmpty) Option.empty else Option(logPaths)
|
||||||
HoodieMergeOnReadFileSplit(partitionedFile, logPathsOptional, latestCommit, metaClient.getBasePath,
|
HoodieMergeOnReadFileSplit(partitionedFile, logPathsOptional, queryInstant, metaClient.getBasePath,
|
||||||
maxCompactionMemoryInBytes, mergeType)
|
maxCompactionMemoryInBytes, mergeType)
|
||||||
}).toList
|
}).toList
|
||||||
fileSplits
|
fileSplits
|
||||||
|
|||||||
@@ -19,12 +19,13 @@ package org.apache.spark.sql.hudi
|
|||||||
|
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
import java.net.URI
|
import java.net.URI
|
||||||
import java.util.Locale
|
import java.util.{Date, Locale}
|
||||||
import org.apache.hadoop.conf.Configuration
|
import org.apache.hadoop.conf.Configuration
|
||||||
import org.apache.hadoop.fs.Path
|
import org.apache.hadoop.fs.Path
|
||||||
import org.apache.hudi.SparkAdapterSupport
|
import org.apache.hudi.SparkAdapterSupport
|
||||||
import org.apache.hudi.common.model.HoodieRecord
|
import org.apache.hudi.common.model.HoodieRecord
|
||||||
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
|
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
|
||||||
|
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
|
||||||
import org.apache.spark.SPARK_VERSION
|
import org.apache.spark.SPARK_VERSION
|
||||||
import org.apache.spark.sql.avro.SchemaConverters
|
import org.apache.spark.sql.avro.SchemaConverters
|
||||||
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
|
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
|
||||||
@@ -37,9 +38,12 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation
|
|||||||
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
|
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
|
||||||
import org.apache.spark.sql.types.{DataType, NullType, StringType, StructField, StructType}
|
import org.apache.spark.sql.types.{DataType, NullType, StringType, StructField, StructType}
|
||||||
|
|
||||||
|
import java.text.SimpleDateFormat
|
||||||
import scala.collection.immutable.Map
|
import scala.collection.immutable.Map
|
||||||
|
|
||||||
object HoodieSqlUtils extends SparkAdapterSupport {
|
object HoodieSqlUtils extends SparkAdapterSupport {
|
||||||
|
private val defaultDateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
|
||||||
|
private val defaultDateFormat = new SimpleDateFormat("yyyy-MM-dd")
|
||||||
|
|
||||||
def isHoodieTable(table: CatalogTable): Boolean = {
|
def isHoodieTable(table: CatalogTable): Boolean = {
|
||||||
table.provider.map(_.toLowerCase(Locale.ROOT)).orNull == "hudi"
|
table.provider.map(_.toLowerCase(Locale.ROOT)).orNull == "hudi"
|
||||||
@@ -224,4 +228,26 @@ object HoodieSqlUtils extends SparkAdapterSupport {
|
|||||||
|
|
||||||
def isEnableHive(sparkSession: SparkSession): Boolean =
|
def isEnableHive(sparkSession: SparkSession): Boolean =
|
||||||
"hive" == sparkSession.sessionState.conf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION)
|
"hive" == sparkSession.sessionState.conf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION)
|
||||||
|
|
||||||
|
/**
 * Convert a user supplied time travel instant to the commit time format.
 *
 * The input format is chosen purely by the length of the string. Three formats
 * are supported:
 *  1. `yyyy-MM-dd HH:mm:ss` (19 characters), re-formatted to the commit time format.
 *  2. `yyyy-MM-dd` (10 characters), converted to `yyyyMMdd000000`.
 *  3. `yyyyMMddHHmmss` (14 characters), validated by parsing and returned unchanged.
 *
 * @param queryInstant the instant text taken from the query options
 * @return the instant expressed in the commit time format
 * @throws IllegalArgumentException if the length matches none of the supported formats
 * @throws java.text.ParseException if the length matches a supported format but the
 *                                  text cannot be parsed with it
 */
def formatQueryInstant(queryInstant: String): String = {
  // SimpleDateFormat keeps mutable parsing state and is not thread-safe, so the
  // input formatters are created per call instead of being shared across queries.
  // NOTE(review): HoodieActiveTimeline.COMMIT_FORMATTER is still a shared instance;
  // whether it is safe for concurrent use cannot be told from here - confirm.
  queryInstant.length match {
    case 19 => // yyyy-MM-dd HH:mm:ss
      val dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      HoodieActiveTimeline.COMMIT_FORMATTER.format(dateTimeFormat.parse(queryInstant))
    case 14 => // yyyyMMddHHmmss
      // Parse only to validate; the text is already in the commit time format.
      HoodieActiveTimeline.COMMIT_FORMATTER.parse(queryInstant)
      queryInstant
    case 10 => // yyyy-MM-dd
      val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
      HoodieActiveTimeline.COMMIT_FORMATTER.format(dateFormat.parse(queryInstant))
    case _ =>
      throw new IllegalArgumentException(
        s"Unsupported query instant time format: $queryInstant, " +
          "Supported time formats are: 'yyyy-MM-dd HH:mm:ss' or 'yyyy-MM-dd' or 'yyyyMMddHHmmss'")
  }
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,230 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.functional
|
||||||
|
|
||||||
|
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
|
||||||
|
import org.apache.hudi.DataSourceWriteOptions._
|
||||||
|
import org.apache.hudi.common.model.HoodieTableType
|
||||||
|
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
|
||||||
|
import org.apache.hudi.config.HoodieWriteConfig
|
||||||
|
import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator}
|
||||||
|
import org.apache.hudi.testutils.HoodieClientTestBase
|
||||||
|
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
|
||||||
|
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
|
||||||
|
import org.junit.jupiter.api.{AfterEach, BeforeEach}
|
||||||
|
import org.junit.jupiter.params.ParameterizedTest
|
||||||
|
import org.junit.jupiter.params.provider.EnumSource
|
||||||
|
|
||||||
|
import java.text.SimpleDateFormat
|
||||||
|
|
||||||
|
/**
 * Functional tests for time travel reads driven by the
 * `DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT` option.
 *
 * Each test writes three versions of the same record, remembers the commit time
 * produced by every write, and then reads the table back "as of" those commit times.
 * The tests are parameterized over every `HoodieTableType`.
 */
class TestTimeTravelQuery extends HoodieClientTestBase {
  var spark: SparkSession = _

  val commonOpts = Map(
    "hoodie.insert.shuffle.parallelism" -> "4",
    "hoodie.upsert.shuffle.parallelism" -> "4",
    "hoodie.bulkinsert.shuffle.parallelism" -> "2",
    "hoodie.delete.shuffle.parallelism" -> "1",
    DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
    DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
    DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
    HoodieWriteConfig.TABLE_NAME.key -> "hoodie_test"
  )

  @BeforeEach override def setUp(): Unit = {
    initPath()
    initSparkContexts()
    spark = sqlContext.sparkSession
    initTestDataGenerator()
    initFileSystem()
  }

  @AfterEach override def tearDown(): Unit = {
    cleanupSparkContexts()
    cleanupTestDataGenerator()
    cleanupFileSystem()
  }

  @ParameterizedTest
  @EnumSource(value = classOf[HoodieTableType])
  def testTimeTravelQuery(tableType: HoodieTableType): Unit = {
    initMetaClient(tableType)
    val _spark = spark
    import _spark.implicits._

    val keyGenerator = classOf[NonpartitionedKeyGenerator]

    // First write
    val df1 = Seq((1, "a1", 10, 1000)).toDF("id", "name", "value", "version")
    val firstCommit = writeAndGetCommitTime(df1, tableType, "", keyGenerator, SaveMode.Overwrite)

    // Second write
    val df2 = Seq((1, "a1", 12, 1001)).toDF("id", "name", "value", "version")
    val secondCommit = writeAndGetCommitTime(df2, tableType, "", keyGenerator, SaveMode.Append)

    // Third write
    val df3 = Seq((1, "a1", 13, 1002)).toDF("id", "name", "value", "version")
    val thirdCommit = writeAndGetCommitTime(df3, tableType, "", keyGenerator, SaveMode.Append)

    // Query as of firstCommitTime
    val result1 = readAsOf(firstCommit, "id", "name", "value", "version").take(1)(0)
    assertEquals(Row(1, "a1", 10, 1000), result1)

    // Query as of secondCommitTime
    val result2 = readAsOf(secondCommit, "id", "name", "value", "version").take(1)(0)
    assertEquals(Row(1, "a1", 12, 1001), result2)

    // Query as of thirdCommitTime
    val result3 = readAsOf(thirdCommit, "id", "name", "value", "version").take(1)(0)
    assertEquals(Row(1, "a1", 13, 1002), result3)
  }

  @ParameterizedTest
  @EnumSource(value = classOf[HoodieTableType])
  def testTimeTravelQueryForPartitionedTable(tableType: HoodieTableType): Unit = {
    initMetaClient(tableType)
    val _spark = spark
    import _spark.implicits._

    val keyGenerator = classOf[ComplexKeyGenerator]

    // First write
    val df1 = Seq((1, "a1", 10, 1000, "2021-07-26")).toDF("id", "name", "value", "version", "dt")
    val firstCommit = writeAndGetCommitTime(df1, tableType, "dt", keyGenerator, SaveMode.Overwrite)

    // Second write
    val df2 = Seq((1, "a1", 12, 1001, "2021-07-26")).toDF("id", "name", "value", "version", "dt")
    val secondCommit = writeAndGetCommitTime(df2, tableType, "dt", keyGenerator, SaveMode.Append)

    // Third write
    val df3 = Seq((1, "a1", 13, 1002, "2021-07-26")).toDF("id", "name", "value", "version", "dt")
    val thirdCommit = writeAndGetCommitTime(df3, tableType, "dt", keyGenerator, SaveMode.Append)

    // query as of firstCommitTime (using 'yyyy-MM-dd HH:mm:ss' format)
    val result1 = readAsOf(reformatCommitTime(firstCommit, "yyyy-MM-dd HH:mm:ss"),
      "id", "name", "value", "version", "dt").take(1)(0)
    assertEquals(Row(1, "a1", 10, 1000, "2021-07-26"), result1)

    // query as of secondCommitTime (using 'yyyyMMddHHmmss' format)
    val result2 = readAsOf(secondCommit, "id", "name", "value", "version", "dt").take(1)(0)
    assertEquals(Row(1, "a1", 12, 1001, "2021-07-26"), result2)

    // query as of thirdCommitTime
    val result3 = readAsOf(thirdCommit, "id", "name", "value", "version", "dt").take(1)(0)
    assertEquals(Row(1, "a1", 13, 1002, "2021-07-26"), result3)

    // query by 'yyyy-MM-dd' format
    val result4 = readAsOf(reformatCommitTime(thirdCommit, "yyyy-MM-dd"),
      "id", "name", "value", "version", "dt").collect()
    // since there is no commit before the commit date, the query result should be empty.
    assertTrue(result4.isEmpty)
  }

  /**
   * Write `df` to the table at `basePath` and return the timestamp of the last
   * completed instant on the reloaded timeline.
   *
   * @param df             the rows to write; keyed by `id`, pre-combined on `version`
   * @param tableType      the table type passed as the write option
   * @param partitionField the partition path field ("" in the non-partitioned test)
   * @param keyGenerator   the key generator class whose name is passed as a write option
   * @param mode           the Spark save mode used for this write
   */
  private def writeAndGetCommitTime(df: org.apache.spark.sql.DataFrame,
                                    tableType: HoodieTableType,
                                    partitionField: String,
                                    keyGenerator: Class[_],
                                    mode: SaveMode): String = {
    df.write.format("hudi")
      .options(commonOpts)
      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
      .option(RECORDKEY_FIELD.key, "id")
      .option(PRECOMBINE_FIELD.key, "version")
      .option(PARTITIONPATH_FIELD.key, partitionField)
      .option(KEYGENERATOR_CLASS.key, keyGenerator.getName)
      .mode(mode)
      .save(basePath)
    // Always reload so the timeline reflects the write that just finished.
    metaClient.reloadActiveTimeline()
    metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
  }

  /**
   * Load the table at `basePath` as of `instant` and project the given columns.
   */
  private def readAsOf(instant: String,
                       firstColumn: String,
                       otherColumns: String*): org.apache.spark.sql.DataFrame = {
    spark.read.format("hudi")
      .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, instant)
      .load(basePath)
      .select(firstColumn, otherColumns: _*)
  }

  /**
   * Parse `commitTime` with the commit formatter and render it using `pattern`.
   */
  private def reformatCommitTime(commitTime: String, pattern: String): String = {
    val date = HoodieActiveTimeline.COMMIT_FORMATTER.parse(commitTime)
    new SimpleDateFormat(pattern).format(date)
  }
}
|
||||||
Reference in New Issue
Block a user