1
0

[HUDI-1489] Fix null pointer exception when reading updated written bootstrap table (#2370)

Co-authored-by: Wenning Ding <wenningd@amazon.com>
This commit is contained in:
wenningd
2020-12-23 11:26:24 -08:00
committed by GitHub
parent 38b9264dd0
commit 89f482eaf2
2 changed files with 157 additions and 278 deletions

View File

@@ -91,6 +91,9 @@ class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
// Get required schemas for column pruning
var requiredDataSchema = StructType(Seq())
var requiredSkeletonSchema = StructType(Seq())
// requiredColsSchema is the schema of requiredColumns, note that requiredColumns is in a random order
// so requiredColsSchema is not always equal to (requiredSkeletonSchema.fields ++ requiredDataSchema.fields)
var requiredColsSchema = StructType(Seq())
requiredColumns.foreach(col => {
var field = dataSchema.find(_.name == col)
if (field.isDefined) {
@@ -99,6 +102,7 @@ class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
field = skeletonSchema.find(_.name == col)
requiredSkeletonSchema = requiredSkeletonSchema.add(field.get)
}
requiredColsSchema = requiredColsSchema.add(field.get)
})
// Prepare readers for reading data file and skeleton files
@@ -129,7 +133,7 @@ class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
sparkSession = _sqlContext.sparkSession,
dataSchema = fullSchema,
partitionSchema = StructType(Seq.empty),
requiredSchema = StructType(requiredSkeletonSchema.fields ++ requiredDataSchema.fields),
requiredSchema = requiredColsSchema,
filters = filters,
options = Map.empty,
hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf())

View File

@@ -17,9 +17,6 @@
package org.apache.hudi.functional
import java.time.Instant
import java.util.Collections
import collection.JavaConverters._
import org.apache.hadoop.fs.FileSystem
import org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider
@@ -37,10 +34,13 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.api.io.TempDir
import java.time.Instant
import java.util.Collections
class TestDataSourceForBootstrap {
var spark: SparkSession = _
val commonOpts = Map(
val commonOpts: Map[String, String] = Map(
HoodieWriteConfig.INSERT_PARALLELISM -> "4",
HoodieWriteConfig.UPSERT_PARALLELISM -> "4",
HoodieWriteConfig.DELETE_PARALLELISM -> "4",
@@ -56,6 +56,14 @@ class TestDataSourceForBootstrap {
var srcPath: String = _
var fs: FileSystem = _
val partitionPaths: List[String] = List("2020-04-01", "2020-04-02", "2020-04-03")
val numRecords: Int = 100
val numRecordsUpdate: Int = 10
val verificationRowKey: String = "trip_0"
val verificationCol: String = "driver"
val originalVerificationVal: String = "driver_0"
val updatedVerificationVal: String = "driver_update"
@BeforeEach def initialize(@TempDir tempDir: java.nio.file.Path) {
spark = SparkSession.builder
.appName("Hoodie Datasource test")
@@ -83,7 +91,6 @@ class TestDataSourceForBootstrap {
@Test def testMetadataBootstrapCOWNonPartitioned(): Unit = {
val timestamp = Instant.now.toEpochMilli
val numRecords = 100
val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, Collections.emptyList(), jsc,
@@ -96,20 +103,7 @@ class TestDataSourceForBootstrap {
.save(srcPath)
// Perform bootstrap
val bootstrapDF = spark.emptyDataFrame
bootstrapDF.write
.format("hudi")
.options(commonOpts)
.option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
.option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
.option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
.mode(SaveMode.Overwrite)
.save(basePath)
val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, commitInstantTime1)
val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
// Read bootstrapped table and verify count
var hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
@@ -117,18 +111,14 @@ class TestDataSourceForBootstrap {
// Perform upsert
val updateTimestamp = Instant.now.toEpochMilli
val numRecordsUpdate = 10
val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate,
Collections.emptyList(), jsc, spark.sqlContext)
updateDF.write
.format("hudi")
.options(commonOpts)
.option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
.option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
.mode(SaveMode.Append)
.save(basePath)
@@ -141,36 +131,11 @@ class TestDataSourceForBootstrap {
assertEquals(numRecords, hoodieROViewDF1.count())
assertEquals(numRecordsUpdate, hoodieROViewDF1.filter(s"timestamp == $updateTimestamp").count())
// incrementally pull only changes in the bootstrap commit, which would pull all the initial records written
// during bootstrap
val hoodieIncViewDF1 = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
.option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, commitInstantTime1)
.load(basePath)
assertEquals(numRecords, hoodieIncViewDF1.count())
var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect();
assertEquals(1, countsPerCommit.length)
assertEquals(commitInstantTime1, countsPerCommit(0).get(0))
// incrementally pull only changes in the latest commit, which would pull only the updated records in the
// latest commit
val hoodieIncViewDF2 = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
.load(basePath);
assertEquals(numRecordsUpdate, hoodieIncViewDF2.count())
countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect();
assertEquals(1, countsPerCommit.length)
assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = false, isHiveStylePartitioned = false)
}
@Test def testMetadataBootstrapCOWHiveStylePartitioned(): Unit = {
val timestamp = Instant.now.toEpochMilli
val numRecords = 100
val partitionPaths = List("2020-04-01", "2020-04-02", "2020-04-03")
val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc,
@@ -184,20 +149,7 @@ class TestDataSourceForBootstrap {
.save(srcPath)
// Perform bootstrap
val bootstrapDF = spark.emptyDataFrame
bootstrapDF.write
.format("hudi")
.options(commonOpts)
.option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
.option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
.option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, "org.apache.hudi.keygen.SimpleKeyGenerator")
.mode(SaveMode.Overwrite)
.save(basePath)
val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, commitInstantTime1)
val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
// Read bootstrapped table and verify count
val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
@@ -205,18 +157,14 @@ class TestDataSourceForBootstrap {
// Perform upsert
val updateTimestamp = Instant.now.toEpochMilli
val numRecordsUpdate = 10
val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate, partitionPaths.asJava,
jsc, spark.sqlContext)
updateDF.write
.format("hudi")
.options(commonOpts)
.option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
// Required because source data is hive style partitioned
.option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true")
@@ -231,149 +179,11 @@ class TestDataSourceForBootstrap {
assertEquals(numRecords, hoodieROViewDF2.count())
assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == $updateTimestamp").count())
// incrementally pull only changes in the bootstrap commit, which would pull all the initial records written
// during bootstrap
val hoodieIncViewDF1 = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
.option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, commitInstantTime1)
.load(basePath)
assertEquals(numRecords, hoodieIncViewDF1.count())
var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect();
assertEquals(1, countsPerCommit.length)
assertEquals(commitInstantTime1, countsPerCommit(0).get(0))
// incrementally pull only changes in the latest commit, which would pull only the updated records in the
// latest commit
val hoodieIncViewDF2 = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
.load(basePath);
assertEquals(numRecordsUpdate, hoodieIncViewDF2.count())
countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect();
assertEquals(1, countsPerCommit.length)
assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
// pull the latest commit within certain partitions
val hoodieIncViewDF3 = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
.option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, "/datestr=2020-04-02/*")
.load(basePath)
assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2020-04-02")).count(),
hoodieIncViewDF3.count())
verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = true, isHiveStylePartitioned = true)
}
@Test def testMetadataBootstrapCOWPartitioned(): Unit = {
val timestamp = Instant.now.toEpochMilli
val numRecords = 100
val partitionPaths = List("2020-04-01", "2020-04-02", "2020-04-03")
val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
var sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc,
spark.sqlContext)
// Writing data for each partition instead of using partitionBy to avoid hive-style partitioning and hence
// have partitioned columns stored in the data file
partitionPaths.foreach(partitionPath => {
sourceDF
.filter(sourceDF("datestr").equalTo(lit(partitionPath)))
.write
.format("parquet")
.mode(SaveMode.Overwrite)
.save(srcPath + "/" + partitionPath)
})
// Perform bootstrap
val bootstrapDF = spark.emptyDataFrame
bootstrapDF.write
.format("hudi")
.options(commonOpts)
.option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
.option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
.option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, "org.apache.hudi.keygen.SimpleKeyGenerator")
.mode(SaveMode.Overwrite)
.save(basePath)
val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, commitInstantTime1)
// Read bootstrapped table and verify count
val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
assertEquals(numRecords, hoodieROViewDF1.count())
// Perform upsert
val updateTimestamp = Instant.now.toEpochMilli
val numRecordsUpdate = 10
var updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate, partitionPaths.asJava,
jsc, spark.sqlContext)
updateDF.write
.format("hudi")
.options(commonOpts)
.option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
.option("hoodie.upsert.shuffle.parallelism", "4")
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
.mode(SaveMode.Append)
.save(basePath)
val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size())
// Read table after upsert and verify count
val hoodieROViewDF2 = spark.read.format("hudi").load(basePath + "/*")
assertEquals(numRecords, hoodieROViewDF2.count())
assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == $updateTimestamp").count())
// incrementally pull only changes in the bootstrap commit, which would pull all the initial records written
// during bootstrap
val hoodieIncViewDF1 = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
.option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, commitInstantTime1)
.load(basePath)
assertEquals(numRecords, hoodieIncViewDF1.count())
var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect();
assertEquals(1, countsPerCommit.length)
assertEquals(commitInstantTime1, countsPerCommit(0).get(0))
// incrementally pull only changes in the latest commit, which would pull only the updated records in the
// latest commit
val hoodieIncViewDF2 = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
.load(basePath);
assertEquals(numRecordsUpdate, hoodieIncViewDF2.count())
countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect();
assertEquals(1, countsPerCommit.length)
assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
// pull the latest commit within certain partitions
val hoodieIncViewDF3 = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
.option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, "/2020-04-02/*")
.load(basePath)
assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2020-04-02")).count(),
hoodieIncViewDF3.count())
}
@Test def testMetadataBootstrapMORPartitionedInlineCompactionOn(): Unit = {
val timestamp = Instant.now.toEpochMilli
val numRecords = 100
val partitionPaths = List("2020-04-01", "2020-04-02", "2020-04-03")
val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc,
@@ -391,21 +201,74 @@ class TestDataSourceForBootstrap {
})
// Perform bootstrap
val bootstrapDF = spark.emptyDataFrame
bootstrapDF.write
val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
// Read bootstrapped table and verify count
val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
assertEquals(numRecords, hoodieROViewDF1.count())
// Perform upsert based on the written bootstrap table
val updateDf1 = hoodieROViewDF1.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
updateDf1.write
.format("hudi")
.options(commonOpts)
.option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
.option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
.option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, "org.apache.hudi.keygen.SimpleKeyGenerator")
.mode(SaveMode.Overwrite)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
.mode(SaveMode.Append)
.save(basePath)
val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, commitInstantTime1)
// Read table after upsert and verify the updated value
assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size())
val hoodieROViewDF2 = spark.read.format("hudi").load(basePath + "/*")
hoodieROViewDF2.collect()
assertEquals(updatedVerificationVal, hoodieROViewDF2.filter(col("_row_key") === verificationRowKey).select(verificationCol).first.getString(0))
// Perform upsert based on the source data
val updateTimestamp = Instant.now.toEpochMilli
val updateDF2 = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate, partitionPaths.asJava,
jsc, spark.sqlContext)
updateDF2.write
.format("hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
.mode(SaveMode.Append)
.save(basePath)
val commitInstantTime3: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size())
// Read table after upsert and verify count
val hoodieROViewDF3 = spark.read.format("hudi").load(basePath + "/*")
assertEquals(numRecords, hoodieROViewDF3.count())
assertEquals(numRecordsUpdate, hoodieROViewDF3.filter(s"timestamp == $updateTimestamp").count())
verifyIncrementalViewResult(commitInstantTime1, commitInstantTime3, isPartitioned = true, isHiveStylePartitioned = false)
}
@Test def testMetadataBootstrapMORPartitionedInlineCompactionOn(): Unit = {
val timestamp = Instant.now.toEpochMilli
val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc,
spark.sqlContext)
// Writing data for each partition instead of using partitionBy to avoid hive-style partitioning and hence
// have partitioned columns stored in the data file
partitionPaths.foreach(partitionPath => {
sourceDF
.filter(sourceDF("datestr").equalTo(lit(partitionPath)))
.write
.format("parquet")
.mode(SaveMode.Overwrite)
.save(srcPath + "/" + partitionPath)
})
// Perform bootstrap
val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
// Read bootstrapped table and verify count
val hoodieROViewDF1 = spark.read.format("hudi")
@@ -416,18 +279,14 @@ class TestDataSourceForBootstrap {
// Perform upsert
val updateTimestamp = Instant.now.toEpochMilli
val numRecordsUpdate = 10
val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate, partitionPaths.asJava,
jsc, spark.sqlContext)
updateDF.write
.format("hudi")
.options(commonOpts)
.option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
.option(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true")
.option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, "1")
@@ -449,8 +308,6 @@ class TestDataSourceForBootstrap {
@Test def testMetadataBootstrapMORPartitioned(): Unit = {
val timestamp = Instant.now.toEpochMilli
val numRecords = 100
val partitionPaths = List("2020-04-01", "2020-04-02", "2020-04-03")
val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc,
@@ -468,21 +325,7 @@ class TestDataSourceForBootstrap {
})
// Perform bootstrap
val bootstrapDF = spark.emptyDataFrame
bootstrapDF.write
.format("hudi")
.options(commonOpts)
.option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
.option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
.option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, "org.apache.hudi.keygen.SimpleKeyGenerator")
.mode(SaveMode.Overwrite)
.save(basePath)
val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, commitInstantTime1)
val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
// Read bootstrapped table and verify count
val hoodieROViewDF1 = spark.read.format("hudi")
@@ -491,41 +334,55 @@ class TestDataSourceForBootstrap {
.load(basePath + "/*")
assertEquals(numRecords, hoodieROViewDF1.count())
// Perform upsert
val updateTimestamp = Instant.now.toEpochMilli
val numRecordsUpdate = 10
val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate,
partitionPaths.asJava, jsc, spark.sqlContext)
updateDF.write
// Perform upsert based on the written bootstrap table
val updateDf1 = hoodieROViewDF1.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
updateDf1.write
.format("hudi")
.options(commonOpts)
.option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
.mode(SaveMode.Append)
.save(basePath)
// Expect 1 new commit since meta bootstrap - delta commit (because inline compaction is off)
// Read table after upsert and verify the value
assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size())
// Read table after upsert and verify count. Since we have inline compaction off the RO view will have
// no updated rows.
val hoodieROViewDF2 = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
.load(basePath + "/*")
assertEquals(numRecords, hoodieROViewDF2.count())
assertEquals(0, hoodieROViewDF2.filter(s"timestamp == $updateTimestamp").count())
hoodieROViewDF2.collect()
assertEquals(originalVerificationVal, hoodieROViewDF2.filter(col("_row_key") === verificationRowKey).select(verificationCol).first.getString(0))
// Perform upsert based on the source data
val updateTimestamp = Instant.now.toEpochMilli
val updateDF2 = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate,
partitionPaths.asJava, jsc, spark.sqlContext)
updateDF2.write
.format("hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
.mode(SaveMode.Append)
.save(basePath)
// Expect 2 new commit since meta bootstrap - 2 delta commits (because inline compaction is off)
assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size())
// Read table after upsert and verify count. Since we have inline compaction off the RO view will have
// no updated rows.
val hoodieROViewDF3 = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
.load(basePath + "/*")
assertEquals(numRecords, hoodieROViewDF3.count())
assertEquals(0, hoodieROViewDF3.filter(s"timestamp == $updateTimestamp").count())
}
@Test def testFullBootstrapCOWPartitioned(): Unit = {
val timestamp = Instant.now.toEpochMilli
val numRecords = 100
val partitionPaths = List("2020-04-01", "2020-04-02", "2020-04-03")
val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc,
@@ -547,11 +404,8 @@ class TestDataSourceForBootstrap {
bootstrapDF.write
.format("hudi")
.options(commonOpts)
.option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
.option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
.option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, classOf[SimpleKeyGenerator].getName)
.option(HoodieBootstrapConfig.BOOTSTRAP_MODE_SELECTOR, classOf[FullRecordBootstrapModeSelector].getName)
@@ -568,18 +422,14 @@ class TestDataSourceForBootstrap {
// Perform upsert
val updateTimestamp = Instant.now.toEpochMilli
val numRecordsUpdate = 10
val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate, partitionPaths.asJava,
jsc, spark.sqlContext)
updateDF.write
.format("hudi")
.options(commonOpts)
.option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr")
.mode(SaveMode.Append)
.save(basePath)
@@ -592,39 +442,64 @@ class TestDataSourceForBootstrap {
assertEquals(numRecords, hoodieROViewDF2.count())
assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == $updateTimestamp").count())
verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = true, isHiveStylePartitioned = false)
}
def runMetadataBootstrapAndVerifyCommit(tableType: String): String = {
val bootstrapDF = spark.emptyDataFrame
bootstrapDF.write
.format("hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType)
.option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
.option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, classOf[SimpleKeyGenerator].getName)
.mode(SaveMode.Overwrite)
.save(basePath)
val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, commitInstantTime1)
commitInstantTime1
}
def verifyIncrementalViewResult(bootstrapCommitInstantTime: String, latestCommitInstantTime: String,
isPartitioned: Boolean, isHiveStylePartitioned: Boolean): Unit = {
// incrementally pull only changes in the bootstrap commit, which would pull all the initial records written
// during bootstrap
val hoodieIncViewDF1 = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
.option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, commitInstantTime1)
.option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, bootstrapCommitInstantTime)
.load(basePath)
assertEquals(numRecords, hoodieIncViewDF1.count())
var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect();
assertEquals(1, countsPerCommit.length)
assertEquals(commitInstantTime1, countsPerCommit(0).get(0))
assertEquals(bootstrapCommitInstantTime, countsPerCommit(0).get(0))
// incrementally pull only changes in the latest commit, which would pull only the updated records in the
// latest commit
// incrementally pull only changes after bootstrap commit, which would pull only the updated records in the
// later commits
val hoodieIncViewDF2 = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, bootstrapCommitInstantTime)
.load(basePath);
assertEquals(numRecordsUpdate, hoodieIncViewDF2.count())
countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect();
assertEquals(1, countsPerCommit.length)
assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
assertEquals(latestCommitInstantTime, countsPerCommit(0).get(0))
// pull the latest commit within certain partitions
val hoodieIncViewDF3 = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
.option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, "/2020-04-02/*")
.load(basePath)
if (isPartitioned) {
val relativePartitionPath = if (isHiveStylePartitioned) "/datestr=2020-04-02/*" else "/2020-04-02/*"
// pull the update commits within certain partitions
val hoodieIncViewDF3 = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, bootstrapCommitInstantTime)
.option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, relativePartitionPath)
.load(basePath)
assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2020-04-02")).count(),
hoodieIncViewDF3.count())
assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2020-04-02")).count(),
hoodieIncViewDF3.count())
}
}
}