diff --git a/docs/quickstart.md b/docs/quickstart.md
index f463c3075..ea18c752f 100644
--- a/docs/quickstart.md
+++ b/docs/quickstart.md
@@ -15,6 +15,11 @@ Check out code and pull it into Intellij as a normal maven project.
Normally build the maven project, from command line
```
$ mvn clean install -DskipTests
+
+To work with older versions of Hive (pre Hive-1.2.1), use
+
+$ mvn clean install -DskipTests -Dhive11
+
```
{% include callout.html content="You might want to add your spark jars folder to project dependencies under 'Module Settings', to be able to run Spark from IDE" type="info" %}
@@ -22,16 +27,45 @@ $ mvn clean install -DskipTests
{% include note.html content="Setup your local hadoop/hive test environment, so you can play with the entire ecosystem. See [this](http://www.bytearray.io/2016/05/setting-up-hadoopyarnsparkhive-on-mac.html) for reference" %}
+## Supported Versions
+
+Hoodie requires Java 8 to be installed. Hoodie works with Spark-2.x versions. We have verified that hoodie works with the following combinations of Hadoop/Hive/Spark.
+
+| Hadoop | Hive | Spark | Instructions to Build Hoodie |
+| ---- | ----- | ---- | ---- |
+| 2.6.0-cdh5.7.2 | 1.1.0-cdh5.7.2 | spark-2.[1-3].x | Use "mvn clean install -DskipTests -Dhive11". Jars will have ".hive11" as suffix |
+| Apache hadoop-2.8.4 | Apache hive-2.3.3 | spark-2.[1-3].x | Use "mvn clean install -DskipTests" |
+| Apache hadoop-2.7.3 | Apache hive-1.2.1 | spark-2.[1-3].x | Use "mvn clean install -DskipTests" |
+
+If your environment has other versions of hadoop/hive/spark, please try out hoodie and let us know if there are any issues. We are limited by our bandwidth to certify other combinations.
+It would be of great help if you can reach out to us with your setup and experience with hoodie.
## Generate a Hoodie Dataset
+### Requirements & Environment Variables
+
+Please set the following environment variables according to your setup. An example setup with CDH versions is given below.
+
+```
+export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64/jre/
+export HIVE_HOME=/var/hadoop/setup/apache-hive-1.1.0-cdh5.7.2-bin
+export HADOOP_HOME=/var/hadoop/setup/hadoop-2.6.0-cdh5.7.2
+export HADOOP_INSTALL=/var/hadoop/setup/hadoop-2.6.0-cdh5.7.2
+export HADOOP_CONF_DIR=$HADOOP_INSTALL/etc/hadoop
+export SPARK_HOME=/var/hadoop/setup/spark-2.3.1-bin-hadoop2.7
+export SPARK_INSTALL=$SPARK_HOME
+export SPARK_CONF_DIR=$SPARK_HOME/conf
+export PATH=$JAVA_HOME/bin:$HIVE_HOME/bin:$HADOOP_HOME/bin:$SPARK_INSTALL/bin:$PATH
+```
### DataSource API
-Run __hoodie-spark/src/test/java/HoodieJavaApp.java__ class, to place a two commits (commit 1 => 100 inserts, commit 2 => 100 updates to previously inserted 100 records) onto your HDFS/local filesystem
+Run the __hoodie-spark/src/test/java/HoodieJavaApp.java__ class to place two commits (commit 1 => 100 inserts, commit 2 => 100 updates to the previously inserted 100 records) onto your HDFS/local filesystem. Use the wrapper script
+to run it from the command line
```
-
+cd hoodie-spark
+./run_hoodie_app.sh --help
Usage: <main class> [options]
Options:
--help, -h
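
For context alongside the patch above: once HoodieJavaApp (or your own pipeline) has written a dataset, the same write path can be driven directly from the DataSource API. The sketch below is illustrative only and is not part of this patch; the `hoodie.datasource.hive_sync.*` keys are the ones introduced in `DataSourceOptions.scala` further down in this diff, while the record-key/partition-path option names, column names, table name and base path are assumptions to adapt to your setup.

```scala
// Hypothetical spark-shell sketch: upsert a DataFrame via the hoodie datasource and
// sync the resulting table to Hive using the hive_sync options added in this change.
// Assumes `spark` (SparkSession) and an `inputDF` whose columns include a record key
// (`_row_key`) and a partition path column (`partition`) -- adjust names to your schema.
import org.apache.spark.sql.SaveMode

inputDF.write
  .format("com.uber.hoodie")
  .option("hoodie.table.name", "hoodie_test")
  .option("hoodie.datasource.write.recordkey.field", "_row_key")      // assumed option key
  .option("hoodie.datasource.write.partitionpath.field", "partition") // assumed option key
  .option("hoodie.datasource.hive_sync.enable", "true")
  .option("hoodie.datasource.hive_sync.database", "default")
  .option("hoodie.datasource.hive_sync.table", "hoodie_test")
  .option("hoodie.datasource.hive_sync.jdbcUrl", "jdbc:hive2://localhost:10000")
  .option("hoodie.datasource.hive_sync.partition_fields", "partition")
  .mode(SaveMode.Append)
  .save("/tmp/hoodie/sample-table") // assumed base path of the hoodie dataset
```

If Hive sync is left disabled, the standalone `run_sync_tool.sh` added later in this patch achieves the same table registration after the fact.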
@@ -69,11 +103,12 @@ Now, lets see how we can publish this data into Hive.
hdfs namenode # start name node
hdfs datanode # start data node
-bin/hive --service metastore -p 10000 # start metastore
+bin/hive --service metastore # start metastore
bin/hiveserver2 \
-  --hiveconf hive.server2.thrift.port=10010 \
  --hiveconf hive.root.logger=INFO,console \
-  --hiveconf hive.aux.jars.path=hoodie/hoodie-hadoop-mr/target/hoodie-hadoop-mr-0.3.6-SNAPSHOT.jar
+  --hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat \
+  --hiveconf hive.stats.autogather=false \
+  --hiveconf hive.aux.jars.path=hoodie/packaging/hoodie-hadoop-mr-bundle/target/hoodie-hadoop-mr-bundle-0.4.3-SNAPSHOT.jar
```
@@ -86,7 +121,8 @@ It uses an incremental approach by storing the last commit time synced in the TB
This can be run as frequently as the ingestion pipeline to make sure new partitions and schema evolution changes are reflected immediately.
```
-{JAVA8}/bin/java -cp "/etc/hive/conf:./hoodie-hive-0.3.8-SNAPSHOT-jar-with-dependencies.jar:/opt/hadoop/lib/hadoop-mapreduce/*" com.uber.hoodie.hive.HiveSyncTool
+cd hoodie-hive
+./run_sync_tool.sh --user hive --pass hive --database default
@@ -100,17 +136,19 @@ This can be run as frequently as the ingestion pipeline to make sure new partiti
#### Manually via Beeline
-Add in the hoodie-hadoop-mr jar so, Hive can read the Hoodie dataset and answer the query.
+Add in the hoodie-hadoop-mr-bundle jar so Hive can read the Hoodie dataset and answer the query.
+Also, for reading hoodie tables using Hive, the following configs need to be set up
```
-hive> add jar file:///tmp/hoodie-hadoop-mr-0.2.7.jar;
-Added [file:///tmp/hoodie-hadoop-mr-0.2.7.jar] to class path
-Added resources: [file:///tmp/hoodie-hadoop-mr-0.2.7.jar]
+hive> set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+hive> set hive.stats.autogather=false;
+hive> add jar file:///tmp/hoodie-hadoop-mr-bundle-0.4.3.jar;
+Added [file:///tmp/hoodie-hadoop-mr-bundle-0.4.3.jar] to class path
+Added resources: [file:///tmp/hoodie-hadoop-mr-bundle-0.4.3.jar]
```
Then, you need to create a __ReadOptimized__ Hive table as below (only type supported as of now) and register the sample partitions
-
```
drop table hoodie_test;
CREATE EXTERNAL TABLE hoodie_test(`_row_key` string,
@@ -200,8 +238,7 @@ Spark is super easy, once you get Hive working as above. Just spin up a Spark Sh
```
$ cd $SPARK_INSTALL
-$ export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
-$ spark-shell --jars /tmp/hoodie-hadoop-mr-0.2.7.jar --driver-class-path $HADOOP_CONF_DIR --conf spark.sql.hive.convertMetastoreParquet=false
+$ spark-shell --jars $HUDI_SRC/packaging/hoodie-spark-bundle/target/hoodie-spark-bundle-0.4.3-SNAPSHOT.jar --driver-class-path $HADOOP_CONF_DIR --conf spark.sql.hive.convertMetastoreParquet=false --packages com.databricks:spark-avro_2.11:4.0.0
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
scala> sqlContext.sql("show tables").show(10000)
diff --git a/docs/sql_queries.md b/docs/sql_queries.md
index 45ebb4cf8..f133a85bb 100644
--- a/docs/sql_queries.md
+++ b/docs/sql_queries.md
@@ -20,7 +20,7 @@ In the following sections, we cover the configs needed across different query en
## Hive
For HiveServer2 access, [install](https://www.cloudera.com/documentation/enterprise/5-6-x/topics/cm_mc_hive_udf.html#concept_nc3_mms_lr)
-the hoodie-hadoop-mr-x.y.z-SNAPSHOT.jar into the aux jars path and we should be able to recognize the Hoodie tables and query them correctly.
+the hoodie-hadoop-mr-bundle-x.y.z-SNAPSHOT.jar into the aux jars path and we should be able to recognize the Hoodie tables and query them correctly. For beeline access, the `hive.input.format` variable needs to be set to the fully qualified path name of the inputformat `com.uber.hoodie.hadoop.HoodieInputFormat` For Tez, additionally the `hive.tez.input.format` needs to be set to `org.apache.hadoop.hive.ql.io.HiveInputFormat` @@ -39,7 +39,7 @@ However benchmarks have not revealed any real performance degradation with Hoodi Sample command is provided below to spin up Spark Shell ``` -$ spark-shell --jars hoodie-hadoop-mr-x.y.z-SNAPSHOT.jar --driver-class-path /etc/hive/conf --conf spark.sql.hive.convertMetastoreParquet=false --num-executors 10 --driver-memory 7g --executor-memory 2g --master yarn-client +$ spark-shell --jars hoodie-spark-bundle-x.y.z-SNAPSHOT.jar --driver-class-path /etc/hive/conf --packages com.databricks:spark-avro_2.11:4.0.0 --conf spark.sql.hive.convertMetastoreParquet=false --num-executors 10 --driver-memory 7g --executor-memory 2g --master yarn-client scala> sqlContext.sql("select count(*) from uber.trips where datestr = '2016-10-02'").show() @@ -62,7 +62,7 @@ spark.sparkContext.hadoopConfiguration.setClass("mapreduce.input.pathFilter.clas ## Presto -Presto requires a [patch](https://github.com/prestodb/presto/pull/7002) (until the PR is merged) and the hoodie-hadoop-mr jar to be placed +Presto requires a [patch](https://github.com/prestodb/presto/pull/7002) (until the PR is merged) and the hoodie-hadoop-mr-bundle jar to be placed into `/plugin/hive-hadoop2/`. {% include callout.html content="Get involved to improve this integration [here](https://github.com/uber/hoodie/issues/81)" type="info" %} diff --git a/hoodie-cli/pom.xml b/hoodie-cli/pom.xml index c696972f0..515ea1d7f 100644 --- a/hoodie-cli/pom.xml +++ b/hoodie-cli/pom.xml @@ -167,11 +167,29 @@ log4j ${log4j.version} + + + com.uber.hoodie + hoodie-hive + ${project.version} + + com.uber.hoodie hoodie-client ${project.version} + + + org.apache.hadoop + hadoop-common + + + + org.apache.hadoop + hadoop-hdfs + + com.uber.hoodie hoodie-common diff --git a/hoodie-client/pom.xml b/hoodie-client/pom.xml index 7b0f7ab57..b8a316db8 100644 --- a/hoodie-client/pom.xml +++ b/hoodie-client/pom.xml @@ -39,8 +39,12 @@ test-jar + test-compile + + false + org.apache.rat @@ -107,6 +111,7 @@ com.uber.hoodie hoodie-common ${project.version} + tests test-jar test @@ -183,12 +188,6 @@ ${project.version} test - - org.apache.hive - hive-exec - test - - org.apache.hbase @@ -218,5 +217,40 @@ - + + + + hive12 + + + !hive11 + + + + + ${hive12.groupid} + hive-exec + ${hive12.version} + test + + + + + hive11 + + + hive11 + + + + + ${hive11.groupid} + hive-exec + ${hive11.version} + test + + + + + diff --git a/hoodie-common/pom.xml b/hoodie-common/pom.xml index 071ae7a63..df28b1100 100644 --- a/hoodie-common/pom.xml +++ b/hoodie-common/pom.xml @@ -40,8 +40,12 @@ test-jar + test-compile + + false + org.apache.rat @@ -90,6 +94,10 @@ jackson-annotations ${fasterxml.version} + + com.fasterxml.jackson.core + jackson-databind + org.apache.parquet parquet-avro diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java index c75c19d77..0aee2e86c 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java @@ 
-333,4 +333,13 @@ public class HoodieCommitMetadata implements Serializable { mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); return mapper; } + + @Override + public String toString() { + return "HoodieCommitMetadata{" + + "partitionToWriteStats=" + partitionToWriteStats + + ", compacted=" + compacted + + ", extraMetadataMap=" + extraMetadataMap + + '}'; + } } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatReader.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatReader.java index 8e8033b5b..8b49323e9 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatReader.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatReader.java @@ -94,4 +94,4 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader { public void remove() { } -} \ No newline at end of file +} diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/HoodieAvroUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/HoodieAvroUtils.java index 45b69e0b6..9c1dc2273 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/HoodieAvroUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/HoodieAvroUtils.java @@ -27,9 +27,11 @@ import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; import java.util.zip.DeflaterOutputStream; import java.util.zip.InflaterInputStream; import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; @@ -136,6 +138,26 @@ public class HoodieAvroUtils { return record; } + /** + * Add null fields to passed in schema. Caller is responsible for ensuring there is no duplicates. + * As different query engines have varying constraints regarding treating the case-sensitivity of fields, its best + * to let caller determine that. + * @param schema Passed in schema + * @param newFieldNames Null Field names to be added + * @return + */ + public static Schema appendNullSchemaFields(Schema schema, List newFieldNames) { + List newFields = schema.getFields().stream().map(field -> { + return new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultValue()); + }).collect(Collectors.toList()); + for (String newField : newFieldNames) { + newFields.add(new Schema.Field(newField, METADATA_FIELD_SCHEMA, "", null)); + } + Schema newSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), schema.isError()); + newSchema.setFields(newFields); + return newSchema; + } + /** * Adds the Hoodie commit metadata into the provided Generic Record. 
*/ @@ -155,7 +177,7 @@ public class HoodieAvroUtils { for (Schema.Field f : record.getSchema().getFields()) { newRecord.put(f.name(), record.get(f.name())); } - if (!new GenericData().validate(newSchema, newRecord)) { + if (!GenericData.get().validate(newSchema, newRecord)) { throw new SchemaCompatabilityException( "Unable to validate the rewritten record " + record + " against schema " + newSchema); diff --git a/hoodie-hadoop-mr/pom.xml b/hoodie-hadoop-mr/pom.xml index fca84ad4d..004b8ddce 100644 --- a/hoodie-hadoop-mr/pom.xml +++ b/hoodie-hadoop-mr/pom.xml @@ -35,6 +35,7 @@ com.uber.hoodie hoodie-common ${project.version} + tests test-jar test @@ -59,17 +60,13 @@ org.apache.hadoop hadoop-hdfs - - org.apache.hive - hive-exec - commons-logging commons-logging - org.apache.hive - hive-jdbc + org.apache.commons + commons-lang3 org.apache.parquet @@ -79,6 +76,10 @@ com.twitter parquet-avro + + com.twitter + parquet-hadoop-bundle + com.twitter.common objectsize @@ -93,6 +94,11 @@ kryo test + + junit + junit + test + @@ -101,33 +107,60 @@ org.apache.rat apache-rat-plugin - - org.apache.maven.plugins - maven-shade-plugin - 2.4 - - - package - - shade - - - ${project.build.directory}/dependency-reduced-pom.xml - - true - - - com.uber.hoodie:hoodie-common - com.twitter:parquet-avro - com.twitter.common:objectsize - - - - - - - - + + + hive12 + + + !hive11 + + + + + ${hive12.groupid} + hive-jdbc + ${hive12.version} + + + commons-logging + commons-logging + + + + + ${hive12.groupid} + hive-exec + ${hive12.version} + + + + + hive11 + + + hive11 + + + + + ${hive11.groupid} + hive-jdbc + ${hive11.version} + + + commons-logging + commons-logging + + + + + ${hive11.groupid} + hive-exec + ${hive11.version} + + + + diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java index 84d7da39c..4c0d548d7 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java @@ -16,13 +16,8 @@ package com.uber.hoodie.hadoop; -import static parquet.filter2.predicate.FilterApi.and; -import static parquet.filter2.predicate.FilterApi.binaryColumn; -import static parquet.filter2.predicate.FilterApi.gt; - import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodiePartitionMetadata; -import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.TableFileSystemView; @@ -43,26 +38,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat; -import org.apache.hadoop.hive.ql.io.parquet.read.ParquetFilterPredicateConverter; -import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; -import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory; -import org.apache.hadoop.hive.ql.plan.TableScanDesc; -import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.io.ArrayWritable; -import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import 
org.apache.hadoop.mapreduce.Job; -import parquet.filter2.predicate.FilterPredicate; -import parquet.filter2.predicate.Operators; -import parquet.hadoop.ParquetFileReader; -import parquet.hadoop.metadata.FileMetaData; -import parquet.hadoop.metadata.ParquetMetadata; -import parquet.io.api.Binary; /** * HoodieInputFormat which understands the Hoodie File Structure and filters files based on the @@ -219,62 +201,6 @@ public class HoodieInputFormat extends MapredParquetInputFormat implements Confi return super.getRecordReader(split, job, reporter); } - /** - * Clears out the filter expression (if this is not done, then ParquetReader will override the - * FilterPredicate set) - */ - private void clearOutExistingPredicate(JobConf job) { - job.unset(TableScanDesc.FILTER_EXPR_CONF_STR); - } - - /** - * Constructs the predicate to push down to parquet storage. This creates the predicate for - * `hoodie_commit_time` > 'start_commit_time' and ANDs with the existing predicate if one is - * present already. - */ - private FilterPredicate constructHoodiePredicate(JobConf job, String tableName, InputSplit split) - throws IOException { - FilterPredicate commitTimePushdown = constructCommitTimePushdownPredicate(job, tableName); - LOG.info("Commit time predicate - " + commitTimePushdown.toString()); - FilterPredicate existingPushdown = constructHQLPushdownPredicate(job, split); - LOG.info("Existing predicate - " + existingPushdown); - - FilterPredicate hoodiePredicate; - if (existingPushdown != null) { - hoodiePredicate = and(existingPushdown, commitTimePushdown); - } else { - hoodiePredicate = commitTimePushdown; - } - LOG.info("Hoodie Predicate - " + hoodiePredicate); - return hoodiePredicate; - } - - private FilterPredicate constructHQLPushdownPredicate(JobConf job, InputSplit split) - throws IOException { - String serializedPushdown = job.get(TableScanDesc.FILTER_EXPR_CONF_STR); - String columnNamesString = job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR); - if (serializedPushdown == null || columnNamesString == null || serializedPushdown.isEmpty() - || columnNamesString.isEmpty()) { - return null; - } else { - SearchArgument sarg = SearchArgumentFactory - .create(Utilities.deserializeExpression(serializedPushdown)); - final Path finalPath = ((FileSplit) split).getPath(); - final ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(job, finalPath); - final FileMetaData fileMetaData = parquetMetadata.getFileMetaData(); - return ParquetFilterPredicateConverter.toFilterPredicate(sarg, fileMetaData.getSchema()); - } - } - - private FilterPredicate constructCommitTimePushdownPredicate(JobConf job, String tableName) - throws IOException { - String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(Job.getInstance(job), tableName); - Operators.BinaryColumn sequenceColumn = binaryColumn(HoodieRecord.COMMIT_TIME_METADATA_FIELD); - FilterPredicate p = gt(sequenceColumn, Binary.fromString(lastIncrementalTs)); - LOG.info("Setting predicate in InputFormat " + p.toString()); - return p; - } - /** * Read the table metadata from a data path. 
This assumes certain hierarchy of files which should * be changed once a better way is figured out to pass in the hoodie meta directory diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieROTablePathFilter.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieROTablePathFilter.java index 158aa0039..ae55ea21b 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieROTablePathFilter.java +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieROTablePathFilter.java @@ -126,7 +126,7 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable { HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), baseDir.toString()); HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, - metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(), + metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), fs.listStatus(folder)); List latestFiles = fsView.getLatestDataFiles() .collect(Collectors.toList()); diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/AbstractRealtimeRecordReader.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/AbstractRealtimeRecordReader.java index 383ce0cd9..89c2fb355 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/AbstractRealtimeRecordReader.java +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/AbstractRealtimeRecordReader.java @@ -18,6 +18,13 @@ package com.uber.hoodie.hadoop.realtime; +import com.uber.hoodie.common.model.HoodieLogFile; +import com.uber.hoodie.common.table.log.HoodieLogFormat; +import com.uber.hoodie.common.table.log.HoodieLogFormat.Reader; +import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock; +import com.uber.hoodie.common.table.log.block.HoodieLogBlock; +import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.common.util.HoodieAvroUtils; import com.uber.hoodie.exception.HoodieException; import com.uber.hoodie.exception.HoodieIOException; import java.io.IOException; @@ -29,12 +36,15 @@ import java.util.Set; import java.util.TreeMap; import java.util.stream.Collectors; import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; import org.apache.avro.generic.GenericArray; import org.apache.avro.generic.GenericFixed; import org.apache.avro.generic.GenericRecord; +import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.io.DoubleWritable; @@ -44,7 +54,6 @@ import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; @@ -119,10 +128,15 @@ public abstract class AbstractRealtimeRecordReader { StringBuilder builder = new StringBuilder(); Writable[] values = writable.get(); - builder.append(String.format("Size: %s,", values.length)); + builder.append(String.format("(Size: %s)[", values.length)); for (Writable w : values) { - builder.append(w + " "); + if (w instanceof ArrayWritable) { + builder.append(arrayWritableToString((ArrayWritable) w) + " 
"); + } else { + builder.append(w + " "); + } } + builder.append("]"); return builder.toString(); } @@ -130,12 +144,9 @@ public abstract class AbstractRealtimeRecordReader { * Given a comma separated list of field names and positions at which they appear on Hive, return * a ordered list of field names, that can be passed onto storage. */ - public static List orderFields(String fieldNameCsv, String fieldOrderCsv, - String partitioningFieldsCsv) { + private static List orderFields(String fieldNameCsv, String fieldOrderCsv, List partitioningFields) { String[] fieldOrders = fieldOrderCsv.split(","); - Set partitioningFields = Arrays.stream(partitioningFieldsCsv.split(",")) - .collect(Collectors.toSet()); List fieldNames = Arrays.stream(fieldNameCsv.split(",")) .filter(fn -> !partitioningFields.contains(fn)).collect(Collectors.toList()); @@ -157,17 +168,34 @@ public abstract class AbstractRealtimeRecordReader { * columns */ public static Schema generateProjectionSchema(Schema writeSchema, List fieldNames) { + /** + * Avro & Presto field names seems to be case sensitive (support fields differing only in case) + * whereas Hive/Impala/SparkSQL(default) are case-insensitive. Spark allows this to be configurable + * using spark.sql.caseSensitive=true + * + * For a RT table setup with no delta-files (for a latest file-slice) -> we translate parquet schema to Avro + * Here the field-name case is dependent on parquet schema. Hive (1.x/2.x/CDH) translate column projections + * to lower-cases + * + */ List projectedFields = new ArrayList<>(); + Map schemaFieldsMap = writeSchema.getFields().stream() + .map(r -> Pair.of(r.name().toLowerCase(), r)).collect(Collectors.toMap(Pair::getLeft, Pair::getRight)); for (String fn : fieldNames) { - Schema.Field field = writeSchema.getField(fn); + Schema.Field field = schemaFieldsMap.get(fn.toLowerCase()); if (field == null) { - throw new HoodieException("Field " + fn + " not found log schema. Query cannot proceed!"); + throw new HoodieException("Field " + fn + " not found in log schema. Query cannot proceed! 
" + + "Derived Schema Fields: " + + schemaFieldsMap.keySet().stream().collect(Collectors.toList())); } projectedFields .add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultValue())); } - return Schema.createRecord(projectedFields); + Schema projectedSchema = Schema + .createRecord(writeSchema.getName(), writeSchema.getDoc(), writeSchema.getNamespace(), writeSchema.isError()); + projectedSchema.setFields(projectedFields); + return projectedSchema; } /** @@ -176,10 +204,16 @@ public abstract class AbstractRealtimeRecordReader { public static Writable avroToArrayWritable(Object value, Schema schema) { // if value is null, make a NullWritable + // Hive 2.x does not like NullWritable if (value == null) { - return NullWritable.get(); + + return null; + //return NullWritable.get(); } + + Writable[] wrapperWritable; + switch (schema.getType()) { case STRING: return new Text(value.toString()); @@ -196,7 +230,8 @@ public abstract class AbstractRealtimeRecordReader { case BOOLEAN: return new BooleanWritable((Boolean) value); case NULL: - return NullWritable.get(); + return null; + // return NullWritable.get(); case RECORD: GenericRecord record = (GenericRecord) value; Writable[] values1 = new Writable[schema.getFields().size()]; @@ -214,7 +249,8 @@ public abstract class AbstractRealtimeRecordReader { for (Object obj : arrayValue) { values2[index2++] = avroToArrayWritable(obj, schema.getElementType()); } - return new ArrayWritable(Writable.class, values2); + wrapperWritable = new Writable[]{new ArrayWritable(Writable.class, values2)}; + return new ArrayWritable(Writable.class, wrapperWritable); case MAP: Map mapValue = (Map) value; Writable[] values3 = new Writable[mapValue.size()]; @@ -226,7 +262,8 @@ public abstract class AbstractRealtimeRecordReader { mapValues[1] = avroToArrayWritable(mapEntry.getValue(), schema.getValueType()); values3[index3++] = new ArrayWritable(Writable.class, mapValues); } - return new ArrayWritable(Writable.class, values3); + wrapperWritable = new Writable[]{new ArrayWritable(Writable.class, values3)}; + return new ArrayWritable(Writable.class, wrapperWritable); case UNION: List types = schema.getTypes(); if (types.size() != 2) { @@ -248,16 +285,61 @@ public abstract class AbstractRealtimeRecordReader { } } + public static Schema readSchemaFromLogFile(FileSystem fs, Path path) throws IOException { + Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null); + HoodieAvroDataBlock lastBlock = null; + while (reader.hasNext()) { + HoodieLogBlock block = reader.next(); + if (block instanceof HoodieAvroDataBlock) { + lastBlock = (HoodieAvroDataBlock) block; + } + } + if (lastBlock != null) { + return lastBlock.getSchema(); + } + return null; + } + + /** + * Hive implementation of ParquetRecordReader results in partition columns not present in the original parquet file + * to also be part of the projected schema. Hive expects the record reader implementation to return the row in its + * entirety (with un-projected column having null values). 
As we use writerSchema for this, make sure writer schema + * also includes partition columns + * @param schema Schema to be changed + * @return + */ + private static Schema addPartitionFields(Schema schema, List partitioningFields) { + final Set firstLevelFieldNames = schema.getFields().stream().map(Field::name) + .map(String::toLowerCase).collect(Collectors.toSet()); + List fieldsToAdd = partitioningFields.stream().map(String::toLowerCase) + .filter(x -> !firstLevelFieldNames.contains(x)).collect(Collectors.toList()); + + return HoodieAvroUtils.appendNullSchemaFields(schema, fieldsToAdd); + } + /** * Goes through the log files and populates a map with latest version of each key logged, since * the base split was written. */ private void init() throws IOException { writerSchema = new AvroSchemaConverter().convert(baseFileSchema); + List fieldNames = writerSchema.getFields().stream().map(Field::name).collect(Collectors.toList()); + if (split.getDeltaFilePaths().size() > 0) { + String logPath = split.getDeltaFilePaths().get(split.getDeltaFilePaths().size() - 1); + FileSystem fs = FSUtils.getFs(logPath, jobConf); + writerSchema = readSchemaFromLogFile(fs, new Path(logPath)); + fieldNames = writerSchema.getFields().stream().map(Field::name).collect(Collectors.toList()); + } + + // Add partitioning fields to writer schema for resulting row to contain null values for these fields + List partitioningFields = Arrays.stream( + jobConf.get("partition_columns", "").split(",")).collect(Collectors.toList()); + writerSchema = addPartitionFields(writerSchema, partitioningFields); + List projectionFields = orderFields( jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR), jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR), - jobConf.get("partition_columns", "")); + partitioningFields); // TODO(vc): In the future, the reader schema should be updated based on log files & be able // to null out fields not present before readerSchema = generateProjectionSchema(writerSchema, projectionFields); diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java index e423bd018..04351202c 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java @@ -203,21 +203,31 @@ public class HoodieRealtimeInputFormat extends HoodieInputFormat implements Conf @Override public RecordReader getRecordReader(final InputSplit split, final JobConf job, final Reporter reporter) throws IOException { + + LOG.info("Before adding Hoodie columns, Projections :" + job + .get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR) + ", Ids :" + + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)); + + // Hive (across all versions) fails for queries like select count(`_hoodie_commit_time`) from table; + // In this case, the projection fields gets removed. Looking at HiveInputFormat implementation, in some cases + // hoodie additional projection columns are reset after calling setConf and only natural projections + // (one found in select queries) are set. things would break because of this. + // For e:g _hoodie_record_key would be missing and merge step would throw exceptions. + // TO fix this, hoodie columns are appended late at the time record-reader gets built instead of construction time. 
+ this.conf = addRequiredProjectionFields(job); + LOG.info("Creating record reader with readCols :" + job - .get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)); + .get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR) + ", Ids :" + + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)); // sanity check Preconditions.checkArgument(split instanceof HoodieRealtimeFileSplit, "HoodieRealtimeRecordReader can only work on HoodieRealtimeFileSplit and not with " + split); + return new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit) split, job, super.getRecordReader(split, job, reporter)); } - @Override - public void setConf(Configuration conf) { - this.conf = addRequiredProjectionFields(conf); - } - @Override public Configuration getConf() { return conf; diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/RealtimeCompactedRecordReader.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/RealtimeCompactedRecordReader.java index c64933389..266e0d64c 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/RealtimeCompactedRecordReader.java +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/RealtimeCompactedRecordReader.java @@ -95,8 +95,15 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader impleme // TODO(NA): Invoke preCombine here by converting arrayWritable to Avro ? Writable[] replaceValue = deltaRecordMap.get(key).get(); Writable[] originalValue = arrayWritable.get(); - System.arraycopy(replaceValue, 0, originalValue, 0, originalValue.length); - arrayWritable.set(originalValue); + try { + System.arraycopy(replaceValue, 0, originalValue, 0, originalValue.length); + arrayWritable.set(originalValue); + } catch (RuntimeException re) { + LOG.error("Got exception when doing array copy", re); + LOG.error("Base record :" + arrayWritableToString(arrayWritable)); + LOG.error("Log record :" + arrayWritableToString(deltaRecordMap.get(key))); + throw re; + } } return true; } diff --git a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/TestRecordReaderValueIterator.java b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/TestRecordReaderValueIterator.java index 06fc41c99..95aa8f036 100644 --- a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/TestRecordReaderValueIterator.java +++ b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/TestRecordReaderValueIterator.java @@ -18,11 +18,11 @@ package com.uber.hoodie.hadoop; -import groovy.lang.Tuple2; import java.io.IOException; import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.RecordReader; @@ -41,8 +41,8 @@ public class TestRecordReaderValueIterator { "spark", "dataset", }; - List> entries = IntStream.range(0, values.length) - .boxed().map(idx -> new Tuple2<>(idx, values[idx])).collect(Collectors.toList()); + List> entries = IntStream.range(0, values.length) + .boxed().map(idx -> Pair.of(idx, values[idx])).collect(Collectors.toList()); TestRecordReader reader = new TestRecordReader(entries); RecordReaderValueIterator itr = new RecordReaderValueIterator(reader); for (int i = 0; i < values.length; i++) { @@ -58,10 +58,10 @@ public class TestRecordReaderValueIterator { */ private static class TestRecordReader implements RecordReader { - private final List> entries; + private final List> entries; private int currIndex = 0; - public 
TestRecordReader(List> entries) { + public TestRecordReader(List> entries) { this.entries = entries; } @@ -71,8 +71,8 @@ public class TestRecordReaderValueIterator { if (currIndex >= entries.size()) { return false; } - key.set(entries.get(currIndex).getFirst()); - value.set(entries.get(currIndex).getSecond()); + key.set(entries.get(currIndex).getLeft()); + value.set(entries.get(currIndex).getRight()); currIndex++; return true; } diff --git a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java index a889e1a6e..b3b095592 100644 --- a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java +++ b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java @@ -104,7 +104,6 @@ public class HoodieRealtimeRecordReaderTest { header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writeSchema.toString()); HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header); writer = writer.appendBlock(dataBlock); - long size = writer.getCurrentSize(); return writer; } @@ -348,7 +347,7 @@ public class HoodieRealtimeRecordReaderTest { // Assert type MAP ArrayWritable mapItem = (ArrayWritable) values[12]; - Writable[] mapItemValues = mapItem.get(); + Writable[] mapItemValues = ((ArrayWritable) mapItem.get()[0]).get(); ArrayWritable mapItemValue1 = (ArrayWritable) mapItemValues[0]; ArrayWritable mapItemValue2 = (ArrayWritable) mapItemValues[1]; Assert.assertEquals("test value for field: tags", mapItemValue1.get()[0].toString(), @@ -381,10 +380,10 @@ public class HoodieRealtimeRecordReaderTest { // Assert type ARRAY ArrayWritable arrayValue = (ArrayWritable) values[14]; - Writable[] arrayValues = arrayValue.get(); + Writable[] arrayValues = ((ArrayWritable) arrayValue.get()[0]).get(); for (int i = 0; i < arrayValues.length; i++) { - Assert.assertEquals("test value for field: stringArray", arrayValues[i].toString(), - "stringArray" + i + recordCommitTimeSuffix); + Assert.assertEquals("test value for field: stringArray", "stringArray" + i + recordCommitTimeSuffix, + arrayValues[i].toString()); } } } diff --git a/hoodie-hive/pom.xml b/hoodie-hive/pom.xml index cbaedb9fd..e9cff4750 100644 --- a/hoodie-hive/pom.xml +++ b/hoodie-hive/pom.xml @@ -43,38 +43,32 @@ org.apache.hadoop hadoop-auth - - org.apache.hive - hive-common - - - org.apache.hive - hive-jdbc - com.google.guava guava - - org.apache.hive - hive-service - - - org.apache.hive - hive-metastore - org.apache.thrift libthrift 0.9.2 + + joda-time + joda-time + + commons-dbcp commons-dbcp + + commons-io + commons-io + + org.slf4j @@ -90,6 +84,16 @@ jcommander + + org.apache.httpcomponents + httpcore + + + + org.apache.httpcomponents + httpclient + + junit @@ -159,52 +163,79 @@ org.apache.maven.plugins - maven-assembly-plugin - 2.4.1 - - - src/assembly/src.xml - - - - com.uber.hoodie.hive.HiveSyncTool - - - - + maven-jar-plugin + 2.5 - make-assembly - - package - single + test-jar - - org.apache.maven.plugins - maven-dependency-plugin - 2.4 - - - copy-dependencies - package - - copy-dependencies - - - ${project.build.directory}/jars - false - false - true - - - - - + + + hive12 + + + !hive11 + + + + + ${hive12.groupid} + hive-service + ${hive12.version} + + + ${hive12.groupid} + hive-jdbc + ${hive12.version} + + + ${hive12.groupid} + hive-metastore + ${hive12.version} + + + ${hive12.groupid} + hive-common + ${hive12.version} + + 
+ + + hive11 + + + hive11 + + + + + org.apache.hive + hive-service + ${hive11.version} + + + org.apache.hive + hive-jdbc + ${hive11.version} + + + org.apache.hive + hive-metastore + ${hive11.version} + + + org.apache.hive + hive-common + ${hive11.version} + + + + diff --git a/hoodie-hive/run_sync_tool.sh b/hoodie-hive/run_sync_tool.sh new file mode 100644 index 000000000..910b4c5d8 --- /dev/null +++ b/hoodie-hive/run_sync_tool.sh @@ -0,0 +1,39 @@ +#!/usr/bin/env bash + +function error_exit { + echo "$1" >&2 ## Send message to stderr. Exclude >&2 if you don't want it that way. + exit "${2:-1}" ## Return a code specified by $2 or 1 by default. +} + +if [ -z "${HADOOP_HOME}" ]; then + error_exit "Please make sure the environment variable HADOOP_HOME is setup" +fi + +if [ -z "${HIVE_HOME}" ]; then + error_exit "Please make sure the environment variable HIVE_HOME is setup" +fi + +DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +#Ensure we pick the right jar even for hive11 builds +HOODIE_HIVE_UBER_JAR=`ls -c $DIR/../packaging/hoodie-hive-bundle/target/hoodie-hive-*.jar | head -1` + +if [ -z "$HADOOP_CONF_DIR" ]; then + echo "setting hadoop conf dir" + HADOOP_CONF_DIR="${HADOOP_HOME}/etc/hadoop" +fi + +## Include only specific packages from HIVE_HOME/lib to avoid version mismatches +HIVE_EXEC=`ls ${HIVE_HOME}/lib/hive-exec-*.jar` +HIVE_SERVICE=`ls ${HIVE_HOME}/lib/hive-service-*.jar | grep -v rpc` +HIVE_METASTORE=`ls ${HIVE_HOME}/lib/hive-metastore-*.jar` +# Hive 1.x/CDH has standalone jdbc jar which is no longer available in 2.x +HIVE_JDBC=`ls ${HIVE_HOME}/lib/hive-jdbc-*standalone*.jar` +if [ -z "${HIVE_JDBC}" ]; then + HIVE_JDBC=`ls ${HIVE_HOME}/lib/hive-jdbc-*.jar | grep -v handler` +fi +HIVE_JARS=$HIVE_METASTORE:$HIVE_SERVICE:$HIVE_EXEC:$HIVE_SERVICE:$HIVE_JDBC + +HADOOP_HIVE_JARS=${HIVE_JARS}:${HADOOP_HOME}/share/hadoop/common/*:${HADOOP_HOME}/share/hadoop/mapreduce/*:${HADOOP_HOME}/share/hadoop/hdfs/*:${HADOOP_HOME}/share/hadoop/common/lib/*:${HADOOP_HOME}/share/hadoop/hdfs/lib/* + +echo "Running Command : java -cp ${HADOOP_HIVE_JARS}:${HADOOP_CONF_DIR}:$HOODIE_HIVE_UBER_JAR com.uber.hoodie.hive.HiveSyncTool $@" +java -cp $HOODIE_HIVE_UBER_JAR:${HADOOP_HIVE_JARS}:${HADOOP_CONF_DIR} com.uber.hoodie.hive.HiveSyncTool "$@" diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HiveSyncConfig.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HiveSyncConfig.java index dd9e70149..5e81ad9af 100644 --- a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HiveSyncConfig.java +++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HiveSyncConfig.java @@ -69,4 +69,20 @@ public class HiveSyncConfig implements Serializable { @Parameter(names = {"--help", "-h"}, help = true) public Boolean help = false; + + @Override + public String toString() { + return "HiveSyncConfig{" + + "databaseName='" + databaseName + '\'' + + ", tableName='" + tableName + '\'' + + ", hiveUser='" + hiveUser + '\'' + + ", hivePass='" + hivePass + '\'' + + ", jdbcUrl='" + jdbcUrl + '\'' + + ", basePath='" + basePath + '\'' + + ", partitionFields=" + partitionFields + + ", partitionValueExtractorClass='" + partitionValueExtractorClass + '\'' + + ", assumeDatePartitioning=" + assumeDatePartitioning + + ", help=" + help + + '}'; + } } diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java index 55d74d8f1..03268fe2d 100644 --- a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java +++ 
b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.uber.hoodie.common.model.HoodieCommitMetadata; +import com.uber.hoodie.common.model.HoodieFileFormat; import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.model.HoodieTableType; import com.uber.hoodie.common.table.HoodieTableMetaClient; @@ -34,6 +35,7 @@ import com.uber.hoodie.hive.util.SchemaUtil; import java.io.IOException; import java.sql.Connection; import java.sql.DatabaseMetaData; +import java.sql.Driver; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; @@ -43,6 +45,9 @@ import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; import org.apache.commons.dbcp.BasicDataSource; +import org.apache.commons.dbcp.ConnectionFactory; +import org.apache.commons.dbcp.DriverConnectionFactory; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; @@ -185,8 +190,7 @@ public class HoodieHiveClient { String fullPartitionPath = new Path(syncConfig.basePath, partition).toString(); String changePartition = - alterTable + " PARTITION (" + partBuilder.toString() + ") SET LOCATION '" - + "hdfs://nameservice1" + fullPartitionPath + "'"; + alterTable + " PARTITION (" + partBuilder.toString() + ") SET LOCATION '" + fullPartitionPath + "'"; changePartitions.add(changePartition); } return changePartitions; @@ -234,7 +238,7 @@ public class HoodieHiveClient { void updateTableDefinition(MessageType newSchema) { try { - String newSchemaStr = SchemaUtil.generateSchemaString(newSchema); + String newSchemaStr = SchemaUtil.generateSchemaString(newSchema, syncConfig.partitionFields); // Cascade clause should not be present for non-partitioned tables String cascadeClause = syncConfig.partitionFields.size() > 0 ? 
" cascade" : ""; StringBuilder sqlBuilder = new StringBuilder("ALTER TABLE ").append("`") @@ -242,7 +246,7 @@ public class HoodieHiveClient { .append(syncConfig.tableName).append("`") .append(" REPLACE COLUMNS(").append(newSchemaStr).append(" )") .append(cascadeClause); - LOG.info("Creating table with " + sqlBuilder); + LOG.info("Updating table definition with " + sqlBuilder); updateHiveSQL(sqlBuilder.toString()); } catch (IOException e) { throw new HoodieHiveSyncException("Failed to update table for " + syncConfig.tableName, e); @@ -311,7 +315,8 @@ public class HoodieHiveClient { String filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values() .stream().findAny().orElseThrow(() -> new IllegalArgumentException( "Could not find any data file written for commit " + lastCommit - + ", could not get schema for dataset " + metaClient.getBasePath())); + + ", could not get schema for dataset " + metaClient.getBasePath() + + ", Metadata :" + commitMetadata)); return readSchemaFromDataFile(new Path(filePath)); case MERGE_ON_READ: // If this is MOR, depending on whether the latest commit is a delta commit or @@ -340,12 +345,31 @@ public class HoodieHiveClient { // read from the log file wrote commitMetadata = HoodieCommitMetadata.fromBytes( activeTimeline.getInstantDetails(lastDeltaInstant).get(), HoodieCommitMetadata.class); - filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values() - .stream().filter(s -> s.contains(HoodieLogFile.DELTA_EXTENSION)) - .findAny().orElseThrow(() -> new IllegalArgumentException( - "Could not find any data file written for commit " + lastDeltaInstant - + ", could not get schema for dataset " + metaClient.getBasePath())); - return readSchemaFromLogFile(lastCompactionCommit, new Path(filePath)); + Pair filePathWithFormat = + commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values() + .stream().filter(s -> s.contains(HoodieLogFile.DELTA_EXTENSION)) + .findAny().map(f -> Pair.of(f, HoodieFileFormat.HOODIE_LOG)) + .orElseGet(() -> { + // No Log files in Delta-Commit. 
Check if there are any parquet files + return commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream() + .filter(s -> s.contains((metaClient.getTableConfig().getROFileFormat().getFileExtension()))) + .findAny() + .map(f -> Pair.of(f, HoodieFileFormat.PARQUET)).orElseThrow(() -> { + return new IllegalArgumentException( + "Could not find any data file written for commit " + lastDeltaInstant + + ", could not get schema for dataset " + metaClient.getBasePath() + + ", CommitMetadata :" + commitMetadata); + }); + }); + switch (filePathWithFormat.getRight()) { + case HOODIE_LOG: + return readSchemaFromLogFile(lastCompactionCommit, new Path(filePathWithFormat.getLeft())); + case PARQUET: + return readSchemaFromDataFile(new Path(filePathWithFormat.getLeft())); + default: + throw new IllegalArgumentException("Unknown file format :" + filePathWithFormat.getRight() + + " for file " + filePathWithFormat.getLeft()); + } } else { return readSchemaFromLastCompaction(lastCompactionCommit); } @@ -442,14 +466,15 @@ public class HoodieHiveClient { private void createHiveConnection() { if (connection == null) { - BasicDataSource ds = new BasicDataSource(); - ds.setDriverClassName(driverName); + BasicDataSource ds = new HiveDataSource(); + ds.setDriverClassName(HiveDriver.class.getCanonicalName()); ds.setUrl(getHiveJdbcUrlWithDefaultDBName()); ds.setUsername(syncConfig.hiveUser); ds.setPassword(syncConfig.hivePass); LOG.info("Getting Hive Connection from Datasource " + ds); try { this.connection = ds.getConnection(); + LOG.info("Successfully got Hive Connection from Datasource " + ds); } catch (SQLException e) { throw new HoodieHiveSyncException( "Cannot create hive connection " + getHiveJdbcUrlWithDefaultDBName(), e); @@ -589,4 +614,54 @@ public class HoodieHiveClient { return new PartitionEvent(PartitionEventType.UPDATE, storagePartition); } } + + /** + * There is a bug in BasicDataSource implementation (dbcp-1.4) which does not allow custom version of Driver (needed + * to talk to older version of HiveServer2 including CDH-5x). This is fixed in dbcp-2x but we are using dbcp1.4. + * Adding a workaround here. TODO: varadarb We need to investigate moving to dbcp-2x + */ + protected class HiveDataSource extends BasicDataSource { + + protected ConnectionFactory createConnectionFactory() throws SQLException { + try { + Driver driver = HiveDriver.class.newInstance(); + // Can't test without a validationQuery + if (validationQuery == null) { + setTestOnBorrow(false); + setTestOnReturn(false); + setTestWhileIdle(false); + } + + // Set up the driver connection factory we will use + String user = username; + if (user != null) { + connectionProperties.put("user", user); + } else { + log("DBCP DataSource configured without a 'username'"); + } + + String pwd = password; + if (pwd != null) { + connectionProperties.put("password", pwd); + } else { + log("DBCP DataSource configured without a 'password'"); + } + + ConnectionFactory driverConnectionFactory = new DriverConnectionFactory(driver, url, connectionProperties); + return driverConnectionFactory; + } catch (Throwable x) { + LOG.warn("Got exception trying to instantiate connection factory. 
Trying default instantiation", x); + return super.createConnectionFactory(); + } + } + + @Override + public String toString() { + return "HiveDataSource{" + + "driverClassName='" + driverClassName + '\'' + + ", driverClassLoader=" + driverClassLoader + + ", url='" + url + '\'' + + '}'; + } + } } diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/util/SchemaUtil.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/util/SchemaUtil.java index 098c01368..cb60ad2bd 100644 --- a/hoodie-hive/src/main/java/com/uber/hoodie/hive/util/SchemaUtil.java +++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/util/SchemaUtil.java @@ -27,9 +27,11 @@ import com.uber.hoodie.hive.HiveSyncConfig; import com.uber.hoodie.hive.HoodieHiveSyncException; import com.uber.hoodie.hive.SchemaDifference; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; @@ -322,6 +324,14 @@ public class SchemaUtil { return result; } + private static String removeSurroundingTick(String result) { + if (result.startsWith("`") && result.endsWith("`")) { + result = result.substring(1, result.length() - 1); + } + + return result; + } + /** * Create a 'Map' schema from Parquet map field */ @@ -372,11 +382,17 @@ public class SchemaUtil { } public static String generateSchemaString(MessageType storageSchema) throws IOException { + return generateSchemaString(storageSchema, new ArrayList<>()); + } + + public static String generateSchemaString(MessageType storageSchema, List colsToSkip) throws IOException { Map hiveSchema = convertParquetSchemaToHiveSchema(storageSchema); StringBuilder columns = new StringBuilder(); for (Map.Entry hiveSchemaEntry : hiveSchema.entrySet()) { - columns.append(hiveSchemaEntry.getKey()).append(" "); - columns.append(hiveSchemaEntry.getValue()).append(", "); + if (!colsToSkip.contains(removeSurroundingTick(hiveSchemaEntry.getKey()))) { + columns.append(hiveSchemaEntry.getKey()).append(" "); + columns.append(hiveSchemaEntry.getValue()).append(", "); + } } // Remove the last ", " columns.delete(columns.length() - 2, columns.length()); @@ -386,19 +402,20 @@ public class SchemaUtil { public static String generateCreateDDL(MessageType storageSchema, HiveSyncConfig config, String inputFormatClass, String outputFormatClass, String serdeClass) throws IOException { Map hiveSchema = convertParquetSchemaToHiveSchema(storageSchema); - String columns = generateSchemaString(storageSchema); + String columns = generateSchemaString(storageSchema, config.partitionFields); - StringBuilder partitionFields = new StringBuilder(); + List partitionFields = new ArrayList<>(); for (String partitionKey : config.partitionFields) { - partitionFields.append(partitionKey).append(" ") - .append(getPartitionKeyType(hiveSchema, partitionKey)); + partitionFields.add(new StringBuilder().append(partitionKey).append(" ") + .append(getPartitionKeyType(hiveSchema, partitionKey)).toString()); } + String paritionsStr = partitionFields.stream().collect(Collectors.joining(",")); StringBuilder sb = new StringBuilder("CREATE EXTERNAL TABLE IF NOT EXISTS "); sb = sb.append(config.databaseName).append(".").append(config.tableName); sb = sb.append("( ").append(columns).append(")"); if (!config.partitionFields.isEmpty()) { - sb = sb.append(" PARTITIONED BY (").append(partitionFields).append(")"); + sb = sb.append(" PARTITIONED BY 
(").append(paritionsStr).append(")"); } sb = sb.append(" ROW FORMAT SERDE '").append(serdeClass).append("'"); sb = sb.append(" STORED AS INPUTFORMAT '").append(inputFormatClass).append("'"); diff --git a/hoodie-spark/pom.xml b/hoodie-spark/pom.xml index 5ed93b3b0..55d373d6d 100644 --- a/hoodie-spark/pom.xml +++ b/hoodie-spark/pom.xml @@ -118,8 +118,11 @@ - + + org.apache.avro + avro + org.scala-lang scala-library @@ -200,17 +203,22 @@ hoodie-hadoop-mr ${project.version} + + com.uber.hoodie + hoodie-hive + ${project.version} + junit junit-dep ${junit.version} test - com.uber.hoodie hoodie-client ${project.version} + tests test-jar test @@ -218,9 +226,72 @@ com.uber.hoodie hoodie-common ${project.version} + tests test-jar test + + + hive12 + + + !hive11 + + + + + ${hive12.groupid} + hive-service + ${hive12.version} + + + ${hive12.groupid} + hive-jdbc + ${hive12.version} + + + ${hive12.groupid} + hive-metastore + ${hive12.version} + + + ${hive12.groupid} + hive-common + ${hive12.version} + + + + + hive11 + + + hive11 + + + + + ${hive11.groupid} + hive-service + ${hive11.version} + + + ${hive11.groupid} + hive-jdbc + ${hive11.version} + + + ${hive11.groupid} + hive-metastore + ${hive11.version} + + + ${hive11.groupid} + hive-common + ${hive11.version} + + + + diff --git a/hoodie-spark/run_hoodie_app.sh b/hoodie-spark/run_hoodie_app.sh new file mode 100644 index 000000000..ec82eddb9 --- /dev/null +++ b/hoodie-spark/run_hoodie_app.sh @@ -0,0 +1,24 @@ +#!/usr/bin/env bash + +function error_exit { + echo "$1" >&2 ## Send message to stderr. Exclude >&2 if you don't want it that way. + exit "${2:-1}" ## Return a code specified by $2 or 1 by default. +} + +DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +#Ensure we pick the right jar even for hive11 builds +HOODIE_JAR=`ls -c $DIR/../packaging/hoodie-spark-bundle/target/hoodie-spark-bundle-*.jar | head -1` + +if [ -z "$HADOOP_CONF_DIR" ]; then + echo "setting hadoop conf dir" + HADOOP_CONF_DIR="/etc/hadoop/conf" +fi + +if [ -z "$CLIENT_JAR" ]; then + echo "client jar location not set" +fi + +OTHER_JARS=`ls -1 $DIR/target/lib/*jar | grep -v '*avro*-1.' 
| tr '\n' ':'` +#TODO - Need to move TestDataGenerator and HoodieJavaApp out of tests +echo "Running command : java -cp $DIR/target/test-classes/:$DIR/../hoodie-client/target/test-classes/:${HADOOP_CONF_DIR}:$HOODIE_JAR:${CLIENT_JAR}:$OTHER_JARS HoodieJavaApp $@" +java -cp $DIR/target/test-classes/:$DIR/../hoodie-client/target/test-classes/:${HADOOP_CONF_DIR}:$HOODIE_JAR:${CLIENT_JAR}:$OTHER_JARS HoodieJavaApp "$@" diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java index 551a1865d..5c0d27248 100644 --- a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java +++ b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java @@ -32,6 +32,8 @@ import com.uber.hoodie.index.HoodieIndex; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import org.apache.avro.Schema.Field; import org.apache.avro.generic.GenericRecord; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -47,7 +49,8 @@ public class DataSourceUtils { public static String getNestedFieldValAsString(GenericRecord record, String fieldName) { String[] parts = fieldName.split("\\."); GenericRecord valueNode = record; - for (int i = 0; i < parts.length; i++) { + int i = 0; + for (;i < parts.length; i++) { String part = parts[i]; Object val = valueNode.get(part); if (val == null) { @@ -65,7 +68,9 @@ public class DataSourceUtils { valueNode = (GenericRecord) val; } } - throw new HoodieException(fieldName + " field not found in record"); + throw new HoodieException(fieldName + "(Part -" + parts[i] + ") field not found in record. " + + "Acceptable fields were :" + valueNode.getSchema().getFields() + .stream().map(Field::name).collect(Collectors.toList())); } /** diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/DataSourceOptions.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/DataSourceOptions.scala index c4e3307c9..26040337f 100644 --- a/hoodie-spark/src/main/scala/com/uber/hoodie/DataSourceOptions.scala +++ b/hoodie-spark/src/main/scala/com/uber/hoodie/DataSourceOptions.scala @@ -19,6 +19,7 @@ package com.uber.hoodie import com.uber.hoodie.common.model.HoodieTableType +import com.uber.hoodie.hive.SlashEncodedDayPartitionValueExtractor /** * List of options that can be passed to the Hoodie datasource, @@ -143,4 +144,28 @@ object DataSourceWriteOptions { */ val COMMIT_METADATA_KEYPREFIX_OPT_KEY = "hoodie.datasource.write.commitmeta.key.prefix" val DEFAULT_COMMIT_METADATA_KEYPREFIX_OPT_VAL = "_" + + // HIVE SYNC SPECIFIC CONFIGS + //NOTE: DO NOT USE uppercase for the keys as they are internally lower-cased. 
Using upper-cases causes + // unexpected issues with config getting reset + val HIVE_SYNC_ENABLED_OPT_KEY = "hoodie.datasource.hive_sync.enable" + val HIVE_DATABASE_OPT_KEY = "hoodie.datasource.hive_sync.database" + val HIVE_TABLE_OPT_KEY = "hoodie.datasource.hive_sync.table" + val HIVE_USER_OPT_KEY = "hoodie.datasource.hive_sync.username" + val HIVE_PASS_OPT_KEY = "hoodie.datasource.hive_sync.password" + val HIVE_URL_OPT_KEY = "hoodie.datasource.hive_sync.jdbcUrl" + val HIVE_PARTITION_FIELDS_OPT_KEY = "hoodie.datasource.hive_sync.partition_fields" + val HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY = "hoodie.datasource.hive_sync.partition_extractor_class" + val HIVE_ASSUME_DATE_PARTITION_OPT_KEY = "hoodie.datasource.hive_sync.assume_date_partitioning" + + // DEFAULT FOR HIVE SPECIFIC CONFIGS + val DEFAULT_HIVE_SYNC_ENABLED_OPT_VAL = "false" + val DEFAULT_HIVE_DATABASE_OPT_VAL = "default" + val DEFAULT_HIVE_TABLE_OPT_VAL = "unknown" + val DEFAULT_HIVE_USER_OPT_VAL = "hive" + val DEFAULT_HIVE_PASS_OPT_VAL = "hive" + val DEFAULT_HIVE_URL_OPT_VAL = "jdbc:hive2://localhost:10000" + val DEFAULT_HIVE_PARTITION_FIELDS_OPT_VAL = "" + val DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL = classOf[SlashEncodedDayPartitionValueExtractor].getCanonicalName + val DEFAULT_HIVE_ASSUME_DATE_PARTITION_OPT_VAL = "false" } diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala index 5a94f9e60..580529513 100644 --- a/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala +++ b/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala @@ -25,11 +25,12 @@ import java.util.{Optional, Properties} import com.uber.hoodie.DataSourceReadOptions._ import com.uber.hoodie.DataSourceWriteOptions._ import com.uber.hoodie.common.table.{HoodieTableConfig, HoodieTableMetaClient} -import com.uber.hoodie.common.util.TypedProperties +import com.uber.hoodie.common.util.{FSUtils, TypedProperties} import com.uber.hoodie.config.HoodieWriteConfig import com.uber.hoodie.exception.HoodieException import org.apache.avro.generic.GenericRecord -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.hive.conf.HiveConf import org.apache.log4j.LogManager import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.rdd.RDD @@ -39,6 +40,7 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode} import scala.collection.JavaConversions._ +import scala.collection.mutable.ListBuffer /** * Hoodie Spark Datasource, for reading and writing hoodie datasets @@ -92,6 +94,7 @@ class DefaultSource extends RelationProvider classOf[com.uber.hoodie.hadoop.HoodieROTablePathFilter], classOf[org.apache.hadoop.fs.PathFilter]); + log.info("Constructing hoodie (as parquet) data source with options :" + parameters) // simply return as a regular parquet relation DataSource.apply( sparkSession = sqlContext.sparkSession, @@ -118,6 +121,15 @@ class DefaultSource extends RelationProvider defaultsMap.putIfAbsent(PARTITIONPATH_FIELD_OPT_KEY, DEFAULT_PARTITIONPATH_FIELD_OPT_VAL) defaultsMap.putIfAbsent(KEYGENERATOR_CLASS_OPT_KEY, DEFAULT_KEYGENERATOR_CLASS_OPT_VAL) defaultsMap.putIfAbsent(COMMIT_METADATA_KEYPREFIX_OPT_KEY, DEFAULT_COMMIT_METADATA_KEYPREFIX_OPT_VAL) + defaultsMap.putIfAbsent(HIVE_SYNC_ENABLED_OPT_KEY, DEFAULT_HIVE_SYNC_ENABLED_OPT_VAL) + defaultsMap.putIfAbsent(HIVE_DATABASE_OPT_KEY, DEFAULT_HIVE_DATABASE_OPT_VAL) + 
defaultsMap.putIfAbsent(HIVE_TABLE_OPT_KEY, DEFAULT_HIVE_TABLE_OPT_VAL) + defaultsMap.putIfAbsent(HIVE_USER_OPT_KEY, DEFAULT_HIVE_USER_OPT_VAL) + defaultsMap.putIfAbsent(HIVE_PASS_OPT_KEY, DEFAULT_HIVE_PASS_OPT_VAL) + defaultsMap.putIfAbsent(HIVE_URL_OPT_KEY, DEFAULT_HIVE_URL_OPT_VAL) + defaultsMap.putIfAbsent(HIVE_PARTITION_FIELDS_OPT_KEY, DEFAULT_HIVE_PARTITION_FIELDS_OPT_VAL) + defaultsMap.putIfAbsent(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL) + defaultsMap.putIfAbsent(HIVE_ASSUME_DATE_PARTITION_OPT_KEY, DEFAULT_HIVE_ASSUME_DATE_PARTITION_OPT_VAL) mapAsScalaMap(defaultsMap) } @@ -200,7 +212,8 @@ class DefaultSource extends RelationProvider } // Create a HoodieWriteClient & issue the write. - val client = DataSourceUtils.createHoodieClient(new JavaSparkContext(sparkContext), + val jsc = new JavaSparkContext(sparkContext); + val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get, tblName.get, @@ -228,6 +241,13 @@ class DefaultSource extends RelationProvider else { log.info("Commit " + commitTime + " failed!") } + + val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).map(r => r.toBoolean).getOrElse(false) + if (hiveSyncEnabled) { + log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")") + val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration) + syncHive(basePath, fs, parameters) + } client.close } else { log.error(s"Upsert failed with ${errorCount} errors :"); @@ -247,5 +267,28 @@ class DefaultSource extends RelationProvider createRelation(sqlContext, parameters, df.schema) } + private def syncHive(basePath: Path, fs: FileSystem, parameters: Map[String, String]): Boolean = { + val hiveSyncConfig: HiveSyncConfig = buildSyncConfig(basePath, parameters) + val hiveConf: HiveConf = new HiveConf() + hiveConf.addResource(fs.getConf) + new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable() + true + } + + private def buildSyncConfig(basePath: Path, parameters: Map[String, String]): HiveSyncConfig = { + val hiveSyncConfig: HiveSyncConfig = new HiveSyncConfig() + hiveSyncConfig.basePath = basePath.toString + hiveSyncConfig.assumeDatePartitioning = + parameters.get(HIVE_ASSUME_DATE_PARTITION_OPT_KEY).exists(r => r.toBoolean) + hiveSyncConfig.databaseName = parameters(HIVE_DATABASE_OPT_KEY) + hiveSyncConfig.tableName = parameters(HIVE_TABLE_OPT_KEY) + hiveSyncConfig.hiveUser = parameters(HIVE_USER_OPT_KEY) + hiveSyncConfig.hivePass = parameters(HIVE_PASS_OPT_KEY) + hiveSyncConfig.jdbcUrl = parameters(HIVE_URL_OPT_KEY) + hiveSyncConfig.partitionFields = + ListBuffer(parameters(HIVE_PARTITION_FIELDS_OPT_KEY).split(",").map(_.trim).toList: _*) + hiveSyncConfig.partitionValueExtractorClass = parameters(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY) + hiveSyncConfig + } override def shortName(): String = "hoodie" } diff --git a/hoodie-spark/src/test/java/HoodieJavaApp.java b/hoodie-spark/src/test/java/HoodieJavaApp.java index bef67cfc8..e76931836 100644 --- a/hoodie-spark/src/test/java/HoodieJavaApp.java +++ b/hoodie-spark/src/test/java/HoodieJavaApp.java @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.DataFrameWriter; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; @@ -48,6 +49,24 @@ public class HoodieJavaApp { @Parameter(names = {"--table-type", "-t"}, description = "One 
of COPY_ON_WRITE or MERGE_ON_READ") private String tableType = HoodieTableType.COPY_ON_WRITE.name(); + @Parameter(names = {"--hive-sync", "-hv"}, description = "Enable syncing to hive") + private Boolean enableHiveSync = false; + + @Parameter(names = {"--hive-db", "-hd"}, description = "hive database") + private String hiveDB = "default"; + + @Parameter(names = {"--hive-table", "-ht"}, description = "hive table") + private String hiveTable = "hoodie_sample_test"; + + @Parameter(names = {"--hive-user", "-hu"}, description = "hive username") + private String hiveUser = "hive"; + + @Parameter(names = {"--hive-password", "-hp"}, description = "hive password") + private String hivePass = "hive"; + + @Parameter(names = {"--hive-url", "-hl"}, description = "hive JDBC URL") + private String hiveJdbcUrl = "jdbc:hive://localhost:10000"; + @Parameter(names = {"--help", "-h"}, help = true) public Boolean help = false; @@ -86,11 +105,12 @@ public class HoodieJavaApp { Dataset inputDF1 = spark.read().json(jssc.parallelize(records1, 2)); // Save as hoodie dataset (copy on write) - inputDF1.write().format("com.uber.hoodie") // specify the hoodie source + DataFrameWriter writer = inputDF1.write().format("com.uber.hoodie") // specify the hoodie source .option("hoodie.insert.shuffle.parallelism", "2") // any hoodie client config can be passed like this .option("hoodie.upsert.shuffle.parallelism", "2") // full list in HoodieWriteConfig & its package + .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY(), tableType) // Hoodie Table Type .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL()) // insert .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), @@ -101,9 +121,11 @@ public class HoodieJavaApp { "timestamp") // use to combine duplicate records in input/with disk val .option(HoodieWriteConfig.TABLE_NAME, tableName) // Used by hive sync and queries .mode( - SaveMode.Overwrite) // This will remove any existing data at path below, and create a - // new dataset if needed - .save(tablePath); // ultimately where the dataset will be placed + SaveMode.Overwrite); // This will remove any existing data at path below, and create a + + updateHiveSyncConfig(writer); + // new dataset if needed + writer.save(tablePath); // ultimately where the dataset will be placed String commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, tablePath); logger.info("First commit at instant time :" + commitInstantTime1); @@ -113,12 +135,15 @@ public class HoodieJavaApp { List records2 = DataSourceTestUtils.convertToStringList( dataGen.generateUpdates("002"/* ignore */, 100)); Dataset inputDF2 = spark.read().json(jssc.parallelize(records2, 2)); - inputDF2.write().format("com.uber.hoodie").option("hoodie.insert.shuffle.parallelism", "2") + writer = inputDF2.write().format("com.uber.hoodie").option("hoodie.insert.shuffle.parallelism", "2") .option("hoodie.upsert.shuffle.parallelism", "2") + .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY(), tableType) // Hoodie Table Type .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key") .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition") .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp") - .option(HoodieWriteConfig.TABLE_NAME, tableName).mode(SaveMode.Append).save(tablePath); + .option(HoodieWriteConfig.TABLE_NAME, tableName).mode(SaveMode.Append); + updateHiveSyncConfig(writer); + writer.save(tablePath); String commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, 
tablePath); logger.info("Second commit at instant time :" + commitInstantTime1); @@ -135,18 +160,39 @@ public class HoodieJavaApp { spark.sql("select fare, begin_lon, begin_lat, timestamp from hoodie_ro where fare > 2.0") .show(); - /** - * Consume incrementally, only changes in commit 2 above. - */ - Dataset hoodieIncViewDF = spark.read().format("com.uber.hoodie") - .option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY(), - DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL()) - .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY(), - commitInstantTime1) // Only changes in write 2 above - .load( - tablePath); // For incremental view, pass in the root/base path of dataset + if (tableType.equals(HoodieTableType.COPY_ON_WRITE.name())) { + /** + * Consume incrementally, only changes in commit 2 above. Currently only supported for COPY_ON_WRITE TABLE + */ + Dataset hoodieIncViewDF = spark.read().format("com.uber.hoodie") + .option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY(), + DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL()) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY(), + commitInstantTime1) // Only changes in write 2 above + .load( + tablePath); // For incremental view, pass in the root/base path of dataset - logger.info("You will only see records from : " + commitInstantTime2); - hoodieIncViewDF.groupBy(hoodieIncViewDF.col("_hoodie_commit_time")).count().show(); + logger.info("You will only see records from : " + commitInstantTime2); + hoodieIncViewDF.groupBy(hoodieIncViewDF.col("_hoodie_commit_time")).count().show(); + } + } + + /** + * Setup configs for syncing to hive + * @param writer + * @return + */ + private DataFrameWriter updateHiveSyncConfig(DataFrameWriter writer) { + if (enableHiveSync) { + logger.info("Enabling Hive sync to " + hiveJdbcUrl); + writer = writer.option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY(), hiveTable) + .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), hiveDB) + .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY(), hiveJdbcUrl) + .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "dateStr") + .option(DataSourceWriteOptions.HIVE_USER_OPT_KEY(), hiveUser) + .option(DataSourceWriteOptions.HIVE_PASS_OPT_KEY(), hivePass) + .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY(), "true"); + } + return writer; } } diff --git a/hoodie-utilities/pom.xml b/hoodie-utilities/pom.xml index e8159d540..de5018989 100644 --- a/hoodie-utilities/pom.xml +++ b/hoodie-utilities/pom.xml @@ -118,6 +118,7 @@ com.uber.hoodie hoodie-common ${project.version} + tests test-jar test @@ -175,31 +176,11 @@ com.uber.hoodie hoodie-client ${project.version} + tests test-jar test - - - org.apache.hive - hive-jdbc - ${hive.version}-cdh${cdh.version} - standalone - - - org.slf4j - slf4j-api - - - javax.servlet - servlet-api - - - com.fasterxml.jackson.* - * - - - - + commons-dbcp commons-dbcp @@ -278,7 +259,7 @@ org.apache.avro avro-mapred - 1.7.6-cdh5.7.2 + 1.7.7 @@ -323,4 +304,58 @@ + + + hive12 + + + !hive11 + + + + + ${hive12.groupid} + hive-jdbc + ${hive12.version} + standalone + + + org.slf4j + slf4j-api + + + javax.servlet + servlet-api + + + + + + + hive11 + + + hive11 + + + + + org.apache.hive + hive-jdbc + ${hive11.version} + standalone + + + org.slf4j + slf4j-api + + + javax.servlet + servlet-api + + + + + + diff --git a/packaging/hoodie-hadoop-mr-bundle/pom.xml b/packaging/hoodie-hadoop-mr-bundle/pom.xml new file mode 100644 index 000000000..c299cef70 --- /dev/null +++ b/packaging/hoodie-hadoop-mr-bundle/pom.xml @@ -0,0 +1,299 @@ + + + + + + 
hoodie + com.uber.hoodie + 0.4.3-SNAPSHOT + ../../pom.xml + + 4.0.0 + + hoodie-hadoop-mr-bundle + + + + com.uber.hoodie + hoodie-common + ${project.version} + + + + com.uber.hoodie + hoodie-hadoop-mr + ${project.version} + + + + com.uber.hoodie + * + + + + + + org.apache.hadoop + hadoop-common + + + + org.apache.hadoop + hadoop-mapreduce-client-core + + + + org.apache.hadoop + hadoop-mapreduce-client-common + + + + org.apache.hadoop + hadoop-auth + + + + org.apache.hadoop + hadoop-hdfs + + + + commons-logging + commons-logging + + + + org.apache.commons + commons-lang3 + + + + org.apache.parquet + parquet-avro + + + + com.twitter + parquet-avro + + + + com.twitter + parquet-hadoop-bundle + + + + com.twitter.common + objectsize + 0.0.12 + + + + org.apache.avro + avro + + + + com.esotericsoftware + kryo + test + + + + junit + junit + test + + + + + + + org.apache.rat + apache-rat-plugin + + + org.apache.maven.plugins + maven-shade-plugin + 2.4 + + + package + + shade + + + ${project.build.directory}/dependency-reduced-pom.xml + + + + parquet.avro + com.uber.hoodie.parquet.avro + + + parquet.column + com.uber.hoodie.parquet.column + + + parquet.format. + com.uber.hoodie.parquet.format. + + + parquet.hadoop. + com.uber.hoodie.parquet.hadoop. + + + parquet.schema + com.uber.hoodie.parquet.schema + + + org.apache.commons + com.uber.hoodie.org.apache.commons + + + false + + + com.uber.hoodie:hoodie-common + com.uber.hoodie:hoodie-hadoop-mr + com.twitter:parquet-avro + com.twitter:parquet-hadoop-bundle + com.twitter.common:objectsize + commons-logging:commons-logging + org.apache.commons:commons-lang3 + + + ${project.artifactId}-${project.version}${hiveJarSuffix} + ${hiveJarSuffix} + + + + + + + + + hive12 + + + !hive11 + + + + + + + + ${hive12.groupid} + hive-jdbc + ${hive12.version} + + + commons-logging + commons-logging + + + + + ${hive12.groupid} + hive-exec + ${hive12.version} + + + ${hive12.groupid} + hive-service + ${hive12.version} + + + ${hive12.groupid} + hive-shims + ${hive12.version} + + + ${hive12.groupid} + hive-serde + ${hive12.version} + + + ${hive12.groupid} + hive-metastore + ${hive12.version} + + + ${hive12.groupid} + hive-common + ${hive12.version} + + + + + hive11 + + + hive11 + + + + .hive11 + + + + ${hive11.groupid} + hive-service + ${hive11.version} + + + ${hive11.groupid} + hive-shims + ${hive11.version} + + + ${hive11.groupid} + hive-jdbc + ${hive11.version} + + + commons-logging + commons-logging + + + + + ${hive11.groupid} + hive-serde + ${hive11.version} + + + ${hive11.groupid} + hive-metastore + ${hive11.version} + + + ${hive11.groupid} + hive-common + ${hive11.version} + + + ${hive11.groupid} + hive-exec + ${hive11.version} + + + + + + diff --git a/packaging/hoodie-hive-bundle/pom.xml b/packaging/hoodie-hive-bundle/pom.xml new file mode 100644 index 000000000..a0d43c9cc --- /dev/null +++ b/packaging/hoodie-hive-bundle/pom.xml @@ -0,0 +1,263 @@ + + + + + + hoodie + com.uber.hoodie + 0.4.3-SNAPSHOT + ../../pom.xml + + 4.0.0 + + hoodie-hive-bundle + jar + + + + org.apache.hadoop + hadoop-common + + + org.apache.hadoop + hadoop-client + + + org.apache.hadoop + hadoop-hdfs + + + org.apache.hadoop + hadoop-auth + + + com.google.guava + guava + + + org.apache.thrift + libthrift + 0.9.2 + + + + joda-time + joda-time + + + + + commons-dbcp + commons-dbcp + + + + commons-io + commons-io + + + + + org.slf4j + slf4j-api + + + org.slf4j + slf4j-log4j12 + + + + com.beust + jcommander + + + + org.apache.httpcomponents + httpcore + + + + org.apache.httpcomponents + httpclient + + + 
+ com.twitter + parquet-avro + + + + com.uber.hoodie + hoodie-hadoop-mr-bundle + ${project.version} + + + + com.uber.hoodie + hoodie-hive + ${project.version} + + + + com.uber.hoodie + * + + + + + + + + + org.apache.rat + apache-rat-plugin + + + org.apache.maven.plugins + maven-shade-plugin + 2.4 + + + package + + shade + + + + + com.beust. + com.uber.hoodie.com.beust. + + + org.joda. + com.uber.hoodie.org.joda. + + + com.google. + com.uber.hoodie.com.google. + + + org.slf4j. + com.uber.hoodie.org.slf4j. + + + org.apache.commons. + com.uber.hoodie.org.apache.commons. + + + parquet.column + com.uber.hoodie.parquet.column + + + parquet.format. + com.uber.hoodie.parquet.format. + + + parquet.hadoop. + com.uber.hoodie.parquet.hadoop. + + + parquet.schema. + com.uber.hoodie.parquet.schema. + + + false + + + log4j:log4j + org.apache.hadoop:* + org.apache.hive:* + org.apache.derby:derby + + + ${project.artifactId}-${project.version}${hiveJarSuffix} + + + + + + + + + + hive12 + + + !hive11 + + + + + + + + ${hive12.groupid} + hive-service + ${hive12.version} + + + ${hive12.groupid} + hive-jdbc + ${hive12.version} + + + ${hive12.groupid} + hive-metastore + ${hive12.version} + + + ${hive12.groupid} + hive-common + ${hive12.version} + + + + + hive11 + + + hive11 + + + + .hive11 + + + + org.apache.hive + hive-service + ${hive11.version} + + + org.apache.hive + hive-jdbc + ${hive11.version} + + + org.apache.hive + hive-metastore + ${hive11.version} + + + org.apache.hive + hive-common + ${hive11.version} + + + + + diff --git a/packaging/hoodie-spark-bundle/pom.xml b/packaging/hoodie-spark-bundle/pom.xml new file mode 100644 index 000000000..624b69bb9 --- /dev/null +++ b/packaging/hoodie-spark-bundle/pom.xml @@ -0,0 +1,345 @@ + + + + + + hoodie + com.uber.hoodie + 0.4.3-SNAPSHOT + ../../pom.xml + + 4.0.0 + + com.uber.hoodie + hoodie-spark-bundle + jar + + + 1.2.17 + 4.10 + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-dependencies + prepare-package + + copy-dependencies + + + ${project.build.directory}/lib + true + true + true + + + + + + org.apache.rat + apache-rat-plugin + + + org.apache.maven.plugins + maven-shade-plugin + 2.4 + + + package + + shade + + + + + com.beust. + com.uber.hoodie.com.beust. + + + org.joda. + com.uber.hoodie.org.joda. + + + com.google. + com.uber.hoodie.com.google. + + + org.slf4j. + com.uber.hoodie.org.slf4j. + + + org.apache. + com.uber.hoodie.org.apache. + + com.databricks.spark.** + org.apache.avro.** + org.apache.derby.** + org.apache.hadoop.** + org.apache.hive.** + org.apache.logging.log4j.** + org.apache.log4j.** + org.apache.spark.** + org.apache.thrift.** + + + + parquet.column + com.uber.hoodie.parquet.column + + + parquet.format. + com.uber.hoodie.parquet.format. + + + parquet.hadoop. + com.uber.hoodie.parquet.hadoop. + + + parquet.schema + com.uber.hoodie.parquet.schema + + + org.apache.hive.jdbc. + com.uber.hoodie.org.apache.hive.jdbc. + + + org.apache.hadoop.hive.metastore. + com.uber.hoodie.org.apache.hadoop_hive.metastore. + + + org.apache.hive.common. + com.uber.hoodie.org.apache.hive.common. + + + org.apache.hadoop.hive.common. + com.uber.hoodie.org.apache.hadoop_hive.common. + + + org.apache.hadoop.hive.conf. + com.uber.hoodie.org.apache.hadoop_hive.conf. + + + org.apache.hive.service. + com.uber.hoodie.org.apache.hive.service. + + + org.apache.hadoop.hive.service. + com.uber.hoodie.org.apache.hadoop_hive.service. 
+ + + false + + + com.databricks:spark-avro_2.11 + log4j:* + org.apache.avro:* + org.apache.derby:derby + org.apache.hadoop:* + org.apache.hbase:* + + org.apache.hive:hive-exec + org.apache.hive:hive-serde + org.apache.hive:hive-shims + org.apache.spark:* + + + ${project.artifactId}-${project.version}${hiveJarSuffix} + + + + + + + + + + com.beust + jcommander + + + commons-dbcp + commons-dbcp + + + org.apache.avro + avro + + + org.scala-lang + scala-library + ${scala.version} + + + org.scalatest + scalatest_2.11 + 3.0.1 + test + + + org.apache.spark + spark-core_2.11 + + + org.apache.spark + spark-sql_2.11 + + + com.databricks + spark-avro_2.11 + 4.0.0 + + + com.fasterxml.jackson.core + jackson-annotations + + + org.apache.hadoop + hadoop-client + + + javax.servlet + * + + + provided + + + org.apache.hadoop + hadoop-common + provided + + + log4j + log4j + ${log4j.version} + + + org.apache.avro + avro + + + org.apache.commons + commons-lang3 + + + org.apache.commons + commons-configuration2 + + + com.uber.hoodie + hoodie-common + ${project.version} + + + com.uber.hoodie + hoodie-hadoop-mr + ${project.version} + + + com.uber.hoodie + hoodie-hive + ${project.version} + + + com.uber.hoodie + hoodie-client + ${project.version} + + + com.uber.hoodie + hoodie-spark + ${project.version} + + + + + + hive12 + + + !hive11 + + + + + + + + ${hive12.groupid} + hive-service + ${hive12.version} + + + ${hive12.groupid} + hive-jdbc + ${hive12.version} + + + ${hive12.groupid} + hive-metastore + ${hive12.version} + + + ${hive12.groupid} + hive-common + ${hive12.version} + + + + + hive11 + + + hive11 + + + + .hive11 + + + + ${hive11.groupid} + hive-service + ${hive11.version} + + + ${hive11.groupid} + hive-jdbc + ${hive11.version} + + + ${hive11.groupid} + hive-metastore + ${hive11.version} + + + ${hive11.groupid} + hive-common + ${hive11.version} + + + + + + diff --git a/pom.xml b/pom.xml index d321ddb15..d77f8f23c 100644 --- a/pom.xml +++ b/pom.xml @@ -25,7 +25,7 @@ Hoodie is a Apache Spark library that provides the ability to efficiently do incremental processing on datasets in HDFS - https://github.com/uber/hoodie + https://github.com/uber/hudi Hoodie @@ -36,6 +36,9 @@ hoodie-hive hoodie-utilities hoodie-spark + packaging/hoodie-hadoop-mr-bundle + packaging/hoodie-hive-bundle + packaging/hoodie-spark-bundle @@ -61,7 +64,7 @@ prasanna Prasanna Rajaperumal - Uber + Snowflake @@ -94,23 +97,14 @@ Nishith Agarwal Uber + + Balaji Varadharajan + Uber + 2015-2016 - - - com.google.code.gson - gson - 2.3.1 - test - - - junit - junit - ${junit.version} - test - - + 2.10 @@ -121,11 +115,15 @@ 4.11 1.9.5 1.2.17 - 5.7.2 - 2.6.0 - 1.1.0 + 2.9.9 + 2.7.3 + org.apache.hive + 1.2.1 + org.apache.hive + 1.1.1 3.1.1 2.1.0 + 1.7.7 2.11.8 2.11 @@ -278,32 +276,6 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - org.apache.rat @@ -337,7 +309,7 @@ org.apache.avro avro-maven-plugin - 1.7.6 + ${avro.version} generate-sources @@ -359,6 +331,19 @@ + + com.google.code.gson + gson + 2.3.1 + test + + + + junit + junit + ${junit.version} + test + com.beust @@ -372,10 +357,17 @@ ${log4j.version} + + + joda-time + joda-time + ${joda.version} + + org.apache.hadoop hadoop-client - ${hadoop.version}-cdh${cdh.version} + ${hadoop.version} provided @@ -404,7 +396,7 @@ org.apache.avro avro-mapred - 1.7.7 + ${avro.version} @@ -418,25 +410,19 @@ org.apache.hadoop hadoop-common - ${hadoop.version}-cdh${cdh.version} + ${hadoop.version} provided org.apache.hadoop hadoop-hdfs - ${hadoop.version}-cdh${cdh.version} + ${hadoop.version} provided 
org.apache.hadoop hadoop-auth - ${hadoop.version}-cdh${cdh.version} - provided - - - org.apache.hive - hive-common - ${hive.version}-cdh${cdh.version} + ${hadoop.version} provided @@ -448,19 +434,13 @@ org.apache.hadoop hadoop-mapreduce-client-core - ${hadoop.version}-cdh${cdh.version} + ${hadoop.version} provided org.apache.hadoop hadoop-mapreduce-client-common - 2.6.0-cdh5.7.2 - provided - - - org.apache.hive - hive-exec - 1.1.0-cdh5.7.2 + ${hadoop.version} provided @@ -468,30 +448,34 @@ commons-logging 1.2 - + + commons-io + commons-io + 2.6 + com.twitter parquet-hadoop-bundle - 1.5.0-cdh5.7.2 + 1.6.0 com.twitter parquet-hive-bundle - 1.5.0 + 1.6.0 com.twitter parquet-avro - 1.5.0-cdh5.7.2 + 1.6.0 org.apache.parquet parquet-hive-bundle - 1.8.1 + ${parquet.version} @@ -532,7 +516,7 @@ org.apache.avro avro - 1.7.6-cdh5.7.2 + ${avro.version} org.slf4j @@ -574,6 +558,11 @@ httpcore 4.3.2 + + org.apache.httpcomponents + httpclient + 4.3.2 + org.slf4j slf4j-api @@ -621,35 +610,17 @@ 1.9.13 + + com.fasterxml.jackson.core + jackson-databind + 2.6.0 + org.codehaus.jackson jackson-mapper-asl 1.9.13 - - org.apache.hive - hive-jdbc - ${hive.version}-cdh${cdh.version} - - - com.fasterxml.jackson.* - * - - - - - - org.apache.hive - hive-service - ${hive.version}-cdh${cdh.version} - - - org.apache.hive - hive-metastore - ${hive.version}-cdh${cdh.version} - - junit junit @@ -659,33 +630,13 @@ org.apache.hadoop hadoop-hdfs tests - ${hadoop.version}-cdh${cdh.version} - - - org.codehaus - * - - - - org.mortbay.jetty - * - - - javax.servlet.jsp - * - - - javax.servlet - * - - - test + ${hadoop.version} org.apache.hadoop hadoop-common tests - ${hadoop.version}-cdh${cdh.version} + ${hadoop.version} org.mockito @@ -703,8 +654,11 @@ - + + Maven repository + https://central.maven.org/maven2/ + cloudera-repo-releases https://repository.cloudera.com/artifactory/public/ @@ -723,6 +677,109 @@ + + hive12 + + + !hive11 + + + + + ${hive12.groupid} + hive-service + ${hive12.version} + provided + + + ${hive12.groupid} + hive-shims + ${hive12.version} + provided + + + ${hive12.groupid} + hive-jdbc + ${hive12.version} + provided + + + ${hive12.groupid} + hive-serde + ${hive12.version} + provided + + + ${hive12.groupid} + hive-metastore + ${hive12.version} + provided + + + ${hive12.groupid} + hive-common + ${hive12.version} + provided + + + ${hive12.groupid} + hive-exec + ${hive12.version} + provided + + + + + hive11 + + + hive11 + + + + + org.apache.hive + hive-service + ${hive11.version} + + + org.apache.hive + hive-shims + ${hive11.version} + provided + + + org.apache.hive + hive-jdbc + ${hive11.version} + provided + + + org.apache.hive + hive-serde + ${hive11.version} + provided + + + org.apache.hive + hive-metastore + ${hive11.version} + provided + + + org.apache.hive + hive-common + ${hive11.version} + provided + + + org.apache.hive + hive-exec + ${hive11.version} + provided + + + release
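
For reference, the new `hoodie.datasource.hive_sync.*` options added to `DataSourceWriteOptions` above (and wired into `DefaultSource`, which invokes `HiveSyncTool` after a successful commit when sync is enabled) can be exercised straight from a Spark shell. `HoodieJavaApp` exposes the same knobs via the `--hive-sync`, `--hive-db`, `--hive-table`, `--hive-user`, `--hive-password` and `--hive-url` flags. The sketch below is illustrative only: it assumes a spark-shell with the hoodie-spark-bundle and spark-avro on the classpath, a HiveServer2 reachable at the default JDBC URL, and made-up column names and paths.

```
// minimal sketch, run inside spark-shell (which provides `spark`)
import com.uber.hoodie.DataSourceWriteOptions
import com.uber.hoodie.config.HoodieWriteConfig
import org.apache.spark.sql.SaveMode
import spark.implicits._

// toy input; real pipelines would read from their actual source
val df = Seq(
  ("id-1", "2018/08/31", 1535673600L, 27.7),
  ("id-2", "2018/08/31", 1535673700L, 33.1)
).toDF("_row_key", "partition", "timestamp", "fare")

df.write.format("com.uber.hoodie")
  .option("hoodie.insert.shuffle.parallelism", "2")
  .option("hoodie.upsert.shuffle.parallelism", "2")
  .option(HoodieWriteConfig.TABLE_NAME, "hoodie_sample_test")
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partition")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
  // new in this change: register/refresh the table in the Hive metastore after the commit succeeds
  .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
  .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "default")
  .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "hoodie_sample_test")
  .option(DataSourceWriteOptions.HIVE_USER_OPT_KEY, "hive")
  .option(DataSourceWriteOptions.HIVE_PASS_OPT_KEY, "hive")
  .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://localhost:10000")
  .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "partition")
  .mode(SaveMode.Append)
  .save("file:///tmp/hoodie/hoodie_sample_test")
```

The partition values above follow the `yyyy/mm/dd` layout expected by the default `SlashEncodedDayPartitionValueExtractor`; any unset hive_sync option falls back to the defaults registered through `translateStorageDefaults`/`putIfAbsent` in `DefaultSource`.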
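The `syncHive`/`buildSyncConfig` helpers added to `DefaultSource.scala` above essentially construct a `HiveSyncConfig` and hand it to `HiveSyncTool`. A rough standalone sketch of that flow follows; the base path, credentials and JDBC URL are placeholder assumptions, and the fields assigned mirror those set in `buildSyncConfig`.

```
// sketch of what DefaultSource.syncHive() does after a successful commit
import com.uber.hoodie.common.util.FSUtils
import com.uber.hoodie.hive.{HiveSyncConfig, HiveSyncTool, SlashEncodedDayPartitionValueExtractor}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf

val basePath = "/tmp/hoodie/hoodie_sample_test"   // assumption: dataset written earlier
val fs = FSUtils.getFs(basePath, new Configuration())

val cfg = new HiveSyncConfig()
cfg.basePath = basePath
cfg.databaseName = "default"
cfg.tableName = "hoodie_sample_test"
cfg.hiveUser = "hive"
cfg.hivePass = "hive"
cfg.jdbcUrl = "jdbc:hive2://localhost:10000"
cfg.partitionFields = java.util.Arrays.asList("partition")   // fields mapped to Hive partition columns
cfg.partitionValueExtractorClass = classOf[SlashEncodedDayPartitionValueExtractor].getCanonicalName
cfg.assumeDatePartitioning = false

// pick up the Hadoop/Hive configs visible to the writer, then sync the table
val hiveConf = new HiveConf()
hiveConf.addResource(fs.getConf)
new HiveSyncTool(cfg, hiveConf, fs).syncHoodieTable()
```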
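The `DataSourceUtils.getNestedFieldValAsString` change above improves the error raised when a record-key or partition-path field cannot be resolved: the exception now names the path segment that failed and lists the fields actually present on the record. A small illustrative sketch (the schema and field names are made up):

```
// illustrative only; exercises the improved lookup error from DataSourceUtils
import com.uber.hoodie.DataSourceUtils
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData

val schema = SchemaBuilder.record("trip").fields()
  .requiredString("_row_key")
  .requiredDouble("fare")
  .endRecord()

val rec = new GenericData.Record(schema)
rec.put("_row_key", "id-1")
rec.put("fare", 12.5)

DataSourceUtils.getNestedFieldValAsString(rec, "_row_key")    // resolves to "id-1"
DataSourceUtils.getNestedFieldValAsString(rec, "driver.name")
// should now fail with a HoodieException along the lines of:
// "driver.name(Part -driver) field not found in record. Acceptable fields were :[_row_key, fare]"
```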