[HUDI-1062]Remove unnecessary maxEvent check and add some log in KafkaOffsetGen (#1779)
This commit is contained in:
@@ -202,9 +202,14 @@ public class KafkaOffsetGen {
|
|||||||
// Come up with final set of OffsetRanges to read (account for new partitions, limit number of events)
|
// Come up with final set of OffsetRanges to read (account for new partitions, limit number of events)
|
||||||
long maxEventsToReadFromKafka = props.getLong(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP,
|
long maxEventsToReadFromKafka = props.getLong(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP,
|
||||||
Config.maxEventsFromKafkaSource);
|
Config.maxEventsFromKafkaSource);
|
||||||
maxEventsToReadFromKafka = (maxEventsToReadFromKafka == Long.MAX_VALUE || maxEventsToReadFromKafka == Integer.MAX_VALUE)
|
|
||||||
? Config.maxEventsFromKafkaSource : maxEventsToReadFromKafka;
|
long numEvents;
|
||||||
long numEvents = sourceLimit == Long.MAX_VALUE ? maxEventsToReadFromKafka : sourceLimit;
|
if (sourceLimit == Long.MAX_VALUE) {
|
||||||
|
numEvents = maxEventsToReadFromKafka;
|
||||||
|
LOG.info("SourceLimit not configured, set numEvents to default value : " + maxEventsToReadFromKafka);
|
||||||
|
} else {
|
||||||
|
numEvents = sourceLimit;
|
||||||
|
}
|
||||||
|
|
||||||
if (numEvents < toOffsets.size()) {
|
if (numEvents < toOffsets.size()) {
|
||||||
throw new HoodieException("sourceLimit should not be less than the number of kafka partitions");
|
throw new HoodieException("sourceLimit should not be less than the number of kafka partitions");
|
||||||
|
|||||||
@@ -191,13 +191,45 @@ public class TestKafkaSource extends UtilitiesTestBase {
|
|||||||
*/
|
*/
|
||||||
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
|
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
|
||||||
InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
|
InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
|
||||||
assertEquals(500, fetch1.getBatch().get().count());
|
assertEquals(1000, fetch1.getBatch().get().count());
|
||||||
|
|
||||||
// 2. Produce new data, extract new data based on sourceLimit
|
// 2. Produce new data, extract new data based on sourceLimit
|
||||||
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
|
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
|
||||||
InputBatch<Dataset<Row>> fetch2 =
|
InputBatch<Dataset<Row>> fetch2 =
|
||||||
kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), 1500);
|
kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), 1500);
|
||||||
assertEquals(1500, fetch2.getBatch().get().count());
|
assertEquals(1000, fetch2.getBatch().get().count());
|
||||||
|
|
||||||
|
//reset the value back since it is a static variable
|
||||||
|
Config.maxEventsFromKafkaSource = Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testJsonKafkaSourceInsertRecordsLessSourceLimit() {
|
||||||
|
// topic setup.
|
||||||
|
testUtils.createTopic(TEST_TOPIC_NAME, 2);
|
||||||
|
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
|
||||||
|
TypedProperties props = createPropsForJsonSource(Long.MAX_VALUE, "earliest");
|
||||||
|
|
||||||
|
Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider);
|
||||||
|
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
|
||||||
|
Config.maxEventsFromKafkaSource = 500;
|
||||||
|
|
||||||
|
/*
|
||||||
|
1. maxEventsFromKafkaSourceProp set to more than generated insert records
|
||||||
|
and sourceLimit less than the generated insert records num.
|
||||||
|
*/
|
||||||
|
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 400)));
|
||||||
|
InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 300);
|
||||||
|
assertEquals(300, fetch1.getBatch().get().count());
|
||||||
|
|
||||||
|
/*
|
||||||
|
2. Produce new data, extract new data based on sourceLimit
|
||||||
|
and sourceLimit less than the generated insert records num.
|
||||||
|
*/
|
||||||
|
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 600)));
|
||||||
|
InputBatch<Dataset<Row>> fetch2 =
|
||||||
|
kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), 300);
|
||||||
|
assertEquals(300, fetch2.getBatch().get().count());
|
||||||
|
|
||||||
//reset the value back since it is a static variable
|
//reset the value back since it is a static variable
|
||||||
Config.maxEventsFromKafkaSource = Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE;
|
Config.maxEventsFromKafkaSource = Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE;
|
||||||
|
|||||||
Reference in New Issue
Block a user