From e10e06918e4758917513c55f9bc02c35dad99128 Mon Sep 17 00:00:00 2001 From: leesf <490081539@qq.com> Date: Fri, 11 Oct 2019 20:28:45 +0800 Subject: [PATCH] [HUDI-292] Avoid consuming more entries from kafka than specified sourceLimit. (#947) - Special handling when allocedEvents > numEvents - Added unit tests --- .../hudi/utilities/sources/helpers/KafkaOffsetGen.java | 5 +++++ .../apache/hudi/utilities/sources/TestKafkaSource.java | 10 ++++++++++ 2 files changed, 15 insertions(+) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index 558e75710..9dd232d36 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -125,6 +125,11 @@ public class KafkaOffsetGen { exhaustedPartitions.add(range.partition()); } allocedEvents += toOffset - range.untilOffset(); + // We need to recompute toOffset if allocedEvents is larger than numEvents. 
+ if (allocedEvents > numEvents) { + long offsetsToAdd = Math.min(eventsPerPartition, (numEvents - allocedEvents)); + toOffset = Math.min(toOffsetMax, toOffset + offsetsToAdd); + } ranges[i] = OffsetRange.create(range.topicAndPartition(), range.fromOffset(), toOffset); } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java index 9825ae6f7..9ac4bf438 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java @@ -183,5 +183,15 @@ public class TestKafkaSource extends UtilitiesTestBase { assertEquals(10, ranges[0].count()); assertEquals(100000, ranges[1].count()); assertEquals(10000, ranges[2].count()); + + // Not all partitions consume the same number of entries. + ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1, 2, 3, 4}, new long[] {0, 0, 0, 0, 0}), + makeOffsetMap(new int[] {0, 1, 2, 3, 4}, new long[] {100, 1000, 1000, 1000, 1000}), 1001); + assertEquals(1001, CheckpointUtils.totalNewMessages(ranges)); + assertEquals(100, ranges[0].count()); + assertEquals(226, ranges[1].count()); + assertEquals(226, ranges[2].count()); + assertEquals(226, ranges[3].count()); + assertEquals(223, ranges[4].count()); } }