
[HUDI-3052] Fix flaky testJsonKafkaSourceResetStrategy (#4381)

Raymond Xu
2021-12-18 17:58:51 -08:00
committed by GitHub
parent f57e28fe39
commit bb99836841


@@ -23,11 +23,11 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
 import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config;
-import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -39,17 +39,19 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.streaming.kafka010.KafkaTestUtils;
 import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
 
 import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET;
+import static org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecords;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -58,41 +60,37 @@ import static org.mockito.Mockito.mock;
 /**
  * Tests against {@link JsonKafkaSource}.
  */
-public class TestJsonKafkaSource extends UtilitiesTestBase {
-  private static String TEST_TOPIC_NAME = "hoodie_test";
+public class TestJsonKafkaSource extends SparkClientFunctionalTestHarness {
+  private static final String TEST_TOPIC_PREFIX = "hoodie_test_";
+  private static final URL SCHEMA_FILE_URL = TestJsonKafkaSource.class.getClassLoader().getResource("delta-streamer-config/source.avsc");
+  private static KafkaTestUtils testUtils;
+  private final HoodieDeltaStreamerMetrics metrics = mock(HoodieDeltaStreamerMetrics.class);
   private FilebasedSchemaProvider schemaProvider;
-  private KafkaTestUtils testUtils;
-  private HoodieDeltaStreamerMetrics metrics = mock(HoodieDeltaStreamerMetrics.class);
 
   @BeforeAll
   public static void initClass() throws Exception {
-    UtilitiesTestBase.initClass(false);
-  }
-
-  @AfterAll
-  public static void cleanupClass() {
-    UtilitiesTestBase.cleanupClass();
-  }
-
-  @BeforeEach
-  public void setup() throws Exception {
-    super.setup();
-    schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS(), jsc);
     testUtils = new KafkaTestUtils();
     testUtils.setup();
   }
 
-  @AfterEach
-  public void teardown() throws Exception {
-    super.teardown();
+  @AfterAll
+  public static void cleanupClass() {
     testUtils.teardown();
   }
 
-  private TypedProperties createPropsForJsonSource(Long maxEventsToReadFromKafkaSource, String resetStrategy) {
+  @BeforeEach
+  public void init() throws Exception {
+    String schemaFilePath = Objects.requireNonNull(SCHEMA_FILE_URL).toURI().getPath();
     TypedProperties props = new TypedProperties();
-    props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME);
+    props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", schemaFilePath);
+    schemaProvider = new FilebasedSchemaProvider(props, jsc());
+  }
+
+  private TypedProperties createPropsForJsonSource(String topic, Long maxEventsToReadFromKafkaSource, String resetStrategy) {
+    TypedProperties props = new TypedProperties();
+    props.setProperty("hoodie.deltastreamer.source.kafka.topic", topic);
     props.setProperty("bootstrap.servers", testUtils.brokerAddress());
     props.setProperty("auto.offset.reset", resetStrategy);
     props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
@@ -107,16 +105,17 @@ public class TestJsonKafkaSource extends UtilitiesTestBase {
   public void testJsonKafkaSource() {
     // topic setup.
-    testUtils.createTopic(TEST_TOPIC_NAME, 2);
+    final String topic = TEST_TOPIC_PREFIX + "testJsonKafkaSource";
+    testUtils.createTopic(topic, 2);
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    TypedProperties props = createPropsForJsonSource(null, "earliest");
-    Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
+    TypedProperties props = createPropsForJsonSource(topic, null, "earliest");
+    Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
     // 1. Extract without any checkpoint => get all the data, respecting sourceLimit
     assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
-    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
     InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900);
     assertEquals(900, fetch1.getBatch().get().count());
     // Test Avro To DataFrame<Row> path
@@ -125,7 +124,7 @@ public class TestJsonKafkaSource extends UtilitiesTestBase {
     assertEquals(900, fetch1AsRows.count());
     // 2. Produce new data, extract new data
-    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
     InputBatch<Dataset<Row>> fetch2 =
         kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
     assertEquals(1100, fetch2.getBatch().get().count());
@@ -155,19 +154,20 @@ public class TestJsonKafkaSource extends UtilitiesTestBase {
   @Test
   public void testJsonKafkaSourceFilterNullMsg() {
     // topic setup.
-    testUtils.createTopic(TEST_TOPIC_NAME, 2);
+    final String topic = TEST_TOPIC_PREFIX + "testJsonKafkaSourceFilterNullMsg";
+    testUtils.createTopic(topic, 2);
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    TypedProperties props = createPropsForJsonSource(null, "earliest");
-    Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
+    TypedProperties props = createPropsForJsonSource(topic, null, "earliest");
+    Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
     // 1. Extract without any checkpoint => get all the data, respecting sourceLimit
     assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
     // Send 1000 non-null messages to Kafka
-    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
     // Send 100 null messages to Kafka
-    testUtils.sendMessages(TEST_TOPIC_NAME,new String[100]);
+    testUtils.sendMessages(topic, new String[100]);
     InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
     // Verify that messages with null values are filtered
     assertEquals(1000, fetch1.getBatch().get().count());
@@ -177,15 +177,16 @@ public class TestJsonKafkaSource extends UtilitiesTestBase {
   @Test
   public void testJsonKafkaSourceResetStrategy() {
     // topic setup.
-    testUtils.createTopic(TEST_TOPIC_NAME, 2);
+    final String topic = TEST_TOPIC_PREFIX + "testJsonKafkaSourceResetStrategy";
+    testUtils.createTopic(topic, 2);
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    TypedProperties earliestProps = createPropsForJsonSource(null, "earliest");
-    Source earliestJsonSource = new JsonKafkaSource(earliestProps, jsc, sparkSession, schemaProvider, metrics);
+    TypedProperties earliestProps = createPropsForJsonSource(topic, null, "earliest");
+    Source earliestJsonSource = new JsonKafkaSource(earliestProps, jsc(), spark(), schemaProvider, metrics);
     SourceFormatAdapter earliestKafkaSource = new SourceFormatAdapter(earliestJsonSource);
-    TypedProperties latestProps = createPropsForJsonSource(null, "latest");
-    Source latestJsonSource = new JsonKafkaSource(latestProps, jsc, sparkSession, schemaProvider, metrics);
+    TypedProperties latestProps = createPropsForJsonSource(topic, null, "latest");
+    Source latestJsonSource = new JsonKafkaSource(latestProps, jsc(), spark(), schemaProvider, metrics);
     SourceFormatAdapter latestKafkaSource = new SourceFormatAdapter(latestJsonSource);
     // 1. Extract with a none data kafka checkpoint
@@ -195,7 +196,7 @@ public class TestJsonKafkaSource extends UtilitiesTestBase {
     assertEquals(earFetch0.getBatch(), latFetch0.getBatch());
     assertEquals(earFetch0.getCheckpointForNextBatch(), latFetch0.getCheckpointForNextBatch());
-    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
     // 2. Extract new checkpoint with a null / empty string pre checkpoint
     // => earliest fetch with max source limit will get all of data and a end offset checkpoint
@@ -209,23 +210,24 @@ public class TestJsonKafkaSource extends UtilitiesTestBase {
   @Test
   public void testJsonKafkaSourceWithDefaultUpperCap() {
     // topic setup.
-    testUtils.createTopic(TEST_TOPIC_NAME, 2);
+    final String topic = TEST_TOPIC_PREFIX + "testJsonKafkaSourceWithDefaultUpperCap";
+    testUtils.createTopic(topic, 2);
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    TypedProperties props = createPropsForJsonSource(Long.MAX_VALUE, "earliest");
-    Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
+    TypedProperties props = createPropsForJsonSource(topic, Long.MAX_VALUE, "earliest");
+    Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
     /*
     1. Extract without any checkpoint => get all the data, respecting default upper cap since both sourceLimit and
     maxEventsFromKafkaSourceProp are set to Long.MAX_VALUE
     */
-    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
     InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
     assertEquals(1000, fetch1.getBatch().get().count());
     // 2. Produce new data, extract new data based on sourceLimit
-    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
     InputBatch<Dataset<Row>> fetch2 =
         kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), 1500);
     assertEquals(1000, fetch2.getBatch().get().count());
@@ -234,11 +236,12 @@ public class TestJsonKafkaSource extends UtilitiesTestBase {
   @Test
   public void testJsonKafkaSourceInsertRecordsLessSourceLimit() {
     // topic setup.
-    testUtils.createTopic(TEST_TOPIC_NAME, 2);
+    final String topic = TEST_TOPIC_PREFIX + "testJsonKafkaSourceInsertRecordsLessSourceLimit";
+    testUtils.createTopic(topic, 2);
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    TypedProperties props = createPropsForJsonSource(Long.MAX_VALUE, "earliest");
-    Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
+    TypedProperties props = createPropsForJsonSource(topic, Long.MAX_VALUE, "earliest");
+    Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
     props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", "500");
@@ -246,7 +249,7 @@ public class TestJsonKafkaSource extends UtilitiesTestBase {
     1. maxEventsFromKafkaSourceProp set to more than generated insert records
     and sourceLimit less than the generated insert records num.
     */
-    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 400)));
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 400)));
     InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 300);
     assertEquals(300, fetch1.getBatch().get().count());
@@ -254,7 +257,7 @@ public class TestJsonKafkaSource extends UtilitiesTestBase {
     2. Produce new data, extract new data based on sourceLimit
     and sourceLimit less than the generated insert records num.
     */
-    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 600)));
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("001", 600)));
     InputBatch<Dataset<Row>> fetch2 =
         kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), 300);
     assertEquals(300, fetch2.getBatch().get().count());
@@ -263,20 +266,21 @@ public class TestJsonKafkaSource extends UtilitiesTestBase {
   @Test
   public void testJsonKafkaSourceWithConfigurableUpperCap() {
     // topic setup.
-    testUtils.createTopic(TEST_TOPIC_NAME, 2);
+    final String topic = TEST_TOPIC_PREFIX + "testJsonKafkaSourceWithConfigurableUpperCap";
+    testUtils.createTopic(topic, 2);
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    TypedProperties props = createPropsForJsonSource(500L, "earliest");
-    Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
+    TypedProperties props = createPropsForJsonSource(topic, 500L, "earliest");
+    Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
     // 1. Extract without any checkpoint => get all the data, respecting sourceLimit
-    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
     InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900);
     assertEquals(900, fetch1.getBatch().get().count());
     // 2. Produce new data, extract new data based on upper cap
-    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
     InputBatch<Dataset<Row>> fetch2 =
         kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
     assertEquals(500, fetch2.getBatch().get().count());
@@ -306,22 +310,23 @@ public class TestJsonKafkaSource extends UtilitiesTestBase {
   @Test
   public void testCommitOffsetToKafka() {
     // topic setup.
-    testUtils.createTopic(TEST_TOPIC_NAME, 2);
+    final String topic = TEST_TOPIC_PREFIX + "testCommitOffsetToKafka";
+    testUtils.createTopic(topic, 2);
     List<TopicPartition> topicPartitions = new ArrayList<>();
-    TopicPartition topicPartition0 = new TopicPartition(TEST_TOPIC_NAME, 0);
+    TopicPartition topicPartition0 = new TopicPartition(topic, 0);
     topicPartitions.add(topicPartition0);
-    TopicPartition topicPartition1 = new TopicPartition(TEST_TOPIC_NAME, 1);
+    TopicPartition topicPartition1 = new TopicPartition(topic, 1);
     topicPartitions.add(topicPartition1);
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    TypedProperties props = createPropsForJsonSource(null, "earliest");
+    TypedProperties props = createPropsForJsonSource(topic, null, "earliest");
     props.put(ENABLE_KAFKA_COMMIT_OFFSET.key(), "true");
-    Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
+    Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
     // 1. Extract without any checkpoint => get all the data, respecting sourceLimit
     assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
-    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
     InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 599);
     // commit to kafka after first batch
@@ -340,7 +345,7 @@ public class TestJsonKafkaSource extends UtilitiesTestBase {
     assertEquals(500L, endOffsets.get(topicPartition0));
     assertEquals(500L, endOffsets.get(topicPartition1));
-    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 500)));
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("001", 500)));
     InputBatch<Dataset<Row>> fetch2 =
         kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);