From 32a44bbe062c997b5a41266290fbe34d6323bfa6 Mon Sep 17 00:00:00 2001
From: Raymond Xu <2701446+xushiyan@users.noreply.github.com>
Date: Mon, 20 Dec 2021 21:01:59 -0800
Subject: [PATCH] [HUDI-2970] Add test for archiving replace commit (#4345)

---
 ...arkCopyOnWriteTableArchiveWithReplace.java | 103 ++++++++++++++++++
 ...HoodieSparkMergeOnReadTableClustering.java |  12 +-
 ...eSparkMergeOnReadTableIncrementalRead.java |   6 +-
 ...arkMergeOnReadTableInsertUpdateDelete.java |   4 +-
 .../SparkClientFunctionalTestHarness.java     |   4 +-
 .../testutils/HoodieTestDataGenerator.java    |   3 +-
 6 files changed, 118 insertions(+), 14 deletions(-)
 create mode 100644 hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
new file mode 100644
index 000000000..1c6602348
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.functional;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS;
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
+import static org.apache.hudi.testutils.HoodieClientTestUtils.countRecordsOptionallySince;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Tag("functional")
+public class TestHoodieSparkCopyOnWriteTableArchiveWithReplace extends SparkClientFunctionalTestHarness {
+
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  public void testDeletePartitionAndArchive(boolean metadataEnabled) throws IOException {
+    HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
+    HoodieWriteConfig writeConfig = getConfigBuilder(true)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 3).retainCommits(1).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(metadataEnabled).build())
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig);
+         HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(DEFAULT_PARTITION_PATHS)) {
+
+      // 1st write batch; 3 commits for 3 partitions
+      String instantTime1 = HoodieActiveTimeline.createNewInstantTime(1000);
+      client.startCommitWithTime(instantTime1);
+      client.insert(jsc().parallelize(dataGen.generateInsertsForPartition(instantTime1, 10, DEFAULT_FIRST_PARTITION_PATH), 1), instantTime1);
+      String instantTime2 = HoodieActiveTimeline.createNewInstantTime(2000);
+      client.startCommitWithTime(instantTime2);
+      client.insert(jsc().parallelize(dataGen.generateInsertsForPartition(instantTime2, 10, DEFAULT_SECOND_PARTITION_PATH), 1), instantTime2);
+      String instantTime3 = HoodieActiveTimeline.createNewInstantTime(3000);
+      client.startCommitWithTime(instantTime3);
+      client.insert(jsc().parallelize(dataGen.generateInsertsForPartition(instantTime3, 1, DEFAULT_THIRD_PARTITION_PATH), 1), instantTime3);
+
+      final HoodieTimeline timeline1 = metaClient.getCommitsTimeline().filterCompletedInstants();
+      assertEquals(21, countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline1, Option.empty()));
+
+      // delete the 1st and the 2nd partition; 1 replace commit
+      final String instantTime4 = HoodieActiveTimeline.createNewInstantTime(4000);
+      client.startCommitWithTime(instantTime4, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
+      client.deletePartitions(Arrays.asList(DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH), instantTime4);
+
+      // 2nd write batch; 3 commits for the 3rd partition; the 3rd commit to trigger archiving the replace commit
+      for (int i = 5; i < 8; i++) {
+        String instantTime = HoodieActiveTimeline.createNewInstantTime(i * 1000);
+        client.startCommitWithTime(instantTime);
+        client.insert(jsc().parallelize(dataGen.generateInsertsForPartition(instantTime, 1, DEFAULT_THIRD_PARTITION_PATH), 1), instantTime);
+      }
+
+      // verify archived timeline
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      final HoodieTimeline archivedTimeline = metaClient.getArchivedTimeline();
+      assertTrue(archivedTimeline.containsInstant(instantTime1));
+      assertTrue(archivedTimeline.containsInstant(instantTime2));
+      assertTrue(archivedTimeline.containsInstant(instantTime3));
+      assertTrue(archivedTimeline.containsInstant(instantTime4), "should contain the replace commit.");
+
+      // verify records
+      final HoodieTimeline timeline2 = metaClient.getCommitTimeline().filterCompletedInstants();
+      assertEquals(4, countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline2, Option.empty()),
+          "should only have the 4 records from the 3rd partition.");
+    }
+  }
+}
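Note on the archival arithmetic in this test: `archiveCommitsWith(2, 3)` keeps at most 3 completed commits on the active timeline and trims it back to 2 once that is exceeded, which is why the 2nd write batch needs exactly 3 commits to push the replace commit into the archive. A complementary check one could add is the inverse assertion on the active timeline; a minimal sketch reusing only calls already present in this test (`assertFalse` would need a static import next to the existing assertion imports):

```java
// sketch: after the 2nd write batch, archived instants must be gone from the active timeline
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTimeline activeTimeline = metaClient.getActiveTimeline().filterCompletedInstants();
assertFalse(activeTimeline.containsInstant(instantTime1));
assertFalse(activeTimeline.containsInstant(instantTime4)); // the archived replace commit
```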
newCommitTime = "002"; client.startCommitWithTime(newCommitTime); - dataFiles = insertRecords(metaClient, records.subList(200, 400), client, cfg, newCommitTime); + dataFiles = insertRecordsToMORTable(metaClient, records.subList(200, 400), client, cfg, newCommitTime); assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit"); if (doUpdates) { @@ -132,7 +132,7 @@ class TestHoodieSparkMergeOnReadTableClustering extends SparkClientFunctionalTes newCommitTime = "003"; client.startCommitWithTime(newCommitTime); records = dataGen.generateUpdates(newCommitTime, 100); - updateRecords(metaClient, records, client, cfg, newCommitTime); + updateRecordsInMORTable(metaClient, records, client, cfg, newCommitTime); } HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient); @@ -190,18 +190,18 @@ class TestHoodieSparkMergeOnReadTableClustering extends SparkClientFunctionalTes String newCommitTime = "001"; client.startCommitWithTime(newCommitTime); List records = dataGen.generateInserts(newCommitTime, 400); - Stream dataFiles = insertRecords(metaClient, records.subList(0, 200), client, cfg, newCommitTime); + Stream dataFiles = insertRecordsToMORTable(metaClient, records.subList(0, 200), client, cfg, newCommitTime); assertTrue(!dataFiles.findAny().isPresent(), "should not have any base files"); newCommitTime = "002"; client.startCommitWithTime(newCommitTime); - dataFiles = insertRecords(metaClient, records.subList(200, 400), client, cfg, newCommitTime); + dataFiles = insertRecordsToMORTable(metaClient, records.subList(200, 400), client, cfg, newCommitTime); assertTrue(!dataFiles.findAny().isPresent(), "should not have any base files"); // run updates if (doUpdates) { newCommitTime = "003"; client.startCommitWithTime(newCommitTime); records = dataGen.generateUpdates(newCommitTime, 100); - updateRecords(metaClient, records, client, cfg, newCommitTime); + updateRecordsInMORTable(metaClient, records, client, cfg, newCommitTime); } HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java index db55d3687..c80374b64 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java @@ -96,7 +96,7 @@ public class TestHoodieSparkMergeOnReadTableIncrementalRead extends SparkClientF client.startCommitWithTime(commitTime1); List records001 = dataGen.generateInserts(commitTime1, 200); - Stream dataFiles = insertRecords(metaClient, records001, client, cfg, commitTime1); + Stream dataFiles = insertRecordsToMORTable(metaClient, records001, client, cfg, commitTime1); assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit"); // verify only one base file shows up with commit time 001 @@ -118,7 +118,7 @@ public class TestHoodieSparkMergeOnReadTableIncrementalRead extends SparkClientF String updateTime = "004"; client.startCommitWithTime(updateTime); List records004 = dataGen.generateUpdates(updateTime, 100); - updateRecords(metaClient, records004, client, cfg, updateTime); + updateRecordsInMORTable(metaClient, records004, 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
index db55d3687..c80374b64 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
@@ -96,7 +96,7 @@ public class TestHoodieSparkMergeOnReadTableIncrementalRead extends SparkClientF
       client.startCommitWithTime(commitTime1);
       List<HoodieRecord> records001 = dataGen.generateInserts(commitTime1, 200);
 
-      Stream<HoodieBaseFile> dataFiles = insertRecords(metaClient, records001, client, cfg, commitTime1);
+      Stream<HoodieBaseFile> dataFiles = insertRecordsToMORTable(metaClient, records001, client, cfg, commitTime1);
       assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");
 
       // verify only one base file shows up with commit time 001
@@ -118,7 +118,7 @@ public class TestHoodieSparkMergeOnReadTableIncrementalRead extends SparkClientF
       String updateTime = "004";
       client.startCommitWithTime(updateTime);
       List<HoodieRecord> records004 = dataGen.generateUpdates(updateTime, 100);
-      updateRecords(metaClient, records004, client, cfg, updateTime);
+      updateRecordsInMORTable(metaClient, records004, client, cfg, updateTime);
 
       // verify RO incremental reads - only one base file shows up because updates go into log files
       incrementalROFiles = getROIncrementalFiles(partitionPath, false);
@@ -145,7 +145,7 @@ public class TestHoodieSparkMergeOnReadTableIncrementalRead extends SparkClientF
       String insertsTime = "006";
       List<HoodieRecord> records006 = dataGen.generateInserts(insertsTime, 200);
       client.startCommitWithTime(insertsTime);
-      dataFiles = insertRecords(metaClient, records006, client, cfg, insertsTime);
+      dataFiles = insertRecordsToMORTable(metaClient, records006, client, cfg, insertsTime);
       assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");
 
       // verify new write shows up in snapshot mode even though there is pending compaction
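For context on the helper shared with the new archive test: `countRecordsOptionallySince` takes a timeline plus an `Option` that, going by its name and the `Option.empty()` calls above, is an optional begin instant. A hedged sketch of the incremental form (the parameter's semantics are assumed from the name, not verified here):

```java
// sketch: count only records committed at or after a given instant
HoodieTimeline completed = metaClient.getCommitsTimeline().filterCompletedInstants();
long countSince = countRecordsOptionallySince(jsc(), basePath(), sqlContext(), completed, Option.of(instantTime2));
```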
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
index 254d75779..62ce00749 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
@@ -95,7 +95,7 @@ public class TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
       client.startCommitWithTime(newCommitTime);
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
 
-      Stream<HoodieBaseFile> dataFiles = insertRecords(metaClient, records, client, cfg, newCommitTime);
+      Stream<HoodieBaseFile> dataFiles = insertRecordsToMORTable(metaClient, records, client, cfg, newCommitTime);
       assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");
 
       /*
@@ -104,7 +104,7 @@ public class TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
       newCommitTime = "004";
       client.startCommitWithTime(newCommitTime);
       records = dataGen.generateUpdates(newCommitTime, 100);
-      updateRecords(metaClient, records, client, cfg, newCommitTime);
+      updateRecordsInMORTable(metaClient, records, client, cfg, newCommitTime);
 
       String compactionCommitTime = client.scheduleCompaction(Option.empty()).get().toString();
       client.compact(compactionCommitTime);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
index 88c4c13d9..1ecdd3364 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
@@ -213,7 +213,7 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe
         index.updateLocation(HoodieJavaRDD.of(writeStatus), context, table));
   }
 
-  protected Stream<HoodieBaseFile> insertRecords(HoodieTableMetaClient metaClient, List<HoodieRecord> records,
+  protected Stream<HoodieBaseFile> insertRecordsToMORTable(HoodieTableMetaClient metaClient, List<HoodieRecord> records,
                                                  SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime) throws IOException {
     HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(metaClient);
 
@@ -242,7 +242,7 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe
     return dataFilesToRead;
   }
 
-  protected void updateRecords(HoodieTableMetaClient metaClient, List<HoodieRecord> records, SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime) throws IOException {
+  protected void updateRecordsInMORTable(HoodieTableMetaClient metaClient, List<HoodieRecord> records, SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime) throws IOException {
    HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(metaClient);
 
    Map<HoodieKey, HoodieRecord> recordsMap = new HashMap<>();
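The renames make the MOR-specific intent of these harness helpers explicit at every call site. Condensed from the call sites in the tests above (same identifiers; the comments reflect the MOR write path these tests exercise), a typical pairing looks like:

```java
String newCommitTime = "001";
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
// inserts land as base files written through delta commits
Stream<HoodieBaseFile> dataFiles = insertRecordsToMORTable(metaClient, records, client, cfg, newCommitTime);
assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");

// updates are routed to log files rather than rewriting base files
newCommitTime = "002";
client.startCommitWithTime(newCommitTime);
updateRecordsInMORTable(metaClient, dataGen.generateUpdates(newCommitTime, 100), client, cfg, newCommitTime);
```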

* Test data uses a toy Uber trips, data model. */ -public class HoodieTestDataGenerator { +public class HoodieTestDataGenerator implements AutoCloseable { // based on examination of sample file, the schema produces the following per record size public static final int BYTES_PER_RECORD = (int) (1.2 * 1024); @@ -860,6 +860,7 @@ public class HoodieTestDataGenerator { public String partitionPath; } + @Override public void close() { existingKeysBySchema.clear(); }
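Implementing `AutoCloseable` is what lets the new archive test manage the data generator in the same try-with-resources block as the write client, as `TestHoodieSparkCopyOnWriteTableArchiveWithReplace` above does:

```java
try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig);
     HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(DEFAULT_PARTITION_PATHS)) {
  // ... run write batches against the table ...
} // dataGen.close() now runs automatically, clearing existingKeysBySchema
```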