[HUDI-1447] DeltaStreamer kafka source supports consuming from specified timestamp (#2438)
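Adds a new source option, hoodie.deltastreamer.source.kafka.checkpoint.type. When it is
set to "timestamp" (default "string"), the --checkpoint value passed to DeltaStreamer is
interpreted as a 10-digit (seconds) or 13-digit (milliseconds) epoch timestamp and is
resolved to per-partition Kafka offsets before the first batch is read. Partitions that
have no message at or after the timestamp fall back to their earliest offset. The
timestamp interpretation is honored only for the first batch; once a checkpoint-reset
key is recorded in the commit metadata, the option is dropped from the properties.

A minimal usage sketch, mirroring the new tests below (the topic name, broker address
and the metrics object are illustrative placeholders, not values from this commit):

    TypedProperties props = new TypedProperties();
    props.setProperty("hoodie.deltastreamer.source.kafka.topic", "my_topic");
    props.setProperty("hoodie.deltastreamer.source.kafka.checkpoint.type", "timestamp");
    props.setProperty("bootstrap.servers", "localhost:9092");

    KafkaOffsetGen offsetGen = new KafkaOffsetGen(props);
    // The checkpoint is a millisecond epoch here; it is translated into the usual
    // "topic,partition:offset,..." checkpoint form before offset ranges are computed.
    // "metrics" is the HoodieDeltaStreamerMetrics instance DeltaStreamer already owns.
    OffsetRange[] ranges = offsetGen.getNextOffsetRanges(
        Option.of(String.valueOf(System.currentTimeMillis() - 100000)), 500, metrics);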
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -38,6 +38,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieClusteringConfig;
@@ -59,6 +60,7 @@ import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaSet;
 import org.apache.hudi.utilities.sources.InputBatch;
+import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
 import org.apache.hudi.utilities.transform.Transformer;
 
 import com.codahale.metrics.Timer;
@@ -318,13 +320,12 @@ public class DeltaSync implements Serializable {
       if (lastCommit.isPresent()) {
         HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
             .fromBytes(commitTimelineOpt.get().getInstantDetails(lastCommit.get()).get(), HoodieCommitMetadata.class);
-        if (cfg.checkpoint != null && !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) {
+        if (cfg.checkpoint != null && (StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))
+            || !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)))) {
           resumeCheckpointStr = Option.of(cfg.checkpoint);
-        } else if (commitMetadata.getMetadata(CHECKPOINT_KEY) != null) {
+        } else if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_KEY))) {
           // if the previous checkpoint is an empty string, skip resume by using Option.empty()
-          if (!commitMetadata.getMetadata(CHECKPOINT_KEY).isEmpty()) {
-            resumeCheckpointStr = Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
-          }
+          resumeCheckpointStr = Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
         } else if (commitMetadata.getOperationType() == WriteOperationType.CLUSTER) {
           // in case of a CLUSTER commit, no checkpoint will be available in metadata.
           resumeCheckpointStr = Option.empty();
@@ -336,6 +337,10 @@ public class DeltaSync implements Serializable {
             + commitTimelineOpt.get().getInstants().collect(Collectors.toList()) + ", CommitMetadata="
             + commitMetadata.toJsonString());
       }
+        // KAFKA_CHECKPOINT_TYPE will be honored only for the first batch.
+        if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) {
+          props.remove(KafkaOffsetGen.Config.KAFKA_CHECKPOINT_TYPE.key());
+        }
       }
     } else {
       String partitionColumns = HoodieWriterUtils.getPartitionColumns(keyGenerator);
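For reference, the resume logic changed above now picks the starting checkpoint in this
order; a simplified sketch (not verbatim), where lastResetKey and lastCheckpointKey are
hypothetical locals standing in for the corresponding commitMetadata.getMetadata(...)
lookups:

    Option<String> resumeCheckpointStr;
    if (cfg.checkpoint != null
        && (StringUtils.isNullOrEmpty(lastResetKey) || !cfg.checkpoint.equals(lastResetKey))) {
      resumeCheckpointStr = Option.of(cfg.checkpoint);     // user-supplied --checkpoint wins
    } else if (!StringUtils.isNullOrEmpty(lastCheckpointKey)) {
      resumeCheckpointStr = Option.of(lastCheckpointKey);  // resume from the last commit
    } else {
      resumeCheckpointStr = Option.empty();                // e.g. CLUSTER commits carry none
    }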
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -40,9 +40,6 @@ import org.apache.spark.streaming.kafka010.KafkaUtils;
 import org.apache.spark.streaming.kafka010.LocationStrategies;
 import org.apache.spark.streaming.kafka010.OffsetRange;
 
-import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET;
-import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET;
-
 /**
  * Reads avro serialized Kafka data, based on the confluent schema-registry.
  */
@@ -104,7 +101,7 @@ public class AvroKafkaSource extends AvroSource {
 
   @Override
   public void onCommit(String lastCkptStr) {
-    if (this.props.getBoolean(ENABLE_KAFKA_COMMIT_OFFSET, DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET)) {
+    if (this.props.getBoolean(KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.key(), KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.defaultValue())) {
       offsetGen.commitOffsetToKafka(lastCkptStr);
     }
   }
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -35,9 +35,6 @@ import org.apache.spark.streaming.kafka010.KafkaUtils;
 import org.apache.spark.streaming.kafka010.LocationStrategies;
 import org.apache.spark.streaming.kafka010.OffsetRange;
 
-import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET;
-import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET;
-
 /**
  * Read json kafka data.
  */
@@ -77,7 +74,7 @@ public class JsonKafkaSource extends JsonSource {
 
   @Override
   public void onCommit(String lastCkptStr) {
-    if (this.props.getBoolean(ENABLE_KAFKA_COMMIT_OFFSET, DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET)) {
+    if (this.props.getBoolean(KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.key(), KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.defaultValue())) {
      offsetGen.commitOffsetToKafka(lastCkptStr);
     }
   }
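Both Kafka sources now read the commit-offset flag through its ConfigProperty instead of
the removed static imports. Enabling it remains a one-line property; a sketch (the value
shown is illustrative):

    // Default is false; when true, onCommit() pushes the batch checkpoint back to Kafka.
    props.setProperty("hoodie.deltastreamer.source.kafka.enable.commit.offset", "true");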
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.utilities.sources.helpers;
 
 import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieDeltaStreamerException;
@@ -30,6 +31,7 @@ import org.apache.hudi.utilities.sources.AvroKafkaSource;
 import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -46,6 +48,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -160,28 +163,48 @@ public class KafkaOffsetGen {
    */
  public static class Config {
 
-    private static final String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic";
-    private static final String MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = "hoodie.deltastreamer.kafka.source.maxEvents";
-    public static final String ENABLE_KAFKA_COMMIT_OFFSET = "hoodie.deltastreamer.source.kafka.enable.commit.offset";
-    public static final Boolean DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET = false;
+    private static final ConfigProperty<String> KAFKA_TOPIC_NAME = ConfigProperty
+        .key("hoodie.deltastreamer.source.kafka.topic")
+        .noDefaultValue()
+        .withDocumentation("Kafka topic name.");
+
+    public static final ConfigProperty<String> KAFKA_CHECKPOINT_TYPE = ConfigProperty
+        .key("hoodie.deltastreamer.source.kafka.checkpoint.type")
+        .defaultValue("string")
+        .withDocumentation("Kafka checkpoint type.");
+
+    public static final ConfigProperty<Boolean> ENABLE_KAFKA_COMMIT_OFFSET = ConfigProperty
+        .key("hoodie.deltastreamer.source.kafka.enable.commit.offset")
+        .defaultValue(false)
+        .withDocumentation("Automatically submits offset to kafka.");
+
+    public static final ConfigProperty<Long> MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = ConfigProperty
+        .key("hoodie.deltastreamer.kafka.source.maxEvents")
+        .defaultValue(5000000L)
+        .withDocumentation("Maximum number of records obtained in each batch.");
 
     // "auto.offset.reset" is kafka native config param. Do not change the config param name.
-    public static final String KAFKA_AUTO_OFFSET_RESET = "auto.offset.reset";
-    private static final KafkaResetOffsetStrategies DEFAULT_KAFKA_AUTO_OFFSET_RESET = KafkaResetOffsetStrategies.LATEST;
-    public static final long DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = 5000000;
-    public static long maxEventsFromKafkaSource = DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE;
+    private static final ConfigProperty<KafkaResetOffsetStrategies> KAFKA_AUTO_OFFSET_RESET = ConfigProperty
+        .key("auto.offset.reset")
+        .defaultValue(KafkaResetOffsetStrategies.LATEST)
+        .withDocumentation("Kafka consumer strategy for reading data.");
+
+    public static final String KAFKA_CHECKPOINT_TYPE_TIMESTAMP = "timestamp";
   }
 
   private final Map<String, Object> kafkaParams;
   private final TypedProperties props;
   protected final String topicName;
   private KafkaResetOffsetStrategies autoResetValue;
+  private final String kafkaCheckpointType;
 
   public KafkaOffsetGen(TypedProperties props) {
     this.props = props;
     kafkaParams = excludeHoodieConfigs(props);
-    DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.KAFKA_TOPIC_NAME));
-    topicName = props.getString(Config.KAFKA_TOPIC_NAME);
-    String kafkaAutoResetOffsetsStr = props.getString(Config.KAFKA_AUTO_OFFSET_RESET, Config.DEFAULT_KAFKA_AUTO_OFFSET_RESET.name().toLowerCase());
+    DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.KAFKA_TOPIC_NAME.key()));
+    topicName = props.getString(Config.KAFKA_TOPIC_NAME.key());
+    kafkaCheckpointType = props.getString(Config.KAFKA_CHECKPOINT_TYPE.key(), Config.KAFKA_CHECKPOINT_TYPE.defaultValue());
+    String kafkaAutoResetOffsetsStr = props.getString(Config.KAFKA_AUTO_OFFSET_RESET.key(), Config.KAFKA_AUTO_OFFSET_RESET.defaultValue().name().toLowerCase());
     boolean found = false;
     for (KafkaResetOffsetStrategies entry: KafkaResetOffsetStrategies.values()) {
       if (entry.name().toLowerCase().equals(kafkaAutoResetOffsetsStr)) {
@@ -194,7 +217,7 @@ public class KafkaOffsetGen {
       throw new HoodieDeltaStreamerException(Config.KAFKA_AUTO_OFFSET_RESET + " config set to unknown value " + kafkaAutoResetOffsetsStr);
     }
     if (autoResetValue.equals(KafkaResetOffsetStrategies.GROUP)) {
-      this.kafkaParams.put(Config.KAFKA_AUTO_OFFSET_RESET, Config.DEFAULT_KAFKA_AUTO_OFFSET_RESET.name().toLowerCase());
+      this.kafkaParams.put(Config.KAFKA_AUTO_OFFSET_RESET.key(), Config.KAFKA_AUTO_OFFSET_RESET.defaultValue().name().toLowerCase());
     }
   }
 
@@ -212,6 +235,9 @@ public class KafkaOffsetGen {
       Set<TopicPartition> topicPartitions = partitionInfoList.stream()
               .map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet());
 
+      if (Config.KAFKA_CHECKPOINT_TYPE_TIMESTAMP.equals(kafkaCheckpointType) && isValidTimestampCheckpointType(lastCheckpointStr)) {
+        lastCheckpointStr = getOffsetsByTimestamp(consumer, partitionInfoList, topicPartitions, topicName, Long.parseLong(lastCheckpointStr.get()));
+      }
       // Determine the offset ranges to read from
       if (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty() && checkTopicCheckpoint(lastCheckpointStr)) {
         fromOffsets = fetchValidOffsets(consumer, lastCheckpointStr, topicPartitions);
@@ -237,8 +263,8 @@ public class KafkaOffsetGen {
     }
 
     // Come up with final set of OffsetRanges to read (account for new partitions, limit number of events)
-    long maxEventsToReadFromKafka = props.getLong(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP,
-        Config.maxEventsFromKafkaSource);
+    long maxEventsToReadFromKafka = props.getLong(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP.key(),
+        Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP.defaultValue());
 
     long numEvents;
     if (sourceLimit == Long.MAX_VALUE) {
@@ -271,6 +297,20 @@ public class KafkaOffsetGen {
     return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
   }
 
+  /**
+   * Check if the checkpoint is a timestamp.
+   * @param lastCheckpointStr
+   * @return
+   */
+  private Boolean isValidTimestampCheckpointType(Option<String> lastCheckpointStr) {
+    if (!lastCheckpointStr.isPresent()) {
+      return false;
+    }
+    Pattern pattern = Pattern.compile("[-+]?[0-9]+(\\.[0-9]+)?");
+    Matcher isNum = pattern.matcher(lastCheckpointStr.get());
+    return isNum.matches() && (lastCheckpointStr.get().length() == 13 || lastCheckpointStr.get().length() == 10);
+  }
+
   private Long delayOffsetCalculation(Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions, KafkaConsumer consumer) {
     Long delayCount = 0L;
     Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
@@ -283,6 +323,41 @@ public class KafkaOffsetGen {
     return delayCount;
   }
 
+  /**
+   * Get the checkpoint by timestamp.
+   * This method returns the checkpoint format based on the timestamp.
+   * example:
+   * 1. input: timestamp, etc.
+   * 2. output: topicName,partition_num_0:100,partition_num_1:101,partition_num_2:102.
+   *
+   * @param consumer
+   * @param topicName
+   * @param timestamp
+   * @return
+   */
+  private Option<String> getOffsetsByTimestamp(KafkaConsumer consumer, List<PartitionInfo> partitionInfoList, Set<TopicPartition> topicPartitions,
+                                               String topicName, Long timestamp) {
+
+    Map<TopicPartition, Long> topicPartitionsTimestamp = partitionInfoList.stream()
+        .map(x -> new TopicPartition(x.topic(), x.partition()))
+        .collect(Collectors.toMap(Function.identity(), x -> timestamp));
+
+    Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions);
+    Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestamp = consumer.offsetsForTimes(topicPartitionsTimestamp);
+
+    StringBuilder sb = new StringBuilder();
+    sb.append(topicName + ",");
+    for (Map.Entry<TopicPartition, OffsetAndTimestamp> map : offsetAndTimestamp.entrySet()) {
+      if (map.getValue() != null) {
+        sb.append(map.getKey().partition()).append(":").append(map.getValue().offset()).append(",");
+      } else {
+        sb.append(map.getKey().partition()).append(":").append(earliestOffsets.get(map.getKey())).append(",");
+      }
+    }
+    return Option.of(sb.deleteCharAt(sb.length() - 1).toString());
+  }
+
   /**
    * Check if topic exists.
    * @param consumer kafka consumer
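getOffsetsByTimestamp above is built on the standard KafkaConsumer APIs; a condensed
sketch of the same resolution, assuming a constructed consumer, a topicPartitions set
and a target timestampMs (all placeholders) are in scope:

    Map<TopicPartition, Long> byTime = topicPartitions.stream()
        .collect(Collectors.toMap(Function.identity(), tp -> timestampMs));
    // offsetsForTimes returns, per partition, the offset of the earliest message with
    // timestamp >= timestampMs; the entry value is null when no such message exists.
    Map<TopicPartition, OffsetAndTimestamp> resolved = consumer.offsetsForTimes(byTime);
    Map<TopicPartition, Long> earliest = consumer.beginningOffsets(topicPartitions);
    for (TopicPartition tp : topicPartitions) {
      OffsetAndTimestamp oat = resolved.get(tp);
      long offset = (oat != null) ? oat.offset() : earliest.get(tp); // fall back to earliest
      System.out.println(tp + " -> " + offset);
    }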
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -58,7 +58,6 @@ import org.apache.hudi.utilities.sources.JdbcSource;
 import org.apache.hudi.utilities.sources.JsonKafkaSource;
 import org.apache.hudi.utilities.sources.ParquetDFSSource;
 import org.apache.hudi.utilities.sources.TestDataSource;
-import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config;
 import org.apache.hudi.utilities.testutils.JdbcTestUtils;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 import org.apache.hudi.utilities.testutils.sources.DistributedTestDataSource;
@@ -139,6 +138,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
   private static final int PARQUET_NUM_RECORDS = 5;
   private static final int CSV_NUM_RECORDS = 3;
   private static final int JSON_KAFKA_NUM_RECORDS = 5;
+  private String kafkaCheckpointType = "string";
   // Required fields
   private static final String TGT_BASE_PATH_PARAM = "--target-base-path";
   private static final String TGT_BASE_PATH_VALUE = "s3://mybucket/blah";
@@ -274,7 +274,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
   protected static void populateCommonKafkaProps(TypedProperties props) {
     //Kafka source properties
     props.setProperty("bootstrap.servers", testUtils.brokerAddress());
-    props.setProperty(Config.KAFKA_AUTO_OFFSET_RESET, "earliest");
+    props.setProperty("auto.offset.reset", "earliest");
     props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", String.valueOf(5000));
@@ -360,12 +360,13 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
         String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, boolean updatePayloadClass,
         String payloadClassName, String tableType) {
       return makeConfig(basePath, op, TestDataSource.class.getName(), transformerClassNames, propsFilename, enableHiveSync,
-          useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName, tableType, "timestamp");
+          useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName, tableType, "timestamp", null);
     }
 
     static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, String sourceClassName,
         List<String> transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass,
-        int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField) {
+        int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField,
+        String checkpoint) {
       HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
       cfg.targetBasePath = basePath;
       cfg.targetTableName = "hoodie_trips";
@@ -377,6 +378,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
       cfg.sourceOrderingField = sourceOrderingField;
       cfg.propsFilePath = dfsBasePath + "/" + propsFilename;
       cfg.sourceLimit = sourceLimit;
+      cfg.checkpoint = checkpoint;
       if (updatePayloadClass) {
         cfg.payloadClassName = payloadClassName;
       }
@@ -1399,7 +1401,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
         TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
             transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false,
-            useSchemaProvider, 100000, false, null, null, "timestamp"), jsc);
+            useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc);
     deltaStreamer.sync();
     TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
     testNum++;
@@ -1414,10 +1416,11 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
     props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
     props.setProperty("hoodie.deltastreamer.source.dfs.root", JSON_KAFKA_SOURCE_ROOT);
-    props.setProperty("hoodie.deltastreamer.source.kafka.topic",topicName);
+    props.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName);
+    props.setProperty("hoodie.deltastreamer.source.kafka.checkpoint.type", kafkaCheckpointType);
     props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_uber.avsc");
     props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target_uber.avsc");
-    props.setProperty(Config.KAFKA_AUTO_OFFSET_RESET, autoResetValue);
+    props.setProperty("auto.offset.reset", autoResetValue);
 
     UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + propsFileName);
   }
@@ -1440,7 +1443,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
         TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
             Collections.EMPTY_LIST, PROPS_FILENAME_TEST_PARQUET, false,
-            false, 100000, false, null, null, "timestamp"), jsc);
+            false, 100000, false, null, null, "timestamp", null), jsc);
     deltaStreamer.sync();
     TestHelpers.assertRecordCount(parquetRecords, tableBasePath + "/*/*.parquet", sqlContext);
     deltaStreamer.shutdownGracefully();
@@ -1453,7 +1456,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     deltaStreamer = new HoodieDeltaStreamer(
         TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
             Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false,
-            true, 100000, false, null, null, "timestamp"), jsc);
+            true, 100000, false, null, null, "timestamp", null), jsc);
     deltaStreamer.sync();
     // if auto reset value is set to LATEST, this all kafka records so far may not be synced.
     int totalExpectedRecords = parquetRecords + ((autoResetToLatest) ? 0 : JSON_KAFKA_NUM_RECORDS);
@@ -1471,12 +1474,12 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
   public void testJsonKafkaDFSSource() throws Exception {
     topicName = "topic" + testNum;
     prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, true, topicName);
-    prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest",topicName);
+    prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest", topicName);
     String tableBasePath = dfsBasePath + "/test_json_kafka_table" + testNum;
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
         TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
             Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false,
-            true, 100000, false, null, null, "timestamp"), jsc);
+            true, 100000, false, null, null, "timestamp", null), jsc);
     deltaStreamer.sync();
     TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
 
@@ -1488,6 +1491,31 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
   }
 
+  @Test
+  public void testKafkaTimestampType() throws Exception {
+    topicName = "topic" + testNum;
+    kafkaCheckpointType = "timestamp";
+    prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, true, topicName);
+    prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest", topicName);
+    String tableBasePath = dfsBasePath + "/test_json_kafka_table" + testNum;
+    HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+        TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
+            Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false,
+            true, 100000, false, null,
+            null, "timestamp", String.valueOf(System.currentTimeMillis())), jsc);
+    deltaStreamer.sync();
+    TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
+
+    prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, false, topicName);
+    deltaStreamer = new HoodieDeltaStreamer(
+        TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
+            Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false,
+            true, 100000, false, null, null,
+            "timestamp", String.valueOf(System.currentTimeMillis())), jsc);
+    deltaStreamer.sync();
+    TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS * 2, tableBasePath + "/*/*.parquet", sqlContext);
+  }
+
   @Test
   public void testParquetSourceToKafkaSourceEarliestAutoResetValue() throws Exception {
     testDeltaStreamerTransitionFromParquetToKafkaSource(false);
@@ -1566,7 +1594,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     new HoodieDeltaStreamer(TestHelpers.makeConfig(
         tableBasePath, WriteOperationType.INSERT, CsvDFSSource.class.getName(),
         transformerClassNames, PROPS_FILENAME_TEST_CSV, false,
-        useSchemaProvider, 1000, false, null, null, sourceOrderingField), jsc);
+        useSchemaProvider, 1000, false, null, null, sourceOrderingField, null), jsc);
     deltaStreamer.sync();
     TestHelpers.assertRecordCount(CSV_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
     testNum++;
@@ -1679,7 +1707,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     String tableBasePath = dfsBasePath + "/triprec";
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, JdbcSource.class.getName(),
         null, "test-jdbc-source.properties", false,
-        false, sourceLimit, false, null, null, "timestamp");
+        false, sourceLimit, false, null, null, "timestamp", null);
     cfg.continuousMode = true;
     // Add 1000 records
     JdbcTestUtils.clearAndInsert("000", numRecords, connection, new HoodieTestDataGenerator(), props);
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
@@ -94,11 +94,11 @@ public class TestKafkaSource extends UtilitiesTestBase {
     TypedProperties props = new TypedProperties();
     props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME);
     props.setProperty("bootstrap.servers", testUtils.brokerAddress());
-    props.setProperty(Config.KAFKA_AUTO_OFFSET_RESET, resetStrategy);
+    props.setProperty("auto.offset.reset", resetStrategy);
     props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
     props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
         maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) :
-            String.valueOf(Config.maxEventsFromKafkaSource));
+            String.valueOf(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP.defaultValue()));
     props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
     return props;
   }
@@ -193,7 +193,6 @@ public class TestKafkaSource extends UtilitiesTestBase {
 
     Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
-    Config.maxEventsFromKafkaSource = 500;
 
     /*
     1. Extract without any checkpoint => get all the data, respecting default upper cap since both sourceLimit and
@@ -208,9 +207,6 @@ public class TestKafkaSource extends UtilitiesTestBase {
     InputBatch<Dataset<Row>> fetch2 =
         kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), 1500);
     assertEquals(1000, fetch2.getBatch().get().count());
-
-    //reset the value back since it is a static variable
-    Config.maxEventsFromKafkaSource = Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE;
   }
 
   @Test
@@ -222,7 +218,7 @@ public class TestKafkaSource extends UtilitiesTestBase {
 
     Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
-    Config.maxEventsFromKafkaSource = 500;
+    props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", "500");
 
     /*
     1. maxEventsFromKafkaSourceProp set to more than generated insert records
@@ -240,9 +236,6 @@ public class TestKafkaSource extends UtilitiesTestBase {
     InputBatch<Dataset<Row>> fetch2 =
         kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), 300);
     assertEquals(300, fetch2.getBatch().get().count());
-
-    //reset the value back since it is a static variable
-    Config.maxEventsFromKafkaSource = Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE;
   }
 
   @Test
@@ -300,7 +293,7 @@ public class TestKafkaSource extends UtilitiesTestBase {
 
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     TypedProperties props = createPropsForJsonSource(null, "earliest");
-    props.put(ENABLE_KAFKA_COMMIT_OFFSET, "true");
+    props.put(ENABLE_KAFKA_COMMIT_OFFSET.key(), "true");
     Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
 
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
@@ -23,8 +23,8 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
-import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers;
+
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -62,9 +62,10 @@ public class TestKafkaOffsetGen {
     testUtils.teardown();
   }
 
-  private TypedProperties getConsumerConfigs(String autoOffsetReset) {
+  private TypedProperties getConsumerConfigs(String autoOffsetReset, String kafkaCheckpointType) {
     TypedProperties props = new TypedProperties();
-    props.put(Config.KAFKA_AUTO_OFFSET_RESET, autoOffsetReset);
+    props.put("hoodie.deltastreamer.source.kafka.checkpoint.type", kafkaCheckpointType);
+    props.put("auto.offset.reset", autoOffsetReset);
     props.put("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME);
     props.setProperty("bootstrap.servers", testUtils.brokerAddress());
     props.setProperty("key.deserializer", StringDeserializer.class.getName());
@@ -79,7 +80,7 @@ public class TestKafkaOffsetGen {
     testUtils.createTopic(TEST_TOPIC_NAME, 1);
     testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
 
-    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest"));
+    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest", "string"));
     OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 500, metrics);
     assertEquals(1, nextOffsetRanges.length);
     assertEquals(0, nextOffsetRanges[0].fromOffset());
@@ -96,7 +97,7 @@ public class TestKafkaOffsetGen {
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     testUtils.createTopic(TEST_TOPIC_NAME, 1);
     testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
-    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("latest"));
+    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("latest", "string"));
     OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 500, metrics);
     assertEquals(1, nextOffsetRanges.length);
     assertEquals(1000, nextOffsetRanges[0].fromOffset());
@@ -109,7 +110,7 @@ public class TestKafkaOffsetGen {
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     testUtils.createTopic(TEST_TOPIC_NAME, 1);
     testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
-    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("latest"));
+    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("latest", "string"));
 
     OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.of(lastCheckpointString), 500, metrics);
     assertEquals(1, nextOffsetRanges.length);
@@ -117,12 +118,26 @@ public class TestKafkaOffsetGen {
     assertEquals(750, nextOffsetRanges[0].untilOffset());
   }
 
+  @Test
+  public void testGetNextOffsetRangesFromTimestampCheckpointType() {
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    testUtils.createTopic(TEST_TOPIC_NAME, 1);
+    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+
+    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("latest", "timestamp"));
+
+    OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.of(String.valueOf(System.currentTimeMillis() - 100000)), 500, metrics);
+    assertEquals(1, nextOffsetRanges.length);
+    assertEquals(0, nextOffsetRanges[0].fromOffset());
+    assertEquals(500, nextOffsetRanges[0].untilOffset());
+  }
+
   @Test
   public void testGetNextOffsetRangesFromMultiplePartitions() {
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     testUtils.createTopic(TEST_TOPIC_NAME, 2);
     testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
-    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest"));
+    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest", "string"));
     OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 499, metrics);
     assertEquals(2, nextOffsetRanges.length);
     assertEquals(0, nextOffsetRanges[0].fromOffset());
@@ -136,7 +151,7 @@ public class TestKafkaOffsetGen {
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     testUtils.createTopic(TEST_TOPIC_NAME, 2);
     testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
-    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("group"));
+    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("group", "string"));
     String lastCheckpointString = TEST_TOPIC_NAME + ",0:250,1:249";
     kafkaOffsetGen.commitOffsetToKafka(lastCheckpointString);
     // don't pass lastCheckpointString as we want to read from group committed offset
@@ -147,7 +162,7 @@ public class TestKafkaOffsetGen {
     assertEquals(399, nextOffsetRanges[1].untilOffset());
 
     // committed offsets are not present for the consumer group
-    kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("group"));
+    kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("group", "string"));
     nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300, metrics);
     assertEquals(500, nextOffsetRanges[0].fromOffset());
     assertEquals(500, nextOffsetRanges[0].untilOffset());
@@ -157,7 +172,7 @@ public class TestKafkaOffsetGen {
 
   @Test
   public void testCheckTopicExists() {
-    TypedProperties props = getConsumerConfigs("latest");
+    TypedProperties props = getConsumerConfigs("latest", "string");
     KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(props);
     testUtils.createTopic(TEST_TOPIC_NAME, 1);
     boolean topicExists = kafkaOffsetGen.checkTopicExists(new KafkaConsumer(props));