[HUDI-1376] Drop Hudi metadata cols at the beginning of Spark datasource writing (#2233)
Co-authored-by: Wenning Ding <wenningd@amazon.com>
This commit is contained in:
@@ -19,7 +19,7 @@ package org.apache.hudi
|
|||||||
|
|
||||||
import org.apache.hadoop.fs.Path
|
import org.apache.hadoop.fs.Path
|
||||||
import org.apache.hudi.DataSourceReadOptions._
|
import org.apache.hudi.DataSourceReadOptions._
|
||||||
import org.apache.hudi.common.model.HoodieTableType
|
import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
|
||||||
import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION_OPT_KEY}
|
import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION_OPT_KEY}
|
||||||
import org.apache.hudi.common.fs.FSUtils
|
import org.apache.hudi.common.fs.FSUtils
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient
|
import org.apache.hudi.common.table.HoodieTableMetaClient
|
||||||
@@ -33,6 +33,8 @@ import org.apache.spark.sql.streaming.OutputMode
|
|||||||
import org.apache.spark.sql.types.StructType
|
import org.apache.spark.sql.types.StructType
|
||||||
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
|
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
|
||||||
|
|
||||||
|
import scala.collection.JavaConverters._
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Hoodie Spark Datasource, for reading and writing hoodie tables
|
* Hoodie Spark Datasource, for reading and writing hoodie tables
|
||||||
*
|
*
|
||||||
@@ -119,12 +121,14 @@ class DefaultSource extends RelationProvider
|
|||||||
optParams: Map[String, String],
|
optParams: Map[String, String],
|
||||||
df: DataFrame): BaseRelation = {
|
df: DataFrame): BaseRelation = {
|
||||||
val parameters = HoodieWriterUtils.parametersWithWriteDefaults(optParams)
|
val parameters = HoodieWriterUtils.parametersWithWriteDefaults(optParams)
|
||||||
|
val dfWithoutMetaCols = df.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala:_*)
|
||||||
|
|
||||||
if (parameters(OPERATION_OPT_KEY).equals(BOOTSTRAP_OPERATION_OPT_VAL)) {
|
if (parameters(OPERATION_OPT_KEY).equals(BOOTSTRAP_OPERATION_OPT_VAL)) {
|
||||||
HoodieSparkSqlWriter.bootstrap(sqlContext, mode, parameters, df)
|
HoodieSparkSqlWriter.bootstrap(sqlContext, mode, parameters, dfWithoutMetaCols)
|
||||||
} else {
|
} else {
|
||||||
HoodieSparkSqlWriter.write(sqlContext, mode, parameters, df)
|
HoodieSparkSqlWriter.write(sqlContext, mode, parameters, dfWithoutMetaCols)
|
||||||
}
|
}
|
||||||
new HoodieEmptyRelation(sqlContext, df.schema)
|
new HoodieEmptyRelation(sqlContext, dfWithoutMetaCols.schema)
|
||||||
}
|
}
|
||||||
|
|
||||||
override def createSink(sqlContext: SQLContext,
|
override def createSink(sqlContext: SQLContext,
|
||||||
|
|||||||
@@ -30,7 +30,7 @@ import org.apache.hudi.config.HoodieWriteConfig
|
|||||||
import org.apache.hudi.testutils.HoodieClientTestBase
|
import org.apache.hudi.testutils.HoodieClientTestBase
|
||||||
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
|
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
|
||||||
import org.apache.spark.sql._
|
import org.apache.spark.sql._
|
||||||
import org.apache.spark.sql.functions.col
|
import org.apache.spark.sql.functions.{col, lit}
|
||||||
import org.apache.spark.sql.types.{DataTypes, DateType, IntegerType, StringType, StructField, StructType, TimestampType}
|
import org.apache.spark.sql.types.{DataTypes, DateType, IntegerType, StringType, StructField, StructType, TimestampType}
|
||||||
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
|
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
|
||||||
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
|
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
|
||||||
@@ -52,6 +52,9 @@ class TestCOWDataSource extends HoodieClientTestBase {
|
|||||||
HoodieWriteConfig.TABLE_NAME -> "hoodie_test"
|
HoodieWriteConfig.TABLE_NAME -> "hoodie_test"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
val verificationCol: String = "driver"
|
||||||
|
val updatedVerificationVal: String = "driver_update"
|
||||||
|
|
||||||
@BeforeEach override def setUp() {
|
@BeforeEach override def setUp() {
|
||||||
initPath()
|
initPath()
|
||||||
initSparkContexts()
|
initSparkContexts()
|
||||||
@@ -96,23 +99,36 @@ class TestCOWDataSource extends HoodieClientTestBase {
|
|||||||
val snapshotDF1 = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*")
|
val snapshotDF1 = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*")
|
||||||
assertEquals(100, snapshotDF1.count())
|
assertEquals(100, snapshotDF1.count())
|
||||||
|
|
||||||
|
// Upsert based on the written table with Hudi metadata columns
|
||||||
|
val verificationRowKey = snapshotDF1.limit(1).select("_row_key").first.getString(0)
|
||||||
|
val updateDf = snapshotDF1.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
|
||||||
|
|
||||||
|
updateDf.write.format("org.apache.hudi")
|
||||||
|
.options(commonOpts)
|
||||||
|
.mode(SaveMode.Append)
|
||||||
|
.save(basePath)
|
||||||
|
|
||||||
|
val snapshotDF2 = spark.read.format("hudi").load(basePath + "/*/*/*/*")
|
||||||
|
assertEquals(100, snapshotDF2.count())
|
||||||
|
assertEquals(updatedVerificationVal, snapshotDF2.filter(col("_row_key") === verificationRowKey).select(verificationCol).first.getString(0))
|
||||||
|
|
||||||
|
// Upsert Operation without Hudi metadata columns
|
||||||
val records2 = recordsToStrings(dataGen.generateUpdates("001", 100)).toList
|
val records2 = recordsToStrings(dataGen.generateUpdates("001", 100)).toList
|
||||||
val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
|
val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
|
||||||
val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()
|
val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()
|
||||||
|
|
||||||
// Upsert Operation
|
|
||||||
inputDF2.write.format("org.apache.hudi")
|
inputDF2.write.format("org.apache.hudi")
|
||||||
.options(commonOpts)
|
.options(commonOpts)
|
||||||
.mode(SaveMode.Append)
|
.mode(SaveMode.Append)
|
||||||
.save(basePath)
|
.save(basePath)
|
||||||
|
|
||||||
val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
|
val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
|
||||||
assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").size())
|
assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").size())
|
||||||
|
|
||||||
// Snapshot Query
|
// Snapshot Query
|
||||||
val snapshotDF2 = spark.read.format("org.apache.hudi")
|
val snapshotDF3 = spark.read.format("org.apache.hudi")
|
||||||
.load(basePath + "/*/*/*/*")
|
.load(basePath + "/*/*/*/*")
|
||||||
assertEquals(100, snapshotDF2.count()) // still 100, since we only updated
|
assertEquals(100, snapshotDF3.count()) // still 100, since we only updated
|
||||||
|
|
||||||
// Read Incremental Query
|
// Read Incremental Query
|
||||||
// we have 2 commits, try pulling the first commit (which is not the latest)
|
// we have 3 commits, try pulling the first commit (which is not the latest)
|
||||||
|
|||||||
@@ -47,6 +47,9 @@ class TestMORDataSource extends HoodieClientTestBase {
|
|||||||
HoodieWriteConfig.TABLE_NAME -> "hoodie_test"
|
HoodieWriteConfig.TABLE_NAME -> "hoodie_test"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
val verificationCol: String = "driver"
|
||||||
|
val updatedVerificationVal: String = "driver_update"
|
||||||
|
|
||||||
@BeforeEach override def setUp() {
|
@BeforeEach override def setUp() {
|
||||||
initPath()
|
initPath()
|
||||||
initSparkContexts()
|
initSparkContexts()
|
||||||
@@ -86,7 +89,7 @@ class TestMORDataSource extends HoodieClientTestBase {
|
|||||||
val insertCommitTimes = hudiRODF1.select("_hoodie_commit_time").distinct().collectAsList().map(r => r.getString(0)).toList
|
val insertCommitTimes = hudiRODF1.select("_hoodie_commit_time").distinct().collectAsList().map(r => r.getString(0)).toList
|
||||||
assertEquals(List(insertCommitTime), insertCommitTimes)
|
assertEquals(List(insertCommitTime), insertCommitTimes)
|
||||||
|
|
||||||
// Upsert operation
|
// Upsert operation without Hudi metadata columns
|
||||||
val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 100)).toList
|
val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 100)).toList
|
||||||
val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
|
val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
|
||||||
inputDF2.write.format("org.apache.hudi")
|
inputDF2.write.format("org.apache.hudi")
|
||||||
@@ -101,6 +104,19 @@ class TestMORDataSource extends HoodieClientTestBase {
|
|||||||
.load(basePath + "/*/*/*/*")
|
.load(basePath + "/*/*/*/*")
|
||||||
val updateCommitTimes = hudiSnapshotDF2.select("_hoodie_commit_time").distinct().collectAsList().map(r => r.getString(0)).toList
|
val updateCommitTimes = hudiSnapshotDF2.select("_hoodie_commit_time").distinct().collectAsList().map(r => r.getString(0)).toList
|
||||||
assertEquals(List(updateCommitTime), updateCommitTimes)
|
assertEquals(List(updateCommitTime), updateCommitTimes)
|
||||||
|
|
||||||
|
// Upsert based on the written table with Hudi metadata columns
|
||||||
|
val verificationRowKey = hudiSnapshotDF2.limit(1).select("_row_key").first.getString(0)
|
||||||
|
val inputDF3 = hudiSnapshotDF2.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
|
||||||
|
|
||||||
|
inputDF3.write.format("org.apache.hudi")
|
||||||
|
.options(commonOpts)
|
||||||
|
.mode(SaveMode.Append)
|
||||||
|
.save(basePath)
|
||||||
|
|
||||||
|
val hudiSnapshotDF3 = spark.read.format("hudi").load(basePath + "/*/*/*/*")
|
||||||
|
assertEquals(100, hudiSnapshotDF3.count())
|
||||||
|
assertEquals(updatedVerificationVal, hudiSnapshotDF3.filter(col("_row_key") === verificationRowKey).select(verificationCol).first.getString(0))
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test def testCount() {
|
@Test def testCount() {
|
||||||
|
|||||||
Reference in New Issue
Block a user