[HUDI-918] Fix kafkaOffsetGen can not read kafka data bug (#1652)
This commit is contained in:
@@ -21,6 +21,7 @@ package org.apache.hudi.utilities.sources.helpers;
|
|||||||
import org.apache.hudi.DataSourceUtils;
|
import org.apache.hudi.DataSourceUtils;
|
||||||
import org.apache.hudi.common.config.TypedProperties;
|
import org.apache.hudi.common.config.TypedProperties;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
|
import org.apache.hudi.exception.HoodieException;
|
||||||
import org.apache.hudi.exception.HoodieNotSupportedException;
|
import org.apache.hudi.exception.HoodieNotSupportedException;
|
||||||
|
|
||||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||||
@@ -207,6 +208,11 @@ public class KafkaOffsetGen {
|
|||||||
maxEventsToReadFromKafka = (maxEventsToReadFromKafka == Long.MAX_VALUE || maxEventsToReadFromKafka == Integer.MAX_VALUE)
|
maxEventsToReadFromKafka = (maxEventsToReadFromKafka == Long.MAX_VALUE || maxEventsToReadFromKafka == Integer.MAX_VALUE)
|
||||||
? Config.maxEventsFromKafkaSource : maxEventsToReadFromKafka;
|
? Config.maxEventsFromKafkaSource : maxEventsToReadFromKafka;
|
||||||
long numEvents = sourceLimit == Long.MAX_VALUE ? maxEventsToReadFromKafka : sourceLimit;
|
long numEvents = sourceLimit == Long.MAX_VALUE ? maxEventsToReadFromKafka : sourceLimit;
|
||||||
|
|
||||||
|
if (numEvents < toOffsets.size()) {
|
||||||
|
throw new HoodieException("sourceLimit should not be less than the number of kafka partitions");
|
||||||
|
}
|
||||||
|
|
||||||
return CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents);
|
return CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user