
General enhancements

arukavytsia
2018-12-12 03:19:43 +02:00
committed by vinoth chandar
parent 30c5f8b7bd
commit 6946dd7557
46 changed files with 402 additions and 373 deletions


@@ -99,7 +99,7 @@ public class HoodieSnapshotCopier implements Serializable {
Path outputPath = new Path(outputDir);
if (fs.exists(outputPath)) {
logger.warn(
String.format("The output path %targetBasePath already exists, deleting", outputPath));
String.format("The output path %s targetBasePath already exists, deleting", outputPath));
fs.delete(new Path(outputDir), true);
}
@@ -155,7 +155,7 @@ public class HoodieSnapshotCopier implements Serializable {
}
if (fs.exists(targetFilePath)) {
logger.error(String.format(
"The target output commit file (%targetBasePath) already exists.", targetFilePath));
"The target output commit file (%s targetBasePath) already exists.", targetFilePath));
}
FileUtil.copy(fs, commitStatus.getPath(), fs, targetFilePath, false, fs.getConf());
}
@@ -166,7 +166,8 @@ public class HoodieSnapshotCopier implements Serializable {
// Create the _SUCCESS tag
Path successTagPath = new Path(outputDir + "/_SUCCESS");
if (!fs.exists(successTagPath)) {
logger.info("Creating _SUCCESS under targetBasePath: " + outputDir);
logger.info(String.format(
"Creating _SUCCESS under targetBasePath: $s", outputDir));
fs.createNewFile(successTagPath);
}
}
@@ -175,7 +176,7 @@ public class HoodieSnapshotCopier implements Serializable {
// Take input configs
final Config cfg = new Config();
new JCommander(cfg, args);
logger.info(String.format("Snapshot hoodie table from %targetBasePath to %targetBasePath",
logger.info(String.format("Snapshot hoodie table from %s targetBasePath to %stargetBasePath",
cfg.basePath, cfg.outputPath));
// Create a spark job to do the snapshot copy
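
The three format-string fixes in this file follow the same idea: the message needs a real "%s" conversion, because "%targetBasePath" is parsed by String.format as a "%t" date/time conversion and fails at runtime. A minimal standalone sketch of that behavior, independent of the Hoodie classes above (the path value is a placeholder):

// Minimal sketch of the String.format behavior the fixes above rely on.
// The outputPath value is hypothetical, not project configuration.
public class FormatSketch {
  public static void main(String[] args) {
    String outputPath = "/tmp/hoodie-snapshot";
    // Correct: "%s" is substituted with the argument.
    System.out.println(String.format(
        "The output path %s targetBasePath already exists, deleting", outputPath));
    // The old pattern "...%targetBasePath..." throws an IllegalFormatException
    // at runtime, because "%ta" is parsed as a date/time conversion.
  }
}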


@@ -95,14 +95,8 @@ public class UtilHelpers {
}
long len = fs.getFileStatus(p).getLen();
ByteBuffer buf = ByteBuffer.allocate((int) len);
FSDataInputStream inputStream = null;
try {
inputStream = fs.open(p);
try (FSDataInputStream inputStream = fs.open(p)) {
inputStream.readFully(0, buf.array(), 0, buf.array().length);
} finally {
if (inputStream != null) {
inputStream.close();
}
}
return new String(buf.array());
}
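
The hunk above swaps a manual try/finally close for try-with-resources, which closes the stream even when readFully throws. A compact sketch of the resulting shape (the surrounding class and path handling are placeholders, not the actual UtilHelpers code):

// Try-with-resources sketch; class name and main() wiring are illustrative only.
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadFileSketch {
  public static String readAll(FileSystem fs, Path p) throws IOException {
    long len = fs.getFileStatus(p).getLen();
    ByteBuffer buf = ByteBuffer.allocate((int) len);
    // The stream is closed automatically when the block exits,
    // whether normally or via an exception.
    try (FSDataInputStream inputStream = fs.open(p)) {
      inputStream.readFully(0, buf.array(), 0, buf.array().length);
    }
    return new String(buf.array());
  }

  public static void main(String[] args) throws IOException {
    FileSystem fs = FileSystem.get(new Configuration()); // local FS by default
    System.out.println(readAll(fs, new Path(args[0])));
  }
}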


@@ -28,6 +28,7 @@ import java.io.Serializable;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.TimeZone;
import org.apache.avro.generic.GenericRecord;
@@ -73,7 +74,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
if (timestampType == TimestampType.DATE_STRING || timestampType == TimestampType.MIXED) {
DataSourceUtils
.checkRequiredProperties(config, Arrays.asList(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP));
.checkRequiredProperties(config, Collections.singletonList(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP));
this.inputDateFormat = new SimpleDateFormat(
config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP));
this.inputDateFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
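
This commit repeatedly replaces Arrays.asList(singleValue) with Collections.singletonList(singleValue) wherever exactly one required property is checked (here and in the schema providers and sources further down). Both produce a size-1 list, but singletonList skips the varargs array allocation and makes the single-element intent explicit. A tiny sketch with a made-up property name:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SingletonListSketch {
  public static void main(String[] args) {
    // Both hold exactly one element; Arrays.asList is fixed-size but allows set(),
    // Collections.singletonList is fully immutable and allocates no backing array copy.
    List<String> viaArrays = Arrays.asList("hoodie.some.required.prop");
    List<String> viaSingleton = Collections.singletonList("hoodie.some.required.prop");
    System.out.println(viaArrays.equals(viaSingleton)); // true
  }
}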


@@ -23,7 +23,8 @@ import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.exception.HoodieIOException;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -52,7 +53,7 @@ public class FilebasedSchemaProvider extends SchemaProvider {
public FilebasedSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
super(props, jssc);
DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.SOURCE_SCHEMA_FILE_PROP));
DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.SOURCE_SCHEMA_FILE_PROP));
this.fs = FSUtils.getFs(props.getString(Config.SOURCE_SCHEMA_FILE_PROP), jssc.hadoopConfiguration());
try {
this.sourceSchema = new Schema.Parser().parse(


@@ -25,7 +25,8 @@ import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.exception.HoodieIOException;
import java.io.IOException;
import java.net.URL;
import java.util.Arrays;
import java.util.Collections;
import org.apache.avro.Schema;
import org.apache.spark.api.java.JavaSparkContext;
@@ -55,7 +56,7 @@ public class SchemaRegistryProvider extends SchemaProvider {
public SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) {
super(props, jssc);
DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.SCHEMA_REGISTRY_URL_PROP));
DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.SCHEMA_REGISTRY_URL_PROP));
String registryUrl = props.getString(Config.SCHEMA_REGISTRY_URL_PROP);
try {
this.schema = new Schema.Parser().parse(fetchSchemaFromRegistry(registryUrl));


@@ -87,7 +87,7 @@ public class AvroConvertor implements Serializable {
}
public GenericRecord fromAvroBinary(byte[] avroBinary) throws IOException {
public GenericRecord fromAvroBinary(byte[] avroBinary) {
initSchema();
initInjection();
return recordInjection.invert(avroBinary).get();


@@ -39,9 +39,8 @@ public class AvroKafkaSource extends KafkaSource {
@Override
protected JavaRDD<GenericRecord> toAvroRDD(OffsetRange[] offsetRanges, AvroConvertor avroConvertor) {
JavaRDD<GenericRecord> recordRDD = KafkaUtils
return KafkaUtils
.createRDD(sparkContext, String.class, Object.class, StringDecoder.class, KafkaAvroDecoder.class, kafkaParams,
offsetRanges).values().map(obj -> (GenericRecord) obj);
return recordRDD;
}
}


@@ -26,10 +26,7 @@ import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileStatus;
@@ -58,7 +55,7 @@ public abstract class DFSSource extends Source {
public DFSSource(TypedProperties props, JavaSparkContext sparkContext, SchemaProvider schemaProvider) {
super(props, sparkContext, schemaProvider);
DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.ROOT_INPUT_PATH_PROP));
DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.ROOT_INPUT_PATH_PROP));
this.fs = FSUtils.getFs(props.getString(Config.ROOT_INPUT_PATH_PROP), sparkContext.hadoopConfiguration());
}
@@ -75,16 +72,14 @@ public abstract class DFSSource extends Source {
new Path(props.getString(Config.ROOT_INPUT_PATH_PROP)), true);
while (fitr.hasNext()) {
LocatedFileStatus fileStatus = fitr.next();
if (fileStatus.isDirectory() || IGNORE_FILEPREFIX_LIST.stream().filter(
pfx -> fileStatus.getPath().getName().startsWith(pfx)).count() > 0) {
if (fileStatus.isDirectory() || IGNORE_FILEPREFIX_LIST.stream()
.anyMatch(pfx -> fileStatus.getPath().getName().startsWith(pfx))) {
continue;
}
eligibleFiles.add(fileStatus);
}
// sort them by modification time.
eligibleFiles.sort((FileStatus f1, FileStatus f2) -> Long.valueOf(f1.getModificationTime())
.compareTo(Long.valueOf(
f2.getModificationTime())));
eligibleFiles.sort(Comparator.comparingLong(FileStatus::getModificationTime));
// Filter based on checkpoint & input size, if needed
long currentBytes = 0;
@@ -110,8 +105,7 @@ public abstract class DFSSource extends Source {
// no data to read
if (filteredFiles.size() == 0) {
return new ImmutablePair<>(Optional.empty(),
lastCheckpointStr.isPresent() ? lastCheckpointStr.get()
: String.valueOf(Long.MIN_VALUE));
lastCheckpointStr.orElseGet(() -> String.valueOf(Long.MIN_VALUE)));
}
// read the files out.
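
The DFSSource hunks above bundle three small stream cleanups: filter(...).count() > 0 becomes anyMatch (which short-circuits on the first match), the hand-written modification-time comparator becomes Comparator.comparingLong, and the isPresent()/get() ternary becomes orElseGet. A self-contained sketch of all three, using plain strings in place of FileStatus (names and prefix values are illustrative):

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class StreamRefactorSketch {
  public static void main(String[] args) {
    // anyMatch stops at the first matching prefix; filter(...).count() would scan them all.
    List<String> ignorePrefixes = Arrays.asList("_", ".");
    String fileName = ".hoodie_partition_metadata";
    boolean ignored = ignorePrefixes.stream().anyMatch(fileName::startsWith);

    // comparingLong replaces the Long.valueOf(a).compareTo(Long.valueOf(b)) boilerplate.
    List<String> files = Arrays.asList("ccc", "a", "bb");
    files.sort(Comparator.comparingLong(String::length));

    // orElseGet only builds the fallback when the Optional is actually empty.
    Optional<String> lastCheckpointStr = Optional.empty();
    String checkpoint = lastCheckpointStr.orElseGet(() -> String.valueOf(Long.MIN_VALUE));

    System.out.println(ignored + " " + files + " " + checkpoint);
  }
}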


@@ -76,7 +76,7 @@ public class HiveIncrPullSource extends Source {
public HiveIncrPullSource(TypedProperties props, JavaSparkContext sparkContext,
SchemaProvider schemaProvider) {
super(props, sparkContext, schemaProvider);
DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.ROOT_INPUT_PATH_PROP));
DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.ROOT_INPUT_PATH_PROP));
this.incrPullRootPath = props.getString(Config.ROOT_INPUT_PATH_PROP);
this.fs = FSUtils.getFs(incrPullRootPath, sparkContext.hadoopConfiguration());
}
@@ -121,7 +121,7 @@ public class HiveIncrPullSource extends Source {
if (!commitToPull.isPresent()) {
return new ImmutablePair<>(Optional.empty(),
lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : "");
lastCheckpointStr.orElse(""));
}
// read the files out.
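
HiveIncrPullSource gets the same Optional cleanup, using orElse rather than orElseGet because the fallback is just a constant empty string. The practical difference, sketched below: orElse always evaluates its argument, while orElseGet defers the supplier until the Optional is known to be empty.

import java.util.Optional;

public class OrElseSketch {
  public static void main(String[] args) {
    Optional<String> lastCheckpointStr = Optional.empty();
    // Fine for a cheap constant: the "" literal costs nothing to evaluate eagerly.
    String a = lastCheckpointStr.orElse("");
    // Preferable when the fallback is computed: the supplier runs only if empty.
    String b = lastCheckpointStr.orElseGet(() -> String.valueOf(Long.MIN_VALUE));
    System.out.println("\"" + a + "\" " + b);
  }
}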


@@ -35,6 +35,6 @@ public class JsonDFSSource extends DFSSource {
@Override
protected JavaRDD<GenericRecord> fromFiles(AvroConvertor convertor, String pathStr) {
return sparkContext.textFile(pathStr).map((String j) -> convertor.fromJson(j));
return sparkContext.textFile(pathStr).map(convertor::fromJson);
}
}


@@ -40,6 +40,6 @@ public class JsonKafkaSource extends KafkaSource {
protected JavaRDD<GenericRecord> toAvroRDD(OffsetRange[] offsetRanges, AvroConvertor avroConvertor) {
return KafkaUtils.createRDD(sparkContext, String.class, String.class, StringDecoder.class, StringDecoder.class,
kafkaParams, offsetRanges)
.values().map(jsonStr -> avroConvertor.fromJson(jsonStr));
.values().map(avroConvertor::fromJson);
}
}
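
Both JSON sources now pass a method reference instead of a wrapping lambda; behavior is unchanged, the reference simply names the target method. In the real code it is an instance reference (convertor::fromJson); the sketch below simplifies that to a static stand-in for the converter:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MethodRefSketch {
  // Stand-in for AvroConvertor.fromJson; the real method returns a GenericRecord.
  static String fromJson(String json) {
    return "parsed:" + json;
  }

  public static void main(String[] args) {
    List<String> jsonLines = Arrays.asList("{\"a\":1}", "{\"b\":2}");
    // Equivalent forms: the lambda and the method reference invoke the same method.
    List<String> viaLambda = jsonLines.stream().map(j -> fromJson(j)).collect(Collectors.toList());
    List<String> viaRef = jsonLines.stream().map(MethodRefSketch::fromJson).collect(Collectors.toList());
    System.out.println(viaLambda.equals(viaRef)); // true
  }
}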


@@ -25,11 +25,8 @@ import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.exception.HoodieNotSupportedException;
import com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Optional;
import java.util.*;
import java.util.stream.Collectors;
import kafka.common.TopicAndPartition;
import org.apache.avro.generic.GenericRecord;
@@ -85,7 +82,7 @@ public abstract class KafkaSource extends Source {
*/
public static String offsetsToStr(OffsetRange[] ranges) {
StringBuilder sb = new StringBuilder();
// atleast 1 partition will be present.
// at least 1 partition will be present.
sb.append(ranges[0].topic() + ",");
sb.append(Arrays.stream(ranges)
.map(r -> String.format("%s:%d", r.partition(), r.untilOffset()))
@@ -106,8 +103,7 @@ public abstract class KafkaSource extends Source {
HashMap<TopicAndPartition, LeaderOffset> toOffsetMap,
long numEvents) {
Comparator<OffsetRange> byPartition = (OffsetRange o1, OffsetRange o2) ->
Integer.valueOf(o1.partition()).compareTo(Integer.valueOf(o2.partition()));
Comparator<OffsetRange> byPartition = Comparator.comparing(OffsetRange::partition);
// Create initial offset ranges for each 'to' partition, with from = to offsets.
OffsetRange[] ranges = new OffsetRange[toOffsetMap.size()];
@@ -144,7 +140,7 @@ public abstract class KafkaSource extends Source {
}
public static long totalNewMessages(OffsetRange[] ranges) {
return Arrays.asList(ranges).stream().mapToLong(r -> r.count()).sum();
return Arrays.stream(ranges).mapToLong(OffsetRange::count).sum();
}
}
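
totalNewMessages now streams the OffsetRange array directly with Arrays.stream(...) instead of first wrapping it in a list, and uses a method reference for count(). A standalone sketch with a stand-in Range class (count() here is assumed to mirror untilOffset minus fromOffset, as in Spark's OffsetRange):

import java.util.Arrays;

public class TotalCountSketch {
  // Stand-in for OffsetRange; not the Spark class.
  static class Range {
    final long from;
    final long until;
    Range(long from, long until) { this.from = from; this.until = until; }
    long count() { return until - from; }
  }

  public static void main(String[] args) {
    Range[] ranges = { new Range(0, 100), new Range(50, 80) };
    // Arrays.stream(ranges) avoids the extra Arrays.asList(...).stream() hop,
    // and the method reference replaces r -> r.count().
    long total = Arrays.stream(ranges).mapToLong(Range::count).sum();
    System.out.println(total); // 130
  }
}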
@@ -166,13 +162,20 @@ public abstract class KafkaSource extends Source {
}
}
/**
* Kafka reset offset strategies
*/
enum KafkaResetOffsetStrategies {
LARGEST,
SMALLEST
}
/**
* Configs to be passed for this source. All standard Kafka consumer configs are also respected
*/
static class Config {
private static final String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic";
private static final String DEFAULT_AUTO_RESET_OFFSET = "largest";
private static final KafkaResetOffsetStrategies DEFAULT_AUTO_RESET_OFFSET = KafkaResetOffsetStrategies.LARGEST;
}
@@ -187,7 +190,7 @@ public abstract class KafkaSource extends Source {
for (Object prop : props.keySet()) {
kafkaParams.put(prop.toString(), props.getString(prop.toString()));
}
DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.KAFKA_TOPIC_NAME));
DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.KAFKA_TOPIC_NAME));
topicName = props.getString(Config.KAFKA_TOPIC_NAME);
}
@@ -200,7 +203,7 @@ public abstract class KafkaSource extends Source {
// Obtain current metadata for the topic
KafkaCluster cluster = new KafkaCluster(ScalaHelpers.toScalaMap(kafkaParams));
Either<ArrayBuffer<Throwable>, Set<TopicAndPartition>> either = cluster.getPartitions(
ScalaHelpers.toScalaSet(new HashSet<>(Arrays.asList(topicName))));
ScalaHelpers.toScalaSet(new HashSet<>(Collections.singletonList(topicName))));
if (either.isLeft()) {
// log errors. and bail out.
throw new HoodieDeltaStreamerException("Error obtaining partition metadata",
@@ -213,17 +216,20 @@ public abstract class KafkaSource extends Source {
if (lastCheckpointStr.isPresent()) {
fromOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
} else {
String autoResetValue = props
.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET);
if (autoResetValue.equals("smallest")) {
fromOffsets = new HashMap(ScalaHelpers.toJavaMap(
cluster.getEarliestLeaderOffsets(topicPartitions).right().get()));
} else if (autoResetValue.equals("largest")) {
fromOffsets = new HashMap(
ScalaHelpers.toJavaMap(cluster.getLatestLeaderOffsets(topicPartitions).right().get()));
} else {
throw new HoodieNotSupportedException(
"Auto reset value must be one of 'smallest' or 'largest' ");
KafkaResetOffsetStrategies autoResetValue = KafkaResetOffsetStrategies.valueOf(
props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase());
switch (autoResetValue) {
case SMALLEST:
fromOffsets = new HashMap(ScalaHelpers.toJavaMap(
cluster.getEarliestLeaderOffsets(topicPartitions).right().get()));
break;
case LARGEST:
fromOffsets = new HashMap(
ScalaHelpers.toJavaMap(cluster.getLatestLeaderOffsets(topicPartitions).right().get()));
break;
default:
throw new HoodieNotSupportedException(
"Auto reset value must be one of 'smallest' or 'largest' ");
}
}
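
The rewritten offset-reset handling parses the "auto.offset.reset" value into the new KafkaResetOffsetStrategies enum and switches on it, replacing the chain of string equals checks. A standalone sketch of that parse-then-switch pattern (property plumbing simplified; this is not the actual KafkaSource code):

import java.util.Locale;
import java.util.Properties;

public class ResetStrategySketch {
  enum KafkaResetOffsetStrategies { LARGEST, SMALLEST }

  static final KafkaResetOffsetStrategies DEFAULT = KafkaResetOffsetStrategies.LARGEST;

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("auto.offset.reset", "smallest");

    // valueOf(...) turns the lower-case config value into an enum constant;
    // an unrecognized value fails fast with IllegalArgumentException.
    KafkaResetOffsetStrategies strategy = KafkaResetOffsetStrategies.valueOf(
        props.getProperty("auto.offset.reset", DEFAULT.toString()).toUpperCase(Locale.ROOT));

    switch (strategy) {
      case SMALLEST:
        System.out.println("start from the earliest available offsets");
        break;
      case LARGEST:
        System.out.println("start from the latest offsets");
        break;
      default:
        throw new UnsupportedOperationException("Unexpected strategy: " + strategy);
    }
  }
}

One side effect worth noting: valueOf throws IllegalArgumentException for any unrecognized value before the switch runs, so with this pattern the default branch is effectively unreachable unless the parse itself is wrapped in a try/catch.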
@@ -236,7 +242,7 @@ public abstract class KafkaSource extends Source {
OffsetRange[] offsetRanges = CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents);
long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
if (totalNewMsgs <= 0) {
return new ImmutablePair<>(Optional.empty(), lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : "");
return new ImmutablePair<>(Optional.empty(), lastCheckpointStr.orElse(""));
} else {
log.info("About to read " + totalNewMsgs + " from Kafka for topic :" + topicName);
}