diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieConcatHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java
similarity index 65%
rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieConcatHandle.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java
index 040060886..c33c0f08c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieConcatHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java
@@ -16,15 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io;
 
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieUpsertException;
-import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.KeyGenUtils;
 import org.apache.hudi.table.HoodieTable;
@@ -34,6 +35,7 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
 
@@ -44,21 +46,21 @@ import java.util.Map;
  * Simplified Logic:
  * For every existing record
  *     Write the record as is
- * For all incoming records, write to file as is.
+ * For all incoming records, write to file as is, without de-duplicating based on the record key.
  *
  * Illustration with simple data.
  * Incoming data:
- *     rec1_2, rec4_2, rec5_1, rec6_1
+ *     rec1_2, rec1_3, rec4_2, rec5_1, rec6_1
  * Existing data:
  *     rec1_1, rec2_1, rec3_1, rec4_1
  *
  * For every existing record, write to storage as is.
  *     => rec1_1, rec2_1, rec3_1 and rec4_1 is written to storage
  * Write all records from incoming set to storage
- *     => rec1_2, rec4_2, rec5_1 and rec6_1
+ *     => rec1_2, rec1_3, rec4_2, rec5_1 and rec6_1
  *
  * Final snapshot in storage
- * rec1_1, rec2_1, rec3_1, rec4_1, rec1_2, rec4_2, rec5_1, rec6_1
+ * rec1_1, rec2_1, rec3_1, rec4_1, rec1_2, rec1_3, rec4_2, rec5_1, rec6_1
 *
 * Users should ensure there are no duplicates when "insert" operation is used and if the respective config is enabled. So, above scenario should not
 * happen and every batch should have new records to be inserted. Above example is for illustration purposes only.
@@ -66,16 +68,22 @@ import java.util.Map;
 public class HoodieConcatHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieMergeHandle<T, I, K, O> {
 
   private static final Logger LOG = LogManager.getLogger(HoodieConcatHandle.class);
 
+  // a representation of incoming records that tolerates duplicate keys
+  private final Iterator<HoodieRecord<T>> recordItr;
-  public HoodieConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, Iterator recordItr,
-      String partitionPath, String fileId, TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
-    super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
+  public HoodieConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable,
+      Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
+      TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
+    super(config, instantTime, hoodieTable, Collections.emptyIterator(), partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
+    this.recordItr = recordItr;
   }
 
-  public HoodieConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, Map keyToNewRecords, String partitionPath, String fileId,
-      HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier) {
-    super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, fileId, dataFileToBeMerged, taskContextSupplier,
+  public HoodieConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable,
+      Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath, String fileId,
+      HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier) {
+    super(config, instantTime, hoodieTable, Collections.emptyMap(), partitionPath, fileId, dataFileToBeMerged, taskContextSupplier,
         Option.empty());
+    this.recordItr = keyToNewRecords.values().iterator();
   }
 
   /**
@@ -94,4 +102,17 @@ public class HoodieConcatHandle<T extends HoodieRecordPayload, I, K, O> extends
     }
     recordsWritten++;
   }
+
+  @Override
+  protected void writeIncomingRecords() throws IOException {
+    while (recordItr.hasNext()) {
+      HoodieRecord<T> record = recordItr.next();
+      if (needsUpdateLocation()) {
+        record.unseal();
+        record.setNewLocation(new HoodieRecordLocation(instantTime, fileId));
+        record.seal();
+      }
+      writeInsertRecord(record);
+    }
+  }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index b01d62f1a..4ca1dd770 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -257,6 +257,18 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
     return writeRecord(hoodieRecord, indexedRecord);
   }
 
+  protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
+    Schema schema = useWriterSchema ? tableSchemaWithMetaFields : tableSchema;
+    Option<IndexedRecord> insertRecord = hoodieRecord.getData().getInsertValue(schema, config.getProps());
+    // just skip the ignored record
+    if (insertRecord.isPresent() && insertRecord.get().equals(IGNORE_RECORD)) {
+      return;
+    }
+    if (writeRecord(hoodieRecord, insertRecord)) {
+      insertRecordsWritten++;
+    }
+  }
+
   protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord) {
     Option<Map<String, String>> recordMetadata = hoodieRecord.getData().getMetadata();
     if (!partitionPath.equals(hoodieRecord.getPartitionPath())) {
@@ -340,28 +352,28 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
     }
   }
 
+  protected void writeIncomingRecords() throws IOException {
+    // write out any pending records (this can happen when inserts are turned into updates)
+    Iterator<HoodieRecord<T>> newRecordsItr = (keyToNewRecords instanceof ExternalSpillableMap)
+        ? ((ExternalSpillableMap)keyToNewRecords).iterator() : keyToNewRecords.values().iterator();
+    while (newRecordsItr.hasNext()) {
+      HoodieRecord<T> hoodieRecord = newRecordsItr.next();
+      if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
+        writeInsertRecord(hoodieRecord);
+      }
+    }
+  }
+
   @Override
   public List<WriteStatus> close() {
     try {
-      // write out any pending records (this can happen when inserts are turned into updates)
-      Iterator<HoodieRecord<T>> newRecordsItr = (keyToNewRecords instanceof ExternalSpillableMap)
-          ? ((ExternalSpillableMap)keyToNewRecords).iterator() : keyToNewRecords.values().iterator();
-      while (newRecordsItr.hasNext()) {
-        HoodieRecord<T> hoodieRecord = newRecordsItr.next();
-        if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
-          Schema schema = useWriterSchema ? tableSchemaWithMetaFields : tableSchema;
-          Option<IndexedRecord> insertRecord =
-              hoodieRecord.getData().getInsertValue(schema, config.getProps());
-          // just skip the ignore record
-          if (insertRecord.isPresent() && insertRecord.get().equals(IGNORE_RECORD)) {
-            continue;
-          }
-          writeRecord(hoodieRecord, insertRecord);
-          insertRecordsWritten++;
-        }
-      }
+      writeIncomingRecords();
 
-      ((ExternalSpillableMap) keyToNewRecords).close();
+      if (keyToNewRecords instanceof ExternalSpillableMap) {
+        ((ExternalSpillableMap) keyToNewRecords).close();
+      } else {
+        keyToNewRecords.clear();
+      }
 
       writtenRecordKeys.clear();
       if (fileWriter != null) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index 1935a3e5c..18c659373 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -45,7 +45,7 @@ import org.apache.hudi.execution.SparkLazyInsertIterable;
 import org.apache.hudi.io.CreateHandleFactory;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.HoodieSortedMergeHandle;
-import org.apache.hudi.io.storage.HoodieConcatHandle;
+import org.apache.hudi.io.HoodieConcatHandle;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 import org.apache.hudi.table.HoodieSparkTable;
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index f712201ee..bff9724b5 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -710,6 +710,53 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
         2, false, config.populateMetaFields());
   }
 
+  /**
+   * Test Insert API for HoodieConcatHandle when incoming entries contain duplicate keys.
+   */
+  @Test
+  public void testInsertsWithHoodieConcatHandleOnDuplicateIncomingKeys() throws Exception {
+    HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder();
+    testHoodieConcatHandleOnDupInserts(cfgBuilder.build(), false);
+  }
+
+  /**
+   * Test InsertPrepped API for HoodieConcatHandle when incoming entries contain duplicate keys.
+   */
+  @Test
+  public void testInsertsPreppedWithHoodieConcatHandleOnDuplicateIncomingKeys() throws Exception {
+    HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder();
+    testHoodieConcatHandleOnDupInserts(cfgBuilder.build(), true);
+  }
+
+  private void testHoodieConcatHandleOnDupInserts(HoodieWriteConfig config, boolean isPrepped) throws Exception {
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+        .withProps(config.getProps())
+        .withMergeAllowDuplicateOnInserts(true)
+        .build();
+
+    SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);
+
+    // Write 1 (only inserts)
+    String initCommitTime = "000";
+    String newCommitTime = "001";
+    int firstInsertRecords = 50;
+    insertFirstBatch(hoodieWriteConfig, client, newCommitTime, initCommitTime, firstInsertRecords, SparkRDDWriteClient::insert,
+        isPrepped, true, firstInsertRecords, config.populateMetaFields());
+
+    // Write 2 (updates with duplicates)
+    String prevCommitTime = newCommitTime;
+    newCommitTime = "004";
+    int secondInsertRecords = 100; // needs to be larger than firstInsertRecords to guarantee duplicate keys
+    List<String> commitTimesBetweenPrevAndNew = Arrays.asList("002", "003");
+
+    final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
+        generateWrapRecordsFn(isPrepped, hoodieWriteConfig, dataGen::generateUpdates);
+
+    writeBatch(client, newCommitTime, prevCommitTime, Option.of(commitTimesBetweenPrevAndNew), initCommitTime,
+        secondInsertRecords, recordGenFunction, SparkRDDWriteClient::insert, true, secondInsertRecords,
+        firstInsertRecords + secondInsertRecords, 2, false, config.populateMetaFields());
+  }
+
   /**
    * Tests deletion of records.
    */
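
Not part of the patch: a minimal sketch of how a Spark writer job could opt into this concat path from application code, using the same hoodie.merge.allow.duplicate.on.inserts switch that the new tests toggle via withMergeAllowDuplicateOnInserts(true). The class name ConcatInsertExample, the helper method name, and the base path / table name / schema arguments are illustrative placeholders; only the write-config builder methods and client calls shown in the diff and the standard Spark client API are assumed.

import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.config.HoodieWriteConfig;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ConcatInsertExample {

  // Inserts the given records while tolerating duplicate keys: with this flag enabled,
  // inserts routed into existing file groups go through HoodieConcatHandle and are appended
  // without key-based de-duplication against the records already in the base file.
  public static JavaRDD<WriteStatus> insertAllowingDuplicates(JavaSparkContext jsc,
      JavaRDD<HoodieRecord> records, String basePath, String tableName, String avroSchemaJson) {
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withSchema(avroSchemaJson)
        .forTable(tableName)
        .withMergeAllowDuplicateOnInserts(true) // same switch the new tests exercise
        .build();

    SparkRDDWriteClient client = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), writeConfig);
    try {
      String commitTime = client.startCommit();
      // with the default auto-commit setting, the batch is committed as part of insert()
      return client.insert(records, commitTime);
    } finally {
      client.close();
    }
  }
}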