[HUDI-2970] Add test for archiving replace commit (#4345)
This commit is contained in:
@@ -0,0 +1,103 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.table.functional;
|
||||
|
||||
import org.apache.hudi.client.SparkRDDWriteClient;
|
||||
import org.apache.hudi.common.config.HoodieMetadataConfig;
|
||||
import org.apache.hudi.common.model.HoodieTableType;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.config.HoodieCompactionConfig;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
|
||||
|
||||
import org.junit.jupiter.api.Tag;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.ValueSource;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
|
||||
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
|
||||
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS;
|
||||
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
|
||||
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
|
||||
import static org.apache.hudi.testutils.HoodieClientTestUtils.countRecordsOptionallySince;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
@Tag("functional")
|
||||
public class TestHoodieSparkCopyOnWriteTableArchiveWithReplace extends SparkClientFunctionalTestHarness {
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {false, true})
|
||||
public void testDeletePartitionAndArchive(boolean metadataEnabled) throws IOException {
|
||||
HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
|
||||
HoodieWriteConfig writeConfig = getConfigBuilder(true)
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 3).retainCommits(1).build())
|
||||
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(metadataEnabled).build())
|
||||
.build();
|
||||
try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig);
|
||||
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(DEFAULT_PARTITION_PATHS)) {
|
||||
|
||||
// 1st write batch; 3 commits for 3 partitions
|
||||
String instantTime1 = HoodieActiveTimeline.createNewInstantTime(1000);
|
||||
client.startCommitWithTime(instantTime1);
|
||||
client.insert(jsc().parallelize(dataGen.generateInsertsForPartition(instantTime1, 10, DEFAULT_FIRST_PARTITION_PATH), 1), instantTime1);
|
||||
String instantTime2 = HoodieActiveTimeline.createNewInstantTime(2000);
|
||||
client.startCommitWithTime(instantTime2);
|
||||
client.insert(jsc().parallelize(dataGen.generateInsertsForPartition(instantTime2, 10, DEFAULT_SECOND_PARTITION_PATH), 1), instantTime2);
|
||||
String instantTime3 = HoodieActiveTimeline.createNewInstantTime(3000);
|
||||
client.startCommitWithTime(instantTime3);
|
||||
client.insert(jsc().parallelize(dataGen.generateInsertsForPartition(instantTime3, 1, DEFAULT_THIRD_PARTITION_PATH), 1), instantTime3);
|
||||
|
||||
final HoodieTimeline timeline1 = metaClient.getCommitsTimeline().filterCompletedInstants();
|
||||
assertEquals(21, countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline1, Option.empty()));
|
||||
|
||||
// delete the 1st and the 2nd partition; 1 replace commit
|
||||
final String instantTime4 = HoodieActiveTimeline.createNewInstantTime(4000);
|
||||
client.startCommitWithTime(instantTime4, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
|
||||
client.deletePartitions(Arrays.asList(DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH), instantTime4);
|
||||
|
||||
// 2nd write batch; 3 commits for the 3rd partition; the 3rd commit to trigger archiving the replace commit
|
||||
for (int i = 5; i < 8; i++) {
|
||||
String instantTime = HoodieActiveTimeline.createNewInstantTime(i * 1000);
|
||||
client.startCommitWithTime(instantTime);
|
||||
client.insert(jsc().parallelize(dataGen.generateInsertsForPartition(instantTime, 1, DEFAULT_THIRD_PARTITION_PATH), 1), instantTime);
|
||||
}
|
||||
|
||||
// verify archived timeline
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
final HoodieTimeline archivedTimeline = metaClient.getArchivedTimeline();
|
||||
assertTrue(archivedTimeline.containsInstant(instantTime1));
|
||||
assertTrue(archivedTimeline.containsInstant(instantTime2));
|
||||
assertTrue(archivedTimeline.containsInstant(instantTime3));
|
||||
assertTrue(archivedTimeline.containsInstant(instantTime4), "should contain the replace commit.");
|
||||
|
||||
// verify records
|
||||
final HoodieTimeline timeline2 = metaClient.getCommitTimeline().filterCompletedInstants();
|
||||
assertEquals(4, countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline2, Option.empty()),
|
||||
"should only have the 4 records from the 3rd partition.");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -113,7 +113,7 @@ class TestHoodieSparkMergeOnReadTableClustering extends SparkClientFunctionalTes
|
||||
client.startCommitWithTime(newCommitTime);
|
||||
|
||||
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 400);
|
||||
Stream<HoodieBaseFile> dataFiles = insertRecords(metaClient, records.subList(0, 200), client, cfg, newCommitTime);
|
||||
Stream<HoodieBaseFile> dataFiles = insertRecordsToMORTable(metaClient, records.subList(0, 200), client, cfg, newCommitTime);
|
||||
assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");
|
||||
|
||||
/*
|
||||
@@ -122,7 +122,7 @@ class TestHoodieSparkMergeOnReadTableClustering extends SparkClientFunctionalTes
|
||||
// we already set small file size to small number to force inserts to go into new file.
|
||||
newCommitTime = "002";
|
||||
client.startCommitWithTime(newCommitTime);
|
||||
dataFiles = insertRecords(metaClient, records.subList(200, 400), client, cfg, newCommitTime);
|
||||
dataFiles = insertRecordsToMORTable(metaClient, records.subList(200, 400), client, cfg, newCommitTime);
|
||||
assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");
|
||||
|
||||
if (doUpdates) {
|
||||
@@ -132,7 +132,7 @@ class TestHoodieSparkMergeOnReadTableClustering extends SparkClientFunctionalTes
|
||||
newCommitTime = "003";
|
||||
client.startCommitWithTime(newCommitTime);
|
||||
records = dataGen.generateUpdates(newCommitTime, 100);
|
||||
updateRecords(metaClient, records, client, cfg, newCommitTime);
|
||||
updateRecordsInMORTable(metaClient, records, client, cfg, newCommitTime);
|
||||
}
|
||||
|
||||
HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
|
||||
@@ -190,18 +190,18 @@ class TestHoodieSparkMergeOnReadTableClustering extends SparkClientFunctionalTes
|
||||
String newCommitTime = "001";
|
||||
client.startCommitWithTime(newCommitTime);
|
||||
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 400);
|
||||
Stream<HoodieBaseFile> dataFiles = insertRecords(metaClient, records.subList(0, 200), client, cfg, newCommitTime);
|
||||
Stream<HoodieBaseFile> dataFiles = insertRecordsToMORTable(metaClient, records.subList(0, 200), client, cfg, newCommitTime);
|
||||
assertTrue(!dataFiles.findAny().isPresent(), "should not have any base files");
|
||||
newCommitTime = "002";
|
||||
client.startCommitWithTime(newCommitTime);
|
||||
dataFiles = insertRecords(metaClient, records.subList(200, 400), client, cfg, newCommitTime);
|
||||
dataFiles = insertRecordsToMORTable(metaClient, records.subList(200, 400), client, cfg, newCommitTime);
|
||||
assertTrue(!dataFiles.findAny().isPresent(), "should not have any base files");
|
||||
// run updates
|
||||
if (doUpdates) {
|
||||
newCommitTime = "003";
|
||||
client.startCommitWithTime(newCommitTime);
|
||||
records = dataGen.generateUpdates(newCommitTime, 100);
|
||||
updateRecords(metaClient, records, client, cfg, newCommitTime);
|
||||
updateRecordsInMORTable(metaClient, records, client, cfg, newCommitTime);
|
||||
}
|
||||
|
||||
HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
|
||||
|
||||
@@ -96,7 +96,7 @@ public class TestHoodieSparkMergeOnReadTableIncrementalRead extends SparkClientF
|
||||
client.startCommitWithTime(commitTime1);
|
||||
|
||||
List<HoodieRecord> records001 = dataGen.generateInserts(commitTime1, 200);
|
||||
Stream<HoodieBaseFile> dataFiles = insertRecords(metaClient, records001, client, cfg, commitTime1);
|
||||
Stream<HoodieBaseFile> dataFiles = insertRecordsToMORTable(metaClient, records001, client, cfg, commitTime1);
|
||||
assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");
|
||||
|
||||
// verify only one base file shows up with commit time 001
|
||||
@@ -118,7 +118,7 @@ public class TestHoodieSparkMergeOnReadTableIncrementalRead extends SparkClientF
|
||||
String updateTime = "004";
|
||||
client.startCommitWithTime(updateTime);
|
||||
List<HoodieRecord> records004 = dataGen.generateUpdates(updateTime, 100);
|
||||
updateRecords(metaClient, records004, client, cfg, updateTime);
|
||||
updateRecordsInMORTable(metaClient, records004, client, cfg, updateTime);
|
||||
|
||||
// verify RO incremental reads - only one base file shows up because updates to into log files
|
||||
incrementalROFiles = getROIncrementalFiles(partitionPath, false);
|
||||
@@ -145,7 +145,7 @@ public class TestHoodieSparkMergeOnReadTableIncrementalRead extends SparkClientF
|
||||
String insertsTime = "006";
|
||||
List<HoodieRecord> records006 = dataGen.generateInserts(insertsTime, 200);
|
||||
client.startCommitWithTime(insertsTime);
|
||||
dataFiles = insertRecords(metaClient, records006, client, cfg, insertsTime);
|
||||
dataFiles = insertRecordsToMORTable(metaClient, records006, client, cfg, insertsTime);
|
||||
assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");
|
||||
|
||||
// verify new write shows up in snapshot mode even though there is pending compaction
|
||||
|
||||
@@ -95,7 +95,7 @@ public class TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
|
||||
client.startCommitWithTime(newCommitTime);
|
||||
|
||||
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
|
||||
Stream<HoodieBaseFile> dataFiles = insertRecords(metaClient, records, client, cfg, newCommitTime);
|
||||
Stream<HoodieBaseFile> dataFiles = insertRecordsToMORTable(metaClient, records, client, cfg, newCommitTime);
|
||||
assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");
|
||||
|
||||
/*
|
||||
@@ -104,7 +104,7 @@ public class TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
|
||||
newCommitTime = "004";
|
||||
client.startCommitWithTime(newCommitTime);
|
||||
records = dataGen.generateUpdates(newCommitTime, 100);
|
||||
updateRecords(metaClient, records, client, cfg, newCommitTime);
|
||||
updateRecordsInMORTable(metaClient, records, client, cfg, newCommitTime);
|
||||
|
||||
String compactionCommitTime = client.scheduleCompaction(Option.empty()).get().toString();
|
||||
client.compact(compactionCommitTime);
|
||||
|
||||
@@ -213,7 +213,7 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe
|
||||
index.updateLocation(HoodieJavaRDD.of(writeStatus), context, table));
|
||||
}
|
||||
|
||||
protected Stream<HoodieBaseFile> insertRecords(HoodieTableMetaClient metaClient, List<HoodieRecord> records,
|
||||
protected Stream<HoodieBaseFile> insertRecordsToMORTable(HoodieTableMetaClient metaClient, List<HoodieRecord> records,
|
||||
SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime) throws IOException {
|
||||
HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
|
||||
@@ -242,7 +242,7 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe
|
||||
return dataFilesToRead;
|
||||
}
|
||||
|
||||
protected void updateRecords(HoodieTableMetaClient metaClient, List<HoodieRecord> records, SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime) throws IOException {
|
||||
protected void updateRecordsInMORTable(HoodieTableMetaClient metaClient, List<HoodieRecord> records, SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime) throws IOException {
|
||||
HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
|
||||
Map<HoodieKey, HoodieRecord> recordsMap = new HashMap<>();
|
||||
|
||||
@@ -76,7 +76,7 @@ import java.util.stream.Stream;
|
||||
* <p>
|
||||
* Test data uses a toy Uber trips, data model.
|
||||
*/
|
||||
public class HoodieTestDataGenerator {
|
||||
public class HoodieTestDataGenerator implements AutoCloseable {
|
||||
|
||||
// based on examination of sample file, the schema produces the following per record size
|
||||
public static final int BYTES_PER_RECORD = (int) (1.2 * 1024);
|
||||
@@ -860,6 +860,7 @@ public class HoodieTestDataGenerator {
|
||||
public String partitionPath;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
existingKeysBySchema.clear();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user