From d58ddbd999ab111e4cbb8f2b75efc7bba5160019 Mon Sep 17 00:00:00 2001 From: Vinoth Chandar Date: Sat, 4 Aug 2018 03:35:30 -0700 Subject: [PATCH] Reworking the deltastreamer tool - Standardize version of jackson - DFSPropertiesConfiguration replaces usage of commons PropertiesConfiguration - Remove dependency on ConstructorUtils - Throw error if ordering value is not present, during key generation - Switch to shade plugin for hoodie-utilities - Added support for consumption for Confluent avro kafka serdes - Support for Confluent schema registry - KafkaSource now deals with skews nicely, by doing round robin allocation of source limit across partitions - Added support for BULK_INSERT operations as well - Pass in the payload class config properly into HoodieWriteClient - Fix documentation based on new usage - Adding tests on deltastreamer, sources and all new util classes. --- docs/comparison.md | 2 +- docs/incremental_processing.md | 205 +++++++-------- .../commands/HDFSParquetImportCommand.java | 8 +- .../common/HoodieTestDataGenerator.java | 11 +- hoodie-common/pom.xml | 1 + .../util/DFSPropertiesConfiguration.java | 102 ++++++++ .../hoodie/common/util/ReflectionUtils.java | 53 ++-- .../hoodie/common/util/TypedProperties.java | 87 +++++++ .../util/TestDFSPropertiesConfiguration.java | 138 ++++++++++ hoodie-spark/pom.xml | 5 +- .../java/com/uber/hoodie/BaseAvroPayload.java | 4 + .../java/com/uber/hoodie/DataSourceUtils.java | 17 +- .../java/com/uber/hoodie/KeyGenerator.java | 6 +- .../com/uber/hoodie/SimpleKeyGenerator.java | 25 +- .../scala/com/uber/hoodie/DefaultSource.scala | 12 +- .../test/scala/DataSourceDefaultsTest.scala | 32 ++- hoodie-utilities/pom.xml | 122 +++++---- hoodie-utilities/src/assembly/src.xml | 50 ---- .../hoodie/utilities/HDFSParquetImporter.java | 21 +- .../utilities/HiveIncrementalPuller.java | 9 +- .../hoodie/utilities/HoodieCompactor.java | 5 +- .../uber/hoodie/utilities/UtilHelpers.java | 78 +++--- 
.../deltastreamer/HoodieDeltaStreamer.java | 235 +++++------------- .../keygen/TimestampBasedKeyGenerator.java | 4 +- .../schema/FilebasedSchemaProvider.java | 43 ++-- .../utilities/schema/SchemaProvider.java | 17 +- .../schema/SchemaRegistryProvider.java | 71 ++++++ .../utilities/sources/AvroConvertor.java | 6 +- .../utilities/sources/AvroDFSSource.java | 47 ++++ .../utilities/sources/AvroKafkaSource.java | 47 ++++ .../hoodie/utilities/sources/DFSSource.java | 66 ++--- .../utilities/sources/HiveIncrPullSource.java | 32 +-- ...urceDataFormat.java => JsonDFSSource.java} | 23 +- .../utilities/sources/JsonKafkaSource.java | 45 ++++ .../hoodie/utilities/sources/KafkaSource.java | 164 ++++++------ .../uber/hoodie/utilities/sources/Source.java | 23 +- .../key-generator.properties | 19 -- .../delta-streamer-config/target.avsc | 29 --- .../utilities/TestHoodieDeltaStreamer.java | 162 ++++++++++++ .../hoodie/utilities/UtilitiesTestBase.java | 137 ++++++++++ .../utilities/sources/TestDFSSource.java | 104 ++++++++ .../utilities/sources/TestDataSource.java | 99 ++++++++ .../utilities/sources/TestKafkaSource.java | 186 ++++++++++++++ .../resources/IncrementalPull.sqltemplate | 0 .../delta-streamer-config/base.properties} | 3 + .../dfs-source.properties} | 13 +- .../kafka-source.properties} | 13 +- .../delta-streamer-config/source.avsc | 6 +- pom.xml | 86 ++++++- 49 files changed, 1919 insertions(+), 754 deletions(-) create mode 100644 hoodie-common/src/main/java/com/uber/hoodie/common/util/DFSPropertiesConfiguration.java create mode 100644 hoodie-common/src/main/java/com/uber/hoodie/common/util/TypedProperties.java create mode 100644 hoodie-common/src/test/java/com/uber/hoodie/common/util/TestDFSPropertiesConfiguration.java delete mode 100644 hoodie-utilities/src/assembly/src.xml create mode 100644 hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/SchemaRegistryProvider.java create mode 100644 
hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/AvroDFSSource.java create mode 100644 hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/AvroKafkaSource.java rename hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/{SourceDataFormat.java => JsonDFSSource.java} (51%) create mode 100644 hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/JsonKafkaSource.java delete mode 100644 hoodie-utilities/src/main/resources/delta-streamer-config/key-generator.properties delete mode 100644 hoodie-utilities/src/main/resources/delta-streamer-config/target.avsc create mode 100644 hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java create mode 100644 hoodie-utilities/src/test/java/com/uber/hoodie/utilities/UtilitiesTestBase.java create mode 100644 hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/TestDFSSource.java create mode 100644 hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/TestDataSource.java create mode 100644 hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/TestKafkaSource.java rename hoodie-utilities/src/{main => test}/resources/IncrementalPull.sqltemplate (100%) rename hoodie-utilities/src/{main/resources/delta-streamer-config/hoodie-client.properties => test/resources/delta-streamer-config/base.properties} (86%) rename hoodie-utilities/src/{main/resources/delta-streamer-config/schema-provider.properties => test/resources/delta-streamer-config/dfs-source.properties} (60%) rename hoodie-utilities/src/{main/resources/delta-streamer-config/source.properties => test/resources/delta-streamer-config/kafka-source.properties} (60%) rename hoodie-utilities/src/{main => test}/resources/delta-streamer-config/source.avsc (86%) diff --git a/docs/comparison.md b/docs/comparison.md index b5aa0d1d0..0120bcb2e 100644 --- a/docs/comparison.md +++ b/docs/comparison.md @@ -47,7 +47,7 @@ just for analytics. 
Finally, HBase does not support incremental processing primi A popular question, we get is : "How does Hoodie relate to stream processing systems?", which we will try to answer here. Simply put, Hoodie can integrate with batch (`copy-on-write storage`) and streaming (`merge-on-read storage`) jobs of today, to store the computed results in Hadoop. For Spark apps, this can happen via direct integration of Hoodie library with Spark/Spark streaming DAGs. In case of Non-Spark processing systems (eg: Flink, Hive), the processing can be done in the respective systems -and later sent into a Hoodie table via a Kafka topic/HDFS intermediate file. (TODO: Need link to SQLStreamer/DeltaStreamer after reworking). In more conceptual level, data processing +and later sent into a Hoodie table via a Kafka topic/HDFS intermediate file. In more conceptual level, data processing pipelines just consist of three components : `source`, `processing`, `sink`, with users ultimately running queries against the sink to use the results of the pipeline. Hoodie can act as either a source or sink, that stores data on HDFS. Applicability of Hoodie to a given stream processing pipeline ultimately boils down to suitability of Presto/SparkSQL/Hive for your queries. diff --git a/docs/incremental_processing.md b/docs/incremental_processing.md index 753dd2044..3ea9b82de 100644 --- a/docs/incremental_processing.md +++ b/docs/incremental_processing.md @@ -4,22 +4,117 @@ keywords: incremental processing sidebar: mydoc_sidebar permalink: incremental_processing.html toc: false -summary: In this page, we will discuss incremental processing primitives that Hoodie has to offer. +summary: In this page, we will discuss some available tools for ingesting data incrementally & consuming the changes. 
--- As discussed in the concepts section, the two basic primitives needed for [incrementally processing](https://www.oreilly.com/ideas/ubers-case-for-incremental-processing-on-hadoop), data using Hoodie are `upserts` (to apply changes to a dataset) and `incremental pulls` (to obtain a change stream/log from a dataset). This section discusses a few tools that can be used to achieve these on different contexts. -{% include callout.html content="Instructions are currently only for Copy-on-write storage. When merge-on-read storage is added, these tools would be revised to add that support" type="info" %} +## Ingesting/Writing Data - -## Upserts - -Upserts can be used to apply a delta or an incremental change to a Hoodie dataset. For e.g, the incremental changes could be from a Kafka topic or files uploaded to HDFS or +The following means can be used to apply a delta or an incremental change to a Hoodie dataset. For e.g, the incremental changes could be from a Kafka topic or files uploaded to HDFS or even changes pulled from another Hoodie dataset. -#### Via Spark Job +#### DeltaStreamer Tool + +The `HoodieDeltaStreamer` utility provides a way to achieve all of these, by using the capabilities of `HoodieWriteClient`, and supports simple row-to-row ingestion (no transformations) +from different sources such as DFS or Kafka. + +The tool is a spark job (part of hoodie-utilities), that provides the following functionality + + - Ability to consume new events from Kafka, incremental imports from Sqoop or output of `HiveIncrementalPuller` or files under a folder on HDFS + - Support for json, avro or custom payload types for the incoming data + - New data is written to a Hoodie dataset, with support for checkpointing & schemas and registered onto Hive + +Command line options describe capabilities in more detail (first build hoodie-utilities using `mvn clean package`). 
+ +``` +[hoodie]$ spark-submit --class com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer `ls hoodie-utilities/target/hoodie-utilities-*-SNAPSHOT.jar` --help +Usage:
[options] + Options: + --help, -h + + --key-generator-class + Subclass of com.uber.hoodie.KeyGenerator to generate a HoodieKey from + the given avro record. Built in: SimpleKeyGenerator (uses provided field + names as recordkey & partitionpath. Nested fields specified via dot + notation, e.g: a.b.c) + Default: com.uber.hoodie.SimpleKeyGenerator + --op + Takes one of these values : UPSERT (default), INSERT (use when input is + purely new data/inserts to gain speed) + Default: UPSERT + Possible Values: [UPSERT, INSERT, BULK_INSERT] + --payload-class + subclass of HoodieRecordPayload, that works off a GenericRecord. + Implement your own, if you want to do something other than overwriting + existing value + Default: com.uber.hoodie.OverwriteWithLatestAvroPayload + --props + path to properties file on localfs or dfs, with configurations for + hoodie client, schema provider, key generator and data source. For + hoodie client props, sane defaults are used, but recommend use to + provide basic things like metrics endpoints, hive configs etc. For + sources, referto individual classes, for supported properties. + Default: file:///Users/vinoth/bin/hoodie/src/test/resources/delta-streamer-config/dfs-source.properties + --schemaprovider-class + subclass of com.uber.hoodie.utilities.schema.SchemaProvider to attach + schemas to input & target table data, built in options: + FilebasedSchemaProvider + Default: com.uber.hoodie.utilities.schema.FilebasedSchemaProvider + --source-class + Subclass of com.uber.hoodie.utilities.sources to read data. Built-in + options: com.uber.hoodie.utilities.sources.{JsonDFSSource (default), + AvroDFSSource, JsonKafkaSource, AvroKafkaSource, HiveIncrPullSource} + Default: com.uber.hoodie.utilities.sources.JsonDFSSource + --source-limit + Maximum amount of data to read from source. 
Default: No limit For e.g: + DFSSource => max bytes to read, KafkaSource => max events to read + Default: 9223372036854775807 + --source-ordering-field + Field within source record to decide how to break ties between records + with same key in input data. Default: 'ts' holding unix timestamp of + record + Default: ts + --spark-master + spark master to use. + Default: local[2] + * --target-base-path + base path for the target hoodie dataset. (Will be created if did not + exist first time around. If exists, expected to be a hoodie dataset) + * --target-table + name of the target table in Hive + + +``` + + +The tool takes a hierarchically composed property file and has pluggable interfaces for extracting data, key generation and providing schema. Sample configs for ingesting from kafka and dfs are +provided under `hoodie-utilities/src/test/resources/delta-streamer-config`. + +For e.g: once you have Confluent Kafka, Schema registry up & running, produce some test data using ([impressions.avro](https://docs.confluent.io/current/ksql/docs/tutorials/generate-custom-test-data.html) provided by schema-registry repo) + +``` +[confluent-5.0.0]$ bin/ksql-datagen schema=../impressions.avro format=avro topic=impressions key=impressionid +``` + +and then ingest it as follows. + +``` +[hoodie]$ spark-submit --class com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer `ls hoodie-utilities/target/hoodie-utilities-*-SNAPSHOT.jar` \ + --props file://${PWD}/hoodie-utilities/src/test/resources/delta-streamer-config/kafka-source.properties \ + --schemaprovider-class com.uber.hoodie.utilities.schema.SchemaRegistryProvider \ + --source-class com.uber.hoodie.utilities.sources.AvroKafkaSource \ + --source-ordering-field impressiontime \ + --target-base-path file:///tmp/hoodie-deltastreamer-op --target-table uber.impressions \ + --op BULK_INSERT +``` + +In some cases, you may want to convert your existing dataset into Hoodie, before you can begin ingesting new data.
This can be accomplished using the `hdfsparquetimport` command on the `hoodie-cli`. +Currently, there is support for converting parquet datasets. + +#### Via Custom Spark Job The `hoodie-spark` module offers the DataSource API to write any data frame into a Hoodie dataset. Following is how we can upsert a dataframe, while specifying the field names that need to be used for `recordKey => _row_key`, `partitionPath => partition` and `precombineKey => timestamp` @@ -39,100 +134,6 @@ inputDF.write() Please refer to [configurations](configurations.html) section, to view all datasource options. - -#### DeltaStreamer Tool - -The `HoodieDeltaStreamer` utility provides the way to achieve all of these, by using the capabilities of `HoodieWriteClient`. - -The tool is a spark job (part of hoodie-utilities), that provides the following functionality - - - Ability to consume new events from Kafka, incremental imports from Sqoop or output of `HiveIncrementalPuller` or files under a folder on HDFS - - Support json, avro or a custom payload types for the incoming data - - New data is written to a Hoodie dataset, with support for checkpointing & schemas and registered onto Hive - - To understand more - -``` - -[hoodie]$ spark-submit --class com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer hoodie-utilities/target/hoodie-utilities-0.3.6-SNAPSHOT-bin.jar --help -Usage:
[options] - Options: - --help, -h - Default: false - --hoodie-client-config - path to properties file on localfs or dfs, with hoodie client config. - Sane defaultsare used, but recommend use to provide basic things like metrics - endpoints, hive configs etc - --key-generator-class - Subclass of com.uber.hoodie.utilities.common.KeyExtractor to generatea - HoodieKey from the given avro record. Built in: SimpleKeyGenerator (Uses provided - field names as recordkey & partitionpath. Nested fields specified via dot - notation, e.g: a.b.c) - Default: com.uber.hoodie.utilities.keygen.SimpleKeyGenerator - --key-generator-config - Path to properties file on localfs or dfs, with KeyGenerator configs. For - list of acceptable properites, refer the KeyGenerator class - --max-input-bytes - Maximum number of bytes to read from source. Default: 1TB - Default: 1099511627776 - --op - Takes one of these values : UPSERT (default), INSERT (use when input is - purely new data/inserts to gain speed) - Default: UPSERT - Possible Values: [UPSERT, INSERT] - --payload-class - subclass of HoodieRecordPayload, that works off a GenericRecord. Default: - SourceWrapperPayload. Implement your own, if you want to do something other than overwriting - existing value - Default: com.uber.hoodie.utilities.deltastreamer.DeltaStreamerAvroPayload - --schemaprovider-class - subclass of com.uber.hoodie.utilities.schema.SchemaProvider to attach - schemas to input & target table data, built in options: FilebasedSchemaProvider - Default: com.uber.hoodie.utilities.schema.FilebasedSchemaProvider - --schemaprovider-config - path to properties file on localfs or dfs, with schema configs. For list - of acceptable properties, refer the schema provider class - --source-class - subclass of com.uber.hoodie.utilities.sources.Source to use to read data. 
- built-in options: com.uber.hoodie.utilities.common.{DFSSource (default), - KafkaSource, HiveIncrPullSource} - Default: com.uber.hoodie.utilities.sources.DFSSource - --source-config - path to properties file on localfs or dfs, with source configs. For list - of acceptable properties, refer the source class - --source-format - Format of data in source, JSON (default), Avro. All source data is - converted to Avro using the provided schema in any case - Default: JSON - Possible Values: [AVRO, JSON, ROW, CUSTOM] - --source-ordering-field - Field within source record to decide how to break ties between records - with same key in input data. Default: 'ts' holding unix timestamp of record - Default: ts - --target-base-path - base path for the target hoodie dataset - --target-table - name of the target table in Hive - - -``` - -For e.g, followings ingests data from Kafka (avro records as the client example) - - -``` -[hoodie]$ spark-submit --class com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer hoodie-utilities/target/hoodie-utilities-0.3.6-SNAPSHOT-bin.jar \ - --hoodie-client-config hoodie-utilities/src/main/resources/delta-streamer-config/hoodie-client.properties \ - --key-generator-config hoodie-utilities/src/main/resources/delta-streamer-config/key-generator.properties \ - --schemaprovider-config hoodie-utilities/src/main/resources/delta-streamer-config/schema-provider.properties \ - --source-class com.uber.hoodie.utilities.sources.KafkaSource \ - --source-config hoodie-utilities/src/main/resources/delta-streamer-config/source.properties \ - --source-ordering-field rider \ - --target-base-path file:///tmp/hoodie-deltastreamer-op \ - --target-table uber.trips -``` - - #### Syncing to Hive Once new data is written to a Hoodie dataset, via tools like above, we need the ability to sync with Hive and reflect the table schema such that queries can pick up new columns and partitions. 
To do this, Hoodie provides a `HiveSyncTool`, which can be @@ -164,7 +165,7 @@ Usage:
[options] {% include callout.html content="Note that for now, due to jar mismatches between Spark & Hive, its recommended to run this as a separate Java task in your workflow manager/cron. This is getting fix [here](https://github.com/uber/hoodie/issues/123)" type="info" %} -## Incremental Pull +## Incrementally Pulling Hoodie datasets can be pulled incrementally, which means you can get ALL and ONLY the updated & new rows since a specified commit timestamp. This, together with upserts, are particularly useful for building data pipelines where 1 or more source hoodie tables are incrementally pulled (streams/facts), diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/HDFSParquetImportCommand.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/HDFSParquetImportCommand.java index 6750bafe9..76a19fa70 100644 --- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/HDFSParquetImportCommand.java +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/HDFSParquetImportCommand.java @@ -41,7 +41,6 @@ public class HDFSParquetImportCommand implements CommandMarker { @CliOption(key = "upsert", mandatory = false, unspecifiedDefaultValue = "false", help = "Uses upsert API instead of the default insert API of WriteClient") boolean useUpsert, @CliOption(key = "srcPath", mandatory = true, help = "Base path for the input dataset") final String srcPath, - @CliOption(key = "srcType", mandatory = true, help = "Source type for the input dataset") final String srcType, @CliOption(key = "targetPath", mandatory = true, help = "Base path for the target hoodie dataset") final String targetPath, @CliOption(key = "tableName", mandatory = true, help = "Table name") final String tableName, @@ -57,7 +56,7 @@ public class HDFSParquetImportCommand implements CommandMarker { @CliOption(key = "sparkMemory", mandatory = true, help = "Spark executor memory") final String sparkMemory, @CliOption(key = "retry", mandatory = true, help = "Number of retries") final String 
retry) throws Exception { - validate(format, srcType); + (new HDFSParquetImporter.FormatValidator()).validate("format", format); boolean initialized = HoodieCLI.initConf(); HoodieCLI.initFS(initialized); @@ -81,9 +80,4 @@ public class HDFSParquetImportCommand implements CommandMarker { } return "Dataset imported to hoodie format"; } - - private void validate(String format, String srcType) { - (new HDFSParquetImporter.FormatValidator()).validate("format", format); - (new HDFSParquetImporter.SourceTypeValidator()).validate("srcType", srcType); - } } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java index a36faef1c..b056bd805 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java @@ -65,7 +65,6 @@ public class HoodieTestDataGenerator { DEFAULT_THIRD_PARTITION_PATH }; public static final int DEFAULT_PARTITION_DEPTH = 3; - public static String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ " + "{\"name\": \"timestamp\",\"type\": \"double\"}," + "{\"name\": \"_row_key\", \"type\": \"string\"}," @@ -78,6 +77,7 @@ public class HoodieTestDataGenerator { + "{\"name\":\"fare\",\"type\": \"double\"}]}"; public static Schema avroSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA)); private static Random rand = new Random(46474747); + private List existingKeysList = new ArrayList<>(); private String[] partitionPaths; @@ -162,10 +162,6 @@ public class HoodieTestDataGenerator { } } - public static void createSavepointFile(String basePath, String commitTime) throws IOException { - createSavepointFile(basePath, commitTime, HoodieTestUtils.getDefaultHadoopConf()); - } - public static void createSavepointFile(String basePath, String commitTime, Configuration 
configuration) throws IOException { Path commitFile = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME @@ -289,8 +285,11 @@ public class HoodieTestDataGenerator { return partitionPaths; } - static class KeyPartition { + public List getExistingKeysList() { + return existingKeysList; + } + public static class KeyPartition { HoodieKey key; String partitionPath; } diff --git a/hoodie-common/pom.xml b/hoodie-common/pom.xml index 1e2f4fec4..071ae7a63 100644 --- a/hoodie-common/pom.xml +++ b/hoodie-common/pom.xml @@ -88,6 +88,7 @@ com.fasterxml.jackson.core jackson-annotations + ${fasterxml.version} org.apache.parquet diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/DFSPropertiesConfiguration.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/DFSPropertiesConfiguration.java new file mode 100644 index 000000000..758ba0fca --- /dev/null +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/DFSPropertiesConfiguration.java @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * + */ + +package com.uber.hoodie.common.util; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.HashSet; +import java.util.Set; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +/** + * A simplified versions of Apache commons - PropertiesConfiguration, that supports limited field types and hierarchical + * configurations within the same folder as the root file. + * + * Includes denoted by the same include=filename.properties syntax, with relative path from root file's folder. Lines + * beginning with '#' are ignored as comments. Final values for properties are resolved by the order in which they are + * specified in the files, with included files treated as if they are inline. + * + * Note: Not reusing commons-configuration since it has too many conflicting runtime deps. + */ +public class DFSPropertiesConfiguration { + + private static volatile Logger log = LogManager.getLogger(DFSPropertiesConfiguration.class); + + private final FileSystem fs; + + private final Path rootFile; + + private final TypedProperties props; + + // Keep track of files visited, to detect loops + private final Set visitedFiles; + + public DFSPropertiesConfiguration(FileSystem fs, Path rootFile, TypedProperties defaults) { + this.fs = fs; + this.rootFile = rootFile; + this.props = defaults; + this.visitedFiles = new HashSet<>(); + visitFile(rootFile); + } + + private String[] splitProperty(String line) { + int ind = line.indexOf('='); + String k = line.substring(0, ind).trim(); + String v = line.substring(ind + 1).trim(); + return new String[]{k, v}; + } + + private void visitFile(Path file) { + try { + if (visitedFiles.contains(file.getName())) { + throw new IllegalStateException("Loop detected; file " + file + " already referenced"); + } + visitedFiles.add(file.getName()); + BufferedReader reader = new BufferedReader(new 
InputStreamReader(fs.open(file))); + String line; + while ((line = reader.readLine()) != null) { + if (line.startsWith("#") || line.equals("") || !line.contains("=")) { + continue; + } + String[] split = splitProperty(line); + if (line.startsWith("include=") || line.startsWith("include =")) { + visitFile(new Path(rootFile.getParent(), split[1])); + } else { + props.setProperty(split[0], split[1]); + } + } + reader.close(); + } catch (IOException ioe) { + log.error("Error reading in properies from dfs", ioe); + throw new IllegalArgumentException("Cannot read properties from dfs", ioe); + } + } + + public DFSPropertiesConfiguration(FileSystem fs, Path rootFile) { + this(fs, rootFile, new TypedProperties()); + } + + public TypedProperties getConfig() { + return props; + } +} diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/ReflectionUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ReflectionUtils.java index 1d5d3b58f..7416067e0 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/ReflectionUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ReflectionUtils.java @@ -19,6 +19,7 @@ package com.uber.hoodie.common.util; import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.exception.HoodieException; import java.lang.reflect.InvocationTargetException; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -26,18 +27,22 @@ public class ReflectionUtils { private static Map> clazzCache = new HashMap<>(); + private static Class getClass(String clazzName) { + if (!clazzCache.containsKey(clazzName)) { + try { + Class clazz = Class.forName(clazzName); + clazzCache.put(clazzName, clazz); + } catch (ClassNotFoundException e) { + throw new HoodieException("Unable to load class", e); + } + } + return clazzCache.get(clazzName); + } + public static T loadClass(String fqcn) { try { - if (clazzCache.get(fqcn) == null) { - Class clazz = Class.forName(fqcn); - 
clazzCache.put(fqcn, clazz); - } - return (T) clazzCache.get(fqcn).newInstance(); - } catch (ClassNotFoundException e) { - throw new HoodieException("Could not load class " + fqcn, e); - } catch (InstantiationException e) { - throw new HoodieException("Could not load class " + fqcn, e); - } catch (IllegalAccessException e) { + return (T) getClass(fqcn).newInstance(); + } catch (InstantiationException | IllegalAccessException e) { throw new HoodieException("Could not load class " + fqcn, e); } } @@ -49,18 +54,32 @@ public class ReflectionUtils { Object[] payloadArgs, Class... constructorArgTypes) { try { - if (clazzCache.get(recordPayloadClass) == null) { - Class clazz = Class.forName(recordPayloadClass); - clazzCache.put(recordPayloadClass, clazz); - } - return (T) clazzCache.get(recordPayloadClass).getConstructor(constructorArgTypes) + return (T) getClass(recordPayloadClass).getConstructor(constructorArgTypes) .newInstance(payloadArgs); } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { throw new HoodieException("Unable to instantiate payload class ", e); - } catch (ClassNotFoundException e) { - throw new HoodieException("Unable to instantiate payload class ", e); } } + /** + * Creates an instnace of the given class. Use this version when dealing with interface types as constructor args. + */ + public static Object loadClass(String clazz, Class[] constructorArgTypes, Object... constructorArgs) { + try { + return getClass(clazz).getConstructor(constructorArgTypes).newInstance(constructorArgs); + } catch (InstantiationException | IllegalAccessException + | InvocationTargetException | NoSuchMethodException e) { + throw new HoodieException("Unable to instantiate class ", e); + } + } + + /** + * Creates an instance of the given class. Constructor arg types are inferred. + */ + public static Object loadClass(String clazz, Object... 
constructorArgs) { + Class[] constructorArgTypes = Arrays.stream(constructorArgs) + .map(arg -> arg.getClass()).toArray(Class[]::new); + return loadClass(clazz, constructorArgTypes, constructorArgs); + } } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/TypedProperties.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/TypedProperties.java new file mode 100644 index 000000000..5674d2382 --- /dev/null +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/TypedProperties.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.uber.hoodie.common.util; + +import java.io.Serializable; +import java.util.Properties; + +/** + * Type-aware extension of {@link java.util.Properties} + */ +public class TypedProperties extends Properties implements Serializable { + + public TypedProperties() { + super(null); + } + + public TypedProperties(Properties defaults) { + super(defaults); + } + + private void checkKey(String property) { + if (!containsKey(property)) { + throw new IllegalArgumentException("Property " + property + " not found"); + } + } + + public String getString(String property) { + checkKey(property); + return getProperty(property); + } + + public String getString(String property, String defaultValue) { + return containsKey(property) ? 
getProperty(property) : defaultValue; + } + + public int getInteger(String property) { + checkKey(property); + return Integer.valueOf(getProperty(property)); + } + + public int getInteger(String property, int defaultValue) { + return containsKey(property) ? Integer.valueOf(getProperty(property)) : defaultValue; + } + + public long getLong(String property) { + checkKey(property); + return Long.valueOf(getProperty(property)); + } + + public long getLong(String property, long defaultValue) { + return containsKey(property) ? Long.valueOf(getProperty(property)) : defaultValue; + } + + public boolean getBoolean(String property) { + checkKey(property); + return Boolean.valueOf(getProperty(property)); + } + + public boolean getBoolean(String property, boolean defaultValue) { + return containsKey(property) ? Boolean.valueOf(getProperty(property)) : defaultValue; + } + + public double getDouble(String property) { + checkKey(property); + return Double.valueOf(getProperty(property)); + } + + public double getDouble(String property, double defaultValue) { + return containsKey(property) ? Double.valueOf(getProperty(property)) : defaultValue; + } +} diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestDFSPropertiesConfiguration.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestDFSPropertiesConfiguration.java new file mode 100644 index 000000000..1146abfe1 --- /dev/null +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestDFSPropertiesConfiguration.java @@ -0,0 +1,138 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.uber.hoodie.common.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import com.uber.hoodie.common.minicluster.HdfsTestService; +import java.io.IOException; +import java.io.PrintStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Tests basic functionality of {@link DFSPropertiesConfiguration} + */ +public class TestDFSPropertiesConfiguration { + + private static String dfsBasePath; + private static HdfsTestService hdfsTestService; + private static MiniDFSCluster dfsCluster; + private static DistributedFileSystem dfs; + + + @BeforeClass + public static void initClass() throws Exception { + hdfsTestService = new HdfsTestService(); + dfsCluster = hdfsTestService.start(true); + // Create a temp folder as the base path + dfs = dfsCluster.getFileSystem(); + dfsBasePath = dfs.getWorkingDirectory().toString(); + dfs.mkdirs(new Path(dfsBasePath)); + + // create some files. 
+ Path filePath = new Path(dfsBasePath + "/t1.props"); + writePropertiesFile(filePath, new String[]{ + "", "#comment", "abc",// to be ignored + "int.prop=123", "double.prop=113.4", "string.prop=str", "boolean.prop=true", "long.prop=1354354354" + }); + + filePath = new Path(dfsBasePath + "/t2.props"); + writePropertiesFile(filePath, new String[]{ + "string.prop=ignored", "include=t1.props" + }); + + filePath = new Path(dfsBasePath + "/t3.props"); + writePropertiesFile(filePath, new String[]{ + "double.prop=838.3", "include = t2.props", "double.prop=243.4", "string.prop=t3.value" + }); + + filePath = new Path(dfsBasePath + "/t4.props"); + writePropertiesFile(filePath, new String[]{ + "double.prop=838.3", "include = t4.props" + }); + } + + @AfterClass + public static void cleanupClass() throws Exception { + if (hdfsTestService != null) { + hdfsTestService.stop(); + } + } + + private static void writePropertiesFile(Path path, String[] lines) throws IOException { + PrintStream out = new PrintStream(dfs.create(path, true)); + for (String line : lines) { + out.println(line); + } + out.flush(); + out.close(); + } + + @Test + public void testParsing() throws IOException { + DFSPropertiesConfiguration cfg = new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/t1.props")); + TypedProperties props = cfg.getConfig(); + assertEquals(5, props.size()); + try { + props.getString("invalid.key"); + fail("Should error out here."); + } catch (IllegalArgumentException iae) { /* ignore */ } + + assertEquals(123, props.getInteger("int.prop")); + assertEquals(113.4, props.getDouble("double.prop"), 0.001); + assertEquals(true, props.getBoolean("boolean.prop")); + assertEquals("str", props.getString("string.prop")); + assertEquals(1354354354, props.getLong("long.prop")); + + assertEquals(123, props.getInteger("int.prop", 456)); + assertEquals(113.4, props.getDouble("double.prop", 223.4), 0.001); + assertEquals(true, props.getBoolean("boolean.prop", false)); + assertEquals("str", 
props.getString("string.prop", "default")); + assertEquals(1354354354, props.getLong("long.prop", 8578494434L)); + + assertEquals(456, props.getInteger("bad.int.prop", 456)); + assertEquals(223.4, props.getDouble("bad.double.prop", 223.4), 0.001); + assertEquals(false, props.getBoolean("bad.boolean.prop", false)); + assertEquals("default", props.getString("bad.string.prop", "default")); + assertEquals(8578494434L, props.getLong("bad.long.prop", 8578494434L)); + } + + @Test + public void testIncludes() { + DFSPropertiesConfiguration cfg = new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/t3.props")); + TypedProperties props = cfg.getConfig(); + + assertEquals(123, props.getInteger("int.prop")); + assertEquals(243.4, props.getDouble("double.prop"), 0.001); + assertEquals(true, props.getBoolean("boolean.prop")); + assertEquals("t3.value", props.getString("string.prop")); + assertEquals(1354354354, props.getLong("long.prop")); + + try { + new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/t4.props")); + fail("Should error out on a self-included file."); + } catch (IllegalStateException ise) { /* ignore */ } + } +} diff --git a/hoodie-spark/pom.xml b/hoodie-spark/pom.xml index 3942dac3e..5ed93b3b0 100644 --- a/hoodie-spark/pom.xml +++ b/hoodie-spark/pom.xml @@ -148,7 +148,10 @@ com.fasterxml.jackson.core jackson-annotations - + + com.fasterxml.jackson.module + jackson-module-scala_2.11 + org.apache.hadoop hadoop-client diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/BaseAvroPayload.java b/hoodie-spark/src/main/java/com/uber/hoodie/BaseAvroPayload.java index 8e452c8dc..191b2d236 100644 --- a/hoodie-spark/src/main/java/com/uber/hoodie/BaseAvroPayload.java +++ b/hoodie-spark/src/main/java/com/uber/hoodie/BaseAvroPayload.java @@ -18,6 +18,7 @@ package com.uber.hoodie; +import com.uber.hoodie.exception.HoodieException; import java.io.Serializable; import org.apache.avro.generic.GenericRecord; @@ -44,5 +45,8 @@ public abstract class 
BaseAvroPayload implements Serializable { public BaseAvroPayload(GenericRecord record, Comparable orderingVal) { this.record = record; this.orderingVal = orderingVal; + if (orderingVal == null) { + throw new HoodieException("Ordering value is null for record: " + record); + } } } diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java index 19226af91..551a1865d 100644 --- a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java +++ b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java @@ -21,6 +21,8 @@ package com.uber.hoodie; import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordPayload; +import com.uber.hoodie.common.util.ReflectionUtils; +import com.uber.hoodie.common.util.TypedProperties; import com.uber.hoodie.config.HoodieCompactionConfig; import com.uber.hoodie.config.HoodieIndexConfig; import com.uber.hoodie.config.HoodieWriteConfig; @@ -31,8 +33,6 @@ import java.io.IOException; import java.util.List; import java.util.Map; import org.apache.avro.generic.GenericRecord; -import org.apache.commons.configuration.PropertiesConfiguration; -import org.apache.commons.lang3.reflect.ConstructorUtils; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -72,10 +72,9 @@ public class DataSourceUtils { * Create a key generator class via reflection, passing in any configs needed */ public static KeyGenerator createKeyGenerator(String keyGeneratorClass, - PropertiesConfiguration cfg) throws IOException { + TypedProperties props) throws IOException { try { - return (KeyGenerator) ConstructorUtils - .invokeConstructor(Class.forName(keyGeneratorClass), (Object) cfg); + return (KeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, props); } catch (Throwable e) { throw new IOException("Could not load key generator class " + 
keyGeneratorClass, e); } @@ -87,17 +86,17 @@ public class DataSourceUtils { public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record, Comparable orderingVal) throws IOException { try { - return (HoodieRecordPayload) ConstructorUtils.invokeConstructor(Class.forName(payloadClass), - (Object) record, (Object) orderingVal); + return (HoodieRecordPayload) ReflectionUtils + .loadClass(payloadClass, new Class[]{GenericRecord.class, Comparable.class}, record, orderingVal); } catch (Throwable e) { throw new IOException("Could not create payload for class: " + payloadClass, e); } } - public static void checkRequiredProperties(PropertiesConfiguration configuration, + public static void checkRequiredProperties(TypedProperties props, List checkPropNames) { checkPropNames.stream().forEach(prop -> { - if (!configuration.containsKey(prop)) { + if (!props.containsKey(prop)) { throw new HoodieNotSupportedException("Required property " + prop + " is missing"); } }); diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/KeyGenerator.java b/hoodie-spark/src/main/java/com/uber/hoodie/KeyGenerator.java index 74aa74560..3c6e2d65d 100644 --- a/hoodie-spark/src/main/java/com/uber/hoodie/KeyGenerator.java +++ b/hoodie-spark/src/main/java/com/uber/hoodie/KeyGenerator.java @@ -19,9 +19,9 @@ package com.uber.hoodie; import com.uber.hoodie.common.model.HoodieKey; +import com.uber.hoodie.common.util.TypedProperties; import java.io.Serializable; import org.apache.avro.generic.GenericRecord; -import org.apache.commons.configuration.PropertiesConfiguration; /** * Abstract class to extend for plugging in extraction of @@ -30,9 +30,9 @@ import org.apache.commons.configuration.PropertiesConfiguration; */ public abstract class KeyGenerator implements Serializable { - protected transient PropertiesConfiguration config; + protected transient TypedProperties config; - protected KeyGenerator(PropertiesConfiguration config) { + protected KeyGenerator(TypedProperties config) 
{ this.config = config; } diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/SimpleKeyGenerator.java b/hoodie-spark/src/main/java/com/uber/hoodie/SimpleKeyGenerator.java index 6c15ce434..0017f20b3 100644 --- a/hoodie-spark/src/main/java/com/uber/hoodie/SimpleKeyGenerator.java +++ b/hoodie-spark/src/main/java/com/uber/hoodie/SimpleKeyGenerator.java @@ -19,9 +19,9 @@ package com.uber.hoodie; import com.uber.hoodie.common.model.HoodieKey; +import com.uber.hoodie.common.util.TypedProperties; import com.uber.hoodie.exception.HoodieException; import org.apache.avro.generic.GenericRecord; -import org.apache.commons.configuration.PropertiesConfiguration; /** * Simple key generator, which takes names of fields to be used for recordKey and partitionPath as @@ -29,14 +29,16 @@ import org.apache.commons.configuration.PropertiesConfiguration; */ public class SimpleKeyGenerator extends KeyGenerator { + private static final String DEFAULT_PARTITION_PATH = "default"; + protected final String recordKeyField; protected final String partitionPathField; - public SimpleKeyGenerator(PropertiesConfiguration config) { - super(config); - this.recordKeyField = config.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()); - this.partitionPathField = config + public SimpleKeyGenerator(TypedProperties props) { + super(props); + this.recordKeyField = props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()); + this.partitionPathField = props .getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY()); } @@ -46,7 +48,16 @@ public class SimpleKeyGenerator extends KeyGenerator { throw new HoodieException( "Unable to find field names for record key or partition path in cfg"); } - return new HoodieKey(DataSourceUtils.getNestedFieldValAsString(record, recordKeyField), - DataSourceUtils.getNestedFieldValAsString(record, partitionPathField)); + + String recordKey = DataSourceUtils.getNestedFieldValAsString(record, recordKeyField); + String partitionPath; + try { + partitionPath 
= DataSourceUtils.getNestedFieldValAsString(record, partitionPathField); + } catch (HoodieException e) { + // if field is not found, lump it into default partition + partitionPath = DEFAULT_PARTITION_PATH; + } + + return new HoodieKey(recordKey, partitionPath); } } diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala index dd744da6d..5a94f9e60 100644 --- a/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala +++ b/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala @@ -25,10 +25,10 @@ import java.util.{Optional, Properties} import com.uber.hoodie.DataSourceReadOptions._ import com.uber.hoodie.DataSourceWriteOptions._ import com.uber.hoodie.common.table.{HoodieTableConfig, HoodieTableMetaClient} +import com.uber.hoodie.common.util.TypedProperties import com.uber.hoodie.config.HoodieWriteConfig import com.uber.hoodie.exception.HoodieException import org.apache.avro.generic.GenericRecord -import org.apache.commons.configuration.PropertiesConfiguration import org.apache.hadoop.fs.Path import org.apache.log4j.LogManager import org.apache.spark.api.java.JavaSparkContext @@ -121,10 +121,10 @@ class DefaultSource extends RelationProvider mapAsScalaMap(defaultsMap) } - def toPropertiesConfiguration(params: Map[String, String]): PropertiesConfiguration = { - val propCfg = new PropertiesConfiguration() - params.foreach(kv => propCfg.addProperty(kv._1, kv._2)) - propCfg + def toProperties(params: Map[String, String]): TypedProperties = { + val props = new TypedProperties() + params.foreach(kv => props.setProperty(kv._1, kv._2)) + props } @@ -161,7 +161,7 @@ class DefaultSource extends RelationProvider // Convert to RDD[HoodieRecord] val keyGenerator = DataSourceUtils.createKeyGenerator( parameters(KEYGENERATOR_CLASS_OPT_KEY), - toPropertiesConfiguration(parameters) + toProperties(parameters) ) val genericRecords: RDD[GenericRecord] = 
AvroConversionUtils.createRdd(df, structName, nameSpace) val hoodieRecords = genericRecords.map(gr => { diff --git a/hoodie-spark/src/test/scala/DataSourceDefaultsTest.scala b/hoodie-spark/src/test/scala/DataSourceDefaultsTest.scala index a2f82af41..734cc7f53 100644 --- a/hoodie-spark/src/test/scala/DataSourceDefaultsTest.scala +++ b/hoodie-spark/src/test/scala/DataSourceDefaultsTest.scala @@ -16,11 +16,10 @@ * */ -import com.uber.hoodie.common.util.SchemaTestUtil +import com.uber.hoodie.common.util.{SchemaTestUtil, TypedProperties} import com.uber.hoodie.exception.HoodieException import com.uber.hoodie.{DataSourceWriteOptions, OverwriteWithLatestAvroPayload, SimpleKeyGenerator} import org.apache.avro.generic.GenericRecord -import org.apache.commons.configuration.PropertiesConfiguration import org.junit.Assert._ import org.junit.{Before, Test} import org.scalatest.junit.AssertionsForJUnit @@ -39,10 +38,10 @@ class DataSourceDefaultsTest extends AssertionsForJUnit { } - private def getKeyConfig(recordKeyFieldName: String, paritionPathField: String): PropertiesConfiguration = { - val props = new PropertiesConfiguration() - props.addProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, recordKeyFieldName) - props.addProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, paritionPathField) + private def getKeyConfig(recordKeyFieldName: String, paritionPathField: String): TypedProperties = { + val props = new TypedProperties() + props.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, recordKeyFieldName) + props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, paritionPathField) props } @@ -52,24 +51,26 @@ class DataSourceDefaultsTest extends AssertionsForJUnit { assertEquals("field1", hk1.getRecordKey) assertEquals("name1", hk1.getPartitionPath) - // recordKey field not specified + // partition path field not specified try { - val props = new PropertiesConfiguration() - props.addProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, 
"field1") + val props = new TypedProperties() + props.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "field1") new SimpleKeyGenerator(props).getKey(baseRecord) fail("Should have errored out") } catch { - case e: HoodieException => { + case e: IllegalArgumentException => { // do nothing } }; - // partitionPath field is null + // recordkey field not specified try { - new SimpleKeyGenerator(getKeyConfig("field1", null)).getKey(baseRecord) + val props = new TypedProperties() + props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partitionField") + new SimpleKeyGenerator(props).getKey(baseRecord) fail("Should have errored out") } catch { - case e: HoodieException => { + case e: IllegalArgumentException => { // do nothing } }; @@ -90,6 +91,11 @@ class DataSourceDefaultsTest extends AssertionsForJUnit { // do nothing } }; + + // if partition path can't be found, return default partition path + val hk3 = new SimpleKeyGenerator(getKeyConfig("testNestedRecord.userId", "testNestedRecord.notThere")) + .getKey(baseRecord); + assertEquals("default", hk3.getPartitionPath) } @Test def testOverwriteWithLatestAvroPayload() = { diff --git a/hoodie-utilities/pom.xml b/hoodie-utilities/pom.xml index 0f28561d7..e8159d540 100644 --- a/hoodie-utilities/pom.xml +++ b/hoodie-utilities/pom.xml @@ -42,27 +42,43 @@ org.apache.maven.plugins - maven-assembly-plugin - 2.4.1 - - - src/assembly/src.xml - - - - com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer - - - - + maven-shade-plugin + 3.1.1 - make-assembly - package - single + shade + + ${project.build.directory}/dependency-reduced-pom.xml + + true + + + com.uber.hoodie:hoodie-common + com.uber.hoodie:hoodie-client + com.uber.hoodie:hoodie-spark + com.uber.hoodie:hoodie-hive + com.uber.hoodie:hoodie-hadoop-mr + com.beust:jcommander + com.twitter:bijection-avro_2.11 + com.twitter:bijection-core_2.11 + org.apache.parquet:parquet-avro + com.twitter:parquet-avro + com.twitter.common:objectsize + 
io.confluent:kafka-avro-serializer + io.confluent:common-config + io.confluent:common-utils + io.confluent:kafka-schema-registry-client + org.apache.spark:spark-streaming-kafka-0-8_2.11 + org.apache.kafka:kafka_2.11 + com.yammer.metrics:metrics-core + com.101tec:zkclient + org.apache.kafka:kafka-clients + + + @@ -78,10 +94,18 @@ + + + confluent + http://packages.confluent.io/maven/ + + + + - org.apache.spark - spark-sql_2.11 + com.fasterxml.jackson.module + jackson-module-scala_2.11 @@ -108,21 +132,6 @@ org.apache.hadoop hadoop-hdfs tests - - - - org.mortbay.jetty - * - - - javax.servlet.jsp - * - - - javax.servlet - * - - org.apache.hadoop @@ -184,6 +193,10 @@ javax.servlet servlet-api + + com.fasterxml.jackson.* + * + @@ -219,23 +232,16 @@ org.apache.hadoop hadoop-client - - - javax.servlet - servlet-api - - - + org.apache.spark spark-core_2.11 - - - javax.servlet - servlet-api - - + + + + org.apache.spark + spark-sql_2.11 @@ -291,6 +297,30 @@ 0.9.2 + + io.confluent + kafka-avro-serializer + 3.0.0 + + + + io.confluent + common-config + 3.0.0 + + + + io.confluent + common-utils + 3.0.0 + + + + io.confluent + kafka-schema-registry-client + 3.0.0 + + diff --git a/hoodie-utilities/src/assembly/src.xml b/hoodie-utilities/src/assembly/src.xml deleted file mode 100644 index aa2fbcd21..000000000 --- a/hoodie-utilities/src/assembly/src.xml +++ /dev/null @@ -1,50 +0,0 @@ - - - - bin - - jar - - - false - - - / - true - runtime - - junit:junit - com.google.code.findbugs:* - org.apache.hadoop:* - org.apache.hbase:* - - - - - - - - - - - - - - - diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HDFSParquetImporter.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HDFSParquetImporter.java index 7ec989c95..9e6f45512 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HDFSParquetImporter.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HDFSParquetImporter.java @@ -75,7 +75,9 @@ public class 
HDFSParquetImporter implements Serializable { System.exit(1); } HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg); - dataImporter.dataImport(UtilHelpers.buildSparkContext(cfg.tableName, cfg.sparkMaster, cfg.sparkMemory), cfg.retry); + dataImporter + .dataImport(UtilHelpers.buildSparkContext("data-importer-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory), + cfg.retry); } public int dataImport(JavaSparkContext jsc, int retry) throws Exception { @@ -206,19 +208,6 @@ public class HDFSParquetImporter implements Serializable { } } - public static class SourceTypeValidator implements IValueValidator { - - List validSourceTypes = Arrays.asList("hdfs"); - - @Override - public void validate(String name, String value) throws ParameterException { - if (value == null || !validSourceTypes.contains(value)) { - throw new ParameterException(String.format( - "Invalid source type: value:%s: supported source types:%s", value, validSourceTypes)); - } - } - } - public static class Config implements Serializable { @Parameter(names = {"--command", "-c"}, @@ -228,10 +217,6 @@ public class HDFSParquetImporter implements Serializable { @Parameter(names = {"--src-path", "-sp"}, description = "Base path for the input dataset", required = true) public String srcPath = null; - @Parameter(names = {"--src-type", - "-st"}, description = "Source type for the input dataset", required = true, - validateValueWith = SourceTypeValidator.class) - public String srcType = null; @Parameter(names = {"--target-path", "-tp"}, description = "Base path for the target hoodie dataset", required = true) public String targetPath = null; diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HiveIncrementalPuller.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HiveIncrementalPuller.java index 07d3ecbe9..72c50f77a 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HiveIncrementalPuller.java +++ 
b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HiveIncrementalPuller.java @@ -93,8 +93,6 @@ public class HiveIncrementalPuller { public int maxCommits = 3; @Parameter(names = {"--help", "-h"}, help = true) public Boolean help = false; - @Parameter(names = {"--storageFormat"}) - public String tempTableStorageFormat = "AVRO"; } static { @@ -207,12 +205,7 @@ public class HiveIncrementalPuller { } private String getStoredAsClause() { - if (config.tempTableStorageFormat.equalsIgnoreCase("json")) { - // Special case for json - // default json serde does not support having same key even if its under multiple depths - return "ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS TEXTFILE"; - } - return "STORED AS " + config.tempTableStorageFormat; + return "STORED AS AVRO"; } private void initHiveBeelineProperties(Statement stmt) throws SQLException { diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactor.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactor.java index d327e6d0d..8aa596793 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactor.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactor.java @@ -80,7 +80,8 @@ public class HoodieCompactor { System.exit(1); } HoodieCompactor compactor = new HoodieCompactor(cfg); - compactor.compact(UtilHelpers.buildSparkContext(cfg.tableName, cfg.sparkMaster, cfg.sparkMemory), cfg.retry); + compactor.compact(UtilHelpers.buildSparkContext("compactor-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory), + cfg.retry); } public int compact(JavaSparkContext jsc, int retry) { @@ -119,4 +120,4 @@ public class HoodieCompactor { client.scheduleCompactionAtInstant(cfg.compactionInstantTime, Optional.empty()); return 0; } -} \ No newline at end of file +} diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/UtilHelpers.java 
b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/UtilHelpers.java index 77952bcf0..7cd040dfe 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/UtilHelpers.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/UtilHelpers.java @@ -20,22 +20,20 @@ package com.uber.hoodie.utilities; import com.uber.hoodie.HoodieWriteClient; import com.uber.hoodie.WriteStatus; +import com.uber.hoodie.common.util.DFSPropertiesConfiguration; import com.uber.hoodie.common.util.ReflectionUtils; +import com.uber.hoodie.common.util.TypedProperties; import com.uber.hoodie.config.HoodieCompactionConfig; import com.uber.hoodie.config.HoodieIndexConfig; import com.uber.hoodie.config.HoodieWriteConfig; -import com.uber.hoodie.exception.HoodieIOException; +import com.uber.hoodie.exception.HoodieException; import com.uber.hoodie.index.HoodieIndex; -import com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException; import com.uber.hoodie.utilities.schema.SchemaProvider; import com.uber.hoodie.utilities.sources.Source; -import com.uber.hoodie.utilities.sources.SourceDataFormat; import java.io.IOException; +import java.io.InputStream; import java.nio.ByteBuffer; import java.util.Optional; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.PropertiesConfiguration; -import org.apache.commons.lang3.reflect.ConstructorUtils; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -52,42 +50,34 @@ import org.apache.spark.api.java.JavaSparkContext; public class UtilHelpers { private static Logger logger = LogManager.getLogger(UtilHelpers.class); - public static Source createSource(String sourceClass, PropertiesConfiguration cfg, - JavaSparkContext jssc, SourceDataFormat dataFormat, SchemaProvider schemaProvider) + public static Source createSource(String sourceClass, TypedProperties cfg, + JavaSparkContext jssc, SchemaProvider 
schemaProvider) throws IOException { try { - return (Source) ConstructorUtils.invokeConstructor(Class.forName(sourceClass), (Object) cfg, - (Object) jssc, (Object) dataFormat, (Object) schemaProvider); + return (Source) ReflectionUtils.loadClass(sourceClass, + new Class[]{TypedProperties.class, JavaSparkContext.class, SchemaProvider.class}, + cfg, jssc, schemaProvider); } catch (Throwable e) { throw new IOException("Could not load source class " + sourceClass, e); } } public static SchemaProvider createSchemaProvider(String schemaProviderClass, - PropertiesConfiguration cfg) throws IOException { + TypedProperties cfg, JavaSparkContext jssc) throws IOException { try { - return (SchemaProvider) ConstructorUtils.invokeConstructor(Class.forName(schemaProviderClass), - (Object) cfg); + return (SchemaProvider) ReflectionUtils.loadClass(schemaProviderClass, cfg, jssc); } catch (Throwable e) { throw new IOException("Could not load schema provider class " + schemaProviderClass, e); } } /** - * TODO: Support hierarchical config files (see CONFIGURATION-609 for sample) */ - public static PropertiesConfiguration readConfig(FileSystem fs, Path cfgPath) { + public static DFSPropertiesConfiguration readConfig(FileSystem fs, Path cfgPath) { try { - FSDataInputStream in = fs.open(cfgPath); - PropertiesConfiguration config = new PropertiesConfiguration(); - config.load(in); - in.close(); - return config; - } catch (IOException e) { - throw new HoodieIOException("Unable to read config file at :" + cfgPath, e); - } catch (ConfigurationException e) { - throw new HoodieDeltaStreamerException("Invalid configs found in config file at :" + cfgPath, - e); + return new DFSPropertiesConfiguration(fs, cfgPath); + } catch (Exception e) { + throw new HoodieException("Unable to read props file at :" + cfgPath, e); } } @@ -117,24 +107,16 @@ public class UtilHelpers { return new String(buf.array()); } - /** - * Build Spark Context for ingestion/compaction - * @return - */ - public static 
JavaSparkContext buildSparkContext(String tableName, String sparkMaster, String sparkMemory) { - SparkConf sparkConf = new SparkConf().setAppName("hoodie-data-importer-" + tableName); - sparkConf.setMaster(sparkMaster); - - if (sparkMaster.startsWith("yarn")) { + private static SparkConf buildSparkConf(String appName, String defaultMaster) { + SparkConf sparkConf = new SparkConf().setAppName(appName); + String master = sparkConf.get("spark.master", defaultMaster); + sparkConf.setMaster(master); + if (master.startsWith("yarn")) { sparkConf.set("spark.eventLog.overwrite", "true"); sparkConf.set("spark.eventLog.enabled", "true"); } - sparkConf.set("spark.driver.maxResultSize", "2g"); sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - sparkConf.set("spark.executor.memory", sparkMemory); - - // Configure hadoop conf sparkConf.set("spark.hadoop.mapred.output.compress", "true"); sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true"); sparkConf.set("spark.hadoop.mapred.output.compression.codec", @@ -142,6 +124,20 @@ public class UtilHelpers { sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK"); sparkConf = HoodieWriteClient.registerClasses(sparkConf); + return sparkConf; + } + + public static JavaSparkContext buildSparkContext(String appName, String defaultMaster) { + return new JavaSparkContext(buildSparkConf(appName, defaultMaster)); + } + + /** + * Build Spark Context for ingestion/compaction + * @return + */ + public static JavaSparkContext buildSparkContext(String appName, String sparkMaster, String sparkMemory) { + SparkConf sparkConf = buildSparkConf(appName, sparkMaster); + sparkConf.set("spark.executor.memory", sparkMemory); return new JavaSparkContext(sparkConf); } @@ -185,4 +181,10 @@ public class UtilHelpers { logger.error(String.format("Import failed with %d errors.", errors.value())); return -1; } + + public static TypedProperties readConfig(InputStream in) throws IOException { + TypedProperties 
defaults = new TypedProperties(); + defaults.load(in); + return defaults; + } } diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java index 7f58a721e..211a171e0 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -35,6 +35,8 @@ import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.common.util.TypedProperties; +import com.uber.hoodie.common.util.collection.Pair; import com.uber.hoodie.config.HoodieCompactionConfig; import com.uber.hoodie.config.HoodieIndexConfig; import com.uber.hoodie.config.HoodieWriteConfig; @@ -44,9 +46,8 @@ import com.uber.hoodie.utilities.UtilHelpers; import com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException; import com.uber.hoodie.utilities.schema.FilebasedSchemaProvider; import com.uber.hoodie.utilities.schema.SchemaProvider; -import com.uber.hoodie.utilities.sources.DFSSource; +import com.uber.hoodie.utilities.sources.JsonDFSSource; import com.uber.hoodie.utilities.sources.Source; -import com.uber.hoodie.utilities.sources.SourceDataFormat; import java.io.IOException; import java.io.Serializable; import java.util.Arrays; @@ -56,13 +57,10 @@ import java.util.Optional; import java.util.Properties; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; -import org.apache.commons.configuration.PropertiesConfiguration; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import 
org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.collection.JavaConversions; @@ -77,7 +75,7 @@ public class HoodieDeltaStreamer implements Serializable { private static volatile Logger log = LogManager.getLogger(HoodieDeltaStreamer.class); - private static String CHECKPOINT_KEY = "deltastreamer.checkpoint.key"; + public static String CHECKPOINT_KEY = "deltastreamer.checkpoint.key"; private final Config cfg; @@ -113,9 +111,15 @@ public class HoodieDeltaStreamer implements Serializable { private transient JavaSparkContext jssc; - public HoodieDeltaStreamer(Config cfg) throws IOException { + /** + * Bag of properties with source, hoodie client, key generator etc. + */ + TypedProperties props; + + + public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc) throws IOException { this.cfg = cfg; - this.jssc = getSparkContext(); + this.jssc = jssc; this.fs = FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()); if (fs.exists(new Path(cfg.targetBasePath))) { @@ -126,61 +130,19 @@ public class HoodieDeltaStreamer implements Serializable { this.commitTimelineOpt = Optional.empty(); } - //TODO(vc) Should these be passed from outside? 
- initSchemaProvider(); - initKeyGenerator(); - initSource(); - } + this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath)).getConfig(); + log.info("Creating delta streamer with configs : " + props.toString()); + this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc); + this.keyGenerator = DataSourceUtils.createKeyGenerator(cfg.keyGeneratorClass, props); + this.source = UtilHelpers.createSource(cfg.sourceClassName, props, jssc, schemaProvider); - private void initSource() throws IOException { - // Create the source & schema providers - PropertiesConfiguration sourceCfg = UtilHelpers.readConfig(fs, new Path(cfg.sourceConfigProps)); - log.info("Creating source " + cfg.sourceClassName + " with configs : " + sourceCfg.toString()); - this.source = UtilHelpers.createSource(cfg.sourceClassName, sourceCfg, jssc, cfg.sourceFormat, - schemaProvider); - } - - private void initSchemaProvider() throws IOException { - PropertiesConfiguration schemaCfg = UtilHelpers.readConfig(fs, - new Path(cfg.schemaProviderConfigProps)); - log.info( - "Creating schema provider " + cfg.schemaProviderClassName + " with configs : " + schemaCfg - .toString()); - this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, schemaCfg); - } - - private void initKeyGenerator() throws IOException { - PropertiesConfiguration keygenCfg = UtilHelpers.readConfig(fs, new Path(cfg.keyGeneratorProps)); - log.info("Creating key generator " + cfg.keyGeneratorClass + " with configs : " + keygenCfg - .toString()); - this.keyGenerator = DataSourceUtils.createKeyGenerator(cfg.keyGeneratorClass, keygenCfg); - } - - - private JavaSparkContext getSparkContext() { - SparkConf sparkConf = new SparkConf() - .setAppName("hoodie-delta-streamer-" + cfg.targetTableName); - //sparkConf.setMaster(cfg.sparkMaster); - sparkConf.setMaster("local[2]"); - sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - 
sparkConf.set("spark.driver.maxResultSize", "2g"); - - // Configure hadoop conf - sparkConf.set("spark.hadoop.mapred.output.compress", "true"); - sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true"); - sparkConf.set("spark.hadoop.mapred.output.compression.codec", - "org.apache.hadoop.io.compress.GzipCodec"); - sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK"); - - sparkConf = HoodieWriteClient.registerClasses(sparkConf); // register the schemas, so that shuffle does not serialize the full schemas List schemas = Arrays.asList(schemaProvider.getSourceSchema(), schemaProvider.getTargetSchema()); - sparkConf.registerAvroSchemas(JavaConversions.asScalaBuffer(schemas).toList()); - return new JavaSparkContext(sparkConf); + jssc.sc().getConf().registerAvroSchemas(JavaConversions.asScalaBuffer(schemas).toList()); } - private void sync() throws Exception { + public void sync() throws Exception { // Retrieve the previous round checkpoints, if any Optional resumeCheckpointStr = Optional.empty(); if (commitTimelineOpt.isPresent()) { @@ -207,7 +169,7 @@ public class HoodieDeltaStreamer implements Serializable { // Pull the data from the source & prepare the write Pair>, String> dataAndCheckpoint = source.fetchNewData( - resumeCheckpointStr, cfg.maxInputBytes); + resumeCheckpointStr, cfg.sourceLimit); if (!dataAndCheckpoint.getKey().isPresent()) { log.info("No new data, nothing to commit.. 
"); @@ -222,7 +184,7 @@ public class HoodieDeltaStreamer implements Serializable { }); // Perform the write - HoodieWriteConfig hoodieCfg = getHoodieClientConfig(cfg.hoodieClientProps); + HoodieWriteConfig hoodieCfg = getHoodieClientConfig(); HoodieWriteClient client = new HoodieWriteClient<>(jssc, hoodieCfg); String commitTime = client.startCommit(); log.info("Starting commit : " + commitTime); @@ -232,6 +194,8 @@ public class HoodieDeltaStreamer implements Serializable { writeStatusRDD = client.insert(records, commitTime); } else if (cfg.operation == Operation.UPSERT) { writeStatusRDD = client.upsert(records, commitTime); + } else if (cfg.operation == Operation.BULK_INSERT) { + writeStatusRDD = client.bulkInsert(records, commitTime); } else { throw new HoodieDeltaStreamerException("Unknown operation :" + cfg.operation); } @@ -245,157 +209,84 @@ public class HoodieDeltaStreamer implements Serializable { if (success) { log.info("Commit " + commitTime + " successful!"); // TODO(vc): Kick off hive sync from here. 
- } else { log.info("Commit " + commitTime + " failed!"); } client.close(); } - private HoodieWriteConfig getHoodieClientConfig(String hoodieClientCfgPath) throws Exception { + private HoodieWriteConfig getHoodieClientConfig() throws Exception { return HoodieWriteConfig.newBuilder().combineInput(true, true).withPath(cfg.targetBasePath) - .withAutoCommit(false).withCompactionConfig(HoodieCompactionConfig.newBuilder() - .withPayloadClass( - OverwriteWithLatestAvroPayload - .class - .getName()).build()) + .withAutoCommit(false) .withSchema(schemaProvider.getTargetSchema().toString()) - .forTable(cfg.targetTableName).withIndexConfig( - HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) - .fromInputStream(fs.open(new Path(hoodieClientCfgPath))).build(); + .withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName).build()) + .forTable(cfg.targetTableName) + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) + .withProps(props).build(); } - private enum Operation { - UPSERT, INSERT + public enum Operation { + UPSERT, INSERT, BULK_INSERT } private class OperationConvertor implements IStringConverter { - @Override public Operation convert(String value) throws ParameterException { return Operation.valueOf(value); } } - private class SourceFormatConvertor implements IStringConverter { - - @Override - public SourceDataFormat convert(String value) throws ParameterException { - return SourceDataFormat.valueOf(value); - } - } - public static class Config implements Serializable { - /** - * TARGET CONFIGS - **/ - @Parameter(names = { - "--target-base-path"}, description = "base path for the target hoodie dataset", required = true) + @Parameter(names = {"--target-base-path"}, description = "base path for the target hoodie dataset. " + + "(Will be created if did not exist first time around. 
If exists, expected to be a hoodie dataset)", + required = true) public String targetBasePath; // TODO: How to obtain hive configs to register? - @Parameter(names = { - "--target-table"}, description = "name of the target table in Hive", required = true) + @Parameter(names = {"--target-table"}, description = "name of the target table in Hive", required = true) public String targetTableName; - @Parameter(names = {"--hoodie-client-config"}, description = - "path to properties file on localfs or " - + "dfs, with hoodie client config. " - + "Sane defaults" - + "are used, but recommend use to " - + "provide basic things like metrics " - + "endpoints, hive configs etc") - public String hoodieClientProps = null; + @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for " + + "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are " + + "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer" + + "to individual classes, for supported properties.") + public String propsFilePath = + "file://" + System.getProperty("user.dir") + "/src/test/resources/delta-streamer-config/dfs-source.properties"; - /** - * SOURCE CONFIGS - **/ - @Parameter(names = {"--source-class"}, description = - "subclass of com.uber.hoodie.utilities.sources" - + ".Source to use to read data. " - + "built-in options: com.uber.hoodie.utilities" - + ".common.{DFSSource (default), KafkaSource, " - + "HiveIncrPullSource}") - public String sourceClassName = DFSSource.class.getName(); + @Parameter(names = {"--source-class"}, description = "Subclass of com.uber.hoodie.utilities.sources to read data. 
" + + "Built-in options: com.uber.hoodie.utilities.sources.{JsonDFSSource (default), AvroDFSSource, " + + "JsonKafkaSource, AvroKafkaSource, HiveIncrPullSource}") + public String sourceClassName = JsonDFSSource.class.getName(); - @Parameter(names = {"--source-config"}, description = - "path to properties file on localfs or dfs, with " - + "source configs. " - + "For list of acceptable properties, refer " - + "the source class", required = true) - public String sourceConfigProps = null; - - @Parameter(names = {"--source-format"}, description = - "Format of data in source, JSON (default), Avro. " - + "All source data is " - + "converted to Avro using the provided " - + "schema in any case", converter = SourceFormatConvertor.class) - public SourceDataFormat sourceFormat = SourceDataFormat.JSON; - - @Parameter(names = {"--source-ordering-field"}, description = - "Field within source record to decide how" - + " to break ties between " - + " records with same key in input " - + "data. Default: 'ts' holding unix " - + "timestamp of record") + @Parameter(names = {"--source-ordering-field"}, description = "Field within source record to decide how" + + " to break ties between records with same key in input data. Default: 'ts' holding unix timestamp of record") public String sourceOrderingField = "ts"; - @Parameter(names = {"--key-generator-class"}, description = - "Subclass of com.uber.hoodie.utilities" - + ".common.KeyExtractor to generate" - + "a HoodieKey from the given avro " - + "record. Built in: SimpleKeyGenerator" - + " (Uses provided field names as " - + "recordkey & partitionpath. " - + "Nested fields specified via dot " - + "notation, e.g: a.b.c)") + @Parameter(names = {"--key-generator-class"}, description = "Subclass of com.uber.hoodie.KeyGenerator " + + "to generate a HoodieKey from the given avro record. Built in: SimpleKeyGenerator (uses " + + "provided field names as recordkey & partitionpath. 
Nested fields specified via dot notation, e.g: a.b.c)") public String keyGeneratorClass = SimpleKeyGenerator.class.getName(); - @Parameter(names = {"--key-generator-config"}, description = - "Path to properties file on localfs or " - + "dfs, with KeyGenerator configs. " - + "For list of acceptable properites, " - + "refer the KeyGenerator class", - required = true) - public String keyGeneratorProps = null; - - @Parameter(names = {"--payload-class"}, description = - "subclass of HoodieRecordPayload, that works off " - + "a GenericRecord. " - + "Default: SourceWrapperPayload. Implement " - + "your own, if you want to do something " - + "other than overwriting existing value") + @Parameter(names = {"--payload-class"}, description = "subclass of HoodieRecordPayload, that works off " + + "a GenericRecord. Implement your own, if you want to do something other than overwriting existing value") public String payloadClassName = OverwriteWithLatestAvroPayload.class.getName(); - @Parameter(names = {"--schemaprovider-class"}, description = - "subclass of com.uber.hoodie.utilities" - + ".schema.SchemaProvider " - + "to attach schemas to input & target" - + " table data, built in options: " - + "FilebasedSchemaProvider") + @Parameter(names = {"--schemaprovider-class"}, description = "subclass of com.uber.hoodie.utilities.schema" + + ".SchemaProvider to attach schemas to input & target table data, built in options: FilebasedSchemaProvider") public String schemaProviderClassName = FilebasedSchemaProvider.class.getName(); - @Parameter(names = {"--schemaprovider-config"}, description = - "path to properties file on localfs or dfs, with schema " - + "configs. For list of acceptable properties, refer " - + "the schema provider class", required = true) - public String schemaProviderConfigProps = null; + @Parameter(names = {"--source-limit"}, description = "Maximum amount of data to read from source. 
" + + "Default: No limit For e.g: DFSSource => max bytes to read, KafkaSource => max events to read") + public long sourceLimit = Long.MAX_VALUE; - - /** - * Other configs - **/ - @Parameter(names = { - "--max-input-bytes"}, description = "Maximum number of bytes to read from source. Default: 1TB") - public long maxInputBytes = 1L * 1024 * 1024 * 1024 * 1024; - - @Parameter(names = {"--op"}, description = - "Takes one of these values : UPSERT (default), INSERT (use when input " - + "is purely new data/inserts to gain speed)", + @Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input " + + "is purely new data/inserts to gain speed)", converter = OperationConvertor.class) public Operation operation = Operation.UPSERT; + @Parameter(names = {"--spark-master"}, description = "spark master to use.") + public String sparkMaster = "local[2]"; @Parameter(names = {"--help", "-h"}, help = true) public Boolean help = false; @@ -408,6 +299,8 @@ public class HoodieDeltaStreamer implements Serializable { cmd.usage(); System.exit(1); } - new HoodieDeltaStreamer(cfg).sync(); + + JavaSparkContext jssc = UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName, cfg.sparkMaster); + new HoodieDeltaStreamer(cfg, jssc).sync(); } } diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/TimestampBasedKeyGenerator.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/TimestampBasedKeyGenerator.java index a0c8e4a62..3228fdeb5 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/TimestampBasedKeyGenerator.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/TimestampBasedKeyGenerator.java @@ -21,6 +21,7 @@ package com.uber.hoodie.utilities.keygen; import com.uber.hoodie.DataSourceUtils; import com.uber.hoodie.SimpleKeyGenerator; import com.uber.hoodie.common.model.HoodieKey; +import com.uber.hoodie.common.util.TypedProperties; import 
com.uber.hoodie.exception.HoodieNotSupportedException; import com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException; import java.io.Serializable; @@ -30,7 +31,6 @@ import java.util.Arrays; import java.util.Date; import java.util.TimeZone; import org.apache.avro.generic.GenericRecord; -import org.apache.commons.configuration.PropertiesConfiguration; /** * Key generator, that relies on timestamps for partitioning field. Still picks record key by name. @@ -64,7 +64,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator { + ".dateformat"; } - public TimestampBasedKeyGenerator(PropertiesConfiguration config) { + public TimestampBasedKeyGenerator(TypedProperties config) { super(config); DataSourceUtils.checkRequiredProperties(config, Arrays.asList(Config.TIMESTAMP_TYPE_FIELD_PROP, Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP)); diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/FilebasedSchemaProvider.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/FilebasedSchemaProvider.java index aa38bf8b9..d4df829c4 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/FilebasedSchemaProvider.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/FilebasedSchemaProvider.java @@ -20,14 +20,14 @@ package com.uber.hoodie.utilities.schema; import com.uber.hoodie.DataSourceUtils; import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.common.util.TypedProperties; import com.uber.hoodie.exception.HoodieIOException; import java.io.IOException; import java.util.Arrays; import org.apache.avro.Schema; -import org.apache.commons.configuration.PropertiesConfiguration; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.spark.api.java.JavaSparkContext; /** * A simple schema provider, that reads off files on DFS @@ -37,33 +37,30 @@ public class FilebasedSchemaProvider extends 
SchemaProvider { /** * Configs supported */ - static class Config { - - private static final String SOURCE_SCHEMA_FILE_PROP = "hoodie.deltastreamer.filebased" - + ".schemaprovider.source.schema" - + ".file"; - private static final String TARGET_SCHEMA_FILE_PROP = "hoodie.deltastreamer.filebased" - + ".schemaprovider.target.schema" - + ".file"; + public static class Config { + private static final String SOURCE_SCHEMA_FILE_PROP = "hoodie.deltastreamer.schemaprovider" + + ".source.schema.file"; + private static final String TARGET_SCHEMA_FILE_PROP = "hoodie.deltastreamer.schemaprovider" + + ".target.schema.file"; } private final FileSystem fs; private final Schema sourceSchema; - private final Schema targetSchema; + private Schema targetSchema; - public FilebasedSchemaProvider(PropertiesConfiguration config) { - super(config); - this.fs = FSUtils.getFs(config.getBasePath(), new Configuration()); - - DataSourceUtils.checkRequiredProperties(config, - Arrays.asList(Config.SOURCE_SCHEMA_FILE_PROP, Config.TARGET_SCHEMA_FILE_PROP)); + public FilebasedSchemaProvider(TypedProperties props, JavaSparkContext jssc) { + super(props, jssc); + DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.SOURCE_SCHEMA_FILE_PROP)); + this.fs = FSUtils.getFs(props.getString(Config.SOURCE_SCHEMA_FILE_PROP), jssc.hadoopConfiguration()); try { this.sourceSchema = new Schema.Parser().parse( - fs.open(new Path(config.getString(Config.SOURCE_SCHEMA_FILE_PROP)))); - this.targetSchema = new Schema.Parser().parse( - fs.open(new Path(config.getString(Config.TARGET_SCHEMA_FILE_PROP)))); + fs.open(new Path(props.getString(Config.SOURCE_SCHEMA_FILE_PROP)))); + if (props.containsKey(Config.TARGET_SCHEMA_FILE_PROP)) { + this.targetSchema = new Schema.Parser().parse( + fs.open(new Path(props.getString(Config.TARGET_SCHEMA_FILE_PROP)))); + } } catch (IOException ioe) { throw new HoodieIOException("Error reading schema", ioe); } @@ -76,6 +73,10 @@ public class FilebasedSchemaProvider extends 
SchemaProvider { @Override public Schema getTargetSchema() { - return targetSchema; + if (targetSchema != null) { + return targetSchema; + } else { + return super.getTargetSchema(); + } } } diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/SchemaProvider.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/SchemaProvider.java index 3a192581c..8202660d7 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/SchemaProvider.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/SchemaProvider.java @@ -18,22 +18,29 @@ package com.uber.hoodie.utilities.schema; +import com.uber.hoodie.common.util.TypedProperties; import java.io.Serializable; import org.apache.avro.Schema; -import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.spark.api.java.JavaSparkContext; /** * Class to provide schema for reading data and also writing into a Hoodie table */ public abstract class SchemaProvider implements Serializable { - protected PropertiesConfiguration config; + protected TypedProperties config; - protected SchemaProvider(PropertiesConfiguration config) { - this.config = config; + protected JavaSparkContext jssc; + + protected SchemaProvider(TypedProperties props, JavaSparkContext jssc) { + this.config = props; + this.jssc = jssc; } public abstract Schema getSourceSchema(); - public abstract Schema getTargetSchema(); + public Schema getTargetSchema() { + // by default, use source schema as target for hoodie dataset as well + return getSourceSchema(); + } } diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/SchemaRegistryProvider.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/SchemaRegistryProvider.java new file mode 100644 index 000000000..e77a67bed --- /dev/null +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/SchemaRegistryProvider.java @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2017 Uber 
Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.uber.hoodie.utilities.schema; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.uber.hoodie.DataSourceUtils; +import com.uber.hoodie.common.util.TypedProperties; +import com.uber.hoodie.exception.HoodieIOException; +import java.io.IOException; +import java.net.URL; +import java.util.Arrays; +import org.apache.avro.Schema; +import org.apache.spark.api.java.JavaSparkContext; + +/** + * Obtains latest schema from the Confluent/Kafka schema-registry + * + * https://github.com/confluentinc/schema-registry + */ +public class SchemaRegistryProvider extends SchemaProvider { + + /** + * Configs supported + */ + public static class Config { + + private static final String SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.url"; + } + + private final Schema schema; + + private String fetchSchemaFromRegistry(String registryUrl) throws IOException { + URL registry = new URL(registryUrl); + ObjectMapper mapper = new ObjectMapper(); + JsonNode node = mapper.readTree(registry.openStream()); + return node.get("schema").asText(); + } + + public SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) { + super(props, jssc); + DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.SCHEMA_REGISTRY_URL_PROP)); + String registryUrl = 
props.getString(Config.SCHEMA_REGISTRY_URL_PROP); + try { + this.schema = new Schema.Parser().parse(fetchSchemaFromRegistry(registryUrl)); + } catch (IOException ioe) { + throw new HoodieIOException("Error reading schema from registry :" + registryUrl, ioe); + } + } + + @Override + public Schema getSourceSchema() { + return schema; + } +} diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/AvroConvertor.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/AvroConvertor.java index 083ecb7d3..e8abfba79 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/AvroConvertor.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/AvroConvertor.java @@ -27,7 +27,7 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; /** - * Convert a variety of {@link SourceDataFormat} into Avro GenericRecords. Has a bunch of lazy + * Convert a variety of datum into Avro GenericRecords. Has a bunch of lazy * fields to circumvent issues around serializing these objects from driver to executors */ public class AvroConvertor implements Serializable { @@ -82,6 +82,10 @@ public class AvroConvertor implements Serializable { return jsonConverter.convert(json); } + public Schema getSchema() { + return new Schema.Parser().parse(schemaStr); + } + public GenericRecord fromAvroBinary(byte[] avroBinary) throws IOException { initSchema(); diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/AvroDFSSource.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/AvroDFSSource.java new file mode 100644 index 000000000..335d06a9b --- /dev/null +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/AvroDFSSource.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. 
(hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.uber.hoodie.utilities.sources; + +import com.uber.hoodie.common.util.TypedProperties; +import com.uber.hoodie.utilities.schema.SchemaProvider; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.mapred.AvroKey; +import org.apache.avro.mapreduce.AvroKeyInputFormat; +import org.apache.hadoop.io.NullWritable; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; + +/** + * DFS Source that reads avro data + */ +public class AvroDFSSource extends DFSSource { + + public AvroDFSSource(TypedProperties props, JavaSparkContext sparkContext, SchemaProvider schemaProvider) { + super(props, sparkContext, schemaProvider); + } + + @Override + protected JavaRDD fromFiles(AvroConvertor convertor, String pathStr) { + JavaPairRDD avroRDD = sparkContext.newAPIHadoopFile(pathStr, + AvroKeyInputFormat.class, AvroKey.class, NullWritable.class, + sparkContext.hadoopConfiguration()); + return avroRDD.keys().map(r -> ((GenericRecord) r.datum())); + } +} diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/AvroKafkaSource.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/AvroKafkaSource.java new file mode 100644 index 000000000..a4d01d1b9 --- /dev/null +++ 
b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/AvroKafkaSource.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.uber.hoodie.utilities.sources; + +import com.uber.hoodie.common.util.TypedProperties; +import com.uber.hoodie.utilities.schema.SchemaProvider; +import io.confluent.kafka.serializers.KafkaAvroDecoder; +import kafka.serializer.StringDecoder; +import org.apache.avro.generic.GenericRecord; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.streaming.kafka.KafkaUtils; +import org.apache.spark.streaming.kafka.OffsetRange; + +/** + * Reads avro serialized Kafka data, based on the confluent schema-registry + */ +public class AvroKafkaSource extends KafkaSource { + + public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SchemaProvider schemaProvider) { + super(props, sparkContext, schemaProvider); + } + + @Override + protected JavaRDD toAvroRDD(OffsetRange[] offsetRanges, AvroConvertor avroConvertor) { + JavaRDD recordRDD = KafkaUtils + .createRDD(sparkContext, String.class, Object.class, StringDecoder.class, KafkaAvroDecoder.class, kafkaParams, + offsetRanges).values().map(obj -> (GenericRecord) obj); + return recordRDD; + } +} diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/DFSSource.java 
b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/DFSSource.java index a6bd623f6..2d4d07851 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/DFSSource.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/DFSSource.java @@ -20,8 +20,10 @@ package com.uber.hoodie.utilities.sources; import com.uber.hoodie.DataSourceUtils; import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.common.util.TypedProperties; +import com.uber.hoodie.common.util.collection.ImmutablePair; +import com.uber.hoodie.common.util.collection.Pair; import com.uber.hoodie.exception.HoodieIOException; -import com.uber.hoodie.exception.HoodieNotSupportedException; import com.uber.hoodie.utilities.schema.SchemaProvider; import java.io.IOException; import java.util.ArrayList; @@ -30,31 +32,23 @@ import java.util.List; import java.util.Optional; import java.util.stream.Collectors; import org.apache.avro.generic.GenericRecord; -import org.apache.avro.mapred.AvroKey; -import org.apache.avro.mapreduce.AvroKeyInputFormat; -import org.apache.commons.configuration.PropertiesConfiguration; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.io.NullWritable; -import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; /** * Source to read data from a given DFS directory structure, incrementally */ -public class DFSSource extends Source { +public abstract class DFSSource extends Source { /** * Configs supported */ static class Config { - private static final String ROOT_INPUT_PATH_PROP = "hoodie.deltastreamer.source.dfs.root"; } @@ -62,50 +56,23 @@ public class DFSSource extends 
Source { private final transient FileSystem fs; - public DFSSource(PropertiesConfiguration config, JavaSparkContext sparkContext, - SourceDataFormat dataFormat, SchemaProvider schemaProvider) { - super(config, sparkContext, dataFormat, schemaProvider); - this.fs = FSUtils.getFs(config.getBasePath(), sparkContext.hadoopConfiguration()); - DataSourceUtils.checkRequiredProperties(config, Arrays.asList(Config.ROOT_INPUT_PATH_PROP)); - } - - - public static JavaRDD fromAvroFiles(final AvroConvertor convertor, String pathStr, - JavaSparkContext sparkContext) { - JavaPairRDD avroRDD = sparkContext.newAPIHadoopFile(pathStr, - AvroKeyInputFormat.class, AvroKey.class, NullWritable.class, - sparkContext.hadoopConfiguration()); - return avroRDD.keys().map(r -> ((GenericRecord) r.datum())); - } - - public static JavaRDD fromJsonFiles(final AvroConvertor convertor, String pathStr, - JavaSparkContext sparkContext) { - return sparkContext.textFile(pathStr).map((String j) -> { - return convertor.fromJson(j); - }); - } - - public static JavaRDD fromFiles(SourceDataFormat dataFormat, - final AvroConvertor convertor, String pathStr, JavaSparkContext sparkContext) { - if (dataFormat == SourceDataFormat.AVRO) { - return DFSSource.fromAvroFiles(convertor, pathStr, sparkContext); - } else if (dataFormat == SourceDataFormat.JSON) { - return DFSSource.fromJsonFiles(convertor, pathStr, sparkContext); - } else { - throw new HoodieNotSupportedException("Unsupported data format :" + dataFormat); - } + public DFSSource(TypedProperties props, JavaSparkContext sparkContext, SchemaProvider schemaProvider) { + super(props, sparkContext, schemaProvider); + DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.ROOT_INPUT_PATH_PROP)); + this.fs = FSUtils.getFs(props.getString(Config.ROOT_INPUT_PATH_PROP), sparkContext.hadoopConfiguration()); } + protected abstract JavaRDD fromFiles(final AvroConvertor convertor, String pathStr); @Override public Pair>, String> fetchNewData( - Optional 
lastCheckpointStr, long maxInputBytes) { + Optional lastCheckpointStr, long sourceLimit) { try { // obtain all eligible files under root folder. List eligibleFiles = new ArrayList<>(); RemoteIterator fitr = fs.listFiles( - new Path(config.getString(Config.ROOT_INPUT_PATH_PROP)), true); + new Path(props.getString(Config.ROOT_INPUT_PATH_PROP)), true); while (fitr.hasNext()) { LocatedFileStatus fileStatus = fitr.next(); if (fileStatus.isDirectory() || IGNORE_FILEPREFIX_LIST.stream().filter( @@ -130,13 +97,14 @@ public class DFSSource extends Source { continue; } - maxModificationTime = f.getModificationTime(); - currentBytes += f.getLen(); - filteredFiles.add(f); - if (currentBytes >= maxInputBytes) { + if (currentBytes + f.getLen() >= sourceLimit) { // we have enough data, we are done break; } + + maxModificationTime = f.getModificationTime(); + currentBytes += f.getLen(); + filteredFiles.add(f); } // no data to read @@ -153,7 +121,7 @@ public class DFSSource extends Source { final AvroConvertor avroConvertor = new AvroConvertor(schemaStr); return new ImmutablePair<>( - Optional.of(DFSSource.fromFiles(dataFormat, avroConvertor, pathStr, sparkContext)), + Optional.of(fromFiles(avroConvertor, pathStr)), String.valueOf(maxModificationTime)); } catch (IOException ioe) { throw new HoodieIOException( diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/HiveIncrPullSource.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/HiveIncrPullSource.java index 2b4de59ee..faf6d0966 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/HiveIncrPullSource.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/HiveIncrPullSource.java @@ -20,6 +20,9 @@ package com.uber.hoodie.utilities.sources; import com.uber.hoodie.DataSourceUtils; import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.common.util.TypedProperties; +import com.uber.hoodie.common.util.collection.ImmutablePair; +import 
com.uber.hoodie.common.util.collection.Pair; import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.utilities.schema.SchemaProvider; import java.io.IOException; @@ -30,14 +33,15 @@ import java.util.List; import java.util.Optional; import java.util.stream.Collectors; import org.apache.avro.generic.GenericRecord; -import org.apache.commons.configuration.PropertiesConfiguration; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; +import org.apache.avro.mapred.AvroKey; +import org.apache.avro.mapreduce.AvroKeyInputFormat; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -69,12 +73,12 @@ public class HiveIncrPullSource extends Source { private static final String ROOT_INPUT_PATH_PROP = "hoodie.deltastreamer.source.incrpull.root"; } - public HiveIncrPullSource(PropertiesConfiguration config, JavaSparkContext sparkContext, - SourceDataFormat dataFormat, SchemaProvider schemaProvider) { - super(config, sparkContext, dataFormat, schemaProvider); - this.fs = FSUtils.getFs(config.getBasePath(), sparkContext.hadoopConfiguration()); - DataSourceUtils.checkRequiredProperties(config, Arrays.asList(Config.ROOT_INPUT_PATH_PROP)); - this.incrPullRootPath = config.getString(Config.ROOT_INPUT_PATH_PROP); + public HiveIncrPullSource(TypedProperties props, JavaSparkContext sparkContext, + SchemaProvider schemaProvider) { + super(props, sparkContext, schemaProvider); + DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.ROOT_INPUT_PATH_PROP)); + this.incrPullRootPath = props.getString(Config.ROOT_INPUT_PATH_PROP); + this.fs = FSUtils.getFs(incrPullRootPath, 
sparkContext.hadoopConfiguration()); } /** @@ -110,7 +114,7 @@ public class HiveIncrPullSource extends Source { @Override public Pair>, String> fetchNewData( - Optional lastCheckpointStr, long maxInputBytes) { + Optional lastCheckpointStr, long sourceLimit) { try { // find the source commit to pull Optional commitToPull = findCommitToPull(lastCheckpointStr); @@ -125,10 +129,10 @@ public class HiveIncrPullSource extends Source { fs.listStatus(new Path(incrPullRootPath, commitToPull.get()))); String pathStr = commitDeltaFiles.stream().map(f -> f.getPath().toString()) .collect(Collectors.joining(",")); - String schemaStr = schemaProvider.getSourceSchema().toString(); - final AvroConvertor avroConvertor = new AvroConvertor(schemaStr); - return new ImmutablePair<>( - Optional.of(DFSSource.fromFiles(dataFormat, avroConvertor, pathStr, sparkContext)), + JavaPairRDD avroRDD = sparkContext.newAPIHadoopFile(pathStr, + AvroKeyInputFormat.class, AvroKey.class, NullWritable.class, + sparkContext.hadoopConfiguration()); + return new ImmutablePair<>(Optional.of(avroRDD.keys().map(r -> ((GenericRecord) r.datum()))), String.valueOf(commitToPull.get())); } catch (IOException ioe) { throw new HoodieIOException( diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/SourceDataFormat.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/JsonDFSSource.java similarity index 51% rename from hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/SourceDataFormat.java rename to hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/JsonDFSSource.java index 12596a7cd..3167dbe59 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/SourceDataFormat.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/JsonDFSSource.java @@ -18,12 +18,23 @@ package com.uber.hoodie.utilities.sources; +import com.uber.hoodie.common.util.TypedProperties; +import 
com.uber.hoodie.utilities.schema.SchemaProvider; +import org.apache.avro.generic.GenericRecord; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; + /** - * Format of the data within source. + * DFS Source that reads json data */ -public enum SourceDataFormat { - AVRO, // No conversion needed explicitly to avro - JSON, // we will try to convert to avro - ROW, // Will be added later, so we can plug/play with spark sources. - CUSTOM // the source is responsible for conversion to avro. +public class JsonDFSSource extends DFSSource { + + public JsonDFSSource(TypedProperties props, JavaSparkContext sparkContext, SchemaProvider schemaProvider) { + super(props, sparkContext, schemaProvider); + } + + @Override + protected JavaRDD fromFiles(AvroConvertor convertor, String pathStr) { + return sparkContext.textFile(pathStr).map((String j) -> convertor.fromJson(j)); + } } diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/JsonKafkaSource.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/JsonKafkaSource.java new file mode 100644 index 000000000..6dd111654 --- /dev/null +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/JsonKafkaSource.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * + */ + +package com.uber.hoodie.utilities.sources; + +import com.uber.hoodie.common.util.TypedProperties; +import com.uber.hoodie.utilities.schema.SchemaProvider; +import kafka.serializer.StringDecoder; +import org.apache.avro.generic.GenericRecord; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.streaming.kafka.KafkaUtils; +import org.apache.spark.streaming.kafka.OffsetRange; + +/** + * Read json kafka data + */ +public class JsonKafkaSource extends KafkaSource { + + public JsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SchemaProvider schemaProvider) { + super(properties, sparkContext, schemaProvider); + } + + @Override + protected JavaRDD toAvroRDD(OffsetRange[] offsetRanges, AvroConvertor avroConvertor) { + return KafkaUtils.createRDD(sparkContext, String.class, String.class, StringDecoder.class, StringDecoder.class, + kafkaParams, offsetRanges) + .values().map(jsonStr -> avroConvertor.fromJson(jsonStr)); + } +} diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/KafkaSource.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/KafkaSource.java index f470848f1..5e0328016 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/KafkaSource.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/KafkaSource.java @@ -19,33 +19,26 @@ package com.uber.hoodie.utilities.sources; import com.uber.hoodie.DataSourceUtils; +import com.uber.hoodie.common.util.TypedProperties; +import com.uber.hoodie.common.util.collection.ImmutablePair; +import com.uber.hoodie.common.util.collection.Pair; import com.uber.hoodie.exception.HoodieNotSupportedException; import com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException; import com.uber.hoodie.utilities.schema.SchemaProvider; -import java.nio.charset.Charset; import java.util.Arrays; import java.util.Comparator; import 
java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Optional; -import java.util.Spliterator; -import java.util.Spliterators; import java.util.stream.Collectors; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; import kafka.common.TopicAndPartition; -import kafka.serializer.DefaultDecoder; import org.apache.avro.generic.GenericRecord; -import org.apache.commons.configuration.PropertiesConfiguration; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.streaming.kafka.KafkaCluster; -import org.apache.spark.streaming.kafka.KafkaUtils; +import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset; import org.apache.spark.streaming.kafka.OffsetRange; import scala.Predef; import scala.collection.JavaConverters; @@ -59,10 +52,12 @@ import scala.util.Either; /** * Source to read data from Kafka, incrementally */ -public class KafkaSource extends Source { +public abstract class KafkaSource extends Source { private static volatile Logger log = LogManager.getLogger(KafkaSource.class); + private static long DEFAULT_MAX_EVENTS_TO_READ = 1000000; // 1M events max + static class CheckpointUtils { @@ -72,6 +67,9 @@ public class KafkaSource extends Source { public static HashMap strToOffsets( String checkpointStr) { HashMap offsetMap = new HashMap<>(); + if (checkpointStr.length() == 0) { + return offsetMap; + } String[] splits = checkpointStr.split(","); String topic = splits[0]; for (int i = 1; i < splits.length; i++) { @@ -83,46 +81,70 @@ public class KafkaSource extends Source { } /** - * String representation of checkpoint - *

- * Format: topic1,0:offset0,1:offset1,2:offset2, ..... + * String representation of checkpoint

Format: topic1,0:offset0,1:offset1,2:offset2, ..... */ - public static String offsetsToStr( - HashMap offsetMap) { + public static String offsetsToStr(OffsetRange[] ranges) { StringBuilder sb = new StringBuilder(); // atleast 1 partition will be present. - sb.append(offsetMap.entrySet().stream().findFirst().get().getKey().topic() + ","); - sb.append(offsetMap.entrySet().stream() - .map(e -> String.format("%s:%d", e.getKey().partition(), e.getValue().offset())) + sb.append(ranges[0].topic() + ","); + sb.append(Arrays.stream(ranges) + .map(r -> String.format("%s:%d", r.partition(), r.untilOffset())) .collect(Collectors.joining(","))); return sb.toString(); } - public static OffsetRange[] computeOffsetRanges( - HashMap fromOffsetMap, - HashMap toOffsetMap) { - Comparator byPartition = (OffsetRange o1, OffsetRange o2) -> { - return Integer.valueOf(o1.partition()).compareTo(Integer.valueOf(o2.partition())); - }; - List offsetRanges = toOffsetMap.entrySet().stream().map(e -> { - TopicAndPartition tp = e.getKey(); - long fromOffset = -1; - if (fromOffsetMap.containsKey(tp)) { - fromOffset = fromOffsetMap.get(tp).offset(); - } - return OffsetRange.create(tp, fromOffset, e.getValue().offset()); - }).sorted(byPartition).collect(Collectors.toList()); - OffsetRange[] ranges = new OffsetRange[offsetRanges.size()]; - return offsetRanges.toArray(ranges); + /** + * Compute the offset ranges to read from Kafka, while handling newly added partitions, skews, event limits. + * + * @param fromOffsetMap offsets where we left off last time + * @param toOffsetMap offsets of where each partitions is currently at + * @param numEvents maximum number of events to read. 
+ */ + public static OffsetRange[] computeOffsetRanges( + HashMap fromOffsetMap, + HashMap toOffsetMap, + long numEvents) { + + Comparator byPartition = (OffsetRange o1, OffsetRange o2) -> + Integer.valueOf(o1.partition()).compareTo(Integer.valueOf(o2.partition())); + + // Create initial offset ranges for each 'to' partition, with from = to offsets. + OffsetRange[] ranges = new OffsetRange[toOffsetMap.size()]; + toOffsetMap.entrySet().stream().map(e -> { + TopicAndPartition tp = e.getKey(); + long fromOffset = fromOffsetMap.getOrDefault(tp, new LeaderOffset("", -1, 0)).offset(); + return OffsetRange.create(tp, fromOffset, fromOffset); + }).sorted(byPartition).collect(Collectors.toList()).toArray(ranges); + + long allocedEvents = 0; + java.util.Set exhaustedPartitions = new HashSet<>(); + // keep going until we have events to allocate and partitions still not exhausted. + while (allocedEvents < numEvents && exhaustedPartitions.size() < toOffsetMap.size()) { + long remainingEvents = numEvents - allocedEvents; + long eventsPerPartition = (long) Math + .ceil((1.0 * remainingEvents) / (toOffsetMap.size() - exhaustedPartitions.size())); + + // Allocate the remaining events to non-exhausted partitions, in round robin fashion + for (int i = 0; i < ranges.length; i++) { + OffsetRange range = ranges[i]; + if (!exhaustedPartitions.contains(range.partition())) { + long toOffsetMax = toOffsetMap.get(range.topicAndPartition()).offset(); + long toOffset = Math.min(toOffsetMax, range.untilOffset() + eventsPerPartition); + if (toOffset == toOffsetMax) { + exhaustedPartitions.add(range.partition()); + } + allocedEvents += toOffset - range.untilOffset(); + ranges[i] = OffsetRange.create(range.topicAndPartition(), range.fromOffset(), toOffset); + } + } + } + + return ranges; } public static long totalNewMessages(OffsetRange[] ranges) { - long totalMsgs = 0; - for (OffsetRange range : ranges) { - totalMsgs += Math.max(range.untilOffset() - range.fromOffset(), 0); - } - return 
totalMsgs; + return Arrays.asList(ranges).stream().mapToLong(r -> r.count()).sum(); } } @@ -149,32 +171,31 @@ public class KafkaSource extends Source { * Configs to be passed for this source. All standard Kafka consumer configs are also respected */ static class Config { - private static final String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic"; private static final String DEFAULT_AUTO_RESET_OFFSET = "largest"; } - private HashMap kafkaParams; + protected HashMap kafkaParams; - private final String topicName; + protected final String topicName; - public KafkaSource(PropertiesConfiguration config, JavaSparkContext sparkContext, - SourceDataFormat dataFormat, SchemaProvider schemaProvider) { - super(config, sparkContext, dataFormat, schemaProvider); + public KafkaSource(TypedProperties props, JavaSparkContext sparkContext, SchemaProvider schemaProvider) { + super(props, sparkContext, schemaProvider); kafkaParams = new HashMap<>(); - Stream keys = StreamSupport.stream( - Spliterators.spliteratorUnknownSize(config.getKeys(), Spliterator.NONNULL), false); - keys.forEach(k -> kafkaParams.put(k, config.getString(k))); - - DataSourceUtils.checkRequiredProperties(config, Arrays.asList(Config.KAFKA_TOPIC_NAME)); - topicName = config.getString(Config.KAFKA_TOPIC_NAME); + for (Object prop : props.keySet()) { + kafkaParams.put(prop.toString(), props.getString(prop.toString())); + } + DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.KAFKA_TOPIC_NAME)); + topicName = props.getString(Config.KAFKA_TOPIC_NAME); } + protected abstract JavaRDD toAvroRDD(OffsetRange[] offsetRanges, AvroConvertor avroConvertor); + @Override public Pair>, String> fetchNewData( - Optional lastCheckpointStr, long maxInputBytes) { + Optional lastCheckpointStr, long sourceLimit) { // Obtain current metadata for the topic KafkaCluster cluster = new KafkaCluster(ScalaHelpers.toScalaMap(kafkaParams)); @@ -192,7 +213,7 @@ public class KafkaSource extends Source { if 
(lastCheckpointStr.isPresent()) { fromOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get()); } else { - String autoResetValue = config + String autoResetValue = props .getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET); if (autoResetValue.equals("smallest")) { fromOffsets = new HashMap(ScalaHelpers.toJavaMap( @@ -206,40 +227,23 @@ public class KafkaSource extends Source { } } - // Always read until the latest offset + // Obtain the latest offsets. HashMap toOffsets = new HashMap( ScalaHelpers.toJavaMap(cluster.getLatestLeaderOffsets(topicPartitions).right().get())); - // Come up with final set of OffsetRanges to read (account for new partitions) - // TODO(vc): Respect maxInputBytes, by estimating number of messages to read each batch from - // partition size - OffsetRange[] offsetRanges = CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets); + // Come up with final set of OffsetRanges to read (account for new partitions, limit number of events) + long numEvents = Math.min(DEFAULT_MAX_EVENTS_TO_READ, sourceLimit); + OffsetRange[] offsetRanges = CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents); long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges); if (totalNewMsgs <= 0) { - return new ImmutablePair<>(Optional.empty(), - lastCheckpointStr.isPresent() ? lastCheckpointStr.get() - : CheckpointUtils.offsetsToStr(toOffsets)); + return new ImmutablePair<>(Optional.empty(), lastCheckpointStr.isPresent() ? 
lastCheckpointStr.get() : ""); } else { log.info("About to read " + totalNewMsgs + " from Kafka for topic :" + topicName); } - // Perform the actual read from Kafka - JavaRDD kafkaRDD = KafkaUtils.createRDD(sparkContext, byte[].class, byte[].class, - DefaultDecoder.class, DefaultDecoder.class, kafkaParams, offsetRanges).values(); - // Produce a RDD[GenericRecord] - final AvroConvertor avroConvertor = new AvroConvertor( - schemaProvider.getSourceSchema().toString()); - JavaRDD newDataRDD; - if (dataFormat == SourceDataFormat.AVRO) { - newDataRDD = kafkaRDD.map(bytes -> avroConvertor.fromAvroBinary(bytes)); - } else if (dataFormat == SourceDataFormat.JSON) { - newDataRDD = kafkaRDD.map( - bytes -> avroConvertor.fromJson(new String(bytes, Charset.forName("utf-8")))); - } else { - throw new HoodieNotSupportedException("Unsupport data format :" + dataFormat); - } - - return new ImmutablePair<>(Optional.of(newDataRDD), CheckpointUtils.offsetsToStr(toOffsets)); + final AvroConvertor avroConvertor = new AvroConvertor(schemaProvider.getSourceSchema().toString()); + JavaRDD newDataRDD = toAvroRDD(offsetRanges, avroConvertor); + return new ImmutablePair<>(Optional.of(newDataRDD), CheckpointUtils.offsetsToStr(offsetRanges)); } } diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/Source.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/Source.java index fe7a805d4..06b83e9b1 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/Source.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/Source.java @@ -18,12 +18,12 @@ package com.uber.hoodie.utilities.sources; +import com.uber.hoodie.common.util.TypedProperties; +import com.uber.hoodie.common.util.collection.Pair; import com.uber.hoodie.utilities.schema.SchemaProvider; import java.io.Serializable; import java.util.Optional; import org.apache.avro.generic.GenericRecord; -import org.apache.commons.configuration.PropertiesConfiguration; 
-import org.apache.commons.lang3.tuple.Pair; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -32,32 +32,23 @@ import org.apache.spark.api.java.JavaSparkContext; */ public abstract class Source implements Serializable { - protected transient PropertiesConfiguration config; + protected transient TypedProperties props; protected transient JavaSparkContext sparkContext; - protected transient SourceDataFormat dataFormat; - protected transient SchemaProvider schemaProvider; - protected Source(PropertiesConfiguration config, JavaSparkContext sparkContext, - SourceDataFormat dataFormat, SchemaProvider schemaProvider) { - this.config = config; + protected Source(TypedProperties props, JavaSparkContext sparkContext, SchemaProvider schemaProvider) { + this.props = props; this.sparkContext = sparkContext; - this.dataFormat = dataFormat; this.schemaProvider = schemaProvider; } /** - * Fetches new data upto maxInputBytes, from the provided checkpoint and returns an RDD of the + * Fetches new data upto sourceLimit, from the provided checkpoint and returns an RDD of the * data, as well as the checkpoint to be written as a result of that. */ public abstract Pair>, String> fetchNewData( - Optional lastCheckpointStr, long maxInputBytes); - - - public PropertiesConfiguration getConfig() { - return config; - } + Optional lastCheckpointStr, long sourceLimit); } diff --git a/hoodie-utilities/src/main/resources/delta-streamer-config/key-generator.properties b/hoodie-utilities/src/main/resources/delta-streamer-config/key-generator.properties deleted file mode 100644 index c75201780..000000000 --- a/hoodie-utilities/src/main/resources/delta-streamer-config/key-generator.properties +++ /dev/null @@ -1,19 +0,0 @@ -# -# Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# -# -hoodie.datasource.write.recordkey.field=_row_key -hoodie.datasource.write.partitionpath.field=driver diff --git a/hoodie-utilities/src/main/resources/delta-streamer-config/target.avsc b/hoodie-utilities/src/main/resources/delta-streamer-config/target.avsc deleted file mode 100644 index a2004f535..000000000 --- a/hoodie-utilities/src/main/resources/delta-streamer-config/target.avsc +++ /dev/null @@ -1,29 +0,0 @@ -{ - "type" : "record", - "name" : "triprec", - "fields" : [ { - "name" : "_row_key", - "type" : "string" - }, { - "name" : "rider", - "type" : "string" - }, { - "name" : "driver", - "type" : "string" - }, { - "name" : "begin_lat", - "type" : "double" - }, { - "name" : "begin_lon", - "type" : "double" - }, { - "name" : "end_lat", - "type" : "double" - }, { - "name" : "end_lon", - "type" : "double" - }, { - "name" : "fare", - "type" : "double" - } ] -} diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java new file mode 100644 index 000000000..d2ff8ace1 --- /dev/null +++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java @@ -0,0 +1,162 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.uber.hoodie.utilities; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import com.uber.hoodie.common.model.HoodieCommitMetadata; +import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.table.HoodieTimeline; +import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.common.util.DFSPropertiesConfiguration; +import com.uber.hoodie.common.util.TypedProperties; +import com.uber.hoodie.exception.DatasetNotFoundException; +import com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer; +import com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer.Operation; +import com.uber.hoodie.utilities.sources.TestDataSource; +import java.io.IOException; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.sql.SQLContext; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Basic tests against {@link com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer}, by issuing bulk_inserts, + * upserts, inserts. Check counts at the end. + */ +public class TestHoodieDeltaStreamer extends UtilitiesTestBase { + + private static volatile Logger log = LogManager.getLogger(TestHoodieDeltaStreamer.class); + + @BeforeClass + public static void initClass() throws Exception { + UtilitiesTestBase.initClass(); + + // prepare the configs. 
+ UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", dfs, dfsBasePath + "/base.properties"); + UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc"); + TypedProperties props = new TypedProperties(); + props.setProperty("include", "base.properties"); + props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key"); + props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there"); + props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc"); + UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/test-source.properties"); + } + + @AfterClass + public static void cleanupClass() throws Exception { + UtilitiesTestBase.cleanupClass(); + } + + @Before + public void setup() throws Exception { + super.setup(); + TestDataSource.initDataGen(); + } + + @After + public void teardown() throws Exception { + super.teardown(); + TestDataSource.resetDataGen(); + } + + static class TestHelpers { + + static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op) { + HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config(); + cfg.targetBasePath = basePath; + cfg.targetTableName = "hoodie_trips"; + cfg.sourceClassName = TestDataSource.class.getName(); + cfg.operation = op; + cfg.sourceOrderingField = "timestamp"; + cfg.propsFilePath = dfsBasePath + "/test-source.properties"; + cfg.sourceLimit = 1000; + return cfg; + } + + static void assertRecordCount(long expected, String datasetPath, SQLContext sqlContext) { + long recordCount = sqlContext.read().format("com.uber.hoodie").load(datasetPath).count(); + assertEquals(expected, recordCount); + } + + static void assertCommitMetadata(String expected, String datasetPath, FileSystem fs, int totalCommits) + throws IOException { + HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), datasetPath); + HoodieTimeline timeline = 
meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); + HoodieInstant lastCommit = timeline.lastInstant().get(); + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( + timeline.getInstantDetails(lastCommit).get(), HoodieCommitMetadata.class); + assertEquals(totalCommits, timeline.countInstants()); + assertEquals(expected, commitMetadata.getMetadata(HoodieDeltaStreamer.CHECKPOINT_KEY)); + } + } + + @Test + public void testProps() throws IOException { + TypedProperties props = new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/test-source.properties")) + .getConfig(); + assertEquals(2, props.getInteger("hoodie.upsert.shuffle.parallelism")); + assertEquals("_row_key", props.getString("hoodie.datasource.write.recordkey.field")); + } + + @Test + public void testDatasetCreation() throws Exception { + try { + dfs.mkdirs(new Path(dfsBasePath + "/not_a_dataset")); + HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer( + TestHelpers.makeConfig(dfsBasePath + "/not_a_dataset", Operation.BULK_INSERT), jsc); + deltaStreamer.sync(); + fail("Should error out when pointed out at a dir thats not a dataset"); + } catch (DatasetNotFoundException e) { + //expected + log.error("Expected error during dataset creation", e); + } + } + + @Test + public void testBulkInsertsAndUpserts() throws Exception { + String datasetBasePath = dfsBasePath + "/test_dataset"; + + // Initial bulk insert + HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(datasetBasePath, Operation.BULK_INSERT); + new HoodieDeltaStreamer(cfg, jsc).sync(); + TestHelpers.assertRecordCount(1000, datasetBasePath + "/*/*.parquet", sqlContext); + TestHelpers.assertCommitMetadata("00000", datasetBasePath, dfs, 1); + + // No new data => no commits. 
+ cfg.sourceLimit = 0; + new HoodieDeltaStreamer(cfg, jsc).sync(); + TestHelpers.assertRecordCount(1000, datasetBasePath + "/*/*.parquet", sqlContext); + TestHelpers.assertCommitMetadata("00000", datasetBasePath, dfs, 1); + + // upsert() #1 + cfg.sourceLimit = 2000; + cfg.operation = Operation.UPSERT; + new HoodieDeltaStreamer(cfg, jsc).sync(); + TestHelpers.assertRecordCount(2000, datasetBasePath + "/*/*.parquet", sqlContext); + TestHelpers.assertCommitMetadata("00001", datasetBasePath, dfs, 2); + } +} diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/UtilitiesTestBase.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/UtilitiesTestBase.java new file mode 100644 index 000000000..eb2d00a65 --- /dev/null +++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/UtilitiesTestBase.java @@ -0,0 +1,137 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ *
+ *
+ */
+
+package com.uber.hoodie.utilities;
+
+import com.uber.hoodie.common.TestRawTripPayload;
+import com.uber.hoodie.common.minicluster.HdfsTestService;
+import com.uber.hoodie.common.model.HoodieRecord;
+import com.uber.hoodie.common.util.TypedProperties;
+import com.uber.hoodie.utilities.sources.TestDataSource;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+import java.io.UncheckedIOException;
+import java.util.List;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SQLContext;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+/**
+ * Base class for tests that need a DFS mini-cluster and Spark contexts.
+ *
+ * <p>The HDFS test service is started once per test class, while the Spark contexts are rebuilt for every test.
+ *
+ * TODO(vc): this needs to be done across the board.
+ */
+public class UtilitiesTestBase {
+
+  // Shared by all tests of a class; assigned in initClass().
+  protected static String dfsBasePath;
+  protected static HdfsTestService hdfsTestService;
+  protected static MiniDFSCluster dfsCluster;
+  protected static DistributedFileSystem dfs;
+  // Rebuilt in setup() before every test.
+  protected transient JavaSparkContext jsc = null;
+  protected transient SQLContext sqlContext;
+
+  /**
+   * Starts the HDFS test service and uses the file system's working directory as the base path for test files.
+   */
+  @BeforeClass
+  public static void initClass() throws Exception {
+    hdfsTestService = new HdfsTestService();
+    dfsCluster = hdfsTestService.start(true);
+    dfs = dfsCluster.getFileSystem();
+    dfsBasePath = dfs.getWorkingDirectory().toString();
+    dfs.mkdirs(new Path(dfsBasePath));
+  }
+
+  /**
+   * Stops the HDFS test service, if it was started.
+   */
+  @AfterClass
+  public static void cleanupClass() throws Exception {
+    if (hdfsTestService != null) {
+      hdfsTestService.stop();
+    }
+  }
+
+  /**
+   * Creates a fresh test data generator and a local Spark context (plus SQL context) for the test about to run.
+   */
+  @Before
+  public void setup() throws Exception {
+    TestDataSource.initDataGen();
+    jsc = UtilHelpers.buildSparkContext(this.getClass().getName() + "-hoodie", "local[2]");
+    sqlContext = new SQLContext(jsc);
+  }
+
+  /**
+   * Drops the test data generator and stops the Spark context, if one was built.
+   */
+  @After
+  public void teardown() throws Exception {
+    TestDataSource.resetDataGen();
+    if (jsc != null) {
+      jsc.stop();
+    }
+  }
+
+  /**
+   * Static helpers for staging test inputs (resources, properties, records) on a file system.
+   */
+  public static class Helpers {
+
+    // to get hold of resources bundled with jar
+    private static ClassLoader classLoader = Helpers.class.getClassLoader();
+
+    /**
+     * Copies a classpath resource, line by line, to {@code targetPath} on {@code fs}, overwriting any existing file.
+     *
+     * @param testResourcePath classpath-relative path of the resource to copy
+     * @param fs file system to write to
+     * @param targetPath destination path on {@code fs}
+     * @throws IOException if the resource is missing, or reading/writing fails
+     */
+    public static void copyToDFS(String testResourcePath, FileSystem fs, String targetPath) throws IOException {
+      InputStream resource = classLoader.getResourceAsStream(testResourcePath);
+      if (resource == null) {
+        // fail with a clear message instead of an NPE inside InputStreamReader
+        throw new IOException("Test resource not found on classpath: " + testResourcePath);
+      }
+      // both streams are closed even when the copy fails half way
+      try (BufferedReader reader = new BufferedReader(new InputStreamReader(resource));
+          PrintStream os = new PrintStream(fs.create(new Path(targetPath), true))) {
+        String line;
+        while ((line = reader.readLine()) != null) {
+          os.println(line);
+        }
+        // PrintStream never throws on write errors; checkError() flushes and reports them
+        if (os.checkError()) {
+          throw new IOException("Error writing to " + targetPath);
+        }
+      }
+    }
+
+    /**
+     * Writes every entry of {@code props} as a {@code key=value} line to {@code targetPath} on {@code fs}.
+     *
+     * @throws IOException if writing fails
+     */
+    public static void savePropsToDFS(TypedProperties props, FileSystem fs, String targetPath) throws IOException {
+      String[] lines = props.keySet().stream()
+          .map(k -> String.format("%s=%s", k, props.get(k)))
+          .toArray(String[]::new);
+      saveStringsToDFS(lines, fs, targetPath);
+    }
+
+    /**
+     * Writes the given strings, one per line, to {@code targetPath} on {@code fs}, overwriting any existing file.
+     *
+     * @throws IOException if the file cannot be created or a write fails
+     */
+    public static void saveStringsToDFS(String[] lines, FileSystem fs, String targetPath) throws IOException {
+      try (PrintStream os = new PrintStream(fs.create(new Path(targetPath), true))) {
+        for (String l : lines) {
+          os.println(l);
+        }
+        // PrintStream never throws on write errors; checkError() flushes and reports them
+        if (os.checkError()) {
+          throw new IOException("Error writing to " + targetPath);
+        }
+      }
+    }
+
+    /**
+     * Copies the bundled {@code source.avsc} onto the test DFS and returns properties that point the
+     * source schema file setting at the copied file.
+     */
+    public static TypedProperties setupSchemaOnDFS() throws IOException {
+      UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc");
+      TypedProperties props = new TypedProperties();
+      props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
+      return props;
+    }
+
+    /**
+     * Returns the JSON held by the record's {@link TestRawTripPayload}.
+     *
+     * @throws UncheckedIOException if the payload cannot produce its JSON; failing here avoids silently
+     *     emitting {@code null} entries into the generated test input
+     */
+    public static String toJsonString(HoodieRecord hr) {
+      try {
+        return ((TestRawTripPayload) hr.getData()).getJsonData();
+      } catch (IOException ioe) {
+        throw new UncheckedIOException(ioe);
+      }
+    }
+
+    /**
+     * Converts every record to its JSON string, preserving order.
+     *
+     * @throws IOException if any record's payload cannot produce its JSON
+     */
+    public static String[] jsonifyRecords(List<HoodieRecord> records) throws IOException {
+      try {
+        return records.stream().map(Helpers::toJsonString).toArray(String[]::new);
+      } catch (UncheckedIOException e) {
+        // surface the original checked exception to callers
+        throw e.getCause();
+      }
+    }
+  }
+}
diff --git
a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/TestDFSSource.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/TestDFSSource.java
new file mode 100644
index 000000000..d460e6ea8
--- /dev/null
+++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/TestDFSSource.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.uber.hoodie.utilities.sources;
+
+import static org.junit.Assert.assertEquals;
+
+import com.uber.hoodie.common.HoodieTestDataGenerator;
+import com.uber.hoodie.common.util.TypedProperties;
+import com.uber.hoodie.common.util.collection.Pair;
+import com.uber.hoodie.utilities.UtilitiesTestBase;
+import com.uber.hoodie.utilities.schema.FilebasedSchemaProvider;
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Basic tests against all subclasses of {@link DFSSource}
+ */
+public class TestDFSSource extends UtilitiesTestBase {
+
+  // Schema provider backed by the source.avsc copied onto the test DFS in setup().
+  private FilebasedSchemaProvider schemaProvider;
+
+  @BeforeClass
+  public static void initClass() throws Exception {
+    UtilitiesTestBase.initClass();
+  }
+
+  @AfterClass
+  public static void cleanupClass() throws Exception {
+    UtilitiesTestBase.cleanupClass();
+  }
+
+  @Before
+  public void setup() throws Exception {
+    super.setup();
+    schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS(), jsc);
+  }
+
+  @After
+  public void teardown() throws Exception {
+    super.teardown();
+  }
+
+  /**
+   * Exercises {@link JsonDFSSource} against a DFS folder that receives JSON files over time: fetching from an
+   * empty folder, fetching with a source limit, fetching incrementally from a checkpoint, re-fetching from an
+   * older checkpoint and fetching from the latest checkpoint.
+   */
+  @Test
+  public void testJsonDFSSource() throws IOException {
+    dfs.mkdirs(new Path(dfsBasePath + "/jsonFiles"));
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+
+    TypedProperties props = new TypedProperties();
+    props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsBasePath + "/jsonFiles");
+    JsonDFSSource jsonSource = new JsonDFSSource(props, jsc, schemaProvider);
+
+    // 1. Extract without any checkpoint => get all the data, respecting sourceLimit
+    assertEquals(Optional.empty(), jsonSource.fetchNewData(Optional.empty(), Long.MAX_VALUE).getKey());
+    UtilitiesTestBase.Helpers.saveStringsToDFS(
+        Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 100)), dfs,
+        dfsBasePath + "/jsonFiles/1.json");
+    assertEquals(Optional.empty(), jsonSource.fetchNewData(Optional.empty(), 10).getKey());
+    Pair<Optional<JavaRDD<GenericRecord>>, String> fetch1 = jsonSource.fetchNewData(Optional.empty(), 1000000);
+    assertEquals(100, fetch1.getKey().get().count());
+
+    // 2. Produce new data, extract new data
+    UtilitiesTestBase.Helpers.saveStringsToDFS(
+        Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 10000)),
+        dfs, dfsBasePath + "/jsonFiles/2.json");
+    Pair<Optional<JavaRDD<GenericRecord>>, String> fetch2 = jsonSource.fetchNewData(
+        Optional.of(fetch1.getValue()), Long.MAX_VALUE);
+    assertEquals(10000, fetch2.getKey().get().count());
+
+    // 3. Extract with previous checkpoint => gives same data back (idempotent)
+    Pair<Optional<JavaRDD<GenericRecord>>, String> fetch3 = jsonSource.fetchNewData(
+        Optional.of(fetch1.getValue()), Long.MAX_VALUE);
+    assertEquals(10000, fetch3.getKey().get().count());
+    assertEquals(fetch2.getValue(), fetch3.getValue());
+
+    // 4. Extract with latest checkpoint => no new data returned
+    Pair<Optional<JavaRDD<GenericRecord>>, String> fetch4 = jsonSource.fetchNewData(
+        Optional.of(fetch2.getValue()), Long.MAX_VALUE);
+    assertEquals(Optional.empty(), fetch4.getKey());
+  }
+}
diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/TestDataSource.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/TestDataSource.java
new file mode 100644
index 000000000..f4c589a00
--- /dev/null
+++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/TestDataSource.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.uber.hoodie.utilities.sources;
+
+import com.uber.hoodie.common.HoodieTestDataGenerator;
+import com.uber.hoodie.common.model.HoodieRecord;
+import com.uber.hoodie.common.util.TypedProperties;
+import com.uber.hoodie.common.util.collection.ImmutablePair;
+import com.uber.hoodie.common.util.collection.Pair;
+import com.uber.hoodie.utilities.schema.SchemaProvider;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * An implementation of {@link Source}, that emits test upserts.
+ *
+ * <p>Checkpoints are zero-padded, monotonically increasing commit numbers ("00000", "00001", ...).
+ */
+public class TestDataSource extends Source {
+
+  private static volatile Logger log = LogManager.getLogger(TestDataSource.class);
+
+  // Static instance, helps with reuse across a test.
+  private static HoodieTestDataGenerator dataGenerator;
+
+  /**
+   * Installs a fresh data generator; must be called before {@link #fetchNewData} is used.
+   */
+  public static void initDataGen() {
+    dataGenerator = new HoodieTestDataGenerator();
+  }
+
+  /**
+   * Drops the current data generator.
+   */
+  public static void resetDataGen() {
+    dataGenerator = null;
+  }
+
+  public TestDataSource(TypedProperties props, JavaSparkContext sparkContext, SchemaProvider schemaProvider) {
+    super(props, sparkContext, schemaProvider);
+  }
+
+  /**
+   * Converts a generated record into the avro record of its insert value, using the generator's schema.
+   *
+   * @throws UncheckedIOException if the payload cannot be converted; failing here keeps {@code null}
+   *     elements out of the emitted RDD
+   */
+  private GenericRecord toGenericRecord(HoodieRecord hoodieRecord) {
+    try {
+      Optional<IndexedRecord> recordOpt = hoodieRecord.getData().getInsertValue(dataGenerator.avroSchema);
+      return (GenericRecord) recordOpt.get();
+    } catch (IOException e) {
+      throw new UncheckedIOException("Error converting test record to avro.", e);
+    }
+  }
+
+  /**
+   * Generates up to {@code sourceLimit} records: updates for at most half of the limit (bounded by the number
+   * of keys generated so far) and inserts for the remainder.
+   *
+   * @param lastCheckpointStr previous checkpoint, parsed as an integer; absent means start at commit 0
+   * @param sourceLimit number of records to emit; zero or negative emits no data
+   * @return the generated records (empty when {@code sourceLimit <= 0}) and the new checkpoint, which is the
+   *     previous commit number plus one, formatted with {@code %05d}
+   */
+  @Override
+  public Pair<Optional<JavaRDD<GenericRecord>>, String> fetchNewData(Optional<String> lastCheckpointStr,
+      long sourceLimit) {
+
+    int nextCommitNum = lastCheckpointStr.isPresent() ? Integer.parseInt(lastCheckpointStr.get()) + 1 : 0;
+    String commitTime = String.format("%05d", nextCommitNum);
+    // No new data.
+    if (sourceLimit <= 0) {
+      return new ImmutablePair<>(Optional.empty(), commitTime);
+    }
+
+    // generate `sourceLimit` number of upserts each time.
+    // NOTE(review): sourceLimit is narrowed to int before use, so limits above Integer.MAX_VALUE wrap around.
+    int numExistingKeys = dataGenerator.getExistingKeysList().size();
+    int numUpdates = Math.min(numExistingKeys, (int) sourceLimit / 2);
+    int numInserts = (int) sourceLimit - numUpdates;
+
+    List<GenericRecord> records = new ArrayList<>();
+    try {
+      records.addAll(dataGenerator.generateUniqueUpdates(commitTime, numUpdates).stream()
+          .map(this::toGenericRecord).collect(Collectors.toList()));
+      records.addAll(dataGenerator.generateInserts(commitTime, numInserts).stream()
+          .map(this::toGenericRecord).collect(Collectors.toList()));
+    } catch (IOException e) {
+      log.error("Error generating test data.", e);
+    }
+
+    JavaRDD<GenericRecord> avroRDD = sparkContext.parallelize(records, 4);
+    return new ImmutablePair<>(Optional.of(avroRDD), commitTime);
+  }
+}
diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/TestKafkaSource.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/TestKafkaSource.java
new file mode 100644
index 000000000..785e80569
--- /dev/null
+++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/TestKafkaSource.java
@@ -0,0 +1,186 @@
+/*
+ * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.uber.hoodie.utilities.sources;
+
+import static com.uber.hoodie.utilities.sources.KafkaSource.CheckpointUtils;
+import static org.junit.Assert.assertEquals;
+
+import com.uber.hoodie.common.HoodieTestDataGenerator;
+import com.uber.hoodie.common.util.TypedProperties;
+import com.uber.hoodie.common.util.collection.Pair;
+import com.uber.hoodie.utilities.UtilitiesTestBase;
+import com.uber.hoodie.utilities.schema.FilebasedSchemaProvider;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Optional;
+import kafka.common.TopicAndPartition;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset;
+import org.apache.spark.streaming.kafka.KafkaTestUtils;
+import org.apache.spark.streaming.kafka.OffsetRange;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests against {@link KafkaSource}
+ */
+public class TestKafkaSource extends UtilitiesTestBase {
+
+  private static final String TEST_TOPIC_NAME = "hoodie_test";
+
+  // Schema provider backed by the source.avsc copied onto the test DFS in setup().
+  private FilebasedSchemaProvider schemaProvider;
+  // Kafka test utilities, created and set up for every test.
+  private KafkaTestUtils testUtils;
+
+  @BeforeClass
+  public static void initClass() throws Exception {
+    UtilitiesTestBase.initClass();
+  }
+
+  @AfterClass
+  public static void cleanupClass() throws Exception {
+    UtilitiesTestBase.cleanupClass();
+  }
+
+  @Before
+  public void setup() throws Exception {
+    super.setup();
+    schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS(), jsc);
+    testUtils = new KafkaTestUtils();
+    testUtils.setup();
+  }
+
+  @After
+  public void teardown() throws Exception {
+    try {
+      super.teardown();
+    } finally {
+      // always release the Kafka test utils, even if the base teardown fails;
+      // testUtils is still null when setup() failed before creating it
+      if (testUtils != null) {
+        testUtils.teardown();
+      }
+    }
+  }
+
+
+  /**
+   * Exercises {@link JsonKafkaSource} against a two-partition topic: fetching from an empty topic, fetching
+   * with a source limit, fetching incrementally from a checkpoint, re-fetching from an older checkpoint and
+   * fetching from the latest checkpoint.
+   */
+  @Test
+  public void testJsonKafkaSource() throws IOException {
+
+    // topic setup.
+    testUtils.createTopic(TEST_TOPIC_NAME, 2);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    TypedProperties props = new TypedProperties();
+    props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME);
+    props.setProperty("metadata.broker.list", testUtils.brokerAddress());
+    props.setProperty("auto.offset.reset", "smallest");
+    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
+    Source kafkaSource = new JsonKafkaSource(props, jsc, schemaProvider);
+
+    // 1. Extract without any checkpoint => get all the data, respecting sourceLimit
+    assertEquals(Optional.empty(), kafkaSource.fetchNewData(Optional.empty(), Long.MAX_VALUE).getKey());
+    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    Pair<Optional<JavaRDD<GenericRecord>>, String> fetch1 = kafkaSource.fetchNewData(Optional.empty(), 900);
+    assertEquals(900, fetch1.getKey().get().count());
+
+    // 2. Produce new data, extract new data
+    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
+    Pair<Optional<JavaRDD<GenericRecord>>, String> fetch2 = kafkaSource.fetchNewData(
+        Optional.of(fetch1.getValue()), Long.MAX_VALUE);
+    assertEquals(1100, fetch2.getKey().get().count());
+
+    // 3. Extract with previous checkpoint => gives same data back (idempotent)
+    Pair<Optional<JavaRDD<GenericRecord>>, String> fetch3 = kafkaSource.fetchNewData(
+        Optional.of(fetch1.getValue()), Long.MAX_VALUE);
+    assertEquals(fetch2.getKey().get().count(), fetch3.getKey().get().count());
+    assertEquals(fetch2.getValue(), fetch3.getValue());
+
+    // 4. Extract with latest checkpoint => no new data returned
+    Pair<Optional<JavaRDD<GenericRecord>>, String> fetch4 = kafkaSource.fetchNewData(
+        Optional.of(fetch2.getValue()), Long.MAX_VALUE);
+    assertEquals(Optional.empty(), fetch4.getKey());
+  }
+
+
+  /**
+   * Builds a partition => offset map for the test topic; {@code partitions[i]} is paired with
+   * {@code offsets[i]}. The leader host and port of each {@link LeaderOffset} are placeholders.
+   */
+  private static HashMap<TopicAndPartition, LeaderOffset> makeOffsetMap(int[] partitions, long[] offsets) {
+    HashMap<TopicAndPartition, LeaderOffset> map = new HashMap<>();
+    for (int i = 0; i < partitions.length; i++) {
+      map.put(new TopicAndPartition(TEST_TOPIC_NAME, partitions[i]), new LeaderOffset("", -1, offsets[i]));
+    }
+    return map;
+  }
+
+  /**
+   * Checks {@code CheckpointUtils.totalNewMessages} and {@code CheckpointUtils.computeOffsetRanges} for
+   * unlimited reads, limited reads, newly added partitions and skewed partitions.
+   */
+  @Test
+  public void testComputeOffsetRanges() {
+    // test totalNewMessages()
+    long totalMsgs = CheckpointUtils.totalNewMessages(new OffsetRange[]{
+        OffsetRange.apply(TEST_TOPIC_NAME, 0, 0, 100),
+        OffsetRange.apply(TEST_TOPIC_NAME, 0, 100, 200)
+    });
+    assertEquals(200, totalMsgs);
+
+    // should consume all the available data
+    OffsetRange[] ranges = CheckpointUtils.computeOffsetRanges(
+        makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
+        makeOffsetMap(new int[]{0, 1}, new long[]{300000, 350000}),
+        1000000L
+    );
+    assertEquals(200000, CheckpointUtils.totalNewMessages(ranges));
+
+    // should only consume up to the limit
+    ranges = CheckpointUtils.computeOffsetRanges(
+        makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
+        makeOffsetMap(new int[]{0, 1}, new long[]{300000, 350000}),
+        10000
+    );
+    assertEquals(10000, CheckpointUtils.totalNewMessages(ranges));
+    assertEquals(200000, ranges[0].fromOffset());
+    assertEquals(205000, ranges[0].untilOffset());
+    assertEquals(250000, ranges[1].fromOffset());
+    assertEquals(255000, ranges[1].untilOffset());
+
+    // should also consume from new partitions.
+    ranges = CheckpointUtils.computeOffsetRanges(
+        makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
+        makeOffsetMap(new int[]{0, 1, 2}, new long[]{300000, 350000, 100000}),
+        1000000L
+    );
+    assertEquals(300000, CheckpointUtils.totalNewMessages(ranges));
+    assertEquals(3, ranges.length);
+
+    // for skewed offsets, does not starve any partition & can catch up
+    ranges = CheckpointUtils.computeOffsetRanges(
+        makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
+        makeOffsetMap(new int[]{0, 1, 2}, new long[]{200010, 350000, 10000}),
+        100000
+    );
+    assertEquals(100000, CheckpointUtils.totalNewMessages(ranges));
+    assertEquals(10, ranges[0].count());
+    assertEquals(89990, ranges[1].count());
+    assertEquals(10000, ranges[2].count());
+
+    ranges = CheckpointUtils.computeOffsetRanges(
+        makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
+        makeOffsetMap(new int[]{0, 1, 2}, new long[]{200010, 350000, 10000}),
+        1000000
+    );
+    assertEquals(110010, CheckpointUtils.totalNewMessages(ranges));
+    assertEquals(10, ranges[0].count());
+    assertEquals(100000, ranges[1].count());
+    assertEquals(10000, ranges[2].count());
+  }
+}
diff --git a/hoodie-utilities/src/main/resources/IncrementalPull.sqltemplate b/hoodie-utilities/src/test/resources/IncrementalPull.sqltemplate
similarity index 100%
rename from hoodie-utilities/src/main/resources/IncrementalPull.sqltemplate
rename to hoodie-utilities/src/test/resources/IncrementalPull.sqltemplate
diff --git a/hoodie-utilities/src/main/resources/delta-streamer-config/hoodie-client.properties b/hoodie-utilities/src/test/resources/delta-streamer-config/base.properties
similarity index 86%
rename from hoodie-utilities/src/main/resources/delta-streamer-config/hoodie-client.properties
rename to hoodie-utilities/src/test/resources/delta-streamer-config/base.properties
index 81f928b01..907581006 100644
--- a/hoodie-utilities/src/main/resources/delta-streamer-config/hoodie-client.properties
+++
b/hoodie-utilities/src/test/resources/delta-streamer-config/base.properties @@ -15,4 +15,7 @@ # # # +# Common hoodie client configs hoodie.upsert.shuffle.parallelism=2 +hoodie.insert.shuffle.parallelism=2 +hoodie.bulkinsert.shuffle.parallelism=2 diff --git a/hoodie-utilities/src/main/resources/delta-streamer-config/schema-provider.properties b/hoodie-utilities/src/test/resources/delta-streamer-config/dfs-source.properties similarity index 60% rename from hoodie-utilities/src/main/resources/delta-streamer-config/schema-provider.properties rename to hoodie-utilities/src/test/resources/delta-streamer-config/dfs-source.properties index 1842069de..90c524088 100644 --- a/hoodie-utilities/src/main/resources/delta-streamer-config/schema-provider.properties +++ b/hoodie-utilities/src/test/resources/delta-streamer-config/dfs-source.properties @@ -14,5 +14,14 @@ # limitations under the License. # # -hoodie.deltastreamer.filebased.schemaprovider.source.schema.file=file:///Users/vinoth/bin/hoodie/hoodie-utilities/src/main/resources/delta-streamer-config/source.avsc -hoodie.deltastreamer.filebased.schemaprovider.target.schema.file=file:///Users/vinoth/bin/hoodie/hoodie-utilities/src/main/resources/delta-streamer-config/target.avsc +# +include=base.properties +# Key generator props +hoodie.datasource.write.recordkey.field=_row_key +hoodie.datasource.write.partitionpath.field=driver +# Schema provider props (change to absolute path based on your installation) +hoodie.deltastreamer.filebased.schemaprovider.source.schema.file=file:///path/to/hoodie/hoodie-utilities/src/main/resources/delta-streamer-props/source.avsc +hoodie.deltastreamer.filebased.schemaprovider.target.schema.file=file:///path/to/hoodie/hoodie-utilities/src/main/resources/delta-streamer-props/target.avsc +# DFS Source +hoodie.deltastreamer.source.dfs.root=file:///tmp/hoodie-dfs-input + diff --git a/hoodie-utilities/src/main/resources/delta-streamer-config/source.properties 
b/hoodie-utilities/src/test/resources/delta-streamer-config/kafka-source.properties similarity index 60% rename from hoodie-utilities/src/main/resources/delta-streamer-config/source.properties rename to hoodie-utilities/src/test/resources/delta-streamer-config/kafka-source.properties index 6e698db1d..156890ce0 100644 --- a/hoodie-utilities/src/main/resources/delta-streamer-config/source.properties +++ b/hoodie-utilities/src/test/resources/delta-streamer-config/kafka-source.properties @@ -15,9 +15,16 @@ # # # -# DFS Source -hoodie.deltastreamer.source.dfs.root=file:///tmp/hoodie-dfs-input +include=base.properties +# Key fields, for kafka example +hoodie.datasource.write.recordkey.field=impressionid +hoodie.datasource.write.partitionpath.field=userid +# schema provider configs +hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/impressions-value/versions/latest # Kafka Source -hoodie.deltastreamer.source.kafka.topic=uber_trips +#hoodie.deltastreamer.source.kafka.topic=uber_trips +hoodie.deltastreamer.source.kafka.topic=impressions +#Kafka props metadata.broker.list=localhost:9092 auto.offset.reset=smallest +schema.registry.url=http://localhost:8081 diff --git a/hoodie-utilities/src/main/resources/delta-streamer-config/source.avsc b/hoodie-utilities/src/test/resources/delta-streamer-config/source.avsc similarity index 86% rename from hoodie-utilities/src/main/resources/delta-streamer-config/source.avsc rename to hoodie-utilities/src/test/resources/delta-streamer-config/source.avsc index 11a5649d0..ef7e0273d 100644 --- a/hoodie-utilities/src/main/resources/delta-streamer-config/source.avsc +++ b/hoodie-utilities/src/test/resources/delta-streamer-config/source.avsc @@ -1,7 +1,11 @@ { "type" : "record", "name" : "triprec", - "fields" : [ { + "fields" : [ + { + "name" : "timestamp", + "type" : "double" + }, { "name" : "_row_key", "type" : "string" }, { diff --git a/pom.xml b/pom.xml index 9f3394a72..d321ddb15 100644 --- a/pom.xml +++ b/pom.xml 
@@ -116,6 +116,7 @@ 2.10 2.6 2.19.1 + 2.6.5 1.8.1 4.11 1.9.5 @@ -376,6 +377,16 @@ hadoop-client ${hadoop.version}-cdh${cdh.version} provided + + + com.fasterxml.jackson.* + * + + + javax.servlet + servlet-api + + @@ -427,6 +438,12 @@ hive-common ${hive.version}-cdh${cdh.version} provided + + + com.fasterxml.jackson.* + * + + org.apache.hadoop @@ -482,12 +499,28 @@ spark-core_2.11 ${spark.version} provided + + + com.fasterxml.jackson.** + * + + + javax.servlet + servlet-api + + org.apache.spark spark-sql_2.11 ${spark.version} provided + + + com.fasterxml.jackson.** + * + + @@ -561,8 +594,33 @@ com.fasterxml.jackson.core jackson-annotations - 2.6.0 + ${fasterxml.version} + + + com.fasterxml.jackson.core + jackson-core + ${fasterxml.version} + + + + com.fasterxml.jackson.core + jackson-databind + ${fasterxml.version} + + + + com.fasterxml.jackson.module + jackson-module-scala_2.11 + ${fasterxml.version} + + + + org.codehaus.jackson + jackson-core-asl + 1.9.13 + + org.codehaus.jackson jackson-mapper-asl @@ -573,6 +631,12 @@ org.apache.hive hive-jdbc ${hive.version}-cdh${cdh.version} + + + com.fasterxml.jackson.* + * + + @@ -596,6 +660,26 @@ hadoop-hdfs tests ${hadoop.version}-cdh${cdh.version} + + + org.codehaus + * + + + + org.mortbay.jetty + * + + + javax.servlet.jsp + * + + + javax.servlet + * + + + test org.apache.hadoop