From d58644b6573152ac191a1119c395d71cdc7c9c7d Mon Sep 17 00:00:00 2001 From: Trevor <33487819+Trevor-zhang@users.noreply.github.com> Date: Thu, 9 Jul 2020 12:07:34 +0800 Subject: [PATCH] [HUDI-1062]Remove unnecessary maxEvent check and add some log in KafkaOffsetGen (#1779) --- .../sources/helpers/KafkaOffsetGen.java | 11 ++++-- .../utilities/sources/TestKafkaSource.java | 36 +++++++++++++++++-- 2 files changed, 42 insertions(+), 5 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index cf8e3d8f0..23d3c8d61 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -202,9 +202,14 @@ public class KafkaOffsetGen { // Come up with final set of OffsetRanges to read (account for new partitions, limit number of events) long maxEventsToReadFromKafka = props.getLong(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP, Config.maxEventsFromKafkaSource); - maxEventsToReadFromKafka = (maxEventsToReadFromKafka == Long.MAX_VALUE || maxEventsToReadFromKafka == Integer.MAX_VALUE) - ? Config.maxEventsFromKafkaSource : maxEventsToReadFromKafka; - long numEvents = sourceLimit == Long.MAX_VALUE ? 
maxEventsToReadFromKafka : sourceLimit; + + long numEvents; + if (sourceLimit == Long.MAX_VALUE) { + numEvents = maxEventsToReadFromKafka; + LOG.info("SourceLimit not configured, set numEvents to default value : " + maxEventsToReadFromKafka); + } else { + numEvents = sourceLimit; + } if (numEvents < toOffsets.size()) { throw new HoodieException("sourceLimit should not be less than the number of kafka partitions"); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java index aeedca366..76a494c81 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java @@ -191,13 +191,45 @@ public class TestKafkaSource extends UtilitiesTestBase { */ testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000))); InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE); - assertEquals(500, fetch1.getBatch().get().count()); + assertEquals(1000, fetch1.getBatch().get().count()); // 2. Produce new data, extract new data based on sourceLimit testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000))); InputBatch<Dataset<Row>> fetch2 = kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), 1500); - assertEquals(1500, fetch2.getBatch().get().count()); + assertEquals(1000, fetch2.getBatch().get().count()); + + //reset the value back since it is a static variable + Config.maxEventsFromKafkaSource = Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE; + } + + @Test + public void testJsonKafkaSourceInsertRecordsLessSourceLimit() { + // topic setup.
+ testUtils.createTopic(TEST_TOPIC_NAME, 2); + HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); + TypedProperties props = createPropsForJsonSource(Long.MAX_VALUE, "earliest"); + + Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider); + SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource); + Config.maxEventsFromKafkaSource = 500; + + /* + 1. maxEventsFromKafkaSourceProp set to more than generated insert records + and sourceLimit less than the generated insert records num. + */ + testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 400))); + InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 300); + assertEquals(300, fetch1.getBatch().get().count()); + + /* + 2. Produce new data, extract new data based on sourceLimit + and sourceLimit less than the generated insert records num. + */ + testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 600))); + InputBatch<Dataset<Row>> fetch2 = + kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), 300); + assertEquals(300, fetch2.getBatch().get().count()); //reset the value back since it is a static variable Config.maxEventsFromKafkaSource = Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE;