1
0

[HUDI-153] Use com.uber.hoodie.common.util.Option instead of Java and Guava Optional

This commit is contained in:
yanghua
2019-08-06 14:20:42 +08:00
committed by Balaji Varadarajan
parent d288e32833
commit 722b6be04a
128 changed files with 769 additions and 769 deletions

View File

@@ -32,6 +32,7 @@ import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.table.HoodieTableConfig;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.Option;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.exception.HoodieIOException;
import java.io.IOException;
@@ -41,7 +42,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
@@ -134,7 +134,7 @@ public class HDFSParquetImporter implements Serializable {
.initializePathAsHoodieDataset(jsc.hadoopConfiguration(), cfg.targetPath, properties);
HoodieWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.targetPath, schemaStr,
cfg.parallelism, Optional.empty(), props);
cfg.parallelism, Option.empty(), props);
JavaRDD<HoodieRecord<HoodieRecordPayload>> hoodieRecords = buildHoodieRecordsForImport(jsc, schemaStr);
// Get instant time.

View File

@@ -22,6 +22,7 @@ import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.Option;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.utilities.exception.HoodieIncrementalPullException;
import com.uber.hoodie.utilities.exception.HoodieIncrementalPullSQLException;
@@ -34,7 +35,6 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Optional;
import java.util.Scanner;
import java.util.stream.Collectors;
import javax.sql.DataSource;
@@ -290,7 +290,7 @@ public class HiveIncrementalPuller {
}
HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs.getConf(), targetDataPath);
Optional<HoodieInstant> lastCommit = metadata.getActiveTimeline().getCommitsTimeline()
Option<HoodieInstant> lastCommit = metadata.getActiveTimeline().getCommitsTimeline()
.filterCompletedInstants().lastInstant();
if (lastCommit.isPresent()) {
return lastCommit.get().getTimestamp();

View File

@@ -23,11 +23,11 @@ import com.beust.jcommander.Parameter;
import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.Option;
import com.uber.hoodie.common.util.TypedProperties;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
@@ -123,7 +123,7 @@ public class HoodieCompactor {
//Get schema.
String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile);
HoodieWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism,
Optional.empty(), props);
Option.empty(), props);
JavaRDD<WriteStatus> writeResponse = client.compact(cfg.compactionInstantTime);
return UtilHelpers.handleErrors(jsc, cfg.compactionInstantTime, writeResponse);
}
@@ -131,8 +131,8 @@ public class HoodieCompactor {
private int doSchedule(JavaSparkContext jsc) throws Exception {
//Get schema.
HoodieWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", cfg.parallelism,
Optional.of(cfg.strategyClassName), props);
client.scheduleCompactionAtInstant(cfg.compactionInstantTime, Optional.empty());
Option.of(cfg.strategyClassName), props);
client.scheduleCompactionAtInstant(cfg.compactionInstantTime, Option.empty());
return 0;
}
}

View File

@@ -30,11 +30,11 @@ import com.uber.hoodie.common.table.TableFileSystemView;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.Option;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -79,7 +79,7 @@ public class HoodieSnapshotCopier implements Serializable {
tableMetadata,
tableMetadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
// Get the latest commit
Optional<HoodieInstant> latestCommit = tableMetadata.getActiveTimeline().getCommitsTimeline()
Option<HoodieInstant> latestCommit = tableMetadata.getActiveTimeline().getCommitsTimeline()
.filterCompletedInstants().lastInstant();
if (!latestCommit.isPresent()) {
logger.warn("No commits present. Nothing to snapshot");

View File

@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.util.DFSPropertiesConfiguration;
import com.uber.hoodie.common.util.Option;
import com.uber.hoodie.common.util.ReflectionUtils;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.config.HoodieCompactionConfig;
@@ -40,7 +41,6 @@ import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -185,7 +185,7 @@ public class UtilHelpers {
* @param parallelism Parallelism
*/
public static HoodieWriteClient createHoodieClient(JavaSparkContext jsc, String basePath,
String schemaStr, int parallelism, Optional<String> compactionStrategyClass, TypedProperties properties)
String schemaStr, int parallelism, Option<String> compactionStrategyClass, TypedProperties properties)
throws Exception {
HoodieCompactionConfig compactionConfig =
compactionStrategyClass.map(strategy -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false)

View File

@@ -21,10 +21,10 @@ package com.uber.hoodie.utilities.deltastreamer;
import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.Option;
import com.uber.hoodie.exception.HoodieException;
import java.io.IOException;
import java.io.Serializable;
import java.util.Optional;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -57,6 +57,6 @@ public class Compactor implements Serializable {
+ "Errors :" + numWriteErrors);
}
// Commit compaction
compactionClient.commitCompaction(instant.getTimestamp(), res, Optional.empty());
compactionClient.commitCompaction(instant.getTimestamp(), res, Option.empty());
}
}

View File

@@ -34,6 +34,7 @@ import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.Option;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.config.HoodieCompactionConfig;
@@ -55,7 +56,6 @@ import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
@@ -140,7 +140,7 @@ public class DeltaSync implements Serializable {
/**
* Timeline with completed commits
*/
private transient Optional<HoodieTimeline> commitTimelineOpt;
private transient Option<HoodieTimeline> commitTimelineOpt;
/**
* Write Client
@@ -192,10 +192,10 @@ public class DeltaSync implements Serializable {
private void refreshTimeline() throws IOException {
if (fs.exists(new Path(cfg.targetBasePath))) {
HoodieTableMetaClient meta = new HoodieTableMetaClient(new Configuration(fs.getConf()), cfg.targetBasePath);
this.commitTimelineOpt = Optional.of(meta.getActiveTimeline().getCommitsTimeline()
this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getCommitsTimeline()
.filterCompletedInstants());
} else {
this.commitTimelineOpt = Optional.empty();
this.commitTimelineOpt = Option.empty();
HoodieTableMetaClient.initTableType(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath,
cfg.storageType, cfg.targetTableName, "archived");
}
@@ -204,8 +204,8 @@ public class DeltaSync implements Serializable {
/**
* Run one round of delta sync and return new compaction instant if one got scheduled
*/
public Optional<String> syncOnce() throws Exception {
Optional<String> scheduledCompaction = Optional.empty();
public Option<String> syncOnce() throws Exception {
Option<String> scheduledCompaction = Option.empty();
HoodieDeltaStreamerMetrics metrics = new HoodieDeltaStreamerMetrics(getHoodieClientConfig(schemaProvider));
Timer.Context overallTimerContext = metrics.getOverallTimerContext();
@@ -238,18 +238,18 @@ public class DeltaSync implements Serializable {
* Read from Upstream Source and apply transformation if needed
*/
private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> readFromSource(
Optional<HoodieTimeline> commitTimelineOpt) throws Exception {
Option<HoodieTimeline> commitTimelineOpt) throws Exception {
// Retrieve the previous round checkpoints, if any
Optional<String> resumeCheckpointStr = Optional.empty();
Option<String> resumeCheckpointStr = Option.empty();
if (commitTimelineOpt.isPresent()) {
Optional<HoodieInstant> lastCommit = commitTimelineOpt.get().lastInstant();
Option<HoodieInstant> lastCommit = commitTimelineOpt.get().lastInstant();
if (lastCommit.isPresent()) {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
commitTimelineOpt.get().getInstantDetails(lastCommit.get()).get(), HoodieCommitMetadata.class);
if (cfg.checkpoint != null && !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) {
resumeCheckpointStr = Optional.of(cfg.checkpoint);
resumeCheckpointStr = Option.of(cfg.checkpoint);
} else if (commitMetadata.getMetadata(CHECKPOINT_KEY) != null) {
resumeCheckpointStr = Optional.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
resumeCheckpointStr = Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
} else {
throw new HoodieDeltaStreamerException(
"Unable to find previous checkpoint. Please double check if this table "
@@ -262,11 +262,11 @@ public class DeltaSync implements Serializable {
}
if (!resumeCheckpointStr.isPresent() && cfg.checkpoint != null) {
resumeCheckpointStr = Optional.of(cfg.checkpoint);
resumeCheckpointStr = Option.of(cfg.checkpoint);
}
log.info("Checkpoint to resume from : " + resumeCheckpointStr);
final Optional<JavaRDD<GenericRecord>> avroRDDOptional;
final Option<JavaRDD<GenericRecord>> avroRDDOptional;
final String checkpointStr;
final SchemaProvider schemaProvider;
if (transformer != null) {
@@ -275,7 +275,7 @@ public class DeltaSync implements Serializable {
InputBatch<Dataset<Row>> dataAndCheckpoint = formatAdapter.fetchNewDataInRowFormat(
resumeCheckpointStr, cfg.sourceLimit);
Optional<Dataset<Row>> transformed =
Option<Dataset<Row>> transformed =
dataAndCheckpoint.getBatch().map(data -> transformer.apply(jssc, sparkSession, data, props));
checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
avroRDDOptional = transformed.map(t ->
@@ -314,12 +314,12 @@ public class DeltaSync implements Serializable {
* @param records Input Records
* @param checkpointStr Checkpoint String
* @param metrics Metrics
* @return Optional Compaction instant if one is scheduled
* @return Option Compaction instant if one is scheduled
*/
private Optional<String> writeToSink(JavaRDD<HoodieRecord> records, String checkpointStr,
private Option<String> writeToSink(JavaRDD<HoodieRecord> records, String checkpointStr,
HoodieDeltaStreamerMetrics metrics, Timer.Context overallTimerContext) throws Exception {
Optional<String> scheduledCompactionInstant = Optional.empty();
Option<String> scheduledCompactionInstant = Option.empty();
// filter dupes if needed
if (cfg.filterDupes) {
@@ -330,7 +330,7 @@ public class DeltaSync implements Serializable {
if (records.isEmpty()) {
log.info("No new data, nothing to commit.. ");
return Optional.empty();
return Option.empty();
}
}
@@ -365,13 +365,13 @@ public class DeltaSync implements Serializable {
}
boolean success = writeClient.commit(commitTime, writeStatusRDD,
Optional.of(checkpointCommitMetadata));
Option.of(checkpointCommitMetadata));
if (success) {
log.info("Commit " + commitTime + " successful!");
// Schedule compaction if needed
if (cfg.isAsyncCompactionEnabled()) {
scheduledCompactionInstant = writeClient.scheduleCompaction(Optional.of(checkpointCommitMetadata));
scheduledCompactionInstant = writeClient.scheduleCompaction(Option.of(checkpointCommitMetadata));
}
// Sync to hive if enabled

View File

@@ -33,6 +33,7 @@ import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.timeline.HoodieInstant.State;
import com.uber.hoodie.common.util.CompactionUtils;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.Option;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.exception.HoodieException;
@@ -46,7 +47,6 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -385,7 +385,7 @@ public class HoodieDeltaStreamer implements Serializable {
try {
while (!isShutdownRequested()) {
try {
Optional<String> scheduledCompactionInstant = deltaSync.syncOnce();
Option<String> scheduledCompactionInstant = deltaSync.syncOnce();
if (scheduledCompactionInstant.isPresent()) {
log.info("Enqueuing new pending compaction instant (" + scheduledCompactionInstant + ")");
asyncCompactService.enqueuePendingCompaction(new HoodieInstant(State.REQUESTED, COMPACTION_ACTION,

View File

@@ -18,6 +18,7 @@
package com.uber.hoodie.utilities.deltastreamer;
import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.util.Option;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
@@ -29,7 +30,6 @@ import org.apache.commons.lang.text.StrSubstitutor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import scala.Option;
/**
* Utility Class to generate Spark Scheduling allocation file. This kicks in only when user
@@ -94,10 +94,14 @@ public class SchedulerConfGenerator {
* @param cfg Config
*/
public static Map<String, String> getSparkSchedulingConfigs(HoodieDeltaStreamer.Config cfg) throws Exception {
final Option<String> sparkSchedulerMode = new SparkConf().getOption(SPARK_SCHEDULER_MODE_KEY);
scala.Option<String> scheduleModeKeyOption = new SparkConf().getOption(SPARK_SCHEDULER_MODE_KEY);
final Option<String> sparkSchedulerMode =
scheduleModeKeyOption.isDefined()
? Option.of(scheduleModeKeyOption.get())
: Option.empty();
Map<String, String> additionalSparkConfigs = new HashMap<>();
if (sparkSchedulerMode.isDefined() && "FAIR".equals(sparkSchedulerMode.get())
if (sparkSchedulerMode.isPresent() && "FAIR".equals(sparkSchedulerMode.get())
&& cfg.continuousMode && cfg.storageType.equals(HoodieTableType.MERGE_ON_READ.name())) {
String sparkSchedulingConfFile = generateAndStoreConfig(cfg.deltaSyncSchedulingWeight,
cfg.compactSchedulingWeight, cfg.deltaSyncSchedulingMinShare, cfg.compactSchedulingMinShare);

View File

@@ -22,13 +22,13 @@ import static com.uber.hoodie.utilities.schema.RowBasedSchemaProvider.HOODIE_REC
import static com.uber.hoodie.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME;
import com.uber.hoodie.AvroConversionUtils;
import com.uber.hoodie.common.util.Option;
import com.uber.hoodie.utilities.sources.AvroSource;
import com.uber.hoodie.utilities.sources.InputBatch;
import com.uber.hoodie.utilities.sources.JsonSource;
import com.uber.hoodie.utilities.sources.RowSource;
import com.uber.hoodie.utilities.sources.Source;
import com.uber.hoodie.utilities.sources.helpers.AvroConvertor;
import java.util.Optional;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
@@ -55,7 +55,7 @@ public final class SourceFormatAdapter {
* @param sourceLimit
* @return
*/
public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Optional<String> lastCkptStr,
public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Option<String> lastCkptStr,
long sourceLimit) {
switch (source.getSourceType()) {
case AVRO:
@@ -63,13 +63,13 @@ public final class SourceFormatAdapter {
case JSON: {
InputBatch<JavaRDD<String>> r = ((JsonSource)source).fetchNext(lastCkptStr, sourceLimit);
AvroConvertor convertor = new AvroConvertor(r.getSchemaProvider().getSourceSchema());
return new InputBatch<>(Optional.ofNullable(
return new InputBatch<>(Option.ofNullable(
r.getBatch().map(rdd -> rdd.map(convertor::fromJson))
.orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider());
}
case ROW: {
InputBatch<Dataset<Row>> r = ((RowSource)source).fetchNext(lastCkptStr, sourceLimit);
return new InputBatch<>(Optional.ofNullable(r.getBatch().map(
return new InputBatch<>(Option.ofNullable(r.getBatch().map(
rdd -> (AvroConversionUtils.createRdd(rdd, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()))
.orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider());
}
@@ -85,14 +85,14 @@ public final class SourceFormatAdapter {
* @param sourceLimit
* @return
*/
public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Optional<String> lastCkptStr, long sourceLimit) {
public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptStr, long sourceLimit) {
switch (source.getSourceType()) {
case ROW:
return ((RowSource)source).fetchNext(lastCkptStr, sourceLimit);
case AVRO: {
InputBatch<JavaRDD<GenericRecord>> r = ((AvroSource)source).fetchNext(lastCkptStr, sourceLimit);
Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
return new InputBatch<>(Optional.ofNullable(
return new InputBatch<>(Option.ofNullable(
r.getBatch().map(rdd -> AvroConversionUtils.createDataFrame(JavaRDD.toRDD(rdd),
sourceSchema.toString(), source.getSparkSession()))
.orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider());
@@ -101,7 +101,7 @@ public final class SourceFormatAdapter {
InputBatch<JavaRDD<String>> r = ((JsonSource)source).fetchNext(lastCkptStr, sourceLimit);
Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
StructType dataType = AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema);
return new InputBatch<>(Optional.ofNullable(
return new InputBatch<>(Option.ofNullable(
r.getBatch().map(rdd -> source.getSparkSession().read().schema(dataType).json(rdd))
.orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider());
}

View File

@@ -18,11 +18,11 @@
package com.uber.hoodie.utilities.sources;
import com.uber.hoodie.common.util.Option;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import com.uber.hoodie.utilities.sources.helpers.DFSPathSelector;
import java.util.Optional;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
@@ -46,14 +46,14 @@ public class AvroDFSSource extends AvroSource {
}
@Override
protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Optional<String> lastCkptStr,
protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCkptStr,
long sourceLimit) {
Pair<Optional<String>, String> selectPathsWithMaxModificationTime =
Pair<Option<String>, String> selectPathsWithMaxModificationTime =
pathSelector.getNextFilePathsAndMaxModificationTime(lastCkptStr, sourceLimit);
return selectPathsWithMaxModificationTime.getLeft().map(pathStr -> new InputBatch<>(
Optional.of(fromFiles(pathStr)),
Option.of(fromFiles(pathStr)),
selectPathsWithMaxModificationTime.getRight()))
.orElseGet(() -> new InputBatch<>(Optional.empty(), selectPathsWithMaxModificationTime.getRight()));
.orElseGet(() -> new InputBatch<>(Option.empty(), selectPathsWithMaxModificationTime.getRight()));
}
private JavaRDD<GenericRecord> fromFiles(String pathStr) {

View File

@@ -18,12 +18,12 @@
package com.uber.hoodie.utilities.sources;
import com.uber.hoodie.common.util.Option;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import com.uber.hoodie.utilities.sources.helpers.KafkaOffsetGen;
import com.uber.hoodie.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
import io.confluent.kafka.serializers.KafkaAvroDecoder;
import java.util.Optional;
import kafka.serializer.StringDecoder;
import org.apache.avro.generic.GenericRecord;
import org.apache.log4j.LogManager;
@@ -50,18 +50,18 @@ public class AvroKafkaSource extends AvroSource {
}
@Override
protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Optional<String> lastCheckpointStr,
protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCheckpointStr,
long sourceLimit) {
OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit);
long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
if (totalNewMsgs <= 0) {
return new InputBatch<>(Optional.empty(),
return new InputBatch<>(Option.empty(),
lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : "");
} else {
log.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
}
JavaRDD<GenericRecord> newDataRDD = toRDD(offsetRanges);
return new InputBatch<>(Optional.of(newDataRDD),
return new InputBatch<>(Option.of(newDataRDD),
KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
}

View File

@@ -20,6 +20,7 @@ package com.uber.hoodie.utilities.sources;
import com.uber.hoodie.DataSourceUtils;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.Option;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.utilities.schema.SchemaProvider;
@@ -28,7 +29,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
@@ -83,7 +83,7 @@ public class HiveIncrPullSource extends AvroSource {
/**
* Finds the first commit from source, greater than the target's last commit, and reads it out.
*/
private Optional<String> findCommitToPull(Optional<String> latestTargetCommit)
private Option<String> findCommitToPull(Option<String> latestTargetCommit)
throws IOException {
log.info("Looking for commits ");
@@ -99,27 +99,27 @@ public class HiveIncrPullSource extends AvroSource {
if (!latestTargetCommit.isPresent()) {
// start from the beginning
return Optional.of(commitTimes.get(0));
return Option.of(commitTimes.get(0));
}
for (String commitTime : commitTimes) {
//TODO(vc): Add an option to delete consumed commits
if (commitTime.compareTo(latestTargetCommit.get()) > 0) {
return Optional.of(commitTime);
return Option.of(commitTime);
}
}
return Optional.empty();
return Option.empty();
}
@Override
protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(
Optional<String> lastCheckpointStr, long sourceLimit) {
Option<String> lastCheckpointStr, long sourceLimit) {
try {
// find the source commit to pull
Optional<String> commitToPull = findCommitToPull(lastCheckpointStr);
Option<String> commitToPull = findCommitToPull(lastCheckpointStr);
if (!commitToPull.isPresent()) {
return new InputBatch<>(Optional.empty(),
return new InputBatch<>(Option.empty(),
lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : "");
}
@@ -131,7 +131,7 @@ public class HiveIncrPullSource extends AvroSource {
JavaPairRDD<AvroKey, NullWritable> avroRDD = sparkContext.newAPIHadoopFile(pathStr,
AvroKeyInputFormat.class, AvroKey.class, NullWritable.class,
sparkContext.hadoopConfiguration());
return new InputBatch<>(Optional.of(avroRDD.keys().map(r -> ((GenericRecord) r.datum()))),
return new InputBatch<>(Option.of(avroRDD.keys().map(r -> ((GenericRecord) r.datum()))),
String.valueOf(commitToPull.get()));
} catch (IOException ioe) {
throw new HoodieIOException(

View File

@@ -21,13 +21,13 @@ package com.uber.hoodie.utilities.sources;
import com.uber.hoodie.DataSourceReadOptions;
import com.uber.hoodie.DataSourceUtils;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.util.Option;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.hive.SlashEncodedDayPartitionValueExtractor;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import com.uber.hoodie.utilities.sources.helpers.IncrSourceHelper;
import java.util.Arrays;
import java.util.Optional;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
@@ -79,7 +79,7 @@ public class HoodieIncrSource extends RowSource {
}
@Override
public Pair<Optional<Dataset<Row>>, String> fetchNextBatch(Optional<String> lastCkptStr, long sourceLimit) {
public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.HOODIE_SRC_BASE_PATH));
@@ -97,15 +97,15 @@ public class HoodieIncrSource extends RowSource {
Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT);
// Use begin Instant if set and non-empty
Optional<String> beginInstant =
lastCkptStr.isPresent() ? lastCkptStr.get().isEmpty() ? Optional.empty() : lastCkptStr : Optional.empty();
Option<String> beginInstant =
lastCkptStr.isPresent() ? lastCkptStr.get().isEmpty() ? Option.empty() : lastCkptStr : Option.empty();
Pair<String, String> instantEndpts = IncrSourceHelper.calculateBeginAndEndInstants(sparkContext, srcPath,
numInstantsPerFetch, beginInstant, readLatestOnMissingCkpt);
if (instantEndpts.getKey().equals(instantEndpts.getValue())) {
log.warn("Already caught up. Begin Checkpoint was :" + instantEndpts.getKey());
return Pair.of(Optional.empty(), instantEndpts.getKey());
return Pair.of(Option.empty(), instantEndpts.getKey());
}
// Do Incr pull. Set end instant if available
@@ -153,6 +153,6 @@ public class HoodieIncrSource extends RowSource {
final Dataset<Row> src = source.drop(HoodieRecord.HOODIE_META_COLUMNS.stream()
.filter(x -> !x.equals(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toArray(String[]::new));
//log.info("Final Schema from Source is :" + src.schema());
return Pair.of(Optional.of(src), instantEndpts.getRight());
return Pair.of(Option.of(src), instantEndpts.getRight());
}
}

View File

@@ -18,29 +18,29 @@
package com.uber.hoodie.utilities.sources;
import com.uber.hoodie.common.util.Option;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import java.util.Optional;
public class InputBatch<T> {
private final Optional<T> batch;
private final Option<T> batch;
private final String checkpointForNextBatch;
private final SchemaProvider schemaProvider;
public InputBatch(Optional<T> batch, String checkpointForNextBatch,
public InputBatch(Option<T> batch, String checkpointForNextBatch,
SchemaProvider schemaProvider) {
this.batch = batch;
this.checkpointForNextBatch = checkpointForNextBatch;
this.schemaProvider = schemaProvider;
}
public InputBatch(Optional<T> batch, String checkpointForNextBatch) {
public InputBatch(Option<T> batch, String checkpointForNextBatch) {
this.batch = batch;
this.checkpointForNextBatch = checkpointForNextBatch;
this.schemaProvider = null;
}
public Optional<T> getBatch() {
public Option<T> getBatch() {
return batch;
}

View File

@@ -18,11 +18,11 @@
package com.uber.hoodie.utilities.sources;
import com.uber.hoodie.common.util.Option;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import com.uber.hoodie.utilities.sources.helpers.DFSPathSelector;
import java.util.Optional;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
@@ -41,13 +41,13 @@ public class JsonDFSSource extends JsonSource {
}
@Override
protected InputBatch<JavaRDD<String>> fetchNewData(Optional<String> lastCkptStr,
protected InputBatch<JavaRDD<String>> fetchNewData(Option<String> lastCkptStr,
long sourceLimit) {
Pair<Optional<String>, String> selPathsWithMaxModificationTime =
Pair<Option<String>, String> selPathsWithMaxModificationTime =
pathSelector.getNextFilePathsAndMaxModificationTime(lastCkptStr, sourceLimit);
return selPathsWithMaxModificationTime.getLeft().map(pathStr -> new InputBatch<>(
Optional.of(fromFiles(pathStr)), selPathsWithMaxModificationTime.getRight()))
.orElse(new InputBatch<>(Optional.empty(), selPathsWithMaxModificationTime.getRight()));
Option.of(fromFiles(pathStr)), selPathsWithMaxModificationTime.getRight()))
.orElse(new InputBatch<>(Option.empty(), selPathsWithMaxModificationTime.getRight()));
}
private JavaRDD<String> fromFiles(String pathStr) {

View File

@@ -18,11 +18,11 @@
package com.uber.hoodie.utilities.sources;
import com.uber.hoodie.common.util.Option;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import com.uber.hoodie.utilities.sources.helpers.KafkaOffsetGen;
import com.uber.hoodie.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
import java.util.Optional;
import kafka.serializer.StringDecoder;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -48,17 +48,17 @@ public class JsonKafkaSource extends JsonSource {
}
@Override
protected InputBatch<JavaRDD<String>> fetchNewData(Optional<String> lastCheckpointStr,
protected InputBatch<JavaRDD<String>> fetchNewData(Option<String> lastCheckpointStr,
long sourceLimit) {
OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit);
long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
if (totalNewMsgs <= 0) {
return new InputBatch<>(Optional.empty(),
return new InputBatch<>(Option.empty(),
lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : "");
}
log.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
JavaRDD<String> newDataRDD = toRDD(offsetRanges);
return new InputBatch<>(Optional.of(newDataRDD), CheckpointUtils.offsetsToStr(offsetRanges));
return new InputBatch<>(Option.of(newDataRDD), CheckpointUtils.offsetsToStr(offsetRanges));
}
private JavaRDD<String> toRDD(OffsetRange[] offsetRanges) {

View File

@@ -18,11 +18,11 @@
package com.uber.hoodie.utilities.sources;
import com.uber.hoodie.common.util.Option;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.utilities.schema.RowBasedSchemaProvider;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import java.util.Optional;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -37,12 +37,12 @@ public abstract class RowSource extends Source<Dataset<Row>> {
super(props, sparkContext, sparkSession, schemaProvider, SourceType.ROW);
}
protected abstract Pair<Optional<Dataset<Row>>, String> fetchNextBatch(Optional<String> lastCkptStr,
protected abstract Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr,
long sourceLimit);
@Override
protected final InputBatch<Dataset<Row>> fetchNewData(Optional<String> lastCkptStr, long sourceLimit) {
Pair<Optional<Dataset<Row>>, String> res = fetchNextBatch(lastCkptStr, sourceLimit);
protected final InputBatch<Dataset<Row>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
Pair<Option<Dataset<Row>>, String> res = fetchNextBatch(lastCkptStr, sourceLimit);
return res.getKey().map(dsr -> {
SchemaProvider rowSchemaProvider = new RowBasedSchemaProvider(dsr.schema());
return new InputBatch<>(res.getKey(), res.getValue(), rowSchemaProvider);

View File

@@ -18,10 +18,10 @@
package com.uber.hoodie.utilities.sources;
import com.uber.hoodie.common.util.Option;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import java.io.Serializable;
import java.util.Optional;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
@@ -60,7 +60,7 @@ public abstract class Source<T> implements Serializable {
this.sourceType = sourceType;
}
protected abstract InputBatch<T> fetchNewData(Optional<String> lastCkptStr, long sourceLimit);
protected abstract InputBatch<T> fetchNewData(Option<String> lastCkptStr, long sourceLimit);
/**
* Main API called by Hoodie Delta Streamer to fetch records
@@ -68,7 +68,7 @@ public abstract class Source<T> implements Serializable {
* @param sourceLimit Source Limit
* @return
*/
public final InputBatch<T> fetchNext(Optional<String> lastCkptStr, long sourceLimit) {
public final InputBatch<T> fetchNext(Option<String> lastCkptStr, long sourceLimit) {
InputBatch<T> batch = fetchNewData(lastCkptStr, sourceLimit);
// If overriddenSchemaProvider is passed in CLI, use it
return overriddenSchemaProvider == null ? batch : new InputBatch<>(batch.getBatch(),

View File

@@ -20,6 +20,7 @@ package com.uber.hoodie.utilities.sources.helpers;
import com.uber.hoodie.DataSourceUtils;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.Option;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.common.util.collection.ImmutablePair;
import com.uber.hoodie.common.util.collection.Pair;
@@ -55,8 +56,8 @@ public class DFSPathSelector {
this.fs = FSUtils.getFs(props.getString(Config.ROOT_INPUT_PATH_PROP), hadoopConf);
}
public Pair<Optional<String>, String> getNextFilePathsAndMaxModificationTime(
Optional<String> lastCheckpointStr, long sourceLimit) {
public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(
Option<String> lastCheckpointStr, long sourceLimit) {
try {
// obtain all eligible files under root folder.
@@ -97,7 +98,7 @@ public class DFSPathSelector {
// no data to read
if (filteredFiles.size() == 0) {
return new ImmutablePair<>(Optional.empty(),
return new ImmutablePair<>(Option.empty(),
lastCheckpointStr.orElseGet(() -> String.valueOf(Long.MIN_VALUE)));
}
@@ -106,7 +107,7 @@ public class DFSPathSelector {
.collect(Collectors.joining(","));
return new ImmutablePair<>(
Optional.ofNullable(pathStr),
Option.ofNullable(pathStr),
String.valueOf(maxModificationTime));
} catch (IOException ioe) {
throw new HoodieIOException(

View File

@@ -22,8 +22,8 @@ import com.google.common.base.Preconditions;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.Option;
import com.uber.hoodie.common.util.collection.Pair;
import java.util.Optional;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Row;
@@ -52,7 +52,7 @@ public class IncrSourceHelper {
* @return begin and end instants
*/
public static Pair<String, String> calculateBeginAndEndInstants(
JavaSparkContext jssc, String srcBasePath, int numInstantsPerFetch, Optional<String> beginInstant,
JavaSparkContext jssc, String srcBasePath, int numInstantsPerFetch, Option<String> beginInstant,
boolean readLatestOnMissingBeginInstant) {
Preconditions.checkArgument(numInstantsPerFetch > 0, "Make sure the config"
+ " hoodie.deltastreamer.source.hoodieincr.num_instants is set to a positive value");
@@ -64,7 +64,7 @@ public class IncrSourceHelper {
String beginInstantTime = beginInstant.orElseGet(() -> {
if (readLatestOnMissingBeginInstant) {
Optional<HoodieInstant> lastInstant = activeCommitTimeline.lastInstant();
Option<HoodieInstant> lastInstant = activeCommitTimeline.lastInstant();
return lastInstant.map(hoodieInstant -> getStrictlyLowerTimestamp(hoodieInstant.getTimestamp())).orElse("000");
} else {
throw new IllegalArgumentException("Missing begin instant for incremental pull. For reading from latest "
@@ -72,8 +72,11 @@ public class IncrSourceHelper {
}
});
Optional<HoodieInstant> nthInstant =
activeCommitTimeline.findInstantsAfter(beginInstantTime, numInstantsPerFetch).getInstants().reduce((x, y) -> y);
Option<HoodieInstant> nthInstant = Option.fromJavaOptional(
activeCommitTimeline
.findInstantsAfter(beginInstantTime, numInstantsPerFetch)
.getInstants()
.reduce((x, y) -> y));
return Pair.of(beginInstantTime, nthInstant.map(instant -> instant.getTimestamp()).orElse(beginInstantTime));
}

View File

@@ -19,6 +19,7 @@
package com.uber.hoodie.utilities.sources.helpers;
import com.uber.hoodie.DataSourceUtils;
import com.uber.hoodie.common.util.Option;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.exception.HoodieNotSupportedException;
import com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException;
@@ -27,7 +28,6 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Optional;
import java.util.stream.Collectors;
import kafka.common.TopicAndPartition;
import org.apache.log4j.LogManager;
@@ -189,7 +189,7 @@ public class KafkaOffsetGen {
topicName = props.getString(Config.KAFKA_TOPIC_NAME);
}
public OffsetRange[] getNextOffsetRanges(Optional<String> lastCheckpointStr, long sourceLimit) {
public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long sourceLimit) {
// Obtain current metadata for the topic
KafkaCluster cluster = new KafkaCluster(ScalaHelpers.toScalaMap(kafkaParams));
@@ -240,7 +240,7 @@ public class KafkaOffsetGen {
// else return earliest offsets
private HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> checkupValidOffsets(
KafkaCluster cluster,
Optional<String> lastCheckpointStr,
Option<String> lastCheckpointStr,
Set<TopicAndPartition> topicPartitions) {
HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> checkpointOffsets =
CheckpointUtils.strToOffsets(lastCheckpointStr.get());

View File

@@ -30,6 +30,7 @@ import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.DFSPropertiesConfiguration;
import com.uber.hoodie.common.util.Option;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.exception.DatasetNotFoundException;
@@ -49,7 +50,6 @@ import com.uber.hoodie.utilities.transform.Transformer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -501,7 +501,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
props.setProperty(TestSourceConfig.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS, "true");
DistributedTestDataSource distributedTestDataSource = new DistributedTestDataSource(props,
jsc, sparkSession, null);
InputBatch<JavaRDD<GenericRecord>> batch = distributedTestDataSource.fetchNext(Optional.empty(), 10000000);
InputBatch<JavaRDD<GenericRecord>> batch = distributedTestDataSource.fetchNext(Option.empty(), 10000000);
batch.getBatch().get().cache();
long c = batch.getBatch().get().count();
Assert.assertEquals(1000, c);

View File

@@ -2,6 +2,7 @@ package com.uber.hoodie.utilities.sources;
import com.uber.hoodie.common.HoodieTestDataGenerator;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.util.Option;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.common.util.collection.RocksDBBasedMap;
import com.uber.hoodie.exception.HoodieIOException;
@@ -11,7 +12,6 @@ import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
@@ -96,7 +96,7 @@ public abstract class AbstractBaseTestSource extends AvroSource {
private static GenericRecord toGenericRecord(HoodieRecord hoodieRecord, HoodieTestDataGenerator dataGenerator) {
try {
Optional<IndexedRecord> recordOpt = hoodieRecord.getData().getInsertValue(dataGenerator.avroSchema);
Option<IndexedRecord> recordOpt = hoodieRecord.getData().getInsertValue(dataGenerator.avroSchema);
return (GenericRecord) recordOpt.get();
} catch (IOException e) {
return null;

View File

@@ -18,11 +18,11 @@
package com.uber.hoodie.utilities.sources;
import com.uber.hoodie.common.util.Option;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import com.uber.hoodie.utilities.sources.config.TestSourceConfig;
import java.util.Iterator;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.avro.generic.GenericRecord;
@@ -46,14 +46,14 @@ public class DistributedTestDataSource extends AbstractBaseTestSource {
}
@Override
protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Optional<String> lastCkptStr, long sourceLimit) {
protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
int nextCommitNum = lastCkptStr.map(s -> Integer.parseInt(s) + 1).orElse(0);
String commitTime = String.format("%05d", nextCommitNum);
log.info("Source Limit is set to " + sourceLimit);
// No new data.
if (sourceLimit <= 0) {
return new InputBatch<>(Optional.empty(), commitTime);
return new InputBatch<>(Option.empty(), commitTime);
}
TypedProperties newProps = new TypedProperties();
@@ -74,6 +74,6 @@ public class DistributedTestDataSource extends AbstractBaseTestSource {
Iterator<GenericRecord> itr = fetchNextBatch(newProps, perPartitionSourceLimit, commitTime, p).iterator();
return itr;
}, true);
return new InputBatch<>(Optional.of(avroRDD), commitTime);
return new InputBatch<>(Option.of(avroRDD), commitTime);
}
}

View File

@@ -22,12 +22,12 @@ import static org.junit.Assert.assertEquals;
import com.uber.hoodie.AvroConversionUtils;
import com.uber.hoodie.common.HoodieTestDataGenerator;
import com.uber.hoodie.common.util.Option;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.utilities.UtilitiesTestBase;
import com.uber.hoodie.utilities.deltastreamer.SourceFormatAdapter;
import com.uber.hoodie.utilities.schema.FilebasedSchemaProvider;
import java.io.IOException;
import java.util.Optional;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD;
@@ -79,17 +79,17 @@ public class TestDFSSource extends UtilitiesTestBase {
SourceFormatAdapter jsonSource = new SourceFormatAdapter(jsonDFSSource);
// 1. Extract without any checkpoint => get all the data, respecting sourceLimit
assertEquals(Optional.empty(), jsonSource.fetchNewDataInAvroFormat(Optional.empty(), Long.MAX_VALUE).getBatch());
assertEquals(Option.empty(), jsonSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
UtilitiesTestBase.Helpers.saveStringsToDFS(
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 100)), dfs,
dfsBasePath + "/jsonFiles/1.json");
assertEquals(Optional.empty(), jsonSource.fetchNewDataInAvroFormat(Optional.empty(), 10).getBatch());
assertEquals(Option.empty(), jsonSource.fetchNewDataInAvroFormat(Option.empty(), 10).getBatch());
InputBatch<JavaRDD<GenericRecord>> fetch1 =
jsonSource.fetchNewDataInAvroFormat(Optional.empty(), 1000000);
jsonSource.fetchNewDataInAvroFormat(Option.empty(), 1000000);
assertEquals(100, fetch1.getBatch().get().count());
// Test json -> Row format
InputBatch<Dataset<Row>> fetch1AsRows =
jsonSource.fetchNewDataInRowFormat(Optional.empty(), 1000000);
jsonSource.fetchNewDataInRowFormat(Option.empty(), 1000000);
assertEquals(100, fetch1AsRows.getBatch().get().count());
// Test Avro -> Row format
Dataset<Row> fetch1Rows = AvroConversionUtils.createDataFrame(JavaRDD.toRDD(fetch1.getBatch().get()),
@@ -101,12 +101,12 @@ public class TestDFSSource extends UtilitiesTestBase {
Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 10000)),
dfs, dfsBasePath + "/jsonFiles/2.json");
InputBatch<Dataset<Row>> fetch2 = jsonSource.fetchNewDataInRowFormat(
Optional.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(10000, fetch2.getBatch().get().count());
// 3. Extract with previous checkpoint => gives same data back (idempotent)
InputBatch<Dataset<Row>> fetch3 = jsonSource.fetchNewDataInRowFormat(
Optional.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(10000, fetch3.getBatch().get().count());
assertEquals(fetch2.getCheckpointForNextBatch(), fetch3.getCheckpointForNextBatch());
fetch3.getBatch().get().registerTempTable("test_dfs_table");
@@ -115,7 +115,7 @@ public class TestDFSSource extends UtilitiesTestBase {
// 4. Extract with latest checkpoint => no new data returned
InputBatch<JavaRDD<GenericRecord>> fetch4 = jsonSource.fetchNewDataInAvroFormat(
Optional.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(Optional.empty(), fetch4.getBatch());
Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(Option.empty(), fetch4.getBatch());
}
}

View File

@@ -18,10 +18,10 @@
package com.uber.hoodie.utilities.sources;
import com.uber.hoodie.common.util.Option;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
import org.apache.log4j.LogManager;
@@ -44,7 +44,7 @@ public class TestDataSource extends AbstractBaseTestSource {
}
@Override
protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Optional<String> lastCheckpointStr,
protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCheckpointStr,
long sourceLimit) {
int nextCommitNum = lastCheckpointStr.map(s -> Integer.parseInt(s) + 1).orElse(0);
@@ -53,12 +53,12 @@ public class TestDataSource extends AbstractBaseTestSource {
// No new data.
if (sourceLimit <= 0) {
return new InputBatch<>(Optional.empty(), commitTime);
return new InputBatch<>(Option.empty(), commitTime);
}
List<GenericRecord> records = fetchNextBatch(props, (int)sourceLimit, commitTime, DEFAULT_PARTITION_NUM)
.collect(Collectors.toList());
JavaRDD<GenericRecord> avroRDD = sparkContext.<GenericRecord>parallelize(records, 4);
return new InputBatch<>(Optional.of(avroRDD), commitTime);
return new InputBatch<>(Option.of(avroRDD), commitTime);
}
}

View File

@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import com.uber.hoodie.AvroConversionUtils;
import com.uber.hoodie.common.HoodieTestDataGenerator;
import com.uber.hoodie.common.util.Option;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.utilities.UtilitiesTestBase;
import com.uber.hoodie.utilities.deltastreamer.SourceFormatAdapter;
@@ -29,7 +30,6 @@ import com.uber.hoodie.utilities.schema.FilebasedSchemaProvider;
import com.uber.hoodie.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.Optional;
import kafka.common.TopicAndPartition;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
@@ -96,9 +96,9 @@ public class TestKafkaSource extends UtilitiesTestBase {
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
// 1. Extract without any checkpoint => get all the data, respecting sourceLimit
assertEquals(Optional.empty(), kafkaSource.fetchNewDataInAvroFormat(Optional.empty(), Long.MAX_VALUE).getBatch());
assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Optional.empty(), 900);
InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900);
assertEquals(900, fetch1.getBatch().get().count());
// Test Avro To DataFrame<Row> path
Dataset<Row> fetch1AsRows = AvroConversionUtils.createDataFrame(JavaRDD.toRDD(fetch1.getBatch().get()),
@@ -108,28 +108,28 @@ public class TestKafkaSource extends UtilitiesTestBase {
// 2. Produce new data, extract new data
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
InputBatch<Dataset<Row>> fetch2 = kafkaSource.fetchNewDataInRowFormat(
Optional.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(1100, fetch2.getBatch().get().count());
// 3. Extract with previous checkpoint => gives same data back (idempotent)
InputBatch<JavaRDD<GenericRecord>> fetch3 = kafkaSource.fetchNewDataInAvroFormat(
Optional.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(fetch2.getBatch().get().count(), fetch3.getBatch().get().count());
assertEquals(fetch2.getCheckpointForNextBatch(), fetch3.getCheckpointForNextBatch());
// Same using Row API
InputBatch<Dataset<Row>> fetch3AsRows =
kafkaSource.fetchNewDataInRowFormat(Optional.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(fetch2.getBatch().get().count(), fetch3AsRows.getBatch().get().count());
assertEquals(fetch2.getCheckpointForNextBatch(), fetch3AsRows.getCheckpointForNextBatch());
// 4. Extract with latest checkpoint => no new data returned
InputBatch<JavaRDD<GenericRecord>> fetch4 = kafkaSource.fetchNewDataInAvroFormat(
Optional.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(Optional.empty(), fetch4.getBatch());
Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(Option.empty(), fetch4.getBatch());
// Same using Row API
InputBatch<Dataset<Row>> fetch4AsRows =
kafkaSource.fetchNewDataInRowFormat(Optional.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(Optional.empty(), fetch4AsRows.getBatch());
kafkaSource.fetchNewDataInRowFormat(Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(Option.empty(), fetch4AsRows.getBatch());
}