1
0

[HUDI-1520] add configure for spark sql overwrite use INSERT_OVERWRITE_TABLE (#2428)

This commit is contained in:
lw0090
2021-01-12 01:07:47 +08:00
committed by GitHub
parent 7ce3ac778e
commit de42adc230
3 changed files with 8 additions and 9 deletions

View File

@@ -94,13 +94,6 @@ private[hudi] object HoodieSparkSqlWriter {
operation = WriteOperationType.INSERT
}
// If the mode is Overwrite, can set operation to INSERT_OVERWRITE_TABLE.
// Then in DataSourceUtils.doWriteOperation will use client.insertOverwriteTable to overwrite
// the table. This will replace the old fs.delete(tablepath) mode.
if (mode == SaveMode.Overwrite && operation != WriteOperationType.INSERT_OVERWRITE_TABLE) {
operation = WriteOperationType.INSERT_OVERWRITE_TABLE
}
val jsc = new JavaSparkContext(sparkContext)
val basePath = new Path(path.get)
val instantTime = HoodieActiveTimeline.createNewInstantTime()
@@ -340,6 +333,12 @@ private[hudi] object HoodieSparkSqlWriter {
if (operation != WriteOperationType.DELETE) {
if (mode == SaveMode.ErrorIfExists && tableExists) {
throw new HoodieException(s"hoodie table at $tablePath already exists.")
} else if (mode == SaveMode.Overwrite && tableExists && operation != WriteOperationType.INSERT_OVERWRITE_TABLE) {
// When user set operation as INSERT_OVERWRITE_TABLE,
// overwrite will use INSERT_OVERWRITE_TABLE operator in doWriteOperation
log.warn(s"hoodie table at $tablePath already exists. Deleting existing data & overwriting with new data.")
fs.delete(tablePath, true)
tableExists = false
}
} else {
// Delete Operation only supports Append mode

View File

@@ -202,7 +202,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
inputDF2.write.format("org.apache.hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL)
.mode(SaveMode.Overwrite)
.save(basePath)
@@ -229,6 +229,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
inputDF2.write.format("org.apache.hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL)
.mode(SaveMode.Overwrite)
.save(basePath)

View File

@@ -278,7 +278,6 @@ class TestMORDataSource extends HoodieClientTestBase {
val inputDF5: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records5, 2))
inputDF5.write.format("org.apache.hudi")
.options(commonOpts)
.option("hoodie.compact.inline", "true")
.mode(SaveMode.Append)
.save(basePath)
val commit5Time = HoodieDataSourceHelpers.latestCommit(fs, basePath)