diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index 39c47a2c3..933127487 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -21,6 +21,7 @@ package org.apache.hudi.utilities.sources.helpers; import org.apache.hudi.DataSourceUtils; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -207,6 +208,11 @@ public class KafkaOffsetGen { maxEventsToReadFromKafka = (maxEventsToReadFromKafka == Long.MAX_VALUE || maxEventsToReadFromKafka == Integer.MAX_VALUE) ? Config.maxEventsFromKafkaSource : maxEventsToReadFromKafka; long numEvents = sourceLimit == Long.MAX_VALUE ? maxEventsToReadFromKafka : sourceLimit; + + if (numEvents < toOffsets.size()) { + throw new HoodieException("sourceLimit should not be less than the number of kafka partitions"); + } + return CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents); }