Returning empty Statues for an empty spark partition caused due to incorrect bin packing
This commit is contained in:
committed by
vinoth chandar
parent
0015c9b00e
commit
e83dde3b95
@@ -176,6 +176,11 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
|
||||
|
||||
public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileId,
|
||||
Iterator<HoodieRecord<T>> recordItr) throws IOException {
|
||||
// This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
|
||||
if (!recordItr.hasNext()) {
|
||||
logger.info("Empty partition with fileId => " + fileId);
|
||||
return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
|
||||
}
|
||||
// these are updates
|
||||
HoodieMergeHandle upsertHandle = getUpdateHandle(commitTime, fileId, recordItr);
|
||||
return handleUpdateInternal(upsertHandle, commitTime, fileId);
|
||||
@@ -235,6 +240,11 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
|
||||
|
||||
public Iterator<List<WriteStatus>> handleInsert(String commitTime,
|
||||
Iterator<HoodieRecord<T>> recordItr) throws Exception {
|
||||
// This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
|
||||
if (!recordItr.hasNext()) {
|
||||
logger.info("Empty partition");
|
||||
return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
|
||||
}
|
||||
return new CopyOnWriteLazyInsertIterable<>(recordItr, config, commitTime, this);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user