[HUDI-4161] Make sure partition values are taken from partition path (#5699)
@@ -54,6 +54,16 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
 
   override type FileSplit = HoodieBaseFileSplit
 
+  // TODO(HUDI-3204) this is to override behavior (exclusively) for COW tables to always extract
+  // partition values from partition path
+  // For more details please check HUDI-4161
+  // NOTE: This override has to mirror semantic of whenever this Relation is converted into [[HadoopFsRelation]],
+  //       which is currently done for all cases, except when Schema Evolution is enabled
+  override protected val shouldExtractPartitionValuesFromPartitionPath: Boolean = {
+    val enableSchemaOnRead = !internalSchema.isEmptySchema
+    !enableSchemaOnRead
+  }
+
   override lazy val mandatoryFields: Seq[String] =
     // TODO reconcile, record's key shouldn't be mandatory for base-file only relation
     Seq(recordKeyField)
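Note (illustrative only, not part of the patch): once this override is in effect, a plain snapshot read of a COW table resolves the partition column from the partition path whenever schema-on-read is disabled, so filter literals have to match the stored path format, which is exactly what the updated tests further down assert. A minimal sketch, assuming a hypothetical table location and a 'partition' column written under a 'yyyy/MM/dd'-style path:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
// The 'partition' column now carries the path-derived value, e.g. '2022/01/01',
// rather than the '2022-01-01' value persisted inside the data files.
val df = spark.read.format("hudi").load("/tmp/hudi_cow_table") // hypothetical path
df.where("partition = '2022/01/01'").count()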
@@ -171,7 +171,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
   protected val shouldExtractPartitionValuesFromPartitionPath: Boolean = {
     // Controls whether partition columns (which are the source for the partition path values) should
     // be omitted from persistence in the data files. On the read path it affects whether partition values (values
-    // of partition columns) will be read from the data file ot extracted from partition path
+    // of partition columns) will be read from the data file or extracted from partition path
     val shouldOmitPartitionColumns = metaClient.getTableConfig.shouldDropPartitionColumns && partitionColumns.nonEmpty
     val shouldExtractPartitionValueFromPath =
       optParams.getOrElse(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key,
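For reference, a sketch of the read-side switch referenced above (the table location is hypothetical; the option key is the one visible in this hunk): setting it explicitly forces partition-column values to be derived from the partition path instead of being read back from the data files.

import org.apache.hudi.DataSourceReadOptions

val df = spark.read.format("hudi")
  // derive partition-column values from the partition path
  .option(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key, "true")
  .load("/tmp/hudi_table") // hypothetical path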
@@ -419,7 +419,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
         }
       } catch {
         case NonFatal(e) =>
-          logWarning(s"Failed to get the right partition InternalRow for file : ${file.toString}")
+          logWarning(s"Failed to get the right partition InternalRow for file: ${file.toString}", e)
           InternalRow.empty
       }
     }
@@ -108,9 +108,6 @@ case class HoodieFileIndex(spark: SparkSession,
    * @return list of PartitionDirectory containing partition to base files mapping
    */
   override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
-    val convertedPartitionFilters =
-      HoodieFileIndex.convertFilterForTimestampKeyGenerator(metaClient, partitionFilters)
-
     // Look up candidate files names in the col-stats index, if all of the following conditions are true
     //    - Data-skipping is enabled
     //    - Col-Stats Index is present
@@ -144,7 +141,7 @@ case class HoodieFileIndex(spark: SparkSession,
       Seq(PartitionDirectory(InternalRow.empty, candidateFiles))
     } else {
       // Prune the partition path by the partition filters
-      val prunedPartitions = prunePartition(cachedAllInputFileSlices.keySet.asScala.toSeq, convertedPartitionFilters)
+      val prunedPartitions = prunePartition(cachedAllInputFileSlices.keySet.asScala.toSeq, partitionFilters)
       var totalFileSize = 0
       var candidateFileSize = 0
 
@@ -144,7 +144,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
   def testPrunePartitionForTimestampBasedKeyGenerator(): Unit = {
     val options = commonOpts ++ Map(
       "hoodie.compact.inline" -> "false",
-      DataSourceWriteOptions.TABLE_TYPE.key -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
+      DataSourceWriteOptions.TABLE_TYPE.key -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
       DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.TimestampBasedKeyGenerator",
       Config.TIMESTAMP_TYPE_FIELD_PROP -> "DATE_STRING",
       Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP -> "yyyy/MM/dd",
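As a rough illustration of what the options above imply (the input frame and the write call are assumptions, and the hunk cuts off before the remaining key-generator settings such as the input date format): with DATE_STRING timestamps and a 'yyyy/MM/dd' output format, a partition value like '2022-01-01' is written under the partition path '2022/01/01'.

import org.apache.spark.sql.SaveMode

// 'inputDF' is a hypothetical DataFrame holding the test records;
// 'options' and 'basePath' are the values from the surrounding test.
inputDF.write.format("hudi")
  .options(options)
  .mode(SaveMode.Append)
  .save(basePath)
// Resulting layout, roughly: <basePath>/2022/01/01/<data files>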
@@ -176,8 +176,11 @@ class TestCOWDataSource extends HoodieClientTestBase {
 
     // snapshot query
     val snapshotQueryRes = spark.read.format("hudi").load(basePath)
-    assertEquals(snapshotQueryRes.where("partition = '2022-01-01'").count, 20)
-    assertEquals(snapshotQueryRes.where("partition = '2022-01-02'").count, 30)
+    // TODO(HUDI-3204) we have to revert this to pre-existing behavior from 0.10
+    //assertEquals(snapshotQueryRes.where("partition = '2022-01-01'").count, 20)
+    //assertEquals(snapshotQueryRes.where("partition = '2022-01-02'").count, 30)
+    assertEquals(snapshotQueryRes.where("partition = '2022/01/01'").count, 20)
+    assertEquals(snapshotQueryRes.where("partition = '2022/01/02'").count, 30)
 
     // incremental query
     val incrementalQueryRes = spark.read.format("hudi")
@@ -961,10 +964,14 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assert(firstDF.count() == 2)
 
     // data_date is the partition field. Persist to the parquet file using the origin values, and read it.
-    assertEquals(
-      Seq("2018-09-23", "2018-09-24"),
-      firstDF.select("data_date").map(_.get(0).toString).collect().sorted.toSeq
-    )
+    // TODO(HUDI-3204) we have to revert this to pre-existing behavior from 0.10
+    val expectedValues = if (useGlobbing) {
+      Seq("2018-09-23", "2018-09-24")
+    } else {
+      Seq("2018/09/23", "2018/09/24")
+    }
+
+    assertEquals(expectedValues, firstDF.select("data_date").map(_.get(0).toString).collect().sorted.toSeq)
     assertEquals(
       Seq("2018/09/23", "2018/09/24"),
       firstDF.select("_hoodie_partition_path").map(_.get(0).toString).collect().sorted.toSeq
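For context on the branch above (a sketch; the glob depth is hypothetical): the test's useGlobbing flag separates two read paths. A globbed load does not go through HoodieFileIndex, so 'data_date' keeps the origin values persisted in the data files, while a plain base-path load resolves it from the partition path, hence the two expected value sets.

// Glob read: data_date stays '2018-09-23' / '2018-09-24'
val globbed = spark.read.format("hudi").load(s"$basePath/*/*/*") // hypothetical depth
// Base-path read: data_date becomes '2018/09/23' / '2018/09/24'
val direct = spark.read.format("hudi").load(basePath)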
@@ -17,11 +17,10 @@
 
 package org.apache.hudi.functional
 
-import org.apache.avro.generic.GenericRecord
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord, HoodieRecordPayload, HoodieTableType}
+import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieTableType}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
@@ -30,9 +29,8 @@ import org.apache.hudi.index.HoodieIndex.IndexType
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
 import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestBase}
-import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkUtils, SparkDatasetMixin}
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, SparkDatasetMixin}
 import org.apache.log4j.LogManager
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.BooleanType
@@ -41,7 +39,6 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.CsvSource
 
-import java.util
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 
@@ -864,8 +861,11 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
     val readOptimizedQueryRes = spark.read.format("hudi")
       .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
       .load(basePath)
-    assertEquals(readOptimizedQueryRes.where("partition = '2022-01-01'").count, 50)
-    assertEquals(readOptimizedQueryRes.where("partition = '2022-01-02'").count, 60)
+    // TODO(HUDI-3204) we have to revert this to pre-existing behavior from 0.10
+    //assertEquals(readOptimizedQueryRes.where("partition = '2022-01-01'").count, 50)
+    //assertEquals(readOptimizedQueryRes.where("partition = '2022-01-02'").count, 60)
+    assertEquals(readOptimizedQueryRes.where("partition = '2022/01/01'").count, 50)
+    assertEquals(readOptimizedQueryRes.where("partition = '2022/01/02'").count, 60)
 
     // incremental query
     val incrementalQueryRes = spark.read.format("hudi")