[HUDI-1944] Support Hudi to read from committed offset (#3175)
* [HUDI-1944] Support Hudi to read from committed offset * [HUDI-1944] Adding group option to KafkaResetOffsetStrategies * [HUDI-1944] Update Exception msg
This commit is contained in:
@@ -151,7 +151,7 @@ public class KafkaOffsetGen {
|
|||||||
* Kafka reset offset strategies.
|
* Kafka reset offset strategies.
|
||||||
*/
|
*/
|
||||||
enum KafkaResetOffsetStrategies {
|
enum KafkaResetOffsetStrategies {
|
||||||
LATEST, EARLIEST
|
LATEST, EARLIEST, GROUP
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -192,6 +192,9 @@ public class KafkaOffsetGen {
|
|||||||
if (!found) {
|
if (!found) {
|
||||||
throw new HoodieDeltaStreamerException(Config.KAFKA_AUTO_OFFSET_RESET + " config set to unknown value " + kafkaAutoResetOffsetsStr);
|
throw new HoodieDeltaStreamerException(Config.KAFKA_AUTO_OFFSET_RESET + " config set to unknown value " + kafkaAutoResetOffsetsStr);
|
||||||
}
|
}
|
||||||
|
if (autoResetValue.equals(KafkaResetOffsetStrategies.GROUP)) {
|
||||||
|
this.kafkaParams.put(Config.KAFKA_AUTO_OFFSET_RESET, Config.DEFAULT_KAFKA_AUTO_OFFSET_RESET.name().toLowerCase());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long sourceLimit, HoodieDeltaStreamerMetrics metrics) {
|
public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long sourceLimit, HoodieDeltaStreamerMetrics metrics) {
|
||||||
@@ -220,8 +223,11 @@ public class KafkaOffsetGen {
|
|||||||
case LATEST:
|
case LATEST:
|
||||||
fromOffsets = consumer.endOffsets(topicPartitions);
|
fromOffsets = consumer.endOffsets(topicPartitions);
|
||||||
break;
|
break;
|
||||||
|
case GROUP:
|
||||||
|
fromOffsets = getGroupOffsets(consumer, topicPartitions);
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
throw new HoodieNotSupportedException("Auto reset value must be one of 'earliest' or 'latest' ");
|
throw new HoodieNotSupportedException("Auto reset value must be one of 'earliest' or 'latest' or 'group' ");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -318,7 +324,6 @@ public class KafkaOffsetGen {
|
|||||||
public void commitOffsetToKafka(String checkpointStr) {
|
public void commitOffsetToKafka(String checkpointStr) {
|
||||||
DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(ConsumerConfig.GROUP_ID_CONFIG));
|
DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(ConsumerConfig.GROUP_ID_CONFIG));
|
||||||
Map<TopicPartition, Long> offsetMap = CheckpointUtils.strToOffsets(checkpointStr);
|
Map<TopicPartition, Long> offsetMap = CheckpointUtils.strToOffsets(checkpointStr);
|
||||||
Map<String, Object> kafkaParams = excludeHoodieConfigs(props);
|
|
||||||
Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap = new HashMap<>(offsetMap.size());
|
Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap = new HashMap<>(offsetMap.size());
|
||||||
try (KafkaConsumer consumer = new KafkaConsumer(kafkaParams)) {
|
try (KafkaConsumer consumer = new KafkaConsumer(kafkaParams)) {
|
||||||
offsetMap.forEach((topicPartition, offset) -> offsetAndMetadataMap.put(topicPartition, new OffsetAndMetadata(offset)));
|
offsetMap.forEach((topicPartition, offset) -> offsetAndMetadataMap.put(topicPartition, new OffsetAndMetadata(offset)));
|
||||||
@@ -327,4 +332,19 @@ public class KafkaOffsetGen {
|
|||||||
LOG.warn("Committing offsets to Kafka failed, this does not impact processing of records", e);
|
LOG.warn("Committing offsets to Kafka failed, this does not impact processing of records", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Map<TopicPartition, Long> getGroupOffsets(KafkaConsumer consumer, Set<TopicPartition> topicPartitions) {
|
||||||
|
Map<TopicPartition, Long> fromOffsets = new HashMap<>();
|
||||||
|
for (TopicPartition topicPartition : topicPartitions) {
|
||||||
|
OffsetAndMetadata committedOffsetAndMetadata = consumer.committed(topicPartition);
|
||||||
|
if (committedOffsetAndMetadata != null) {
|
||||||
|
fromOffsets.put(topicPartition, committedOffsetAndMetadata.offset());
|
||||||
|
} else {
|
||||||
|
LOG.warn("There are no commits associated with this consumer group, starting to consume from latest offset");
|
||||||
|
fromOffsets = consumer.endOffsets(topicPartitions);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fromOffsets;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ import org.apache.hudi.exception.HoodieNotSupportedException;
|
|||||||
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
|
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
|
||||||
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config;
|
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config;
|
||||||
import org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers;
|
import org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers;
|
||||||
|
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||||
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
|
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
|
||||||
@@ -33,6 +34,8 @@ import org.junit.jupiter.api.AfterEach;
|
|||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
@@ -66,6 +69,7 @@ public class TestKafkaOffsetGen {
|
|||||||
props.setProperty("bootstrap.servers", testUtils.brokerAddress());
|
props.setProperty("bootstrap.servers", testUtils.brokerAddress());
|
||||||
props.setProperty("key.deserializer", StringDeserializer.class.getName());
|
props.setProperty("key.deserializer", StringDeserializer.class.getName());
|
||||||
props.setProperty("value.deserializer", StringDeserializer.class.getName());
|
props.setProperty("value.deserializer", StringDeserializer.class.getName());
|
||||||
|
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
|
||||||
return props;
|
return props;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -127,6 +131,30 @@ public class TestKafkaOffsetGen {
|
|||||||
assertEquals(249, nextOffsetRanges[1].untilOffset());
|
assertEquals(249, nextOffsetRanges[1].untilOffset());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetNextOffsetRangesFromGroup() {
|
||||||
|
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
|
||||||
|
testUtils.createTopic(TEST_TOPIC_NAME, 2);
|
||||||
|
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
|
||||||
|
KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("group"));
|
||||||
|
String lastCheckpointString = TEST_TOPIC_NAME + ",0:250,1:249";
|
||||||
|
kafkaOffsetGen.commitOffsetToKafka(lastCheckpointString);
|
||||||
|
// don't pass lastCheckpointString as we want to read from group committed offset
|
||||||
|
OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300, metrics);
|
||||||
|
assertEquals(250, nextOffsetRanges[0].fromOffset());
|
||||||
|
assertEquals(400, nextOffsetRanges[0].untilOffset());
|
||||||
|
assertEquals(249, nextOffsetRanges[1].fromOffset());
|
||||||
|
assertEquals(399, nextOffsetRanges[1].untilOffset());
|
||||||
|
|
||||||
|
// committed offsets are not present for the consumer group
|
||||||
|
kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("group"));
|
||||||
|
nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300, metrics);
|
||||||
|
assertEquals(500, nextOffsetRanges[0].fromOffset());
|
||||||
|
assertEquals(500, nextOffsetRanges[0].untilOffset());
|
||||||
|
assertEquals(500, nextOffsetRanges[1].fromOffset());
|
||||||
|
assertEquals(500, nextOffsetRanges[1].untilOffset());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCheckTopicExists() {
|
public void testCheckTopicExists() {
|
||||||
TypedProperties props = getConsumerConfigs("latest");
|
TypedProperties props = getConsumerConfigs("latest");
|
||||||
|
|||||||
Reference in New Issue
Block a user