1
0

Revamped Deltastreamer (#93)

* Add analytics to site

* Fix ugly favicon

* New & Improved HoodieDeltaStreamer

 - Can incrementally consume from HDFS or Kafka, with exactly-once semantics!
 - Supports Json/Avro data, Source can also do custom things
 - Source is totally pluggable, via reflection
 - Key generation is pluggable, currently added SimpleKeyGenerator
 - Schema provider is pluggable, currently Filebased schemas
 - Configurable field to break ties during preCombine
 - Finally, can also plugin the HoodieRecordPayload, to get other merge types than overwriting
 - Handles efficient avro serialization in Spark

 Pending :
 - Rewriting of HiveIncrPullSource
 - Hive sync via hoodie-hive
 - Cleanup & tests

* Minor fixes from master rebase

* Implementation of HiveIncrPullSource
 - Copies commit by commit from source to target

* Adding TimestampBasedKeyGenerator
 - Supports unix time & date strings
This commit is contained in:
vinoth chandar
2017-03-13 12:41:29 -07:00
committed by prazanna
parent c3257b9680
commit 69d3950a32
33 changed files with 1925 additions and 263 deletions

View File

@@ -64,6 +64,8 @@ import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -317,6 +319,16 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
/**
 * Commit changes performed at the given commitTime marker, attaching no extra metadata.
 *
 * @param commitTime    marker of the commit being finalized
 * @param writeStatuses statuses of the writes performed under this commit
 */
public boolean commit(String commitTime, JavaRDD<WriteStatus> writeStatuses) {
  // Delegates to the extended overload with empty extra metadata
  return commit(commitTime, writeStatuses, Optional.empty());
}
/**
* Commit changes performed at the given commitTime marker
*/
public boolean commit(String commitTime,
JavaRDD<WriteStatus> writeStatuses,
Optional<HashMap<String, String>> extraMetadata) {
logger.info("Comitting " + commitTime);
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable<T> table = HoodieTable
@@ -333,6 +345,10 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
for (Tuple2<String, HoodieWriteStat> stat : stats) {
metadata.addWriteStat(stat._1(), stat._2());
}
// add in extra metadata
if (extraMetadata.isPresent()) {
extraMetadata.get().forEach((k, v) -> metadata.addMetadata(k, v));
}
try {
String actionType = table.getCommitActionType();

View File

@@ -27,6 +27,7 @@ import javax.annotation.concurrent.Immutable;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
/**
@@ -230,6 +231,15 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
}
}
/**
 * Loads builder properties from the given stream.
 *
 * NOTE: the stream is always closed by this method, even when loading fails —
 * callers must not reuse it afterwards.
 *
 * @param inputStream stream to read java-properties-format config from
 * @return this builder, for chaining
 * @throws IOException if reading the properties fails
 */
public Builder fromInputStream(InputStream inputStream) throws IOException {
  try {
    this.props.load(inputStream);
    return this;
  } finally {
    // close unconditionally so the caller's stream is not leaked
    inputStream.close();
  }
}
public Builder withPath(String basePath) {
props.setProperty(BASE_PATH_PROP, basePath);

View File

@@ -17,6 +17,7 @@
package com.uber.hoodie.avro;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

View File

@@ -29,6 +29,7 @@ import java.io.IOException;
/**
 * This is a payload to wrap an existing Hoodie Avro Record.
 * Useful to create a HoodieRecord over existing GenericRecords in a hoodie dataset (useful in compactions)
*
*/
public class HoodieAvroPayload implements HoodieRecordPayload<HoodieAvroPayload> {
private final Optional<GenericRecord> record;

View File

@@ -16,6 +16,8 @@
package com.uber.hoodie.common.model;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.codehaus.jackson.annotate.JsonAutoDetect;
@@ -33,21 +35,29 @@ import java.util.Map;
/**
* All the metadata that gets stored along with a commit.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class HoodieCommitMetadata implements Serializable {
private static volatile Logger log = LogManager.getLogger(HoodieCommitMetadata.class);
private HashMap<String, List<HoodieWriteStat>> partitionToWriteStats;
private HashMap<String, String> extraMetadataMap;
public HoodieCommitMetadata() {
extraMetadataMap = new HashMap<>();
partitionToWriteStats = new HashMap<>();
}
public void addWriteStat(String partitionPath, HoodieWriteStat stat) {
if (!partitionToWriteStats.containsKey(partitionPath)) {
partitionToWriteStats.put(partitionPath, new ArrayList<HoodieWriteStat>());
partitionToWriteStats.put(partitionPath, new ArrayList<>());
}
partitionToWriteStats.get(partitionPath).add(stat);
}
/**
 * Records an arbitrary key/value pair of extra metadata to be stored along with the commit.
 */
public void addMetadata(String metaKey, String value) {
  extraMetadataMap.put(metaKey, value);
}
public List<HoodieWriteStat> getWriteStats(String partitionPath) {
return partitionToWriteStats.get(partitionPath);
}
@@ -56,6 +66,10 @@ public class HoodieCommitMetadata implements Serializable {
return partitionToWriteStats;
}
/**
 * Returns the extra-metadata value stored under the given key, or null if absent.
 */
public String getMetadata(String metaKey) {
  return extraMetadataMap.get(metaKey);
}
public HashMap<String, String> getFileIdAndFullPaths() {
HashMap<String, String> filePaths = new HashMap<>();
// list all partitions paths

View File

@@ -25,12 +25,21 @@
<artifactId>hoodie-utilities</artifactId>
<packaging>jar</packaging>
<build>
<plugins>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
@@ -41,7 +50,7 @@
</descriptors>
<archive>
<manifest>
<mainClass>com.uber.hoodie.utilities.HoodieDeltaStreamer</mainClass>
<mainClass>com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer</mainClass>
</manifest>
</archive>
@@ -57,7 +66,6 @@
</execution>
</executions>
</plugin>
</plugins>
<resources>
@@ -220,6 +228,19 @@
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Used for SQL templating -->
<dependency>
<groupId>org.antlr</groupId>
@@ -238,7 +259,27 @@
<version>1.10.19</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
<version>1.7.6-cdh5.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>bijection-avro_2.10</artifactId>
<version>0.9.2</version>
</dependency>
</dependencies>

View File

@@ -1,17 +1,19 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.utilities;
@@ -78,7 +80,7 @@ public class HiveIncrementalPuller {
@Parameter(names = {"--fromCommitTime"}) public String fromCommitTime;
@Parameter(names = {"--maxCommits"}) public int maxCommits = 3;
@Parameter(names = {"--help", "-h"}, help = true) public Boolean help = false;
@Parameter(names = {"--storageFormat"}) public String tempTableStorageFormat = "PARQUET";
@Parameter(names = {"--storageFormat"}) public String tempTableStorageFormat = "AVRO";
}
static {
@@ -169,11 +171,11 @@ public class HiveIncrementalPuller {
throw new HoodieIncrementalPullSQLException(
"Incremental SQL does not have " + config.sourceDb + "." + config.sourceTable);
}
if (!incrementalSQL.contains("`_hoodie_commit_time` > '%s'")) {
if (!incrementalSQL.contains("`_hoodie_commit_time` > '%s'")) {
log.info("Incremental SQL : " + incrementalSQL
+ " does not contain `_hoodie_commit_time` > '%s'. Please add this clause for incremental to work properly.");
+ " does not contain `_hoodie_commit_time` > '%s'. Please add this clause for incremental to work properly.");
throw new HoodieIncrementalPullSQLException(
"Incremental SQL does not have clause `_hoodie_commit_time` > '%s', which means its not pulling incrementally");
"Incremental SQL does not have clause `_hoodie_commit_time` > '%s', which means its not pulling incrementally");
}
incrementalPullSQLtemplate

View File

@@ -1,230 +0,0 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.utilities;
import com.google.common.io.Files;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.common.HoodieJsonPayload;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.index.HoodieIndex;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
 * A utility which can incrementally take the output from {@link HiveIncrementalPuller} and apply it to the target dataset.
 * Does not maintain any state, queries at runtime to see how far behind the target dataset is from
 * the source dataset. This can be overridden to force sync from a timestamp.
 */
public class HoodieDeltaStreamer implements Serializable {

  private static volatile Logger log = LogManager.getLogger(HoodieDeltaStreamer.class);

  // Parsed command-line configuration for this run
  private final Config cfg;

  public HoodieDeltaStreamer(Config cfg) throws IOException {
    this.cfg = cfg;
  }

  /**
   * Determines the last commit pulled on the source side, sanity-checks it against the
   * target dataset's completed commit timeline, and applies it to the target.
   *
   * @throws IllegalStateException if the target is ahead of the last pulled commit, or
   *                               already contains it and --override is not set
   */
  private void sync() throws Exception {
    JavaSparkContext sc = getSparkContext(cfg);
    FileSystem fs = FSUtils.getFs();
    HoodieTableMetaClient targetHoodieMetadata = new HoodieTableMetaClient(fs, cfg.targetPath);
    HoodieTimeline timeline = targetHoodieMetadata.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
    String lastCommitPulled = findLastCommitPulled(fs, cfg.dataPath);
    log.info("Last commit pulled on the source dataset is " + lastCommitPulled);

    // Only compare when the target actually has a completed instant. The previous negated
    // check (!hasNext()) skipped the comparison on every non-empty timeline, and on an
    // empty one it would throw NoSuchElementException from lastInstant().get().
    if (timeline.getInstants().iterator().hasNext() && timeline
        .compareTimestamps(timeline.lastInstant().get().getTimestamp(), lastCommitPulled,
            HoodieTimeline.GREATER)) {
      // this should never be the case
      throw new IllegalStateException(
          "Last commit pulled from source table " + lastCommitPulled
              + " is before the last commit in the target table " + timeline.lastInstant()
              .get());
    }

    if (!cfg.override && timeline.containsOrBeforeTimelineStarts(lastCommitPulled)) {
      throw new IllegalStateException(
          "Target Table already has the commit " + lastCommitPulled
              + ". Not overriding as cfg.override is false");
    }
    syncTill(lastCommitPulled, targetHoodieMetadata, sc);
  }

  /**
   * Lists the per-commit folders under dataPath and returns the latest commit time,
   * relying on commit-time folder names sorting lexicographically.
   *
   * @throws IllegalStateException if no commit folders exist under dataPath
   */
  private String findLastCommitPulled(FileSystem fs, String dataPath) throws IOException {
    FileStatus[] commitTimePaths = fs.listStatus(new Path(dataPath));
    List<String> commitTimes = new ArrayList<>(commitTimePaths.length);
    for (FileStatus commitTimePath : commitTimePaths) {
      String[] splits = commitTimePath.getPath().toString().split("/");
      commitTimes.add(splits[splits.length - 1]);
    }
    if (commitTimes.isEmpty()) {
      // Fail with a clear message rather than an IndexOutOfBoundsException below
      throw new IllegalStateException("No commit folders found under " + dataPath);
    }
    Collections.sort(commitTimes);
    Collections.reverse(commitTimes);
    log.info("Retrieved commit times " + commitTimes);
    return commitTimes.get(0);
  }

  /**
   * Reads the json data for the given commit, converts each line into a {@link HoodieRecord},
   * and upserts/inserts the records into the target dataset via the write client.
   */
  private void syncTill(String lastCommitPulled, HoodieTableMetaClient target,
      JavaSparkContext sc) throws Exception {
    // Step 1 : Scan incrementally and get the input records as a RDD of source format
    String dataPath = cfg.dataPath + "/" + lastCommitPulled;
    log.info("Using data path " + dataPath);
    JavaRDD<String> rdd = sc.textFile(dataPath);

    // Step 2 : Create the hoodie records
    JavaRDD<HoodieRecord<HoodieJsonPayload>> records =
        rdd.map(new Function<String, HoodieRecord<HoodieJsonPayload>>() {
          @Override
          public HoodieRecord<HoodieJsonPayload> call(String json)
              throws Exception {
            HoodieJsonPayload payload = new HoodieJsonPayload(json);
            HoodieKey key = new HoodieKey(payload.getRowKey(cfg.keyColumnField),
                payload.getPartitionPath(cfg.partitionPathField));
            return new HoodieRecord<>(key, payload);
          }
        });

    // Step 3: Use Hoodie Client to upsert/bulk load the records into target hoodie dataset
    HoodieWriteConfig hoodieCfg = getHoodieClientConfig(target);
    HoodieWriteClient<HoodieJsonPayload> client = new HoodieWriteClient<>(sc, hoodieCfg);
    // Roll back any (possibly partial) previous attempt at this commit before retrying it
    log.info("Rollback started " + lastCommitPulled);
    client.rollback(lastCommitPulled);
    client.startCommitWithTime(lastCommitPulled);
    log.info("Starting commit " + lastCommitPulled);
    if (cfg.upsert) {
      log.info("Upserting records");
      client.upsert(records, lastCommitPulled);
    } else {
      log.info("Inserting records");
      // insert the records.
      client.insert(records, lastCommitPulled);
    }
    // TODO - revisit: inspect the returned WriteStatus RDD (per-record and global error
    // counts) to decide whether this write should actually be committed.
  }

  /**
   * Builds the write client config from the target table's metadata plus the avro schema
   * file supplied on the command line.
   */
  private HoodieWriteConfig getHoodieClientConfig(HoodieTableMetaClient metadata)
      throws Exception {
    final String schemaStr = Files.toString(new File(cfg.schemaFile), Charset.forName("UTF-8"));
    return HoodieWriteConfig.newBuilder().withPath(metadata.getBasePath())
        .withSchema(schemaStr)
        .withParallelism(cfg.groupByParallelism, cfg.groupByParallelism)
        .forTable(metadata.getTableConfig().getTableName()).withIndexConfig(
            HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
        .build();
  }

  /**
   * Creates the spark context with kryo serialization and gzip BLOCK output compression.
   */
  private JavaSparkContext getSparkContext(Config cfg) {
    SparkConf sparkConf = new SparkConf().setAppName("hoodie-delta-streamer-" + cfg.targetTableName);
    sparkConf.setMaster(cfg.sparkMaster);
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    sparkConf.set("spark.driver.maxResultSize", "2g");
    if (cfg.sparkMaster.startsWith("yarn")) {
      sparkConf.set("spark.eventLog.overwrite", "true");
      sparkConf.set("spark.eventLog.enabled", "true");
    }

    // Configure hadoop conf. (The previous version first assigned the literal "true" to
    // the codec property before overwriting it with the real codec class; that dead,
    // incorrect assignment is dropped.)
    sparkConf.set("spark.hadoop.mapred.output.compress", "true");
    sparkConf.set("spark.hadoop.mapred.output.compression.codec",
        "org.apache.hadoop.io.compress.GzipCodec");
    sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");

    sparkConf = HoodieWriteClient.registerClasses(sparkConf);
    return new JavaSparkContext(sparkConf);
  }

  /**
   * Command-line configuration; see the individual {@code @Parameter} names for usage.
   */
  public static class Config implements Serializable {
    @Parameter(names = {"--dataPath"})
    public String dataPath;
    @Parameter(names = {"--parallelism"})
    public int groupByParallelism = 10000;
    @Parameter(names = {"--upsert"})
    public boolean upsert = false;
    @Parameter(names = {"--master"})
    public String sparkMaster = "yarn-client";
    @Parameter(names = {"--targetPath"}, required = true)
    public String targetPath;
    @Parameter(names = {"--targetTable"})
    public String targetTableName;
    @Parameter(names = {"--keyColumn"})
    public String keyColumnField = "uuid";
    @Parameter(names = {"--partitionPathField"})
    public String partitionPathField = "request_at";
    @Parameter(names = {"--schemaFile"})
    public String schemaFile;
    @Parameter(names = {"--override"})
    public boolean override = false;
    @Parameter(names = {"--help", "-h"}, help = true)
    public Boolean help = false;
  }

  public static void main(String[] args) throws Exception {
    final Config cfg = new Config();
    JCommander cmd = new JCommander(cfg, args);
    if (cfg.help || args.length == 0) {
      cmd.usage();
      System.exit(1);
    }
    new HoodieDeltaStreamer(cfg).sync();
  }
}

View File

@@ -1,17 +1,19 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.utilities;
@@ -74,7 +76,7 @@ public class HoodieSnapshotCopier implements Serializable {
if(!latestCommit.isPresent()) {
logger.warn("No commits present. Nothing to snapshot");
} else {
logger.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.", latestCommit.get()));
logger.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.", latestCommit.get()));
}
List<String> partitions = FSUtils.getAllPartitionPaths(fs, baseDir);
@@ -84,7 +86,7 @@ public class HoodieSnapshotCopier implements Serializable {
// Make sure the output directory is empty
Path outputPath = new Path(outputDir);
if (fs.exists(outputPath)) {
logger.warn(String.format("The output path %s already exists, deleting", outputPath));
logger.warn(String.format("The output path %s already exists, deleting", outputPath));
fs.delete(new Path(outputDir), true);
}
@@ -118,7 +120,7 @@ public class HoodieSnapshotCopier implements Serializable {
});
// Also copy the .commit files
logger.info(String.format("Copying .commit files which are no-late-than %s.", latestCommit.get()));
logger.info(String.format("Copying .commit files which are no-late-than %s.", latestCommit.get()));
FileStatus[] commitFilesToCopy = fs.listStatus(
new Path(baseDir + "/" + HoodieTableMetaClient.METAFOLDER_NAME), new PathFilter() {
@Override
@@ -141,7 +143,7 @@ public class HoodieSnapshotCopier implements Serializable {
fs.mkdirs(targetFilePath.getParent());
}
if (fs.exists(targetFilePath)) {
logger.error(String.format("The target output commit file (%s) already exists.", targetFilePath));
logger.error(String.format("The target output commit file (%s) already exists.", targetFilePath));
}
FileUtil.copy(fs, commitStatus.getPath(), fs, targetFilePath, false, fs.getConf());
}
@@ -152,7 +154,7 @@ public class HoodieSnapshotCopier implements Serializable {
// Create the _SUCCESS tag
Path successTagPath = new Path(outputDir + "/_SUCCESS");
if (!fs.exists(successTagPath)) {
logger.info(String.format("Creating _SUCCESS under %s.", outputDir));
logger.info(String.format("Creating _SUCCESS under %s.", outputDir));
fs.createNewFile(successTagPath);
}
}
@@ -161,7 +163,7 @@ public class HoodieSnapshotCopier implements Serializable {
// Take input configs
final Config cfg = new Config();
new JCommander(cfg, args);
logger.info(String.format("Snapshot hoodie table from %s to %s", cfg.basePath, cfg.outputPath));
logger.info(String.format("Snapshot hoodie table from %s to %s", cfg.basePath, cfg.outputPath));
// Create a spark job to do the snapshot copy
SparkConf sparkConf = new SparkConf().setAppName("Hoodie-snapshot-copier");

View File

@@ -0,0 +1,108 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.utilities;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.exception.HoodieNotSupportedException;
import com.uber.hoodie.utilities.keygen.KeyGenerator;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import com.uber.hoodie.utilities.sources.Source;
import com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException;
import com.uber.hoodie.utilities.sources.SourceDataFormat;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.util.List;
/**
 * Bunch of reflection and config-loading helper methods shared by the utilities.
 */
public class UtilHelpers {

  /**
   * Reflectively instantiates the given {@link Source} implementation with the standard
   * 4-arg constructor (config, spark context, data format, schema provider).
   *
   * @throws IOException if the class cannot be loaded or constructed
   */
  public static Source createSource(String sourceClass, PropertiesConfiguration cfg, JavaSparkContext jssc, SourceDataFormat dataFormat, SchemaProvider schemaProvider) throws IOException {
    try {
      return (Source) ConstructorUtils.invokeConstructor(Class.forName(sourceClass), (Object) cfg, (Object) jssc, (Object) dataFormat, (Object) schemaProvider);
    } catch (Throwable e) {
      throw new IOException("Could not load source class " + sourceClass, e);
    }
  }

  /**
   * Reflectively instantiates the given {@link SchemaProvider} implementation with a
   * single-config-arg constructor.
   *
   * @throws IOException if the class cannot be loaded or constructed
   */
  public static SchemaProvider createSchemaProvider(String schemaProviderClass, PropertiesConfiguration cfg) throws IOException {
    try {
      return (SchemaProvider) ConstructorUtils.invokeConstructor(Class.forName(schemaProviderClass), (Object) cfg);
    } catch (Throwable e) {
      throw new IOException("Could not load schema provider class " + schemaProviderClass, e);
    }
  }

  /**
   * Reflectively instantiates the given {@link KeyGenerator} implementation with a
   * single-config-arg constructor.
   *
   * @throws IOException if the class cannot be loaded or constructed
   */
  public static KeyGenerator createKeyGenerator(String keyGeneratorClass, PropertiesConfiguration cfg) throws IOException {
    try {
      return (KeyGenerator) ConstructorUtils.invokeConstructor(Class.forName(keyGeneratorClass), (Object) cfg);
    } catch (Throwable e) {
      throw new IOException("Could not load key generator class " + keyGeneratorClass, e);
    }
  }

  /**
   * Reflectively instantiates the given {@link HoodieRecordPayload} implementation with a
   * (record, orderingVal) constructor.
   *
   * @throws IOException if the payload cannot be constructed
   */
  public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record, Comparable orderingVal) throws IOException {
    try {
      return (HoodieRecordPayload) ConstructorUtils.invokeConstructor(Class.forName(payloadClass), (Object) record, (Object) orderingVal);
    } catch (Throwable e) {
      throw new IOException("Could not create payload for class: " + payloadClass, e);
    }
  }

  /**
   * Reads a java-properties-format config file off the given filesystem path.
   *
   * TODO: Support hierarchical config files (see CONFIGURATION-609 for sample)
   *
   * @param fs      filesystem to read from
   * @param cfgPath path of the properties file
   * @return the loaded configuration
   * @throws HoodieIOException            if the file cannot be read
   * @throws HoodieDeltaStreamerException if the file contents are not valid properties
   */
  public static PropertiesConfiguration readConfig(FileSystem fs, Path cfgPath) {
    // try-with-resources: the original only closed the stream on the success path,
    // leaking it when PropertiesConfiguration.load() threw.
    try (FSDataInputStream in = fs.open(cfgPath)) {
      PropertiesConfiguration config = new PropertiesConfiguration();
      config.load(in);
      return config;
    } catch (IOException e) {
      throw new HoodieIOException("Unable to read config file at :" + cfgPath, e);
    } catch (ConfigurationException e) {
      throw new HoodieDeltaStreamerException("Invalid configs found in config file at :" + cfgPath, e);
    }
  }

  /**
   * Ensures every property named in checkPropNames is present in the configuration,
   * failing fast on the first missing one.
   */
  public static void checkRequiredProperties(PropertiesConfiguration configuration, List<String> checkPropNames) {
    checkPropNames.forEach(prop -> {
      if (!configuration.containsKey(prop)) {
        throw new HoodieNotSupportedException("Required property "+ prop + " is missing");
      }
    });
  }
}

View File

@@ -0,0 +1,67 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.utilities.deltastreamer;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import java.io.IOException;
import java.util.Optional;
/**
 * Default payload used by the delta streamer.
 *
 * 1. preCombine - picks the latest delta record for a key, based on an ordering field
 * 2. combineAndGetUpdateValue/getInsertValue - simply overwrites storage with the latest delta record
 */
public class DeltaStreamerAvroPayload extends DeltaStreamerPayload implements HoodieRecordPayload<DeltaStreamerAvroPayload> {

  /**
   * @param record      avro record extracted from the source
   * @param orderingVal value used to order competing deltas for the same key
   */
  public DeltaStreamerAvroPayload(GenericRecord record, Comparable orderingVal) {
    super(record, orderingVal);
  }

  @Override
  public DeltaStreamerAvroPayload preCombine(DeltaStreamerAvroPayload another) {
    // Keep whichever payload carries the strictly greater ordering value; ties favor this one.
    return (another.orderingVal.compareTo(orderingVal) > 0) ? another : this;
  }

  @Override
  public Optional<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
    // Overwrite semantics: the value currently on storage is ignored entirely.
    return getInsertValue(schema);
  }

  @Override
  public Optional<IndexedRecord> getInsertValue(Schema schema) throws IOException {
    // Rewrite the source record against the supplied writer schema.
    return Optional.of(HoodieAvroUtils.rewriteRecord(record, schema));
  }
}

View File

@@ -0,0 +1,49 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.utilities.deltastreamer;
import org.apache.avro.generic.GenericRecord;
import java.io.Serializable;
/**
 * Base class for all payloads supported by the {@link HoodieDeltaStreamer}.
 *
 * Carries the raw avro record pulled from the source together with the value used to
 * order competing deltas for the same key during preCombine.
 */
public abstract class DeltaStreamerPayload implements Serializable {

  /**
   * Avro data extracted from the source.
   */
  protected final GenericRecord record;

  /**
   * For purposes of preCombining (picking the winning delta for a key).
   */
  protected final Comparable orderingVal;

  /**
   * @param record      avro record extracted from the source
   * @param orderingVal value used to break ties during preCombine
   */
  public DeltaStreamerPayload(GenericRecord record, Comparable orderingVal) {
    this.record = record;
    this.orderingVal = orderingVal;
  }
}

View File

@@ -0,0 +1,359 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.utilities.deltastreamer;
import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.HoodieCommitMetadata;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.utilities.HiveIncrementalPuller;
import com.uber.hoodie.utilities.UtilHelpers;
import com.uber.hoodie.utilities.keygen.SimpleKeyGenerator;
import com.uber.hoodie.utilities.schema.FilebasedSchemaProvider;
import com.uber.hoodie.utilities.sources.DFSSource;
import com.uber.hoodie.utilities.keygen.KeyGenerator;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import com.uber.hoodie.utilities.sources.Source;
import com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException;
import com.uber.hoodie.utilities.sources.SourceDataFormat;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import scala.collection.JavaConversions;
/**
 * A utility which can incrementally consume new data from a variety of sources (e.g the output
 * of {@link HiveIncrementalPuller}, DFS folders, Kafka) and apply it to the target dataset.
 * Does not maintain any state of its own; it queries the target dataset's commit timeline at
 * runtime to see how far behind the target is from the source. This can be overridden to force
 * a sync from a given timestamp.
 */
public class HoodieDeltaStreamer implements Serializable {

  private static final Logger log = LogManager.getLogger(HoodieDeltaStreamer.class);

  /**
   * Commit-metadata key under which the source checkpoint is stashed, so the next run can
   * resume exactly where the previous one left off.
   */
  private static final String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";

  private final Config cfg;

  /**
   * Source to pull deltas from.
   */
  private transient Source source;

  /**
   * Schema provider that supplies the schemas for reading the input and writing out the
   * target table.
   */
  private transient SchemaProvider schemaProvider;

  /**
   * Extracts the key for the target dataset from each incoming record.
   */
  private KeyGenerator keyGenerator;

  /**
   * Filesystem used.
   */
  private transient FileSystem fs;

  /**
   * Timeline with completed commits; present only if the target dataset already exists.
   */
  private transient Optional<HoodieTimeline> commitTimelineOpt;

  /**
   * Spark context.
   */
  private transient JavaSparkContext jssc;

  public HoodieDeltaStreamer(Config cfg) throws IOException {
    this.cfg = cfg;
    this.fs = FSUtils.getFs();
    if (fs.exists(new Path(cfg.targetBasePath))) {
      HoodieTableMetaClient meta = new HoodieTableMetaClient(fs, cfg.targetBasePath);
      this.commitTimelineOpt = Optional.of(meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants());
    } else {
      this.commitTimelineOpt = Optional.empty();
    }
    //TODO(vc) Should these be passed from outside?
    // Schema provider must be ready before the spark context is built, since the context
    // registers the source/target avro schemas with Kryo.
    initSchemaProvider();
    initKeyGenerator();
    this.jssc = getSparkContext();
    initSource();
  }

  /**
   * Instantiates the configured {@link Source}; needs both the spark context and the
   * schema provider, so must run last during construction.
   */
  private void initSource() throws IOException {
    // Create the source & schema providers
    PropertiesConfiguration sourceCfg = UtilHelpers.readConfig(fs, new Path(cfg.sourceConfigProps));
    log.info("Creating source " + cfg.sourceClassName + " with configs : " + sourceCfg.toString());
    this.source = UtilHelpers.createSource(cfg.sourceClassName, sourceCfg, jssc, cfg.sourceFormat, schemaProvider);
  }

  /**
   * Instantiates the configured {@link SchemaProvider} from its properties file.
   */
  private void initSchemaProvider() throws IOException {
    PropertiesConfiguration schemaCfg = UtilHelpers.readConfig(fs, new Path(cfg.schemaProviderConfigProps));
    log.info("Creating schema provider " + cfg.schemaProviderClassName + " with configs : " + schemaCfg.toString());
    this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, schemaCfg);
  }

  /**
   * Instantiates the configured {@link KeyGenerator} from its properties file.
   */
  private void initKeyGenerator() throws IOException {
    PropertiesConfiguration keygenCfg = UtilHelpers.readConfig(fs, new Path(cfg.keyGeneratorProps));
    log.info("Creating key generator " + cfg.keyGeneratorClass + " with configs : " + keygenCfg.toString());
    this.keyGenerator = UtilHelpers.createKeyGenerator(cfg.keyGeneratorClass, keygenCfg);
  }

  /**
   * Builds the spark context, wiring in kryo serialization, output compression and the avro
   * schemas (registered so shuffles do not serialize the full schema with every record).
   */
  private JavaSparkContext getSparkContext() {
    SparkConf sparkConf = new SparkConf().setAppName("hoodie-delta-streamer-" + cfg.targetTableName);
    sparkConf.setMaster(cfg.sparkMaster);
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    sparkConf.set("spark.driver.maxResultSize", "2g");

    // Configure hadoop conf
    sparkConf.set("spark.hadoop.mapred.output.compress", "true");
    sparkConf.set("spark.hadoop.mapred.output.compression.codec",
        "org.apache.hadoop.io.compress.GzipCodec");
    sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");

    sparkConf = HoodieWriteClient.registerClasses(sparkConf);
    // register the schemas, so that shuffle does not serialize the full schemas
    List<Schema> schemas = Arrays.asList(schemaProvider.getSourceSchema(), schemaProvider.getTargetSchema());
    sparkConf.registerAvroSchemas(JavaConversions.asScalaBuffer(schemas).toList());
    return new JavaSparkContext(sparkConf);
  }

  /**
   * Performs one round of delta sync: resolves the checkpoint from the last commit (or
   * initializes the dataset if absent), pulls new data from the source, converts it into
   * {@link HoodieRecord}s, and writes it out with the new checkpoint stashed in the commit
   * metadata.
   */
  private void sync() throws Exception {
    // Retrieve the previous round checkpoints, if any
    Optional<String> resumeCheckpointStr = Optional.empty();
    if (commitTimelineOpt.isPresent()) {
      Optional<HoodieInstant> lastCommit = commitTimelineOpt.get().lastInstant();
      if (lastCommit.isPresent()) {
        HoodieCommitMetadata commitMetadata =
            HoodieCommitMetadata.fromBytes(commitTimelineOpt.get().getInstantDetails(lastCommit.get()).get());
        if (commitMetadata.getMetadata(CHECKPOINT_KEY) != null) {
          resumeCheckpointStr = Optional.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
        } else {
          // A commit without our checkpoint key means this dataset is managed by something else
          throw new HoodieDeltaStreamerException("Unable to find previous checkpoint. Please double check if this table " +
              "was indeed built via delta streamer ");
        }
      }
    } else {
      // Target dataset does not exist yet; initialize it in place
      Properties properties = new Properties();
      properties.put(HoodieWriteConfig.TABLE_NAME, cfg.targetTableName);
      HoodieTableMetaClient.initializePathAsHoodieDataset(FSUtils.getFs(), cfg.targetBasePath, properties);
    }
    log.info("Checkpoint to resume from : " + resumeCheckpointStr);

    // Pull the data from the source & prepare the write
    Pair<Optional<JavaRDD<GenericRecord>>, String> dataAndCheckpoint = source.fetchNewData(resumeCheckpointStr, cfg.maxInputBytes);
    if (!dataAndCheckpoint.getKey().isPresent()) {
      log.info("No new data, nothing to commit.. ");
      return;
    }
    JavaRDD<GenericRecord> avroRDD = dataAndCheckpoint.getKey().get();
    JavaRDD<HoodieRecord> records = avroRDD.map(gr -> {
      // The ordering-field value breaks ties between records with the same key during preCombine
      HoodieRecordPayload payload = UtilHelpers.createPayload(
          cfg.payloadClassName,
          gr,
          (Comparable) gr.get(cfg.sourceOrderingField));
      return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
    });

    // Perform the write
    HoodieWriteConfig hoodieCfg = getHoodieClientConfig(cfg.hoodieClientProps);
    HoodieWriteClient client = new HoodieWriteClient<>(jssc, hoodieCfg);
    try {
      String commitTime = client.startCommit();
      log.info("Starting commit : " + commitTime);

      JavaRDD<WriteStatus> writeStatusRDD;
      if (cfg.operation == Operation.INSERT) {
        writeStatusRDD = client.insert(records, commitTime);
      } else if (cfg.operation == Operation.UPSERT) {
        writeStatusRDD = client.upsert(records, commitTime);
      } else {
        throw new HoodieDeltaStreamerException("Unknown operation :" + cfg.operation);
      }

      // Simply commit for now. TODO(vc): Support better error handlers later on
      HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
      checkpointCommitMetadata.put(CHECKPOINT_KEY, dataAndCheckpoint.getValue());
      boolean success = client.commit(commitTime, writeStatusRDD, Optional.of(checkpointCommitMetadata));
      if (success) {
        log.info("Commit " + commitTime + " successful!");
        // TODO(vc): Kick off hive sync from here.
      } else {
        log.info("Commit " + commitTime + " failed!");
      }
    } finally {
      // Release spark/client resources even when the write above throws
      client.close();
    }
  }

  /**
   * Builds the hoodie client config, layering the (optional) user supplied client property
   * file on top of the defaults below.
   */
  private HoodieWriteConfig getHoodieClientConfig(String hoodieClientCfgPath) throws Exception {
    // TODO(vc): Double check all the options can be passed in like this. CompactionConfig, IndexConfig everything.
    HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
        .combineInput(true, true)
        .withPath(cfg.targetBasePath)
        .withAutoCommit(false)
        .withSchema(schemaProvider.getTargetSchema().toString())
        .forTable(cfg.targetTableName)
        .withIndexConfig(
            HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build());
    if (hoodieClientCfgPath != null) {
      // Client config file is optional; the defaults above apply when it is absent
      builder = builder.fromInputStream(fs.open(new Path(hoodieClientCfgPath)));
    }
    return builder.build();
  }

  private enum Operation {
    UPSERT,
    INSERT
  }

  // Converters must be static so JCommander can instantiate them reflectively
  private static class OperationConvertor implements IStringConverter<Operation> {
    @Override
    public Operation convert(String value) throws ParameterException {
      return Operation.valueOf(value);
    }
  }

  private static class SourceFormatConvertor implements IStringConverter<SourceDataFormat> {
    @Override
    public SourceDataFormat convert(String value) throws ParameterException {
      return SourceDataFormat.valueOf(value);
    }
  }

  public static class Config implements Serializable {

    /** TARGET CONFIGS **/
    @Parameter(names = {"--target-base-path"}, description = "base path for the target hoodie dataset", required = true)
    public String targetBasePath;

    // TODO: How to obtain hive configs to register?
    @Parameter(names = {"--target-table"}, description = "name of the target table in Hive", required = true)
    public String targetTableName;

    @Parameter(names = {"--hoodie-client-config"}, description = "path to properties file on localfs or dfs, with hoodie client config. Sane defaults" +
        "are used, but recommend use to provide basic things like metrics endpoints, hive configs etc")
    public String hoodieClientProps = null;

    /** SOURCE CONFIGS **/
    @Parameter(names = {"--source-class"}, description = "subclass of com.uber.hoodie.utilities.sources.Source to use to read data. " +
        "built-in options: com.uber.hoodie.utilities.common.{DFSSource (default), KafkaSource, HiveIncrPullSource}")
    public String sourceClassName = DFSSource.class.getName();

    @Parameter(names = {"--source-config"}, description = "path to properties file on localfs or dfs, with source configs. " +
        "For list of acceptable properties, refer the source class", required = true)
    public String sourceConfigProps = null;

    @Parameter(names = {"--source-format"}, description = "Format of data in source, JSON (default), Avro. All source data is " +
        "converted to Avro using the provided schema in any case", converter = SourceFormatConvertor.class)
    public SourceDataFormat sourceFormat = SourceDataFormat.JSON;

    @Parameter(names = {"--source-ordering-field"}, description = "Field within source record to decide how to break ties between " +
        " records with same key in input data. Default: 'ts' holding unix timestamp of record")
    public String sourceOrderingField = "ts";

    @Parameter(names = {"--key-generator-class"}, description = "Subclass of com.uber.hoodie.utilities.common.KeyExtractor to generate" +
        "a HoodieKey from the given avro record. Built in: SimpleKeyGenerator (Uses provided field names as recordkey & partitionpath. " +
        "Nested fields specified via dot notation, e.g: a.b.c)")
    public String keyGeneratorClass = SimpleKeyGenerator.class.getName();

    @Parameter(names = {"--key-generator-config"}, description = "Path to properties file on localfs or dfs, with KeyGenerator configs. " +
        "For list of acceptable properites, refer the KeyGenerator class", required = true)
    public String keyGeneratorProps = null;

    @Parameter(names = {"--payload-class"}, description = "subclass of HoodieRecordPayload, that works off a GenericRecord. " +
        "Default: SourceWrapperPayload. Implement your own, if you want to do something other than overwriting existing value")
    public String payloadClassName = DeltaStreamerAvroPayload.class.getName();

    @Parameter(names = {"--schemaprovider-class"}, description = "subclass of com.uber.hoodie.utilities.schema.SchemaProvider " +
        "to attach schemas to input & target table data, built in options: FilebasedSchemaProvider")
    public String schemaProviderClassName = FilebasedSchemaProvider.class.getName();

    @Parameter(names = {"--schemaprovider-config"}, description = "path to properties file on localfs or dfs, with schema configs. " +
        "For list of acceptable properties, refer the schema provider class", required = true)
    public String schemaProviderConfigProps = null;

    /** Other configs **/
    @Parameter(names = {"--spark-master"}, description = "spark master to use, e.g: yarn, local[2]. Default: local[2]")
    public String sparkMaster = "local[2]";

    @Parameter(names = {"--max-input-bytes"}, description = "Maximum number of bytes to read from source. Default: 1TB")
    public long maxInputBytes = 1L * 1024 * 1024 * 1024 * 1024;

    @Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input " +
        "is purely new data/inserts to gain speed)", converter = OperationConvertor.class)
    public Operation operation = Operation.UPSERT;

    @Parameter(names = {"--help", "-h"}, help = true)
    public Boolean help = false;
  }

  public static void main(String[] args) throws Exception {
    final Config cfg = new Config();
    JCommander cmd = new JCommander(cfg, args);
    // TODO(vc): Do proper validation
    if (cfg.help || args.length == 0) {
      cmd.usage();
      System.exit(1);
    }
    new HoodieDeltaStreamer(cfg).sync();
  }
}

View File

@@ -0,0 +1,31 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.utilities.exception;
import com.uber.hoodie.exception.HoodieException;
/**
 * Exception thrown for failures specific to the delta streamer utility.
 */
public class HoodieDeltaStreamerException extends HoodieException {

  public HoodieDeltaStreamerException(String msg) {
    super(msg);
  }

  public HoodieDeltaStreamerException(String msg, Throwable e) {
    super(msg, e);
  }
}

View File

@@ -1,17 +1,19 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.utilities.exception;

View File

@@ -1,17 +1,19 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.utilities.exception;

View File

@@ -0,0 +1,47 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.utilities.keygen;
import com.uber.hoodie.common.model.HoodieKey;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.configuration.PropertiesConfiguration;
import java.io.Serializable;
/**
 * Abstract class to extend for plugging in extraction of {@link com.uber.hoodie.common.model.HoodieKey}
 * from an Avro record
 */
public abstract class KeyGenerator implements Serializable {

  // NOTE(review): config is transient while the class is Serializable, so it will be null
  // after deserialization on executors. Subclasses must copy out everything they need in
  // their constructor (as SimpleKeyGenerator does) -- confirm this contract holds for new
  // implementations.
  protected transient PropertiesConfiguration config;

  protected KeyGenerator(PropertiesConfiguration config) {
    this.config = config;
  }

  /**
   * Generate a Hoodie Key out of provided generic record.
   *
   * @param record avro record to extract record key & partition path from
   * @return the hoodie key for the record
   */
  public abstract HoodieKey getKey(GenericRecord record);
}

View File

@@ -0,0 +1,58 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.utilities.keygen;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.utilities.UtilHelpers;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.configuration.PropertiesConfiguration;
import java.util.Arrays;
/**
 * Simple key generator, which takes names of fields to be used for recordKey and partitionPath
 * as configs.
 */
public class SimpleKeyGenerator extends KeyGenerator {

  protected final String recordKeyField;

  protected final String partitionPathField;

  /**
   * Supported configs
   */
  static class Config {
    private static final String RECORD_KEY_FIELD_PROP = "hoodie.deltastreamer.keygen.simple.recordkey.field";
    private static final String PARTITION_PATH_FIELD_PROP = "hoodie.deltastreamer.keygen.simple.partitionpath.field";
  }

  public SimpleKeyGenerator(PropertiesConfiguration config) {
    super(config);
    UtilHelpers.checkRequiredProperties(config, Arrays.asList(Config.PARTITION_PATH_FIELD_PROP, Config.RECORD_KEY_FIELD_PROP));
    // Copy the field names out, since the config itself is transient (null on executors)
    this.recordKeyField = config.getString(Config.RECORD_KEY_FIELD_PROP);
    this.partitionPathField = config.getString(Config.PARTITION_PATH_FIELD_PROP);
  }

  @Override
  public HoodieKey getKey(GenericRecord record) {
    Object recordKey = record.get(recordKeyField);
    Object partitionPath = record.get(partitionPathField);
    // Fail with a descriptive message instead of a bare NPE when a field is missing/null
    if (recordKey == null || partitionPath == null) {
      throw new IllegalArgumentException("Record is missing a value for key field '" + recordKeyField
          + "' or partition field '" + partitionPathField + "'");
    }
    return new HoodieKey(recordKey.toString(), partitionPath.toString());
  }
}

View File

@@ -0,0 +1,105 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.utilities.keygen;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.exception.HoodieNotSupportedException;
import com.uber.hoodie.utilities.UtilHelpers;
import com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.configuration.PropertiesConfiguration;
import java.io.Serializable;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.TimeZone;
/**
 * Key generator, that relies on timestamps for the partitioning field. Still picks the record
 * key by field name (inherited from {@link SimpleKeyGenerator}).
 */
public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {

  enum TimestampType implements Serializable {
    UNIX_TIMESTAMP,
    DATE_STRING,
    MIXED
  }

  private final TimestampType timestampType;

  // Only initialized for DATE_STRING/MIXED types; remains null for pure UNIX_TIMESTAMP
  private SimpleDateFormat inputDateFormat;

  private final String outputDateFormat;

  /**
   * Supported configs
   */
  static class Config {
    // One value from TimestampType above
    private static final String TIMESTAMP_TYPE_FIELD_PROP = "hoodie.deltastreamer.keygen.timebased.timestamp.type";
    private static final String TIMESTAMP_INPUT_DATE_FORMAT_PROP = "hoodie.deltastreamer.keygen.timebased.input.dateformat";
    private static final String TIMESTAMP_OUTPUT_DATE_FORMAT_PROP = "hoodie.deltastreamer.keygen.timebased.output.dateformat";
  }

  public TimestampBasedKeyGenerator(PropertiesConfiguration config) {
    super(config);
    UtilHelpers.checkRequiredProperties(config, Arrays.asList(Config.TIMESTAMP_TYPE_FIELD_PROP, Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP));
    this.timestampType = TimestampType.valueOf(config.getString(Config.TIMESTAMP_TYPE_FIELD_PROP));
    this.outputDateFormat = config.getString(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP);
    if (timestampType == TimestampType.DATE_STRING || timestampType == TimestampType.MIXED) {
      UtilHelpers.checkRequiredProperties(config, Arrays.asList(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP));
      this.inputDateFormat = new SimpleDateFormat(config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP));
      this.inputDateFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
    }
  }

  @Override
  public HoodieKey getKey(GenericRecord record) {
    Object partitionVal = record.get(partitionPathField);
    // SimpleDateFormat is not thread-safe; build one per call rather than sharing an instance
    SimpleDateFormat partitionPathFormat = new SimpleDateFormat(outputDateFormat);
    partitionPathFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
    try {
      // unixTime is in *seconds* since epoch; it is multiplied by 1000 for new Date() below
      long unixTime;
      if (partitionVal instanceof Double) {
        unixTime = ((Double) partitionVal).longValue();
      } else if (partitionVal instanceof Float) {
        unixTime = ((Float) partitionVal).longValue();
      } else if (partitionVal instanceof Long) {
        unixTime = (Long) partitionVal;
      } else if (partitionVal instanceof String) {
        if (inputDateFormat == null) {
          // timestamp.type=UNIX_TIMESTAMP but the field holds a date string; previously NPE'd here
          throw new HoodieNotSupportedException("Partition field holds a date string, but no input "
              + "dateformat is configured. Set timestamp.type to DATE_STRING or MIXED");
        }
        // BUGFIX: parse().getTime() yields epoch millis; convert to seconds so the
        // `unixTime * 1000` below does not inflate the date by a factor of 1000
        unixTime = inputDateFormat.parse(partitionVal.toString()).getTime() / 1000;
      } else {
        throw new HoodieNotSupportedException("Unexpected type for partition field: "+ partitionVal.getClass().getName());
      }
      return new HoodieKey(record.get(recordKeyField).toString(),
          partitionPathFormat.format(new Date(unixTime * 1000)));
    } catch (ParseException pe) {
      throw new HoodieDeltaStreamerException("Unable to parse input partition field :" + partitionVal, pe);
    }
  }
}

View File

@@ -0,0 +1,74 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.utilities.schema;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.utilities.UtilHelpers;
import org.apache.avro.Schema;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.util.Arrays;
/**
 * A simple schema provider, that reads off avro schema files on DFS
 */
public class FilebasedSchemaProvider extends SchemaProvider {

  /**
   * Configs supported
   */
  static class Config {
    private static final String SOURCE_SCHEMA_FILE_PROP = "hoodie.deltastreamer.filebased.schemaprovider.source.schema.file";
    private static final String TARGET_SCHEMA_FILE_PROP = "hoodie.deltastreamer.filebased.schemaprovider.target.schema.file";
  }

  private final FileSystem fs;

  private final Schema sourceSchema;

  private final Schema targetSchema;

  public FilebasedSchemaProvider(PropertiesConfiguration config) {
    super(config);
    this.fs = FSUtils.getFs();
    UtilHelpers.checkRequiredProperties(config, Arrays.asList(Config.SOURCE_SCHEMA_FILE_PROP, Config.TARGET_SCHEMA_FILE_PROP));
    this.sourceSchema = readSchema(config.getString(Config.SOURCE_SCHEMA_FILE_PROP));
    this.targetSchema = readSchema(config.getString(Config.TARGET_SCHEMA_FILE_PROP));
  }

  /**
   * Reads & parses an avro schema file off DFS, closing the stream (which was previously leaked).
   */
  private Schema readSchema(String schemaPath) {
    try (java.io.InputStream in = fs.open(new Path(schemaPath))) {
      return new Schema.Parser().parse(in);
    } catch (IOException ioe) {
      throw new HoodieIOException("Error reading schema", ioe);
    }
  }

  @Override
  public Schema getSourceSchema() {
    return sourceSchema;
  }

  @Override
  public Schema getTargetSchema() {
    return targetSchema;
  }
}

View File

@@ -0,0 +1,39 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.utilities.schema;
import org.apache.avro.Schema;
import org.apache.commons.configuration.PropertiesConfiguration;
import java.io.Serializable;
/**
 * Class to provide schema for reading data and also writing into a Hoodie table
 */
public abstract class SchemaProvider implements Serializable {

  // NOTE(review): unlike KeyGenerator.config, this field is NOT transient; confirm
  // PropertiesConfiguration serializes cleanly before shipping providers to executors.
  protected PropertiesConfiguration config;

  protected SchemaProvider(PropertiesConfiguration config) {
    this.config = config;
  }

  /** @return schema of the incoming source data */
  public abstract Schema getSourceSchema();

  /** @return schema to write data out to the target hoodie dataset with */
  public abstract Schema getTargetSchema();
}

View File

@@ -0,0 +1,93 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.utilities.sources;
import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import com.uber.hoodie.avro.MercifulJsonConverter;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import java.io.IOException;
import java.io.Serializable;
/**
 * Convert a variety of {@link SourceDataFormat} into Avro GenericRecords. All heavyweight
 * members are lazily (re)built on executors, so only the schema string travels over the wire.
 */
public class AvroConvertor implements Serializable {

  /** Schema string; the only state that survives serialization. */
  private final String schemaStr;

  /** Parsed schema; rebuilt lazily on each executor. */
  private transient Schema schema;

  /** JSON-to-avro converter; rebuilt lazily on each executor. */
  private transient MercifulJsonConverter jsonConverter;

  /** Binary avro codec; rebuilt lazily on each executor. */
  private transient Injection<GenericRecord, byte[]> recordInjection;

  public AvroConvertor(String schemaStr) {
    this.schemaStr = schemaStr;
  }

  /**
   * Converts a single json document into an avro record.
   */
  public GenericRecord fromJson(String json) throws IOException {
    if (jsonConverter == null) {
      jsonConverter = new MercifulJsonConverter(getSchema());
    }
    return jsonConverter.convert(json);
  }

  /**
   * Decodes a binary-encoded avro payload into a record.
   */
  public GenericRecord fromAvroBinary(byte[] avroBinary) throws IOException {
    if (recordInjection == null) {
      recordInjection = GenericAvroCodecs.toBinary(getSchema());
    }
    return recordInjection.invert(avroBinary).get();
  }

  /** Parses the schema string on first use and caches it. */
  private Schema getSchema() {
    if (schema == null) {
      schema = new Schema.Parser().parse(schemaStr);
    }
    return schema;
  }
}

View File

@@ -0,0 +1,152 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.utilities.sources;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.exception.HoodieNotSupportedException;
import com.uber.hoodie.utilities.UtilHelpers;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.NullWritable;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
 * Source to read data from a given DFS directory structure, incrementally. The checkpoint is
 * the max modification time among files already consumed.
 */
public class DFSSource extends Source {

  /**
   * Configs supported
   */
  static class Config {
    private final static String ROOT_INPUT_PATH_PROP = "hoodie.deltastreamer.source.dfs.root";
  }

  // Files whose names start with any of these prefixes (hidden/in-flight files) are skipped
  private final static List<String> IGNORE_FILEPREFIX_LIST = Arrays.asList(".", "_");

  // NOTE(review): transient and never re-initialized after deserialization; presumably this
  // source is only exercised on the driver -- confirm before shipping instances to executors.
  private final transient FileSystem fs;

  public DFSSource(PropertiesConfiguration config, JavaSparkContext sparkContext, SourceDataFormat dataFormat, SchemaProvider schemaProvider) {
    super(config, sparkContext, dataFormat, schemaProvider);
    this.fs = FSUtils.getFs();
    UtilHelpers.checkRequiredProperties(config, Arrays.asList(Config.ROOT_INPUT_PATH_PROP));
  }

  /**
   * Reads avro container files into an RDD of GenericRecords.
   * (convertor is unused here; kept for signature parity with fromJsonFiles)
   */
  public static JavaRDD<GenericRecord> fromAvroFiles(final AvroConvertor convertor, String pathStr, JavaSparkContext sparkContext) {
    JavaPairRDD<AvroKey, NullWritable> avroRDD = sparkContext.newAPIHadoopFile(pathStr,
        AvroKeyInputFormat.class,
        AvroKey.class,
        NullWritable.class,
        sparkContext.hadoopConfiguration());
    return avroRDD.keys().map(r -> ((GenericRecord) r.datum()));
  }

  /**
   * Reads newline-delimited json files, converting each line to avro via the source schema.
   */
  public static JavaRDD<GenericRecord> fromJsonFiles(final AvroConvertor convertor, String pathStr, JavaSparkContext sparkContext) {
    return sparkContext.textFile(pathStr).map(convertor::fromJson);
  }

  /**
   * Dispatches to the appropriate reader for the configured data format.
   */
  public static JavaRDD<GenericRecord> fromFiles(SourceDataFormat dataFormat, final AvroConvertor convertor, String pathStr, JavaSparkContext sparkContext) {
    if (dataFormat == SourceDataFormat.AVRO) {
      return DFSSource.fromAvroFiles(convertor, pathStr, sparkContext);
    } else if (dataFormat == SourceDataFormat.JSON) {
      return DFSSource.fromJsonFiles(convertor, pathStr, sparkContext);
    } else {
      throw new HoodieNotSupportedException("Unsupported data format :" + dataFormat);
    }
  }

  /**
   * Lists files under the root path newer than the checkpoint (a max modification time),
   * caps the batch by maxInputBytes, and reads the selected files out as avro records.
   *
   * @param lastCheckpointStr checkpoint from the previous round, if any
   * @param maxInputBytes soft cap on bytes to read this round
   * @return (records read if any, new checkpoint string)
   */
  @Override
  public Pair<Optional<JavaRDD<GenericRecord>>, String> fetchNewData(Optional<String> lastCheckpointStr, long maxInputBytes) {
    try {
      // obtain all eligible files under root folder.
      List<FileStatus> eligibleFiles = new ArrayList<>();
      RemoteIterator<LocatedFileStatus> fitr = fs.listFiles(new Path(config.getString(Config.ROOT_INPUT_PATH_PROP)), true);
      while (fitr.hasNext()) {
        LocatedFileStatus fileStatus = fitr.next();
        if (fileStatus.isDirectory()
            || IGNORE_FILEPREFIX_LIST.stream().anyMatch(pfx -> fileStatus.getPath().getName().startsWith(pfx))) {
          continue;
        }
        eligibleFiles.add(fileStatus);
      }
      // sort them by modification time.
      eligibleFiles.sort((FileStatus f1, FileStatus f2) ->
          Long.compare(f1.getModificationTime(), f2.getModificationTime()));

      // Filter based on checkpoint & input size, if needed. Parse the checkpoint once,
      // rather than once per file.
      Optional<Long> checkpointTime = lastCheckpointStr.map(Long::valueOf);
      long currentBytes = 0;
      long maxModificationTime = Long.MIN_VALUE;
      List<FileStatus> filteredFiles = new ArrayList<>();
      for (FileStatus f : eligibleFiles) {
        if (checkpointTime.isPresent() && f.getModificationTime() <= checkpointTime.get()) {
          // skip files already processed in an earlier round
          continue;
        }
        maxModificationTime = f.getModificationTime();
        currentBytes += f.getLen();
        filteredFiles.add(f);
        if (currentBytes >= maxInputBytes) {
          // we have enough data, we are done
          break;
        }
      }

      // no data to read; carry the previous checkpoint forward
      if (filteredFiles.isEmpty()) {
        return new ImmutablePair<>(Optional.empty(), lastCheckpointStr.orElse(String.valueOf(Long.MIN_VALUE)));
      }

      // read the files out.
      String pathStr = filteredFiles.stream().map(f -> f.getPath().toString()).collect(Collectors.joining(","));
      String schemaStr = schemaProvider.getSourceSchema().toString();
      final AvroConvertor avroConvertor = new AvroConvertor(schemaStr);
      return new ImmutablePair<>(Optional.of(DFSSource.fromFiles(dataFormat, avroConvertor, pathStr, sparkContext)),
          String.valueOf(maxModificationTime));
    } catch (IOException ioe) {
      throw new HoodieIOException("Unable to read from source from checkpoint: " + lastCheckpointStr, ioe);
    }
  }
}

View File

@@ -0,0 +1,132 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.utilities.sources;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.utilities.UtilHelpers;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* Source to read deltas produced by {@link com.uber.hoodie.utilities.HiveIncrementalPuller}, commit
* by commit and apply to the target table
*
* The general idea here is to have commits sync across the data pipeline.
*
* [Source Tables(s)] ====> HiveIncrementalScanner ==> incrPullRootPath ==> targetTable
* {c1,c2,c3,...} {c1,c2,c3,...} {c1,c2,c3,...}
*
 * This produces beautiful causality, which makes data issues in ETLs very easy to debug
*
*/
public class HiveIncrPullSource extends Source {

  private static volatile Logger log = LogManager.getLogger(HiveIncrPullSource.class);

  private final transient FileSystem fs;

  // Root folder under which HiveIncrementalPuller writes one sub-folder per commit time
  private final String incrPullRootPath;

  /**
   * Configs supported
   */
  static class Config {
    private final static String ROOT_INPUT_PATH_PROP = "hoodie.deltastreamer.source.incrpull.root";
  }

  public HiveIncrPullSource(PropertiesConfiguration config, JavaSparkContext sparkContext, SourceDataFormat dataFormat, SchemaProvider schemaProvider) {
    super(config, sparkContext, dataFormat, schemaProvider);
    this.fs = FSUtils.getFs();
    UtilHelpers.checkRequiredProperties(config, Arrays.asList(Config.ROOT_INPUT_PATH_PROP));
    this.incrPullRootPath = config.getString(Config.ROOT_INPUT_PATH_PROP);
  }

  /**
   * Finds the first commit from source, greater than the target's last commit, and reads it out.
   *
   * @param latestTargetCommit last commit time already applied to the target, if any
   * @return the next commit time to pull, or empty if nothing new is available
   * @throws IOException on failure to list the incr-pull root path
   */
  private Optional<String> findCommitToPull(Optional<String> latestTargetCommit) throws IOException {
    log.info("Looking for commits ");

    FileStatus[] commitTimePaths = fs.listStatus(new Path(incrPullRootPath));
    List<String> commitTimes = new ArrayList<>(commitTimePaths.length);
    for (FileStatus commitTimePath : commitTimePaths) {
      // each sub-folder of the root path is named by its commit time
      commitTimes.add(commitTimePath.getPath().getName());
    }
    Collections.sort(commitTimes);
    log.info("Retrieved commit times " + commitTimes);

    // guard against an empty source folder (previously threw IndexOutOfBoundsException below)
    if (commitTimes.isEmpty()) {
      return Optional.empty();
    }

    if (!latestTargetCommit.isPresent()) {
      // start from the beginning
      return Optional.of(commitTimes.get(0));
    }

    for (String commitTime : commitTimes) {
      //TODO(vc): Add an option to delete consumed commits
      if (commitTime.compareTo(latestTargetCommit.get()) > 0) {
        return Optional.of(commitTime);
      }
    }
    return Optional.empty();
  }

  /**
   * Pulls exactly one source commit (the first one newer than the checkpoint) per invocation.
   *
   * @param lastCheckpointStr commit time last applied to the target, if any
   * @param maxInputBytes     unused; a commit is always consumed whole
   * @return (RDD over the commit's delta files if any, new checkpoint string)
   */
  @Override
  public Pair<Optional<JavaRDD<GenericRecord>>, String> fetchNewData(Optional<String> lastCheckpointStr, long maxInputBytes) {
    try {
      // find the source commit to pull
      Optional<String> commitToPull = findCommitToPull(lastCheckpointStr);
      if (!commitToPull.isPresent()) {
        return new ImmutablePair<>(Optional.empty(), lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : "");
      }

      // read the files out.
      List<FileStatus> commitDeltaFiles = Arrays.asList(fs.listStatus(new Path(incrPullRootPath, commitToPull.get())));
      String pathStr = commitDeltaFiles.stream().map(f -> f.getPath().toString()).collect(Collectors.joining(","));
      String schemaStr = schemaProvider.getSourceSchema().toString();
      final AvroConvertor avroConvertor = new AvroConvertor(schemaStr);
      return new ImmutablePair<>(Optional.of(DFSSource.fromFiles(dataFormat, avroConvertor, pathStr, sparkContext)),
          commitToPull.get());
    } catch (IOException ioe) {
      throw new HoodieIOException("Unable to read from source from checkpoint: " + lastCheckpointStr, ioe);
    }
  }
}

View File

@@ -0,0 +1,247 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.utilities.sources;
import com.uber.hoodie.exception.HoodieNotSupportedException;
import com.uber.hoodie.utilities.UtilHelpers;
import com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.kafka.KafkaCluster;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
import kafka.common.TopicAndPartition;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import kafka.serializer.DefaultDecoder;
import scala.Predef;
import scala.Tuple2;
import scala.collection.JavaConverters;
import scala.collection.immutable.Map;
import scala.collection.immutable.Set;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.StringBuilder;
import scala.util.Either;
/**
* Source to read data from Kafka, incrementally
*/
public class KafkaSource extends Source {

  private static volatile Logger log = LogManager.getLogger(KafkaSource.class);

  /**
   * Helpers to serialize/deserialize Kafka offsets as checkpoint strings.
   */
  static class CheckpointUtils {

    /**
     * Reconstruct checkpoint from string.
     *
     * @param checkpointStr checkpoint in the format produced by {@link #offsetsToStr(HashMap)}
     * @return per-partition offsets to resume from
     */
    public static HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> strToOffsets(String checkpointStr) {
      HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> offsetMap = new HashMap<>();
      String[] splits = checkpointStr.split(",");
      String topic = splits[0];
      for (int i = 1; i < splits.length; i++) {
        String[] subSplits = splits[i].split(":");
        // leader host/port are not stored in the checkpoint; only the offset matters here
        offsetMap.put(new TopicAndPartition(topic, Integer.parseInt(subSplits[0])),
            new KafkaCluster.LeaderOffset("", -1, Long.parseLong(subSplits[1])));
      }
      return offsetMap;
    }

    /**
     * String representation of checkpoint
     *
     * Format:
     * topic1,0:offset0,1:offset1,2:offset2, .....
     *
     * @param offsetMap per-partition offsets; must be non-empty
     * @return checkpoint string
     */
    public static String offsetsToStr(HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> offsetMap) {
      // at least 1 partition will be present.
      String topic = offsetMap.entrySet().stream().findFirst().get().getKey().topic();
      String offsets = offsetMap.entrySet().stream()
          .map(e -> String.format("%s:%d", e.getKey().partition(), e.getValue().offset()))
          .collect(Collectors.joining(","));
      return topic + "," + offsets;
    }

    /**
     * Compute the per-partition offset ranges to read, sorted by partition number.
     * Partitions missing from {@code fromOffsetMap} (i.e. newly added ones) start from -1.
     */
    public static OffsetRange[] computeOffsetRanges(HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> fromOffsetMap,
        HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> toOffsetMap) {
      Comparator<OffsetRange> byPartition = Comparator.comparingInt(OffsetRange::partition);

      List<OffsetRange> offsetRanges = toOffsetMap.entrySet().stream().map(e -> {
        TopicAndPartition tp = e.getKey();
        long fromOffset = fromOffsetMap.containsKey(tp) ? fromOffsetMap.get(tp).offset() : -1;
        return OffsetRange.create(tp, fromOffset, e.getValue().offset());
      }).sorted(byPartition).collect(Collectors.toList());

      return offsetRanges.toArray(new OffsetRange[offsetRanges.size()]);
    }

    /**
     * Total number of new messages across all ranges; each range contributes at least 0.
     */
    public static long totalNewMessages(OffsetRange[] ranges) {
      long totalMsgs = 0;
      for (OffsetRange range : ranges) {
        totalMsgs += Math.max(range.untilOffset() - range.fromOffset(), 0);
      }
      return totalMsgs;
    }
  }

  /**
   * Helpers to deal with tricky scala <=> java conversions. (oh my!)
   */
  static class ScalaHelpers {

    public static <K, V> Map<K, V> toScalaMap(HashMap<K, V> m) {
      return JavaConverters.mapAsScalaMapConverter(m).asScala().toMap(Predef.<Tuple2<K, V>>conforms());
    }

    public static Set<String> toScalaSet(HashSet<String> s) {
      return JavaConverters.asScalaSetConverter(s).asScala().<String>toSet();
    }

    public static <K, V> java.util.Map<K, V> toJavaMap(Map<K, V> m) {
      return JavaConverters.<K, V>mapAsJavaMapConverter(m).asJava();
    }
  }

  /**
   * Configs to be passed for this source. All standard Kafka consumer configs are also
   * respected
   */
  static class Config {
    private final static String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic";
    private final static String DEFAULT_AUTO_RESET_OFFSET = "largest";
  }

  // every property from the supplied configuration is passed through to the Kafka consumer
  private HashMap<String, String> kafkaParams;

  private final String topicName;

  public KafkaSource(PropertiesConfiguration config, JavaSparkContext sparkContext, SourceDataFormat dataFormat, SchemaProvider schemaProvider) {
    super(config, sparkContext, dataFormat, schemaProvider);
    kafkaParams = new HashMap<>();
    Stream<String> keys = StreamSupport.stream(
        Spliterators.spliteratorUnknownSize(config.getKeys(), Spliterator.NONNULL), false);
    keys.forEach(k -> kafkaParams.put(k, config.getString(k)));
    UtilHelpers.checkRequiredProperties(config, Arrays.asList(Config.KAFKA_TOPIC_NAME));
    topicName = config.getString(Config.KAFKA_TOPIC_NAME);
  }

  /**
   * Reads all messages published since the checkpointed offsets (or since
   * earliest/latest per {@code auto.offset.reset} on the first run), up to the
   * latest leader offsets at call time.
   *
   * @param lastCheckpointStr offsets checkpoint string, if any
   * @param maxInputBytes     currently not respected (see TODO below)
   * @return (RDD of new records if any, new offsets checkpoint string)
   */
  @Override
  public Pair<Optional<JavaRDD<GenericRecord>>, String> fetchNewData(Optional<String> lastCheckpointStr, long maxInputBytes) {
    // Obtain current metadata for the topic
    KafkaCluster cluster = new KafkaCluster(ScalaHelpers.toScalaMap(kafkaParams));
    Either<ArrayBuffer<Throwable>, Set<TopicAndPartition>> either =
        cluster.getPartitions(ScalaHelpers.toScalaSet(new HashSet<>(Arrays.asList(topicName))));
    if (either.isLeft()) {
      // log errors. and bail out.
      throw new HoodieDeltaStreamerException("Error obtaining partition metadata", either.left().get().head());
    }
    Set<TopicAndPartition> topicPartitions = either.right().get();

    // Determine the offset ranges to read from
    HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> fromOffsets;
    if (lastCheckpointStr.isPresent()) {
      fromOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
    } else {
      String autoResetValue = config.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET);
      if (autoResetValue.equals("smallest")) {
        fromOffsets = new HashMap<>(ScalaHelpers.toJavaMap(cluster.getEarliestLeaderOffsets(topicPartitions).right().get()));
      } else if (autoResetValue.equals("largest")) {
        fromOffsets = new HashMap<>(ScalaHelpers.toJavaMap(cluster.getLatestLeaderOffsets(topicPartitions).right().get()));
      } else {
        throw new HoodieNotSupportedException("Auto reset value must be one of 'smallest' or 'largest' ");
      }
    }

    // Always read until the latest offset
    HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> toOffsets =
        new HashMap<>(ScalaHelpers.toJavaMap(cluster.getLatestLeaderOffsets(topicPartitions).right().get()));

    // Come up with final set of OffsetRanges to read (account for new partitions)
    // TODO(vc): Respect maxInputBytes, by estimating number of messages to read each batch from partition size
    OffsetRange[] offsetRanges = CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets);
    long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
    if (totalNewMsgs <= 0) {
      return new ImmutablePair<>(Optional.empty(),
          lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : CheckpointUtils.offsetsToStr(toOffsets));
    } else {
      log.info("About to read " + totalNewMsgs + " from Kafka for topic :" + topicName);
    }

    // Perform the actual read from Kafka
    JavaRDD<byte[]> kafkaRDD = KafkaUtils.createRDD(
        sparkContext,
        byte[].class,
        byte[].class,
        DefaultDecoder.class,
        DefaultDecoder.class,
        kafkaParams,
        offsetRanges).values();

    // Produce a RDD[GenericRecord]
    final AvroConvertor avroConvertor = new AvroConvertor(schemaProvider.getSourceSchema().toString());
    JavaRDD<GenericRecord> newDataRDD;
    if (dataFormat == SourceDataFormat.AVRO) {
      newDataRDD = kafkaRDD.map(bytes -> avroConvertor.fromAvroBinary(bytes));
    } else if (dataFormat == SourceDataFormat.JSON) {
      newDataRDD = kafkaRDD.map(bytes -> avroConvertor.fromJson(new String(bytes, Charset.forName("utf-8"))));
    } else {
      throw new HoodieNotSupportedException("Unsupported data format :" + dataFormat);
    }
    return new ImmutablePair<>(Optional.of(newDataRDD), CheckpointUtils.offsetsToStr(toOffsets));
  }
}

View File

@@ -0,0 +1,68 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.utilities.sources;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.Serializable;
import java.util.Optional;
/**
* Represents a source from which we can tail data. Assumes a constructor that takes properties.
*/
public abstract class Source implements Serializable {

  protected transient PropertiesConfiguration config;

  protected transient JavaSparkContext sparkContext;

  protected transient SourceDataFormat dataFormat;

  protected transient SchemaProvider schemaProvider;

  protected Source(PropertiesConfiguration config,
      JavaSparkContext sparkContext,
      SourceDataFormat dataFormat,
      SchemaProvider schemaProvider) {
    this.config = config;
    this.sparkContext = sparkContext;
    this.dataFormat = dataFormat;
    this.schemaProvider = schemaProvider;
  }

  /**
   * Fetches new data up to maxInputBytes, from the provided checkpoint, and returns
   * both an RDD of that data and the checkpoint string to record after it is consumed.
   *
   * @param lastCheckpointStr checkpoint from the previous fetch, if any
   * @param maxInputBytes     soft cap on the amount of data to read
   * @return pair of (new data, if any; resulting checkpoint string)
   */
  public abstract Pair<Optional<JavaRDD<GenericRecord>>, String> fetchNewData(Optional<String> lastCheckpointStr,
      long maxInputBytes);

  /**
   * @return the configuration this source was constructed with
   */
  public PropertiesConfiguration getConfig() {
    return config;
  }
}

View File

@@ -0,0 +1,29 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.utilities.sources;
/**
* Format of the data within source.
*/
public enum SourceDataFormat {
  /** Data is already Avro; no explicit conversion needed. */
  AVRO,
  /** JSON payloads; we will try to convert each record to Avro. */
  JSON,
  /** Will be added later, so we can plug/play with spark sources. */
  ROW,
  /** The source itself is responsible for conversion to Avro. */
  CUSTOM
}

View File

@@ -0,0 +1,19 @@
#
# Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
#
hoodie.upsert.shuffle.parallelism=2

View File

@@ -0,0 +1,20 @@
#
# Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
#
hoodie.deltastreamer.keygen.simple.recordkey.field=_row_key
hoodie.deltastreamer.keygen.simple.partitionpath.field=driver

View File

@@ -0,0 +1,20 @@
#
# Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
hoodie.deltastreamer.filebased.schemaprovider.source.schema.file=file:///Users/vinoth/bin/hoodie/hoodie-utilities/src/main/resources/delta-streamer-config/source.avsc
hoodie.deltastreamer.filebased.schemaprovider.target.schema.file=file:///Users/vinoth/bin/hoodie/hoodie-utilities/src/main/resources/delta-streamer-config/target.avsc

View File

@@ -0,0 +1,30 @@
{
"type" : "record",
"name" : "triprec",
"fields" : [ {
"name" : "_row_key",
"type" : "string"
}, {
"name" : "rider",
"type" : "string"
}, {
"name" : "driver",
"type" : "string"
}, {
"name" : "begin_lat",
"type" : "double"
}, {
"name" : "begin_lon",
"type" : "double"
}, {
"name" : "end_lat",
"type" : "double"
}, {
"name" : "end_lon",
"type" : "double"
}, {
"name" : "fare",
"type" : "double"
} ]
}

View File

@@ -0,0 +1,25 @@
#
# Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
#
# DFS Source
hoodie.deltastreamer.source.dfs.root=file:///tmp/hoodie-dfs-input
# Kafka Source
hoodie.deltastreamer.source.kafka.topic=uber_trips
metadata.broker.list=localhost:9092
auto.offset.reset=smallest

View File

@@ -0,0 +1,29 @@
{
"type" : "record",
"name" : "triprec",
"fields" : [ {
"name" : "_row_key",
"type" : "string"
}, {
"name" : "rider",
"type" : "string"
}, {
"name" : "driver",
"type" : "string"
}, {
"name" : "begin_lat",
"type" : "double"
}, {
"name" : "begin_lon",
"type" : "double"
}, {
"name" : "end_lat",
"type" : "double"
}, {
"name" : "end_lon",
"type" : "double"
}, {
"name" : "fare",
"type" : "double"
} ]
}