diff --git a/docker/demo/config/base.properties b/docker/demo/config/base.properties index c53fe17e7..6e591c5f5 100644 --- a/docker/demo/config/base.properties +++ b/docker/demo/config/base.properties @@ -17,6 +17,7 @@ hoodie.upsert.shuffle.parallelism=2 hoodie.insert.shuffle.parallelism=2 +hoodie.delete.shuffle.parallelism=2 hoodie.bulkinsert.shuffle.parallelism=2 hoodie.embed.timeline.server=true hoodie.filesystem.view.type=EMBEDDED_KV_STORE diff --git a/docker/demo/config/hoodie-incr.properties b/docker/demo/config/hoodie-incr.properties index 95a6627c8..80f474b1e 100644 --- a/docker/demo/config/hoodie-incr.properties +++ b/docker/demo/config/hoodie-incr.properties @@ -17,6 +17,7 @@ hoodie.upsert.shuffle.parallelism=2 hoodie.insert.shuffle.parallelism=2 +hoodie.delete.shuffle.parallelism=2 hoodie.bulkinsert.shuffle.parallelism=2 hoodie.datasource.write.recordkey.field=_row_key hoodie.datasource.write.partitionpath.field=partition diff --git a/docker/demo/config/test-suite/base.properties b/docker/demo/config/test-suite/base.properties index 13b1acc8e..0e75b3c25 100644 --- a/docker/demo/config/test-suite/base.properties +++ b/docker/demo/config/test-suite/base.properties @@ -17,5 +17,6 @@ # hoodie.upsert.shuffle.parallelism=2 hoodie.insert.shuffle.parallelism=2 +hoodie.delete.shuffle.parallelism=2 hoodie.bulkinsert.shuffle.parallelism=2 hoodie.datasource.write.partitionpath.field=timestamp diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 7ad0f96b4..089474d15 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -209,7 +209,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { } public int getDeleteShuffleParallelism() { - return Integer.parseInt(props.getProperty(DELETE_PARALLELISM)); + return Math.max(Integer.parseInt(props.getProperty(DELETE_PARALLELISM)), 1); } public int getRollbackParallelism() { @@ -824,6 +824,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return this; } + public Builder withDeleteParallelism(int parallelism) { + props.setProperty(DELETE_PARALLELISM, String.valueOf(parallelism)); + return this; + } + public Builder withParallelism(int insertShuffleParallelism, int upsertShuffleParallelism) { props.setProperty(INSERT_PARALLELISM, String.valueOf(insertShuffleParallelism)); props.setProperty(UPSERT_PARALLELISM, String.valueOf(upsertShuffleParallelism)); @@ -851,6 +856,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return this; } + public Builder combineDeleteInput(boolean onDelete) { + props.setProperty(COMBINE_BEFORE_DELETE_PROP, String.valueOf(onDelete)); + return this; + } + public Builder withWriteStatusStorageLevel(String level) { props.setProperty(WRITE_STATUS_STORAGE_LEVEL, level); return this; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteHelper.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteHelper.java index 8c0b75f30..b16bf63c2 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteHelper.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/DeleteHelper.java @@ -44,27 +44,36 @@ public class DeleteHelper> { * Deduplicate Hoodie records, using the given deduplication function. 
* * @param keys RDD of HoodieKey to deduplicate + * @param table target Hoodie table used to determine the deduplication strategy + * @param parallelism number of partitions to use while reducing/deduplicating the keys + * @return RDD of HoodieKey already be deduplicated */ private static > JavaRDD deduplicateKeys(JavaRDD keys, - HoodieTable table) { + HoodieTable table, int parallelism) { boolean isIndexingGlobal = table.getIndex().isGlobal(); if (isIndexingGlobal) { return keys.keyBy(HoodieKey::getRecordKey) - .reduceByKey((key1, key2) -> key1) + .reduceByKey((key1, key2) -> key1, parallelism) .values(); } else { - return keys.distinct(); + return keys.distinct(parallelism); } } public static > HoodieWriteMetadata execute(String instantTime, - JavaRDD keys, JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable table, + JavaRDD keys, JavaSparkContext jsc, + HoodieWriteConfig config, HoodieTable table, CommitActionExecutor deleteExecutor) { try { HoodieWriteMetadata result = null; - // De-dupe/merge if needed - JavaRDD dedupedKeys = config.shouldCombineBeforeDelete() ? deduplicateKeys(keys, table) : keys; + JavaRDD dedupedKeys = keys; + final int parallelism = config.getDeleteShuffleParallelism(); + if (config.shouldCombineBeforeDelete()) { + // De-dupe/merge if needed + dedupedKeys = deduplicateKeys(keys, table, parallelism); + } else if (!keys.partitions().isEmpty()) { + dedupedKeys = keys.repartition(parallelism); + } JavaRDD> dedupedRecords = dedupedKeys.map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload())); @@ -74,7 +83,7 @@ public class DeleteHelper> { ((HoodieTable)table).getIndex().tagLocation(dedupedRecords, jsc, (HoodieTable)table); Duration tagLocationDuration = Duration.between(beginTag, Instant.now()); - // filter out non existant keys/records + // filter out non existent keys/records JavaRDD> taggedValidRecords = taggedRecords.filter(HoodieRecord::isCurrentLocationKnown); if (!taggedValidRecords.isEmpty()) { result = deleteExecutor.execute(taggedValidRecords); diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java index 257f732ae..39ee532ba 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java @@ -437,7 +437,7 @@ public class TestHoodieIndex extends HoodieClientTestHarness { */ private HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType indexType) { return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr) - .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2) + .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2) .withWriteStatusClass(MetadataMergeWriteStatus.class) .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build()) diff --git a/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java index b68cba643..db8ca529a 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java @@ -462,7 +462,7 @@ public class TestHBaseIndex extends FunctionalTestHarness { private HoodieWriteConfig.Builder getConfigBuilder(int
hbaseIndexBatchSize) { return HoodieWriteConfig.newBuilder().withPath(basePath()).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) - .withParallelism(1, 1) + .withParallelism(1, 1).withDeleteParallelism(1) .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024) .withInlineCompaction(false).build()) .withAutoCommit(false).withStorageConfig(HoodieStorageConfig.newBuilder() diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java index 22337f598..ba5d6dd29 100644 --- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java +++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java @@ -174,7 +174,7 @@ public class TestHoodieKeyLocationFetchHandle extends HoodieClientTestHarness { */ private HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) { return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr) - .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2) + .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2) .withWriteStatusClass(MetadataMergeWriteStatus.class) .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build()) diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java index 76baa71b0..17ca6c969 100644 --- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java +++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java @@ -309,6 +309,7 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness { HoodieWriteConfig.Builder getConfigBuilder() { return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) .withParallelism(2, 2) + .withDeleteParallelism(2) .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build()) .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024).build()) .forTable("test-trip-table") diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java index 9fc715a04..f7ce95c4a 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -207,7 +207,7 @@ public class TestCleaner extends HoodieClientTestBase { HoodieWriteConfig cfg = getConfigBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(maxVersions).build()) - .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1) + .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1) .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) .build(); try (HoodieWriteClient client = getHoodieWriteClient(cfg);) { @@ -368,7 +368,7 @@ public class TestCleaner extends HoodieClientTestBase { HoodieWriteConfig cfg = getConfigBuilder() 
.withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(maxCommits).build()) - .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1) + .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1) .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) .build(); HoodieWriteClient client = getHoodieWriteClient(cfg); diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java index fb16af037..f60305175 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java @@ -803,6 +803,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { protected HoodieWriteConfig getHoodieWriteConfigWithSmallFileHandlingOff() { return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) + .withDeleteParallelism(2) .withAutoCommit(false).withAssumeDatePartitioning(true) .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024) .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build()) @@ -1466,6 +1467,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, Boolean rollbackUsingMarkers, HoodieIndex.IndexType indexType) { return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) + .withDeleteParallelism(2) .withAutoCommit(autoCommit).withAssumeDatePartitioning(true) .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024) .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build()) diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestDeleteHelper.java b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestDeleteHelper.java new file mode 100644 index 000000000..8fda8ae09 --- /dev/null +++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestDeleteHelper.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.action.commit; + +import org.apache.hudi.common.model.EmptyHoodieRecordPayload; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.index.bloom.HoodieBloomIndex; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; +import org.apache.spark.Partition; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import scala.Tuple2; + +import java.util.Collections; +import java.util.List; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class TestDeleteHelper { + + private enum CombineTestMode { + None, GlobalIndex, NoneGlobalIndex; + } + + private static final String BASE_PATH = "/tmp/"; + private static final boolean WITH_COMBINE = true; + private static final boolean WITHOUT_COMBINE = false; + private static final int DELETE_PARALLELISM = 200; + + @Mock private HoodieBloomIndex index; + @Mock private HoodieTable table; + @Mock private CommitActionExecutor executor; + @Mock private HoodieWriteMetadata metadata; + @Mock private JavaPairRDD keyPairs; + @Mock private JavaSparkContext jsc; + + private JavaRDD rddToDelete; + private HoodieWriteConfig config; + + @BeforeEach + public void setUp() { + when(table.getIndex()).thenReturn(index); + } + + @Test + public void deleteWithEmptyRDDShouldNotExecute() { + rddToDelete = mockEmptyHoodieKeyRdd(); + config = newWriteConfig(WITHOUT_COMBINE); + + DeleteHelper.execute("test-time", rddToDelete, jsc, config, table, executor); + + verify(rddToDelete, never()).repartition(DELETE_PARALLELISM); + verifyNoDeleteExecution(); + } + + @Test + public void deleteWithoutCombineShouldRepartitionForNonEmptyRdd() { + rddToDelete = newHoodieKeysRddMock(2, CombineTestMode.None); + config = newWriteConfig(WITHOUT_COMBINE); + + DeleteHelper.execute("test-time", rddToDelete, jsc, config, table, executor); + + verify(rddToDelete, times(1)).repartition(DELETE_PARALLELISM); + verifyDeleteExecution(); + } + + @Test + public void deleteWithCombineShouldRepartitionForNonEmptyRddAndNonGlobalIndex() { + rddToDelete = newHoodieKeysRddMock(2, CombineTestMode.NoneGlobalIndex); + config = newWriteConfig(WITH_COMBINE); + + DeleteHelper.execute("test-time", rddToDelete, jsc, config, table, executor); + + verify(rddToDelete, times(1)).distinct(DELETE_PARALLELISM); + verifyDeleteExecution(); + } + + @Test + public void deleteWithCombineShouldRepartitionForNonEmptyRddAndGlobalIndex() { + rddToDelete = newHoodieKeysRddMock(2, CombineTestMode.GlobalIndex); + config = newWriteConfig(WITH_COMBINE); + when(index.isGlobal()).thenReturn(true); + + DeleteHelper.execute("test-time", rddToDelete, jsc, config, table, executor); + + verify(keyPairs, 
times(1)).reduceByKey(any(), eq(DELETE_PARALLELISM)); + verifyDeleteExecution(); + } + + private void verifyDeleteExecution() { + verify(executor, times(1)).execute(any()); + verify(metadata, times(1)).setIndexLookupDuration(any()); + } + + private void verifyNoDeleteExecution() { + verify(executor, never()).execute(any()); + } + + private HoodieWriteConfig newWriteConfig(boolean combine) { + return HoodieWriteConfig.newBuilder() + .combineDeleteInput(combine) + .withPath(BASE_PATH) + .withDeleteParallelism(DELETE_PARALLELISM) + .build(); + } + + private JavaRDD newHoodieKeysRddMock(int howMany, CombineTestMode combineMode) { + JavaRDD keysToDelete = mock(JavaRDD.class); + + JavaRDD recordsRdd = mock(JavaRDD.class); + when(recordsRdd.filter(any())).thenReturn(recordsRdd); + when(recordsRdd.isEmpty()).thenReturn(howMany <= 0); + when(index.tagLocation(any(), any(), any())).thenReturn(recordsRdd); + + if (combineMode == CombineTestMode.GlobalIndex) { + when(keyPairs.reduceByKey(any(), anyInt())).thenReturn(keyPairs); + when(keyPairs.values()).thenReturn(keysToDelete); + when(keysToDelete.keyBy(any())).thenReturn(keyPairs); + } else if (combineMode == CombineTestMode.NoneGlobalIndex) { + when(keysToDelete.distinct(anyInt())).thenReturn(keysToDelete); + } else if (combineMode == CombineTestMode.None) { + List parts = mock(List.class); + when(parts.isEmpty()).thenReturn(howMany <= 0); + when(keysToDelete.repartition(anyInt())).thenReturn(keysToDelete); + when(keysToDelete.partitions()).thenReturn(parts); + } + + when(keysToDelete.map(any())).thenReturn(recordsRdd); + when(executor.execute(any())).thenReturn(metadata); + return keysToDelete; + } + + private JavaRDD mockEmptyHoodieKeyRdd() { + JavaRDD emptyRdd = mock(JavaRDD.class); + doReturn(true).when(emptyRdd).isEmpty(); + doReturn(Collections.emptyList()).when(emptyRdd).partitions(); + doReturn(emptyRdd).when(emptyRdd).map(any()); + + JavaPairRDD emptyPairRdd = mock(JavaPairRDD.class); + doReturn(Collections.emptyMap()).when(emptyPairRdd).countByKey(); + doReturn(emptyPairRdd).when(emptyRdd).mapToPair(any()); + + doReturn(emptyRdd).when(index).tagLocation(any(), any(), any()); + doReturn(emptyRdd).when(emptyRdd).filter(any()); + + doNothing().when(executor).saveWorkloadProfileMetadataToInflight(any(), anyString()); + doReturn(emptyRdd).when(jsc).emptyRDD(); + return emptyRdd; + } + +} diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java index bd933abd7..c176dcddb 100644 --- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java +++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java @@ -125,7 +125,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness { */ public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType indexType) { return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr) - .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2) + .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2) .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION) .withWriteStatusClass(MetadataMergeWriteStatus.class) .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java 
b/hudi-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java index 8d7c094ac..04a1712a2 100644 --- a/hudi-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java +++ b/hudi-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java @@ -165,6 +165,7 @@ public class SparkDatasetTestUtils { public static HoodieWriteConfig.Builder getConfigBuilder(String basePath) { return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) .withParallelism(2, 2) + .withDeleteParallelism(2) .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build()) .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024).build()) .forTable("test-trip-table") diff --git a/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java b/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java index 8e976c307..6f96fe966 100644 --- a/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java +++ b/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java @@ -88,7 +88,8 @@ public class HoodieWriteClientExample { // Create the write client to write some records in HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath) - .withSchema(HoodieExampleDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable(tableName) + .withSchema(HoodieExampleDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) + .withDeleteParallelism(2).forTable(tableName) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(20, 30).build()).build(); HoodieWriteClient client = new HoodieWriteClient<>(jsc, cfg); diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSHoodieDatasetInputReader.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSHoodieDatasetInputReader.java index 941ef43ca..15e6f7072 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSHoodieDatasetInputReader.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSHoodieDatasetInputReader.java @@ -113,6 +113,7 @@ public class TestDFSHoodieDatasetInputReader extends UtilitiesTestBase { // Prepare the AvroParquetIO return HoodieWriteConfig.newBuilder().withPath(dfsBasePath) .withParallelism(2, 2) + .withDeleteParallelism(2) .withSchema(HoodieTestDataGenerator .TRIP_EXAMPLE_SCHEMA); } diff --git a/hudi-spark/src/test/java/HoodieJavaApp.java b/hudi-spark/src/test/java/HoodieJavaApp.java index 9c42232d1..2ee6cae62 100644 --- a/hudi-spark/src/test/java/HoodieJavaApp.java +++ b/hudi-spark/src/test/java/HoodieJavaApp.java @@ -199,6 +199,7 @@ public class HoodieJavaApp { Dataset inputDF3 = spark.read().json(jssc.parallelize(deletes, 2)); writer = inputDF3.write().format("org.apache.hudi").option("hoodie.insert.shuffle.parallelism", "2") .option("hoodie.upsert.shuffle.parallelism", "2") + .option("hoodie.delete.shuffle.parallelism", "2") .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY(), tableType) // Hoodie Table Type .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), "delete") .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key") diff --git 
a/hudi-spark/src/test/java/HoodieJavaStreamingApp.java b/hudi-spark/src/test/java/HoodieJavaStreamingApp.java index 3b35ce919..606490f44 100644 --- a/hudi-spark/src/test/java/HoodieJavaStreamingApp.java +++ b/hudi-spark/src/test/java/HoodieJavaStreamingApp.java @@ -354,6 +354,7 @@ public class HoodieJavaStreamingApp { DataStreamWriter writer = streamingInput.writeStream().format("org.apache.hudi") .option("hoodie.insert.shuffle.parallelism", "2").option("hoodie.upsert.shuffle.parallelism", "2") + .option("hoodie.delete.shuffle.parallelism", "2") .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), operationType) .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY(), tableType) .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key") diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java index 14e16abba..976e8309f 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java @@ -247,6 +247,7 @@ public class UtilHelpers { HoodieWriteConfig.newBuilder().withPath(basePath) .withParallelism(parallelism, parallelism) .withBulkInsertParallelism(parallelism) + .withDeleteParallelism(parallelism) .withSchema(schemaStr).combineInput(true, true).withCompactionConfig(compactionConfig) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) .withProps(properties).build(); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotExporter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotExporter.java index 618af51de..39341b2ee 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotExporter.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotExporter.java @@ -111,6 +111,7 @@ public class TestHoodieSnapshotExporter extends FunctionalTestHarness { .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) .withParallelism(2, 2) .withBulkInsertParallelism(2) + .withDeleteParallelism(2) .forTable(TABLE_NAME) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.BLOOM).build()) .build(); diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/base.properties b/hudi-utilities/src/test/resources/delta-streamer-config/base.properties index 0a5bbb6b3..cb7f1d1d8 100644 --- a/hudi-utilities/src/test/resources/delta-streamer-config/base.properties +++ b/hudi-utilities/src/test/resources/delta-streamer-config/base.properties @@ -17,4 +17,5 @@ ### hoodie.upsert.shuffle.parallelism=2 hoodie.insert.shuffle.parallelism=2 +hoodie.delete.shuffle.parallelism=2 hoodie.bulkinsert.shuffle.parallelism=2
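Usage note (illustrative sketch, not part of the patch): the snippet below shows how a write-client caller could pick up the new delete settings. Only withDeleteParallelism(), combineDeleteInput(), and the hoodie.delete.shuffle.parallelism property come from this change; the HoodieWriteClient import location, its startCommit()/delete(JavaRDD<HoodieKey>, String) calls, and the placeholder inputs (jsc, tablePath, tableName, schemaStr, keysToDelete) are assumed from the example and test code touched above.

import java.util.List;

import org.apache.hudi.client.HoodieWriteClient; // package location assumed; may differ by Hudi version
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class DeleteParallelismExample {

  // Hypothetical helper: deletes the given keys with a delete shuffle parallelism of 2.
  public static void deleteKeys(JavaSparkContext jsc, String tablePath, String tableName,
                                String schemaStr, List<HoodieKey> keysToDelete) {
    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
        .withPath(tablePath)
        .withSchema(schemaStr)
        .withParallelism(2, 2)
        .withDeleteParallelism(2)  // maps to hoodie.delete.shuffle.parallelism (added by this change)
        .combineDeleteInput(true)  // de-dupe keys before deleting (added by this change)
        .forTable(tableName)
        .build();

    try (HoodieWriteClient<EmptyHoodieRecordPayload> client = new HoodieWriteClient<>(jsc, cfg)) {
      String instantTime = client.startCommit();
      JavaRDD<HoodieKey> keys = jsc.parallelize(keysToDelete, 1);
      // DeleteHelper either de-dupes (combine on) or repartitions (combine off) these keys
      // to the configured delete parallelism before tagging locations and executing the delete.
      client.delete(keys, instantTime);
    }
  }
}

Spark datasource writers reach the same code path by setting the option string directly, as the HoodieJavaApp and HoodieJavaStreamingApp hunks above do with .option("hoodie.delete.shuffle.parallelism", "2") alongside the "delete" operation.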