// Patch summary: adds a "sealed" guard to HoodieRecord and updates the location-setter
// call sites touched by this patch so that each one lifts the seal, sets the location,
// and then re-seals the record.
//
// hudi-common, HoodieRecord.java:
//   - adds a private boolean field `sealed`; the (key, data) constructor sets it to false
//     and the copy constructor copies the flag from the source record.
//   - adds public seal() / unseal(), which set the flag to true / false, and checkState(),
//     which throws UnsupportedOperationException("Not allowed to modify after sealed")
//     when the flag is set.
//   - setCurrentLocation() and setNewLocation() now call checkState() first, ahead of
//     their existing `assert ... == null` check and the assignment.
//
// hudi-client main code (InMemoryHashIndex, HoodieBloomIndex, HBaseIndex,
// HoodieAppendHandle, HoodieCreateHandle, HoodieMergeHandle): every
// setCurrentLocation(...) / setNewLocation(...) call in these hunks is wrapped as
// unseal(); set...(...); seal();
//
// Tests: TestUpdateMapFunction, TestCopyOnWriteTable and SpillableMapTestUtils receive the
// same wrapping. The new TestHoodieRecord checks that setCurrentLocation() after seal()
// throws UnsupportedOperationException, and that unseal -> set -> seal succeeds for both
// setCurrentLocation() and setNewLocation().
//
// NOTE(review): a record built with the (key, data) constructor starts unsealed, so the
// guard only fires once seal() has been called on it or on the record it was copied from.
// NOTE(review): the unseal/set/seal triples are not wrapped in try/finally; if the setter
// throws, seal() is skipped and the record stays unsealed -- confirm that is acceptable.
// NOTE(review): this copy of the patch has its line breaks collapsed and shows raw types
// (e.g. `List indexedRecords`), so generic type arguments may have been lost -- verify
// against the original commit before applying.
diff --git a/hudi-client/src/main/java/org/apache/hudi/index/InMemoryHashIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/InMemoryHashIndex.java index 0ef81cf8f..fdf2cbf48 100644 --- a/hudi-client/src/main/java/org/apache/hudi/index/InMemoryHashIndex.java +++ b/hudi-client/src/main/java/org/apache/hudi/index/InMemoryHashIndex.java @@ -132,7 +132,9 @@ public class InMemoryHashIndex extends HoodieInde while (hoodieRecordIterator.hasNext()) { HoodieRecord rec = hoodieRecordIterator.next(); if (recordLocationMap.containsKey(rec.getKey())) { + rec.unseal(); rec.setCurrentLocation(recordLocationMap.get(rec.getKey())); + rec.seal(); } taggedRecords.add(rec); } diff --git a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java index 133f1e4a3..75016c6bd 100644 --- a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java +++ b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java @@ -371,7 +371,9 @@ public class HoodieBloomIndex extends HoodieIndex // currentLocation 2 times and it will fail the second time. So creating a new in memory // copy of the hoodie record. 
record = new HoodieRecord<>(inputRecord); + record.unseal(); record.setCurrentLocation(location.get()); + record.seal(); } return record; } diff --git a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java index 8399c4f31..111d23112 100644 --- a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java +++ b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java @@ -239,7 +239,9 @@ public class HBaseIndex extends HoodieIndex { currentRecord = new HoodieRecord( new HoodieKey(currentRecord.getRecordKey(), partitionPath), currentRecord.getData()); + currentRecord.unseal(); currentRecord.setCurrentLocation(new HoodieRecordLocation(commitTs, fileId)); + currentRecord.seal(); taggedRecords.add(currentRecord); // the key from Result and the key being processed should be same assert (currentRecord.getRecordKey().contentEquals(keyFromResult)); diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index 34178e618..0aa413739 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -293,7 +293,9 @@ public class HoodieAppendHandle extends HoodieWri private void writeToBuffer(HoodieRecord record) { // update the new location of the record, so we know where to find it next + record.unseal(); record.setNewLocation(new HoodieRecordLocation(instantTime, fileId)); + record.seal(); Option indexedRecord = getIndexedRecord(record); if (indexedRecord.isPresent()) { recordList.add(indexedRecord.get()); diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java index 4cb935ff7..afb8f851b 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java +++ 
b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java @@ -101,7 +101,9 @@ public class HoodieCreateHandle extends HoodieWri IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) avroRecord.get()); storageWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record); // update the new location of record, so we know where to find it next + record.unseal(); record.setNewLocation(new HoodieRecordLocation(instantTime, writeStatus.getFileId())); + record.seal(); recordsWritten++; insertRecordsWritten++; } else { diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index e1926d06e..a819cf7ed 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -208,7 +208,9 @@ public class HoodieMergeHandle extends HoodieWrit HoodieRecord record = newRecordsItr.next(); partitionPath = record.getPartitionPath(); // update the new location of the record, so we know where to find it next + record.unseal(); record.setNewLocation(new HoodieRecordLocation(instantTime, fileId)); + record.seal(); //NOTE: Once Records are added to map (spillable-map), DO NOT change it as they won't persist keyToNewRecords.put(record.getRecordKey(), record); } diff --git a/hudi-client/src/test/java/org/apache/hudi/func/TestUpdateMapFunction.java b/hudi-client/src/test/java/org/apache/hudi/func/TestUpdateMapFunction.java index 5837628b6..8bd29b1d9 100644 --- a/hudi-client/src/test/java/org/apache/hudi/func/TestUpdateMapFunction.java +++ b/hudi-client/src/test/java/org/apache/hudi/func/TestUpdateMapFunction.java @@ -113,7 +113,9 @@ public class TestUpdateMapFunction extends HoodieClientTestHarness { TestRawTripPayload rowChange1 = new TestRawTripPayload(recordStr1); HoodieRecord record1 = new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), 
rowChange1); + record1.unseal(); record1.setCurrentLocation(new HoodieRecordLocation("100", fileId)); + record1.seal(); updateRecords.add(record1); try { diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java index 6c0c0ca83..12ef63323 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java @@ -192,7 +192,9 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness { TestRawTripPayload updateRowChanges1 = new TestRawTripPayload(updateRecordStr1); HoodieRecord updatedRecord1 = new HoodieRecord( new HoodieKey(updateRowChanges1.getRowKey(), updateRowChanges1.getPartitionPath()), updateRowChanges1); + updatedRecord1.unseal(); updatedRecord1.setCurrentLocation(new HoodieRecordLocation(null, FSUtils.getFileId(parquetFile.getName()))); + updatedRecord1.seal(); TestRawTripPayload rowChange4 = new TestRawTripPayload(recordStr4); HoodieRecord insertedRecord1 = new HoodieRecord( @@ -407,7 +409,9 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness { List insertRecords = dataGenerator.generateInserts("001", numInserts); List updateRecords = dataGenerator.generateUpdates("001", numUpdates); for (HoodieRecord updateRec : updateRecords) { + updateRec.unseal(); updateRec.setCurrentLocation(new HoodieRecordLocation("001", "file1")); + updateRec.seal(); } List records = new ArrayList<>(); records.addAll(insertRecords); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java index c03f175d6..a5043de0f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java @@ -63,17 +63,24 @@ public class HoodieRecord implements Serializable */ 
private HoodieRecordLocation newLocation; + /** + * Indicates whether the object is sealed. + */ + private boolean sealed; + public HoodieRecord(HoodieKey key, T data) { this.key = key; this.data = data; this.currentLocation = null; this.newLocation = null; + this.sealed = false; } public HoodieRecord(HoodieRecord record) { this(record.key, record.data); this.currentLocation = record.currentLocation; this.newLocation = record.newLocation; + this.sealed = record.sealed; } public HoodieKey getKey() { @@ -100,6 +107,7 @@ public class HoodieRecord implements Serializable * Sets the current currentLocation of the record. This should happen exactly-once */ public HoodieRecord setCurrentLocation(HoodieRecordLocation location) { + checkState(); assert currentLocation == null; this.currentLocation = location; return this; @@ -114,6 +122,7 @@ public class HoodieRecord implements Serializable * exactly-once. */ public HoodieRecord setNewLocation(HoodieRecordLocation location) { + checkState(); assert newLocation == null; this.newLocation = location; return this; @@ -170,4 +179,18 @@ public class HoodieRecord implements Serializable assert key != null; return key.getRecordKey(); } + + public void seal() { + this.sealed = true; + } + + public void unseal() { + this.sealed = false; + } + + public void checkState() { + if (sealed) { + throw new UnsupportedOperationException("Not allowed to modify after sealed"); + } + } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieRecord.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieRecord.java new file mode 100644 index 000000000..408fedc3e --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieRecord.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +import static org.junit.Assert.fail; + +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; + +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.SchemaTestUtil; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Tests for {@link HoodieRecord}. 
+ */ +public class TestHoodieRecord { + + private HoodieRecord hoodieRecord; + + @Before + public void setUp() throws Exception { + final List indexedRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1); + final List hoodieRecords = indexedRecords.stream() + .map(r -> new HoodieRecord(new HoodieKey(UUID.randomUUID().toString(), "0000/00/00"), + new AvroBinaryTestPayload(Option.of((GenericRecord) r)))).collect(Collectors.toList()); + hoodieRecord = hoodieRecords.get(0); + } + + @Test + public void testModificationAfterSeal() { + hoodieRecord.seal(); + final HoodieRecordLocation location = new HoodieRecordLocation("100", "0"); + try { + hoodieRecord.setCurrentLocation(location); + fail("should fail since modification after sealed is not allowed"); + } catch (Exception e) { + Assert.assertTrue(e instanceof UnsupportedOperationException); + } + } + + @Test + public void testNormalModification() { + hoodieRecord.unseal(); + final HoodieRecordLocation location = new HoodieRecordLocation("100", "0"); + hoodieRecord.setCurrentLocation(location); + hoodieRecord.seal(); + + hoodieRecord.unseal(); + hoodieRecord.setNewLocation(location); + hoodieRecord.seal(); + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/SpillableMapTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/SpillableMapTestUtils.java index 62a36e8e4..c0b2d8f25 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/SpillableMapTestUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/SpillableMapTestUtils.java @@ -45,7 +45,9 @@ public class SpillableMapTestUtils { recordKeys.add(key); HoodieRecord record = new HoodieRecord<>(new HoodieKey(key, partitionPath), new HoodieAvroPayload(Option.of((GenericRecord) r))); + record.unseal(); record.setCurrentLocation(new HoodieRecordLocation("DUMMY_COMMIT_TIME", "DUMMY_FILE_ID")); + record.seal(); records.put(key, record); }); return recordKeys;