From 97ab97b72635164db5ac2a4f93e72e224603ffe0 Mon Sep 17 00:00:00 2001 From: liujinhui <965147871@qq.com> Date: Mon, 8 Jun 2020 20:46:47 +0800 Subject: [PATCH] [HUDI-918] Fix kafkaOffsetGen can not read kafka data bug (#1652) --- .../hudi/utilities/sources/helpers/KafkaOffsetGen.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index 39c47a2c3..933127487 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -21,6 +21,7 @@ package org.apache.hudi.utilities.sources.helpers; import org.apache.hudi.DataSourceUtils; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -207,6 +208,11 @@ public class KafkaOffsetGen { maxEventsToReadFromKafka = (maxEventsToReadFromKafka == Long.MAX_VALUE || maxEventsToReadFromKafka == Integer.MAX_VALUE) ? Config.maxEventsFromKafkaSource : maxEventsToReadFromKafka; long numEvents = sourceLimit == Long.MAX_VALUE ? maxEventsToReadFromKafka : sourceLimit; + + if (numEvents < toOffsets.size()) { + throw new HoodieException("sourceLimit should not be less than the number of kafka partitions"); + } + return CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents); }