
[HUDI-2307] When using delete_partition with ds should not rely on the primary key (#3469)

Co-authored-by: Sivabalan Narayanan <n.siva.b@gmail.com>

Author:    liujinhui
Committed: 2021-08-14 14:53:39 +08:00 (committed by GitHub)
Parent:    8eed440694
Commit:    b7da6cb33d

4 changed files with 72 additions and 55 deletions

DataSourceWriteOptions.scala

@@ -325,6 +325,11 @@ object DataSourceWriteOptions {
   @Deprecated
   val INSERT_DROP_DUPS_OPT_KEY = INSERT_DROP_DUPS.key()
 
+  val PARTITIONS_TO_DELETE: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.datasource.write.partitions.to.delete")
+    .noDefaultValue()
+    .withDocumentation("Comma separated list of partitions to delete")
+
   val STREAMING_RETRY_CNT: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.write.streaming.retry.count")
     .defaultValue("3")

HoodieSparkSqlWriter.scala

@@ -29,8 +29,8 @@ import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
 import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig, TypedProperties}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, WriteOperationType}
-import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
-import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
+import org.apache.hudi.common.table.TableSchemaResolver
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils}
 import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH, BOOTSTRAP_INDEX_CLASS}
@@ -192,7 +192,12 @@ object HoodieSparkSqlWriter {
} }
// Get list of partitions to delete // Get list of partitions to delete
val partitionsToDelete = genericRecords.map(gr => keyGenerator.getKey(gr).getPartitionPath).toJavaRDD().distinct().collect() val partitionsToDelete = if (parameters.containsKey(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key())) {
val partitionColsToDelete = parameters.get(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key()).get.split(",")
java.util.Arrays.asList(partitionColsToDelete:_*)
} else {
genericRecords.map(gr => keyGenerator.getKey(gr).getPartitionPath).toJavaRDD().distinct().collect()
}
// Create a HoodieWriteClient & issue the delete. // Create a HoodieWriteClient & issue the delete.
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
null, path.get, tblName, null, path.get, tblName,
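
To restate the resolution order: a set `hoodie.datasource.write.partitions.to.delete` wins and is split on commas verbatim; only in its absence does the writer fall back to deriving partition paths from the incoming records' keys, which is the behavior that previously forced callers to supply the primary key. A standalone sketch of that branch, using a plain Scala Map with hypothetical values in place of the writer's `parameters`:

  // Hypothetical stand-in for the writer's parameters map.
  val parameters = Map("hoodie.datasource.write.partitions.to.delete" -> "2021/08/01,2021/08/02")

  val partitionsToDelete: java.util.List[String] = parameters
    .get("hoodie.datasource.write.partitions.to.delete")
    .map(csv => java.util.Arrays.asList(csv.split(","): _*))
    // in the writer, the fallback derives distinct partition paths from the records' keys
    .getOrElse(java.util.Collections.emptyList[String]())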

HoodieSparkSqlWriterSuite.scala

@@ -22,7 +22,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.common.config.HoodieConfig
-import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord, HoodieRecordPayload, HoodieTableType, WriteOperationType}
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
@@ -679,61 +679,68 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
     }
   }
 
-  test("test delete partitions") {
-    initSparkContext("test_delete_partitions")
-    val path = java.nio.file.Files.createTempDirectory("hoodie_test_path_delete_partitions")
-    try {
-      val hoodieFooTableName = "hoodie_foo_tbl_delete_partitions"
-      val fooTableModifier = getCommonParams(path, hoodieFooTableName, HoodieTableType.COPY_ON_WRITE.name())
-      val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
-      val schema = DataSourceTestUtils.getStructTypeExampleSchema
-      val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
-      val records = DataSourceTestUtils.generateRandomRows(10)
-      val recordsSeq = convertRowListToSeq(records)
-      val df1 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
-      // write to Hudi
-      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableParams, df1)
-      val snapshotDF1 = spark.read.format("org.apache.hudi")
-        .load(path.toAbsolutePath.toString + "/*/*/*/*")
-      assertEquals(10, snapshotDF1.count())
-      // remove metadata columns so that expected and actual DFs can be compared as is
-      val trimmedDf1 = dropMetaFields(snapshotDF1)
-      assert(df1.except(trimmedDf1).count() == 0)
-      // issue updates so that log files are created for MOR table
-      var updatesSeq = convertRowListToSeq(DataSourceTestUtils.generateUpdates(records, 5))
-      var updatesDf = spark.createDataFrame(sc.parallelize(updatesSeq), structType)
-      // write updates to Hudi
-      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, updatesDf)
-      val snapshotDF2 = spark.read.format("org.apache.hudi")
-        .load(path.toAbsolutePath.toString + "/*/*/*/*")
-      assertEquals(10, snapshotDF2.count())
-      // remove metadata columns so that expected and actual DFs can be compared as is
-      val trimmedDf2 = dropMetaFields(snapshotDF2)
-      // ensure 2nd batch of updates matches.
-      assert(updatesDf.intersect(trimmedDf2).except(updatesDf).count() == 0)
-      // delete partitions
-      val recordsToDelete = df1.filter(entry => {
-        val partitionPath : String = entry.getString(1)
-        partitionPath.equals(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH) || partitionPath.equals(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)
-      })
-      val updatedParams = fooTableParams.updated(DataSourceWriteOptions.OPERATION.key(), WriteOperationType.DELETE_PARTITION.name())
-      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, updatedParams, recordsToDelete)
-      val snapshotDF3 = spark.read.format("org.apache.hudi")
-        .load(path.toAbsolutePath.toString + "/*/*/*/*")
-      assertEquals(0, snapshotDF3.filter(entry => {
-        val partitionPath = entry.getString(3)
-        !partitionPath.equals(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH)
-      }).count())
-    } finally {
-      spark.stop()
-      FileUtils.deleteDirectory(path.toFile)
-    }
-  }
+  List(true, false)
+    .foreach(usePartitionsToDeleteConfig => {
+      test("test delete partitions for " + usePartitionsToDeleteConfig) {
+        initSparkContext("test_delete_partitions_" + usePartitionsToDeleteConfig)
+        val path = java.nio.file.Files.createTempDirectory("hoodie_test_path_delete_partitions")
+        try {
+          val hoodieFooTableName = "hoodie_foo_tbl_delete_partitions"
+          val fooTableModifier = getCommonParams(path, hoodieFooTableName, HoodieTableType.COPY_ON_WRITE.name())
+          var fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
+          val schema = DataSourceTestUtils.getStructTypeExampleSchema
+          val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+          val records = DataSourceTestUtils.generateRandomRows(10)
+          val recordsSeq = convertRowListToSeq(records)
+          val df1 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
+          // write to Hudi
+          HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableParams, df1)
+          val snapshotDF1 = spark.read.format("org.apache.hudi")
+            .load(path.toAbsolutePath.toString + "/*/*/*/*")
+          assertEquals(10, snapshotDF1.count())
+          // remove metadata columns so that expected and actual DFs can be compared as is
+          val trimmedDf1 = dropMetaFields(snapshotDF1)
+          assert(df1.except(trimmedDf1).count() == 0)
+          // issue updates so that log files are created for MOR table
+          var updatesSeq = convertRowListToSeq(DataSourceTestUtils.generateUpdates(records, 5))
+          var updatesDf = spark.createDataFrame(sc.parallelize(updatesSeq), structType)
+          // write updates to Hudi
+          HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, updatesDf)
+          val snapshotDF2 = spark.read.format("org.apache.hudi")
+            .load(path.toAbsolutePath.toString + "/*/*/*/*")
+          assertEquals(10, snapshotDF2.count())
+          // remove metadata columns so that expected and actual DFs can be compared as is
+          val trimmedDf2 = dropMetaFields(snapshotDF2)
+          // ensure 2nd batch of updates matches.
+          assert(updatesDf.intersect(trimmedDf2).except(updatesDf).count() == 0)
+          if (usePartitionsToDeleteConfig) {
+            fooTableParams = fooTableParams.updated(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key(), HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH + "," + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)
+          }
+          // delete partitions; the records passed in still carry the primary key
+          val recordsToDelete = df1.filter(entry => {
+            val partitionPath : String = entry.getString(1)
+            partitionPath.equals(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH) ||
+              partitionPath.equals(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)
+          })
+          val updatedParams = fooTableParams.updated(DataSourceWriteOptions.OPERATION.key(), WriteOperationType.DELETE_PARTITION.name())
+          HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, updatedParams, recordsToDelete)
+          val snapshotDF3 = spark.read.format("org.apache.hudi")
+            .load(path.toAbsolutePath.toString + "/*/*/*/*")
+          assertEquals(0, snapshotDF3.filter(entry => {
+            val partitionPath = entry.getString(3)
+            !partitionPath.equals(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH)
+          }).count())
+        } finally {
+          spark.stop()
+          FileUtils.deleteDirectory(path.toFile)
+        }
+      }
+    })
 
   def dropMetaFields(df: Dataset[Row]) : Dataset[Row] = {
     df.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))

scalastyle.xml

@@ -27,7 +27,7 @@
   <check level="error" class="org.scalastyle.file.FileTabChecker" enabled="true"/>
   <check level="error" class="org.scalastyle.file.FileLengthChecker" enabled="true">
     <parameters>
-      <parameter name="maxFileLength"><![CDATA[800]]></parameter>
+      <parameter name="maxFileLength"><![CDATA[900]]></parameter>
     </parameters>
   </check>
   <check level="error" class="org.scalastyle.scalariform.SpacesAfterPlusChecker" enabled="true"/>