HoodieSortedMergeHandle#close write data disorder (#4841)
Co-authored-by: 854194341@qq.com <loukey_7821>
This commit is contained in:
@@ -108,8 +108,9 @@ public class HoodieSortedMergeHandle<T extends HoodieRecordPayload, I, K, O> ext
|
|||||||
@Override
|
@Override
|
||||||
public List<WriteStatus> close() {
|
public List<WriteStatus> close() {
|
||||||
// write out any pending records (this can happen when inserts are turned into updates)
|
// write out any pending records (this can happen when inserts are turned into updates)
|
||||||
newRecordKeysSorted.stream().forEach(key -> {
|
while (!newRecordKeysSorted.isEmpty()) {
|
||||||
try {
|
try {
|
||||||
|
String key = newRecordKeysSorted.poll();
|
||||||
HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key);
|
HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key);
|
||||||
if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
|
if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
|
||||||
if (useWriterSchema) {
|
if (useWriterSchema) {
|
||||||
@@ -122,7 +123,7 @@ public class HoodieSortedMergeHandle<T extends HoodieRecordPayload, I, K, O> ext
|
|||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new HoodieUpsertException("Failed to close UpdateHandle", e);
|
throw new HoodieUpsertException("Failed to close UpdateHandle", e);
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
newRecordKeysSorted.clear();
|
newRecordKeysSorted.clear();
|
||||||
keyToNewRecords.clear();
|
keyToNewRecords.clear();
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user