1
0

[HUDI-238] Make Hudi support Scala 2.12 (#1226)

* [HUDI-238] Rename scala related artifactId & add maven profile to support Scala 2.12
This commit is contained in:
wenningd
2020-01-17 14:02:21 -08:00
committed by Balaji Varadarajan
parent 923e2b4a1e
commit 292c1e2ff4
19 changed files with 261 additions and 153 deletions

View File

@@ -24,16 +24,17 @@ import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
import io.confluent.kafka.serializers.KafkaAvroDecoder;
import kafka.serializer.StringDecoder;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;
/**
* Reads avro serialized Kafka data, based on the confluent schema-registry.
@@ -47,6 +48,8 @@ public class AvroKafkaSource extends AvroSource {
public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider) {
super(props, sparkContext, sparkSession, schemaProvider);
props.put("key.deserializer", StringDeserializer.class);
props.put("value.deserializer", KafkaAvroDeserializer.class);
offsetGen = new KafkaOffsetGen(props);
}
@@ -64,9 +67,7 @@ public class AvroKafkaSource extends AvroSource {
}
private JavaRDD<GenericRecord> toRDD(OffsetRange[] offsetRanges) {
JavaRDD<GenericRecord> recordRDD =
KafkaUtils.createRDD(sparkContext, String.class, Object.class, StringDecoder.class, KafkaAvroDecoder.class,
offsetGen.getKafkaParams(), offsetRanges).values().map(obj -> (GenericRecord) obj);
return recordRDD;
return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
LocationStrategies.PreferConsistent()).map(obj -> (GenericRecord) obj.value());
}
}

View File

@@ -24,14 +24,15 @@ import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
import kafka.serializer.StringDecoder;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;
/**
* Read json kafka data.
@@ -45,6 +46,8 @@ public class JsonKafkaSource extends JsonSource {
public JsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider) {
super(properties, sparkContext, sparkSession, schemaProvider);
properties.put("key.deserializer", StringDeserializer.class);
properties.put("value.deserializer", StringDeserializer.class);
offsetGen = new KafkaOffsetGen(properties);
}
@@ -61,7 +64,7 @@ public class JsonKafkaSource extends JsonSource {
}
private JavaRDD<String> toRDD(OffsetRange[] offsetRanges) {
return KafkaUtils.createRDD(sparkContext, String.class, String.class, StringDecoder.class, StringDecoder.class,
offsetGen.getKafkaParams(), offsetRanges).values();
return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
LocationStrategies.PreferConsistent()).map(x -> (String) x.value());
}
}

View File

@@ -22,30 +22,24 @@ import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
import kafka.common.TopicAndPartition;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.streaming.kafka.KafkaCluster;
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset;
import org.apache.spark.streaming.kafka.OffsetRange;
import org.apache.spark.streaming.kafka010.OffsetRange;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import scala.Predef;
import scala.collection.JavaConverters;
import scala.collection.immutable.Map;
import scala.collection.immutable.Set;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.StringBuilder;
import scala.util.Either;
/**
* Source to read data from Kafka, incrementally.
*/
@@ -58,8 +52,8 @@ public class KafkaOffsetGen {
/**
* Reconstruct checkpoint from string.
*/
public static HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> strToOffsets(String checkpointStr) {
HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> offsetMap = new HashMap<>();
public static HashMap<TopicPartition, Long> strToOffsets(String checkpointStr) {
HashMap<TopicPartition, Long> offsetMap = new HashMap<>();
if (checkpointStr.length() == 0) {
return offsetMap;
}
@@ -67,8 +61,7 @@ public class KafkaOffsetGen {
String topic = splits[0];
for (int i = 1; i < splits.length; i++) {
String[] subSplits = splits[i].split(":");
offsetMap.put(new TopicAndPartition(topic, Integer.parseInt(subSplits[0])),
new KafkaCluster.LeaderOffset("", -1, Long.parseLong(subSplits[1])));
offsetMap.put(new TopicPartition(topic, Integer.parseInt(subSplits[0])), Long.parseLong(subSplits[1]));
}
return offsetMap;
}
@@ -83,7 +76,7 @@ public class KafkaOffsetGen {
// at least 1 partition will be present.
sb.append(ranges[0].topic() + ",");
sb.append(Arrays.stream(ranges).map(r -> String.format("%s:%d", r.partition(), r.untilOffset()))
.collect(Collectors.joining(",")));
.collect(Collectors.joining(",")));
return sb.toString();
}
@@ -94,32 +87,32 @@ public class KafkaOffsetGen {
* @param toOffsetMap offsets of where each partitions is currently at
* @param numEvents maximum number of events to read.
*/
public static OffsetRange[] computeOffsetRanges(HashMap<TopicAndPartition, LeaderOffset> fromOffsetMap,
HashMap<TopicAndPartition, LeaderOffset> toOffsetMap, long numEvents) {
public static OffsetRange[] computeOffsetRanges(Map<TopicPartition, Long> fromOffsetMap,
Map<TopicPartition, Long> toOffsetMap, long numEvents) {
Comparator<OffsetRange> byPartition = Comparator.comparing(OffsetRange::partition);
// Create initial offset ranges for each 'to' partition, with from = to offsets.
OffsetRange[] ranges = new OffsetRange[toOffsetMap.size()];
toOffsetMap.entrySet().stream().map(e -> {
TopicAndPartition tp = e.getKey();
long fromOffset = fromOffsetMap.getOrDefault(tp, new LeaderOffset("", -1, 0)).offset();
TopicPartition tp = e.getKey();
long fromOffset = fromOffsetMap.getOrDefault(tp, 0L);
return OffsetRange.create(tp, fromOffset, fromOffset);
}).sorted(byPartition).collect(Collectors.toList()).toArray(ranges);
long allocedEvents = 0;
java.util.Set<Integer> exhaustedPartitions = new HashSet<>();
Set<Integer> exhaustedPartitions = new HashSet<>();
// keep going until we have events to allocate and partitions still not exhausted.
while (allocedEvents < numEvents && exhaustedPartitions.size() < toOffsetMap.size()) {
long remainingEvents = numEvents - allocedEvents;
long eventsPerPartition =
(long) Math.ceil((1.0 * remainingEvents) / (toOffsetMap.size() - exhaustedPartitions.size()));
(long) Math.ceil((1.0 * remainingEvents) / (toOffsetMap.size() - exhaustedPartitions.size()));
// Allocate the remaining events to non-exhausted partitions, in round robin fashion
for (int i = 0; i < ranges.length; i++) {
OffsetRange range = ranges[i];
if (!exhaustedPartitions.contains(range.partition())) {
long toOffsetMax = toOffsetMap.get(range.topicAndPartition()).offset();
long toOffsetMax = toOffsetMap.get(range.topicPartition());
long toOffset = Math.min(toOffsetMax, range.untilOffset() + eventsPerPartition);
if (toOffset == toOffsetMax) {
exhaustedPartitions.add(range.partition());
@@ -130,7 +123,7 @@ public class KafkaOffsetGen {
long offsetsToAdd = Math.min(eventsPerPartition, (numEvents - allocedEvents));
toOffset = Math.min(toOffsetMax, toOffset + offsetsToAdd);
}
ranges[i] = OffsetRange.create(range.topicAndPartition(), range.fromOffset(), toOffset);
ranges[i] = OffsetRange.create(range.topicPartition(), range.fromOffset(), toOffset);
}
}
}
@@ -143,29 +136,11 @@ public class KafkaOffsetGen {
}
}
/**
* Helpers to deal with tricky scala <=> java conversions. (oh my!)
*/
static class ScalaHelpers {
public static <K, V> Map<K, V> toScalaMap(HashMap<K, V> m) {
return JavaConverters.mapAsScalaMapConverter(m).asScala().toMap(Predef.conforms());
}
public static Set<String> toScalaSet(HashSet<String> s) {
return JavaConverters.asScalaSetConverter(s).asScala().toSet();
}
public static <K, V> java.util.Map<K, V> toJavaMap(Map<K, V> m) {
return JavaConverters.mapAsJavaMapConverter(m).asJava();
}
}
/**
* Kafka reset offset strategies.
*/
enum KafkaResetOffsetStrategies {
LARGEST, SMALLEST
LATEST, EARLIEST
}
/**
@@ -175,20 +150,20 @@ public class KafkaOffsetGen {
private static final String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic";
private static final String MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = "hoodie.deltastreamer.kafka.source.maxEvents";
private static final KafkaResetOffsetStrategies DEFAULT_AUTO_RESET_OFFSET = KafkaResetOffsetStrategies.LARGEST;
private static final KafkaResetOffsetStrategies DEFAULT_AUTO_RESET_OFFSET = KafkaResetOffsetStrategies.LATEST;
public static final long DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = 5000000;
public static long maxEventsFromKafkaSource = DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE;
}
private final HashMap<String, String> kafkaParams;
private final HashMap<String, Object> kafkaParams;
private final TypedProperties props;
protected final String topicName;
public KafkaOffsetGen(TypedProperties props) {
this.props = props;
kafkaParams = new HashMap<String, String>();
kafkaParams = new HashMap<>();
for (Object prop : props.keySet()) {
kafkaParams.put(prop.toString(), props.getString(prop.toString()));
kafkaParams.put(prop.toString(), props.get(prop.toString()));
}
DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.KAFKA_TOPIC_NAME));
topicName = props.getString(Config.KAFKA_TOPIC_NAME);
@@ -197,31 +172,25 @@ public class KafkaOffsetGen {
public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long sourceLimit) {
// Obtain current metadata for the topic
KafkaCluster cluster = new KafkaCluster(ScalaHelpers.toScalaMap(kafkaParams));
Either<ArrayBuffer<Throwable>, Set<TopicAndPartition>> either =
cluster.getPartitions(ScalaHelpers.toScalaSet(new HashSet<>(Collections.singletonList(topicName))));
if (either.isLeft()) {
// log errors. and bail out.
throw new HoodieDeltaStreamerException("Error obtaining partition metadata", either.left().get().head());
}
Set<TopicAndPartition> topicPartitions = either.right().get();
KafkaConsumer consumer = new KafkaConsumer(kafkaParams);
List<PartitionInfo> partitionInfoList;
partitionInfoList = consumer.partitionsFor(topicName);
Set<TopicPartition> topicPartitions = partitionInfoList.stream()
.map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet());
// Determine the offset ranges to read from
HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> fromOffsets;
HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> checkpointOffsets;
Map<TopicPartition, Long> fromOffsets;
if (lastCheckpointStr.isPresent()) {
fromOffsets = checkupValidOffsets(cluster, lastCheckpointStr, topicPartitions);
fromOffsets = checkupValidOffsets(consumer, lastCheckpointStr, topicPartitions);
} else {
KafkaResetOffsetStrategies autoResetValue = KafkaResetOffsetStrategies
.valueOf(props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase());
.valueOf(props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase());
switch (autoResetValue) {
case SMALLEST:
fromOffsets =
new HashMap(ScalaHelpers.toJavaMap(cluster.getEarliestLeaderOffsets(topicPartitions).right().get()));
case EARLIEST:
fromOffsets = consumer.beginningOffsets(topicPartitions);
break;
case LARGEST:
fromOffsets =
new HashMap(ScalaHelpers.toJavaMap(cluster.getLatestLeaderOffsets(topicPartitions).right().get()));
case LATEST:
fromOffsets = consumer.endOffsets(topicPartitions);
break;
default:
throw new HoodieNotSupportedException("Auto reset value must be one of 'smallest' or 'largest' ");
@@ -229,8 +198,7 @@ public class KafkaOffsetGen {
}
// Obtain the latest offsets.
HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> toOffsets =
new HashMap(ScalaHelpers.toJavaMap(cluster.getLatestLeaderOffsets(topicPartitions).right().get()));
Map<TopicPartition, Long> toOffsets = consumer.endOffsets(topicPartitions);
// Come up with final set of OffsetRanges to read (account for new partitions, limit number of events)
long maxEventsToReadFromKafka = props.getLong(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP,
@@ -245,15 +213,13 @@ public class KafkaOffsetGen {
// check up checkpoint offsets is valid or not, if true, return checkpoint offsets,
// else return earliest offsets
private HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> checkupValidOffsets(KafkaCluster cluster,
Option<String> lastCheckpointStr, Set<TopicAndPartition> topicPartitions) {
HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> checkpointOffsets =
CheckpointUtils.strToOffsets(lastCheckpointStr.get());
HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> earliestOffsets =
new HashMap(ScalaHelpers.toJavaMap(cluster.getEarliestLeaderOffsets(topicPartitions).right().get()));
private Map<TopicPartition, Long> checkupValidOffsets(KafkaConsumer consumer,
Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions) {
Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions);
boolean checkpointOffsetReseter = checkpointOffsets.entrySet().stream()
.anyMatch(offset -> offset.getValue().offset() < earliestOffsets.get(offset.getKey()).offset());
.anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey()));
return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
}
@@ -261,7 +227,7 @@ public class KafkaOffsetGen {
return topicName;
}
public HashMap<String, String> getKafkaParams() {
public HashMap<String, Object> getKafkaParams() {
return kafkaParams;
}
}

View File

@@ -28,14 +28,14 @@ import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config;
import kafka.common.TopicAndPartition;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset;
import org.apache.spark.streaming.kafka.KafkaTestUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.apache.spark.streaming.kafka010.OffsetRange;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -44,6 +44,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.HashMap;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
@@ -84,13 +85,12 @@ public class TestKafkaSource extends UtilitiesTestBase {
private TypedProperties createPropsForJsonSource(Long maxEventsToReadFromKafkaSource) {
TypedProperties props = new TypedProperties();
props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME);
props.setProperty("metadata.broker.list", testUtils.brokerAddress());
props.setProperty("auto.offset.reset", "smallest");
props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("bootstrap.servers", testUtils.brokerAddress());
props.setProperty("auto.offset.reset", "earliest");
props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) :
String.valueOf(Config.maxEventsFromKafkaSource));
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
return props;
}
@@ -214,10 +214,10 @@ public class TestKafkaSource extends UtilitiesTestBase {
assertEquals(Option.empty(), fetch6.getBatch());
}
private static HashMap<TopicAndPartition, LeaderOffset> makeOffsetMap(int[] partitions, long[] offsets) {
HashMap<TopicAndPartition, LeaderOffset> map = new HashMap<>();
private static HashMap<TopicPartition, Long> makeOffsetMap(int[] partitions, long[] offsets) {
HashMap<TopicPartition, Long> map = new HashMap<>();
for (int i = 0; i < partitions.length; i++) {
map.put(new TopicAndPartition(TEST_TOPIC_NAME, partitions[i]), new LeaderOffset("", -1, offsets[i]));
map.put(new TopicPartition(TEST_TOPIC_NAME, partitions[i]), offsets[i]);
}
return map;
}

View File

@@ -25,6 +25,6 @@ hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/
#hoodie.deltastreamer.source.kafka.topic=uber_trips
hoodie.deltastreamer.source.kafka.topic=impressions
#Kafka props
metadata.broker.list=localhost:9092
auto.offset.reset=smallest
bootstrap.servers=localhost:9092
auto.offset.reset=earliest
schema.registry.url=http://localhost:8081