1
0
This commit is contained in:
liujinhui
2021-08-18 13:45:48 +08:00
committed by GitHub
parent 99663d370b
commit 5ee35a0a92
4 changed files with 55 additions and 4 deletions

View File

@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
@@ -47,7 +48,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* Simple examples of {@link SparkRDDWriteClient}.
*
@@ -127,6 +127,16 @@ public class HoodieWriteClientExample {
JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(toBeDeleted, 1);
client.delete(deleteRecords, newCommitTime);
// Delete by partition
newCommitTime = client.startCommit();
client.startCommitWithTime(newCommitTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
LOG.info("Starting commit " + newCommitTime);
// The partition where the data needs to be deleted
List<String> partitionList = toBeDeleted.stream().map(s -> s.getPartitionPath()).distinct().collect(Collectors.toList());
List<String> deleteList = recordsSoFar.stream().filter(f -> !partitionList.contains(f.getPartitionPath()))
.map(m -> m.getKey().getPartitionPath()).distinct().collect(Collectors.toList());
client.deletePartitions(deleteList, newCommitTime);
// compaction
if (HoodieTableType.valueOf(tableType) == HoodieTableType.MERGE_ON_READ) {
Option<String> instant = client.scheduleCompaction(Option.empty());

View File

@@ -19,7 +19,7 @@
package org.apache.hudi.examples.spark
import org.apache.hudi.DataSourceReadOptions.{BEGIN_INSTANTTIME, END_INSTANTTIME, QUERY_TYPE_INCREMENTAL_OPT_VAL, QUERY_TYPE}
import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD}
import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD, PARTITIONS_TO_DELETE, OPERATION, DELETE_PARTITION_OPERATION_OPT_VAL, DELETE_OPERATION_OPT_VAL}
import org.apache.hudi.QuickstartUtils.getQuickstartWriteConfigs
import org.apache.hudi.common.model.HoodieAvroPayload
import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
@@ -61,10 +61,12 @@ object HoodieDataSourceExample {
incrementalQuery(spark, tablePath, tableName)
pointInTimeQuery(spark, tablePath, tableName)
delete(spark, tablePath, tableName)
deleteByPartition(spark, tablePath, tableName)
spark.stop()
}
/**
* Generate some new trips, load them into a DataFrame and write the DataFrame into the Hudi dataset as below.
*/
@@ -72,7 +74,6 @@ object HoodieDataSourceExample {
val commitTime: String = System.currentTimeMillis().toString
val inserts = dataGen.convertToStringList(dataGen.generateInserts(commitTime, 20))
spark.sparkContext.parallelize(inserts, 2)
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 1))
df.write.format("org.apache.hudi").
options(getQuickstartWriteConfigs).
@@ -129,6 +130,43 @@ object HoodieDataSourceExample {
save(tablePath)
}
/**
* Delete data based on record key information.
*/
/**
 * Delete a couple of previously written records by issuing a Hudi
 * "delete" write operation for their keys.
 *
 * @param spark     active Spark session
 * @param tablePath base path of the Hudi table on storage
 * @param tableName logical Hudi table name
 */
def delete(spark: SparkSession, tablePath: String, tableName: String): Unit = {
  // Expose the current table contents as a temp view so the rows to
  // remove can be picked with plain SQL.
  val snapshotDF = spark.read.format("org.apache.hudi").load(tablePath + "/*/*/*/*")
  snapshotDF.createOrReplaceTempView("hudi_ro_table")
  // Only the record key, partition path and precombine field are needed
  // to address a record for deletion.
  val toDelete = spark.sql("select uuid, partitionpath, ts from hudi_ro_table limit 2")
  val writer = toDelete.write.format("org.apache.hudi")
  writer.
    options(getQuickstartWriteConfigs).
    option(PRECOMBINE_FIELD.key, "ts").
    option(RECORDKEY_FIELD.key, "uuid").
    option(PARTITIONPATH_FIELD.key, "partitionpath").
    option(TABLE_NAME.key, tableName).
    option(OPERATION.key, DELETE_OPERATION_OPT_VAL).
    mode(Append).
    save(tablePath)
}
/**
* Delete the data of a single or multiple partitions.
*/
/**
 * Drop whole partitions from the table in a single commit using Hudi's
 * "delete_partition" write operation. No row data is required for this
 * operation, so an empty DataFrame is written.
 *
 * @param spark     active Spark session
 * @param tablePath base path of the Hudi table on storage
 * @param tableName logical Hudi table name
 */
def deleteByPartition(spark: SparkSession, tablePath: String, tableName: String): Unit = {
  // All default example partitions are removed in one shot; the option
  // takes a comma-separated list of partition paths.
  val partitionsToDrop = HoodieExampleDataGenerator.DEFAULT_PARTITION_PATHS.mkString(",")
  spark.emptyDataFrame.write.format("org.apache.hudi").
    options(getQuickstartWriteConfigs).
    option(PRECOMBINE_FIELD.key, "ts").
    option(RECORDKEY_FIELD.key, "uuid").
    option(PARTITIONPATH_FIELD.key, "partitionpath").
    option(TABLE_NAME.key, tableName).
    option(OPERATION.key, DELETE_PARTITION_OPERATION_OPT_VAL).
    option(PARTITIONS_TO_DELETE.key, partitionsToDrop).
    mode(Append).
    save(tablePath)
}
/**
* Hudi also provides capability to obtain a stream of records that changed since given commit timestamp.
* This can be achieved using Hudi's incremental view and providing a begin time from which changes need to be streamed.