
[HUDI-3204] fix problem that spark on TimestampKeyGenerator has no re… (#4714)

Yann Byron
2022-02-15 12:38:38 +08:00
committed by GitHub
parent 27bd7b538e
commit cb6ca7f0d1
16 changed files with 337 additions and 73 deletions

View File

@@ -18,20 +18,30 @@
package org.apache.hudi
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hudi.HoodieFileIndex.getConfigProperties
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{And, Expression}
import org.apache.spark.sql.catalyst.expressions.{And, Expression, Literal}
import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, NoopCache, PartitionDirectory}
import org.apache.spark.sql.hudi.DataSkippingUtils.createColumnStatsIndexFilterExpr
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.{AnalysisException, Column, SparkSession}
import org.apache.spark.unsafe.types.UTF8String
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal
import java.text.SimpleDateFormat
/**
* A file index that supports partition pruning for Hoodie snapshot and read-optimized queries.
@@ -102,6 +112,10 @@ case class HoodieFileIndex(spark: SparkSession,
* @return list of PartitionDirectory, each mapping a partition to its base files
*/
override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
val convertedPartitionFilters =
HoodieFileIndex.convertFilterForTimestampKeyGenerator(metaClient, partitionFilters)
// Look up candidate file names in the col-stats index if all of the following conditions are true:
// - Data-skipping is enabled
// - Col-Stats Index is present
@@ -135,7 +149,7 @@ case class HoodieFileIndex(spark: SparkSession,
Seq(PartitionDirectory(InternalRow.empty, candidateFiles))
} else {
// Prune the partition path by the partition filters
val prunedPartitions = prunePartition(cachedAllInputFileSlices.keySet.asScala.toSeq, partitionFilters)
val prunedPartitions = prunePartition(cachedAllInputFileSlices.keySet.asScala.toSeq, convertedPartitionFilters)
var totalFileSize = 0
var candidateFileSize = 0
@@ -266,7 +280,7 @@ case class HoodieFileIndex(spark: SparkSession,
}
}
object HoodieFileIndex {
object HoodieFileIndex extends Logging {
def getConfigProperties(spark: SparkSession, options: Map[String, String]) = {
val sqlConf: SQLConf = spark.sessionState.conf
@@ -281,6 +295,41 @@ object HoodieFileIndex {
properties
}
def convertFilterForTimestampKeyGenerator(metaClient: HoodieTableMetaClient,
partitionFilters: Seq[Expression]): Seq[Expression] = {
val tableConfig = metaClient.getTableConfig
val keyGenerator = tableConfig.getKeyGeneratorClassName
if (keyGenerator != null && (keyGenerator.equals(classOf[TimestampBasedKeyGenerator].getCanonicalName) ||
keyGenerator.equals(classOf[TimestampBasedAvroKeyGenerator].getCanonicalName))) {
val inputFormat = tableConfig.getString(KeyGeneratorOptions.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP)
val outputFormat = tableConfig.getString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP)
if (StringUtils.isNullOrEmpty(inputFormat) || StringUtils.isNullOrEmpty(outputFormat) ||
inputFormat.equals(outputFormat)) {
partitionFilters
} else {
try {
val inDateFormat = new SimpleDateFormat(inputFormat)
val outDateFormat = new SimpleDateFormat(outputFormat)
partitionFilters.toArray.map {
_.transformDown {
case Literal(value, dataType) if dataType.isInstanceOf[StringType] =>
val converted = outDateFormat.format(inDateFormat.parse(value.toString))
Literal(UTF8String.fromString(converted), StringType)
}
}
} catch {
case NonFatal(e) =>
logWarning("Fail to convert filters for TimestampBaseAvroKeyGenerator.")
partitionFilters
}
}
} else {
partitionFilters
}
}
private def getQueryPath(options: Map[String, String]) = {
new Path(options.getOrElse("path", throw new IllegalArgumentException("'path' option required")))
}
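
A minimal, self-contained sketch (not the committed code) of the literal rewrite that `convertFilterForTimestampKeyGenerator` performs, assuming the input format `yyyy-MM-dd` and output format `yyyy/MM/dd` used by the tests below: string literals in a partition predicate are re-rendered from the key generator's input date format into its output (partition-path) format, so pruning matches the paths actually written to storage.

```scala
import java.text.SimpleDateFormat

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, Literal}
import org.apache.spark.sql.types.StringType
import org.apache.spark.unsafe.types.UTF8String

object FilterConversionSketch extends App {
  val inFmt  = new SimpleDateFormat("yyyy-MM-dd")  // TIMESTAMP_INPUT_DATE_FORMAT_PROP
  val outFmt = new SimpleDateFormat("yyyy/MM/dd")  // TIMESTAMP_OUTPUT_DATE_FORMAT_PROP

  // The predicate as the user wrote it: partition = '2022-01-01'
  val filter: Expression = EqualTo(
    AttributeReference("partition", StringType)(),
    Literal(UTF8String.fromString("2022-01-01"), StringType))

  // Rewrite every string literal into the on-disk partition-path format.
  val converted = filter.transformDown {
    case Literal(v, StringType) =>
      Literal(UTF8String.fromString(outFmt.format(inFmt.parse(v.toString))), StringType)
  }

  println(converted.sql)  // e.g. (`partition` = '2022/01/01')
}
```

Note the fallback in the method above: if either format is missing, the two formats are equal, or parsing fails, the original filters are returned unchanged.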

View File

@@ -41,6 +41,7 @@ import org.apache.hudi.execution.bulkinsert.{BulkInsertInternalPartitionerWithRo
import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
import org.apache.hudi.index.SparkHoodieIndexFactory
import org.apache.hudi.internal.DataSourceInternalWriterHelper
import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.sync.common.AbstractSyncTool
import org.apache.hudi.table.BulkInsertPartitioner
@@ -91,6 +92,9 @@ object HoodieSparkSqlWriter {
validateTableConfig(sqlContext.sparkSession, optParams, tableConfig)
val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig)
val originKeyGeneratorClassName = HoodieWriterUtils.getOriginKeyGenerator(parameters)
val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestampBasedKeyGenerator(
originKeyGeneratorClassName, parameters)
val databaseName = hoodieConfig.getStringOrDefault(HoodieTableConfig.DATABASE_NAME, "")
val tblName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME,
s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.").trim
@@ -153,7 +157,8 @@ object HoodieSparkSqlWriter {
.setPartitionFields(partitionColumns)
.setPopulateMetaFields(populateMetaFields)
.setRecordKeyFields(hoodieConfig.getString(RECORDKEY_FIELD))
.setKeyGeneratorClassProp(HoodieWriterUtils.getOriginKeyGenerator(parameters))
.setKeyGeneratorClassProp(originKeyGeneratorClassName)
.set(timestampKeyGeneratorConfigs)
.setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
.setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
.setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
@@ -757,4 +762,14 @@ object HoodieSparkSqlWriter {
val params = mergedParams.toMap
(params, HoodieWriterUtils.convertMapToHoodieConfig(params))
}
private def extractConfigsRelatedToTimestampBasedKeyGenerator(keyGenerator: String,
params: Map[String, String]): Map[String, String] = {
if (keyGenerator.equals(classOf[TimestampBasedKeyGenerator].getCanonicalName) ||
keyGenerator.equals(classOf[TimestampBasedAvroKeyGenerator].getCanonicalName)) {
params.filterKeys(HoodieTableConfig.PERSISTED_CONFIG_LIST.contains)
} else {
Map.empty
}
}
}
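
A hedged illustration of the filtering above: `persistedConfigList` below is a stand-in for `HoodieTableConfig.PERSISTED_CONFIG_LIST`, and the two date-format keys are the `KeyGeneratorOptions.Config` constants as of this commit. Persisting these options into the table config is what lets `HoodieFileIndex.convertFilterForTimestampKeyGenerator` read the input/output formats back at query time.

```scala
// Stand-in for HoodieTableConfig.PERSISTED_CONFIG_LIST (assumption).
val persistedConfigList = Set(
  "hoodie.deltastreamer.keygen.timebased.input.dateformat",
  "hoodie.deltastreamer.keygen.timebased.output.dateformat")

val writeOptions = Map(
  "hoodie.deltastreamer.keygen.timebased.input.dateformat"  -> "yyyy-MM-dd",
  "hoodie.deltastreamer.keygen.timebased.output.dateformat" -> "yyyy/MM/dd",
  "hoodie.datasource.write.operation"                       -> "upsert")

// Mirrors extractConfigsRelatedToTimestampBasedKeyGenerator: keep only the
// options that belong in hoodie.properties; the write operation is dropped.
val timestampKeyGenConfigs = writeOptions.filterKeys(persistedConfigList.contains).toMap
```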

View File

@@ -187,10 +187,12 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
val partitionFilters = filters.filter(f => f.references.forall(p => partitionColumns.contains(p)))
val partitionFilterExpression =
HoodieSparkUtils.convertToCatalystExpressions(partitionFilters, tableStructSchema)
val convertedPartitionFilterExpression =
HoodieFileIndex.convertFilterForTimestampKeyGenerator(metaClient, partitionFilterExpression.toSeq)
// If the filters were successfully converted to Catalyst expressions, use them to prune partitions
val fileSlices = if (partitionFilterExpression.isDefined) {
hoodieFileIndex.listFileSlices(Seq(partitionFilterExpression.get))
val fileSlices = if (convertedPartitionFilterExpression.nonEmpty) {
hoodieFileIndex.listFileSlices(convertedPartitionFilterExpression)
} else {
hoodieFileIndex.listFileSlices(Seq.empty[Expression])
}
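
Why `.toSeq` plus `nonEmpty` replaces `.isDefined` plus `.get` here: lifting the `Option` into a `Seq` lets the same `Seq`-based converter handle both the present and absent cases uniformly, as this small check illustrates.

```scala
// Option#toSeq turns Some into a one-element Seq and None into an empty one,
// so a single nonEmpty test covers both branches.
val some: Option[String] = Some("partition = '2022-01-01'")
val none: Option[String] = None
assert(some.toSeq == Seq("partition = '2022-01-01'"))
assert(none.toSeq == Seq.empty)
```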

View File

@@ -18,6 +18,7 @@
package org.apache.hudi
import org.apache.hadoop.conf.Configuration
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.client.HoodieJavaWriteClient
import org.apache.hudi.client.common.HoodieJavaEngineContext
@@ -33,20 +34,25 @@ import org.apache.hudi.common.util.PartitionPathEncodeUtils
import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.{Config, TimestampType}
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.TimestampType
import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, GreaterThanOrEqual, LessThan, Literal}
import org.apache.spark.sql.execution.datasources.PartitionDirectory
import org.apache.spark.sql.functions.{lit, struct}
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{DataFrameWriter, Row, SaveMode, SparkSession}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.{BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, CsvSource, MethodSource, ValueSource}
import java.util.Properties
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.functional
import org.apache.hadoop.fs.FileSystem
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.table.timeline.HoodieInstant
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
@@ -25,13 +26,15 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieUpsertException
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.Config
import org.apache.hudi.keygen._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{col, concat, lit, udf}
import org.apache.spark.sql.types._
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue, fail}
@@ -41,6 +44,7 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
import java.sql.{Date, Timestamp}
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
@@ -93,6 +97,62 @@ class TestCOWDataSource extends HoodieClientTestBase {
assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
}
/**
* Tests querying with a partition predicate whose literal (the partition field value)
* differs from the actual partition path, as for a Hudi table written with
* TimestampBasedKeyGenerator.
*
* For a COW table, exercise the snapshot and incremental query modes.
*/
@Test
def testPrunePartitionForTimestampBasedKeyGenerator(): Unit = {
val options = commonOpts ++ Map(
"hoodie.compact.inline" -> "false",
DataSourceWriteOptions.TABLE_TYPE.key -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.TimestampBasedKeyGenerator",
Config.TIMESTAMP_TYPE_FIELD_PROP -> "DATE_STRING",
Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP -> "yyyy/MM/dd",
Config.TIMESTAMP_TIMEZONE_FORMAT_PROP -> "GMT+8:00",
Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP -> "yyyy-MM-dd"
)
val dataGen1 = new HoodieTestDataGenerator(Array("2022-01-01"))
val records1 = recordsToStrings(dataGen1.generateInserts("001", 20)).toList
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
inputDF1.write.format("org.apache.hudi")
.options(options)
.mode(SaveMode.Overwrite)
.save(basePath)
metaClient = HoodieTableMetaClient.builder()
.setBasePath(basePath)
.setConf(spark.sessionState.newHadoopConf)
.build()
val commit1Time = metaClient.getActiveTimeline.lastInstant().get().getTimestamp
val dataGen2 = new HoodieTestDataGenerator(Array("2022-01-02"))
val records2 = recordsToStrings(dataGen2.generateInserts("002", 30)).toList
val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
inputDF2.write.format("org.apache.hudi")
.options(options)
.mode(SaveMode.Append)
.save(basePath)
val commit2Time = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp
// snapshot query
val snapshotQueryRes = spark.read.format("hudi").load(basePath)
assertEquals(snapshotQueryRes.where("partition = '2022-01-01'").count, 20)
assertEquals(snapshotQueryRes.where("partition = '2022-01-02'").count, 30)
// incremental query
val incrementalQueryRes = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commit1Time)
.option(DataSourceReadOptions.END_INSTANTTIME.key, commit2Time)
.load(basePath)
assertEquals(incrementalQueryRes.where("partition = '2022-01-01'").count, 0)
assertEquals(incrementalQueryRes.where("partition = '2022-01-02'").count, 30)
}
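
The invariant this test leans on, as a two-line check using the formats from the options above: the partition field value `2022-01-01` is rendered on disk as the path segment `2022/01/01`, so the predicate `partition = '2022-01-01'` only prunes correctly after the literal is converted.

```scala
import java.text.SimpleDateFormat

val in  = new SimpleDateFormat("yyyy-MM-dd")  // format of the literal in the predicate
val out = new SimpleDateFormat("yyyy/MM/dd")  // format of the partition path on disk
assert(out.format(in.parse("2022-01-01")) == "2022/01/01")
```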
/**
* Test for https://issues.apache.org/jira/browse/HUDI-1615. Null Schema in BulkInsert row writer flow.
* This was reported by customer when archival kicks in as the schema in commit metadata is not set for bulk_insert

View File

@@ -26,12 +26,14 @@ import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.Config
import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
import org.apache.hudi.keygen.{ComplexKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{col, lit}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.Tag
import org.junit.jupiter.params.ParameterizedTest

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.functional
import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieTableType}
@@ -27,12 +28,16 @@ import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.index.HoodieIndex.IndexType
import org.apache.hudi.keygen.NonpartitionedKeyGenerator
import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestBase}
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.log4j.LogManager
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.BooleanType
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
@@ -770,4 +775,79 @@ class TestMORDataSource extends HoodieClientTestBase {
.load(basePath + "/*/*/*/*")
assertEquals(numRecords - numRecordsToDelete, snapshotDF2.count())
}
/**
* Tests querying with a partition predicate whose literal (the partition field value)
* differs from the actual partition path, as for a Hudi table written with
* TimestampBasedKeyGenerator.
*
* For a MOR table, exercise all three query modes.
*/
@Test
def testPrunePartitionForTimestampBasedKeyGenerator(): Unit = {
val options = commonOpts ++ Map(
"hoodie.compact.inline" -> "false",
DataSourceWriteOptions.TABLE_TYPE.key -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.TimestampBasedKeyGenerator",
Config.TIMESTAMP_TYPE_FIELD_PROP -> "DATE_STRING",
Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP -> "yyyy/MM/dd",
Config.TIMESTAMP_TIMEZONE_FORMAT_PROP -> "GMT+8:00",
Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP -> "yyyy-MM-dd"
)
val dataGen1 = new HoodieTestDataGenerator(Array("2022-01-01"))
val records1 = recordsToStrings(dataGen1.generateInserts("001", 50)).toList
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
inputDF1.write.format("org.apache.hudi")
.options(options)
.mode(SaveMode.Overwrite)
.save(basePath)
metaClient = HoodieTableMetaClient.builder()
.setBasePath(basePath)
.setConf(spark.sessionState.newHadoopConf)
.build()
val commit1Time = metaClient.getActiveTimeline.lastInstant().get().getTimestamp
val dataGen2 = new HoodieTestDataGenerator(Array("2022-01-02"))
val records2 = recordsToStrings(dataGen2.generateInserts("002", 60)).toList
val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
inputDF2.write.format("org.apache.hudi")
.options(options)
.mode(SaveMode.Append)
.save(basePath)
val commit2Time = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp
val records3 = recordsToStrings(dataGen2.generateUniqueUpdates("003", 20)).toList
val inputDF3 = spark.read.json(spark.sparkContext.parallelize(records3, 2))
inputDF3.write.format("org.apache.hudi")
.options(options)
.mode(SaveMode.Append)
.save(basePath)
val commit3Time = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp
// snapshot query
val snapshotQueryRes = spark.read.format("hudi").load(basePath)
assertEquals(snapshotQueryRes.where(s"_hoodie_commit_time = '$commit1Time'").count, 50)
assertEquals(snapshotQueryRes.where(s"_hoodie_commit_time = '$commit2Time'").count, 40)
assertEquals(snapshotQueryRes.where(s"_hoodie_commit_time = '$commit3Time'").count, 20)
assertEquals(snapshotQueryRes.where("partition = '2022-01-01'").count, 50)
assertEquals(snapshotQueryRes.where("partition = '2022-01-02'").count, 60)
// read_optimized query
val readOptimizedQueryRes = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
.load(basePath)
assertEquals(readOptimizedQueryRes.where("partition = '2022-01-01'").count, 50)
assertEquals(readOptimizedQueryRes.where("partition = '2022-01-02'").count, 60)
// incremental query
val incrementalQueryRes = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commit2Time)
.option(DataSourceReadOptions.END_INSTANTTIME.key, commit3Time)
.load(basePath)
assertEquals(incrementalQueryRes.where("partition = '2022-01-01'").count, 0)
assertEquals(incrementalQueryRes.where("partition = '2022-01-02'").count, 20)
}
}
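
Putting it together, a spark-shell style sketch of the behavior this commit enables (the table path is hypothetical and `spark` is the shell's session): predicates are written against the original partition field value, and pruning still hits the `yyyy/MM/dd`-rendered paths.

```scala
// /tmp/hudi/ts_table stands in for a table written with
// TimestampBasedKeyGenerator and the options shown in the tests above.
val df = spark.read.format("hudi").load("/tmp/hudi/ts_table")

// The literal uses the input format; Hudi converts it to the output format
// before pruning, so only .../2022/01/01 is scanned.
df.where("partition = '2022-01-01'").show()
```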