1
0

[HUDI-2496] Insert duplicate records when precombined is deactivated for "insert" operation (#3740)

This commit is contained in:
Ilias Antoniou
2021-10-11 04:33:16 +03:00
committed by GitHub
parent ad63938890
commit ceace1c653
4 changed files with 111 additions and 31 deletions

View File

@@ -710,6 +710,53 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
2, false, config.populateMetaFields());
}
/**
* Test Insert API for HoodieConcatHandle when incoming entries contain duplicate keys.
*/
@Test
public void testInsertsWithHoodieConcatHandleOnDuplicateIncomingKeys() throws Exception {
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder();
testHoodieConcatHandleOnDupInserts(cfgBuilder.build(), false);
}
/**
* Test InsertPrepped API for HoodieConcatHandle when incoming entries contain duplicate keys.
*/
@Test
public void testInsertsPreppedWithHoodieConcatHandleOnDuplicateIncomingKeys() throws Exception {
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder();
testHoodieConcatHandleOnDupInserts(cfgBuilder.build(), true);
}
private void testHoodieConcatHandleOnDupInserts(HoodieWriteConfig config, boolean isPrepped) throws Exception {
HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
.withProps(config.getProps())
.withMergeAllowDuplicateOnInserts(true)
.build();
SparkRDDWriteClient<RawTripTestPayload> client = getHoodieWriteClient(hoodieWriteConfig);
// Write 1 (only inserts)
String initCommitTime = "000";
String newCommitTime = "001";
int firstInsertRecords = 50;
insertFirstBatch(hoodieWriteConfig, client, newCommitTime, initCommitTime, firstInsertRecords, SparkRDDWriteClient::insert,
isPrepped, true, firstInsertRecords, config.populateMetaFields());
// Write 2 (updates with duplicates)
String prevCommitTime = newCommitTime;
newCommitTime = "004";
int secondInsertRecords = 100; // needs to be larger than firstInsertRecords to guarantee duplicate keys
List<String> commitTimesBetweenPrevAndNew = Arrays.asList("002", "003");
final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
generateWrapRecordsFn(isPrepped, hoodieWriteConfig, dataGen::generateUpdates);
writeBatch(client, newCommitTime, prevCommitTime, Option.of(commitTimesBetweenPrevAndNew), initCommitTime,
secondInsertRecords, recordGenFunction, SparkRDDWriteClient::insert, true, secondInsertRecords,
firstInsertRecords + secondInsertRecords, 2, false, config.populateMetaFields());
}
/**
* Tests deletion of records.
*/