From 5ee35a0a92ab0de6c36885a515fad04c08109c78 Mon Sep 17 00:00:00 2001 From: liujinhui <965147871@qq.com> Date: Wed, 18 Aug 2021 13:45:48 +0800 Subject: [PATCH] HUDI-1674 (#3488) --- .../spark/HoodieWriteClientExample.java | 12 ++++- .../spark/HoodieDataSourceExample.scala | 44 +++++++++++++++++-- .../org/apache/hudi/DataSourceOptions.scala | 1 + .../java/org/apache/hudi/QuickstartUtils.java | 2 + 4 files changed, 55 insertions(+), 4 deletions(-) diff --git a/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java b/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java index feddd1b9f..35e46605f 100644 --- a/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java +++ b/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieIndexConfig; @@ -47,7 +48,6 @@ import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; - /** * Simple examples of #{@link SparkRDDWriteClient}. 
* @@ -127,6 +127,16 @@ public class HoodieWriteClientExample { JavaRDD deleteRecords = jsc.parallelize(toBeDeleted, 1); client.delete(deleteRecords, newCommitTime); + // Delete by partition + newCommitTime = client.startCommit(); + client.startCommitWithTime(newCommitTime, HoodieTimeline.REPLACE_COMMIT_ACTION); + LOG.info("Starting commit " + newCommitTime); + // The partition where the data needs to be deleted + List partitionList = toBeDeleted.stream().map(s -> s.getPartitionPath()).distinct().collect(Collectors.toList()); + List deleteList = recordsSoFar.stream().filter(f -> !partitionList.contains(f.getPartitionPath())) + .map(m -> m.getKey().getPartitionPath()).distinct().collect(Collectors.toList()); + client.deletePartitions(deleteList, newCommitTime); + // compaction if (HoodieTableType.valueOf(tableType) == HoodieTableType.MERGE_ON_READ) { Option instant = client.scheduleCompaction(Option.empty()); diff --git a/hudi-examples/src/main/scala/org/apache/hudi/examples/spark/HoodieDataSourceExample.scala b/hudi-examples/src/main/scala/org/apache/hudi/examples/spark/HoodieDataSourceExample.scala index 3072f6f0a..ada5aeaa1 100644 --- a/hudi-examples/src/main/scala/org/apache/hudi/examples/spark/HoodieDataSourceExample.scala +++ b/hudi-examples/src/main/scala/org/apache/hudi/examples/spark/HoodieDataSourceExample.scala @@ -19,7 +19,7 @@ package org.apache.hudi.examples.spark import org.apache.hudi.DataSourceReadOptions.{BEGIN_INSTANTTIME, END_INSTANTTIME, QUERY_TYPE_INCREMENTAL_OPT_VAL, QUERY_TYPE} -import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD} +import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD, PARTITIONS_TO_DELETE, OPERATION, DELETE_PARTITION_OPERATION_OPT_VAL, DELETE_OPERATION_OPT_VAL} import org.apache.hudi.QuickstartUtils.getQuickstartWriteConfigs import org.apache.hudi.common.model.HoodieAvroPayload import 
org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME @@ -61,10 +61,12 @@ object HoodieDataSourceExample { incrementalQuery(spark, tablePath, tableName) pointInTimeQuery(spark, tablePath, tableName) + delete(spark, tablePath, tableName) + deleteByPartition(spark, tablePath, tableName) + spark.stop() } - /** * Generate some new trips, load them into a DataFrame and write the DataFrame into the Hudi dataset as below. */ @@ -72,7 +74,6 @@ object HoodieDataSourceExample { val commitTime: String = System.currentTimeMillis().toString val inserts = dataGen.convertToStringList(dataGen.generateInserts(commitTime, 20)) - spark.sparkContext.parallelize(inserts, 2) val df = spark.read.json(spark.sparkContext.parallelize(inserts, 1)) df.write.format("org.apache.hudi"). options(getQuickstartWriteConfigs). @@ -129,6 +130,43 @@ object HoodieDataSourceExample { save(tablePath) } + /** + * Delete data based on record information. + */ + def delete(spark: SparkSession, tablePath: String, tableName: String): Unit = { + + val roViewDF = spark.read.format("org.apache.hudi").load(tablePath + "/*/*/*/*") + roViewDF.createOrReplaceTempView("hudi_ro_table") + val df = spark.sql("select uuid, partitionpath, ts from hudi_ro_table limit 2") + + df.write.format("org.apache.hudi"). + options(getQuickstartWriteConfigs). + option(PRECOMBINE_FIELD.key, "ts"). + option(RECORDKEY_FIELD.key, "uuid"). + option(PARTITIONPATH_FIELD.key, "partitionpath"). + option(TABLE_NAME.key, tableName). + option(OPERATION.key, DELETE_OPERATION_OPT_VAL). + mode(Append). + save(tablePath) + } + + /** + * Delete the data of a single or multiple partitions. + */ + def deleteByPartition(spark: SparkSession, tablePath: String, tableName: String): Unit = { + val df = spark.emptyDataFrame + df.write.format("org.apache.hudi"). + options(getQuickstartWriteConfigs). + option(PRECOMBINE_FIELD.key, "ts"). + option(RECORDKEY_FIELD.key, "uuid"). + option(PARTITIONPATH_FIELD.key, "partitionpath"). + option(TABLE_NAME.key, tableName). 
+ option(OPERATION.key, DELETE_PARTITION_OPERATION_OPT_VAL). + option(PARTITIONS_TO_DELETE.key(), HoodieExampleDataGenerator.DEFAULT_PARTITION_PATHS.mkString(",")). + mode(Append). + save(tablePath) + } + /** * Hudi also provides capability to obtain a stream of records that changed since given commit timestamp. * This can be achieved using Hudi’s incremental view and providing a begin time from which changes need to be streamed. diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 168e79584..5044ab642 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -171,6 +171,7 @@ object DataSourceWriteOptions { val INSERT_OPERATION_OPT_VAL = WriteOperationType.INSERT.value val UPSERT_OPERATION_OPT_VAL = WriteOperationType.UPSERT.value val DELETE_OPERATION_OPT_VAL = WriteOperationType.DELETE.value + val DELETE_PARTITION_OPERATION_OPT_VAL = WriteOperationType.DELETE_PARTITION.value val BOOTSTRAP_OPERATION_OPT_VAL = WriteOperationType.BOOTSTRAP.value val INSERT_OVERWRITE_OPERATION_OPT_VAL = WriteOperationType.INSERT_OVERWRITE.value val INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL = WriteOperationType.INSERT_OVERWRITE_TABLE.value diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java index 47b30173c..e0929efed 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java +++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java @@ -241,6 +241,8 @@ public class QuickstartUtils { Map demoConfigs = new HashMap<>(); demoConfigs.put("hoodie.insert.shuffle.parallelism", "2"); 
demoConfigs.put("hoodie.upsert.shuffle.parallelism", "2"); + demoConfigs.put("hoodie.bulkinsert.shuffle.parallelism", "2"); + demoConfigs.put("hoodie.delete.shuffle.parallelism", "2"); return demoConfigs; } } \ No newline at end of file