From 64e0573aca01dfd07b23ec41e20acb307829733e Mon Sep 17 00:00:00 2001 From: Vinoth Chandar Date: Mon, 28 Aug 2017 01:28:08 -0700 Subject: [PATCH] Adding hoodie-spark to support Spark Datasource for Hoodie - Write with COW/MOR paths work fully - Read with RO view works on both storages* - Incremental view supported on COW - Refactored out HoodieReadClient methods, to just contain key based access - HoodieDataSourceHelpers class can be now used to construct inputs to datasource - Tests in hoodie-client using new helpers and mechanisms - Basic tests around save modes & insert/upserts (more to follow) - Bumped up scala to 2.11, since 2.10 is deprecated & complains with scalatest - Updated documentation to describe usage - New sample app written using the DataSource API --- docs/_data/sidebars/mydoc_sidebar.yml | 4 +- docs/configurations.md | 45 ++++ docs/incremental_processing.md | 70 +++-- docs/quickstart.md | 18 +- hoodie-cli/pom.xml | 6 +- .../commands/HDFSParquetImportCommand.java | 3 +- hoodie-client/pom.xml | 6 +- .../com/uber/hoodie/HoodieReadClient.java | 118 +-------- .../com/uber/hoodie/HoodieWriteClient.java | 2 +- .../uber/hoodie/config/HoodieWriteConfig.java | 5 + .../src/test/java/HoodieClientExample.java | 2 +- ...TestHoodieClientOnCopyOnWriteStorage.java} | 86 +++--- .../hoodie/common/HoodieClientTestUtils.java | 95 +++++++ .../hoodie/common/TestRawTripPayload.java | 3 +- .../{ => table}/TestMergeOnReadTable.java | 29 ++- .../uber/hoodie/common/util/ParquetUtils.java | 7 +- .../hadoop/HoodieROTablePathFilter.java | 2 +- hoodie-spark/pom.xml | 234 +++++++++++++++++ .../java/com/uber/hoodie/BaseAvroPayload.java | 8 +- .../java/com/uber/hoodie/DataSourceUtils.java | 148 +++++++++++ .../uber/hoodie/HoodieDataSourceHelpers.java | 88 +++++++ .../java/com/uber/hoodie}/KeyGenerator.java | 2 +- .../OverwriteWithLatestAvroPayload.java | 9 +- .../com/uber/hoodie}/SimpleKeyGenerator.java | 25 +- .../com/uber/hoodie/AvroConversionUtils.scala | 126 +++++++++ 
.../com/uber/hoodie/DataSourceOptions.scala | 145 +++++++++++ .../scala/com/uber/hoodie/DefaultSource.scala | 244 ++++++++++++++++++ .../com/uber/hoodie/IncrementalRelation.scala | 94 +++++++ .../main/scala/com/uber/hoodie/package.scala | 36 +++ .../src/test/java/DataSourceTestUtils.java | 50 ++++ hoodie-spark/src/test/java/HoodieJavaApp.java | 148 +++++++++++ .../test/resources/log4j-surefire.properties | 28 ++ .../src/test/scala/DataSourceTest.scala | 137 ++++++++++ hoodie-utilities/pom.xml | 17 +- .../uber/hoodie/utilities/UtilHelpers.java | 25 +- .../deltastreamer/HoodieDeltaStreamer.java | 16 +- .../keygen/TimestampBasedKeyGenerator.java | 8 +- .../schema/FilebasedSchemaProvider.java | 4 +- .../hoodie/utilities/sources/DFSSource.java | 4 +- .../utilities/sources/HiveIncrPullSource.java | 4 +- .../hoodie/utilities/sources/KafkaSource.java | 4 +- .../key-generator.properties | 4 +- .../src/test/java/HoodieSparkSQLExample.java | 43 --- pom.xml | 9 +- 44 files changed, 1830 insertions(+), 331 deletions(-) rename hoodie-client/src/test/java/com/uber/hoodie/{TestHoodieClient.java => TestHoodieClientOnCopyOnWriteStorage.java} (95%) rename hoodie-client/src/test/java/com/uber/hoodie/{ => table}/TestMergeOnReadTable.java (93%) create mode 100644 hoodie-spark/pom.xml rename hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaStreamerPayload.java => hoodie-spark/src/main/java/com/uber/hoodie/BaseAvroPayload.java (79%) create mode 100644 hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java create mode 100644 hoodie-spark/src/main/java/com/uber/hoodie/HoodieDataSourceHelpers.java rename {hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen => hoodie-spark/src/main/java/com/uber/hoodie}/KeyGenerator.java (97%) rename hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaStreamerAvroPayload.java => hoodie-spark/src/main/java/com/uber/hoodie/OverwriteWithLatestAvroPayload.java (82%) rename 
{hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen => hoodie-spark/src/main/java/com/uber/hoodie}/SimpleKeyGenerator.java (60%) create mode 100644 hoodie-spark/src/main/scala/com/uber/hoodie/AvroConversionUtils.scala create mode 100644 hoodie-spark/src/main/scala/com/uber/hoodie/DataSourceOptions.scala create mode 100644 hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala create mode 100644 hoodie-spark/src/main/scala/com/uber/hoodie/IncrementalRelation.scala create mode 100644 hoodie-spark/src/main/scala/com/uber/hoodie/package.scala create mode 100644 hoodie-spark/src/test/java/DataSourceTestUtils.java create mode 100644 hoodie-spark/src/test/java/HoodieJavaApp.java create mode 100644 hoodie-spark/src/test/resources/log4j-surefire.properties create mode 100644 hoodie-spark/src/test/scala/DataSourceTest.scala delete mode 100644 hoodie-utilities/src/test/java/HoodieSparkSQLExample.java diff --git a/docs/_data/sidebars/mydoc_sidebar.yml b/docs/_data/sidebars/mydoc_sidebar.yml index 676c57ecb..e790c982f 100644 --- a/docs/_data/sidebars/mydoc_sidebar.yml +++ b/docs/_data/sidebars/mydoc_sidebar.yml @@ -2,8 +2,8 @@ entries: - title: sidebar - product: Hoodie Ver - version: 0.2 + product: Documentation + version: folders: - title: Getting Started diff --git a/docs/configurations.md b/docs/configurations.md index 0cacdec04..88b3a5a24 100644 --- a/docs/configurations.md +++ b/docs/configurations.md @@ -90,4 +90,49 @@ summary: "Here we list all possible configurations and what they mean" - [GCSConfigs](gcs_hoodie.html) (Hoodie GCS Configs)
Configurations required for GCS and Hoodie co-operability. +* [Hoodie Datasource](#datasource)
+Configs for datasource + - [write options](#writeoptions) (write.format.option(...))
+ Options useful for writing datasets + - [OPERATION_OPT_KEY](#OPERATION_OPT_KEY) (Default: upsert)
+ Whether to do upsert, insert or bulkinsert for the write operation + - [STORAGE_TYPE_OPT_KEY](#STORAGE_TYPE_OPT_KEY) (Default: COPY_ON_WRITE)
+ The storage type for the underlying data, for this write. + - [TABLE_NAME_OPT_KEY](#TABLE_NAME_OPT_KEY) (Default: None (mandatory))
+ Hive table name, to register the dataset into. + - [PRECOMBINE_FIELD_OPT_KEY](#PRECOMBINE_FIELD_OPT_KEY) (Default: ts)
+ Field used in preCombining before actual write. When two records have the same key value, + we will pick the one with the largest value for the precombine field, determined by Object.compareTo(..) + - [PAYLOAD_CLASS_OPT_KEY](#PAYLOAD_CLASS_OPT_KEY) (Default: com.uber.hoodie.OverwriteWithLatestAvroPayload)
+ Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting. + This will render any value set for `PRECOMBINE_FIELD_OPT_KEY` ineffective + - [RECORDKEY_FIELD_OPT_KEY](#RECORDKEY_FIELD_OPT_KEY) (Default: uuid)
+ Record key field. Value to be used as the `recordKey` component of `HoodieKey`. Actual value + will be obtained by invoking .toString() on the field value. Nested fields can be specified using + the dot notation eg: `a.b.c` + - [PARTITIONPATH_FIELD_OPT_KEY](#PARTITIONPATH_FIELD_OPT_KEY) (Default: partitionpath)
+ Partition path field. Value to be used as the `partitionPath` component of `HoodieKey`. + Actual value obtained by invoking .toString() + - [KEYGENERATOR_CLASS_OPT_KEY](#KEYGENERATOR_CLASS_OPT_KEY) (Default: com.uber.hoodie.SimpleKeyGenerator)
+ Key generator class, implemented to extract the key out of the incoming `Row` object + - [COMMIT_METADATA_KEYPREFIX_OPT_KEY](#COMMIT_METADATA_KEYPREFIX_OPT_KEY) (Default: _)
+ Option keys beginning with this prefix, are automatically added to the commit/deltacommit metadata. + This is useful to store checkpointing information, in a consistent way with the hoodie timeline + + - [read options](#readoptions) (read.format.option(...))
+ Options useful for reading datasets + - [VIEW_TYPE_OPT_KEY](#VIEW_TYPE_OPT_KEY) (Default: READ_OPTIMIZED)
+ Whether data needs to be read, in incremental mode (new data since an instantTime) + (or) Read Optimized mode (obtain latest view, based on columnar data) + (or) Real time mode (obtain latest view, based on row & columnar data) + - [BEGIN_INSTANTTIME_OPT_KEY](#BEGIN_INSTANTTIME_OPT_KEY) (Default: None (Mandatory in incremental mode))
+ Instant time to start incrementally pulling data from. The instanttime here need not + necessarily correspond to an instant on the timeline. New data written with an + `instant_time > BEGIN_INSTANTTIME` are fetched out. For e.g: '20170901080000' will get + all new data written after Sep 1, 2017 08:00AM. + - [END_INSTANTTIME_OPT_KEY](#END_INSTANTTIME_OPT_KEY) (Default: latest instant (i.e fetches all new data since begin instant time))
+ Instant time to limit incrementally fetched data to. New data written with an + `instant_time <= END_INSTANTTIME` are fetched out. + + {% include callout.html content="Hoodie is a young project. A lot of pluggable interfaces and configurations to support diverse workloads need to be created. Get involved [here](https://github.com/uber/hoodie)" type="info" %} diff --git a/docs/incremental_processing.md b/docs/incremental_processing.md index f81547553..753dd2044 100644 --- a/docs/incremental_processing.md +++ b/docs/incremental_processing.md @@ -19,8 +19,28 @@ discusses a few tools that can be used to achieve these on different contexts. Upserts can be used to apply a delta or an incremental change to a Hoodie dataset. For e.g, the incremental changes could be from a Kafka topic or files uploaded to HDFS or even changes pulled from another Hoodie dataset. +#### Via Spark Job -#### DeltaStreamer +The `hoodie-spark` module offers the DataSource API to write any data frame into a Hoodie dataset. Following is how we can upsert a dataframe, while specifying the field names that need to be used +for `recordKey => _row_key`, `partitionPath => partition` and `precombineKey => timestamp` + + +``` +inputDF.write() + .format("com.uber.hoodie") + .options(clientOpts) // any of the hoodie client opts can be passed in as well + .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key") + .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition") + .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp") + .option(HoodieWriteConfig.TABLE_NAME, tableName) + .mode(SaveMode.Append) + .save(basePath); +``` + +Please refer to [configurations](configurations.html) section, to view all datasource options. + + +#### DeltaStreamer Tool The `HoodieDeltaStreamer` utility provides the way to achieve all of these, by using the capabilities of `HoodieWriteClient`. 
@@ -151,8 +171,34 @@ This, together with upserts, are particularly useful for building data pipelines joined with other tables (datasets/dimensions), to produce deltas to a target hoodie dataset. Then, using the delta streamer tool these deltas can be upserted into the target hoodie dataset to complete the pipeline. -#### Pulling through Hive +#### Via Spark Job +The `hoodie-spark` module offers the DataSource API, offers a more elegant way to pull data from Hoodie dataset (plus more) and process it via Spark. +This class can be used within existing Spark jobs and offers the following functionality. +A sample incremental pull, that will obtain all records written since `beginInstantTime`, looks like below. + +``` + Dataset hoodieIncViewDF = spark.read() + .format("com.uber.hoodie") + .option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY(), + DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL()) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY(), + ) + .load(tablePath); // For incremental view, pass in the root/base path of dataset +``` + +Please refer to [configurations](configurations.html) section, to view all datasource options. + + +Additionally, `HoodieReadClient` offers the following functionality using Hoodie's implicit indexing. + +| **API** | **Description** | +| read(keys) | Read out the data corresponding to the keys as a DataFrame, using Hoodie's own index for faster lookup | +| filterExists() | Filter out already existing records from the provided RDD[HoodieRecord]. Useful for de-duplication | +| checkExists(keys) | Check if the provided keys exist in a Hoodie dataset | + + +#### HiveIncrementalPuller Tool `HiveIncrementalPuller` allows the above to be done via HiveQL, combining the benefits of Hive (reliably process complex SQL queries) and incremental primitives (speed up query by pulling tables incrementally instead of scanning fully). The tool uses Hive JDBC to run the Hive query saving its results in a temp table. that can later be upserted. 
Upsert utility (`HoodieDeltaStreamer`) has all the state it needs from the directory structure to know what should be the commit time on the target table. @@ -183,24 +229,4 @@ it will automatically use the backfill configuration, since applying the last 24 is the lack of support for self-joining the same table in mixed mode (normal and incremental modes). -#### Pulling through Spark - -`HoodieReadClient` (inside hoodie-client) offers a more elegant way to pull data from Hoodie dataset (plus more) and process it via Spark. -This class can be used within existing Spark jobs and offers the following functionality. - -| **API** | **Description** | -| listCommitsSince(),latestCommit() | Obtain commit times to pull data from | -| readSince(commitTime),readCommit(commitTime) | Provide the data from the commit time as a DataFrame, to process further on | -| read(keys) | Read out the data corresponding to the keys as a DataFrame, using Hoodie's own index for faster lookup | -| read(paths) | Read out the data under specified path, with the functionality of HoodieInputFormat. An alternative way to do SparkSQL on Hoodie datasets | -| filterExists() | Filter out already existing records from the provided RDD[HoodieRecord]. 
Useful for de-duplication | -| checkExists(keys) | Check if the provided keys exist in a Hoodie dataset | - - -## SQL Streamer - -work in progress, tool being refactored out into open source Hoodie - - -{% include callout.html content="Get involved in building this tool [here](https://github.com/uber/hoodie/issues/20)" type="info" %} diff --git a/docs/quickstart.md b/docs/quickstart.md index b04accdb2..f463c3075 100644 --- a/docs/quickstart.md +++ b/docs/quickstart.md @@ -3,6 +3,7 @@ title: Quickstart keywords: quickstart tags: [quickstart] sidebar: mydoc_sidebar +toc: false permalink: quickstart.html --- @@ -16,7 +17,7 @@ Normally build the maven project, from command line $ mvn clean install -DskipTests ``` -{% include callout.html content="You might want to add your spark assembly jar to project dependencies under 'Module Setttings', to be able to run Spark from IDE" type="info" %} +{% include callout.html content="You might want to add your spark jars folder to project dependencies under 'Module Setttings', to be able to run Spark from IDE" type="info" %} {% include note.html content="Setup your local hadoop/hive test environment, so you can play with entire ecosystem. See [this](http://www.bytearray.io/2016/05/setting-up-hadoopyarnsparkhive-on-mac.html) for reference" %} @@ -25,13 +26,15 @@ $ mvn clean install -DskipTests ## Generate a Hoodie Dataset -You can run the __hoodie-client/src/test/java/HoodieClientExample.java__ class, to place a two commits (commit 1 => 100 inserts, commit 2 => 100 updates to previously inserted 100 records) onto your HDFS/local filesystem +### DataSource API + +Run __hoodie-spark/src/test/java/HoodieJavaApp.java__ class, to place a two commits (commit 1 => 100 inserts, commit 2 => 100 updates to previously inserted 100 records) onto your HDFS/local filesystem + ``` Usage:
[options] Options: --help, -h - Default: false --table-name, -n table name for Hoodie sample table @@ -46,7 +49,14 @@ Usage:
[options] ``` -The class lets you choose table names, output paths and one of the storage types. +The class lets you choose table names, output paths and one of the storage types. In your own applications, be sure to include the `hoodie-spark` module as dependency +and follow a similar pattern to write/read datasets via the datasource. + +### RDD API + +RDD level APIs give you more power and control over things, via the `hoodie-client` module . +Refer to __hoodie-client/src/test/java/HoodieClientExample.java__ class for an example. + ## Register Dataset to Hive Metastore diff --git a/hoodie-cli/pom.xml b/hoodie-cli/pom.xml index 8ce890aad..c83e04488 100644 --- a/hoodie-cli/pom.xml +++ b/hoodie-cli/pom.xml @@ -131,7 +131,7 @@ org.scala-lang scala-library - 2.10.5 + ${scala.version} @@ -146,11 +146,11 @@ org.apache.spark - spark-core_2.10 + spark-core_2.11 org.apache.spark - spark-sql_2.10 + spark-sql_2.11 diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/HDFSParquetImportCommand.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/HDFSParquetImportCommand.java index 4053d4e72..ca6e1ab05 100644 --- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/HDFSParquetImportCommand.java +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/HDFSParquetImportCommand.java @@ -67,9 +67,8 @@ public class HDFSParquetImportCommand implements CommandMarker { boolean initialized = HoodieCLI.initConf(); HoodieCLI.initFS(initialized); - String sparkPropertiesPath = Utils - .getDefaultPropertiesFile(scala.collection.JavaConversions.asScalaMap(System.getenv())); + .getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); sparkLauncher.addAppArgs(SparkCommand.IMPORT.toString(), srcPath, targetPath, tableName, diff --git a/hoodie-client/pom.xml b/hoodie-client/pom.xml index 9b2a28318..5c55d888d 100644 --- a/hoodie-client/pom.xml +++ 
b/hoodie-client/pom.xml @@ -158,12 +158,12 @@ org.apache.spark - spark-core_2.10 + spark-core_2.11 org.apache.spark - spark-sql_2.10 + spark-sql_2.11 @@ -180,7 +180,7 @@ com.uber.hoodie hoodie-hadoop-mr - 0.4.0-SNAPSHOT + ${project.version} test diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java index b57bd3285..0417aeaff 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java @@ -59,10 +59,8 @@ import java.util.stream.Collectors; import scala.Tuple2; /** - * Provides first class support for accessing Hoodie tables for data processing via Apache Spark. + * Provides an RDD based API for accessing/filtering Hoodie tables, based on keys. * - * - * TODO: Need to move all read operations here, since Hoodie is a single writer and multiple reader */ public class HoodieReadClient implements Serializable { @@ -81,7 +79,6 @@ public class HoodieReadClient implements Serializable { private HoodieTable hoodieTable; private transient Optional sqlContextOpt; - /** * @param basePath path to Hoodie dataset */ @@ -92,7 +89,6 @@ public class HoodieReadClient implements Serializable { this.hoodieTable = HoodieTable .getHoodieTable(new HoodieTableMetaClient(fs, basePath, true), null); this.commitTimeline = hoodieTable.getCompletedCompactionCommitTimeline(); - this.index = new HoodieBloomIndex(HoodieWriteConfig.newBuilder().withPath(basePath).build(), jsc); this.sqlContextOpt = Optional.absent(); @@ -161,93 +157,6 @@ public class HoodieReadClient implements Serializable { return sqlContextOpt.get().createDataFrame(rowRDD, schema); } - /** - * Reads the paths under the a hoodie dataset out as a DataFrame - */ - public Dataset read(String... 
paths) { - assertSqlContext(); - List filteredPaths = new ArrayList<>(); - - try { - for (String path : paths) { - if (!path.contains(hoodieTable.getMetaClient().getBasePath())) { - throw new HoodieException("Path " + path - + " does not seem to be a part of a Hoodie dataset at base path " - + hoodieTable.getMetaClient().getBasePath()); - } - - TableFileSystemView.ReadOptimizedView fileSystemView = new HoodieTableFileSystemView(hoodieTable.getMetaClient(), - hoodieTable.getCompletedCommitTimeline(), fs.globStatus(new Path(path))); - List latestFiles = fileSystemView.getLatestDataFiles().collect( - Collectors.toList()); - for (HoodieDataFile file : latestFiles) { - filteredPaths.add(file.getPath()); - } - } - return sqlContextOpt.get().read() - .parquet(filteredPaths.toArray(new String[filteredPaths.size()])); - } catch (Exception e) { - throw new HoodieException("Error reading hoodie dataset as a dataframe", e); - } - } - - /** - * Obtain all new data written into the Hoodie dataset since the given timestamp. - * - * If you made a prior call to {@link HoodieReadClient#latestCommit()}, it gives you all data in - * the time window (commitTimestamp, latestCommit) - */ - public Dataset readSince(String lastCommitTimestamp) { - - List commitsToReturn = - commitTimeline.findInstantsAfter(lastCommitTimestamp, Integer.MAX_VALUE) - .getInstants().collect(Collectors.toList()); - //TODO: we can potentially trim this down to only affected partitions, using CommitMetadata - try { - - // Go over the commit metadata, and obtain the new files that need to be read. 
- HashMap fileIdToFullPath = new HashMap<>(); - for (HoodieInstant commit : commitsToReturn) { - HoodieCommitMetadata metadata = - HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit).get()); - // get files from each commit, and replace any previous versions - String basePath = hoodieTable.getMetaClient().getBasePath(); - fileIdToFullPath.putAll(metadata.getFileIdAndFullPaths(basePath)); - } - return sqlContextOpt.get().read() - .parquet(fileIdToFullPath.values().toArray(new String[fileIdToFullPath.size()])) - .filter(String.format("%s >'%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, lastCommitTimestamp)); - } catch (IOException e) { - throw new HoodieException("Error pulling data incrementally from commitTimestamp :" + lastCommitTimestamp, e); - } - } - - /** - * Obtain - */ - public Dataset readCommit(String commitTime) { - assertSqlContext(); - String actionType = hoodieTable.getCompactedCommitActionType(); - HoodieInstant commitInstant = - new HoodieInstant(false, actionType, commitTime); - if (!commitTimeline.containsInstant(commitInstant)) { - new HoodieException("No commit exists at " + commitTime); - } - - try { - HoodieCommitMetadata commitMetadata = - HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commitInstant).get()); - String basePath = hoodieTable.getMetaClient().getBasePath(); - HashMap paths = commitMetadata.getFileIdAndFullPaths(basePath); - return sqlContextOpt.get().read() - .parquet(paths.values().toArray(new String[paths.size()])) - .filter(String.format("%s ='%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitTime)); - } catch (Exception e) { - throw new HoodieException("Error reading commit " + commitTime, e); - } - } - - /** * Checks if the given [Keys] exists in the hoodie table and returns [Key, * Optional[FullFilePath]] If the optional FullFilePath value is not present, then the key is @@ -269,29 +178,4 @@ public class HoodieReadClient implements Serializable { JavaRDD recordsWithLocation = 
index.tagLocation(hoodieRecords, hoodieTable); return recordsWithLocation.filter(v1 -> !v1.isCurrentLocationKnown()); } - - /** - * Checks if the Hoodie dataset has new data since given timestamp. This can be subsequently - * used to call {@link HoodieReadClient#readSince(String)} to perform incremental processing. - */ - public boolean hasNewCommits(String commitTimestamp) { - return listCommitsSince(commitTimestamp).size() > 0; - } - - /** - * - * @param commitTimestamp - * @return - */ - public List listCommitsSince(String commitTimestamp) { - return commitTimeline.findInstantsAfter(commitTimestamp, Integer.MAX_VALUE).getInstants() - .map(HoodieInstant::getTimestamp).collect(Collectors.toList()); - } - - /** - * Returns the last successful commit (a successful write operation) into a Hoodie table. - */ - public String latestCommit() { - return commitTimeline.lastInstant().get().getTimestamp(); - } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index ae2cf88fe..67bebcd6c 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -368,7 +368,7 @@ public class HoodieWriteClient implements Seriali JavaRDD writeStatuses, Optional> extraMetadata) { - logger.info("Comitting " + commitTime); + logger.info("Commiting " + commitTime); // Create a Hoodie table which encapsulated the commits and files visible HoodieTable table = HoodieTable .getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java index f3081e21f..b954a7bbf 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java @@ 
-31,6 +31,7 @@ import java.io.File; import java.io.FileReader; import java.io.IOException; import java.io.InputStream; +import java.util.Map; import java.util.Properties; /** @@ -298,6 +299,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { } } + public Builder withProps(Map kvprops) { + props.putAll(kvprops); + return this; + } public Builder withPath(String basePath) { props.setProperty(BASE_PATH_PROP, basePath); diff --git a/hoodie-client/src/test/java/HoodieClientExample.java b/hoodie-client/src/test/java/HoodieClientExample.java index 1045418c3..cffe5605d 100644 --- a/hoodie-client/src/test/java/HoodieClientExample.java +++ b/hoodie-client/src/test/java/HoodieClientExample.java @@ -61,7 +61,7 @@ public class HoodieClientExample { HoodieClientExample cli = new HoodieClientExample(); JCommander cmd = new JCommander(cli, args); - if (cli.help || args.length == 0) { + if (cli.help) { cmd.usage(); System.exit(1); } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java similarity index 95% rename from hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java rename to hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java index 45ebbd2d0..2995ddf18 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java @@ -17,6 +17,7 @@ package com.uber.hoodie; import com.google.common.collect.Iterables; + import com.uber.hoodie.common.HoodieCleanStat; import com.uber.hoodie.common.HoodieClientTestUtils; import com.uber.hoodie.common.HoodieTestDataGenerator; @@ -33,6 +34,7 @@ import com.uber.hoodie.common.model.HoodieWriteStat; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.TableFileSystemView; +import 
com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.ParquetUtils; @@ -43,11 +45,11 @@ import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieRollbackException; import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.table.HoodieTable; + import org.apache.avro.generic.GenericRecord; import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.scheduler.SparkListener; @@ -58,7 +60,6 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import scala.collection.Iterator; import java.io.File; import java.io.FileInputStream; @@ -76,12 +77,14 @@ import java.util.Set; import java.util.TreeSet; import java.util.stream.Collectors; +import scala.collection.Iterator; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -public class TestHoodieClient implements Serializable { +public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { private transient JavaSparkContext jsc = null; private transient SQLContext sqlContext; private String basePath = null; @@ -142,6 +145,8 @@ public class TestHoodieClient implements Serializable { } } + + @Test public void testFilterExist() throws Exception { HoodieWriteConfig config = getConfig(); @@ -219,12 +224,15 @@ public class TestHoodieClient implements Serializable { assertPartitionMetadata(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, fs); // verify that there is a commit - HoodieReadClient readClient = new HoodieReadClient(jsc, basePath, sqlContext); - 
assertEquals("Expecting a single commit.", readClient.listCommitsSince("000").size(), 1); - assertEquals("Latest commit should be 001",readClient.latestCommit(), newCommitTime); - assertEquals("Must contain 200 records", readClient.readCommit(newCommitTime).count(), records.size()); - // Should have 100 records in table (check using Index), all in locations marked at commit HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath); + HoodieTimeline timeline = new HoodieActiveTimeline(fs, metaClient.getMetaPath()).getCommitTimeline(); + + assertEquals("Expecting a single commit.", 1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants()); + assertEquals("Latest commit should be 001", newCommitTime, timeline.lastInstant().get().getTimestamp()); + assertEquals("Must contain 200 records", + records.size(), + HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count()); + // Should have 100 records in table (check using Index), all in locations marked at commit HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig()); List taggedRecords = index.tagLocation(jsc.parallelize(records, 1), table).collect(); @@ -248,9 +256,9 @@ public class TestHoodieClient implements Serializable { assertNoWriteErrors(statuses); // verify there are now 2 commits - readClient = new HoodieReadClient(jsc, basePath, sqlContext); - assertEquals("Expecting two commits.", readClient.listCommitsSince("000").size(), 2); - assertEquals("Latest commit should be 004",readClient.latestCommit(), newCommitTime); + timeline = new HoodieActiveTimeline(fs, metaClient.getMetaPath()).getCommitTimeline(); + assertEquals("Expecting two commits.", timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(), 2); + assertEquals("Latest commit should be 004", timeline.lastInstant().get().getTimestamp(), newCommitTime); metaClient = new HoodieTableMetaClient(fs, basePath); table = HoodieTable.getHoodieTable(metaClient, getConfig()); @@ 
-264,16 +272,18 @@ public class TestHoodieClient implements Serializable { for (int i=0; i < fullPartitionPaths.length; i++) { fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]); } - assertEquals("Must contain 200 records", readClient.read(fullPartitionPaths).count(), 200); + assertEquals("Must contain 200 records", + 200, + HoodieClientTestUtils.read(basePath, sqlContext, fs, fullPartitionPaths).count()); // Check that the incremental consumption from time 000 assertEquals("Incremental consumption from time 002, should give all records in commit 004", - readClient.readCommit(newCommitTime).count(), - readClient.readSince("002").count()); + HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(), + HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, "002").count()); assertEquals("Incremental consumption from time 001, should give all records in commit 004", - readClient.readCommit(newCommitTime).count(), - readClient.readSince("001").count()); + HoodieClientTestUtils.readCommit(basePath, sqlContext,timeline, newCommitTime).count(), + HoodieClientTestUtils.readSince(basePath, sqlContext,timeline, "001").count()); } @Test @@ -301,12 +311,13 @@ public class TestHoodieClient implements Serializable { assertNoWriteErrors(statuses); // verify that there is a commit - HoodieReadClient readClient = new HoodieReadClient(jsc, basePath, sqlContext); - assertEquals("Expecting a single commit.", readClient.listCommitsSince("000").size(), 1); - assertEquals("Latest commit should be 001",readClient.latestCommit(), newCommitTime); - assertEquals("Must contain 200 records", readClient.readCommit(newCommitTime).count(), fewRecordsForInsert.size()); - // Should have 100 records in table (check using Index), all in locations marked at commit HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath); + HoodieTimeline timeline = new HoodieActiveTimeline(fs, 
metaClient.getMetaPath()).getCommitTimeline(); + assertEquals("Expecting a single commit.", 1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants()); + assertEquals("Latest commit should be 001", newCommitTime, timeline.lastInstant().get().getTimestamp()); + assertEquals("Must contain 200 records", fewRecordsForInsert.size(), + HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count()); + // Should have 100 records in table (check using Index), all in locations marked at commit HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig()); List taggedRecords = index.tagLocation(jsc.parallelize(fewRecordsForInsert, 1), table).collect(); @@ -327,31 +338,29 @@ public class TestHoodieClient implements Serializable { assertNoWriteErrors(statuses); // verify there are now 2 commits - readClient = new HoodieReadClient(jsc, basePath, sqlContext); - assertEquals("Expecting two commits.", readClient.listCommitsSince("000").size(), 2); - assertEquals("Latest commit should be 004",readClient.latestCommit(), newCommitTime); - - metaClient = new HoodieTableMetaClient(fs, basePath); - table = HoodieTable.getHoodieTable(metaClient, getConfig()); + timeline = new HoodieActiveTimeline(fs, metaClient.getMetaPath()).getCommitTimeline(); + assertEquals("Expecting two commits.", timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(), 2); + assertEquals("Latest commit should be 004", timeline.lastInstant().get().getTimestamp(), newCommitTime); // Check the entire dataset has 150 records(200-50) still String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length]; for (int i=0; i < fullPartitionPaths.length; i++) { fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]); } - assertEquals("Must contain 150 records", readClient.read(fullPartitionPaths).count(), 150); + assertEquals("Must contain 150 records", 150, + HoodieClientTestUtils.read(basePath, sqlContext, fs, 
fullPartitionPaths).count()); // Check that the incremental consumption from time 000 assertEquals("Incremental consumption from latest commit, should give 50 updated records", - readClient.readCommit(newCommitTime).count(), - 50); + 50, + HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count()); assertEquals("Incremental consumption from time 001, should give 50 updated records", 50, - readClient.readSince("001").count()); + HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, "001").count()); assertEquals("Incremental consumption from time 000, should give 150", 150, - readClient.readSince("000").count()); + HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, "000").count()); } @@ -566,10 +575,10 @@ public class TestHoodieClient implements Serializable { assertNoWriteErrors(statuses); // verify that there is a commit - - assertEquals("Expecting a single commit.", new HoodieReadClient(jsc, basePath).listCommitsSince("000").size(), 1); - // Should have 100 records in table (check using Index), all in locations marked at commit HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath); + HoodieTimeline timeline = new HoodieActiveTimeline(fs, metaClient.getMetaPath()).getCommitTimeline(); + assertEquals("Expecting a single commit.", 1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants()); + // Should have 100 records in table (check using Index), all in locations marked at commit HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig()); assertFalse(table.getCompletedCommitTimeline().empty()); String commitTime = @@ -594,7 +603,7 @@ public class TestHoodieClient implements Serializable { HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, basePath); table = HoodieTable.getHoodieTable(metadata, getConfig()); - HoodieTimeline timeline = table.getCommitTimeline(); + timeline = table.getCommitTimeline(); TableFileSystemView fsView = table.getFileSystemView(); 
// Need to ensure the following @@ -660,9 +669,10 @@ public class TestHoodieClient implements Serializable { assertNoWriteErrors(statuses); // verify that there is a commit - assertEquals("Expecting a single commit.", new HoodieReadClient(jsc, basePath).listCommitsSince("000").size(), 1); - // Should have 100 records in table (check using Index), all in locations marked at commit HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath); + HoodieTimeline timeline = new HoodieActiveTimeline(fs, metaClient.getMetaPath()).getCommitTimeline(); + assertEquals("Expecting a single commit.", 1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants()); + // Should have 100 records in table (check using Index), all in locations marked at commit HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig()); assertFalse(table.getCompletedCommitTimeline().empty()); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java index 1aac5785f..e0148f93b 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java @@ -18,10 +18,23 @@ package com.uber.hoodie.common; import com.uber.hoodie.HoodieReadClient; import com.uber.hoodie.WriteStatus; +import com.uber.hoodie.common.model.HoodieCommitMetadata; +import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; +import com.uber.hoodie.common.table.TableFileSystemView; +import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.common.table.view.HoodieTableFileSystemView; import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.exception.HoodieException; +import 
com.uber.hoodie.table.HoodieTable; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SQLContext; import org.apache.spark.SparkConf; @@ -29,10 +42,13 @@ import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; /** * Utility methods to aid testing inside the HoodieClient module. @@ -90,4 +106,83 @@ public class HoodieClientTestUtils { .setMaster("local[1]"); return HoodieReadClient.addHoodieSupport(sparkConf); } + + public static HashMap getLatestFileIsToFullPath(String basePath, + HoodieTimeline commitTimeline, + List commitsToReturn) throws IOException { + HashMap fileIdToFullPath = new HashMap<>(); + for (HoodieInstant commit : commitsToReturn) { + HoodieCommitMetadata metadata = + HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit).get()); + fileIdToFullPath.putAll(metadata.getFileIdAndFullPaths(basePath)); + } + return fileIdToFullPath; + } + + public static Dataset readCommit(String basePath, + SQLContext sqlContext, + HoodieTimeline commitTimeline, + String commitTime) { + HoodieInstant commitInstant = + new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime); + if (!commitTimeline.containsInstant(commitInstant)) { + new HoodieException("No commit exists at " + commitTime); + } + try { + HashMap paths = getLatestFileIsToFullPath(basePath, commitTimeline, Arrays.asList(commitInstant)); + return sqlContext.read() + .parquet(paths.values().toArray(new String[paths.size()])) + .filter(String.format("%s ='%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitTime)); + } catch (Exception e) { + throw new HoodieException("Error reading commit " + commitTime, e); + } + } + + 
/** + * Obtain all new data written into the Hoodie dataset since the given timestamp. + */ + public static Dataset readSince(String basePath, + SQLContext sqlContext, + HoodieTimeline commitTimeline, + String lastCommitTime) { + List commitsToReturn = + commitTimeline.findInstantsAfter(lastCommitTime, Integer.MAX_VALUE) + .getInstants().collect(Collectors.toList()); + try { + // Go over the commit metadata, and obtain the new files that need to be read. + HashMap fileIdToFullPath = getLatestFileIsToFullPath(basePath, commitTimeline, commitsToReturn); + return sqlContext.read() + .parquet(fileIdToFullPath.values().toArray(new String[fileIdToFullPath.size()])) + .filter(String.format("%s >'%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, lastCommitTime)); + } catch (IOException e) { + throw new HoodieException("Error pulling data incrementally from commitTimestamp :" + lastCommitTime, e); + } + } + + /** + * Reads the paths under the a hoodie dataset out as a DataFrame + */ + public static Dataset read(String basePath, + SQLContext sqlContext, + FileSystem fs, + String... 
paths) { + List filteredPaths = new ArrayList<>(); + try { + HoodieTable hoodieTable = HoodieTable + .getHoodieTable(new HoodieTableMetaClient(fs, basePath, true), null); + for (String path : paths) { + TableFileSystemView.ReadOptimizedView fileSystemView = new HoodieTableFileSystemView(hoodieTable.getMetaClient(), + hoodieTable.getCompletedCommitTimeline(), fs.globStatus(new Path(path))); + List latestFiles = fileSystemView.getLatestDataFiles().collect( + Collectors.toList()); + for (HoodieDataFile file : latestFiles) { + filteredPaths.add(file.getPath()); + } + } + return sqlContext.read() + .parquet(filteredPaths.toArray(new String[filteredPaths.size()])); + } catch (Exception e) { + throw new HoodieException("Error reading hoodie dataset as a dataframe", e); + } + } } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/common/TestRawTripPayload.java b/hoodie-client/src/test/java/com/uber/hoodie/common/TestRawTripPayload.java index 95a47d904..572792495 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/common/TestRawTripPayload.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/common/TestRawTripPayload.java @@ -109,11 +109,10 @@ public class TestRawTripPayload implements HoodieRecordPayload footerVals = new ArrayList<>(); ParquetMetadata footer = readMetadata(parquetFilePath); Map metadata = footer.getFileMetaData().getKeyValueMetaData(); - for (String footerName: footerNames) { + for (String footerName : footerNames) { if (metadata.containsKey(footerName)) { footerVals.add(metadata.get(footerName)); } else { @@ -137,6 +138,10 @@ public class ParquetUtils { return footerVals; } + public static Schema readAvroSchema(Path parquetFilePath) { + return new AvroSchemaConverter().convert(readSchema(parquetFilePath)); + } + /** * Read out the bloom filter from the parquet file meta data. 
*/ diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieROTablePathFilter.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieROTablePathFilter.java index bffb015a2..769bc4d52 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieROTablePathFilter.java +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieROTablePathFilter.java @@ -50,7 +50,7 @@ import java.util.stream.Collectors; */ public class HoodieROTablePathFilter implements PathFilter, Serializable { - public static final Log LOG = LogFactory.getLog(HoodieInputFormat.class); + public static final Log LOG = LogFactory.getLog(HoodieROTablePathFilter.class); /** * Its quite common, to have all files from a given partition path be passed into accept(), diff --git a/hoodie-spark/pom.xml b/hoodie-spark/pom.xml new file mode 100644 index 000000000..ce26e98f8 --- /dev/null +++ b/hoodie-spark/pom.xml @@ -0,0 +1,234 @@ + + + + + + hoodie + com.uber.hoodie + 0.4.0-SNAPSHOT + + 4.0.0 + + com.uber.hoodie + hoodie-spark + jar + + + 1.2.17 + 4.10 + + + + + scala-tools.org + Scala-tools Maven2 Repository + http://scala-tools.org/repo-releases + + + + + + + + net.alchim31.maven + scala-maven-plugin + 3.3.1 + + + org.apache.maven.plugins + maven-compiler-plugin + 2.0.2 + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-dependencies + prepare-package + + copy-dependencies + + + ${project.build.directory}/lib + true + true + true + + + + + + net.alchim31.maven + scala-maven-plugin + + + scala-compile-first + process-resources + + add-source + compile + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + compile + + compile + + + + + + org.apache.rat + apache-rat-plugin + + + + + + + + org.scala-lang + scala-library + ${scala.version} + + + org.scalatest + scalatest_2.11 + 3.0.1 + test + + + org.apache.spark + spark-core_2.11 + + + org.apache.spark + 
spark-sql_2.11 + + + com.databricks + spark-avro_2.11 + 3.2.0 + + + com.fasterxml.jackson.core + jackson-annotations + + + org.codehaus.jackson + jackson-mapper-asl + + + + org.apache.hadoop + hadoop-client + + + javax.servlet + * + + + provided + + + + org.apache.hadoop + hadoop-common + provided + + + + log4j + log4j + ${log4j.version} + + + org.apache.avro + avro + + + + org.apache.commons + commons-lang3 + + + + org.apache.commons + commons-configuration2 + + + + com.uber.hoodie + hoodie-client + ${project.version} + + + com.uber.hoodie + hoodie-common + ${project.version} + + + com.uber.hoodie + hoodie-hadoop-mr + ${project.version} + + + junit + junit-dep + ${junit.version} + test + + + + com.uber.hoodie + hoodie-client + ${project.version} + test-jar + test + + + com.uber.hoodie + hoodie-common + ${project.version} + test-jar + test + + + + diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaStreamerPayload.java b/hoodie-spark/src/main/java/com/uber/hoodie/BaseAvroPayload.java similarity index 79% rename from hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaStreamerPayload.java rename to hoodie-spark/src/main/java/com/uber/hoodie/BaseAvroPayload.java index fecfb1d40..45022f28a 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaStreamerPayload.java +++ b/hoodie-spark/src/main/java/com/uber/hoodie/BaseAvroPayload.java @@ -16,15 +16,15 @@ * */ -package com.uber.hoodie.utilities.deltastreamer; +package com.uber.hoodie; import org.apache.avro.generic.GenericRecord; import java.io.Serializable; /** - * Base class for all payload supported for the {@link HoodieDeltaStreamer} + * Base class for all AVRO record based payloads, that can be ordered based on a field */ -public abstract class DeltaStreamerPayload implements Serializable { +public abstract class BaseAvroPayload implements Serializable { /** @@ -42,7 +42,7 @@ public abstract class DeltaStreamerPayload 
implements Serializable { * @param record * @param orderingVal */ - public DeltaStreamerPayload(GenericRecord record, Comparable orderingVal) { + public BaseAvroPayload(GenericRecord record, Comparable orderingVal) { this.record = record; this.orderingVal = orderingVal; } diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java new file mode 100644 index 000000000..b78da7955 --- /dev/null +++ b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java @@ -0,0 +1,148 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * + */ + +package com.uber.hoodie; + +import com.uber.hoodie.common.model.HoodieKey; +import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieRecordPayload; +import com.uber.hoodie.config.HoodieIndexConfig; +import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.exception.HoodieException; +import com.uber.hoodie.exception.HoodieNotSupportedException; +import com.uber.hoodie.index.HoodieIndex; + +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.commons.lang3.reflect.ConstructorUtils; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * Utilities used throughout the data source + */ +public class DataSourceUtils { + + /** + * Obtain value of the provided field as string, denoted by dot notation. e.g: a.b.c + */ + public static String getNestedFieldValAsString(GenericRecord record, String fieldName) { + String[] parts = fieldName.split("\\."); + GenericRecord valueNode = record; + for (int i = 0; i < parts.length; i++) { + String part = parts[i]; + Object val = valueNode.get(part); + if (val == null) { + break; + } + + // return, if last part of name + if (i == parts.length - 1) { + return val.toString(); + } else { + if (val instanceof GenericRecord) { + throw new HoodieException("Cannot find a record at part value :" + part); + } + valueNode = (GenericRecord) val; + } + } + throw new HoodieException(fieldName + " field not found in record"); + } + + /** + * Create a key generator class via reflection, passing in any configs needed + */ + public static KeyGenerator createKeyGenerator(String keyGeneratorClass, PropertiesConfiguration cfg) throws IOException { + try { + return (KeyGenerator) ConstructorUtils.invokeConstructor(Class.forName(keyGeneratorClass), (Object) cfg); + } catch (Throwable e) { 
+ throw new IOException("Could not load key generator class " + keyGeneratorClass, e); + } + } + + /** + * Create a payload class via reflection, passing in an ordering/precombine value value. + */ + public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record, Comparable orderingVal) throws IOException { + try { + return (HoodieRecordPayload) ConstructorUtils.invokeConstructor(Class.forName(payloadClass), (Object) record, (Object) orderingVal); + } catch (Throwable e) { + throw new IOException("Could not create payload for class: " + payloadClass, e); + } + } + + public static void checkRequiredProperties(PropertiesConfiguration configuration, List checkPropNames) { + checkPropNames.stream().forEach(prop -> { + if (!configuration.containsKey(prop)) { + throw new HoodieNotSupportedException("Required property " + prop + " is missing"); + } + }); + } + + public static HoodieWriteClient createHoodieClient(JavaSparkContext jssc, + String schemaStr, + String basePath, + String tblName, + Map parameters) throws Exception { + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .combineInput(true, true) + .withPath(basePath) + .withAutoCommit(false) + .withSchema(schemaStr) + .forTable(tblName) + .withIndexConfig( + HoodieIndexConfig.newBuilder() + .withIndexType(HoodieIndex.IndexType.BLOOM) + .build()) + // override above with Hoodie configs specified as options. 
+ .withProps(parameters) + .build(); + + return new HoodieWriteClient<>(jssc, writeConfig); + } + + + public static JavaRDD doWriteOperation(HoodieWriteClient client, + JavaRDD hoodieRecords, + String commitTime, + String operation) { + if (operation.equals(DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL())) { + return client.bulkInsert(hoodieRecords, commitTime); + } else if (operation.equals(DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL())) { + return client.insert(hoodieRecords, commitTime); + } else { + //default is upsert + return client.upsert(hoodieRecords, commitTime); + } + } + + public static HoodieRecord createHoodieRecord(GenericRecord gr, + Comparable orderingVal, + HoodieKey hKey, + String payloadClass) throws IOException { + HoodieRecordPayload payload = DataSourceUtils.createPayload( + payloadClass, + gr, + orderingVal); + return new HoodieRecord<>(hKey, payload); + } +} diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/HoodieDataSourceHelpers.java b/hoodie-spark/src/main/java/com/uber/hoodie/HoodieDataSourceHelpers.java new file mode 100644 index 000000000..2d5a6907d --- /dev/null +++ b/hoodie-spark/src/main/java/com/uber/hoodie/HoodieDataSourceHelpers.java @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * + */ + +package com.uber.hoodie; + +import com.google.common.collect.Sets; + +import com.uber.hoodie.common.model.HoodieTableType; +import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.table.HoodieTimeline; +import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; +import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.table.HoodieTable; + +import org.apache.hadoop.fs.FileSystem; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * List of helpers to aid, construction of instanttime for read and write operations using datasource + */ +public class HoodieDataSourceHelpers { + + /** + * Checks if the Hoodie dataset has new data since given timestamp. This can be subsequently + * fed to an incremental view read, to perform incremental processing. + */ + public static boolean hasNewCommits(FileSystem fs, String basePath, String commitTimestamp) { + return listCommitsSince(fs, basePath, commitTimestamp).size() > 0; + } + + /** + * Get a list of instant times that have occurred, from the given instant timestamp. + * + * @param instantTimestamp + * @return + */ + public static List listCommitsSince(FileSystem fs, String basePath, String instantTimestamp) { + HoodieTimeline timeline = allCompletedCommitsCompactions(fs, basePath); + return timeline.findInstantsAfter(instantTimestamp, Integer.MAX_VALUE).getInstants() + .map(HoodieInstant::getTimestamp).collect(Collectors.toList()); + } + + /** + * Returns the last successful write operation's instant time + */ + public static String latestCommit(FileSystem fs, String basePath) { + HoodieTimeline timeline = allCompletedCommitsCompactions(fs, basePath); + return timeline.lastInstant().get().getTimestamp(); + } + + /** + * Obtain all the commits, compactions that have occurred on the timeline, whose + * instant times could be fed into the datasource options. 
+ * + * @param fs + * @param basePath + * @return + */ + public static HoodieTimeline allCompletedCommitsCompactions(FileSystem fs, String basePath) { + HoodieTable table = HoodieTable + .getHoodieTable(new HoodieTableMetaClient(fs, basePath, true), null); + if (table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ)) { + return table.getActiveTimeline().getTimelineOfActions( + Sets.newHashSet(HoodieActiveTimeline.COMPACTION_ACTION, + HoodieActiveTimeline.DELTA_COMMIT_ACTION) + ); + } else { + return table.getCompletedCompactionCommitTimeline(); + } + } +} diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/KeyGenerator.java b/hoodie-spark/src/main/java/com/uber/hoodie/KeyGenerator.java similarity index 97% rename from hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/KeyGenerator.java rename to hoodie-spark/src/main/java/com/uber/hoodie/KeyGenerator.java index e21251c8b..58bd3e909 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/KeyGenerator.java +++ b/hoodie-spark/src/main/java/com/uber/hoodie/KeyGenerator.java @@ -16,7 +16,7 @@ * */ -package com.uber.hoodie.utilities.keygen; +package com.uber.hoodie; import com.uber.hoodie.common.model.HoodieKey; diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaStreamerAvroPayload.java b/hoodie-spark/src/main/java/com/uber/hoodie/OverwriteWithLatestAvroPayload.java similarity index 82% rename from hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaStreamerAvroPayload.java rename to hoodie-spark/src/main/java/com/uber/hoodie/OverwriteWithLatestAvroPayload.java index 70c506cfb..e7877aafc 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaStreamerAvroPayload.java +++ b/hoodie-spark/src/main/java/com/uber/hoodie/OverwriteWithLatestAvroPayload.java @@ -16,7 +16,7 @@ * */ -package com.uber.hoodie.utilities.deltastreamer; +package com.uber.hoodie; 
import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.util.HoodieAvroUtils; @@ -34,19 +34,20 @@ import java.util.Optional; * 1. preCombine - Picks the latest delta record for a key, based on an ordering field * 2. combineAndGetUpdateValue/getInsertValue - Simply overwrites storage with latest delta record */ -public class DeltaStreamerAvroPayload extends DeltaStreamerPayload implements HoodieRecordPayload { +public class OverwriteWithLatestAvroPayload extends BaseAvroPayload implements HoodieRecordPayload { /** * * @param record * @param orderingVal */ - public DeltaStreamerAvroPayload(GenericRecord record, Comparable orderingVal) { + public OverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal) { super(record, orderingVal); } @Override - public DeltaStreamerAvroPayload preCombine(DeltaStreamerAvroPayload another) { + public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload another) { + // pick the payload with greatest ordering value if (another.orderingVal.compareTo(orderingVal) > 0) { return another; } else { diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/SimpleKeyGenerator.java b/hoodie-spark/src/main/java/com/uber/hoodie/SimpleKeyGenerator.java similarity index 60% rename from hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/SimpleKeyGenerator.java rename to hoodie-spark/src/main/java/com/uber/hoodie/SimpleKeyGenerator.java index 30d0bcedb..c5733856a 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/SimpleKeyGenerator.java +++ b/hoodie-spark/src/main/java/com/uber/hoodie/SimpleKeyGenerator.java @@ -16,16 +16,14 @@ * */ -package com.uber.hoodie.utilities.keygen; +package com.uber.hoodie; import com.uber.hoodie.common.model.HoodieKey; -import com.uber.hoodie.utilities.UtilHelpers; +import com.uber.hoodie.exception.HoodieException; import org.apache.avro.generic.GenericRecord; import 
org.apache.commons.configuration.PropertiesConfiguration; -import java.util.Arrays; - /** * Simple key generator, which takes names of fields to be used for recordKey and partitionPath * as configs. @@ -36,23 +34,18 @@ public class SimpleKeyGenerator extends KeyGenerator { protected final String partitionPathField; - /** - * Supported configs - */ - static class Config { - private static final String RECORD_KEY_FIELD_PROP = "hoodie.deltastreamer.keygen.simple.recordkey.field"; - private static final String PARTITION_PATH_FIELD_PROP = "hoodie.deltastreamer.keygen.simple.partitionpath.field"; - } - public SimpleKeyGenerator(PropertiesConfiguration config) { super(config); - UtilHelpers.checkRequiredProperties(config, Arrays.asList(Config.PARTITION_PATH_FIELD_PROP, Config.RECORD_KEY_FIELD_PROP)); - this.recordKeyField = config.getString(Config.RECORD_KEY_FIELD_PROP); - this.partitionPathField = config.getString(Config.PARTITION_PATH_FIELD_PROP); + this.recordKeyField = config.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()); + this.partitionPathField = config.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY()); } @Override public HoodieKey getKey(GenericRecord record) { - return new HoodieKey(record.get(recordKeyField).toString(), record.get(partitionPathField).toString()); + if (recordKeyField == null || partitionPathField == null) { + throw new HoodieException("Unable to find field names for record key or partition path in cfg"); + } + return new HoodieKey(DataSourceUtils.getNestedFieldValAsString(record, recordKeyField), + DataSourceUtils.getNestedFieldValAsString(record, partitionPathField)); } } diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/AvroConversionUtils.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/AvroConversionUtils.scala new file mode 100644 index 000000000..82cf7cc14 --- /dev/null +++ b/hoodie-spark/src/main/scala/com/uber/hoodie/AvroConversionUtils.scala @@ -0,0 +1,126 @@ +/* + * Copyright (c) 2017 Uber 
Technologies, Inc. (hoodie-dev-group@uber.com)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *          http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 *
 */
package com.uber.hoodie

import java.nio.ByteBuffer
import java.sql.{Date, Timestamp}
import java.util

import com.databricks.spark.avro.SchemaConverters
import org.apache.avro.generic.GenericData.Record
import org.apache.avro.generic.GenericRecord
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row}

/**
 * Helpers to convert between Spark SQL rows/schemas and Avro records/schemas.
 */
object AvroConversionUtils {

  /**
   * Converts a DataFrame into an RDD of Avro [[GenericRecord]]s, using the given
   * record name and namespace for the generated Avro schema.
   */
  def createRdd(df: DataFrame, structName: String, recordNamespace: String): RDD[GenericRecord] = {
    val dataType = df.schema
    df.rdd.mapPartitions { records =>
      if (records.isEmpty) Iterator.empty
      else {
        // Build the converter once per partition, not once per row.
        val converter = createConverterToAvro(dataType, structName, recordNamespace)
        records.map { row => converter(row).asInstanceOf[GenericRecord] }
      }
    }
  }

  /**
   * Returns a function converting a Catalyst value of `dataType` into its Avro
   * representation (nulls pass through as null).
   *
   * NOTE(review): this match is not exhaustive — unsupported Catalyst types (e.g.
   * MapType with non-String keys, NullType) will throw a MatchError; confirm upstream
   * schemas never carry such types.
   */
  def createConverterToAvro(dataType: DataType,
                            structName: String,
                            recordNamespace: String): (Any) => Any = {
    dataType match {
      case BinaryType => (item: Any) => item match {
        case null => null
        case bytes: Array[Byte] => ByteBuffer.wrap(bytes)
      }
      case ByteType | ShortType | IntegerType | LongType |
           FloatType | DoubleType | StringType | BooleanType => identity
      case _: DecimalType => (item: Any) => if (item == null) null else item.toString
      case TimestampType => (item: Any) =>
        // Timestamps are encoded as epoch millis (long)
        if (item == null) null else item.asInstanceOf[Timestamp].getTime
      case DateType => (item: Any) =>
        if (item == null) null else item.asInstanceOf[Date].getTime
      case ArrayType(elementType, _) =>
        val elementConverter = createConverterToAvro(elementType, structName, recordNamespace)
        (item: Any) => {
          if (item == null) {
            null
          } else {
            // Copy into a java.util.List, element by element, through the converter.
            val sourceArray = item.asInstanceOf[Seq[Any]]
            val sourceArraySize = sourceArray.size
            val targetList = new util.ArrayList[Any](sourceArraySize)
            var idx = 0
            while (idx < sourceArraySize) {
              targetList.add(elementConverter(sourceArray(idx)))
              idx += 1
            }
            targetList
          }
        }
      case MapType(StringType, valueType, _) =>
        val valueConverter = createConverterToAvro(valueType, structName, recordNamespace)
        (item: Any) => {
          if (item == null) {
            null
          } else {
            val javaMap = new util.HashMap[String, Any]()
            item.asInstanceOf[Map[String, Any]].foreach { case (key, value) =>
              javaMap.put(key, valueConverter(value))
            }
            javaMap
          }
        }
      case structType: StructType =>
        val builder = SchemaBuilder.record(structName).namespace(recordNamespace)
        val schema: Schema = SchemaConverters.convertStructToAvro(
          structType, builder, recordNamespace)
        // One converter per field, resolved eagerly so row conversion is a straight zip.
        val fieldConverters = structType.fields.map(field =>
          createConverterToAvro(field.dataType, field.name, recordNamespace))
        (item: Any) => {
          if (item == null) {
            null
          } else {
            val record = new Record(schema)
            val convertersIterator = fieldConverters.iterator
            val fieldNamesIterator = dataType.asInstanceOf[StructType].fieldNames.iterator
            val rowIterator = item.asInstanceOf[Row].toSeq.iterator

            while (convertersIterator.hasNext) {
              val converter = convertersIterator.next()
              record.put(fieldNamesIterator.next(), converter(rowIterator.next()))
            }
            record
          }
        }
    }
  }

  /**
   * Builds an Avro record schema for the given Catalyst struct type.
   */
  def convertStructTypeToAvroSchema(structType: StructType,
                                    structName: String,
                                    recordNamespace: String): Schema = {
    val builder = SchemaBuilder.record(structName).namespace(recordNamespace)
    SchemaConverters.convertStructToAvro(structType, builder, recordNamespace)
  }

  /**
   * Builds a Catalyst StructType for the given Avro record schema.
   */
  def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {
    SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
  }
}

/*
 * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *          http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 *
 */

package com.uber.hoodie

import com.uber.hoodie.common.model.HoodieTableType

/**
 * List of options that can be passed to the Hoodie datasource,
 * in addition to the hoodie client configs
 */

/**
 * Options supported for reading hoodie datasets.
+ */ +object DataSourceReadOptions { + /** + * Whether data needs to be read, in + * incremental mode (new data since an instantTime) + * (or) Read Optimized mode (obtain latest view, based on columnar data) + * (or) Real time mode (obtain latest view, based on row & columnar data) + * + * Default: READ_OPTIMIZED + */ + val VIEW_TYPE_OPT_KEY = "hoodie.datasource.view.type" + val VIEW_TYPE_READ_OPTIMIZED_OPT_VAL = "READ_OPTIMIZED" + val VIEW_TYPE_INCREMENTAL_OPT_VAL = "INCREMENTAL" + val VIEW_TYPE_REALTIME_OPT_VAL = "REALTIME" + val DEFAULT_VIEW_TYPE_OPT_VAL = VIEW_TYPE_READ_OPTIMIZED_OPT_VAL + + + /** + * Instant time to start incrementally pulling data from. The instanttime here need not + * necessarily correspond to an instant on the timeline. New data written with an + * `instant_time > BEGIN_INSTANTTIME` are fetched out. For e.g: '20170901080000' will get + * all new data written after Sep 1, 2017 08:00AM. + * + * Default: None (Mandatory in incremental mode) + */ + val BEGIN_INSTANTTIME_OPT_KEY = "hoodie.datasource.read.begin.instanttime" + + + /** + * Instant time to limit incrementally fetched data to. New data written with an + * `instant_time <= END_INSTANTTIME` are fetched out. + * + * Default: latest instant (i.e fetches all new data since begin instant time) + * + */ + val END_INSTANTTIME_OPT_KEY = "hoodie.datasource.read.end.instanttime" +} + +/** + * Options supported for writing hoodie datasets. + */ +object DataSourceWriteOptions { + /** + * The client operation, that this write should do + * + * Default: upsert() + */ + val OPERATION_OPT_KEY = "hoodie.datasource.write.operation" + val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert" + val INSERT_OPERATION_OPT_VAL = "insert" + val UPSERT_OPERATION_OPT_VAL = "upsert" + val DEFAULT_OPERATION_OPT_VAL = UPSERT_OPERATION_OPT_VAL; + + /** + * The storage type for the underlying data, for this write. 
+ * + * Default: COPY_ON_WRITE + */ + val STORAGE_TYPE_OPT_KEY = "hoodie.datasource.write.storage.type" + val COW_STORAGE_TYPE_OPT_VAL = HoodieTableType.COPY_ON_WRITE.name + val MOR_STORAGE_TYPE_OPT_VAL = HoodieTableType.MERGE_ON_READ.name + val DEFAULT_STORAGE_TYPE_OPT_VAL = COW_STORAGE_TYPE_OPT_VAL + + /** + * Hive table name, to register the dataset into. + * + * Default: None (mandatory) + */ + val TABLE_NAME_OPT_KEY = "hoodie.datasource.write.table.name" + + /** + * Field used in preCombining before actual write. When two records have the same + * key value, we will pick the one with the largest value for the precombine field, + * determined by Object.compareTo(..) + */ + val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field" + val DEFAULT_PRECOMBINE_FIELD_OPT_VAL = "ts" + + + /** + * Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting. + * This will render any value set for `PRECOMBINE_FIELD_OPT_VAL` in-effective + */ + val PAYLOAD_CLASS_OPT_KEY = "hoodie.datasource.write.payload.class" + val DEFAULT_PAYLOAD_OPT_VAL = classOf[OverwriteWithLatestAvroPayload].getName + + /** + * Record key field. Value to be used as the `recordKey` component of `HoodieKey`. Actual value + * will be obtained by invoking .toString() on the field value. Nested fields can be specified using + * the dot notation eg: `a.b.c` + * + */ + val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field" + val DEFAULT_RECORDKEY_FIELD_OPT_VAL = "uuid" + + /** + * Partition path field. Value to be used at the `partitionPath` component of `HoodieKey`. 
Actual + * value ontained by invoking .toString() + */ + val PARTITIONPATH_FIELD_OPT_KEY = "hoodie.datasource.write.partitionpath.field" + val DEFAULT_PARTITIONPATH_FIELD_OPT_VAL = "partitionpath" + + /** + * Key generator class, that implements will extract the key out of incoming record + * + */ + val KEYGENERATOR_CLASS_OPT_KEY = "hoodie.datasource.write.keygenerator.class" + val DEFAULT_KEYGENERATOR_CLASS_OPT_VAL = classOf[SimpleKeyGenerator].getName + + /** + * Option keys beginning with this prefix, are automatically added to the commit/deltacommit metadata. + * This is useful to store checkpointing information, in a consistent way with the hoodie timeline + */ + val COMMIT_METADATA_KEYPREFIX_OPT_KEY = "hoodie.datasource.write.commitmeta.key.prefix" + val DEFAULT_COMMIT_METADATA_KEYPREFIX_OPT_VAL = "_" +} diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala new file mode 100644 index 000000000..26579630d --- /dev/null +++ b/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala @@ -0,0 +1,244 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * + */ + +package com.uber.hoodie + +import java.util +import java.util.concurrent.ConcurrentHashMap +import java.util.{Optional, Properties} + +import com.uber.hoodie.DataSourceReadOptions._ +import com.uber.hoodie.DataSourceWriteOptions._ +import com.uber.hoodie.common.table.{HoodieTableConfig, HoodieTableMetaClient} +import com.uber.hoodie.config.HoodieWriteConfig +import com.uber.hoodie.exception.HoodieException +import org.apache.avro.generic.GenericRecord +import org.apache.commons.configuration.PropertiesConfiguration +import org.apache.hadoop.fs.Path +import org.apache.log4j.LogManager +import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.execution.datasources.DataSource +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode} + +import scala.collection.JavaConversions._ + +/** + * Hoodie Spark Datasource, for reading and writing hoodie datasets + * + */ +class DefaultSource extends RelationProvider + with SchemaRelationProvider + with CreatableRelationProvider + with DataSourceRegister + with Serializable { + + private val log = LogManager.getLogger(classOf[DefaultSource]) + + override def createRelation(sqlContext: SQLContext, + parameters: Map[String, String]): BaseRelation = { + createRelation(sqlContext, parameters, null) + } + + /** + * Add default options for unspecified read options keys. 
+ * + * @param parameters + * @return + */ + def parametersWithReadDefaults(parameters: Map[String, String]) = { + val defaultsMap = new ConcurrentHashMap[String, String](mapAsJavaMap(parameters)) + defaultsMap.putIfAbsent(VIEW_TYPE_OPT_KEY, DEFAULT_VIEW_TYPE_OPT_VAL) + mapAsScalaMap(defaultsMap) + } + + override def createRelation(sqlContext: SQLContext, + optParams: Map[String, String], + schema: StructType): BaseRelation = { + val parameters = parametersWithReadDefaults(optParams) + val path = parameters.get("path") + if (path.isEmpty) { + throw new HoodieException("'path' must be specified.") + } + + if (parameters(VIEW_TYPE_OPT_KEY).equals(VIEW_TYPE_REALTIME_OPT_VAL)) { + throw new HoodieException("Realtime view not supported yet via data source. Please use HiveContext route.") + } + + if (parameters(VIEW_TYPE_OPT_KEY).equals(VIEW_TYPE_INCREMENTAL_OPT_VAL)) { + new IncrementalRelation(sqlContext, path.get, optParams, schema) + } else { + // this is just effectively RO view only, where `path` can contain a mix of + // non-hoodie/hoodie path files. set the path filter up + sqlContext.sparkContext.hadoopConfiguration.setClass( + "mapreduce.input.pathFilter.class", + classOf[com.uber.hoodie.hadoop.HoodieROTablePathFilter], + classOf[org.apache.hadoop.fs.PathFilter]); + + // simply return as a regular parquet relation + DataSource.apply( + sparkSession = sqlContext.sparkSession, + userSpecifiedSchema = Option(schema), + className = "parquet", + options = parameters.toMap) + .resolveRelation() + } + } + + /** + * Add default options for unspecified write options keys. 
+ * + * @param parameters + * @return + */ + def parametersWithWriteDefaults(parameters: Map[String, String]) = { + val defaultsMap = new ConcurrentHashMap[String, String](mapAsJavaMap(parameters)) + defaultsMap.putIfAbsent(OPERATION_OPT_KEY, DEFAULT_OPERATION_OPT_VAL) + defaultsMap.putIfAbsent(STORAGE_TYPE_OPT_KEY, DEFAULT_STORAGE_TYPE_OPT_VAL) + defaultsMap.putIfAbsent(PRECOMBINE_FIELD_OPT_KEY, DEFAULT_PRECOMBINE_FIELD_OPT_VAL) + defaultsMap.putIfAbsent(PAYLOAD_CLASS_OPT_KEY, DEFAULT_PAYLOAD_OPT_VAL) + defaultsMap.putIfAbsent(RECORDKEY_FIELD_OPT_KEY, DEFAULT_RECORDKEY_FIELD_OPT_VAL) + defaultsMap.putIfAbsent(PARTITIONPATH_FIELD_OPT_KEY, DEFAULT_PARTITIONPATH_FIELD_OPT_VAL) + defaultsMap.putIfAbsent(KEYGENERATOR_CLASS_OPT_KEY, DEFAULT_KEYGENERATOR_CLASS_OPT_VAL) + defaultsMap.putIfAbsent(COMMIT_METADATA_KEYPREFIX_OPT_KEY, DEFAULT_COMMIT_METADATA_KEYPREFIX_OPT_VAL) + mapAsScalaMap(defaultsMap) + } + + def toPropertiesConfiguration(params: Map[String, String]): PropertiesConfiguration = { + val propCfg = new PropertiesConfiguration() + params.foreach(kv => propCfg.addProperty(kv._1, kv._2)) + propCfg + } + + + override def createRelation(sqlContext: SQLContext, + mode: SaveMode, + optParams: Map[String, String], + df: DataFrame): BaseRelation = { + + val parameters = parametersWithWriteDefaults(optParams).toMap + val path = parameters.get("path") + val tblName = parameters.get(HoodieWriteConfig.TABLE_NAME) + if (path.isEmpty || tblName.isEmpty) { + throw new HoodieException(s"'${HoodieWriteConfig.TABLE_NAME}', 'path' must be set.") + } + + val storageType = parameters(STORAGE_TYPE_OPT_KEY) + val operation = parameters(OPERATION_OPT_KEY) + + // register classes & schemas + val structName = s"${tblName.get}_record" + val nameSpace = s"hoodie.${tblName.get}" + sqlContext.sparkContext.getConf.registerKryoClasses( + Array(classOf[org.apache.avro.generic.GenericData], + classOf[org.apache.avro.Schema])) + val schema = 
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace) + sqlContext.sparkContext.getConf.registerAvroSchemas(schema) + + // Convert to RDD[HoodieRecord] + val keyGenerator = DataSourceUtils.createKeyGenerator( + parameters(KEYGENERATOR_CLASS_OPT_KEY), + toPropertiesConfiguration(parameters) + ) + val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace) + val hoodieRecords = genericRecords.map(gr => { + val orderingVal = DataSourceUtils.getNestedFieldValAsString( + gr, parameters(PRECOMBINE_FIELD_OPT_KEY)).asInstanceOf[Comparable[_]] + DataSourceUtils.createHoodieRecord(gr, + orderingVal, keyGenerator.getKey(gr), parameters(PAYLOAD_CLASS_OPT_KEY)) + }).toJavaRDD(); + + + val basePath = new Path(parameters.get("path").get) + val fs = basePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + var exists = fs.exists(basePath) + + // Handle various save modes + if (mode == SaveMode.ErrorIfExists && exists) { + throw new HoodieException(s"basePath ${basePath} already exists.") + } + if (mode == SaveMode.Ignore && exists) { + log.warn(s" basePath ${basePath} already exists. Ignoring & not performing actual writes.") + return createRelation(sqlContext, parameters, df.schema) + } + if (mode == SaveMode.Overwrite && exists) { + log.warn(s" basePath ${basePath} already exists. Deleting existing data & overwriting with new data.") + fs.delete(basePath, true) + exists = false + } + + // Create the dataset if not present (APPEND mode) + if (!exists) { + val properties = new Properties(); + properties.put(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, tblName.get); + properties.put(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, storageType); + HoodieTableMetaClient.initializePathAsHoodieDataset(fs, path.get, properties); + } + + // Create a HoodieWriteClient & issue the write. 
+ val client = DataSourceUtils.createHoodieClient(new JavaSparkContext(sqlContext.sparkContext), + schema.toString, + path.get, + tblName.get, + mapAsJavaMap(parameters) + ) + val commitTime = client.startCommit(); + + val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, commitTime, operation) + // Check for errors and commit the write. + val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count() + if (errorCount == 0) { + log.info("No errors. Proceeding to commit the write."); + val metaMap = parameters.filter(kv => + kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY))) + val success = if (metaMap.isEmpty) { + client.commit(commitTime, writeStatuses) + } else { + client.commit(commitTime, writeStatuses, + Optional.of(new util.HashMap[String, String](mapAsJavaMap(metaMap)))) + } + + if (success) { + log.info("Commit " + commitTime + " successful!") + } + else { + log.info("Commit " + commitTime + " failed!") + } + client.close + } else { + log.error(s"Upsert failed with ${errorCount} errors :"); + if (log.isTraceEnabled) { + log.trace("Printing out the top 100 errors") + writeStatuses.rdd.filter(ws => ws.hasErrors) + .take(100) + .foreach(ws => { + log.trace("Global error :", ws.getGlobalError) + if (ws.getErrors.size() > 0) { + ws.getErrors.foreach(kt => + log.trace(s"Error for key: ${kt._1}", kt._2)) + } + }) + } + } + createRelation(sqlContext, parameters, df.schema) + } + + override def shortName(): String = "hoodie" +} diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/IncrementalRelation.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/IncrementalRelation.scala new file mode 100644 index 000000000..6adab8d84 --- /dev/null +++ b/hoodie-spark/src/main/scala/com/uber/hoodie/IncrementalRelation.scala @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. 
(hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.uber.hoodie + +import com.uber.hoodie.common.model.{HoodieCommitMetadata, HoodieRecord, HoodieTableType} +import com.uber.hoodie.common.table.HoodieTableMetaClient +import com.uber.hoodie.common.util.ParquetUtils +import com.uber.hoodie.exception.HoodieException +import com.uber.hoodie.table.HoodieTable +import org.apache.hadoop.fs.Path +import org.apache.log4j.LogManager +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.sources.{BaseRelation, TableScan} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{Row, SQLContext} + +import scala.collection.JavaConversions._ +import scala.collection.mutable + +/** + * Relation, that implements the Hoodie incremental view. + * + * Implemented for Copy_on_write storage. 
+ * + */ +class IncrementalRelation(val sqlContext: SQLContext, + val basePath: String, + val optParams: Map[String, String], + val userSchema: StructType) extends BaseRelation with TableScan { + + private val log = LogManager.getLogger(classOf[IncrementalRelation]) + + val fs = new Path(basePath).getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + val metaClient = new HoodieTableMetaClient(fs, basePath, true) + // MOR datasets not supported yet + if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) { + throw new HoodieException("Incremental view not implemented yet, for merge-on-read datasets") + } + val hoodieTable = HoodieTable.getHoodieTable(metaClient, null) + val commitTimeline = hoodieTable.getCompletedCompactionCommitTimeline(); + if (commitTimeline.empty()) { + throw new HoodieException("No instants to incrementally pull") + } + if (!optParams.contains(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY)) { + throw new HoodieException(s"Specify the begin instant time to pull from using " + + s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY}") + } + val commitsToReturn = commitTimeline.findInstantsInRange( + optParams(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY), + optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, + commitTimeline.lastInstant().get().getTimestamp)) + .getInstants.iterator().toList + + // use schema from a file produced in the latest instant + val latestSchema = { + val latestMeta = HoodieCommitMetadata + .fromBytes(commitTimeline.getInstantDetails(commitsToReturn.last).get) + val metaFilePath = latestMeta.getFileIdAndFullPaths(basePath).values().iterator().next() + AvroConversionUtils.convertAvroSchemaToStructType(ParquetUtils.readAvroSchema(new Path(metaFilePath))) + } + + override def schema: StructType = latestSchema + + override def buildScan(): RDD[Row] = { + val fileIdToFullPath = mutable.HashMap[String, String]() + for (commit <- commitsToReturn) { + val metadata: HoodieCommitMetadata 
= HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit).get) + fileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap + } + val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path")) + sqlContext.read.options(sOpts) + .parquet(fileIdToFullPath.values.toList: _*) + .filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.head.getTimestamp)) + .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.last.getTimestamp)) + .toDF().rdd + + } +} diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/package.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/package.scala new file mode 100644 index 000000000..63265f3f5 --- /dev/null +++ b/hoodie-spark/src/main/scala/com/uber/hoodie/package.scala @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * + */ +package com.uber.hoodie + +import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter} + +package object hoodie { + /** + * Adds a method, `hoodie`, to DataFrameWriter + */ + implicit class AvroDataFrameWriter[T](writer: DataFrameWriter[T]) { + def avro: String => Unit = writer.format("com.uber.hoodie").save + } + + /** + * Adds a method, `hoodie`, to DataFrameReader + */ + implicit class AvroDataFrameReader(reader: DataFrameReader) { + def avro: String => DataFrame = reader.format("com.uber.hoodie").load + } +} diff --git a/hoodie-spark/src/test/java/DataSourceTestUtils.java b/hoodie-spark/src/test/java/DataSourceTestUtils.java new file mode 100644 index 000000000..47f069ee1 --- /dev/null +++ b/hoodie-spark/src/test/java/DataSourceTestUtils.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +import com.uber.hoodie.common.TestRawTripPayload; +import com.uber.hoodie.common.model.HoodieRecord; + +import org.apache.spark.api.java.JavaRDD; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * Test utils for data source tests. 
+ */ +public class DataSourceTestUtils { + + public static Optional convertToString(HoodieRecord record) { + try { + String str = ((TestRawTripPayload) record.getData()).getJsonData(); + str = "{" + str.substring(str.indexOf("\"timestamp\":")); + return Optional.of(str.replaceAll("}", ", \"partition\": \"" + record.getPartitionPath() + "\"}")); + } catch (IOException e) { + return Optional.empty(); + } + } + + public static List convertToStringList(List records) { + return records.stream().map(hr -> convertToString(hr)) + .filter(os -> os.isPresent()) + .map(os -> os.get()) + .collect(Collectors.toList()); + } +} diff --git a/hoodie-spark/src/test/java/HoodieJavaApp.java b/hoodie-spark/src/test/java/HoodieJavaApp.java new file mode 100644 index 000000000..bf791140a --- /dev/null +++ b/hoodie-spark/src/test/java/HoodieJavaApp.java @@ -0,0 +1,148 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * + */ + + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.uber.hoodie.DataSourceReadOptions; +import com.uber.hoodie.DataSourceWriteOptions; +import com.uber.hoodie.HoodieDataSourceHelpers; +import com.uber.hoodie.common.HoodieTestDataGenerator; +import com.uber.hoodie.common.model.HoodieTableType; +import com.uber.hoodie.config.HoodieWriteConfig; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; + +import java.util.List; +/** + * Sample program that writes & reads hoodie datasets via the Spark datasource + */ +public class HoodieJavaApp { + + @Parameter(names={"--table-path", "-p"}, description = "path for Hoodie sample table") + private String tablePath = "file:///tmp/hoodie/sample-table"; + + @Parameter(names={"--table-name", "-n"}, description = "table name for Hoodie sample table") + private String tableName = "hoodie_test"; + + @Parameter(names={"--table-type", "-t"}, description = "One of COPY_ON_WRITE or MERGE_ON_READ") + private String tableType = HoodieTableType.COPY_ON_WRITE.name(); + + @Parameter(names = {"--help", "-h"}, help = true) + public Boolean help = false; + + private static Logger logger = LogManager.getLogger(HoodieJavaApp.class); + + public static void main(String[] args) throws Exception { + HoodieJavaApp cli = new HoodieJavaApp(); + JCommander cmd = new JCommander(cli, args); + + if (cli.help) { + cmd.usage(); + System.exit(1); + } + cli.run(); + } + + + + + public void run() throws Exception { + + // Spark session setup.. 
+ SparkSession spark = SparkSession.builder() + .appName("Hoodie Spark APP") + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .master("local[1]") + .getOrCreate(); + JavaSparkContext jssc = new JavaSparkContext(spark.sparkContext()); + FileSystem fs = FileSystem.get(jssc.hadoopConfiguration()); + + // Generator of some records to be loaded in. + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); + + /** + * Commit with only inserts + */ + // Generate some input.. + List records1 = DataSourceTestUtils.convertToStringList(dataGen.generateInserts("001"/* ignore */, 100)); + Dataset inputDF1 = spark.read().json(jssc.parallelize(records1, 2)); + + // Save as hoodie dataset (copy on write) + inputDF1.write() + .format("com.uber.hoodie") // specify the hoodie source + .option("hoodie.insert.shuffle.parallelism", "2") // any hoodie client config can be passed like this + .option("hoodie.upsert.shuffle.parallelism", "2") // full list in HoodieWriteConfig & its package + .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL()) // insert + .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key") // This is the record key + .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition") // this is the partition to place it into + .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp") // used to combine duplicate records in the input against each other, and against the value already on disk + .option(HoodieWriteConfig.TABLE_NAME, tableName) // Used by hive sync and queries + .mode(SaveMode.Overwrite) // This will remove any existing data at path below, and create a new dataset if needed + .save(tablePath); // ultimately where the dataset will be placed + String commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, tablePath); + + /** + * Commit that updates records + */ + List records2 = DataSourceTestUtils.convertToStringList(dataGen.generateUpdates("002"/* ignore */, 100)); + Dataset 
inputDF2 = spark.read().json(jssc.parallelize(records2, 2)); + inputDF2.write() + .format("com.uber.hoodie") + .option("hoodie.insert.shuffle.parallelism", "2") + .option("hoodie.upsert.shuffle.parallelism", "2") + .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key") + .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition") + .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp") + .option(HoodieWriteConfig.TABLE_NAME, tableName) + .mode(SaveMode.Append) + .save(tablePath); + String commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath); + + /** + * Read & do some queries + */ + Dataset hoodieROViewDF = spark.read() + .format("com.uber.hoodie") + // pass any path glob, can include hoodie & non-hoodie datasets + .load(tablePath + "/*/*/*/*"); + hoodieROViewDF.registerTempTable("hoodie_ro"); + spark.sql("describe hoodie_ro").show(); + // all trips whose fare was greater than 2. + spark.sql("select fare, begin_lon, begin_lat, timestamp from hoodie_ro where fare > 2.0").show(); + + + /** + * Consume incrementally, only changes in commit 2 above. 
+ */ + Dataset hoodieIncViewDF = spark.read().format("com.uber.hoodie") + .option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY(), DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL()) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY(), commitInstantTime1) // Only changes in write 2 above + .load(tablePath); // For incremental view, pass in the root/base path of dataset + + System.out.println("You will only see records from : " + commitInstantTime2); + hoodieIncViewDF.groupBy(hoodieIncViewDF.col("_hoodie_commit_time")).count().show(); + } +} diff --git a/hoodie-spark/src/test/resources/log4j-surefire.properties b/hoodie-spark/src/test/resources/log4j-surefire.properties new file mode 100644 index 000000000..490c6411d --- /dev/null +++ b/hoodie-spark/src/test/resources/log4j-surefire.properties @@ -0,0 +1,28 @@ +# +# Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +log4j.rootLogger=WARN, A1 +log4j.category.com.uber=INFO +log4j.category.com.uber.hoodie.common.utils=WARN +log4j.category.com.uber.hoodie.io=WARN +log4j.category.com.uber.hoodie.common=WARN +log4j.category.com.uber.hoodie.table.log=WARN +log4j.category.org.apache.parquet.hadoop=WARN + +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender +# A1 uses PatternLayout. 
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n diff --git a/hoodie-spark/src/test/scala/DataSourceTest.scala b/hoodie-spark/src/test/scala/DataSourceTest.scala new file mode 100644 index 000000000..35e2b3b58 --- /dev/null +++ b/hoodie-spark/src/test/scala/DataSourceTest.scala @@ -0,0 +1,137 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * + */ + +import com.uber.hoodie.common.HoodieTestDataGenerator +import com.uber.hoodie.common.util.FSUtils +import com.uber.hoodie.config.HoodieWriteConfig +import com.uber.hoodie.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession} +import org.junit.Assert._ +import org.junit.{Before, Test} +import org.junit.rules.TemporaryFolder +import org.scalatest.junit.AssertionsForJUnit + +import scala.collection.JavaConversions._ + +/** + * Basic tests on the spark datasource + */ +class DataSourceTest extends AssertionsForJUnit { + + var spark: SparkSession = null + var dataGen: HoodieTestDataGenerator = null + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key", + DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition", + DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp", + HoodieWriteConfig.TABLE_NAME -> "hoodie_test" + ) + var basePath : String = null + var fs : FileSystem = null + + @Before def initialize() { + spark = SparkSession.builder + .appName("Hoodie Datasource test") + .master("local[2]") + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .getOrCreate + dataGen = new HoodieTestDataGenerator() + val folder = new TemporaryFolder + folder.create + basePath = folder.getRoot.getAbsolutePath + fs = FSUtils.getFs + } + + @Test def testCopyOnWriteStorage() { + // Insert Operation + val records1 = DataSourceTestUtils.convertToStringList(dataGen.generateInserts("001", 100)).toList + val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2)) + inputDF1.write.format("com.uber.hoodie") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + 
.mode(SaveMode.Overwrite) + .save(basePath) + + assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000")) + val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath) + + // Read RO View + val hoodieROViewDF1 = spark.read.format("com.uber.hoodie") + .load(basePath + "/*/*/*/*"); + assertEquals(100, hoodieROViewDF1.count()) + + val records2 = DataSourceTestUtils.convertToStringList(dataGen.generateUpdates("001", 100)).toList + val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2)) + val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count() + + // Upsert Operation + inputDF2.write.format("com.uber.hoodie") + .options(commonOpts) + .mode(SaveMode.Append) + .save(basePath) + + val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, basePath) + assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").size()) + + // Read RO View + val hoodieROViewDF2 = spark.read.format("com.uber.hoodie") + .load(basePath + "/*/*/*/*"); + assertEquals(100, hoodieROViewDF2.count()) // still 100, since we only updated + + + // Read Incremental View + val hoodieIncViewDF2 = spark.read.format("com.uber.hoodie") + .option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY, DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1) + .load(basePath); + + + assertEquals(uniqueKeyCnt, hoodieIncViewDF2.count()) // one record per unique key must be pulled from the second commit + val countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect(); + assertEquals(1, countsPerCommit.length) + assertEquals(commitInstantTime2, countsPerCommit(0).get(0)) + } + + @Test def testMergeOnReadStorage() { + // Bulk Insert Operation + val records1 = DataSourceTestUtils.convertToStringList(dataGen.generateInserts("001", 100)).toList + val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2)) + 
inputDF1.write.format("com.uber.hoodie") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL) + .mode(SaveMode.Overwrite) + .save(basePath) + + assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000")) + + // Read RO View + try { + val hoodieROViewDF1 = spark.read.format("com.uber.hoodie") + .load(basePath + "/*/*/*/*") + fail() // we would error out, since no compaction has yet occurred. + } catch { + case e: Exception => { + // do nothing + } + }; + } +} diff --git a/hoodie-utilities/pom.xml b/hoodie-utilities/pom.xml index 0c8d83193..37143bf67 100644 --- a/hoodie-utilities/pom.xml +++ b/hoodie-utilities/pom.xml @@ -81,8 +81,9 @@ org.apache.spark - spark-sql_2.10 + spark-sql_2.11 + com.uber.hoodie hoodie-common @@ -97,6 +98,12 @@ test + + com.uber.hoodie + hoodie-spark + ${project.version} + + org.apache.hadoop hadoop-hdfs @@ -222,7 +229,7 @@ org.apache.spark - spark-core_2.10 + spark-core_2.11 javax.servlet @@ -233,14 +240,14 @@ org.apache.spark - spark-streaming_2.10 + spark-streaming_2.11 ${spark.version} provided org.apache.spark - spark-streaming-kafka-0-8_2.10 + spark-streaming-kafka-0-8_2.11 ${spark.version} @@ -280,7 +287,7 @@ com.twitter - bijection-avro_2.10 + bijection-avro_2.11 0.9.2 diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/UtilHelpers.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/UtilHelpers.java index 8769af160..69ad2e7e7 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/UtilHelpers.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/UtilHelpers.java @@ -21,7 +21,7 @@ package com.uber.hoodie.utilities; import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.exception.HoodieNotSupportedException; -import 
com.uber.hoodie.utilities.keygen.KeyGenerator; +import com.uber.hoodie.KeyGenerator; import com.uber.hoodie.utilities.schema.SchemaProvider; import com.uber.hoodie.utilities.sources.Source; import com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException; @@ -60,22 +60,6 @@ public class UtilHelpers { } } - public static KeyGenerator createKeyGenerator(String keyGeneratorClass, PropertiesConfiguration cfg) throws IOException { - try { - return (KeyGenerator) ConstructorUtils.invokeConstructor(Class.forName(keyGeneratorClass), (Object) cfg); - } catch (Throwable e) { - throw new IOException("Could not load key generator class " + keyGeneratorClass, e); - } - } - - public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record, Comparable orderingVal) throws IOException { - try { - return (HoodieRecordPayload) ConstructorUtils.invokeConstructor(Class.forName(payloadClass), (Object) record, (Object) orderingVal); - } catch (Throwable e) { - throw new IOException("Could not create payload for class: " + payloadClass, e); - } - } - /** * * TODO: Support hierarchical config files (see CONFIGURATION-609 for sample) @@ -98,11 +82,4 @@ public class UtilHelpers { } } - public static void checkRequiredProperties(PropertiesConfiguration configuration, List checkPropNames) { - checkPropNames.stream().forEach(prop -> { - if (!configuration.containsKey(prop)) { - throw new HoodieNotSupportedException("Required property "+ prop + " is missing"); - } - }); - } } diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java index ec0cbccc9..7f45fbea3 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -22,7 +22,9 @@ import 
com.beust.jcommander.IStringConverter; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import com.beust.jcommander.ParameterException; +import com.uber.hoodie.DataSourceUtils; import com.uber.hoodie.HoodieWriteClient; +import com.uber.hoodie.OverwriteWithLatestAvroPayload; import com.uber.hoodie.WriteStatus; import com.uber.hoodie.common.model.HoodieCommitMetadata; import com.uber.hoodie.common.model.HoodieRecord; @@ -36,10 +38,10 @@ import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.utilities.HiveIncrementalPuller; import com.uber.hoodie.utilities.UtilHelpers; -import com.uber.hoodie.utilities.keygen.SimpleKeyGenerator; +import com.uber.hoodie.SimpleKeyGenerator; import com.uber.hoodie.utilities.schema.FilebasedSchemaProvider; import com.uber.hoodie.utilities.sources.DFSSource; -import com.uber.hoodie.utilities.keygen.KeyGenerator; +import com.uber.hoodie.KeyGenerator; import com.uber.hoodie.utilities.schema.SchemaProvider; import com.uber.hoodie.utilities.sources.Source; import com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException; @@ -148,7 +150,7 @@ public class HoodieDeltaStreamer implements Serializable { private void initKeyGenerator() throws IOException { PropertiesConfiguration keygenCfg = UtilHelpers.readConfig(fs, new Path(cfg.keyGeneratorProps)); log.info("Creating key generator " + cfg.keyGeneratorClass + " with configs : " + keygenCfg.toString()); - this.keyGenerator = UtilHelpers.createKeyGenerator(cfg.keyGeneratorClass, keygenCfg); + this.keyGenerator = DataSourceUtils.createKeyGenerator(cfg.keyGeneratorClass, keygenCfg); } @@ -174,8 +176,6 @@ public class HoodieDeltaStreamer implements Serializable { } private void sync() throws Exception { - - // Retrieve the previous round checkpoints, if any Optional resumeCheckpointStr = Optional.empty(); if (commitTimelineOpt.isPresent()) { @@ -209,7 +209,7 @@ public class HoodieDeltaStreamer implements 
Serializable { JavaRDD avroRDD = dataAndCheckpoint.getKey().get(); JavaRDD records = avroRDD .map(gr -> { - HoodieRecordPayload payload = UtilHelpers.createPayload( + HoodieRecordPayload payload = DataSourceUtils.createPayload( cfg.payloadClassName, gr, (Comparable) gr.get(cfg.sourceOrderingField)); @@ -248,7 +248,6 @@ public class HoodieDeltaStreamer implements Serializable { } private HoodieWriteConfig getHoodieClientConfig(String hoodieClientCfgPath) throws Exception { - // TODO(vc): Double check all the options can be passed in like this. CompactionConfig, IndexConfig everything. return HoodieWriteConfig.newBuilder() .combineInput(true, true) .withPath(cfg.targetBasePath) @@ -322,7 +321,7 @@ public class HoodieDeltaStreamer implements Serializable { @Parameter(names = {"--payload-class"}, description = "subclass of HoodieRecordPayload, that works off a GenericRecord. " + "Default: SourceWrapperPayload. Implement your own, if you want to do something other than overwriting existing value") - public String payloadClassName = DeltaStreamerAvroPayload.class.getName(); + public String payloadClassName = OverwriteWithLatestAvroPayload.class.getName(); @Parameter(names = {"--schemaprovider-class"}, description = "subclass of com.uber.hoodie.utilities.schema.SchemaProvider " + "to attach schemas to input & target table data, built in options: FilebasedSchemaProvider") @@ -349,7 +348,6 @@ public class HoodieDeltaStreamer implements Serializable { public static void main(String[] args) throws Exception { final Config cfg = new Config(); JCommander cmd = new JCommander(cfg, args); - // TODO(vc): Do proper validation if (cfg.help || args.length == 0) { cmd.usage(); System.exit(1); diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/TimestampBasedKeyGenerator.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/TimestampBasedKeyGenerator.java index 2b7e7ade5..5c67bbd8b 100644 --- 
a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/TimestampBasedKeyGenerator.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/TimestampBasedKeyGenerator.java @@ -18,9 +18,10 @@ package com.uber.hoodie.utilities.keygen; +import com.uber.hoodie.DataSourceUtils; +import com.uber.hoodie.SimpleKeyGenerator; import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.exception.HoodieNotSupportedException; -import com.uber.hoodie.utilities.UtilHelpers; import com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException; import org.apache.avro.generic.GenericRecord; @@ -30,7 +31,6 @@ import java.io.Serializable; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Arrays; -import java.util.Calendar; import java.util.Date; import java.util.TimeZone; @@ -65,12 +65,12 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator { public TimestampBasedKeyGenerator(PropertiesConfiguration config) { super(config); - UtilHelpers.checkRequiredProperties(config, Arrays.asList(Config.TIMESTAMP_TYPE_FIELD_PROP, Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP)); + DataSourceUtils.checkRequiredProperties(config, Arrays.asList(Config.TIMESTAMP_TYPE_FIELD_PROP, Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP)); this.timestampType = TimestampType.valueOf(config.getString(Config.TIMESTAMP_TYPE_FIELD_PROP)); this.outputDateFormat = config.getString(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP); if (timestampType == TimestampType.DATE_STRING || timestampType == TimestampType.MIXED) { - UtilHelpers.checkRequiredProperties(config, Arrays.asList(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP)); + DataSourceUtils.checkRequiredProperties(config, Arrays.asList(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP)); this.inputDateFormat = new SimpleDateFormat(config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP)); this.inputDateFormat.setTimeZone(TimeZone.getTimeZone("GMT")); } diff --git 
a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/FilebasedSchemaProvider.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/FilebasedSchemaProvider.java index dc09a703a..6a77632c3 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/FilebasedSchemaProvider.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/FilebasedSchemaProvider.java @@ -18,9 +18,9 @@ package com.uber.hoodie.utilities.schema; +import com.uber.hoodie.DataSourceUtils; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.exception.HoodieIOException; -import com.uber.hoodie.utilities.UtilHelpers; import org.apache.avro.Schema; import org.apache.commons.configuration.PropertiesConfiguration; @@ -53,7 +53,7 @@ public class FilebasedSchemaProvider extends SchemaProvider { super(config); this.fs = FSUtils.getFs(); - UtilHelpers.checkRequiredProperties(config, Arrays.asList(Config.SOURCE_SCHEMA_FILE_PROP, Config.TARGET_SCHEMA_FILE_PROP)); + DataSourceUtils.checkRequiredProperties(config, Arrays.asList(Config.SOURCE_SCHEMA_FILE_PROP, Config.TARGET_SCHEMA_FILE_PROP)); try { this.sourceSchema = new Schema.Parser().parse(fs.open(new Path(config.getString(Config.SOURCE_SCHEMA_FILE_PROP)))); this.targetSchema = new Schema.Parser().parse(fs.open(new Path(config.getString(Config.TARGET_SCHEMA_FILE_PROP)))); diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/DFSSource.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/DFSSource.java index 9440dffd2..49c333782 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/DFSSource.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/DFSSource.java @@ -18,10 +18,10 @@ package com.uber.hoodie.utilities.sources; +import com.uber.hoodie.DataSourceUtils; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.exception.HoodieIOException; import 
com.uber.hoodie.exception.HoodieNotSupportedException; -import com.uber.hoodie.utilities.UtilHelpers; import com.uber.hoodie.utilities.schema.SchemaProvider; import org.apache.avro.generic.GenericRecord; @@ -66,7 +66,7 @@ public class DFSSource extends Source { public DFSSource(PropertiesConfiguration config, JavaSparkContext sparkContext, SourceDataFormat dataFormat, SchemaProvider schemaProvider) { super(config, sparkContext, dataFormat, schemaProvider); this.fs = FSUtils.getFs(); - UtilHelpers.checkRequiredProperties(config, Arrays.asList(Config.ROOT_INPUT_PATH_PROP)); + DataSourceUtils.checkRequiredProperties(config, Arrays.asList(Config.ROOT_INPUT_PATH_PROP)); } diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/HiveIncrPullSource.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/HiveIncrPullSource.java index 6ebc1103e..4aceb88b3 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/HiveIncrPullSource.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/HiveIncrPullSource.java @@ -18,9 +18,9 @@ package com.uber.hoodie.utilities.sources; +import com.uber.hoodie.DataSourceUtils; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.exception.HoodieIOException; -import com.uber.hoodie.utilities.UtilHelpers; import com.uber.hoodie.utilities.schema.SchemaProvider; import org.apache.avro.generic.GenericRecord; @@ -74,7 +74,7 @@ public class HiveIncrPullSource extends Source { public HiveIncrPullSource(PropertiesConfiguration config, JavaSparkContext sparkContext, SourceDataFormat dataFormat, SchemaProvider schemaProvider) { super(config, sparkContext, dataFormat, schemaProvider); this.fs = FSUtils.getFs(); - UtilHelpers.checkRequiredProperties(config, Arrays.asList(Config.ROOT_INPUT_PATH_PROP)); + DataSourceUtils.checkRequiredProperties(config, Arrays.asList(Config.ROOT_INPUT_PATH_PROP)); this.incrPullRootPath = config.getString(Config.ROOT_INPUT_PATH_PROP); 
} diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/KafkaSource.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/KafkaSource.java index 7b88ce891..f1738a385 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/KafkaSource.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/KafkaSource.java @@ -18,8 +18,8 @@ package com.uber.hoodie.utilities.sources; +import com.uber.hoodie.DataSourceUtils; import com.uber.hoodie.exception.HoodieNotSupportedException; -import com.uber.hoodie.utilities.UtilHelpers; import com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException; import com.uber.hoodie.utilities.schema.SchemaProvider; @@ -175,7 +175,7 @@ public class KafkaSource extends Source { Stream keys = StreamSupport.stream(Spliterators.spliteratorUnknownSize(config.getKeys(), Spliterator.NONNULL), false); keys.forEach(k -> kafkaParams.put(k, config.getString(k))); - UtilHelpers.checkRequiredProperties(config, Arrays.asList(Config.KAFKA_TOPIC_NAME)); + DataSourceUtils.checkRequiredProperties(config, Arrays.asList(Config.KAFKA_TOPIC_NAME)); topicName = config.getString(Config.KAFKA_TOPIC_NAME); } diff --git a/hoodie-utilities/src/main/resources/delta-streamer-config/key-generator.properties b/hoodie-utilities/src/main/resources/delta-streamer-config/key-generator.properties index 9f0beccd3..e98189d99 100644 --- a/hoodie-utilities/src/main/resources/delta-streamer-config/key-generator.properties +++ b/hoodie-utilities/src/main/resources/delta-streamer-config/key-generator.properties @@ -16,5 +16,5 @@ # # -hoodie.deltastreamer.keygen.simple.recordkey.field=_row_key -hoodie.deltastreamer.keygen.simple.partitionpath.field=driver +hoodie.datasource.write.recordkey.field=_row_key +hoodie.datasource.write.partitionpath.field=driver diff --git a/hoodie-utilities/src/test/java/HoodieSparkSQLExample.java b/hoodie-utilities/src/test/java/HoodieSparkSQLExample.java deleted file mode 
100644 index 134fc76ea..000000000 --- a/hoodie-utilities/src/test/java/HoodieSparkSQLExample.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * - */ - - -import org.apache.spark.sql.SparkSession; - -/** - * Examples to do Spark SQL on Hoodie dataset. - */ -public class HoodieSparkSQLExample { - - public static void main(String[] args) throws Exception { - - SparkSession spark = SparkSession.builder() - .appName("Hoodie SparkSQL") - .config("hive.metastore.uris","thrift://localhost:10000") - .config("spark.sql.hive.convertMetastoreParquet", false) - .enableHiveSupport() - .master("local[2]") - .getOrCreate(); - - spark.sql("describe hoodie_rt").show(); - spark.sql("select * from hoodie_rt").show(); - spark.sql("select end_lon as e1, driver, rider as r1, datestr, driver, datestr, rider, _hoodie_record_key from hoodie_rt").show(); - spark.sql("select fare, begin_lon, begin_lat, timestamp from hoodie_rt where fare > 2.0").show(); - spark.sql("select count(*) as cnt, _hoodie_file_name as file from hoodie_rt group by _hoodie_file_name").show(); - } -} diff --git a/pom.xml b/pom.xml index 5700ec08e..1e08a2bea 100644 --- a/pom.xml +++ b/pom.xml @@ -33,6 +33,7 @@ hoodie-hadoop-mr hoodie-hive hoodie-utilities + hoodie-spark @@ -122,6 +123,8 @@ 1.1.0 3.1.1 2.1.0 + 2.11.8 + 2.11 @@ -436,13 +439,13 @@ org.apache.spark - spark-core_2.10 + 
spark-core_2.11 ${spark.version} provided org.apache.spark - spark-sql_2.10 + spark-sql_2.11 ${spark.version} provided @@ -512,7 +515,7 @@ org.apache.commons commons-configuration2 - 2.1 + 2.1.1