From 14dbbdf4c7a45ab5a10889f8e558984455315829 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Tue, 22 Feb 2022 00:01:30 -0500 Subject: [PATCH] [HUDI-2189] Adding delete partitions support to DeltaStreamer (#4787) --- .../utilities/deltastreamer/DeltaSync.java | 4 ++ .../HoodieDeltaStreamerTestBase.java | 2 +- .../functional/TestHoodieDeltaStreamer.java | 57 ++++++++++++++++++- 3 files changed, 61 insertions(+), 2 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index c376243ba..082a9b1d5 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -585,6 +585,10 @@ public class DeltaSync implements Serializable { case INSERT_OVERWRITE_TABLE: writeStatusRDD = writeClient.insertOverwriteTable(records, instantTime).getWriteStatuses(); break; + case DELETE_PARTITION: + List<String> partitions = records.map(record -> record.getPartitionPath()).distinct().collect(); + writeStatusRDD = writeClient.deletePartitions(partitions, instantTime).getWriteStatuses(); + break; default: throw new HoodieDeltaStreamerException("Unknown operation : " + cfg.operation); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java index 9b7ee3b30..02b1848e2 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java @@ -173,7 +173,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { props.setProperty("include", "sql-transformer.properties"); 
props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName()); props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key"); - props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there"); + props.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path"); props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc"); props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc"); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index a6fdf00d1..1c8089658 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -272,6 +272,19 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { assertEquals(expected, recordCount); } + static Map<String, Long> getPartitionRecordCount(String basePath, SQLContext sqlContext) { + sqlContext.clearCache(); + List<Row> rows = sqlContext.read().format("org.apache.hudi").load(basePath).groupBy(HoodieRecord.PARTITION_PATH_METADATA_FIELD).count().collectAsList(); + Map<String, Long> partitionRecordCount = new HashMap<>(); + rows.stream().forEach(row -> partitionRecordCount.put(row.getString(0), row.getLong(1))); + return partitionRecordCount; + } + + static void assertNoPartitionMatch(String basePath, SQLContext sqlContext, String partitionToValidate) { + sqlContext.clearCache(); + assertEquals(0, sqlContext.read().format("org.apache.hudi").load(basePath).filter(HoodieRecord.PARTITION_PATH_METADATA_FIELD + " = '" + partitionToValidate + "'").count()); + } + static void assertDistinctRecordCount(long expected, String tablePath, SQLContext sqlContext) { 
sqlContext.clearCache(); long recordCount = sqlContext.read().format("org.apache.hudi").load(tablePath).select("_hoodie_record_key").distinct().count(); @@ -1378,6 +1391,13 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile, String propsFileName, String parquetSourceRoot, boolean addCommonProps) throws IOException { + prepareParquetDFSSource(useSchemaProvider, hasTransformer, sourceSchemaFile, targetSchemaFile, propsFileName, parquetSourceRoot, addCommonProps, + "not_there"); + } + + private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile, + String propsFileName, String parquetSourceRoot, boolean addCommonProps, + String partitionPath) throws IOException { // Properties used for testing delta-streamer with Parquet source TypedProperties parquetProps = new TypedProperties(); @@ -1388,7 +1408,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { parquetProps.setProperty("include", "base.properties"); parquetProps.setProperty("hoodie.embed.timeline.server", "false"); parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key"); - parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there"); + parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", partitionPath); if (useSchemaProvider) { parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/" + sourceSchemaFile); if (hasTransformer) { @@ -1855,6 +1875,31 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { testDeltaStreamerWithSpecifiedOperation(dfsBasePath + "/insert_overwrite_table", WriteOperationType.INSERT_OVERWRITE_TABLE); } + @Test + public void testDeletePartitions() throws Exception { + prepareParquetDFSSource(false, 
false, "source.avsc", "target.avsc", + PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false, "partition_path"); + String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum; + HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer( + TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(), + null, PROPS_FILENAME_TEST_PARQUET, false, + false, 100000, false, null, null, "timestamp", null), jsc); + deltaStreamer.sync(); + TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath, sqlContext); + testNum++; + + prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT); + prepareParquetDFSSource(false, false); + // set write operation to DELETE_PARTITION and add transformer to filter only for records with partition HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION + deltaStreamer = new HoodieDeltaStreamer( + TestHelpers.makeConfig(tableBasePath, WriteOperationType.DELETE_PARTITION, ParquetDFSSource.class.getName(), + Collections.singletonList(TestSpecificPartitionTransformer.class.getName()), PROPS_FILENAME_TEST_PARQUET, false, + false, 100000, false, null, null, "timestamp", null), jsc); + deltaStreamer.sync(); + // No records should match the HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION. 
+ TestHelpers.assertNoPartitionMatch(tableBasePath, sqlContext, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH); + } + void testDeltaStreamerWithSpecifiedOperation(final String tableBasePath, WriteOperationType operationType) throws Exception { // Initial insert HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT); @@ -2001,6 +2046,16 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { } } + public static class TestSpecificPartitionTransformer implements Transformer { + + @Override + public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, + TypedProperties properties) { + Dataset<Row> toReturn = rowDataset.filter("partition_path == '" + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH + "'"); + return toReturn; + } + } + /** * Add new field evoluted_optional_union_field with value of the field rider. */