[HUDI-4440] Treat bootstrapped table as non-partitioned in HudiFileIndex if partition column is missing from schema (#6163)
Co-authored-by: Ryan Pifer <rmpifer@umich.edu>
@@ -22,6 +22,7 @@ import org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath
 import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_INCREMENTAL_OPT_VAL, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
 import org.apache.hudi.SparkHoodieTableFileIndex.{deduceQueryType, generateFieldMap, toJavaOption}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.bootstrap.index.BootstrapIndex
 import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
@@ -96,10 +97,24 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
         val partitionFields = partitionColumns.get().map(column => StructField(column, StringType))
         StructType(partitionFields)
       } else {
-        val partitionFields = partitionColumns.get().map(column =>
-          nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column: '" +
-            s"$column' in the schema[${schema.fields.mkString(",")}]")))
-        StructType(partitionFields)
+        val partitionFields = partitionColumns.get().filter(column => nameFieldMap.contains(column))
+          .map(column => nameFieldMap.apply(column))
+
+        if (partitionFields.size != partitionColumns.get().size) {
+          val isBootstrapTable = BootstrapIndex.getBootstrapIndex(metaClient).useIndex()
+          if (isBootstrapTable) {
+            // For bootstrapped tables it's possible the schema does not contain the partition field when
+            // the source table is hive-style partitioned. In this case we would like to treat the table
+            // as non-partitioned as opposed to failing
+            new StructType()
+          } else {
+            throw new IllegalArgumentException(s"Cannot find columns: " +
+              s"'${partitionColumns.get().filter(col => !nameFieldMap.contains(col)).mkString(",")}' " +
+              s"in the schema[${schema.fields.mkString(",")}]")
+          }
+        } else {
+          new StructType(partitionFields)
+        }
       }
     } else {
       // If the partition columns have not been stored in hoodie.properties (the table that was
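For readers skimming the diff, the new branch can be read in isolation. Below is a minimal, self-contained sketch of the same decision (not the class itself): `resolvePartitionSchema` and its plain `Seq`/`Map` parameters are hypothetical stand-ins, whereas in `SparkHoodieTableFileIndex` the field map is derived from the resolved table schema and the bootstrap check comes from `BootstrapIndex.getBootstrapIndex(metaClient).useIndex()`.

```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical stand-in for the logic above: decide the partition schema
// given the declared partition columns, the fields actually present in the
// table schema, and whether the table was created via metadata bootstrap.
def resolvePartitionSchema(partitionColumns: Seq[String],
                           nameFieldMap: Map[String, StructField],
                           isBootstrapTable: Boolean): StructType = {
  // Keep only the declared partition columns that actually appear in the schema
  val partitionFields = partitionColumns
    .filter(nameFieldMap.contains)
    .map(nameFieldMap.apply)

  if (partitionFields.size != partitionColumns.size) {
    if (isBootstrapTable) {
      // Bootstrapped, hive-style-partitioned source tables may omit the
      // partition field from the data schema: fall back to non-partitioned
      new StructType()
    } else {
      val missing = partitionColumns.filterNot(nameFieldMap.contains).mkString(",")
      throw new IllegalArgumentException(s"Cannot find columns: '$missing' in the schema")
    }
  } else {
    StructType(partitionFields)
  }
}

// Example: "datestr" is declared as a partition column but absent from the schema,
// so a bootstrapped table resolves to an empty (non-partitioned) schema
val fields = Map("key" -> StructField("key", StringType))
assert(resolvePartitionSchema(Seq("datestr"), fields, isBootstrapTable = true) == new StructType())
```

The design choice here is to degrade to an empty partition schema for bootstrapped tables instead of throwing, since hive-style-partitioned source data can legitimately omit the partition columns from the data files' schema.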
@@ -109,9 +109,12 @@ class TestDataSourceForBootstrap {
     // check marked directory clean up
     assert(!fs.exists(new Path(basePath, ".hoodie/.temp/00000000000001")))
 
-    // Read bootstrapped table and verify count
-    var hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
+    // Read bootstrapped table and verify count using glob path
+    val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
     assertEquals(numRecords, hoodieROViewDF1.count())
+    // Read bootstrapped table and verify count using Hudi file index
+    val hoodieROViewDF2 = spark.read.format("hudi").load(basePath)
+    assertEquals(numRecords, hoodieROViewDF2.count())
 
     // Perform upsert
     val updateTimestamp = Instant.now.toEpochMilli
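The updated tests exercise both read paths. As a usage sketch, with `spark` and `basePath` assumed from the surrounding test fixture, the only difference is the path handed to `load`:

```scala
// Glob path: partition directories are expanded by the glob itself,
// so the Hudi file index is bypassed
val globDF = spark.read.format("hudi").load(basePath + "/*")

// Plain base path: listing goes through the Hudi file index, which is
// where the new non-partitioned fallback for bootstrapped tables applies
val fileIndexDF = spark.read.format("hudi").load(basePath)
```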
@@ -130,11 +133,11 @@ class TestDataSourceForBootstrap {
     val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
     assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size())
 
-    // Read table after upsert and verify count
-    hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
-    assertEquals(numRecords, hoodieROViewDF1.count())
-    assertEquals(numRecordsUpdate, hoodieROViewDF1.filter(s"timestamp == $updateTimestamp").count())
-    // Read without *
+    // Read table after upsert and verify count using glob path
+    val hoodieROViewDF3 = spark.read.format("hudi").load(basePath + "/*")
+    assertEquals(numRecords, hoodieROViewDF3.count())
+    assertEquals(numRecordsUpdate, hoodieROViewDF3.filter(s"timestamp == $updateTimestamp").count())
+    // Read with base path using Hudi file index
     val hoodieROViewDF1WithBasePath = spark.read.format("hudi").load(basePath)
     assertEquals(numRecords, hoodieROViewDF1WithBasePath.count())
     assertEquals(numRecordsUpdate, hoodieROViewDF1WithBasePath.filter(s"timestamp == $updateTimestamp").count())
@@ -169,6 +172,9 @@ class TestDataSourceForBootstrap {
     // Read bootstrapped table and verify count
     val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
     assertEquals(numRecords, hoodieROViewDF1.count())
+    // Read bootstrapped table and verify count using Hudi file index
+    val hoodieROViewDF2 = spark.read.format("hudi").load(basePath)
+    assertEquals(numRecords, hoodieROViewDF2.count())
 
     // Perform upsert
     val updateTimestamp = Instant.now.toEpochMilli
@@ -189,10 +195,14 @@ class TestDataSourceForBootstrap {
     val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
     assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size())
 
-    // Read table after upsert and verify count
-    val hoodieROViewDF2 = spark.read.format("hudi").load(basePath + "/*")
-    assertEquals(numRecords, hoodieROViewDF2.count())
-    assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == $updateTimestamp").count())
+    // Read table after upsert and verify count using glob path
+    val hoodieROViewDF3 = spark.read.format("hudi").load(basePath + "/*")
+    assertEquals(numRecords, hoodieROViewDF3.count())
+    assertEquals(numRecordsUpdate, hoodieROViewDF3.filter(s"timestamp == $updateTimestamp").count())
+    // Read table after upsert and verify count using Hudi file index
+    val hoodieROViewDF4 = spark.read.format("hudi").load(basePath)
+    assertEquals(numRecords, hoodieROViewDF4.count())
+    assertEquals(numRecordsUpdate, hoodieROViewDF4.filter(s"timestamp == $updateTimestamp").count())
 
     verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = true, isHiveStylePartitioned = true)
   }
@@ -219,10 +229,10 @@ class TestDataSourceForBootstrap {
     val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
       DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, Some("datestr"))
 
-    // Read bootstrapped table and verify count
+    // Read bootstrapped table and verify count using glob path
     val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
     assertEquals(numRecords, hoodieROViewDF1.count())
-    // Read without *
+    // Read with base path using Hudi file index
     val hoodieROViewWithBasePathDF1 = spark.read.format("hudi").load(basePath)
     assertEquals(numRecords, hoodieROViewWithBasePathDF1.count())
 
@@ -260,10 +270,14 @@ class TestDataSourceForBootstrap {
     val commitInstantTime3: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
     assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size())
 
-    // Read table after upsert and verify count
+    // Read table after upsert and verify count using glob paths
     val hoodieROViewDF3 = spark.read.format("hudi").load(basePath + "/*")
     assertEquals(numRecords, hoodieROViewDF3.count())
     assertEquals(numRecordsUpdate, hoodieROViewDF3.filter(s"timestamp == $updateTimestamp").count())
+    // Read table after upsert and verify count using Hudi file index
+    val hoodieROViewDF4 = spark.read.format("hudi").load(basePath)
+    assertEquals(numRecords, hoodieROViewDF4.count())
+    assertEquals(numRecordsUpdate, hoodieROViewDF4.filter(s"timestamp == $updateTimestamp").count())
 
     verifyIncrementalViewResult(commitInstantTime1, commitInstantTime3, isPartitioned = true, isHiveStylePartitioned = false)
   }