[HUDI-340]: made max events to read from kafka source configurable (#1039)
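
This change removes the hard-coded 1M-record cap (DEFAULT_MAX_EVENTS_TO_READ) in KafkaOffsetGen and replaces it with a new property, hoodie.deltastreamer.kafka.source.maxEvents, defaulting to 5,000,000. A minimal usage sketch, mirroring the TypedProperties setup in the tests below (the topic name and the 2,000,000 value are illustrative only, not part of the patch):

    TypedProperties props = new TypedProperties();
    props.setProperty("hoodie.deltastreamer.source.kafka.topic", "my_topic");
    // Cap each batch pulled from Kafka at 2M records instead of the 5M default.
    props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", "2000000");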
@@ -50,8 +50,6 @@ public class KafkaOffsetGen {
 
   private static volatile Logger log = LogManager.getLogger(KafkaOffsetGen.class);
 
-  private static long DEFAULT_MAX_EVENTS_TO_READ = 1000000; // 1M events max
-
   public static class CheckpointUtils {
 
     /**
@@ -170,10 +168,13 @@ public class KafkaOffsetGen {
   /**
   * Configs to be passed for this source. All standard Kafka consumer configs are also respected
   */
-  static class Config {
+  public static class Config {
 
    private static final String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic";
+   private static final String MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = "hoodie.deltastreamer.kafka.source.maxEvents";
    private static final KafkaResetOffsetStrategies DEFAULT_AUTO_RESET_OFFSET = KafkaResetOffsetStrategies.LARGEST;
+   public static final long defaultMaxEventsFromKafkaSource = 5000000;
+   public static long DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = defaultMaxEventsFromKafkaSource;
  }
 
  private final HashMap<String, String> kafkaParams;
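
Note that the default lives in two statics: defaultMaxEventsFromKafkaSource is final, while DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE is left mutable so the tests below can shrink the cap without generating millions of records. A sketch of the override pattern this enables (the try/finally wrapper is a suggested hardening; the test in this patch resets the field inline):

    // Shrink the shared default for one test, then restore it; this is static,
    // process-wide state, so any test that touches it must put it back.
    Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = 500;
    try {
      // ... exercise KafkaOffsetGen / JsonKafkaSource with the small cap ...
    } finally {
      Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = Config.defaultMaxEventsFromKafkaSource;
    }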
@@ -229,7 +230,11 @@ public class KafkaOffsetGen {
        new HashMap(ScalaHelpers.toJavaMap(cluster.getLatestLeaderOffsets(topicPartitions).right().get()));
 
    // Come up with final set of OffsetRanges to read (account for new partitions, limit number of events)
-   long numEvents = Math.min(DEFAULT_MAX_EVENTS_TO_READ, sourceLimit);
+   long maxEventsToReadFromKafka = props.getLong(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP,
+       Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE);
+   maxEventsToReadFromKafka = (maxEventsToReadFromKafka == Long.MAX_VALUE || maxEventsToReadFromKafka == Integer.MAX_VALUE)
+       ? Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE : maxEventsToReadFromKafka;
+   long numEvents = sourceLimit == Long.MAX_VALUE ? maxEventsToReadFromKafka : sourceLimit;
    OffsetRange[] offsetRanges = CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents);
 
    return offsetRanges;
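
Two rules fall out of the new code path: a configured cap of Long.MAX_VALUE or Integer.MAX_VALUE is treated as unset and falls back to the default, and an explicit sourceLimit always takes precedence, with the cap applying only when sourceLimit is Long.MAX_VALUE (i.e. "no limit"). A condensed restatement of that logic (the helper name and signature are hypothetical, for illustration only):

    static long effectiveNumEvents(long sourceLimit, long configuredCap, long defaultCap) {
      // Treat an "infinite" cap as unset and fall back to the default.
      long cap = (configuredCap == Long.MAX_VALUE || configuredCap == Integer.MAX_VALUE)
          ? defaultCap : configuredCap;
      // An explicit sourceLimit wins; only an unbounded limit defers to the cap.
      return sourceLimit == Long.MAX_VALUE ? cap : sourceLimit;
    }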
@@ -32,6 +32,7 @@ import org.apache.hudi.utilities.UtilitiesTestBase;
 import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
+import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -78,18 +79,26 @@ public class TestKafkaSource extends UtilitiesTestBase {
     testUtils.teardown();
   }
 
-  @Test
-  public void testJsonKafkaSource() throws IOException {
-
-    // topic setup.
-    testUtils.createTopic(TEST_TOPIC_NAME, 2);
-    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+  private TypedProperties createPropsForJsonSource(Long maxEventsToReadFromKafkaSource) {
     TypedProperties props = new TypedProperties();
     props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME);
     props.setProperty("metadata.broker.list", testUtils.brokerAddress());
     props.setProperty("auto.offset.reset", "smallest");
     props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+    props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
+        maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) :
+            String.valueOf(Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE));
+    return props;
+  }
+
+  @Test
+  public void testJsonKafkaSource() throws IOException {
+
+    // topic setup.
+    testUtils.createTopic(TEST_TOPIC_NAME, 2);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    TypedProperties props = createPropsForJsonSource(null);
 
     Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
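
The per-test property setup is extracted into createPropsForJsonSource, parameterized by the cap: passing null keeps the default, while a concrete value (as the new tests do with Long.MAX_VALUE and 500L) sets hoodie.deltastreamer.kafka.source.maxEvents explicitly.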
@@ -131,6 +140,78 @@ public class TestKafkaSource extends UtilitiesTestBase {
     assertEquals(Option.empty(), fetch4AsRows.getBatch());
   }
 
+  @Test
+  public void testJsonKafkaSourceWithDefaultUpperCap() throws IOException {
+    // topic setup.
+    testUtils.createTopic(TEST_TOPIC_NAME, 2);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    TypedProperties props = createPropsForJsonSource(Long.MAX_VALUE);
+
+    Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider);
+    SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
+    Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = 500;
+
+    /*
+     * 1. Extract without any checkpoint => get all the data, respecting default upper cap since both sourceLimit and
+     * maxEventsFromKafkaSourceProp are set to Long.MAX_VALUE
+     */
+    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
+    assertEquals(500, fetch1.getBatch().get().count());
+
+    // 2. Produce new data, extract new data based on sourceLimit
+    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
+    InputBatch<Dataset<Row>> fetch2 =
+        kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), 1500);
+    assertEquals(1500, fetch2.getBatch().get().count());
+
+    // reset the value back since it is a static variable
+    Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = Config.defaultMaxEventsFromKafkaSource;
+  }
+
+  @Test
+  public void testJsonKafkaSourceWithConfigurableUpperCap() throws IOException {
+    // topic setup.
+    testUtils.createTopic(TEST_TOPIC_NAME, 2);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    TypedProperties props = createPropsForJsonSource(500L);
+
+    Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider);
+    SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
+
+    // 1. Extract without any checkpoint => get all the data, respecting sourceLimit
+    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900);
+    assertEquals(900, fetch1.getBatch().get().count());
+
+    // 2. Produce new data, extract new data based on upper cap
+    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
+    InputBatch<Dataset<Row>> fetch2 =
+        kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
+    assertEquals(500, fetch2.getBatch().get().count());
+
+    // fetch data respecting source limit where upper cap > sourceLimit
+    InputBatch<JavaRDD<GenericRecord>> fetch3 =
+        kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), 400);
+    assertEquals(400, fetch3.getBatch().get().count());
+
+    // fetch data respecting source limit where upper cap < sourceLimit
+    InputBatch<JavaRDD<GenericRecord>> fetch4 =
+        kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch2.getCheckpointForNextBatch()), 600);
+    assertEquals(600, fetch4.getBatch().get().count());
+
+    // 3. Extract with previous checkpoint => gives same data back (idempotent)
+    InputBatch<JavaRDD<GenericRecord>> fetch5 =
+        kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
+    assertEquals(fetch2.getBatch().get().count(), fetch5.getBatch().get().count());
+    assertEquals(fetch2.getCheckpointForNextBatch(), fetch5.getCheckpointForNextBatch());
+
+    // 4. Extract with latest checkpoint => no new data returned
+    InputBatch<JavaRDD<GenericRecord>> fetch6 =
+        kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch4.getCheckpointForNextBatch()), Long.MAX_VALUE);
+    assertEquals(Option.empty(), fetch6.getBatch());
+  }
+
   private static HashMap<TopicAndPartition, LeaderOffset> makeOffsetMap(int[] partitions, long[] offsets) {
     HashMap<TopicAndPartition, LeaderOffset> map = new HashMap<>();
     for (int i = 0; i < partitions.length; i++) {
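
Taken together, the two new tests pin down the contract: the cap binds only when sourceLimit is unbounded (the 500-record fetches), an explicit sourceLimit wins even when it exceeds the cap (the 600-record fetch), and checkpoint-based re-reads remain idempotent under the new code path (fetch5 and fetch6).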