diff --git a/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java b/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java
index 253dc775d..a6007ffbe 100644
--- a/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java
@@ -52,7 +52,8 @@ public class MergeOnReadLazyInsertIterable<T extends HoodieRecordPayload>
     List<WriteStatus> statuses = new ArrayList<>();
     // lazily initialize the handle, for the first time
     if (handle == null) {
-      handle = new HoodieAppendHandle(hoodieConfig, commitTime, hoodieTable, getNextFileId(idPrefix));
+      handle = new HoodieAppendHandle(hoodieConfig, commitTime, hoodieTable,
+          insertPayload.getPartitionPath(), getNextFileId(idPrefix));
     }
     if (handle.canWrite(insertPayload)) {
       // write the payload, if the handle has capacity
@@ -62,7 +63,8 @@ public class MergeOnReadLazyInsertIterable<T extends HoodieRecordPayload>
       handle.close();
       statuses.add(handle.getWriteStatus());
       // Need to handle the rejected payload & open new handle
-      handle = new HoodieAppendHandle(hoodieConfig, commitTime, hoodieTable, getNextFileId(idPrefix));
+      handle = new HoodieAppendHandle(hoodieConfig, commitTime, hoodieTable,
+          insertPayload.getPartitionPath(), getNextFileId(idPrefix));
       handle.write(insertPayload, payload.insertValue, payload.exception); // we should be able to write 1 payload.
     }
   }
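The hunks above thread the incoming record's partition path into every append handle the iterable opens, both on first use and on rollover when a handle fills up. A minimal, self-contained sketch of that rollover pattern, with simplified stand-in types (`SketchHandle` and `SketchRecord` are illustrative, not Hudi's API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-in for HoodieAppendHandle: opened for one partition, bounded capacity.
final class SketchHandle {
  final String partitionPath;
  private int written = 0;
  SketchHandle(String partitionPath) { this.partitionPath = partitionPath; }
  boolean canWrite() { return written < 2; }  // stand-in for a size-based capacity check
  void write(String key) { written++; }
  String close() { return partitionPath + ":" + written; }
}

// Stand-in for HoodieRecord.
final class SketchRecord {
  final String key, partitionPath;
  SketchRecord(String key, String partitionPath) { this.key = key; this.partitionPath = partitionPath; }
}

public class LazyHandleRollover {
  public static void main(String[] args) {
    List<String> statuses = new ArrayList<>();
    SketchHandle handle = null;
    List<SketchRecord> payloads = Arrays.asList(
        new SketchRecord("a", "2020/01/01"),
        new SketchRecord("b", "2020/01/01"),
        new SketchRecord("c", "2020/01/01"));
    for (SketchRecord payload : payloads) {
      if (handle == null) {
        // As in the patch: the handle is opened with the incoming record's partition path.
        handle = new SketchHandle(payload.partitionPath);
      }
      if (handle.canWrite()) {
        handle.write(payload.key);
      } else {
        statuses.add(handle.close());
        handle = new SketchHandle(payload.partitionPath);  // reopen for the rejected payload
        handle.write(payload.key);
      }
    }
    statuses.add(handle.close());
    System.out.println(statuses);  // [2020/01/01:2, 2020/01/01:1]
  }
}
```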
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index ea55f2e61..39601b4cb 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -74,7 +74,6 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWriteHandle<T>
   // Buffer for holding records (to be deleted) in memory before they are flushed to disk
   private List<HoodieKey> keysToDelete = new ArrayList<>();
-  private String partitionPath;
   private Iterator<HoodieRecord<T>> recordItr;
   // Total number of records written during an append
   private long recordsWritten = 0;
@@ -101,21 +100,21 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWriteHandle<T>
   // Total number of new records inserted into the delta file
   private long insertRecordsWritten = 0;
 
-  public HoodieAppendHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable, String fileId,
-      Iterator<HoodieRecord<T>> recordItr) {
-    super(config, commitTime, fileId, hoodieTable);
+  public HoodieAppendHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
+      String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
+    super(config, commitTime, partitionPath, fileId, hoodieTable);
     writeStatus.setStat(new HoodieDeltaWriteStat());
     this.fileId = fileId;
     this.recordItr = recordItr;
   }
 
-  public HoodieAppendHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable, String fileId) {
-    this(config, commitTime, hoodieTable, fileId, null);
+  public HoodieAppendHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
+      String partitionPath, String fileId) {
+    this(config, commitTime, hoodieTable, partitionPath, fileId, null);
   }
 
   private void init(HoodieRecord record) {
     if (doInit) {
-      this.partitionPath = record.getPartitionPath();
       // extract some information from the first record
       SliceView rtView = hoodieTable.getSliceView();
       Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath, fileId);
@@ -295,6 +294,13 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWriteHandle<T>
   }
 
   private void writeToBuffer(HoodieRecord<T> record) {
+    if (!partitionPath.equals(record.getPartitionPath())) {
+      HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
+          + record.getPartitionPath() + " but trying to insert into partition: " + partitionPath);
+      writeStatus.markFailure(record, failureEx, record.getData().getMetadata());
+      return;
+    }
+
     // update the new location of the record, so we know where to find it next
     record.unseal();
     record.setNewLocation(new HoodieRecordLocation(instantTime, fileId));
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index c76134ef3..87c83cf65 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -57,7 +57,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWriteHandle<T>
 
   public HoodieCreateHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
       String partitionPath, String fileId) {
-    super(config, commitTime, fileId, hoodieTable);
+    super(config, commitTime, partitionPath, fileId, hoodieTable);
     writeStatus.setFileId(fileId);
     writeStatus.setPartitionPath(partitionPath);
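The behavioral core of the patch is the guard added to writeToBuffer(): a record whose partition differs from the handle's partition is recorded as a per-record failure on the write status instead of being written (or throwing). A standalone sketch of that guard, assuming simplified stand-ins for HoodieRecord and WriteStatus:

```java
// Minimal sketch of the partition guard; Record and the error counter are
// simplified stand-ins for Hudi's HoodieRecord / WriteStatus.markFailure().
final class PartitionGuardSketch {
  static final class Record {
    final String key, partitionPath;
    Record(String key, String partitionPath) { this.key = key; this.partitionPath = partitionPath; }
  }

  private final String handlePartitionPath;
  private int errorCount = 0;

  PartitionGuardSketch(String handlePartitionPath) { this.handlePartitionPath = handlePartitionPath; }

  /** Returns true if the record was accepted, false if it was marked as a failure. */
  boolean write(Record record) {
    if (!handlePartitionPath.equals(record.partitionPath)) {
      // Mirrors writeStatus.markFailure(...): count the error, do not throw.
      errorCount++;
      return false;
    }
    // ... append the record to the in-memory buffer here ...
    return true;
  }

  int getTotalErrorRecords() { return errorCount; }

  public static void main(String[] args) {
    PartitionGuardSketch handle = new PartitionGuardSketch("2020/01/01");
    System.out.println(handle.write(new Record("k1", "2020/01/01"))); // true
    System.out.println(handle.write(new Record("k2", "2020/01/02"))); // false, counted as an error
    System.out.println(handle.getTotalErrorRecords());                // 1
  }
}
```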
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 199c0a092..f26163262 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -70,9 +70,9 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWriteHandle<T>
   private boolean useWriterSchema;
 
   public HoodieMergeHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
-      Iterator<HoodieRecord<T>> recordItr, String fileId) {
-    super(config, commitTime, fileId, hoodieTable);
-    String partitionPath = init(fileId, recordItr);
+      Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId) {
+    super(config, commitTime, partitionPath, fileId, hoodieTable);
+    init(fileId, recordItr);
     init(fileId, partitionPath, hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).get());
   }
 
@@ -80,12 +80,12 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWriteHandle<T>
    * Called by compactor code path.
    */
   public HoodieMergeHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
-      Map<String, HoodieRecord<T>> keyToNewRecords, String fileId, HoodieBaseFile dataFileToBeMerged) {
-    super(config, commitTime, fileId, hoodieTable);
+      Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath, String fileId,
+      HoodieBaseFile dataFileToBeMerged) {
+    super(config, commitTime, partitionPath, fileId, hoodieTable);
     this.keyToNewRecords = keyToNewRecords;
     this.useWriterSchema = true;
-    init(fileId, keyToNewRecords.get(keyToNewRecords.keySet().stream().findFirst().get()).getPartitionPath(),
-        dataFileToBeMerged);
+    init(fileId, this.partitionPath, dataFileToBeMerged);
   }
 
   public static Schema createHoodieWriteSchema(Schema originalSchema) {
@@ -143,7 +143,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWriteHandle<T>
   /**
    * Load the new incoming records in a map and return partitionPath.
    */
-  private String init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
+  private void init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
    try {
      // Load the new records in a map
      long memoryForMerge = config.getMaxMemoryPerPartitionMerge();
@@ -153,10 +153,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWriteHandle<T>
     } catch (IOException io) {
       throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io);
     }
-    String partitionPath = null;
     while (newRecordsItr.hasNext()) {
       HoodieRecord<T> record = newRecordsItr.next();
-      partitionPath = record.getPartitionPath();
       // update the new location of the record, so we know where to find it next
       record.unseal();
       record.setNewLocation(new HoodieRecordLocation(instantTime, fileId));
@@ -170,7 +168,6 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWriteHandle<T>
         + ((ExternalSpillableMap) keyToNewRecords).getCurrentInMemoryMapSize() + "Number of entries in DiskBasedMap => "
         + ((ExternalSpillableMap) keyToNewRecords).getDiskBasedMapNumEntries() + "Size of file spilled to disk => "
         + ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes());
-    return partitionPath;
   }
 
   private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord) {
@@ -182,6 +179,12 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWriteHandle<T>
 
   private boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord) {
     Option<Map<String, String>> recordMetadata = hoodieRecord.getData().getMetadata();
+    if (!partitionPath.equals(hoodieRecord.getPartitionPath())) {
+      HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
+          + hoodieRecord.getPartitionPath() + " but trying to insert into partition: " + partitionPath);
+      writeStatus.markFailure(hoodieRecord, failureEx, recordMetadata);
+      return false;
+    }
     try {
       if (indexedRecord.isPresent()) {
         // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 115f7837c..336e508e5 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -52,11 +52,14 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends HoodieIOHandle
   protected final Schema writerSchema;
   protected HoodieTimer timer;
   protected final WriteStatus writeStatus;
+  protected final String partitionPath;
   protected final String fileId;
   protected final String writeToken;
 
-  public HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String fileId, HoodieTable<T> hoodieTable) {
+  public HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String partitionPath,
+      String fileId, HoodieTable<T> hoodieTable) {
     super(config, instantTime, hoodieTable);
+    this.partitionPath = partitionPath;
     this.fileId = fileId;
     this.writeToken = makeSparkWriteToken();
     this.originalSchema = new Schema.Parser().parse(config.getSchema());
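The net effect across the handle classes: partitionPath moves up into HoodieWriteHandle as a constructor argument stored in a final field, so subclasses no longer discover it lazily from the first record they see in init(). A sketch of that threading, with simplified stand-in types rather than the real Hudi hierarchy:

```java
// Sketch of the constructor threading: the base handle now receives and
// stores the partition path up front. Types here are illustrative stand-ins.
abstract class WriteHandleSketch {
  protected final String instantTime;
  protected final String partitionPath;  // fixed at construction time
  protected final String fileId;

  protected WriteHandleSketch(String instantTime, String partitionPath, String fileId) {
    this.instantTime = instantTime;
    this.partitionPath = partitionPath;
    this.fileId = fileId;
  }
}

final class AppendHandleSketch extends WriteHandleSketch {
  AppendHandleSketch(String instantTime, String partitionPath, String fileId) {
    super(instantTime, partitionPath, fileId);
    // No "this.partitionPath = firstRecord.getPartitionPath()" in init() anymore:
    // the field is final and populated before any record is written.
  }
}
```

Fixing the partition at construction time turns a silent assumption (the handle serving whatever partition its first record happened to belong to) into the explicit per-record checks added in writeToBuffer() and writeRecord().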
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
index 82b08b706..2791bc765 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
@@ -170,7 +170,8 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends HoodieTable<T>
     throw new HoodieNotSupportedException("Compaction is not supported from a CopyOnWrite table");
   }
 
-  public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileId, Iterator<HoodieRecord<T>> recordItr)
+  public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String partitionPath, String fileId,
+      Iterator<HoodieRecord<T>> recordItr)
       throws IOException {
     // This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
     if (!recordItr.hasNext()) {
@@ -178,14 +179,14 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends HoodieTable<T>
       return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
     }
     // these are updates
-    HoodieMergeHandle upsertHandle = getUpdateHandle(commitTime, fileId, recordItr);
+    HoodieMergeHandle upsertHandle = getUpdateHandle(commitTime, partitionPath, fileId, recordItr);
     return handleUpdateInternal(upsertHandle, commitTime, fileId);
   }
 
-  public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileId,
+  public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String partitionPath, String fileId,
       Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException {
     // these are updates
-    HoodieMergeHandle upsertHandle = getUpdateHandle(commitTime, fileId, keyToNewRecords, oldDataFile);
+    HoodieMergeHandle upsertHandle = getUpdateHandle(commitTime, partitionPath, fileId, keyToNewRecords, oldDataFile);
     return handleUpdateInternal(upsertHandle, commitTime, fileId);
   }
 
@@ -220,13 +221,14 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends HoodieTable<T>
     return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus())).iterator();
   }
 
-  protected HoodieMergeHandle getUpdateHandle(String commitTime, String fileId, Iterator<HoodieRecord<T>> recordItr) {
-    return new HoodieMergeHandle<>(config, commitTime, this, recordItr, fileId);
+  protected HoodieMergeHandle getUpdateHandle(String commitTime, String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
+    return new HoodieMergeHandle<>(config, commitTime, this, recordItr, partitionPath, fileId);
   }
 
-  protected HoodieMergeHandle getUpdateHandle(String commitTime, String fileId,
+  protected HoodieMergeHandle getUpdateHandle(String commitTime, String partitionPath, String fileId,
       Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
-    return new HoodieMergeHandle<>(config, commitTime, this, keyToNewRecords, fileId, dataFileToBeMerged);
+    return new HoodieMergeHandle<>(config, commitTime, this, keyToNewRecords,
+        partitionPath, fileId, dataFileToBeMerged);
   }
 
   public Iterator<List<WriteStatus>> handleInsert(String commitTime, String idPfx, Iterator<HoodieRecord<T>> recordItr)
@@ -258,7 +260,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends HoodieTable<T>
       if (btype.equals(BucketType.INSERT)) {
         return handleInsert(commitTime, binfo.fileIdPrefix, recordItr);
       } else if (btype.equals(BucketType.UPDATE)) {
-        return handleUpdate(commitTime, binfo.fileIdPrefix, recordItr);
+        return handleUpdate(commitTime, binfo.partitionPath, binfo.fileIdPrefix, recordItr);
       } else {
         throw new HoodieUpsertException("Unknown bucketType " + btype + " for partition :" + partition);
       }
@@ -523,12 +525,14 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends HoodieTable<T>
     BucketType bucketType;
     String fileIdPrefix;
+    String partitionPath;
 
     @Override
     public String toString() {
       final StringBuilder sb = new StringBuilder("BucketInfo {");
       sb.append("bucketType=").append(bucketType).append(", ");
-      sb.append("fileIdPrefix=").append(fileIdPrefix);
+      sb.append("fileIdPrefix=").append(fileIdPrefix).append(", ");
+      sb.append("partitionPath=").append(partitionPath);
       sb.append('}');
       return sb.toString();
     }
@@ -585,18 +589,22 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends HoodieTable<T>
     private void assignUpdates(WorkloadProfile profile) {
       // each update location gets a partition
-      WorkloadStat gStat = profile.getGlobalStat();
-      for (Map.Entry<String, Pair<String, Long>> updateLocEntry : gStat.getUpdateLocationToCount().entrySet()) {
-        addUpdateBucket(updateLocEntry.getKey());
+      Set<Map.Entry<String, WorkloadStat>> partitionStatEntries = profile.getPartitionPathStatMap().entrySet();
+      for (Map.Entry<String, WorkloadStat> partitionStat : partitionStatEntries) {
+        for (Map.Entry<String, Pair<String, Long>> updateLocEntry :
+            partitionStat.getValue().getUpdateLocationToCount().entrySet()) {
+          addUpdateBucket(partitionStat.getKey(), updateLocEntry.getKey());
+        }
       }
     }
 
-    private int addUpdateBucket(String fileIdHint) {
+    private int addUpdateBucket(String partitionPath, String fileIdHint) {
       int bucket = totalBuckets;
       updateLocationToBucket.put(fileIdHint, bucket);
       BucketInfo bucketInfo = new BucketInfo();
       bucketInfo.bucketType = BucketType.UPDATE;
       bucketInfo.fileIdPrefix = fileIdHint;
+      bucketInfo.partitionPath = partitionPath;
       bucketInfoMap.put(totalBuckets, bucketInfo);
       totalBuckets++;
       return bucket;
@@ -631,7 +639,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends HoodieTable<T>
           bucket = updateLocationToBucket.get(smallFile.location.getFileId());
           LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket);
         } else {
-          bucket = addUpdateBucket(smallFile.location.getFileId());
+          bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId());
           LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket);
         }
         bucketNumbers.add(bucket);
@@ -655,6 +663,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends HoodieTable<T>
           recordsPerBucket.add(totalUnassignedInserts / insertBuckets);
           BucketInfo bucketInfo = new BucketInfo();
           bucketInfo.bucketType = BucketType.INSERT;
+          bucketInfo.partitionPath = partitionPath;
           bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx();
           bucketInfoMap.put(totalBuckets, bucketInfo);
           totalBuckets++;
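assignUpdates() now walks the per-partition workload stats instead of the single global stat, so every bucket (update or insert) records the partition it serves and handleUpsertPartition() can pass binfo.partitionPath through. A self-contained sketch of that bookkeeping, with plain maps standing in for WorkloadProfile and WorkloadStat:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Sketch of the partitioner change: buckets are created while walking
// per-partition stats, so each bucket remembers its partition path.
// The nested-map shape is a simplified stand-in for WorkloadStat.
public class BucketAssignmentSketch {
  static final class BucketInfo {
    String fileIdPrefix;
    String partitionPath;
    @Override public String toString() {
      return "BucketInfo {fileIdPrefix=" + fileIdPrefix + ", partitionPath=" + partitionPath + "}";
    }
  }

  public static void main(String[] args) {
    // partitionPath -> (fileId -> pending update count), mimicking getPartitionPathStatMap()
    Map<String, Map<String, Long>> partitionStats = new HashMap<>();
    partitionStats.put("2020/01/01", Collections.singletonMap("file-1", 10L));
    partitionStats.put("2020/01/02", Collections.singletonMap("file-2", 5L));

    Map<Integer, BucketInfo> buckets = new HashMap<>();
    int totalBuckets = 0;
    for (Map.Entry<String, Map<String, Long>> partitionStat : partitionStats.entrySet()) {
      for (String fileId : partitionStat.getValue().keySet()) {
        BucketInfo info = new BucketInfo();
        info.fileIdPrefix = fileId;
        info.partitionPath = partitionStat.getKey();  // the new piece of bookkeeping
        buckets.put(totalBuckets++, info);
      }
    }
    buckets.values().forEach(System.out::println);
  }
}
```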
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
index 50d41b349..df7a0c137 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
@@ -98,15 +98,17 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends HoodieCopyOnWriteTable<T>
   }
 
   @Override
-  public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileId, Iterator<HoodieRecord<T>> recordItr)
+  public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String partitionPath,
+      String fileId, Iterator<HoodieRecord<T>> recordItr)
       throws IOException {
     LOG.info("Merging updates for commit " + commitTime + " for file " + fileId);
 
     if (!index.canIndexLogFiles() && mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) {
       LOG.info("Small file corrections for updates for commit " + commitTime + " for file " + fileId);
-      return super.handleUpdate(commitTime, fileId, recordItr);
+      return super.handleUpdate(commitTime, partitionPath, fileId, recordItr);
     } else {
-      HoodieAppendHandle<T> appendHandle = new HoodieAppendHandle<>(config, commitTime, this, fileId, recordItr);
+      HoodieAppendHandle<T> appendHandle = new HoodieAppendHandle<>(config, commitTime, this,
+          partitionPath, fileId, recordItr);
       appendHandle.doAppend();
       appendHandle.close();
       return Collections.singletonList(Collections.singletonList(appendHandle.getWriteStatus())).iterator();
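HoodieMergeOnReadTable.handleUpdate() keeps its two code paths and now forwards the partition path down both. A standalone sketch of the dispatch (the names are stand-ins; the real check uses index.canIndexLogFiles() and the upsert partitioner's small-file list):

```java
import java.util.Collections;
import java.util.Set;

// Sketch: small-file updates fall back to the copy-on-write merge path,
// everything else appends to a log file; both receive the partition path.
public class MorUpdateDispatchSketch {
  private final Set<String> smallFileIds;
  private final boolean indexCanIndexLogFiles;

  MorUpdateDispatchSketch(Set<String> smallFileIds, boolean indexCanIndexLogFiles) {
    this.smallFileIds = smallFileIds;
    this.indexCanIndexLogFiles = indexCanIndexLogFiles;
  }

  String handleUpdate(String partitionPath, String fileId) {
    if (!indexCanIndexLogFiles && smallFileIds.contains(fileId)) {
      // merge path: rewrite the small base file, partitionPath forwarded to the merge handle
      return "merge(" + partitionPath + ", " + fileId + ")";
    }
    // append path: partitionPath forwarded to the append handle
    return "append(" + partitionPath + ", " + fileId + ")";
  }

  public static void main(String[] args) {
    MorUpdateDispatchSketch table =
        new MorUpdateDispatchSketch(Collections.singleton("file-1"), false);
    System.out.println(table.handleUpdate("2020/01/01", "file-1")); // merge(...)
    System.out.println(table.handleUpdate("2020/01/01", "file-9")); // append(...)
  }
}
```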
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/WorkloadProfile.java b/hudi-client/src/main/java/org/apache/hudi/table/WorkloadProfile.java
index 029276ad3..41ca21271 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/WorkloadProfile.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/WorkloadProfile.java
@@ -95,6 +95,10 @@ public class WorkloadProfile implements Serializable
     return partitionPathStatMap.keySet();
   }
 
+  public HashMap<String, WorkloadStat> getPartitionPathStatMap() {
+    return partitionPathStatMap;
+  }
+
   public WorkloadStat getWorkloadStat(String partitionPath) {
     return partitionPathStatMap.get(partitionPath);
   }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/compact/HoodieMergeOnReadTableCompactor.java b/hudi-client/src/main/java/org/apache/hudi/table/compact/HoodieMergeOnReadTableCompactor.java
index e1d2706a2..22c90453c 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/compact/HoodieMergeOnReadTableCompactor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/compact/HoodieMergeOnReadTableCompactor.java
@@ -136,7 +136,8 @@ public class HoodieMergeOnReadTableCompactor implements HoodieCompactor {
     // If the dataFile is present, there is a base parquet file present, perform updates else perform inserts into a
     // new base parquet file.
     if (oldDataFileOpt.isPresent()) {
-      result = hoodieCopyOnWriteTable.handleUpdate(commitTime, operation.getFileId(), scanner.getRecords(),
+      result = hoodieCopyOnWriteTable.handleUpdate(commitTime, operation.getPartitionPath(),
+          operation.getFileId(), scanner.getRecords(),
           oldDataFileOpt.get());
     } else {
       result = hoodieCopyOnWriteTable.handleInsert(commitTime, operation.getPartitionPath(), operation.getFileId(),
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
index eb1fc745b..8c949fb0a 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
@@ -118,7 +118,8 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness {
     updateRecords.add(record1);
 
     try {
-      HoodieMergeHandle mergeHandle = new HoodieMergeHandle(config2, "101", table2, updateRecords.iterator(), fileId);
+      HoodieMergeHandle mergeHandle = new HoodieMergeHandle(config2, "101", table2,
+          updateRecords.iterator(), record1.getPartitionPath(), fileId);
       Configuration conf = new Configuration();
       AvroReadSupport.setAvroReadSchema(conf, mergeHandle.getWriterSchema());
       List<GenericRecord> oldRecords = ParquetUtils.readAvroRecords(conf,
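One design note on the new WorkloadProfile accessor: it hands back the internal HashMap itself, which the partitioner only reads. If accidental mutation by other callers were a concern, a defensive variant (not what this patch does) could expose a read-only view, sketched here with Long standing in for WorkloadStat:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical alternative: return an unmodifiable view of the per-partition
// stats instead of the mutable internal map.
public class WorkloadProfileViewSketch {
  private final HashMap<String, Long> partitionPathStatMap = new HashMap<>();

  public Map<String, Long> getPartitionPathStatMap() {
    return Collections.unmodifiableMap(partitionPathStatMap);  // callers can read, not write
  }
}
```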
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
index 95248a464..e62f5e574 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
@@ -18,8 +18,10 @@
 
 package org.apache.hudi.table;
 
-import org.apache.hudi.common.HoodieClientTestHarness;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.common.HoodieClientTestUtils;
 import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.common.TestRawTripPayload;
@@ -41,9 +43,6 @@
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.io.HoodieCreateHandle;
 import org.apache.hudi.table.HoodieCopyOnWriteTable.UpsertPartitioner;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.avro.AvroReadSupport;
@@ -53,6 +52,7 @@
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import scala.Tuple2;
 
 import java.io.File;
 import java.util.ArrayList;
@@ -61,8 +61,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-import scala.Tuple2;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -209,8 +207,8 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
     metaClient = HoodieTableMetaClient.reload(metaClient);
     final HoodieCopyOnWriteTable newTable = new HoodieCopyOnWriteTable(config, jsc);
     List<WriteStatus> statuses = jsc.parallelize(Arrays.asList(1)).map(x -> {
-      return newTable.handleUpdate(newCommitTime, updatedRecord1.getCurrentLocation().getFileId(),
-          updatedRecords.iterator());
+      return newTable.handleUpdate(newCommitTime, updatedRecord1.getPartitionPath(),
+          updatedRecord1.getCurrentLocation().getFileId(), updatedRecords.iterator());
     }).flatMap(x -> HoodieClientTestUtils.collectStatuses(x).iterator()).collect();
 
     // Check the updated file
@@ -470,7 +468,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
   @Test
   public void testInsertUpsertWithHoodieAvroPayload() throws Exception {
     HoodieWriteConfig config = makeHoodieClientConfigBuilder()
-            .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1000 * 1024).build()).build();
+        .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1000 * 1024).build()).build();
     metaClient = HoodieTableMetaClient.reload(metaClient);
     final HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
     String commitTime = "000";
@@ -484,13 +482,15 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
     String fileId = writeStatus.getFileId();
     metaClient.getFs().create(new Path(basePath + "/.hoodie/000.commit")).close();
     final HoodieCopyOnWriteTable table2 = new HoodieCopyOnWriteTable(config, jsc);
     final List<HoodieRecord> updates =
-        dataGen.generateUpdatesWithHoodieAvroPayload(commitTime, writeStatus.getWrittenRecords());
+        dataGen.generateUpdatesWithHoodieAvroPayload(commitTime, inserts);
 
-    jsc.parallelize(Arrays.asList(1)).map(x -> {
-      return table2.handleUpdate("001", fileId, updates.iterator());
+    String partitionPath = updates.get(0).getPartitionPath();
+    long numRecordsInPartition = updates.stream().filter(u -> u.getPartitionPath().equals(partitionPath)).count();
+    final List<List<WriteStatus>> updateStatus = jsc.parallelize(Arrays.asList(1)).map(x -> {
+      return table.handleUpdate(commitTime, partitionPath, fileId, updates.iterator());
     }).map(x -> (List<WriteStatus>) HoodieClientTestUtils.collectStatuses(x)).collect();
+    assertEquals(updates.size() - numRecordsInPartition, updateStatus.get(0).get(0).getTotalErrorRecords());
   }
 
   @After
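The updated testInsertUpsertWithHoodieAvroPayload now asserts that exactly the records outside the chosen partition are rejected. The arithmetic it relies on, as a self-contained snippet (Rec is a stand-in record type, not Hudi's HoodieRecord):

```java
import java.util.Arrays;
import java.util.List;

public class ExpectedErrorCountSketch {
  static final class Rec {
    final String key, partitionPath;
    Rec(String key, String partitionPath) { this.key = key; this.partitionPath = partitionPath; }
  }

  public static void main(String[] args) {
    List<Rec> updates = Arrays.asList(
        new Rec("a", "2020/01/01"), new Rec("b", "2020/01/02"), new Rec("c", "2020/01/01"));
    // Mirror of the test: all updates funnel into a handle opened for the first
    // record's partition; everything else should surface as an error record.
    String partitionPath = updates.get(0).partitionPath;
    long numRecordsInPartition = updates.stream()
        .filter(u -> u.partitionPath.equals(partitionPath)).count();
    long expectedErrors = updates.size() - numRecordsInPartition;
    System.out.println(expectedErrors); // 1
  }
}
```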
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
index 740caf2df..4d6c5bcc9 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
@@ -18,17 +18,20 @@
 
 package org.apache.hudi.table;
 
-import org.apache.hudi.common.HoodieClientTestHarness;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.client.HoodieReadClient;
 import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.common.HoodieClientTestUtils;
 import org.apache.hudi.common.HoodieMergeOnReadTestUtils;
 import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.common.TestRawTripPayload.MetadataMergeWriteStatus;
 import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -52,10 +55,6 @@
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.HoodieIndex.IndexType;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.After;
 import org.junit.Assert;
@@ -1208,6 +1207,85 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
     }
   }
 
+  /**
+   * Test to validate invoking table.handleUpdate() with input records from multiple partitions will fail.
+   */
+  @Test
+  public void testHandleUpdateWithMultiplePartitions() throws Exception {
+    HoodieWriteConfig cfg = getConfig(true);
+    try (HoodieWriteClient client = getWriteClient(cfg);) {
+
+      /**
+       * Write 1 (only inserts, written as parquet file)
+       */
+      String newCommitTime = "001";
+      client.startCommitWithTime(newCommitTime);
+
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
+      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+      List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
+      assertNoWriteErrors(statuses);
+
+      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+      HoodieMergeOnReadTable hoodieTable = (HoodieMergeOnReadTable) HoodieTable.getHoodieTable(metaClient, cfg, jsc);
+
+      Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
+      assertTrue(deltaCommit.isPresent());
+      assertEquals("Delta commit should be 001", "001", deltaCommit.get().getTimestamp());
+
+      Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
+      assertFalse(commit.isPresent());
+
+      FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
+      BaseFileOnlyView roView =
+          new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+      Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
+      assertFalse(dataFilesToRead.findAny().isPresent());
+
+      roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+      dataFilesToRead = roView.getLatestBaseFiles();
+      assertTrue("should list the parquet files we wrote in the delta commit",
+          dataFilesToRead.findAny().isPresent());
+
+      /**
+       * Write 2 (only updates, written to .log file)
+       */
+      newCommitTime = "002";
+      client.startCommitWithTime(newCommitTime);
+
+      records = dataGen.generateUpdates(newCommitTime, records);
+      writeRecords = jsc.parallelize(records, 1);
+      statuses = client.upsert(writeRecords, newCommitTime).collect();
+      assertNoWriteErrors(statuses);
+
+      /**
+       * Write 3 (only deletes, written to .log file)
+       */
+      final String newDeleteTime = "004";
+      final String partitionPath = records.get(0).getPartitionPath();
+      final String fileId = statuses.get(0).getFileId();
+      client.startCommitWithTime(newDeleteTime);
+
+      List<HoodieRecord> fewRecordsForDelete = dataGen.generateDeletesFromExistingRecords(records);
+      JavaRDD<HoodieRecord> deleteRDD = jsc.parallelize(fewRecordsForDelete, 1);
+
+      // initialize partitioner
+      hoodieTable.getUpsertPartitioner(new WorkloadProfile(deleteRDD));
+      final List<List<WriteStatus>> deleteStatus = jsc.parallelize(Arrays.asList(1)).map(x -> {
+        return hoodieTable.handleUpdate(newDeleteTime, partitionPath, fileId, fewRecordsForDelete.iterator());
+      }).map(x -> (List<WriteStatus>) HoodieClientTestUtils.collectStatuses(x)).collect();
+
+      // Verify there are errors because records are from multiple partitions (but handleUpdate is invoked for
+      // a specific partition)
+      WriteStatus status = deleteStatus.get(0).get(0);
+      assertTrue(status.hasErrors());
+      long numRecordsInPartition = fewRecordsForDelete.stream().filter(u ->
+          u.getPartitionPath().equals(partitionPath)).count();
+      assertEquals(fewRecordsForDelete.size() - numRecordsInPartition, status.getTotalErrorRecords());
+    }
+  }
+
   private HoodieWriteConfig getConfig(Boolean autoCommit) {
     return getConfigBuilder(autoCommit).build();
   }
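Taken together, the caller-facing change is one extra argument on the update path. An illustrative stub of the new call shape (Table here is a stand-in interface, not a Hudi class):

```java
import java.util.Collections;
import java.util.Iterator;

// Stand-in for the table API touched by this patch.
interface Table<R> {
  // After this patch; before it, the signature lacked partitionPath.
  Iterator<R> handleUpdate(String commitTime, String partitionPath, String fileId, Iterator<R> records);
}

public class CallShapeSketch {
  public static void main(String[] args) {
    Table<String> table = (commitTime, partitionPath, fileId, records) -> records;
    // Callers now supply the partition explicitly, e.g. binfo.partitionPath from
    // the upsert partitioner, or operation.getPartitionPath() from the compactor.
    Iterator<String> out = table.handleUpdate("001", "2020/01/01", "file-1",
        Collections.singletonList("record").iterator());
    System.out.println(out.next());
  }
}
```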