diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index cd916e668..b87e84c7d 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -64,6 +64,8 @@ import java.nio.charset.StandardCharsets; import java.text.ParseException; import java.util.Collections; import java.util.Date; + +import java.util.HashMap; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; @@ -317,6 +319,16 @@ public class HoodieWriteClient implements Seriali * Commit changes performed at the given commitTime marker */ public boolean commit(String commitTime, JavaRDD writeStatuses) { + return commit(commitTime, writeStatuses, Optional.empty()); + } + + /** + * Commit changes performed at the given commitTime marker + */ + public boolean commit(String commitTime, + JavaRDD writeStatuses, + Optional> extraMetadata) { + logger.info("Comitting " + commitTime); // Create a Hoodie table which encapsulated the commits and files visible HoodieTable table = HoodieTable @@ -333,6 +345,10 @@ public class HoodieWriteClient implements Seriali for (Tuple2 stat : stats) { metadata.addWriteStat(stat._1(), stat._2()); } + // add in extra metadata + if (extraMetadata.isPresent()) { + extraMetadata.get().forEach((k, v) -> metadata.addMetadata(k, v)); + } try { String actionType = table.getCommitActionType(); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java index d996273de..b95094676 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java @@ -27,6 +27,7 @@ import javax.annotation.concurrent.Immutable; import java.io.File; import java.io.FileReader; import 
java.io.IOException; +import java.io.InputStream; import java.util.Properties; /** @@ -230,6 +231,15 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { } } + public Builder fromInputStream(InputStream inputStream) throws IOException { + try { + this.props.load(inputStream); + return this; + } finally { + inputStream.close(); + } + } + public Builder withPath(String basePath) { props.setProperty(BASE_PATH_PROP, basePath); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/avro/MercifulJsonConverter.java b/hoodie-common/src/main/java/com/uber/hoodie/avro/MercifulJsonConverter.java index 90e5257a0..9efe8408d 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/avro/MercifulJsonConverter.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/avro/MercifulJsonConverter.java @@ -17,6 +17,7 @@ package com.uber.hoodie.avro; import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; import java.util.List; diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieAvroPayload.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieAvroPayload.java index 164766dc7..9fc0b3570 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieAvroPayload.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieAvroPayload.java @@ -29,6 +29,7 @@ import java.io.IOException; /** * This is a payload to wrap a existing Hoodie Avro Record. 
* Useful to create a HoodieRecord over existing GenericRecords in a hoodie datasets (useful in compactions) + * */ public class HoodieAvroPayload implements HoodieRecordPayload { private final Optional record; diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java index ff221fc0b..b58df02de 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java @@ -16,6 +16,8 @@ package com.uber.hoodie.common.model; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.codehaus.jackson.annotate.JsonAutoDetect; @@ -33,21 +35,29 @@ import java.util.Map; /** * All the metadata that gets stored along with a commit. */ +@JsonIgnoreProperties(ignoreUnknown = true) public class HoodieCommitMetadata implements Serializable { private static volatile Logger log = LogManager.getLogger(HoodieCommitMetadata.class); private HashMap> partitionToWriteStats; + private HashMap extraMetadataMap; + public HoodieCommitMetadata() { + extraMetadataMap = new HashMap<>(); partitionToWriteStats = new HashMap<>(); } public void addWriteStat(String partitionPath, HoodieWriteStat stat) { if (!partitionToWriteStats.containsKey(partitionPath)) { - partitionToWriteStats.put(partitionPath, new ArrayList()); + partitionToWriteStats.put(partitionPath, new ArrayList<>()); } partitionToWriteStats.get(partitionPath).add(stat); } + public void addMetadata(String metaKey, String value) { + extraMetadataMap.put(metaKey, value); + } + public List getWriteStats(String partitionPath) { return partitionToWriteStats.get(partitionPath); } @@ -56,6 +66,10 @@ public class HoodieCommitMetadata implements Serializable { return partitionToWriteStats; } + public String getMetadata(String 
metaKey) { + return extraMetadataMap.get(metaKey); + } + public HashMap getFileIdAndFullPaths() { HashMap filePaths = new HashMap<>(); // list all partitions paths diff --git a/hoodie-utilities/pom.xml b/hoodie-utilities/pom.xml index 13091006c..a708ebc56 100644 --- a/hoodie-utilities/pom.xml +++ b/hoodie-utilities/pom.xml @@ -25,12 +25,21 @@ hoodie-utilities jar + org.jacoco jacoco-maven-plugin + + org.apache.maven.plugins + maven-compiler-plugin + + 1.8 + 1.8 + + org.apache.maven.plugins maven-assembly-plugin @@ -41,7 +50,7 @@ - com.uber.hoodie.utilities.HoodieDeltaStreamer + com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer @@ -57,7 +66,6 @@ - @@ -220,6 +228,19 @@ + + org.apache.spark + spark-streaming_2.10 + ${spark.version} + provided + + + + org.apache.spark + spark-streaming-kafka-0-8_2.10 + ${spark.version} + + org.antlr @@ -238,7 +259,27 @@ 1.10.19 test + + org.apache.avro + avro-mapred + 1.7.6-cdh5.7.2 + + + org.apache.parquet + parquet-avro + + + + org.apache.parquet + parquet-hadoop + + + + com.twitter + bijection-avro_2.10 + 0.9.2 + diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HiveIncrementalPuller.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HiveIncrementalPuller.java index e3a8c07fe..0311bc6e1 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HiveIncrementalPuller.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HiveIncrementalPuller.java @@ -1,17 +1,19 @@ /* - * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * */ package com.uber.hoodie.utilities; @@ -78,7 +80,7 @@ public class HiveIncrementalPuller { @Parameter(names = {"--fromCommitTime"}) public String fromCommitTime; @Parameter(names = {"--maxCommits"}) public int maxCommits = 3; @Parameter(names = {"--help", "-h"}, help = true) public Boolean help = false; - @Parameter(names = {"--storageFormat"}) public String tempTableStorageFormat = "PARQUET"; + @Parameter(names = {"--storageFormat"}) public String tempTableStorageFormat = "AVRO"; } static { @@ -169,11 +171,11 @@ public class HiveIncrementalPuller { throw new HoodieIncrementalPullSQLException( "Incremental SQL does not have " + config.sourceDb + "." + config.sourceTable); } - if (!incrementalSQL.contains("`_hoodie_commit_time` > '%s'")) { + if (!incrementalSQL.contains("`_hoodie_commit_time` > '%s'")) { log.info("Incremental SQL : " + incrementalSQL - + " does not contain `_hoodie_commit_time` > '%s'. Please add this clause for incremental to work properly."); + + " does not contain `_hoodie_commit_time` > '%s'. 
Please add this clause for incremental to work properly."); throw new HoodieIncrementalPullSQLException( - "Incremental SQL does not have clause `_hoodie_commit_time` > '%s', which means its not pulling incrementally"); + "Incremental SQL does not have clause `_hoodie_commit_time` > '%s', which means its not pulling incrementally"); } incrementalPullSQLtemplate diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieDeltaStreamer.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieDeltaStreamer.java deleted file mode 100644 index ea34b42d7..000000000 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieDeltaStreamer.java +++ /dev/null @@ -1,230 +0,0 @@ -/* - * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.uber.hoodie.utilities; - -import com.google.common.io.Files; - -import com.beust.jcommander.JCommander; -import com.beust.jcommander.Parameter; -import com.uber.hoodie.HoodieWriteClient; -import com.uber.hoodie.common.HoodieJsonPayload; -import com.uber.hoodie.common.model.HoodieKey; -import com.uber.hoodie.common.model.HoodieRecord; -import com.uber.hoodie.common.table.HoodieTableMetaClient; -import com.uber.hoodie.common.table.HoodieTimeline; -import com.uber.hoodie.common.util.FSUtils; -import com.uber.hoodie.config.HoodieIndexConfig; -import com.uber.hoodie.config.HoodieWriteConfig; -import com.uber.hoodie.index.HoodieIndex; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.Function; - -import java.io.File; -import java.io.IOException; -import java.io.Serializable; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -/** - * An Utility which can incrementally take the output from {@link HiveIncrementalPuller} and apply it to the target dataset. - * Does not maintain any state, queries at runtime to see how far behind the target dataset is from - * the source dataset. This can be overriden to force sync from a timestamp. 
- */ -public class HoodieDeltaStreamer implements Serializable { - private static volatile Logger log = LogManager.getLogger(HoodieDeltaStreamer.class); - private final Config cfg; - - public HoodieDeltaStreamer(Config cfg) throws IOException { - this.cfg = cfg; - } - - private void sync() throws Exception { - JavaSparkContext sc = getSparkContext(cfg); - FileSystem fs = FSUtils.getFs(); - HoodieTableMetaClient targetHoodieMetadata = new HoodieTableMetaClient(fs, cfg.targetPath); - HoodieTimeline timeline = targetHoodieMetadata.getActiveTimeline().getCommitTimeline().filterCompletedInstants(); - String lastCommitPulled = findLastCommitPulled(fs, cfg.dataPath); - log.info("Last commit pulled on the source dataset is " + lastCommitPulled); - if (!timeline.getInstants().iterator().hasNext() && timeline - .compareTimestamps(timeline.lastInstant().get().getTimestamp(), lastCommitPulled, - HoodieTimeline.GREATER)) { - // this should never be the case - throw new IllegalStateException( - "Last commit pulled from source table " + lastCommitPulled - + " is before the last commit in the target table " + timeline.lastInstant() - .get()); - } - if (!cfg.override && timeline.containsOrBeforeTimelineStarts(lastCommitPulled)) { - throw new IllegalStateException( - "Target Table already has the commit " + lastCommitPulled - + ". 
Not overriding as cfg.override is false"); - } - syncTill(lastCommitPulled, targetHoodieMetadata, sc); - } - - private String findLastCommitPulled(FileSystem fs, String dataPath) throws IOException { - FileStatus[] commitTimePaths = fs.listStatus(new Path(dataPath)); - List commitTimes = new ArrayList<>(commitTimePaths.length); - for (FileStatus commitTimePath : commitTimePaths) { - String[] splits = commitTimePath.getPath().toString().split("/"); - commitTimes.add(splits[splits.length - 1]); - } - Collections.sort(commitTimes); - Collections.reverse(commitTimes); - log.info("Retrieved commit times " + commitTimes); - return commitTimes.get(0); - } - - private void syncTill(String lastCommitPulled, HoodieTableMetaClient target, - JavaSparkContext sc) throws Exception { - // Step 1 : Scan incrementally and get the input records as a RDD of source format - String dataPath = cfg.dataPath + "/" + lastCommitPulled; - log.info("Using data path " + dataPath); - JavaRDD rdd = sc.textFile(dataPath); - - // Step 2 : Create the hoodie records - JavaRDD> records = - rdd.map(new Function>() { - @Override - public HoodieRecord call(String json) - throws Exception { - HoodieJsonPayload payload = new HoodieJsonPayload(json); - HoodieKey key = new HoodieKey(payload.getRowKey(cfg.keyColumnField), - payload.getPartitionPath(cfg.partitionPathField)); - return new HoodieRecord<>(key, payload); - } - }); - - // Step 3: Use Hoodie Client to upsert/bulk load the records into target hoodie dataset - HoodieWriteConfig hoodieCfg = getHoodieClientConfig(target); - HoodieWriteClient client = new HoodieWriteClient<>(sc, hoodieCfg); - log.info("Rollback started " + lastCommitPulled); - client.rollback(lastCommitPulled); - - client.startCommitWithTime(lastCommitPulled); - log.info("Starting commit " + lastCommitPulled); - if (cfg.upsert) { - log.info("Upserting records"); - client.upsert(records, lastCommitPulled); - } else { - log.info("Inserting records"); - // insert the records. 
- client.insert(records, lastCommitPulled); - } - - // TODO - revisit this - can we clean this up. - // determine if this write should be committed. -// final Accumulator errorCount = sc.intAccumulator(0); -// final Accumulator totalCount = sc.intAccumulator(0); -// statuses.foreach(new VoidFunction() { -// @Override public void call(WriteStatus status) throws Exception { -// if (status.hasGlobalError()) { -// log.error(status.getGlobalError()); -// errorCount.add(1); -// } -// if (status.hasErrors()) { -// log.info(status); -// for (Map.Entry keyErrEntry : status.getErrors() -// .entrySet()) { -// log.error(String.format("\t %s error %s", keyErrEntry.getKey(), -// keyErrEntry.getValue().getMessage()), keyErrEntry.getValue()); -// } -// } -// errorCount.add(status.getErrors().size()); -// totalCount.add(status.getWrittenRecords().size()); -// } -// }) - } - - private HoodieWriteConfig getHoodieClientConfig(HoodieTableMetaClient metadata) - throws Exception { - final String schemaStr = Files.toString(new File(cfg.schemaFile), Charset.forName("UTF-8")); - return HoodieWriteConfig.newBuilder().withPath(metadata.getBasePath()) - .withSchema(schemaStr) - .withParallelism(cfg.groupByParallelism, cfg.groupByParallelism) - .forTable(metadata.getTableConfig().getTableName()).withIndexConfig( - HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) - .build(); - } - - private JavaSparkContext getSparkContext(Config cfg) { - SparkConf sparkConf = new SparkConf().setAppName("hoodie-delta-streamer-" + cfg.targetTableName); - sparkConf.setMaster(cfg.sparkMaster); - sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - sparkConf.set("spark.driver.maxResultSize", "2g"); - - if (cfg.sparkMaster.startsWith("yarn")) { - sparkConf.set("spark.eventLog.overwrite", "true"); - sparkConf.set("spark.eventLog.enabled", "true"); - } - - // Configure hadoop conf - sparkConf.set("spark.hadoop.mapred.output.compress", "true"); - 
sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true"); - sparkConf.set("spark.hadoop.mapred.output.compression.codec", - "org.apache.hadoop.io.compress.GzipCodec"); - sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK"); - - sparkConf = HoodieWriteClient.registerClasses(sparkConf); - return new JavaSparkContext(sparkConf); - } - - public static class Config implements Serializable { - @Parameter(names = {"--dataPath"}) - public String dataPath; - @Parameter(names = {"--parallelism"}) - public int groupByParallelism = 10000; - @Parameter(names = {"--upsert"}) - public boolean upsert = false; - @Parameter(names = {"--master"}) - public String sparkMaster = "yarn-client"; - @Parameter(names = {"--targetPath"}, required = true) - public String targetPath; - @Parameter(names = {"--targetTable"}) - public String targetTableName; - @Parameter(names = {"--keyColumn"}) - public String keyColumnField = "uuid"; - @Parameter(names = {"--partitionPathField"}) - public String partitionPathField = "request_at"; - @Parameter(names = {"--schemaFile"}) - public String schemaFile; - @Parameter(names = {"--override"}) - public boolean override = false; - @Parameter(names = {"--help", "-h"}, help = true) - public Boolean help = false; - } - - public static void main(String[] args) throws Exception { - final Config cfg = new Config(); - JCommander cmd = new JCommander(cfg, args); - if (cfg.help || args.length == 0) { - cmd.usage(); - System.exit(1); - } - new HoodieDeltaStreamer(cfg).sync(); - } -} diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieSnapshotCopier.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieSnapshotCopier.java index 7cfb8392e..7234491e1 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieSnapshotCopier.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieSnapshotCopier.java @@ -1,17 +1,19 @@ /* - * Copyright (c) 2016 Uber Technologies, Inc. 
(hoodie-dev-group@uber.com) + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * */ package com.uber.hoodie.utilities; @@ -74,7 +76,7 @@ public class HoodieSnapshotCopier implements Serializable { if(!latestCommit.isPresent()) { logger.warn("No commits present. 
Nothing to snapshot"); } else { - logger.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.", latestCommit.get())); + logger.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.", latestCommit.get())); } List partitions = FSUtils.getAllPartitionPaths(fs, baseDir); @@ -84,7 +86,7 @@ public class HoodieSnapshotCopier implements Serializable { // Make sure the output directory is empty Path outputPath = new Path(outputDir); if (fs.exists(outputPath)) { - logger.warn(String.format("The output path %s already exists, deleting", outputPath)); + logger.warn(String.format("The output path %s already exists, deleting", outputPath)); fs.delete(new Path(outputDir), true); } @@ -118,7 +120,7 @@ public class HoodieSnapshotCopier implements Serializable { }); // Also copy the .commit files - logger.info(String.format("Copying .commit files which are no-late-than %s.", latestCommit.get())); + logger.info(String.format("Copying .commit files which are no-late-than %s.", latestCommit.get())); FileStatus[] commitFilesToCopy = fs.listStatus( new Path(baseDir + "/" + HoodieTableMetaClient.METAFOLDER_NAME), new PathFilter() { @Override @@ -141,7 +143,7 @@ public class HoodieSnapshotCopier implements Serializable { fs.mkdirs(targetFilePath.getParent()); } if (fs.exists(targetFilePath)) { - logger.error(String.format("The target output commit file (%s) already exists.", targetFilePath)); + logger.error(String.format("The target output commit file (%s) already exists.", targetFilePath)); } FileUtil.copy(fs, commitStatus.getPath(), fs, targetFilePath, false, fs.getConf()); } @@ -152,7 +154,7 @@ public class HoodieSnapshotCopier implements Serializable { // Create the _SUCCESS tag Path successTagPath = new Path(outputDir + "/_SUCCESS"); if (!fs.exists(successTagPath)) { - logger.info(String.format("Creating _SUCCESS under %s.", outputDir)); + 
logger.info(String.format("Creating _SUCCESS under %s.", outputDir)); fs.createNewFile(successTagPath); } } @@ -161,7 +163,7 @@ public class HoodieSnapshotCopier implements Serializable { // Take input configs final Config cfg = new Config(); new JCommander(cfg, args); - logger.info(String.format("Snapshot hoodie table from %s to %s", cfg.basePath, cfg.outputPath)); + logger.info(String.format("Snapshot hoodie table from %s to %s", cfg.basePath, cfg.outputPath)); // Create a spark job to do the snapshot copy SparkConf sparkConf = new SparkConf().setAppName("Hoodie-snapshot-copier"); diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/UtilHelpers.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/UtilHelpers.java new file mode 100644 index 000000000..8769af160 --- /dev/null +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/UtilHelpers.java @@ -0,0 +1,108 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * + */ + +package com.uber.hoodie.utilities; + +import com.uber.hoodie.common.model.HoodieRecordPayload; +import com.uber.hoodie.exception.HoodieIOException; +import com.uber.hoodie.exception.HoodieNotSupportedException; +import com.uber.hoodie.utilities.keygen.KeyGenerator; +import com.uber.hoodie.utilities.schema.SchemaProvider; +import com.uber.hoodie.utilities.sources.Source; +import com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException; +import com.uber.hoodie.utilities.sources.SourceDataFormat; + +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.commons.lang3.reflect.ConstructorUtils; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.spark.api.java.JavaSparkContext; + +import java.io.IOException; +import java.util.List; + +/** + * Bunch of helper methods + */ +public class UtilHelpers { + + public static Source createSource(String sourceClass, PropertiesConfiguration cfg, JavaSparkContext jssc, SourceDataFormat dataFormat, SchemaProvider schemaProvider) throws IOException { + try { + return (Source) ConstructorUtils.invokeConstructor(Class.forName(sourceClass), (Object) cfg, (Object) jssc, (Object) dataFormat, (Object) schemaProvider); + } catch (Throwable e) { + throw new IOException("Could not load source class " + sourceClass, e); + } + } + + public static SchemaProvider createSchemaProvider(String schemaProviderClass, PropertiesConfiguration cfg) throws IOException { + try { + return (SchemaProvider) ConstructorUtils.invokeConstructor(Class.forName(schemaProviderClass), (Object) cfg); + } catch (Throwable e) { + throw new IOException("Could not load schema provider class " + schemaProviderClass, e); + } + } + + public static KeyGenerator createKeyGenerator(String keyGeneratorClass, 
PropertiesConfiguration cfg) throws IOException { + try { + return (KeyGenerator) ConstructorUtils.invokeConstructor(Class.forName(keyGeneratorClass), (Object) cfg); + } catch (Throwable e) { + throw new IOException("Could not load key generator class " + keyGeneratorClass, e); + } + } + + public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record, Comparable orderingVal) throws IOException { + try { + return (HoodieRecordPayload) ConstructorUtils.invokeConstructor(Class.forName(payloadClass), (Object) record, (Object) orderingVal); + } catch (Throwable e) { + throw new IOException("Could not create payload for class: " + payloadClass, e); + } + } + + /** + * + * TODO: Support hierarchical config files (see CONFIGURATION-609 for sample) + * + * @param fs + * @param cfgPath + * @return + */ + public static PropertiesConfiguration readConfig(FileSystem fs, Path cfgPath) { + try { + FSDataInputStream in = fs.open(cfgPath); + PropertiesConfiguration config = new PropertiesConfiguration(); + config.load(in); + in.close(); + return config; + } catch (IOException e) { + throw new HoodieIOException("Unable to read config file at :" + cfgPath, e); + } catch (ConfigurationException e) { + throw new HoodieDeltaStreamerException("Invalid configs found in config file at :" + cfgPath, e); + } + } + + public static void checkRequiredProperties(PropertiesConfiguration configuration, List checkPropNames) { + checkPropNames.stream().forEach(prop -> { + if (!configuration.containsKey(prop)) { + throw new HoodieNotSupportedException("Required property "+ prop + " is missing"); + } + }); + } +} diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaStreamerAvroPayload.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaStreamerAvroPayload.java new file mode 100644 index 000000000..70c506cfb --- /dev/null +++ 
b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaStreamerAvroPayload.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.uber.hoodie.utilities.deltastreamer; + +import com.uber.hoodie.common.model.HoodieRecordPayload; +import com.uber.hoodie.common.util.HoodieAvroUtils; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; + +import java.io.IOException; +import java.util.Optional; + +/** + * Default payload used for delta streamer. + * + * 1. preCombine - Picks the latest delta record for a key, based on an ordering field + * 2. 
combineAndGetUpdateValue/getInsertValue - Simply overwrites storage with latest delta record + */ +public class DeltaStreamerAvroPayload extends DeltaStreamerPayload implements HoodieRecordPayload { + + /** + * + * @param record + * @param orderingVal + */ + public DeltaStreamerAvroPayload(GenericRecord record, Comparable orderingVal) { + super(record, orderingVal); + } + + @Override + public DeltaStreamerAvroPayload preCombine(DeltaStreamerAvroPayload another) { + if (another.orderingVal.compareTo(orderingVal) > 0) { + return another; + } else { + return this; + } + } + + @Override + public Optional combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException { + // combining strategy here trivially ignores currentValue on disk and writes this record + return getInsertValue(schema); + } + + @Override + public Optional getInsertValue(Schema schema) throws IOException { + return Optional.of(HoodieAvroUtils.rewriteRecord(record, schema)); + } +} diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaStreamerPayload.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaStreamerPayload.java new file mode 100644 index 000000000..fecfb1d40 --- /dev/null +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaStreamerPayload.java @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.uber.hoodie.utilities.deltastreamer; + +import org.apache.avro.generic.GenericRecord; +import java.io.Serializable; + +/** + * Base class for all payload supported for the {@link HoodieDeltaStreamer} + */ +public abstract class DeltaStreamerPayload implements Serializable { + + + /** + * Avro data extracted from the source + */ + protected final GenericRecord record; + + /** + * For purposes of preCombining + */ + protected final Comparable orderingVal; + + /** + * + * @param record + * @param orderingVal + */ + public DeltaStreamerPayload(GenericRecord record, Comparable orderingVal) { + this.record = record; + this.orderingVal = orderingVal; + } +} diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java new file mode 100644 index 000000000..a2f3aef75 --- /dev/null +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -0,0 +1,359 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * + */ + +package com.uber.hoodie.utilities.deltastreamer; + +import com.beust.jcommander.IStringConverter; +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParameterException; +import com.uber.hoodie.HoodieWriteClient; +import com.uber.hoodie.WriteStatus; +import com.uber.hoodie.common.model.HoodieCommitMetadata; +import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieRecordPayload; +import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.table.HoodieTimeline; +import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.config.HoodieIndexConfig; +import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.index.HoodieIndex; +import com.uber.hoodie.utilities.HiveIncrementalPuller; +import com.uber.hoodie.utilities.UtilHelpers; +import com.uber.hoodie.utilities.keygen.SimpleKeyGenerator; +import com.uber.hoodie.utilities.schema.FilebasedSchemaProvider; +import com.uber.hoodie.utilities.sources.DFSSource; +import com.uber.hoodie.utilities.keygen.KeyGenerator; +import com.uber.hoodie.utilities.schema.SchemaProvider; +import com.uber.hoodie.utilities.sources.Source; +import com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException; +import com.uber.hoodie.utilities.sources.SourceDataFormat; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Arrays; +import 
java.util.HashMap; +import java.util.List; +import java.util.Optional; +import java.util.Properties; + +import scala.collection.JavaConversions; + +/** + * A utility which can incrementally take the output from {@link HiveIncrementalPuller} and apply it to the target dataset. + * Does not maintain any state, queries at runtime to see how far behind the target dataset is from + * the source dataset. This can be overridden to force sync from a timestamp. + */ +public class HoodieDeltaStreamer implements Serializable { + + private static volatile Logger log = LogManager.getLogger(HoodieDeltaStreamer.class); + + private static String CHECKPOINT_KEY = "deltastreamer.checkpoint.key"; + + private final Config cfg; + + /** + * Source to pull deltas from + */ + private transient Source source; + + /** + * Schema provider that supplies the schemas for reading the input and writing out the + * target table. + */ + private transient SchemaProvider schemaProvider; + + /** + * Extract the key for the target dataset + */ + private KeyGenerator keyGenerator; + + /** + * Filesystem used + */ + private transient FileSystem fs; + + /** + * Timeline with completed commits + */ + private transient Optional commitTimelineOpt; + + /** + * Spark context + */ + private transient JavaSparkContext jssc; + + + public HoodieDeltaStreamer(Config cfg) throws IOException { + this.cfg = cfg; + this.fs = FSUtils.getFs(); + + + if (fs.exists(new Path(cfg.targetBasePath))) { + HoodieTableMetaClient meta = new HoodieTableMetaClient(fs, cfg.targetBasePath); + this.commitTimelineOpt = Optional.of(meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants()); + } else { + this.commitTimelineOpt = Optional.empty(); + } + + //TODO(vc) Should these be passed from outside? 
+ initSchemaProvider(); + initKeyGenerator(); + this.jssc = getSparkContext(); + + initSource(); + } + + private void initSource() throws IOException { + // Create the source & schema providers + PropertiesConfiguration sourceCfg = UtilHelpers.readConfig(fs, new Path(cfg.sourceConfigProps)); + log.info("Creating source " + cfg.sourceClassName + " with configs : " + sourceCfg.toString()); + this.source = UtilHelpers.createSource(cfg.sourceClassName, sourceCfg, jssc, cfg.sourceFormat, schemaProvider); + } + + private void initSchemaProvider() throws IOException { + PropertiesConfiguration schemaCfg = UtilHelpers.readConfig(fs, new Path(cfg.schemaProviderConfigProps)); + log.info("Creating schema provider " + cfg.schemaProviderClassName + " with configs : " + schemaCfg.toString()); + this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, schemaCfg); + } + + private void initKeyGenerator() throws IOException { + PropertiesConfiguration keygenCfg = UtilHelpers.readConfig(fs, new Path(cfg.keyGeneratorProps)); + log.info("Creating key generator " + cfg.keyGeneratorClass + " with configs : " + keygenCfg.toString()); + this.keyGenerator = UtilHelpers.createKeyGenerator(cfg.keyGeneratorClass, keygenCfg); + } + + + private JavaSparkContext getSparkContext() { + SparkConf sparkConf = new SparkConf().setAppName("hoodie-delta-streamer-" + cfg.targetTableName); + //sparkConf.setMaster(cfg.sparkMaster); + sparkConf.setMaster("local[2]"); + sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + sparkConf.set("spark.driver.maxResultSize", "2g"); + + // Configure hadoop conf + sparkConf.set("spark.hadoop.mapred.output.compress", "true"); + sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true"); + sparkConf.set("spark.hadoop.mapred.output.compression.codec", + "org.apache.hadoop.io.compress.GzipCodec"); + sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK"); + + sparkConf = 
HoodieWriteClient.registerClasses(sparkConf); + // register the schemas, so that shuffle does not serialize the full schemas + List schemas = Arrays.asList(schemaProvider.getSourceSchema(), schemaProvider.getTargetSchema()); + sparkConf.registerAvroSchemas(JavaConversions.asScalaBuffer(schemas).toList()); + return new JavaSparkContext(sparkConf); + } + + private void sync() throws Exception { + + + // Retrieve the previous round checkpoints, if any + Optional resumeCheckpointStr = Optional.empty(); + if (commitTimelineOpt.isPresent()) { + Optional lastCommit = commitTimelineOpt.get().lastInstant(); + if (lastCommit.isPresent()) { + HoodieCommitMetadata commitMetadata = + HoodieCommitMetadata.fromBytes(commitTimelineOpt.get().getInstantDetails(lastCommit.get()).get()); + if (commitMetadata.getMetadata(CHECKPOINT_KEY) != null) { + resumeCheckpointStr = Optional.of(commitMetadata.getMetadata(CHECKPOINT_KEY)); + } else { + throw new HoodieDeltaStreamerException("Unable to find previous checkpoint. Please double check if this table " + + "was indeed built via delta streamer "); + } + } + } else { + Properties properties = new Properties(); + properties.put(HoodieWriteConfig.TABLE_NAME, cfg.targetTableName); + HoodieTableMetaClient.initializePathAsHoodieDataset(FSUtils.getFs(), cfg.targetBasePath, properties); + } + log.info("Checkpoint to resume from : " + resumeCheckpointStr); + + + // Pull the data from the source & prepare the write + Pair>, String> dataAndCheckpoint = source.fetchNewData(resumeCheckpointStr, cfg.maxInputBytes); + + if (!dataAndCheckpoint.getKey().isPresent()) { + log.info("No new data, nothing to commit.. 
"); + return; + } + + JavaRDD avroRDD = dataAndCheckpoint.getKey().get(); + JavaRDD records = avroRDD + .map(gr -> { + HoodieRecordPayload payload = UtilHelpers.createPayload( + cfg.payloadClassName, + gr, + (Comparable) gr.get(cfg.sourceOrderingField)); + return new HoodieRecord<>(keyGenerator.getKey(gr), payload); + }); + + + // Perform the write + HoodieWriteConfig hoodieCfg = getHoodieClientConfig(cfg.hoodieClientProps); + HoodieWriteClient client = new HoodieWriteClient<>(jssc, hoodieCfg); + String commitTime = client.startCommit(); + log.info("Starting commit : " + commitTime); + + JavaRDD writeStatusRDD; + if (cfg.operation == Operation.INSERT) { + writeStatusRDD = client.insert(records, commitTime); + } else if (cfg.operation == Operation.UPSERT) { + writeStatusRDD = client.upsert(records, commitTime); + } else { + throw new HoodieDeltaStreamerException("Unknown operation :" + cfg.operation); + } + + // Simply commit for now. TODO(vc): Support better error handlers later on + HashMap checkpointCommitMetadata = new HashMap<>(); + checkpointCommitMetadata.put(CHECKPOINT_KEY, dataAndCheckpoint.getValue()); + + boolean success = client.commit(commitTime, writeStatusRDD, Optional.of(checkpointCommitMetadata)); + if (success) { + log.info("Commit " + commitTime + " successful!"); + // TODO(vc): Kick off hive sync from here. + + } else { + log.info("Commit " + commitTime + " failed!"); + } + client.close(); + } + + private HoodieWriteConfig getHoodieClientConfig(String hoodieClientCfgPath) throws Exception { + // TODO(vc): Double check all the options can be passed in like this. CompactionConfig, IndexConfig everything. 
+ return HoodieWriteConfig.newBuilder() + .combineInput(true, true) + .withPath(cfg.targetBasePath) + .withAutoCommit(false) + .withSchema(schemaProvider.getTargetSchema().toString()) + .forTable(cfg.targetTableName) + .withIndexConfig( + HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) + .fromInputStream(fs.open(new Path(hoodieClientCfgPath))) + .build(); + } + + private enum Operation { + UPSERT, + INSERT + } + + private class OperationConvertor implements IStringConverter { + @Override + public Operation convert(String value) throws ParameterException { + return Operation.valueOf(value); + } + } + + private class SourceFormatConvertor implements IStringConverter { + @Override + public SourceDataFormat convert(String value) throws ParameterException { + return SourceDataFormat.valueOf(value); + } + } + + public static class Config implements Serializable { + + /** TARGET CONFIGS **/ + @Parameter(names = {"--target-base-path"}, description = "base path for the target hoodie dataset", required = true) + public String targetBasePath; + + // TODO: How to obtain hive configs to register? + @Parameter(names = {"--target-table"}, description = "name of the target table in Hive", required = true) + public String targetTableName; + + @Parameter(names = {"--hoodie-client-config"}, description = "path to properties file on localfs or dfs, with hoodie client config. Sane defaults" + + "are used, but recommend use to provide basic things like metrics endpoints, hive configs etc") + public String hoodieClientProps = null; + + /** SOURCE CONFIGS **/ + @Parameter(names = {"--source-class"}, description = "subclass of com.uber.hoodie.utilities.sources.Source to use to read data. 
" + + "built-in options: com.uber.hoodie.utilities.common.{DFSSource (default), KafkaSource, HiveIncrPullSource}") + public String sourceClassName = DFSSource.class.getName(); + + @Parameter(names = {"--source-config"}, description = "path to properties file on localfs or dfs, with source configs. " + + "For list of acceptable properties, refer the source class", required = true) + public String sourceConfigProps = null; + + @Parameter(names = {"--source-format"}, description = "Format of data in source, JSON (default), Avro. All source data is " + + "converted to Avro using the provided schema in any case", converter = SourceFormatConvertor.class) + public SourceDataFormat sourceFormat = SourceDataFormat.JSON; + + @Parameter(names = {"--source-ordering-field"}, description = "Field within source record to decide how to break ties between " + + " records with same key in input data. Default: 'ts' holding unix timestamp of record") + public String sourceOrderingField = "ts"; + + @Parameter(names = {"--key-generator-class"}, description = "Subclass of com.uber.hoodie.utilities.common.KeyExtractor to generate" + + "a HoodieKey from the given avro record. Built in: SimpleKeyGenerator (Uses provided field names as recordkey & partitionpath. " + + "Nested fields specified via dot notation, e.g: a.b.c)") + public String keyGeneratorClass = SimpleKeyGenerator.class.getName(); + + @Parameter(names = {"--key-generator-config"}, description = "Path to properties file on localfs or dfs, with KeyGenerator configs. " + + "For list of acceptable properites, refer the KeyGenerator class", required = true) + public String keyGeneratorProps = null; + + @Parameter(names = {"--payload-class"}, description = "subclass of HoodieRecordPayload, that works off a GenericRecord. " + + "Default: SourceWrapperPayload. 
Implement your own, if you want to do something other than overwriting existing value") + public String payloadClassName = DeltaStreamerAvroPayload.class.getName(); + + @Parameter(names = {"--schemaprovider-class"}, description = "subclass of com.uber.hoodie.utilities.schema.SchemaProvider " + + "to attach schemas to input & target table data, built in options: FilebasedSchemaProvider") + public String schemaProviderClassName = FilebasedSchemaProvider.class.getName(); + + @Parameter(names = {"--schemaprovider-config"}, description = "path to properties file on localfs or dfs, with schema configs. " + + "For list of acceptable properties, refer the schema provider class", required = true) + public String schemaProviderConfigProps = null; + + + /** Other configs **/ + @Parameter(names = {"--max-input-bytes"}, description = "Maximum number of bytes to read from source. Default: 1TB") + public long maxInputBytes = 1L * 1024 * 1024 * 1024 * 1024; + + @Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input " + + "is purely new data/inserts to gain speed)", converter = OperationConvertor.class) + public Operation operation = Operation.UPSERT; + + + @Parameter(names = {"--help", "-h"}, help = true) + public Boolean help = false; + } + + public static void main(String[] args) throws Exception { + final Config cfg = new Config(); + JCommander cmd = new JCommander(cfg, args); + // TODO(vc): Do proper validation + if (cfg.help || args.length == 0) { + cmd.usage(); + System.exit(1); + } + new HoodieDeltaStreamer(cfg).sync(); + } +} diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/exception/HoodieDeltaStreamerException.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/exception/HoodieDeltaStreamerException.java new file mode 100644 index 000000000..c99197b8a --- /dev/null +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/exception/HoodieDeltaStreamerException.java @@ -0,0 
+1,31 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.uber.hoodie.utilities.exception; + +import com.uber.hoodie.exception.HoodieException; + +public class HoodieDeltaStreamerException extends HoodieException { + public HoodieDeltaStreamerException(String msg, Throwable e) { + super(msg, e); + } + + public HoodieDeltaStreamerException(String msg) { + super(msg); + } +} diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/exception/HoodieIncrementalPullException.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/exception/HoodieIncrementalPullException.java index 5689715f5..a939d8cc6 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/exception/HoodieIncrementalPullException.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/exception/HoodieIncrementalPullException.java @@ -1,17 +1,19 @@ /* - * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * */ package com.uber.hoodie.utilities.exception; diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/exception/HoodieIncrementalPullSQLException.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/exception/HoodieIncrementalPullSQLException.java index cae2b4956..3089631b7 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/exception/HoodieIncrementalPullSQLException.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/exception/HoodieIncrementalPullSQLException.java @@ -1,17 +1,19 @@ /* - * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
+ * + * */ package com.uber.hoodie.utilities.exception; diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/KeyGenerator.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/KeyGenerator.java new file mode 100644 index 000000000..e21251c8b --- /dev/null +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/KeyGenerator.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.uber.hoodie.utilities.keygen; + +import com.uber.hoodie.common.model.HoodieKey; + +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.configuration.PropertiesConfiguration; + +import java.io.Serializable; + +/** + * Abstract class to extend for plugging in extraction of {@link com.uber.hoodie.common.model.HoodieKey} + * from an Avro record + */ +public abstract class KeyGenerator implements Serializable { + + protected transient PropertiesConfiguration config; + + protected KeyGenerator(PropertiesConfiguration config) { + this.config = config; + } + + /** + * Generate a Hoodie Key out of provided generic record. 
+ * + * @param record + * @return + */ + public abstract HoodieKey getKey(GenericRecord record); +} diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/SimpleKeyGenerator.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/SimpleKeyGenerator.java new file mode 100644 index 000000000..30d0bcedb --- /dev/null +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/SimpleKeyGenerator.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.uber.hoodie.utilities.keygen; + +import com.uber.hoodie.common.model.HoodieKey; +import com.uber.hoodie.utilities.UtilHelpers; + +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.configuration.PropertiesConfiguration; + +import java.util.Arrays; + +/** + * Simple key generator, which takes names of fields to be used for recordKey and partitionPath + * as configs. 
+ */ +public class SimpleKeyGenerator extends KeyGenerator { + + protected final String recordKeyField; + + protected final String partitionPathField; + + /** + * Supported configs + */ + static class Config { + private static final String RECORD_KEY_FIELD_PROP = "hoodie.deltastreamer.keygen.simple.recordkey.field"; + private static final String PARTITION_PATH_FIELD_PROP = "hoodie.deltastreamer.keygen.simple.partitionpath.field"; + } + + public SimpleKeyGenerator(PropertiesConfiguration config) { + super(config); + UtilHelpers.checkRequiredProperties(config, Arrays.asList(Config.PARTITION_PATH_FIELD_PROP, Config.RECORD_KEY_FIELD_PROP)); + this.recordKeyField = config.getString(Config.RECORD_KEY_FIELD_PROP); + this.partitionPathField = config.getString(Config.PARTITION_PATH_FIELD_PROP); + } + + @Override + public HoodieKey getKey(GenericRecord record) { + return new HoodieKey(record.get(recordKeyField).toString(), record.get(partitionPathField).toString()); + } +} diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/TimestampBasedKeyGenerator.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/TimestampBasedKeyGenerator.java new file mode 100644 index 000000000..34be8d142 --- /dev/null +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/TimestampBasedKeyGenerator.java @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.uber.hoodie.utilities.keygen; + +import com.uber.hoodie.common.model.HoodieKey; +import com.uber.hoodie.exception.HoodieNotSupportedException; +import com.uber.hoodie.utilities.UtilHelpers; +import com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException; + +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.configuration.PropertiesConfiguration; + +import java.io.Serializable; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.Calendar; +import java.util.Date; +import java.util.TimeZone; + +/** + * Key generator, that relies on timestamps for partitioning field. Still picks record key by name. + * + */ +public class TimestampBasedKeyGenerator extends SimpleKeyGenerator { + + enum TimestampType implements Serializable { + UNIX_TIMESTAMP, + DATE_STRING, + MIXED + } + + private final TimestampType timestampType; + + private SimpleDateFormat inputDateFormat; + + private final String outputDateFormat; + + + /** + * Supported configs + */ + static class Config { + // One value from TimestampType above + private static final String TIMESTAMP_TYPE_FIELD_PROP = "hoodie.deltastreamer.keygen.timebased.timestamp.type"; + private static final String TIMESTAMP_INPUT_DATE_FORMAT_PROP = "hoodie.deltastreamer.keygen.timebased.input.dateformat"; + private static final String TIMESTAMP_OUTPUT_DATE_FORMAT_PROP = "hoodie.deltastreamer.keygen.timebased.output.dateformat"; + } + + public TimestampBasedKeyGenerator(PropertiesConfiguration config) { + super(config); + UtilHelpers.checkRequiredProperties(config, Arrays.asList(Config.TIMESTAMP_TYPE_FIELD_PROP, Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP)); + this.timestampType = TimestampType.valueOf(config.getString(Config.TIMESTAMP_TYPE_FIELD_PROP)); + this.outputDateFormat = 
config.getString(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP); + + if (timestampType == TimestampType.DATE_STRING || timestampType == TimestampType.MIXED) { + UtilHelpers.checkRequiredProperties(config, Arrays.asList(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP)); + this.inputDateFormat = new SimpleDateFormat(config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP)); + this.inputDateFormat.setTimeZone(TimeZone.getTimeZone("GMT")); + } + } + + @Override + public HoodieKey getKey(GenericRecord record) { + Object partitionVal = record.get(partitionPathField); + SimpleDateFormat partitionPathFormat = new SimpleDateFormat(outputDateFormat); + partitionPathFormat.setTimeZone(TimeZone.getTimeZone("GMT")); + + try { + long unixTime; + if (partitionVal instanceof Double) { + unixTime = ((Double) partitionVal).longValue(); + } else if (partitionVal instanceof Float) { + unixTime = ((Float) partitionVal).longValue(); + } else if (partitionVal instanceof Long) { + unixTime = (Long) partitionVal; + } else if (partitionVal instanceof String) { + unixTime = inputDateFormat.parse(partitionVal.toString()).getTime(); + } else { + throw new HoodieNotSupportedException("Unexpected type for partition field: "+ partitionVal.getClass().getName()); + } + + return new HoodieKey(record.get(recordKeyField).toString(), + partitionPathFormat.format(new Date(unixTime * 1000))); + } catch (ParseException pe) { + throw new HoodieDeltaStreamerException("Unable to parse input partition field :" + partitionVal, pe); + } + } +} diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/FilebasedSchemaProvider.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/FilebasedSchemaProvider.java new file mode 100644 index 000000000..dc09a703a --- /dev/null +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/FilebasedSchemaProvider.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. 
(hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.uber.hoodie.utilities.schema; + +import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.exception.HoodieIOException; +import com.uber.hoodie.utilities.UtilHelpers; + +import org.apache.avro.Schema; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.util.Arrays; + +/** + * A simple schema provider, that reads off files on DFS + */ +public class FilebasedSchemaProvider extends SchemaProvider { + + /** + * Configs supported + */ + static class Config { + private static final String SOURCE_SCHEMA_FILE_PROP = "hoodie.deltastreamer.filebased.schemaprovider.source.schema.file"; + private static final String TARGET_SCHEMA_FILE_PROP = "hoodie.deltastreamer.filebased.schemaprovider.target.schema.file"; + } + + private final FileSystem fs; + + private final Schema sourceSchema; + + private final Schema targetSchema; + + public FilebasedSchemaProvider(PropertiesConfiguration config) { + super(config); + this.fs = FSUtils.getFs(); + + UtilHelpers.checkRequiredProperties(config, Arrays.asList(Config.SOURCE_SCHEMA_FILE_PROP, Config.TARGET_SCHEMA_FILE_PROP)); + try { + this.sourceSchema = new Schema.Parser().parse(fs.open(new Path(config.getString(Config.SOURCE_SCHEMA_FILE_PROP)))); + this.targetSchema = new 
Schema.Parser().parse(fs.open(new Path(config.getString(Config.TARGET_SCHEMA_FILE_PROP)))); + } catch (IOException ioe) { + throw new HoodieIOException("Error reading schema", ioe); + } + } + + @Override + public Schema getSourceSchema() { + return sourceSchema; + } + + @Override + public Schema getTargetSchema() { + return targetSchema; + } +} diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/SchemaProvider.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/SchemaProvider.java new file mode 100644 index 000000000..b3f385bf9 --- /dev/null +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/SchemaProvider.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * + */ + +package com.uber.hoodie.utilities.schema; + +import org.apache.avro.Schema; +import org.apache.commons.configuration.PropertiesConfiguration; +import java.io.Serializable; + +/** + * Class to provide schema for reading data and also writing into a Hoodie table + */ +public abstract class SchemaProvider implements Serializable { + + protected PropertiesConfiguration config; + + protected SchemaProvider(PropertiesConfiguration config) { + this.config = config; + } + + public abstract Schema getSourceSchema(); + + public abstract Schema getTargetSchema(); +} diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/AvroConvertor.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/AvroConvertor.java new file mode 100644 index 000000000..a2c1db220 --- /dev/null +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/AvroConvertor.java @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * + */ + +package com.uber.hoodie.utilities.sources; + +import com.twitter.bijection.Injection; +import com.twitter.bijection.avro.GenericAvroCodecs; +import com.uber.hoodie.avro.MercifulJsonConverter; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; + +import java.io.IOException; +import java.io.Serializable; + +/** + * Convert a variety of {@link SourceDataFormat} into Avro GenericRecords. Has a bunch of lazy fields + * to circumvent issues around serializing these objects from driver to executors + */ +public class AvroConvertor implements Serializable { + + /** + * To be lazily inited on executors + */ + private transient Schema schema; + + private final String schemaStr; + + /** + * To be lazily inited on executors + */ + private transient MercifulJsonConverter jsonConverter; + + + /** + * To be lazily inited on executors + */ + private transient Injection recordInjection; + + + public AvroConvertor(String schemaStr) { + this.schemaStr = schemaStr; + } + + + private void initSchema() { + if (schema == null) { + Schema.Parser parser = new Schema.Parser(); + schema = parser.parse(schemaStr); + } + } + + private void initInjection() { + if (recordInjection == null) { + recordInjection = GenericAvroCodecs.toBinary(schema); + } + } + + private void initJsonConvertor() { + if (jsonConverter == null) { + jsonConverter = new MercifulJsonConverter(schema); + } + } + + + public GenericRecord fromJson(String json) throws IOException { + initSchema(); + initJsonConvertor(); + return jsonConverter.convert(json); + } + + + public GenericRecord fromAvroBinary(byte[] avroBinary) throws IOException { + initSchema(); + initInjection(); + return recordInjection.invert(avroBinary).get(); + } +} diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/DFSSource.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/DFSSource.java new file mode 100644 index 000000000..9440dffd2 --- /dev/null +++ 
b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/DFSSource.java @@ -0,0 +1,152 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.uber.hoodie.utilities.sources; + +import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.exception.HoodieIOException; +import com.uber.hoodie.exception.HoodieNotSupportedException; +import com.uber.hoodie.utilities.UtilHelpers; +import com.uber.hoodie.utilities.schema.SchemaProvider; + +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.mapred.AvroKey; +import org.apache.avro.mapreduce.AvroKeyInputFormat; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.io.NullWritable; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * Source to read data from a given DFS 
directory structure, incrementally + */ +public class DFSSource extends Source { + + /** + * Configs supported + */ + static class Config { + private final static String ROOT_INPUT_PATH_PROP = "hoodie.deltastreamer.source.dfs.root"; + } + + private final static List IGNORE_FILEPREFIX_LIST = Arrays.asList(".", "_"); + + private final transient FileSystem fs; + + public DFSSource(PropertiesConfiguration config, JavaSparkContext sparkContext, SourceDataFormat dataFormat, SchemaProvider schemaProvider) { + super(config, sparkContext, dataFormat, schemaProvider); + this.fs = FSUtils.getFs(); + UtilHelpers.checkRequiredProperties(config, Arrays.asList(Config.ROOT_INPUT_PATH_PROP)); + } + + + public static JavaRDD fromAvroFiles(final AvroConvertor convertor, String pathStr, JavaSparkContext sparkContext) { + JavaPairRDD avroRDD = sparkContext.newAPIHadoopFile(pathStr, + AvroKeyInputFormat.class, + AvroKey.class, + NullWritable.class, + sparkContext.hadoopConfiguration()); + return avroRDD.keys().map(r -> ((GenericRecord) r.datum())); + } + + public static JavaRDD fromJsonFiles(final AvroConvertor convertor, String pathStr, JavaSparkContext sparkContext) { + return sparkContext.textFile(pathStr).map((String j) -> { + return convertor.fromJson(j); + }); + } + + public static JavaRDD fromFiles(SourceDataFormat dataFormat, final AvroConvertor convertor, String pathStr, JavaSparkContext sparkContext) { + if (dataFormat == SourceDataFormat.AVRO) { + return DFSSource.fromAvroFiles(convertor, pathStr, sparkContext); + } else if (dataFormat == SourceDataFormat.JSON) { + return DFSSource.fromJsonFiles(convertor, pathStr, sparkContext); + } else { + throw new HoodieNotSupportedException("Unsupported data format :" + dataFormat); + } + } + + + @Override + public Pair>, String> fetchNewData(Optional lastCheckpointStr, long maxInputBytes) { + + try { + // obtain all eligible files under root folder. 
+ List eligibleFiles = new ArrayList<>(); + RemoteIterator fitr = fs.listFiles(new Path(config.getString(Config.ROOT_INPUT_PATH_PROP)), true); + while (fitr.hasNext()) { + LocatedFileStatus fileStatus = fitr.next(); + if (fileStatus.isDirectory() || + IGNORE_FILEPREFIX_LIST.stream().filter(pfx -> fileStatus.getPath().getName().startsWith(pfx)).count() > 0) { + continue; + } + eligibleFiles.add(fileStatus); + } + // sort them by modification time. + eligibleFiles.sort((FileStatus f1, FileStatus f2) -> Long.valueOf(f1.getModificationTime()).compareTo(Long.valueOf(f2.getModificationTime()))); + + // Filter based on checkpoint & input size, if needed + long currentBytes = 0; + long maxModificationTime = Long.MIN_VALUE; + List filteredFiles = new ArrayList<>(); + for (FileStatus f : eligibleFiles) { + if (lastCheckpointStr.isPresent() && f.getModificationTime() <= Long.valueOf(lastCheckpointStr.get())) { + // skip processed files + continue; + } + + maxModificationTime = f.getModificationTime(); + currentBytes += f.getLen(); + filteredFiles.add(f); + if (currentBytes >= maxInputBytes) { + // we have enough data, we are done + break; + } + } + + // no data to read + if (filteredFiles.size() == 0) { + return new ImmutablePair<>(Optional.empty(), lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : String.valueOf(Long.MIN_VALUE)); + } + + // read the files out. 
+ String pathStr = filteredFiles.stream().map(f -> f.getPath().toString()).collect(Collectors.joining(",")); + String schemaStr = schemaProvider.getSourceSchema().toString(); + final AvroConvertor avroConvertor = new AvroConvertor(schemaStr); + + return new ImmutablePair<>(Optional.of(DFSSource.fromFiles(dataFormat, avroConvertor, pathStr, sparkContext)), + String.valueOf(maxModificationTime)); + } catch (IOException ioe) { + throw new HoodieIOException("Unable to read from source from checkpoint: " + lastCheckpointStr, ioe); + } + } +} diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/HiveIncrPullSource.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/HiveIncrPullSource.java new file mode 100644 index 000000000..6ebc1103e --- /dev/null +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/HiveIncrPullSource.java @@ -0,0 +1,132 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * + */ + +package com.uber.hoodie.utilities.sources; + +import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.exception.HoodieIOException; +import com.uber.hoodie.utilities.UtilHelpers; +import com.uber.hoodie.utilities.schema.SchemaProvider; + +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * Source to read deltas produced by {@link com.uber.hoodie.utilities.HiveIncrementalPuller}, commit + * by commit and apply to the target table + * + * The general idea here is to have commits sync across the data pipeline. 
+ * + * [Source Tables(s)] ====> HiveIncrementalScanner ==> incrPullRootPath ==> targetTable + * {c1,c2,c3,...} {c1,c2,c3,...} {c1,c2,c3,...} + * + * This produces beautiful causality, that makes data issues in ETLs very easy to debug + * + */ +public class HiveIncrPullSource extends Source { + + private static volatile Logger log = LogManager.getLogger(HiveIncrPullSource.class); + + private final transient FileSystem fs; + + private final String incrPullRootPath; + + + /** + * Configs supported + */ + static class Config { + private final static String ROOT_INPUT_PATH_PROP = "hoodie.deltastreamer.source.incrpull.root"; + } + + public HiveIncrPullSource(PropertiesConfiguration config, JavaSparkContext sparkContext, SourceDataFormat dataFormat, SchemaProvider schemaProvider) { + super(config, sparkContext, dataFormat, schemaProvider); + this.fs = FSUtils.getFs(); + UtilHelpers.checkRequiredProperties(config, Arrays.asList(Config.ROOT_INPUT_PATH_PROP)); + this.incrPullRootPath = config.getString(Config.ROOT_INPUT_PATH_PROP); + } + + /** + * Finds the first commit from source, greater than the target's last commit, and reads it out. 
+ */ + private Optional findCommitToPull(Optional latestTargetCommit) throws IOException { + + log.info("Looking for commits "); + + FileStatus[] commitTimePaths = fs.listStatus(new Path(incrPullRootPath)); + List commitTimes = new ArrayList<>(commitTimePaths.length); + for (FileStatus commitTimePath : commitTimePaths) { + String[] splits = commitTimePath.getPath().toString().split("/"); + commitTimes.add(splits[splits.length - 1]); + } + Collections.sort(commitTimes); + log.info("Retrieved commit times " + commitTimes); + + if (!latestTargetCommit.isPresent()) { + // start from the beginning + return Optional.of(commitTimes.get(0)); + } + + for (String commitTime : commitTimes) { + //TODO(vc): Add an option to delete consumed commits + if (commitTime.compareTo(latestTargetCommit.get()) > 0) { + return Optional.of(commitTime); + } + } + return Optional.empty(); + } + + @Override + public Pair>, String> fetchNewData(Optional lastCheckpointStr, long maxInputBytes) { + try { + // find the source commit to pull + Optional commitToPull = findCommitToPull(lastCheckpointStr); + + if (!commitToPull.isPresent()) { + return new ImmutablePair<>(Optional.empty(), lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : ""); + } + + // read the files out. 
+ List commitDeltaFiles = Arrays.asList(fs.listStatus(new Path(incrPullRootPath, commitToPull.get()))); + String pathStr = commitDeltaFiles.stream().map(f -> f.getPath().toString()).collect(Collectors.joining(",")); + String schemaStr = schemaProvider.getSourceSchema().toString(); + final AvroConvertor avroConvertor = new AvroConvertor(schemaStr); + return new ImmutablePair<>(Optional.of(DFSSource.fromFiles(dataFormat, avroConvertor, pathStr, sparkContext)), + String.valueOf(commitToPull.get())); + } catch (IOException ioe) { + throw new HoodieIOException("Unable to read from source from checkpoint: " + lastCheckpointStr, ioe); + } + } +} diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/KafkaSource.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/KafkaSource.java new file mode 100644 index 000000000..7b88ce891 --- /dev/null +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/KafkaSource.java @@ -0,0 +1,247 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * + */ + +package com.uber.hoodie.utilities.sources; + +import com.uber.hoodie.exception.HoodieNotSupportedException; +import com.uber.hoodie.utilities.UtilHelpers; +import com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException; +import com.uber.hoodie.utilities.schema.SchemaProvider; + +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.streaming.kafka.KafkaCluster; +import org.apache.spark.streaming.kafka.KafkaUtils; +import org.apache.spark.streaming.kafka.OffsetRange; +import kafka.common.TopicAndPartition; + +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import kafka.serializer.DefaultDecoder; +import scala.Predef; +import scala.Tuple2; +import scala.collection.JavaConverters; +import scala.collection.immutable.Map; +import scala.collection.immutable.Set; +import scala.collection.mutable.ArrayBuffer; +import scala.collection.mutable.StringBuilder; +import scala.util.Either; + + +/** + * Source to read data from Kafka, incrementally + */ +public class KafkaSource extends Source { + + private static volatile Logger log = LogManager.getLogger(KafkaSource.class); + + + static class CheckpointUtils { + + /** + * Reconstruct checkpoint from string. 
+ * + * @param checkpointStr + * @return + */ + public static HashMap strToOffsets(String checkpointStr) { + HashMap offsetMap = new HashMap<>(); + String[] splits = checkpointStr.split(","); + String topic = splits[0]; + for (int i = 1; i < splits.length; i++) { + String[] subSplits = splits[i].split(":"); + offsetMap.put(new TopicAndPartition(topic, Integer.parseInt(subSplits[0])), + new KafkaCluster.LeaderOffset("", -1, Long.parseLong(subSplits[1]))); + } + return offsetMap; + } + + /** + * String representation of checkpoint + * + * Format: + * topic1,0:offset0,1:offset1,2:offset2, ..... + * + * @param offsetMap + * @return + */ + public static String offsetsToStr(HashMap offsetMap) { + StringBuilder sb = new StringBuilder(); + // atleast 1 partition will be present. + sb.append(offsetMap.entrySet().stream().findFirst().get().getKey().topic() + ","); + sb.append(offsetMap.entrySet().stream() + .map(e -> String.format("%s:%d",e.getKey().partition(), e.getValue().offset())) + .collect(Collectors.joining(","))); + return sb.toString(); + } + + public static OffsetRange[] computeOffsetRanges(HashMap fromOffsetMap, + HashMap toOffsetMap) { + Comparator byPartition = (OffsetRange o1, OffsetRange o2) -> { + return Integer.valueOf(o1.partition()).compareTo(Integer.valueOf(o2.partition())); + }; + List offsetRanges = toOffsetMap.entrySet().stream().map(e -> { + TopicAndPartition tp = e.getKey(); + long fromOffset = -1; + if (fromOffsetMap.containsKey(tp)){ + fromOffset = fromOffsetMap.get(tp).offset(); + } + return OffsetRange.create(tp, fromOffset, e.getValue().offset()); + }).sorted(byPartition).collect(Collectors.toList()); + + OffsetRange[] ranges = new OffsetRange[offsetRanges.size()]; + return offsetRanges.toArray(ranges); + } + + public static long totalNewMessages(OffsetRange[] ranges) { + long totalMsgs = 0; + for (OffsetRange range: ranges) { + totalMsgs += Math.max(range.untilOffset()-range.fromOffset(), 0); + } + return totalMsgs; + } + } + + /** + * Helpers 
to deal with tricky scala <=> java conversions. (oh my!) + */ + static class ScalaHelpers { + public static Map toScalaMap(HashMap m) { + return JavaConverters.mapAsScalaMapConverter(m).asScala().toMap( + Predef.>conforms() + ); + } + + public static Set toScalaSet(HashSet s) { + return JavaConverters.asScalaSetConverter(s).asScala().toSet(); + } + + public static java.util.Map toJavaMap(Map m) { + return JavaConverters.mapAsJavaMapConverter(m).asJava(); + } + } + + + /** + * Configs to be passed for this source. All standard Kafka consumer configs are also + * respected + */ + static class Config { + private final static String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic"; + private final static String DEFAULT_AUTO_RESET_OFFSET = "largest"; + } + + + private HashMap kafkaParams; + + private final String topicName; + + public KafkaSource(PropertiesConfiguration config, JavaSparkContext sparkContext, SourceDataFormat dataFormat, SchemaProvider schemaProvider) { + super(config, sparkContext, dataFormat, schemaProvider); + + kafkaParams = new HashMap<>(); + Stream keys = StreamSupport.stream(Spliterators.spliteratorUnknownSize(config.getKeys(), Spliterator.NONNULL), false); + keys.forEach(k -> kafkaParams.put(k, config.getString(k))); + + UtilHelpers.checkRequiredProperties(config, Arrays.asList(Config.KAFKA_TOPIC_NAME)); + topicName = config.getString(Config.KAFKA_TOPIC_NAME); + } + + @Override + public Pair>, String> fetchNewData(Optional lastCheckpointStr, long maxInputBytes) { + + // Obtain current metadata for the topic + KafkaCluster cluster = new KafkaCluster(ScalaHelpers.toScalaMap(kafkaParams)); + Either, Set> either = cluster.getPartitions(ScalaHelpers.toScalaSet(new HashSet<>(Arrays.asList(topicName)))); + if (either.isLeft()) { + // log errors. and bail out. 
+ throw new HoodieDeltaStreamerException("Error obtaining partition metadata", either.left().get().head()); + } + Set topicPartitions = either.right().get(); + + // Determine the offset ranges to read from + HashMap fromOffsets; + if (lastCheckpointStr.isPresent()) { + fromOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get()); + } else { + String autoResetValue = config.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET); + if (autoResetValue.equals("smallest")) { + fromOffsets = new HashMap(ScalaHelpers.toJavaMap(cluster.getEarliestLeaderOffsets(topicPartitions).right().get())); + } else if (autoResetValue.equals("largest")) { + fromOffsets = new HashMap(ScalaHelpers.toJavaMap(cluster.getLatestLeaderOffsets(topicPartitions).right().get())); + } else { + throw new HoodieNotSupportedException("Auto reset value must be one of 'smallest' or 'largest' "); + } + } + + // Always read until the latest offset + HashMap toOffsets = new HashMap(ScalaHelpers.toJavaMap(cluster.getLatestLeaderOffsets(topicPartitions).right().get())); + + + // Come up with final set of OffsetRanges to read (account for new partitions) + // TODO(vc): Respect maxInputBytes, by estimating number of messages to read each batch from partition size + OffsetRange[] offsetRanges = CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets); + long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges); + if (totalNewMsgs <= 0) { + return new ImmutablePair<>(Optional.empty(), lastCheckpointStr.isPresent() ? 
lastCheckpointStr.get() : CheckpointUtils.offsetsToStr(toOffsets)); + } else { + log.info("About to read " + totalNewMsgs + " from Kafka for topic :" + topicName); + } + + + // Perform the actual read from Kafka + JavaRDD kafkaRDD = KafkaUtils.createRDD( + sparkContext, + byte[].class, + byte[].class, + DefaultDecoder.class, + DefaultDecoder.class, + kafkaParams, + offsetRanges).values(); + + // Produce a RDD[GenericRecord] + final AvroConvertor avroConvertor = new AvroConvertor(schemaProvider.getSourceSchema().toString()); + JavaRDD newDataRDD; + if (dataFormat == SourceDataFormat.AVRO) { + newDataRDD = kafkaRDD.map(bytes -> avroConvertor.fromAvroBinary(bytes)); + } else if (dataFormat == SourceDataFormat.JSON) { + newDataRDD = kafkaRDD.map(bytes -> avroConvertor.fromJson(new String(bytes, Charset.forName("utf-8")))); + } else { + throw new HoodieNotSupportedException("Unsupport data format :" + dataFormat); + } + + return new ImmutablePair<>(Optional.of(newDataRDD), CheckpointUtils.offsetsToStr(toOffsets)); + } +} diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/Source.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/Source.java new file mode 100644 index 000000000..b44ca614f --- /dev/null +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/Source.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.uber.hoodie.utilities.sources; + +import com.uber.hoodie.utilities.schema.SchemaProvider; + +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; + +import java.io.Serializable; +import java.util.Optional; + +/** + * Represents a source from which we can tail data. Assumes a constructor that takes properties. + */ +public abstract class Source implements Serializable { + + protected transient PropertiesConfiguration config; + + protected transient JavaSparkContext sparkContext; + + protected transient SourceDataFormat dataFormat; + + protected transient SchemaProvider schemaProvider; + + + protected Source(PropertiesConfiguration config, JavaSparkContext sparkContext, SourceDataFormat dataFormat, SchemaProvider schemaProvider) { + this.config = config; + this.sparkContext = sparkContext; + this.dataFormat = dataFormat; + this.schemaProvider = schemaProvider; + } + + /** + * Fetches new data upto maxInputBytes, from the provided checkpoint and returns an RDD of the data, + * as well as the checkpoint to be written as a result of that. 
+ * + * @param lastCheckpointStr + * @param maxInputBytes + * @return + */ + public abstract Pair>, String> fetchNewData(Optional lastCheckpointStr, + long maxInputBytes); + + + public PropertiesConfiguration getConfig() { + return config; + } +} diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/SourceDataFormat.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/SourceDataFormat.java new file mode 100644 index 000000000..229d7ff39 --- /dev/null +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/SourceDataFormat.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.uber.hoodie.utilities.sources; + +/** + * Format of the data within source. + */ +public enum SourceDataFormat { + AVRO, // No conversion needed explicitly to avro + JSON, // we will try to convert to avro + ROW, // Will be added later, so we can plug/play with spark sources. + CUSTOM // the source is responsible for conversion to avro. 
+} diff --git a/hoodie-utilities/src/main/resources/delta-streamer-config/hoodie-client.properties b/hoodie-utilities/src/main/resources/delta-streamer-config/hoodie-client.properties new file mode 100644 index 000000000..0a578f439 --- /dev/null +++ b/hoodie-utilities/src/main/resources/delta-streamer-config/hoodie-client.properties @@ -0,0 +1,19 @@ +# +# Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# + +hoodie.upsert.shuffle.parallelism=2 diff --git a/hoodie-utilities/src/main/resources/delta-streamer-config/key-generator.properties b/hoodie-utilities/src/main/resources/delta-streamer-config/key-generator.properties new file mode 100644 index 000000000..9f0beccd3 --- /dev/null +++ b/hoodie-utilities/src/main/resources/delta-streamer-config/key-generator.properties @@ -0,0 +1,20 @@ +# +# Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# + +hoodie.deltastreamer.keygen.simple.recordkey.field=_row_key +hoodie.deltastreamer.keygen.simple.partitionpath.field=driver diff --git a/hoodie-utilities/src/main/resources/delta-streamer-config/schema-provider.properties b/hoodie-utilities/src/main/resources/delta-streamer-config/schema-provider.properties new file mode 100644 index 000000000..187cd1193 --- /dev/null +++ b/hoodie-utilities/src/main/resources/delta-streamer-config/schema-provider.properties @@ -0,0 +1,20 @@ +# +# Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +# + + +hoodie.deltastreamer.filebased.schemaprovider.source.schema.file=file:///Users/vinoth/bin/hoodie/hoodie-utilities/src/main/resources/delta-streamer-config/source.avsc +hoodie.deltastreamer.filebased.schemaprovider.target.schema.file=file:///Users/vinoth/bin/hoodie/hoodie-utilities/src/main/resources/delta-streamer-config/target.avsc diff --git a/hoodie-utilities/src/main/resources/delta-streamer-config/source.avsc b/hoodie-utilities/src/main/resources/delta-streamer-config/source.avsc new file mode 100644 index 000000000..11a5649d0 --- /dev/null +++ b/hoodie-utilities/src/main/resources/delta-streamer-config/source.avsc @@ -0,0 +1,30 @@ +{ + "type" : "record", + "name" : "triprec", + "fields" : [ { + "name" : "_row_key", + "type" : "string" + }, { + "name" : "rider", + "type" : "string" + }, { + "name" : "driver", + "type" : "string" + }, { + "name" : "begin_lat", + "type" : "double" + }, { + "name" : "begin_lon", + "type" : "double" + }, { + "name" : "end_lat", + "type" : "double" + }, { + "name" : "end_lon", + "type" : "double" + }, { + "name" : "fare", + "type" : "double" + } ] +} + diff --git a/hoodie-utilities/src/main/resources/delta-streamer-config/source.properties b/hoodie-utilities/src/main/resources/delta-streamer-config/source.properties new file mode 100644 index 000000000..85489c5ec --- /dev/null +++ b/hoodie-utilities/src/main/resources/delta-streamer-config/source.properties @@ -0,0 +1,25 @@ +# +# Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# + +# DFS Source +hoodie.deltastreamer.source.dfs.root=file:///tmp/hoodie-dfs-input + +# Kafka Source +hoodie.deltastreamer.source.kafka.topic=uber_trips +metadata.broker.list=localhost:9092 +auto.offset.reset=smallest diff --git a/hoodie-utilities/src/main/resources/delta-streamer-config/target.avsc b/hoodie-utilities/src/main/resources/delta-streamer-config/target.avsc new file mode 100644 index 000000000..a2004f535 --- /dev/null +++ b/hoodie-utilities/src/main/resources/delta-streamer-config/target.avsc @@ -0,0 +1,29 @@ +{ + "type" : "record", + "name" : "triprec", + "fields" : [ { + "name" : "_row_key", + "type" : "string" + }, { + "name" : "rider", + "type" : "string" + }, { + "name" : "driver", + "type" : "string" + }, { + "name" : "begin_lat", + "type" : "double" + }, { + "name" : "begin_lon", + "type" : "double" + }, { + "name" : "end_lat", + "type" : "double" + }, { + "name" : "end_lon", + "type" : "double" + }, { + "name" : "fare", + "type" : "double" + } ] +}