[HUDI-292] Avoid consuming more entries from kafka than specified sourceLimit. (#947)
- Added special handling for the case where allocedEvents > numEvents; added unit tests.
This commit is contained in:
@@ -125,6 +125,11 @@ public class KafkaOffsetGen {
|
||||
exhaustedPartitions.add(range.partition());
|
||||
}
|
||||
allocedEvents += toOffset - range.untilOffset();
|
||||
// We need to recompute toOffset if allocedEvents is larger than numEvents.
|
||||
if (allocedEvents > numEvents) {
|
||||
long offsetsToAdd = Math.min(eventsPerPartition, (numEvents - allocedEvents));
|
||||
toOffset = Math.min(toOffsetMax, toOffset + offsetsToAdd);
|
||||
}
|
||||
ranges[i] = OffsetRange.create(range.topicAndPartition(), range.fromOffset(), toOffset);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -183,5 +183,15 @@ public class TestKafkaSource extends UtilitiesTestBase {
|
||||
assertEquals(10, ranges[0].count());
|
||||
assertEquals(100000, ranges[1].count());
|
||||
assertEquals(10000, ranges[2].count());
|
||||
|
||||
// not all partitions consume same entries.
|
||||
ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1, 2, 3, 4}, new long[] {0, 0, 0, 0, 0}),
|
||||
makeOffsetMap(new int[] {0, 1, 2, 3, 4}, new long[] {100, 1000, 1000, 1000, 1000}), 1001);
|
||||
assertEquals(1001, CheckpointUtils.totalNewMessages(ranges));
|
||||
assertEquals(100, ranges[0].count());
|
||||
assertEquals(226, ranges[1].count());
|
||||
assertEquals(226, ranges[2].count());
|
||||
assertEquals(226, ranges[3].count());
|
||||
assertEquals(223, ranges[4].count());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user