From 388457b6b28ccfb6b91c0f1b35e921ffc913e8e3 Mon Sep 17 00:00:00 2001 From: Prasanna Rajaperumal Date: Mon, 19 Dec 2016 23:04:39 -0800 Subject: [PATCH] Add hoodie-hive module to support hive registration of hoodie datasets --- hoodie-hadoop-mr/pom.xml | 26 ++ .../uber/hoodie/hadoop/HoodieHiveUtil.java | 2 +- .../uber/hoodie/hadoop/HoodieInputFormat.java | 2 +- .../hoodie/hadoop/HoodieInputFormatTest.java | 2 +- .../hoodie/hadoop/InputFormatTestUtil.java | 2 +- hoodie-hive/pom.xml | 127 +++++ .../hoodie/hive/HoodieHiveConfiguration.java | 119 +++++ .../hive/HoodieHiveDatasetException.java | 40 ++ .../hive/HoodieHiveDatasetSyncTask.java | 181 ++++++++ .../hoodie/hive/HoodieHiveSchemaSyncTask.java | 243 ++++++++++ .../uber/hoodie/hive/PartitionStrategy.java | 56 +++ .../com/uber/hoodie/hive/SchemaStrategy.java | 31 ++ .../hoodie/hive/client/ColumnNameXLator.java | 48 ++ .../hoodie/hive/client/HoodieFSClient.java | 206 +++++++++ .../hoodie/hive/client/HoodieHiveClient.java | 365 +++++++++++++++ .../uber/hoodie/hive/client/SchemaUtil.java | 436 ++++++++++++++++++ .../hive/example/HoodieDatasetExample.java | 56 +++ .../hive/impl/DayBasedPartitionStrategy.java | 78 ++++ .../impl/ParseSchemaFromDataStrategy.java | 43 ++ .../hive/model/HoodieDatasetReference.java | 79 ++++ .../hoodie/hive/model/SchemaDifference.java | 108 +++++ .../hoodie/hive/model/StoragePartition.java | 51 ++ .../hoodie/hive/model/TablePartition.java | 38 ++ .../uber/hoodie/hive/DatasetSchemaTest.java | 186 ++++++++ .../uber/hoodie/hive/HDroneDatasetTest.java | 98 ++++ .../hoodie/hive/util/CsvParquetWriter.java | 44 ++ .../hoodie/hive/util/CsvWriteSupport.java | 94 ++++ .../hoodie/hive/util/HdfsTestService.java | 166 +++++++ .../hoodie/hive/util/HiveTestService.java | 322 +++++++++++++ .../com/uber/hoodie/hive/util/TestUtil.java | 199 ++++++++ .../hive/util/ZookeeperTestService.java | 241 ++++++++++ hoodie-hive/src/test/resources/nation.csv | 25 + hoodie-hive/src/test/resources/nation.schema | 6 + 
.../src/test/resources/nation_evolved.csv | 25 + .../src/test/resources/nation_evolved.schema | 7 + pom.xml | 46 +- 36 files changed, 3793 insertions(+), 5 deletions(-) create mode 100644 hoodie-hive/pom.xml create mode 100644 hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveConfiguration.java create mode 100644 hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveDatasetException.java create mode 100644 hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveDatasetSyncTask.java create mode 100644 hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveSchemaSyncTask.java create mode 100644 hoodie-hive/src/main/java/com/uber/hoodie/hive/PartitionStrategy.java create mode 100644 hoodie-hive/src/main/java/com/uber/hoodie/hive/SchemaStrategy.java create mode 100644 hoodie-hive/src/main/java/com/uber/hoodie/hive/client/ColumnNameXLator.java create mode 100644 hoodie-hive/src/main/java/com/uber/hoodie/hive/client/HoodieFSClient.java create mode 100644 hoodie-hive/src/main/java/com/uber/hoodie/hive/client/HoodieHiveClient.java create mode 100644 hoodie-hive/src/main/java/com/uber/hoodie/hive/client/SchemaUtil.java create mode 100644 hoodie-hive/src/main/java/com/uber/hoodie/hive/example/HoodieDatasetExample.java create mode 100644 hoodie-hive/src/main/java/com/uber/hoodie/hive/impl/DayBasedPartitionStrategy.java create mode 100644 hoodie-hive/src/main/java/com/uber/hoodie/hive/impl/ParseSchemaFromDataStrategy.java create mode 100644 hoodie-hive/src/main/java/com/uber/hoodie/hive/model/HoodieDatasetReference.java create mode 100644 hoodie-hive/src/main/java/com/uber/hoodie/hive/model/SchemaDifference.java create mode 100644 hoodie-hive/src/main/java/com/uber/hoodie/hive/model/StoragePartition.java create mode 100644 hoodie-hive/src/main/java/com/uber/hoodie/hive/model/TablePartition.java create mode 100644 hoodie-hive/src/test/java/com/uber/hoodie/hive/DatasetSchemaTest.java create mode 100644 
hoodie-hive/src/test/java/com/uber/hoodie/hive/HDroneDatasetTest.java create mode 100644 hoodie-hive/src/test/java/com/uber/hoodie/hive/util/CsvParquetWriter.java create mode 100644 hoodie-hive/src/test/java/com/uber/hoodie/hive/util/CsvWriteSupport.java create mode 100644 hoodie-hive/src/test/java/com/uber/hoodie/hive/util/HdfsTestService.java create mode 100644 hoodie-hive/src/test/java/com/uber/hoodie/hive/util/HiveTestService.java create mode 100644 hoodie-hive/src/test/java/com/uber/hoodie/hive/util/TestUtil.java create mode 100644 hoodie-hive/src/test/java/com/uber/hoodie/hive/util/ZookeeperTestService.java create mode 100644 hoodie-hive/src/test/resources/nation.csv create mode 100644 hoodie-hive/src/test/resources/nation.schema create mode 100644 hoodie-hive/src/test/resources/nation_evolved.csv create mode 100644 hoodie-hive/src/test/resources/nation_evolved.schema diff --git a/hoodie-hadoop-mr/pom.xml b/hoodie-hadoop-mr/pom.xml index 59137bf04..d0bce106e 100644 --- a/hoodie-hadoop-mr/pom.xml +++ b/hoodie-hadoop-mr/pom.xml @@ -1,4 +1,20 @@ + + @@ -66,4 +82,14 @@ avro + + + + + org.apache.rat + apache-rat-plugin + + + + diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieHiveUtil.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieHiveUtil.java index ee38cb2d7..b1666d4c6 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieHiveUtil.java +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieHiveUtil.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. 
* You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java index 5e81c6231..29a3dd9fa 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/HoodieInputFormatTest.java b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/HoodieInputFormatTest.java index 3ca847f6a..997c91f22 100644 --- a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/HoodieInputFormatTest.java +++ b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/HoodieInputFormatTest.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. 
* You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java index d2e5e3edb..6a016c4a6 100644 --- a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java +++ b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/hoodie-hive/pom.xml b/hoodie-hive/pom.xml new file mode 100644 index 000000000..153eeb1e1 --- /dev/null +++ b/hoodie-hive/pom.xml @@ -0,0 +1,127 @@ + + + + + + hoodie + com.uber.hoodie + 0.2.5-SNAPSHOT + + 4.0.0 + + hoodie-hive + + + + org.apache.hadoop + hadoop-common + + + org.apache.hadoop + hadoop-hdfs + + + org.apache.hadoop + hadoop-auth + + + org.apache.hive + hive-common + + + org.apache.hive + hive-jdbc + + + com.google.guava + guava + + + org.apache.hive + hive-service + + + org.apache.hive + hive-metastore + + + org.apache.thrift + libthrift + 0.9.2 + + + + commons-dbcp + commons-dbcp + + + + + org.slf4j + slf4j-api + + + org.slf4j + slf4j-log4j12 + + + + + junit + junit + + + org.apache.hadoop + hadoop-hdfs + tests + + + org.apache.hadoop + hadoop-common + tests + + + org.mockito + mockito-all + test + + + + com.uber.hoodie + hoodie-hadoop-mr + ${project.version} + + + com.uber.hoodie + hoodie-common + ${project.version} + + + + + + + + 
org.apache.rat + apache-rat-plugin + + + + + diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveConfiguration.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveConfiguration.java new file mode 100644 index 000000000..34c49d31a --- /dev/null +++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveConfiguration.java @@ -0,0 +1,119 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.uber.hoodie.hive; + +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Configurations for registering a hoodie dataset into hive metastore + */ +public class HoodieHiveConfiguration { + private final String hiveJdbcUrl; + private final String dbName; + private final String hiveUsername; + private final String hivePassword; + private final Configuration configuration; + + private HoodieHiveConfiguration(String hiveJdbcUrl, String defaultDatabaseName, + String hiveUsername, String hivePassword, Configuration configuration) { + this.hiveJdbcUrl = hiveJdbcUrl; + this.dbName = defaultDatabaseName; + this.hiveUsername = hiveUsername; + this.hivePassword = hivePassword; + this.configuration = configuration; + } + + public String getHiveJdbcUrl() { + return hiveJdbcUrl; + } + + public String getDbName() { + return dbName; + } + + public String getHiveUsername() { + return hiveUsername; + } + + public String getHivePassword() { + return hivePassword; + } + + public Configuration getConfiguration() { + return configuration; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("HoodieHiveConfiguration{"); + sb.append("hiveJdbcUrl='").append(hiveJdbcUrl).append('\''); + sb.append(", dbName='").append(dbName).append('\''); + sb.append(", hiveUsername='").append(hiveUsername).append('\''); + sb.append(", hivePassword='").append(hivePassword).append('\''); + sb.append(", configuration=").append(configuration); + sb.append('}'); + return sb.toString(); + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder { + private static Logger LOG = LoggerFactory.getLogger(Builder.class); + private String hiveJdbcUrl; + private String dbName; + private String jdbcUsername; + private String jdbcPassword; + private Configuration configuration; + + public Builder hiveJdbcUrl(String hiveJdbcUrl) { + this.hiveJdbcUrl = 
hiveJdbcUrl; + return this; + } + + public Builder hivedb(String hiveDatabase) { + this.dbName = hiveDatabase; + return this; + } + + public Builder jdbcUsername(String jdbcUsername) { + this.jdbcUsername = jdbcUsername; + return this; + } + + public Builder jdbcPassword(String jdbcPassword) { + this.jdbcPassword = jdbcPassword; + return this; + } + + public Builder hadoopConfiguration(Configuration configuration) { + this.configuration = configuration; + return this; + } + + public HoodieHiveConfiguration build() { + HoodieHiveConfiguration config = + new HoodieHiveConfiguration(hiveJdbcUrl, dbName, jdbcUsername, jdbcPassword, + configuration); + LOG.info("Hoodie Hive Configuration - " + config); + return config; + } + } +} diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveDatasetException.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveDatasetException.java new file mode 100644 index 000000000..4dc06e645 --- /dev/null +++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveDatasetException.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.uber.hoodie.hive; + +public class HoodieHiveDatasetException extends RuntimeException { + + public HoodieHiveDatasetException() { + super(); + } + + public HoodieHiveDatasetException(String message) { + super(message); + } + + public HoodieHiveDatasetException(String message, Throwable t) { + super(message, t); + } + + public HoodieHiveDatasetException(Throwable t) { + super(t); + } + + protected static String format(String message, Object... args) { + return String.format(String.valueOf(message), (Object[]) args); + } +} diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveDatasetSyncTask.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveDatasetSyncTask.java new file mode 100644 index 000000000..184917a1c --- /dev/null +++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveDatasetSyncTask.java @@ -0,0 +1,181 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.uber.hoodie.hive; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.uber.hoodie.hive.client.HoodieFSClient; +import com.uber.hoodie.hive.client.HoodieHiveClient; +import com.uber.hoodie.hive.model.HoodieDatasetReference; +import com.uber.hoodie.hive.model.StoragePartition; +import com.uber.hoodie.hive.model.TablePartition; +import org.apache.commons.lang.ArrayUtils; +import org.apache.hadoop.fs.FileStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * Represents a Hive External Dataset. + * Contains metadata for storage and table partitions. + */ +public class HoodieHiveDatasetSyncTask { + private static Logger LOG = LoggerFactory.getLogger(HoodieHiveDatasetSyncTask.class); + private final HoodieHiveSchemaSyncTask schemaSyncTask; + private final List newPartitions; + private final List changedPartitions; + + public HoodieHiveDatasetSyncTask(HoodieHiveSchemaSyncTask schemaSyncTask, + List newPartitions, List changedPartitions) { + this.schemaSyncTask = schemaSyncTask; + this.newPartitions = ImmutableList.copyOf(newPartitions); + this.changedPartitions = ImmutableList.copyOf(changedPartitions); + } + + public HoodieHiveSchemaSyncTask getSchemaSyncTask() { + return schemaSyncTask; + } + + public List getNewPartitions() { + return newPartitions; + } + + public List getChangedPartitions() { + return changedPartitions; + } + + /** + * Sync this dataset + * 1. If any schema difference is found, then sync the table schema + * 2. If any new partitions are found, adds partitions to the table (which uses the table schema by default) + * 3. 
If any partition path has changed, modify the partition to the new path (which does not change the partition schema) + */ + public void sync() { + LOG.info("Starting Sync for " + schemaSyncTask.getReference()); + try { + // First sync the table schema + schemaSyncTask.sync(); + + // Add all the new partitions + schemaSyncTask.getHiveClient() + .addPartitionsToTable(schemaSyncTask.getReference(), newPartitions, + schemaSyncTask.getPartitionStrategy()); + // Update all the changed partitions + schemaSyncTask.getHiveClient() + .updatePartitionsToTable(schemaSyncTask.getReference(), changedPartitions, + schemaSyncTask.getPartitionStrategy()); + } catch (Exception e) { + throw new HoodieHiveDatasetException( + "Failed to sync dataset " + schemaSyncTask.getReference(), e); + } + LOG.info("Sync for " + schemaSyncTask.getReference() + " complete."); + } + + public static Builder newBuilder(HoodieHiveDatasetSyncTask dataset) { + return newBuilder().withConfiguration(dataset.schemaSyncTask.getConf()) + .withReference(dataset.schemaSyncTask.getReference()) + .withFSClient(dataset.schemaSyncTask.getFsClient()) + .withHiveClient(dataset.schemaSyncTask.getHiveClient()) + .schemaStrategy(dataset.schemaSyncTask.getSchemaStrategy()) + .partitionStrategy(dataset.schemaSyncTask.getPartitionStrategy()); + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder { + private static Logger LOG = LoggerFactory.getLogger(Builder.class); + private HoodieHiveConfiguration configuration; + private HoodieDatasetReference datasetReference; + private SchemaStrategy schemaStrategy; + private PartitionStrategy partitionStrategy; + private HoodieHiveClient hiveClient; + private HoodieFSClient fsClient; + + public Builder withReference(HoodieDatasetReference reference) { + this.datasetReference = reference; + return this; + } + + public Builder withConfiguration(HoodieHiveConfiguration configuration) { + this.configuration = configuration; + return this; 
+ } + + public Builder schemaStrategy(SchemaStrategy schemaStrategy) { + this.schemaStrategy = schemaStrategy; + return this; + } + + public Builder partitionStrategy(PartitionStrategy partitionStrategy) { + if(partitionStrategy != null) { + LOG.info("Partitioning the dataset with keys " + ArrayUtils + .toString(partitionStrategy.getHivePartitionFieldNames())); + } + this.partitionStrategy = partitionStrategy; + return this; + } + + public Builder withHiveClient(HoodieHiveClient hiveClient) { + this.hiveClient = hiveClient; + return this; + } + + public Builder withFSClient(HoodieFSClient fsClient) { + this.fsClient = fsClient; + return this; + } + + public HoodieHiveDatasetSyncTask build() { + LOG.info("Building dataset for " + datasetReference); + HoodieHiveSchemaSyncTask schemaSyncTask = + HoodieHiveSchemaSyncTask.newBuilder().withReference(datasetReference) + .withConfiguration(configuration).schemaStrategy(schemaStrategy) + .partitionStrategy(partitionStrategy).withHiveClient(hiveClient) + .withFSClient(fsClient).build(); + + List storagePartitions = Lists.newArrayList(); + FileStatus[] storagePartitionPaths = schemaSyncTask.getPartitionStrategy() + .scanAllPartitions(schemaSyncTask.getReference(), schemaSyncTask.getFsClient()); + for (FileStatus fileStatus : storagePartitionPaths) { + storagePartitions.add(new StoragePartition(schemaSyncTask.getReference(), + schemaSyncTask.getPartitionStrategy(), fileStatus)); + } + LOG.info("Storage partitions scan complete. Found " + storagePartitions.size()); + + List newPartitions; + List changedPartitions; + + // Check if table exists + if (schemaSyncTask.getHiveClient().checkTableExists(schemaSyncTask.getReference())) { + List partitions = + schemaSyncTask.getHiveClient().scanPartitions(schemaSyncTask.getReference()); + LOG.info("Table partition scan complete. 
Found " + partitions.size()); + newPartitions = schemaSyncTask.getFsClient() + .getUnregisteredStoragePartitions(partitions, storagePartitions); + changedPartitions = schemaSyncTask.getFsClient() + .getChangedStoragePartitions(partitions, storagePartitions); + } else { + newPartitions = storagePartitions; + changedPartitions = Lists.newArrayList(); + } + return new HoodieHiveDatasetSyncTask(schemaSyncTask, newPartitions, changedPartitions); + } + } +} diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveSchemaSyncTask.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveSchemaSyncTask.java new file mode 100644 index 000000000..ffabcd3ea --- /dev/null +++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveSchemaSyncTask.java @@ -0,0 +1,243 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.uber.hoodie.hive; + + +import com.google.common.base.Objects; +import com.google.common.collect.Maps; +import com.uber.hoodie.hadoop.HoodieInputFormat; +import com.uber.hoodie.hive.impl.DayBasedPartitionStrategy; +import com.uber.hoodie.hive.client.HoodieFSClient; +import com.uber.hoodie.hive.client.HoodieHiveClient; +import com.uber.hoodie.hive.impl.ParseSchemaFromDataStrategy; +import com.uber.hoodie.hive.client.SchemaUtil; +import com.uber.hoodie.hive.model.HoodieDatasetReference; +import com.uber.hoodie.hive.model.SchemaDifference; +import org.apache.commons.lang.ArrayUtils; +import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import parquet.schema.MessageType; + +import java.util.Map; + +/** + * Represents the Schema sync task for the dataset. + * Execute sync() on this task to sync up the HDFS dataset schema and hive table schema + */ +public class HoodieHiveSchemaSyncTask { + private static Logger LOG = LoggerFactory.getLogger(HoodieHiveSchemaSyncTask.class); + + private static final String DEFAULT_INPUTFORMAT = HoodieInputFormat.class.getName(); + private static final String DEFAULT_OUTPUTFORMAT = MapredParquetOutputFormat.class.getName(); + + private final HoodieDatasetReference reference; + private final MessageType storageSchema; + private final Map tableSchema; + private final PartitionStrategy partitionStrategy; + private final SchemaStrategy schemaStrategy; + private final HoodieHiveClient hiveClient; + private final HoodieHiveConfiguration conf; + private final HoodieFSClient fsClient; + + public HoodieHiveSchemaSyncTask(HoodieDatasetReference datasetReference, + MessageType schemaInferred, Map fieldsSchema, + PartitionStrategy partitionStrategy, SchemaStrategy schemaStrategy, + HoodieHiveConfiguration configuration, HoodieHiveClient hiveClient, + HoodieFSClient fsClient) { + this.reference = datasetReference; + this.storageSchema = schemaInferred; + 
this.tableSchema = fieldsSchema; + this.partitionStrategy = partitionStrategy; + this.schemaStrategy = schemaStrategy; + this.hiveClient = hiveClient; + this.conf = configuration; + this.fsClient = fsClient; + } + + public SchemaDifference getSchemaDifference() { + return SchemaUtil.getSchemaDifference(storageSchema, tableSchema, + partitionStrategy.getHivePartitionFieldNames()); + } + + /** + * Checks if the table schema is present. If not, creates one. + * If already exists, computes the schema difference and if there is any difference + * it generates a alter table and syncs up the schema to hive metastore. + */ + public void sync() { + try { + // Check if the table needs to be created + if (tableSchema.isEmpty()) { + // create the database + LOG.info("Schema not found. Creating for " + reference); + hiveClient.createTable(storageSchema, reference, + partitionStrategy.getHivePartitionFieldNames(), DEFAULT_INPUTFORMAT, + DEFAULT_OUTPUTFORMAT); + } else { + if (!getSchemaDifference().isEmpty()) { + LOG.info("Schema sync required for " + reference); + hiveClient.updateTableDefinition(reference, + partitionStrategy.getHivePartitionFieldNames(), storageSchema); + } else { + LOG.info("Schema sync not required for " + reference); + } + } + } catch (Exception e) { + throw new HoodieHiveDatasetException("Failed to sync dataset " + reference, + e); + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + public MessageType getStorageSchema() { + return storageSchema; + } + + public Map getTableSchema() { + return tableSchema; + } + + public PartitionStrategy getPartitionStrategy() { + return partitionStrategy; + } + + public SchemaStrategy getSchemaStrategy() { + return schemaStrategy; + } + + public HoodieHiveClient getHiveClient() { + return hiveClient; + } + + public HoodieHiveConfiguration getConf() { + return conf; + } + + public HoodieDatasetReference getReference() { + return reference; + } + + public HoodieFSClient getFsClient() { + return 
fsClient; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + HoodieHiveSchemaSyncTask that = (HoodieHiveSchemaSyncTask) o; + return Objects.equal(storageSchema, that.storageSchema) && Objects + .equal(tableSchema, that.tableSchema); + } + + @Override + public int hashCode() { + return Objects.hashCode(storageSchema, tableSchema); + } + + public static class Builder { + private static Logger LOG = LoggerFactory.getLogger(Builder.class); + private HoodieHiveConfiguration configuration; + private HoodieDatasetReference datasetReference; + private SchemaStrategy schemaStrategy; + private PartitionStrategy partitionStrategy; + private HoodieHiveClient hiveClient; + private HoodieFSClient fsClient; + + public Builder withReference(HoodieDatasetReference reference) { + this.datasetReference = reference; + return this; + } + + public Builder withConfiguration(HoodieHiveConfiguration configuration) { + this.configuration = configuration; + return this; + } + + public Builder schemaStrategy(SchemaStrategy schemaStrategy) { + this.schemaStrategy = schemaStrategy; + return this; + } + + public Builder partitionStrategy(PartitionStrategy partitionStrategy) { + if(partitionStrategy != null) { + LOG.info("Partitioning the dataset with keys " + ArrayUtils + .toString(partitionStrategy.getHivePartitionFieldNames())); + } + this.partitionStrategy = partitionStrategy; + return this; + } + + public Builder withHiveClient(HoodieHiveClient hiveClient) { + this.hiveClient = hiveClient; + return this; + } + + public Builder withFSClient(HoodieFSClient fsClient) { + this.fsClient = fsClient; + return this; + } + + public HoodieHiveSchemaSyncTask build() { + LOG.info("Building dataset schema for " + datasetReference); + createDefaults(); + + MessageType schemaInferred = + schemaStrategy.getDatasetSchema(datasetReference, fsClient); + LOG.info("Storage Schema inferred for dataset " + 
datasetReference); + LOG.debug("Inferred Storage Schema " + schemaInferred); + + Map fieldsSchema; + if (!hiveClient.checkTableExists(datasetReference)) { + fieldsSchema = Maps.newHashMap(); + } else { + fieldsSchema = hiveClient.getTableSchema(datasetReference); + } + LOG.info("Table Schema inferred for dataset " + datasetReference); + LOG.debug("Inferred Table Schema " + fieldsSchema); + + return new HoodieHiveSchemaSyncTask(datasetReference, schemaInferred, fieldsSchema, + partitionStrategy, schemaStrategy, configuration, hiveClient, fsClient); + } + + private void createDefaults() { + if (partitionStrategy == null) { + LOG.info("Partition strategy is not set. Selecting the default strategy"); + partitionStrategy = new DayBasedPartitionStrategy(); + } + if (schemaStrategy == null) { + LOG.info( + "Schema strategy not specified. Selecting the default based on the dataset type"); + schemaStrategy = new ParseSchemaFromDataStrategy(); + } + if (fsClient == null) { + LOG.info("Creating a new FS Client as none has been passed in"); + fsClient = new HoodieFSClient(configuration); + } + if (hiveClient == null) { + LOG.info("Creating a new Hive Client as none has been passed in"); + hiveClient = new HoodieHiveClient(configuration); + } + } + } +} diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/PartitionStrategy.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/PartitionStrategy.java new file mode 100644 index 000000000..e48023b34 --- /dev/null +++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/PartitionStrategy.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.hive; + +import com.uber.hoodie.hive.client.HoodieFSClient; +import com.uber.hoodie.hive.model.HoodieDatasetReference; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; + +/** + * Abstraction to define HDFS partition strategies. + * Strategy provides hookups to map partitions on to physical layout + * + * @see SchemaStrategy + */ +public interface PartitionStrategy { + /** + * Scans the file system for all partitions and returns FileStatus[] which are the available partitions + * + * @param basePath + * @param fsClient + * @return + */ + FileStatus[] scanAllPartitions(HoodieDatasetReference basePath, HoodieFSClient fsClient); + + /** + * Get the list of hive field names the dataset will be partitioned on. + * The field name should be present in the storage schema. + * + * @return List of partitions field names + */ + String[] getHivePartitionFieldNames(); + + /** + * Convert a Partition path (returned in scanAllPartitions) to values for column names returned in getHivePartitionFieldNames + * e.g. 
/data/topic/2016/12/12/ will return [2016, 12, 12] + * + * @param partition storage path + * @return List of partitions field values + */ + String[] convertPartitionToValues(HoodieDatasetReference metadata, Path partition); +} diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/SchemaStrategy.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/SchemaStrategy.java new file mode 100644 index 000000000..c6d7a38ff --- /dev/null +++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/SchemaStrategy.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.hive; + +import com.uber.hoodie.hive.client.HoodieFSClient; +import com.uber.hoodie.hive.model.HoodieDatasetReference; +import parquet.schema.MessageType; + +/** + * Abstraction to get the Parquet schema for a {@link HoodieDatasetReference} + * If you are managing the schemas externally, connect to the system and get the schema. 
/**
 * Translates column names into a form that can be used inside Hive type definitions.
 *
 * <p>Top-level column names are returned unchanged. Nested column names have every
 * registered pattern replaced; the only registered pattern today is {@code $}, which
 * becomes {@code _dollar_}.
 */
public class ColumnNameXLator {
    /** Regular expression to replacement text, applied to nested column names. */
    private static final Map<String, String> xformMap = new java.util.HashMap<String, String>();

    static {
        // Key is a regex, hence the escaped dollar sign.
        xformMap.put("\\$", "_dollar_");
    }

    /**
     * Applies every registered replacement to a nested column name.
     *
     * @param colName nested column name, must not be null
     * @return the column name with all registered patterns replaced
     */
    public static String translateNestedColumn(String colName) {
        for (Map.Entry<String, String> entry : xformMap.entrySet()) {
            colName = colName.replaceAll(entry.getKey(), entry.getValue());
        }
        return colName;
    }

    /**
     * Translates a top-level column name; currently the identity function.
     *
     * @param colName column name
     * @return {@code colName} unchanged
     */
    public static String translateColumn(String colName) {
        return colName;
    }

    /**
     * Translates a column name, choosing the nested or top-level rules.
     *
     * @param colName      column name
     * @param nestedColumn {@code true} to apply the nested-column replacements
     * @return the translated column name
     */
    public static String translate(String colName, boolean nestedColumn) {
        return nestedColumn ? translateNestedColumn(colName) : translateColumn(colName);
    }
}
+ */ + +package com.uber.hoodie.hive.client; + +import com.google.common.base.Objects; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.uber.hoodie.hive.HoodieHiveConfiguration; +import com.uber.hoodie.hive.HoodieHiveDatasetException; +import com.uber.hoodie.hive.model.HoodieDatasetReference; +import com.uber.hoodie.hive.model.StoragePartition; +import com.uber.hoodie.hive.model.TablePartition; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import parquet.hadoop.ParquetFileReader; +import parquet.hadoop.metadata.ParquetMetadata; +import parquet.schema.MessageType; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Client to access HDFS + */ +public class HoodieFSClient { + final public static String PARQUET_EXTENSION = ".parquet"; + final public static String PARQUET_EXTENSION_ZIPPED = ".parquet.gz"; + private final static Logger LOG = LoggerFactory.getLogger(HoodieFSClient.class); + private final HoodieHiveConfiguration conf; + private final FileSystem fs; + + public HoodieFSClient(HoodieHiveConfiguration configuration) { + this.conf = configuration; + try { + this.fs = FileSystem.get(configuration.getConfiguration()); + } catch (IOException e) { + throw new HoodieHiveDatasetException( + "Could not initialize file system from configuration", e); + } + } + + /** + * Read the parquet schema from a parquet File + * + * @param parquetFilePath + * @return + * @throws IOException + */ + public MessageType readSchemaFromDataFile(Path parquetFilePath) throws IOException { + LOG.info("Reading schema from " + parquetFilePath); + + if (!fs.exists(parquetFilePath)) { + throw new 
IllegalArgumentException( + "Failed to read schema from data file " + parquetFilePath + + ". File does not exist."); + } + ParquetMetadata fileFooter = + ParquetFileReader.readFooter(conf.getConfiguration(), parquetFilePath); + return fileFooter.getFileMetaData().getSchema(); + } + + /** + * Find the last data file under the partition path. + * + * @param metadata + * @param partitionPathString + * @return + */ + public Path lastDataFileForDataset(HoodieDatasetReference metadata, + String partitionPathString) { + try { + Path partitionPath = new Path(partitionPathString); + if (!fs.exists(partitionPath)) { + throw new HoodieHiveDatasetException( + "Partition path " + partitionPath + " not found in Dataset " + metadata); + } + + RemoteIterator files = fs.listFiles(partitionPath, true); + // Iterate over the list. List is generally is listed in chronological order becasue of the date partitions + // Get the latest schema + Path returnPath = null; + while (files.hasNext()) { + Path path = files.next().getPath(); + if (path.getName().endsWith(PARQUET_EXTENSION) || path.getName() + .endsWith(PARQUET_EXTENSION_ZIPPED)) { + returnPath = path; + } + } + if (returnPath != null) { + return returnPath; + } + throw new HoodieHiveDatasetException( + "No data file found in path " + partitionPath + " for dataset " + metadata); + } catch (IOException e) { + throw new HoodieHiveDatasetException( + "Failed to get data file in path " + partitionPathString + " for dataset " + + metadata, e); + } + } + + /** + * Finds all the files/directories that match the pattern under the {@link HoodieDatasetReference} basePath + * + * @param metadata + * @param pattern + * @return + */ + public FileStatus[] getDirectoriesMatchingPattern(HoodieDatasetReference metadata, String pattern) { + try { + Path path = new Path(metadata.getBaseDatasetPath() + pattern); + FileStatus[] status = fs.globStatus(path); + List returns = Lists.newArrayList(); + for(FileStatus st:status) { + 
if(!st.getPath().toString().contains(".distcp")) { + // Ignore temporary directories created by distcp + returns.add(st); + } + } + return returns.toArray(new FileStatus[returns.size()]); + } catch (IOException e) { + throw new HoodieHiveDatasetException( + "IOException when reading directories under dataset " + metadata + " with pattern " + + pattern, e); + } + } + + /** + * Get the list of storage partitions which does not have its equivalent hive partitions + * + * @param tablePartitions + * @param storagePartitions + * @return + */ + public List getUnregisteredStoragePartitions( + List tablePartitions, List storagePartitions) { + Set paths = Sets.newHashSet(); + for (TablePartition tablePartition : tablePartitions) { + paths.add(tablePartition.getLocation().toUri().getPath()); + } + List missing = Lists.newArrayList(); + for (StoragePartition storagePartition : storagePartitions) { + String hdfsPath = storagePartition.getPartitionPath().toUri().getPath(); + if (!paths.contains(hdfsPath)) { + missing.add(storagePartition); + } + } + return missing; + } + + /** + * Get the list of storage partitions which does not have its equivalent hive partitions + * + * @param tablePartitions + * @param storagePartitions + * @return + */ + public List getChangedStoragePartitions( + List tablePartitions, List storagePartitions) { + Map paths = Maps.newHashMap(); + for (TablePartition tablePartition : tablePartitions) { + String[] partitionKeyValueStr = tablePartition.getPartitionFieldValues(); + Arrays.sort(partitionKeyValueStr); + paths.put(Arrays.toString(partitionKeyValueStr), tablePartition.getLocation().toUri().getPath()); + } + + List changed = Lists.newArrayList(); + for (StoragePartition storagePartition : storagePartitions) { + String[] partitionKeyValues = storagePartition.getPartitionFieldValues(); + Arrays.sort(partitionKeyValues); + String partitionKeyValueStr = Arrays.toString(partitionKeyValues); + String hdfsPath = 
storagePartition.getPartitionPath().toUri().getPath(); + if (paths.containsKey(partitionKeyValueStr) && !paths.get(partitionKeyValueStr).equals(hdfsPath)) { + changed.add(storagePartition); + } + } + return changed; + } + + public int calculateStorageHash(FileStatus[] paths) { + return Objects.hashCode(paths); + } + +} diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/client/HoodieHiveClient.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/client/HoodieHiveClient.java new file mode 100644 index 000000000..8fafb96f4 --- /dev/null +++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/client/HoodieHiveClient.java @@ -0,0 +1,365 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.uber.hoodie.hive.client; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.uber.hoodie.hive.HoodieHiveConfiguration; +import com.uber.hoodie.hive.HoodieHiveDatasetException; +import com.uber.hoodie.hive.PartitionStrategy; +import com.uber.hoodie.hive.model.HoodieDatasetReference; +import com.uber.hoodie.hive.model.SchemaDifference; +import com.uber.hoodie.hive.model.StoragePartition; +import com.uber.hoodie.hive.model.TablePartition; +import org.apache.commons.dbcp.BasicDataSource; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import parquet.schema.MessageType; + +import javax.sql.DataSource; +import java.io.Closeable; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +/** + * Client to access Hive + */ +public class HoodieHiveClient implements Closeable { + private static Logger LOG = LoggerFactory.getLogger(HoodieHiveClient.class); + private static String driverName = "org.apache.hive.jdbc.HiveDriver"; + + static { + try { + Class.forName(driverName); + } catch (ClassNotFoundException e) { + throw new IllegalStateException("Could not find " + driverName + " in classpath. 
", e); + } + } + + private final HoodieHiveConfiguration configuration; + private Connection connection; + private HiveConf hiveConf; + + public HoodieHiveClient(HoodieHiveConfiguration configuration) { + this.configuration = configuration; + this.hiveConf = new HiveConf(); + this.hiveConf.addResource(configuration.getConfiguration()); + try { + this.connection = getConnection(); + } catch (SQLException e) { + throw new HoodieHiveDatasetException("Failed to connect to hive metastore ", e); + } + } + + /** + * Scan all the partitions for the given {@link HoodieDatasetReference} with the given {@link PartitionStrategy} + * + * @param metadata + * @return + */ + public List scanPartitions(HoodieDatasetReference metadata) { + if (!checkTableExists(metadata)) { + throw new IllegalArgumentException( + "Failed to scan partitions as table " + metadata.getDatabaseTableName() + + " does not exist"); + } + List partitions = Lists.newArrayList(); + HiveMetaStoreClient client = null; + try { + client = new HiveMetaStoreClient(hiveConf); + List hivePartitions = client + .listPartitions(metadata.getDatabaseName(), metadata.getTableName(), (short) -1); + for (Partition partition : hivePartitions) { + partitions.add(new TablePartition(metadata, partition)); + } + return partitions; + } catch (Exception e) { + throw new HoodieHiveDatasetException("Failed to scan partitions for " + metadata, e); + } finally { + if (client != null) { + client.close(); + } + } + } + + /** + * Check if table exists + * + * @param metadata + * @return + */ + public boolean checkTableExists(HoodieDatasetReference metadata) { + ResultSet resultSet = null; + try { + Connection conn = getConnection(); + resultSet = conn.getMetaData() + .getTables(null, metadata.getDatabaseName(), metadata.getTableName(), null); + return resultSet.next(); + } catch (SQLException e) { + throw new HoodieHiveDatasetException("Failed to check if table exists " + metadata, e); + } finally { + closeQuietly(resultSet, null); + } + } 
+ + /** + * Update the hive metastore pointed to by {@link HoodieDatasetReference} with the difference + * in schema {@link SchemaDifference} + * + * @param metadata + * @param hivePartitionFieldNames + * @param newSchema @return + */ + public boolean updateTableDefinition(HoodieDatasetReference metadata, + String[] hivePartitionFieldNames, MessageType newSchema) { + try { + String newSchemaStr = SchemaUtil.generateSchemaString(newSchema); + // Cascade clause should not be present for non-partitioned tables + String cascadeClause = hivePartitionFieldNames.length > 0 ? " cascade" : ""; + StringBuilder sqlBuilder = new StringBuilder("ALTER TABLE ").append("`") + .append(metadata.getDatabaseTableName()).append("`").append(" REPLACE COLUMNS(") + .append(newSchemaStr).append(" )").append(cascadeClause); + LOG.info("Creating table with " + sqlBuilder); + return updateHiveSQL(sqlBuilder.toString()); + } catch (IOException e) { + throw new HoodieHiveDatasetException("Failed to update table for " + metadata, e); + } + } + + /** + * Execute a update in hive metastore with this SQL + * + * @param s SQL to execute + * @return + */ + public boolean updateHiveSQL(String s) { + Statement stmt = null; + try { + Connection conn = getConnection(); + stmt = conn.createStatement(); + LOG.info("Executing SQL " + s); + return stmt.execute(s); + } catch (SQLException e) { + throw new HoodieHiveDatasetException("Failed in executing SQL " + s, e); + } finally { + closeQuietly(null, stmt); + } + } + + /** + * Get the table schema + * + * @param datasetReference + * @return + */ + public Map getTableSchema(HoodieDatasetReference datasetReference) { + if (!checkTableExists(datasetReference)) { + throw new IllegalArgumentException( + "Failed to get schema as table " + datasetReference.getDatabaseTableName() + + " does not exist"); + } + Map schema = Maps.newHashMap(); + ResultSet result = null; + try { + Connection connection = getConnection(); + DatabaseMetaData databaseMetaData = 
connection.getMetaData(); + result = databaseMetaData.getColumns(null, datasetReference.getDatabaseName(), + datasetReference.getTableName(), null); + while (result.next()) { + String columnName = result.getString(4); + String columnType = result.getString(6); + schema.put(columnName, columnType); + } + return schema; + } catch (SQLException e) { + throw new HoodieHiveDatasetException( + "Failed to get table schema for " + datasetReference, e); + } finally { + closeQuietly(result, null); + } + } + + public void addPartitionsToTable(HoodieDatasetReference datasetReference, + List partitionsToAdd, PartitionStrategy strategy) { + if (partitionsToAdd.isEmpty()) { + LOG.info("No partitions to add for " + datasetReference); + return; + } + LOG.info("Adding partitions " + partitionsToAdd.size() + " to dataset " + datasetReference); + String sql = constructAddPartitions(datasetReference, partitionsToAdd, strategy); + updateHiveSQL(sql); + } + + public void updatePartitionsToTable(HoodieDatasetReference datasetReference, + List changedPartitions, PartitionStrategy partitionStrategy) { + if (changedPartitions.isEmpty()) { + LOG.info("No partitions to change for " + datasetReference); + return; + } + LOG.info( + "Changing partitions " + changedPartitions.size() + " on dataset " + datasetReference); + List sqls = + constructChangePartitions(datasetReference, changedPartitions, partitionStrategy); + for (String sql : sqls) { + updateHiveSQL(sql); + } + } + + public void createTable(MessageType storageSchema, HoodieDatasetReference metadata, + String[] partitionKeys, String inputFormatClass, String outputFormatClass) { + try { + String createSQLQuery = SchemaUtil + .generateCreateDDL(storageSchema, metadata, partitionKeys, inputFormatClass, + outputFormatClass); + LOG.info("Creating table with " + createSQLQuery); + updateHiveSQL(createSQLQuery); + } catch (IOException e) { + throw new HoodieHiveDatasetException("Failed to create table for " + metadata, e); + } + } + + private 
static void closeQuietly(ResultSet resultSet, Statement stmt) { + try { + if (stmt != null) + stmt.close(); + if (resultSet != null) + resultSet.close(); + } catch (SQLException e) { + LOG.error("Could not close the resultset opened ", e); + } + } + + private Connection getConnection() throws SQLException { + int count = 0; + int maxTries = 3; + if (connection == null) { + Configuration conf = configuration.getConfiguration(); + DataSource ds = getDatasource(); + LOG.info("Getting Hive Connection from Datasource " + ds); + while (true) { + try { + this.connection = ds.getConnection(); + break; + } catch (SQLException e) { + if (++count == maxTries) + throw e; + } + } + } + return connection; + } + + private DataSource getDatasource() { + BasicDataSource ds = new BasicDataSource(); + ds.setDriverClassName(driverName); + ds.setUrl(getHiveJdbcUrlWithDefaultDBName()); + ds.setUsername(configuration.getHiveUsername()); + ds.setPassword(configuration.getHivePassword()); + return ds; + } + + public String getHiveJdbcUrlWithDefaultDBName() { + String hiveJdbcUrl = configuration.getHiveJdbcUrl(); + String urlAppend = null; + // If the hive url contains addition properties like ;transportMode=http;httpPath=hs2 + if (hiveJdbcUrl.contains(";")) { + urlAppend = hiveJdbcUrl.substring(hiveJdbcUrl.indexOf(";")); + hiveJdbcUrl = hiveJdbcUrl.substring(0, hiveJdbcUrl.indexOf(";")); + } + if (!hiveJdbcUrl.endsWith("/")) { + hiveJdbcUrl = hiveJdbcUrl + "/"; + } + return hiveJdbcUrl + configuration.getDbName() + (urlAppend == null ? 
"" : urlAppend); + } + + private static List constructChangePartitions(HoodieDatasetReference metadata, + List partitions, PartitionStrategy partitionStrategy) { + String[] partitionFieldNames = partitionStrategy.getHivePartitionFieldNames(); + + List changePartitions = Lists.newArrayList(); + String alterTable = "ALTER TABLE " + metadata.getDatabaseTableName(); + for (StoragePartition partition : partitions) { + StringBuilder partBuilder = new StringBuilder(); + String[] partitionValues = partition.getPartitionFieldValues(); + Preconditions.checkArgument(partitionFieldNames.length == partitionValues.length, + "Partition key parts " + Arrays.toString(partitionFieldNames) + + " does not match with partition values " + Arrays.toString(partitionValues) + + ". Check partition strategy. "); + for (int i = 0; i < partitionFieldNames.length; i++) { + partBuilder.append(partitionFieldNames[i]).append("=").append("'") + .append(partitionValues[i]).append("'"); + } + String changePartition = + alterTable + " PARTITION (" + partBuilder.toString() + ") SET LOCATION '" + + "hdfs://nameservice1" + partition.getPartitionPath() + "'"; + changePartitions.add(changePartition); + } + return changePartitions; + } + + private static String constructAddPartitions(HoodieDatasetReference metadata, + List partitions, PartitionStrategy partitionStrategy) { + return constructAddPartitions(metadata.getDatabaseTableName(), partitions, + partitionStrategy); + } + + private static String constructAddPartitions(String newDbTableName, + List partitions, PartitionStrategy partitionStrategy) { + String[] partitionFieldNames = partitionStrategy.getHivePartitionFieldNames(); + StringBuilder alterSQL = new StringBuilder("ALTER TABLE "); + alterSQL.append(newDbTableName).append(" ADD IF NOT EXISTS "); + for (StoragePartition partition : partitions) { + StringBuilder partBuilder = new StringBuilder(); + String[] partitionValues = partition.getPartitionFieldValues(); + 
Preconditions.checkArgument(partitionFieldNames.length == partitionValues.length, + "Partition key parts " + Arrays.toString(partitionFieldNames) + + " does not match with partition values " + Arrays.toString(partitionValues) + + ". Check partition strategy. "); + for (int i = 0; i < partitionFieldNames.length; i++) { + partBuilder.append(partitionFieldNames[i]).append("=").append("'") + .append(partitionValues[i]).append("'"); + } + alterSQL.append(" PARTITION (").append(partBuilder.toString()).append(") LOCATION '") + .append(partition.getPartitionPath()).append("' "); + } + + return alterSQL.toString(); + } + + @Override + public void close() throws IOException { + if (connection != null) { + try { + connection.close(); + } catch (SQLException e) { + LOG.error("Could not close the connection opened ", e); + } + } + } +} diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/client/SchemaUtil.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/client/SchemaUtil.java new file mode 100644 index 000000000..e11aeaf99 --- /dev/null +++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/client/SchemaUtil.java @@ -0,0 +1,436 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.uber.hoodie.hive.client; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.uber.hoodie.hive.HoodieHiveDatasetException; +import com.uber.hoodie.hive.model.HoodieDatasetReference; +import com.uber.hoodie.hive.model.SchemaDifference; +import org.apache.commons.lang.ArrayUtils; +import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import parquet.schema.DecimalMetadata; +import parquet.schema.GroupType; +import parquet.schema.MessageType; +import parquet.schema.OriginalType; +import parquet.schema.PrimitiveType; +import parquet.schema.Type; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Schema Utilities + */ +public class SchemaUtil { + private static Logger LOG = LoggerFactory.getLogger(SchemaUtil.class); + + /** + * Get the schema difference between the storage schema and hive table schema + * + * @param storageSchema + * @param tableSchema + * @param partitionKeys + * @return + */ + public static SchemaDifference getSchemaDifference(MessageType storageSchema, + Map tableSchema, String[] partitionKeys) { + Map newTableSchema; + try { + newTableSchema = convertParquetSchemaToHiveSchema(storageSchema); + } catch (IOException e) { + throw new HoodieHiveDatasetException("Failed to convert parquet schema to hive schema", + e); + } + LOG.info("Getting schema difference for " + tableSchema + "\r\n\r\n" + newTableSchema); + SchemaDifference.Builder schemaDiffBuilder = + SchemaDifference.newBuilder(storageSchema, tableSchema); + Set tableColumns = Sets.newHashSet(); + + for (Map.Entry field : tableSchema.entrySet()) { + String fieldName = field.getKey().toLowerCase(); + String tickSurroundedFieldName = tickSurround(fieldName); + if (!isFieldExistsInSchema(newTableSchema, tickSurroundedFieldName) && !ArrayUtils + .contains(partitionKeys, fieldName)) { + 
schemaDiffBuilder.deleteTableColumn(fieldName); + } else { + // check type + String tableColumnType = field.getValue(); + if (!isFieldExistsInSchema(newTableSchema, tickSurroundedFieldName)) { + if (ArrayUtils.contains(partitionKeys, fieldName)) { + // Partition key does not have to be part of the storage schema + continue; + } + // We will log this and continue. Hive schema is a superset of all parquet schemas + LOG.warn("Ignoring table column " + fieldName + + " as its not present in the parquet schema"); + continue; + } + tableColumnType = tableColumnType.replaceAll("\\s+", ""); + + String expectedType = getExpectedType(newTableSchema, tickSurroundedFieldName); + expectedType = expectedType.replaceAll("\\s+", ""); + expectedType = expectedType.replaceAll("`", ""); + + if (!tableColumnType.equalsIgnoreCase(expectedType)) { + // check for incremental datasets, the schema type change is allowed as per evolution rules + if (!isSchemaTypeUpdateAllowed(tableColumnType, expectedType)) { + throw new HoodieHiveDatasetException( + "Could not convert field Type from " + tableColumnType + " to " + + expectedType + " for field " + fieldName); + } + schemaDiffBuilder.updateTableColumn(fieldName, + getExpectedType(newTableSchema, tickSurroundedFieldName)); + } + } + tableColumns.add(tickSurroundedFieldName); + } + + for (Map.Entry entry : newTableSchema.entrySet()) { + if (!tableColumns.contains(entry.getKey().toLowerCase())) { + schemaDiffBuilder.addTableColumn(entry.getKey(), entry.getValue()); + } + } + LOG.info("Difference between schemas: " + schemaDiffBuilder.build().toString()); + + return schemaDiffBuilder.build(); + } + + private static String getExpectedType(Map newTableSchema, String fieldName) { + for (Map.Entry entry : newTableSchema.entrySet()) { + if (entry.getKey().toLowerCase().equals(fieldName)) { + return entry.getValue(); + } + } + return null; + } + + private static boolean isFieldExistsInSchema(Map newTableSchema, + String fieldName) { + for (String entry 
: newTableSchema.keySet()) { + if (entry.toLowerCase().equals(fieldName)) { + return true; + } + } + return false; + } + + + /** + * Returns equivalent Hive table schema read from a parquet file + * + * @param messageType : Parquet Schema + * @return : Hive Table schema read from parquet file MAP + * @throws IOException + */ + public static Map convertParquetSchemaToHiveSchema(MessageType messageType) + throws IOException { + Map schema = Maps.newLinkedHashMap(); + List parquetFields = messageType.getFields(); + for (Type parquetType : parquetFields) { + StringBuilder result = new StringBuilder(); + String key = parquetType.getName(); + if (parquetType.isRepetition(Type.Repetition.REPEATED)) { + result.append(createHiveArray(parquetType, "")); + } else { + result.append(convertField(parquetType)); + } + + schema.put(hiveCompatibleFieldName(key, false), result.toString()); + } + return schema; + } + + /** + * Convert one field data type of parquet schema into an equivalent Hive + * schema + * + * @param parquetType : Single paruet field + * @return : Equivalent sHive schema + */ + private static String convertField(final Type parquetType) { + StringBuilder field = new StringBuilder(); + if (parquetType.isPrimitive()) { + final PrimitiveType.PrimitiveTypeName parquetPrimitiveTypeName = + parquetType.asPrimitiveType().getPrimitiveTypeName(); + final OriginalType originalType = parquetType.getOriginalType(); + if (originalType == OriginalType.DECIMAL) { + final DecimalMetadata decimalMetadata = + parquetType.asPrimitiveType().getDecimalMetadata(); + return field.append("DECIMAL(").append(decimalMetadata.getPrecision()). 
+ append(" , ").append(decimalMetadata.getScale()).append(")").toString(); + } + // TODO - fix the method naming here + return parquetPrimitiveTypeName + .convert(new PrimitiveType.PrimitiveTypeNameConverter() { + @Override + public String convertBOOLEAN( + PrimitiveType.PrimitiveTypeName primitiveTypeName) { + return "boolean"; + } + + @Override + public String convertINT32(PrimitiveType.PrimitiveTypeName primitiveTypeName) { + return "int"; + } + + @Override + public String convertINT64(PrimitiveType.PrimitiveTypeName primitiveTypeName) { + return "bigint"; + } + + @Override + public String convertINT96(PrimitiveType.PrimitiveTypeName primitiveTypeName) { + return "timestamp-millis"; + } + + @Override + public String convertFLOAT(PrimitiveType.PrimitiveTypeName primitiveTypeName) { + return "float"; + } + + @Override + public String convertDOUBLE(PrimitiveType.PrimitiveTypeName primitiveTypeName) { + return "double"; + } + + @Override + public String convertFIXED_LEN_BYTE_ARRAY( + PrimitiveType.PrimitiveTypeName primitiveTypeName) { + return "binary"; + } + + @Override + public String convertBINARY(PrimitiveType.PrimitiveTypeName primitiveTypeName) { + if (originalType == OriginalType.UTF8 + || originalType == OriginalType.ENUM) { + return "string"; + } else { + return "binary"; + } + } + }); + } else { + GroupType parquetGroupType = parquetType.asGroupType(); + OriginalType originalType = parquetGroupType.getOriginalType(); + if (originalType != null) { + switch (originalType) { + case LIST: + if (parquetGroupType.getFieldCount() != 1) { + throw new UnsupportedOperationException( + "Invalid list type " + parquetGroupType); + } + Type elementType = parquetGroupType.getType(0); + if (!elementType.isRepetition(Type.Repetition.REPEATED)) { + throw new UnsupportedOperationException( + "Invalid list type " + parquetGroupType); + } + return createHiveArray(elementType, parquetGroupType.getName()); + case MAP: + if (parquetGroupType.getFieldCount() != 1 || 
parquetGroupType.getType(0) + .isPrimitive()) { + throw new UnsupportedOperationException( + "Invalid map type " + parquetGroupType); + } + GroupType mapKeyValType = parquetGroupType.getType(0).asGroupType(); + if (!mapKeyValType.isRepetition(Type.Repetition.REPEATED) || + !mapKeyValType.getOriginalType().equals(OriginalType.MAP_KEY_VALUE) || + mapKeyValType.getFieldCount() != 2) { + throw new UnsupportedOperationException( + "Invalid map type " + parquetGroupType); + } + Type keyType = mapKeyValType.getType(0); + if (!keyType.isPrimitive() || + !keyType.asPrimitiveType().getPrimitiveTypeName() + .equals(PrimitiveType.PrimitiveTypeName.BINARY) || + !keyType.getOriginalType().equals(OriginalType.UTF8)) { + throw new UnsupportedOperationException( + "Map key type must be binary (UTF8): " + keyType); + } + Type valueType = mapKeyValType.getType(1); + return createHiveMap(convertField(keyType), convertField(valueType)); + case ENUM: + case UTF8: + return "string"; + case MAP_KEY_VALUE: + // MAP_KEY_VALUE was supposed to be used to annotate key and + // value group levels in a + // MAP. However, that is always implied by the structure of + // MAP. Hence, PARQUET-113 + // dropped the requirement for having MAP_KEY_VALUE. 
+ default: + throw new UnsupportedOperationException( + "Cannot convert Parquet type " + parquetType); + } + } else { + // if no original type then it's a record + return createHiveStruct(parquetGroupType.getFields()); + } + } + } + + /** + * Return a 'struct' Hive schema from a list of Parquet fields + * + * @param parquetFields : list of parquet fields + * @return : Equivalent 'struct' Hive schema + */ + private static String createHiveStruct(List parquetFields) { + StringBuilder struct = new StringBuilder(); + struct.append("STRUCT< "); + for (Type field : parquetFields) { + //TODO: struct field name is only translated to support special char($) + //We will need to extend it to other collection type + struct.append(hiveCompatibleFieldName(field.getName(), true)).append(" : "); + struct.append(convertField(field)).append(", "); + } + struct.delete(struct.length() - 2, struct.length()); // Remove the last + // ", " + struct.append(">"); + String finalStr = struct.toString(); + // Struct cannot have - in them. userstore_udr_entities has uuid in struct. This breaks the schema. + // HDrone sync should not fail because of this. 
+ finalStr = finalStr.replaceAll("-", "_"); + return finalStr; + } + + + private static String hiveCompatibleFieldName(String fieldName, boolean isNested) { + String result = fieldName; + if (isNested) { + result = ColumnNameXLator.translateNestedColumn(fieldName); + } + return tickSurround(result); + } + + private static String tickSurround(String result) { + if (!result.startsWith("`")) { + result = "`" + result; + } + if (!result.endsWith("`")) { + result = result + "`"; + } + return result; + } + + /** + * Create a 'Map' schema from Parquet map field + * + * @param keyType + * @param valueType + * @return + */ + private static String createHiveMap(String keyType, String valueType) { + return "MAP< " + keyType + ", " + valueType + ">"; + } + + /** + * Create an Array Hive schema from equivalent parquet list type + * + * @param elementType + * @param elementName + * @return + */ + private static String createHiveArray(Type elementType, String elementName) { + StringBuilder array = new StringBuilder(); + array.append("ARRAY< "); + if (elementType.isPrimitive()) { + array.append(convertField(elementType)); + } else { + final GroupType groupType = elementType.asGroupType(); + final List groupFields = groupType.getFields(); + if (groupFields.size() > 1 || (groupFields.size() == 1 && ( + elementType.getName().equals("array") || elementType.getName() + .equals(elementName + "_tuple")))) { + array.append(convertField(elementType)); + } else { + array.append(convertField(groupType.getFields().get(0))); + } + } + array.append(">"); + return array.toString(); + } + + public static boolean isSchemaTypeUpdateAllowed(String prevType, String newType) { + if (prevType == null || prevType.trim().isEmpty() || + newType == null || newType.trim().isEmpty()) { + return false; + } + prevType = prevType.toLowerCase(); + newType = newType.toLowerCase(); + if (prevType.equals(newType)) { + return true; + } else if (prevType.equalsIgnoreCase("int") && newType.equalsIgnoreCase("bigint")) 
{ + return true; + } else if (prevType.equalsIgnoreCase("float") && newType.equalsIgnoreCase("double")) { + return true; + } else if (prevType.contains("struct") && newType.toLowerCase().contains("struct")) { + return true; + } + return false; + } + + public static String generateSchemaString(MessageType storageSchema) throws IOException { + Map hiveSchema = convertParquetSchemaToHiveSchema(storageSchema); + StringBuilder columns = new StringBuilder(); + for (Map.Entry hiveSchemaEntry : hiveSchema.entrySet()) { + columns.append(hiveSchemaEntry.getKey()).append(" "); + columns.append(hiveSchemaEntry.getValue()).append(", "); + } + // Remove the last ", " + columns.delete(columns.length() - 2, columns.length()); + return columns.toString(); + } + + public static String generateCreateDDL(MessageType storageSchema, + HoodieDatasetReference metadata, String[] partitionKeys, String inputFormatClass, + String outputFormatClass) throws IOException { + Map hiveSchema = convertParquetSchemaToHiveSchema(storageSchema); + String columns = generateSchemaString(storageSchema); + + StringBuilder partitionFields = new StringBuilder(); + for (String partitionKey : partitionKeys) { + partitionFields.append(partitionKey).append(" ") + .append(getPartitionKeyType(hiveSchema, partitionKey)); + } + + StringBuilder sb = new StringBuilder("CREATE EXTERNAL TABLE IF NOT EXISTS "); + sb = sb.append(metadata.getDatabaseTableName()); + sb = sb.append("( ").append(columns).append(")"); + if (partitionKeys.length > 0) { + sb = sb.append(" PARTITIONED BY (").append(partitionFields).append(")"); + } + sb = sb.append(" ROW FORMAT SERDE '").append(ParquetHiveSerDe.class.getName()).append("'"); + sb = sb.append(" STORED AS INPUTFORMAT '").append(inputFormatClass).append("'"); + sb = sb.append(" OUTPUTFORMAT '").append(outputFormatClass).append("' LOCATION '") + .append(metadata.getBaseDatasetPath()).append("'"); + return sb.toString(); + } + + private static String getPartitionKeyType(Map hiveSchema, 
String partitionKey) { + if (hiveSchema.containsKey(partitionKey)) { + return hiveSchema.get(partitionKey); + } + // Default the unknown partition fields to be String + // TODO - all partition fields should be part of the schema. datestr is treated as special. Dont do that + return "String"; + } +} diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/example/HoodieDatasetExample.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/example/HoodieDatasetExample.java new file mode 100644 index 000000000..1b23e870c --- /dev/null +++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/example/HoodieDatasetExample.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.uber.hoodie.hive.example; + +import com.uber.hoodie.hive.HoodieHiveConfiguration; +import com.uber.hoodie.hive.HoodieHiveDatasetSyncTask; +import com.uber.hoodie.hive.PartitionStrategy; +import com.uber.hoodie.hive.SchemaStrategy; +import com.uber.hoodie.hive.impl.DayBasedPartitionStrategy; +import com.uber.hoodie.hive.impl.ParseSchemaFromDataStrategy; +import com.uber.hoodie.hive.model.HoodieDatasetReference; +import org.apache.hadoop.conf.Configuration; + +/** + * Example showing basic usage of Hoodie Hive API + */ +public class HoodieDatasetExample { + public static void main(String[] args) { + // Configure to point to which metastore and database to connect to + HoodieHiveConfiguration apiConfig = + HoodieHiveConfiguration.newBuilder().hadoopConfiguration(new Configuration()) + .hivedb("tmp").hiveJdbcUrl("jdbc:hive2://localhost:10010/").jdbcUsername("hive") + .jdbcPassword("hive").build(); + + HoodieDatasetReference datasetReference = + new HoodieDatasetReference("clickstream", "hdfs:///data/tables/user.clickstream", + "raw"); + + // initialize the strategies + PartitionStrategy partitionStrategy = new DayBasedPartitionStrategy(); + SchemaStrategy schemaStrategy = new ParseSchemaFromDataStrategy(); + + // Creates a new dataset which reflects the state at the time of creation + HoodieHiveDatasetSyncTask datasetSyncTask = + HoodieHiveDatasetSyncTask.newBuilder().withReference(datasetReference) + .withConfiguration(apiConfig).partitionStrategy(partitionStrategy) + .schemaStrategy(schemaStrategy).build(); + + // Sync dataset + datasetSyncTask.sync(); + } +} diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/impl/DayBasedPartitionStrategy.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/impl/DayBasedPartitionStrategy.java new file mode 100644 index 000000000..5300cb045 --- /dev/null +++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/impl/DayBasedPartitionStrategy.java @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2016 Uber Technologies, 
Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.hive.impl; + +import com.uber.hoodie.hive.PartitionStrategy; +import com.uber.hoodie.hive.client.HoodieFSClient; +import com.uber.hoodie.hive.model.HoodieDatasetReference; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.joda.time.DateTime; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Simple day based partitions. 
+ * Storage is of this format yyyy/mm/dd + * Table is partitioned by dateStringFieldName=MM/dd/yyyy + */ +public class DayBasedPartitionStrategy implements PartitionStrategy { + private Logger LOG = LoggerFactory.getLogger(DayBasedPartitionStrategy.class); + private final String dateStringFieldName; + private final DateTimeFormatter dtfOut; + + public DayBasedPartitionStrategy() { + this.dateStringFieldName = "datestr"; + this.dtfOut = DateTimeFormat.forPattern("yyyy-MM-dd"); + } + + @Override public FileStatus[] scanAllPartitions(HoodieDatasetReference ref, HoodieFSClient fsClient) { + return fsClient.getDirectoriesMatchingPattern(ref, "/*/*/*"); + } + + @Override public String[] getHivePartitionFieldNames() { + return new String[] {dateStringFieldName}; + } + + @Override + public String[] convertPartitionToValues(HoodieDatasetReference metadata, Path partition) { + //yyyy/mm/dd + String basePath = metadata.getBaseDatasetPath(); + String partitionPath = partition.toUri().getPath(); + if (!partitionPath.contains(basePath)) { + throw new IllegalArgumentException( + "Partition path " + partitionPath + " is not part of the dataset " + metadata); + } + // Get the partition part and remove the / as well at the end + String partitionPart = partitionPath.substring(basePath.length() + 1); + LOG.info("Extracting parts from " + partitionPart); + int year = extractPart(partitionPart, 0); + int mm = extractPart(partitionPart, 1); + int dd = extractPart(partitionPart, 2); + DateTime dateTime = new DateTime(year, mm, dd, 0, 0); + return new String[] {dtfOut.print(dateTime)}; + } + + private int extractPart(String pathString, int index) { + String[] parts = pathString.split("/"); + String part = parts[index]; + return Integer.parseInt(part); + } + +} diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/impl/ParseSchemaFromDataStrategy.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/impl/ParseSchemaFromDataStrategy.java new file mode 100644 index 000000000..424b64d52 
--- /dev/null +++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/impl/ParseSchemaFromDataStrategy.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.hive.impl; + +import com.uber.hoodie.hive.HoodieHiveDatasetException; +import com.uber.hoodie.hive.SchemaStrategy; +import com.uber.hoodie.hive.client.HoodieFSClient; +import com.uber.hoodie.hive.model.HoodieDatasetReference; +import org.apache.hadoop.fs.Path; +import parquet.schema.MessageType; + +import java.io.IOException; + +/** + * Schema strategy to read the parquet schema from any of the data file + */ +public class ParseSchemaFromDataStrategy implements SchemaStrategy { + @Override + public MessageType getDatasetSchema(HoodieDatasetReference metadata, HoodieFSClient fsClient) { + Path anyDataFile = fsClient.lastDataFileForDataset(metadata, metadata.getBaseDatasetPath()); + try { + return fsClient.readSchemaFromDataFile(anyDataFile); + } catch (IOException e) { + throw new HoodieHiveDatasetException( + "Could not read schema for " + metadata + ", tried to read schema from " + + anyDataFile, e); + } + } +} diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/model/HoodieDatasetReference.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/model/HoodieDatasetReference.java new file mode 100644 index 000000000..41598bc7d --- /dev/null +++ 
import java.util.Objects;

/**
 * A reference to a Dataset, identified by its table name, its base path and the name of the
 * database it belongs to. {@link HoodieDatasetReference} is immutable: all fields are final
 * and set once in the constructor.
 */
public class HoodieDatasetReference {
    private final String tableName;
    private final String baseDatasetPath;
    private final String databaseName;

    /**
     * @param tableName       name of the table
     * @param baseDatasetPath base path of the dataset
     * @param databaseName    name of the database the table belongs to
     */
    public HoodieDatasetReference(String tableName, String baseDatasetPath, String databaseName) {
        this.tableName = tableName;
        this.baseDatasetPath = baseDatasetPath;
        this.databaseName = databaseName;
    }

    /**
     * @return the table name qualified by the database name, as {@code database.table}
     */
    public String getDatabaseTableName() {
        return databaseName + "." + tableName;
    }

    public String getTableName() {
        return tableName;
    }

    public String getBaseDatasetPath() {
        return baseDatasetPath;
    }

    public String getDatabaseName() {
        return databaseName;
    }

    /** Two references are equal when table name, base path and database name are all equal. */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        HoodieDatasetReference that = (HoodieDatasetReference) o;
        return Objects.equals(tableName, that.tableName)
            && Objects.equals(baseDatasetPath, that.baseDatasetPath)
            && Objects.equals(databaseName, that.databaseName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(tableName, baseDatasetPath, databaseName);
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("HoodieDatasetReference{");
        sb.append("tableName='").append(tableName).append('\'');
        sb.append(", baseDatasetPath='").append(baseDatasetPath).append('\'');
        sb.append(", databaseName='").append(databaseName).append('\'');
        sb.append('}');
        return sb.toString();
    }
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.hive.model; + +import com.google.common.base.Objects; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import parquet.schema.MessageType; + +import java.util.List; +import java.util.Map; + +/** + * Represents the schema difference between the storage schema and hive table schema + */ +public class SchemaDifference { + private final MessageType storageSchema; + private final Map tableSchema; + private final List deleteColumns; + private final Map updateColumnTypes; + private final Map addColumnTypes; + + private SchemaDifference(MessageType storageSchema, Map tableSchema, + List deleteColumns, Map updateColumnTypes, Map addColumnTypes) { + this.storageSchema = storageSchema; + this.tableSchema = tableSchema; + this.deleteColumns = ImmutableList.copyOf(deleteColumns); + this.updateColumnTypes = ImmutableMap.copyOf(updateColumnTypes); + this.addColumnTypes = ImmutableMap.copyOf(addColumnTypes); + } + + public List getDeleteColumns() { + return deleteColumns; + } + + public Map getUpdateColumnTypes() { + return updateColumnTypes; + } + + public Map getAddColumnTypes() { + return addColumnTypes; + } + + @Override public String toString() { + return Objects.toStringHelper(this).add("deleteColumns", deleteColumns) + .add("updateColumnTypes", updateColumnTypes).add("addColumnTypes", addColumnTypes) + .toString(); + } + + public static Builder newBuilder(MessageType storageSchema, Map tableSchema) { + return new Builder(storageSchema, tableSchema); + } + + public boolean isEmpty() { + return deleteColumns.isEmpty() && updateColumnTypes.isEmpty() && addColumnTypes.isEmpty(); + } + + public static class Builder { + private final MessageType storageSchema; + private final Map tableSchema; + private List 
deleteColumns; + private Map updateColumnTypes; + private Map addColumnTypes; + + public Builder(MessageType storageSchema, Map tableSchema) { + this.storageSchema = storageSchema; + this.tableSchema = tableSchema; + deleteColumns = Lists.newArrayList(); + updateColumnTypes = Maps.newHashMap(); + addColumnTypes = Maps.newHashMap(); + } + + public Builder deleteTableColumn(String column) { + deleteColumns.add(column); + return this; + } + + public Builder updateTableColumn(String column, String storageColumnType) { + updateColumnTypes.put(column, storageColumnType); + return this; + } + + public Builder addTableColumn(String name, String type) { + addColumnTypes.put(name, type); + return this; + } + + public SchemaDifference build() { + return new SchemaDifference(storageSchema, tableSchema, deleteColumns, updateColumnTypes, addColumnTypes); + } + } +} diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/model/StoragePartition.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/model/StoragePartition.java new file mode 100644 index 000000000..4558b78c4 --- /dev/null +++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/model/StoragePartition.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.uber.hoodie.hive.model; + +import com.google.common.base.Objects; +import com.uber.hoodie.hive.PartitionStrategy; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StoragePartition { + private static Logger LOG = LoggerFactory.getLogger(StoragePartition.class); + private final PartitionStrategy partitionStrategy; + private final Path partitionPath; + private final HoodieDatasetReference metadata; + + public StoragePartition(HoodieDatasetReference metadata, PartitionStrategy partitionStrategy, + FileStatus input) { + this.metadata = metadata; + this.partitionPath = Path.getPathWithoutSchemeAndAuthority(input.getPath()); + this.partitionStrategy = partitionStrategy; + } + + public String[] getPartitionFieldValues() { + return partitionStrategy.convertPartitionToValues(metadata, partitionPath); + } + + public Path getPartitionPath() { + return partitionPath; + } + + @Override public String toString() { + return Objects.toStringHelper(this).add("partitionPath", partitionPath) + .add("metadata", metadata).toString(); + } +} diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/model/TablePartition.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/model/TablePartition.java new file mode 100644 index 000000000..480463d7f --- /dev/null +++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/model/TablePartition.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.hive.model; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.api.Partition; + +public class TablePartition { + private final HoodieDatasetReference metadata; + private final Partition partition; + + public TablePartition(HoodieDatasetReference metadata, Partition partition) { + this.metadata = metadata; + this.partition = partition; + } + + public Path getLocation() { + return Path.getPathWithoutSchemeAndAuthority(new Path(partition.getSd().getLocation())); + } + + public String[] getPartitionFieldValues() { + return partition.getValues().toArray(new String[partition.getValuesSize()]); + } +} diff --git a/hoodie-hive/src/test/java/com/uber/hoodie/hive/DatasetSchemaTest.java b/hoodie-hive/src/test/java/com/uber/hoodie/hive/DatasetSchemaTest.java new file mode 100644 index 000000000..e00e5f6b4 --- /dev/null +++ b/hoodie-hive/src/test/java/com/uber/hoodie/hive/DatasetSchemaTest.java @@ -0,0 +1,186 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.hive; + +import com.uber.hoodie.hive.client.SchemaUtil; +import com.uber.hoodie.hive.model.HoodieDatasetReference; +import com.uber.hoodie.hive.model.SchemaDifference; +import com.uber.hoodie.hive.util.TestUtil; +import org.joda.time.DateTime; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runners.model.InitializationError; +import parquet.schema.MessageType; +import parquet.schema.OriginalType; +import parquet.schema.PrimitiveType; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; + +public class DatasetSchemaTest { + @Before + public void setUp() throws IOException, InterruptedException { + TestUtil.setUp(); + } + + @Test + public void testSchemaDiff() throws IOException, InitializationError { + HoodieDatasetReference metadata = TestUtil + .createDataset("test1", "/tmp/hdfs/DatasetSchemaTest/testSchema/", 5, "/nation.schema"); + HoodieHiveSchemaSyncTask schema = + HoodieHiveSchemaSyncTask.newBuilder().withReference(metadata) + .withConfiguration(TestUtil.hDroneConfiguration).build(); + SchemaDifference diff = schema.getSchemaDifference(); + assertEquals("There should be 4 columns to be added", 4, diff.getAddColumnTypes().size()); + assertEquals("No update columns expected", 0, diff.getUpdateColumnTypes().size()); + assertEquals("No delete columns expected", 0, diff.getDeleteColumns().size()); + schema.sync(); + + schema = HoodieHiveSchemaSyncTask.newBuilder().withReference(metadata) + .withConfiguration(TestUtil.hDroneConfiguration).build(); + diff = schema.getSchemaDifference(); + assertEquals("After sync, there should not be any new columns to add", 0, + diff.getAddColumnTypes().size()); + assertEquals("After sync, there should not be any new columns to update", 0, + diff.getUpdateColumnTypes().size()); + assertEquals("After sync, there should not be 
any new columns to delete", 0, + diff.getDeleteColumns().size()); + } + + @Test + public void testSchemaEvolution() throws IOException, InitializationError { + int initialPartitionsCount = 5; + HoodieDatasetReference metadata = TestUtil + .createDataset("test1", "/tmp/hdfs/DatasetSchemaTest/testSchema/", + initialPartitionsCount, "/nation.schema"); + HoodieHiveSchemaSyncTask schema = + HoodieHiveSchemaSyncTask.newBuilder().withReference(metadata) + .withConfiguration(TestUtil.hDroneConfiguration).build(); + schema.sync(); + + schema = HoodieHiveSchemaSyncTask.newBuilder().withReference(metadata) + .withConfiguration(TestUtil.hDroneConfiguration).build(); + SchemaDifference diff = schema.getSchemaDifference(); + assertEquals("After sync, diff should be empty", true, diff.isEmpty()); + int newSchemaversion = 2; + int newPartitionsCount = 2; + TestUtil.evolveDataset(metadata, newPartitionsCount, "/nation_evolved.schema", + DateTime.now().getMillis(), newSchemaversion); + schema = HoodieHiveSchemaSyncTask.newBuilder().withReference(metadata) + .withConfiguration(TestUtil.hDroneConfiguration).build(); + diff = schema.getSchemaDifference(); + assertEquals("Schema has evolved, there should be a diff", false, diff.isEmpty()); + assertEquals("Schema has evolved, there should be 1 column to add", 1, + diff.getAddColumnTypes().size()); + assertEquals("Schema has evolved, there should be 1 column to update", 1, + diff.getUpdateColumnTypes().size()); + assertEquals(0, diff.getDeleteColumns().size()); + } + + /** + * Testing converting array types to Hive field declaration strings, + * according to the Parquet-113 spec: + * https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists + */ + @Test + public void testSchemaConvertArray() throws IOException { + // Testing the 3-level annotation structure + MessageType schema = + parquet.schema.Types.buildMessage().optionalGroup().as(parquet.schema.OriginalType.LIST) + 
.repeatedGroup().optional(PrimitiveType.PrimitiveTypeName.INT32).named("element") + .named("list").named("int_list").named("ArrayOfInts"); + + String schemaString = SchemaUtil.generateSchemaString(schema); + assertEquals("`int_list` ARRAY< int>", schemaString); + + // A array of arrays + schema = + parquet.schema.Types.buildMessage().optionalGroup().as(parquet.schema.OriginalType.LIST) + .repeatedGroup().requiredGroup().as(OriginalType.LIST).repeatedGroup() + .required(PrimitiveType.PrimitiveTypeName.INT32).named("element").named("list") + .named("element").named("list").named("int_list_list").named("ArrayOfArrayOfInts"); + + schemaString = SchemaUtil.generateSchemaString(schema); + assertEquals("`int_list_list` ARRAY< ARRAY< int>>", schemaString); + + // A list of integers + schema = + parquet.schema.Types.buildMessage().optionalGroup().as(parquet.schema.OriginalType.LIST) + .repeated(PrimitiveType.PrimitiveTypeName.INT32).named("element").named("int_list") + .named("ArrayOfInts"); + + schemaString = SchemaUtil.generateSchemaString(schema); + assertEquals("`int_list` ARRAY< int>", schemaString); + + // A list of structs with two fields + schema = + parquet.schema.Types.buildMessage().optionalGroup().as(parquet.schema.OriginalType.LIST) + .repeatedGroup().required(PrimitiveType.PrimitiveTypeName.BINARY).named("str") + .required(PrimitiveType.PrimitiveTypeName.INT32).named("num").named("element") + .named("tuple_list").named("ArrayOfTuples"); + + schemaString = SchemaUtil.generateSchemaString(schema); + assertEquals("`tuple_list` ARRAY< STRUCT< `str` : binary, `num` : int>>", schemaString); + + // A list of structs with a single field + // For this case, since the inner group name is "array", we treat the + // element type as a one-element struct. 
+ schema = + parquet.schema.Types.buildMessage().optionalGroup().as(parquet.schema.OriginalType.LIST) + .repeatedGroup().required(PrimitiveType.PrimitiveTypeName.BINARY).named("str") + .named("array").named("one_tuple_list").named("ArrayOfOneTuples"); + + schemaString = SchemaUtil.generateSchemaString(schema); + assertEquals("`one_tuple_list` ARRAY< STRUCT< `str` : binary>>", schemaString); + + // A list of structs with a single field + // For this case, since the inner group name ends with "_tuple", we also treat the + // element type as a one-element struct. + schema = + parquet.schema.Types.buildMessage().optionalGroup().as(parquet.schema.OriginalType.LIST) + .repeatedGroup().required(PrimitiveType.PrimitiveTypeName.BINARY).named("str") + .named("one_tuple_list_tuple").named("one_tuple_list").named("ArrayOfOneTuples2"); + + schemaString = SchemaUtil.generateSchemaString(schema); + assertEquals("`one_tuple_list` ARRAY< STRUCT< `str` : binary>>", schemaString); + + // A list of structs with a single field + // Unlike the above two cases, for this the element type is the type of the + // only field in the struct. 
+ schema = + parquet.schema.Types.buildMessage().optionalGroup().as(parquet.schema.OriginalType.LIST) + .repeatedGroup().required(PrimitiveType.PrimitiveTypeName.BINARY).named("str") + .named("one_tuple_list").named("one_tuple_list").named("ArrayOfOneTuples3"); + + schemaString = SchemaUtil.generateSchemaString(schema); + assertEquals("`one_tuple_list` ARRAY< binary>", schemaString); + + // A list of maps + schema = + parquet.schema.Types.buildMessage().optionalGroup().as(parquet.schema.OriginalType.LIST) + .repeatedGroup().as(OriginalType.MAP).repeatedGroup().as(OriginalType.MAP_KEY_VALUE) + .required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8) + .named("string_key").required(PrimitiveType.PrimitiveTypeName.INT32) + .named("int_value").named("key_value").named("array").named("map_list") + .named("ArrayOfMaps"); + + schemaString = SchemaUtil.generateSchemaString(schema); + assertEquals("`map_list` ARRAY< MAP< string, int>>", schemaString); + } +} diff --git a/hoodie-hive/src/test/java/com/uber/hoodie/hive/HDroneDatasetTest.java b/hoodie-hive/src/test/java/com/uber/hoodie/hive/HDroneDatasetTest.java new file mode 100644 index 000000000..cdcb528d1 --- /dev/null +++ b/hoodie-hive/src/test/java/com/uber/hoodie/hive/HDroneDatasetTest.java @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.uber.hoodie.hive;
+
+import com.uber.hoodie.hive.client.HoodieHiveClient;
+import com.uber.hoodie.hive.model.HoodieDatasetReference;
+import com.uber.hoodie.hive.util.TestUtil;
+import org.joda.time.DateTime;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runners.model.InitializationError;
+import parquet.schema.MessageType;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * End-to-end tests that create a dataset through {@link TestUtil}, sync it with
+ * {@link HoodieHiveDatasetSyncTask} and verify the resulting Hive table through
+ * {@link HoodieHiveClient}.
+ */
+public class HDroneDatasetTest {
+    private HoodieHiveClient hiveClient;
+
+    /**
+     * Runs the shared {@link TestUtil#setUp()} and opens a Hive client against the
+     * configuration it produced.
+     */
+    @Before
+    public void setUp() throws IOException, InterruptedException {
+        TestUtil.setUp();
+        hiveClient = new HoodieHiveClient(TestUtil.hDroneConfiguration);
+    }
+
+    /**
+     * A freshly created dataset with 5 partitions must report 5 new partitions and no
+     * table before the sync; after the sync the table must exist with 5 fields and
+     * nothing left to sync.
+     */
+    @Test
+    public void testDatasetCreation() throws IOException, InitializationError {
+        HoodieDatasetReference metadata = TestUtil
+            .createDataset("test1", "/tmp/hdfs/DatasetSchemaTest/testSchema/", 5, "/nation.schema");
+        HoodieHiveDatasetSyncTask dataset =
+            HoodieHiveDatasetSyncTask.newBuilder().withReference(metadata)
+                .withConfiguration(TestUtil.hDroneConfiguration).build();
+        assertEquals("There should be 5 new partitions", 5, dataset.getNewPartitions().size());
+        assertEquals("There should not be any changed partitions", 0,
+            dataset.getChangedPartitions().size());
+        assertFalse("Table should not exist", hiveClient.checkTableExists(metadata));
+        dataset.sync();
+
+        // Rebuild the task so that it re-reads the state after the sync.
+        dataset = HoodieHiveDatasetSyncTask.newBuilder().withReference(metadata)
+            .withConfiguration(TestUtil.hDroneConfiguration).build();
+        assertTrue("Table should exist after sync", hiveClient.checkTableExists(metadata));
+        assertEquals("After sync, There should not be any new partitions to sync", 0,
+            dataset.getNewPartitions().size());
+        assertEquals("After sync, There should not be any modified partitions to sync", 0,
+            dataset.getChangedPartitions().size());
+
+        assertEquals("Table Schema should have 5 fields", 5,
+            hiveClient.getTableSchema(metadata).size());
+    }
+
+    /**
+     * Syncs an initial dataset, adds partitions written with the evolved schema, syncs
+     * again and checks that the table schema follows the evolved schema file.
+     */
+    @Test
+    public void testDatasetEvolution() throws IOException, InitializationError {
+        int initialPartitionsCount = 5;
+        HoodieDatasetReference metadata = TestUtil
+            .createDataset("test1", "/tmp/hdfs/DatasetSchemaTest/testSchema/",
+                initialPartitionsCount, "/nation.schema");
+        HoodieHiveDatasetSyncTask dataset =
+            HoodieHiveDatasetSyncTask.newBuilder().withReference(metadata)
+                .withConfiguration(TestUtil.hDroneConfiguration).build();
+        dataset.sync();
+
+        dataset = HoodieHiveDatasetSyncTask.newBuilder(dataset).build();
+        int newSchemaversion = 2;
+        int newPartitionsCount = 2;
+        TestUtil.evolveDataset(metadata, newPartitionsCount, "/nation_evolved.schema",
+            DateTime.now().getMillis(), newSchemaversion);
+        dataset = HoodieHiveDatasetSyncTask.newBuilder(dataset).build();
+        assertEquals("There should be " + newPartitionsCount + " partitions to be added",
+            newPartitionsCount, dataset.getNewPartitions().size());
+        dataset.sync();
+
+        dataset = HoodieHiveDatasetSyncTask.newBuilder(dataset).build();
+        MessageType newDatasetSchema = dataset.getSchemaSyncTask().getStorageSchema();
+        MessageType expectedSchema = TestUtil.readSchema("/nation_evolved.schema");
+        assertEquals("Table schema should be evolved schema", expectedSchema, newDatasetSchema);
+        assertEquals("Table schema should have 6 fields", 6,
+            hiveClient.getTableSchema(metadata).size());
+        // A non-empty message makes a failure of this check identifiable in the test report.
+        assertEquals("Hive type of region_key should be BIGINT after schema evolution", "BIGINT",
+            hiveClient.getTableSchema(metadata).get("region_key"));
+    }
+
+}
diff --git a/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/CsvParquetWriter.java b/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/CsvParquetWriter.java
new file mode 100644
index 000000000..321d836c0
--- /dev/null
+++ b/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/CsvParquetWriter.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc.
(hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.hive.util;
+
+
+import org.apache.hadoop.fs.Path;
+import parquet.hadoop.ParquetWriter;
+import parquet.hadoop.metadata.CompressionCodecName;
+import parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A {@link ParquetWriter} that accepts one record as a list of string cells and
+ * delegates the per-column conversion to {@link CsvWriteSupport}.
+ */
+public class CsvParquetWriter extends ParquetWriter<List<String>> {
+
+    /**
+     * Creates an uncompressed writer with dictionary encoding disabled.
+     *
+     * @param file   destination parquet file
+     * @param schema schema the written records must follow
+     * @throws IOException if the underlying writer cannot be created
+     */
+    public CsvParquetWriter(Path file, MessageType schema) throws IOException {
+        this(file, schema, false);
+    }
+
+    /**
+     * Creates an uncompressed writer.
+     *
+     * @param file             destination parquet file
+     * @param schema           schema the written records must follow
+     * @param enableDictionary whether dictionary encoding is enabled
+     * @throws IOException if the underlying writer cannot be created
+     */
+    public CsvParquetWriter(Path file, MessageType schema, boolean enableDictionary)
+        throws IOException {
+        this(file, schema, CompressionCodecName.UNCOMPRESSED, enableDictionary);
+    }
+
+    /**
+     * Creates a writer with the given codec, using the default block and page sizes of
+     * {@link ParquetWriter}; the final {@code false} is passed through to the superclass
+     * constructor unchanged.
+     *
+     * @param file             destination parquet file
+     * @param schema           schema the written records must follow
+     * @param codecName        compression codec
+     * @param enableDictionary whether dictionary encoding is enabled
+     * @throws IOException if the underlying writer cannot be created
+     */
+    public CsvParquetWriter(Path file, MessageType schema, CompressionCodecName codecName,
+        boolean enableDictionary) throws IOException {
+        super(file, new CsvWriteSupport(schema), codecName,
+            DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE, enableDictionary, false);
+    }
+}
diff --git a/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/CsvWriteSupport.java b/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/CsvWriteSupport.java
new file mode 100644
index 000000000..49982f0bb
--- /dev/null
+++ b/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/CsvWriteSupport.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc.
(hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.hive.util;
+
+import org.apache.hadoop.conf.Configuration;
+import parquet.column.ColumnDescriptor;
+import parquet.hadoop.api.WriteSupport;
+import parquet.io.ParquetEncodingException;
+import parquet.io.api.Binary;
+import parquet.io.api.RecordConsumer;
+import parquet.schema.MessageType;
+
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * {@link WriteSupport} that writes one record from a list of string cells, parsing each
+ * cell according to the primitive type of the column at the same position.
+ */
+public class CsvWriteSupport extends WriteSupport<List<String>> {
+    MessageType schema;
+    RecordConsumer recordConsumer;
+    List<ColumnDescriptor> cols;
+
+    // TODO: support specifying encodings and compression
+    /**
+     * @param schema schema of the records; its columns define the expected cell count
+     */
+    public CsvWriteSupport(MessageType schema) {
+        this.schema = schema;
+        this.cols = schema.getColumns();
+    }
+
+    /** Returns a write context for the schema with no extra metadata. */
+    @Override public WriteContext init(Configuration config) {
+        return new WriteContext(schema, new HashMap<String, String>());
+    }
+
+    /** Remembers the consumer that {@link #write(List)} emits records to. */
+    @Override public void prepareForWrite(RecordConsumer r) {
+        recordConsumer = r;
+    }
+
+    /**
+     * Writes one record. A cell that is {@code null} or empty is treated as a NULL value
+     * and its field is not emitted.
+     *
+     * @param values one string per schema column, in column order
+     * @throws ParquetEncodingException if the number of cells differs from the number of
+     *                                  columns or a column has an unsupported type
+     */
+    @Override public void write(List<String> values) {
+        if (values.size() != cols.size()) {
+            throw new ParquetEncodingException("Invalid input data. Expecting " +
+                cols.size() + " columns. Input had " + values.size() + " columns (" + cols + ") : "
+                + values);
+        }
+
+        recordConsumer.startMessage();
+        for (int i = 0; i < cols.size(); ++i) {
+            String val = values.get(i);
+            // A null or empty cell indicates a NULL value.
+            if (val != null && val.length() > 0) {
+                recordConsumer.startField(cols.get(i).getPath()[0], i);
+                switch (cols.get(i).getType()) {
+                    case BOOLEAN:
+                        recordConsumer.addBoolean(Boolean.parseBoolean(val));
+                        break;
+                    case FLOAT:
+                        recordConsumer.addFloat(Float.parseFloat(val));
+                        break;
+                    case DOUBLE:
+                        recordConsumer.addDouble(Double.parseDouble(val));
+                        break;
+                    case INT32:
+                        recordConsumer.addInteger(Integer.parseInt(val));
+                        break;
+                    case INT64:
+                        recordConsumer.addLong(Long.parseLong(val));
+                        break;
+                    case BINARY:
+                        recordConsumer.addBinary(stringToBinary(val));
+                        break;
+                    default:
+                        throw new ParquetEncodingException(
+                            "Unsupported column type: " + cols.get(i).getType());
+                }
+                recordConsumer.endField(cols.get(i).getPath()[0], i);
+            }
+        }
+        recordConsumer.endMessage();
+    }
+
+    /** Converts the string form of {@code value} into a parquet {@link Binary}. */
+    private Binary stringToBinary(Object value) {
+        return Binary.fromString(value.toString());
+    }
+}
diff --git a/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/HdfsTestService.java b/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/HdfsTestService.java
new file mode 100644
index 000000000..a96d682ae
--- /dev/null
+++ b/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/HdfsTestService.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.hive.util;
+
+
+import com.google.common.base.Preconditions;
+import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * An HDFS minicluster service implementation.
+ */
+public class HdfsTestService {
+
+    private static final Logger logger = LoggerFactory.getLogger(HdfsTestService.class);
+
+    /**
+     * Configuration settings
+     */
+    private Configuration hadoopConf;
+    private String workDir;
+    private String bindIP = "127.0.0.1";
+    private int namenodeRpcPort = 8020;
+    private int namenodeHttpPort = 50070;
+    private int datanodePort = 50010;
+    private int datanodeIpcPort = 50020;
+    private int datanodeHttpPort = 50075;
+
+    /**
+     * Embedded HDFS cluster
+     */
+    private MiniDFSCluster miniDfsCluster;
+
+    /**
+     * Creates the service with a fresh Hadoop configuration and a new temporary work dir.
+     */
+    public HdfsTestService() {
+        hadoopConf = new Configuration();
+        workDir = Files.createTempDir().getAbsolutePath();
+    }
+
+    /**
+     * @return the configuration used by this service; {@code null} after {@link #stop()}
+     */
+    public Configuration getHadoopConf() {
+        return hadoopConf;
+    }
+
+    /**
+     * Configures and starts a single-datanode {@link MiniDFSCluster}.
+     *
+     * @param format if true, the local DFS directory is deleted first and the cluster is
+     *               formatted
+     * @return the started cluster
+     * @throws IOException if the local directory cannot be cleaned or the cluster fails to
+     *                     start
+     */
+    public MiniDFSCluster start(boolean format) throws IOException {
+        Preconditions
+            .checkState(workDir != null, "The work dir must be set before starting cluster.");
+
+        if (hadoopConf == null) {
+            hadoopConf = new Configuration();
+        }
+
+        // If clean, then remove the work dir so we can start fresh.
+        String localDFSLocation = getDFSLocation(workDir);
+        if (format) {
+            logger.info(
+                "Cleaning HDFS cluster data at: " + localDFSLocation + " and starting fresh.");
+            File file = new File(localDFSLocation);
+            FileUtils.deleteDirectory(file);
+        }
+
+        // Configure and start the HDFS cluster
+        hadoopConf = configureDFSCluster(hadoopConf, localDFSLocation, bindIP, namenodeRpcPort,
+            namenodeHttpPort, datanodePort, datanodeIpcPort, datanodeHttpPort);
+        miniDfsCluster = new MiniDFSCluster.Builder(hadoopConf).numDataNodes(1).format(format)
+            .checkDataNodeAddrConfig(true).checkDataNodeHostConfig(true).build();
+        logger.info("HDFS Minicluster service started.");
+        return miniDfsCluster;
+    }
+
+    /**
+     * Shuts the cluster down and drops the references to it and to the configuration.
+     * Safe to call when the cluster was never started or was already stopped.
+     */
+    public void stop() throws IOException {
+        // Guard against stop() before start() or a repeated stop(), which used to throw
+        // a NullPointerException.
+        if (miniDfsCluster != null) {
+            miniDfsCluster.shutdown();
+            logger.info("HDFS Minicluster service shut down.");
+        }
+        miniDfsCluster = null;
+        hadoopConf = null;
+    }
+
+    /**
+     * Get the location on the local FS where we store the HDFS data.
+     *
+     * @param baseFsLocation The base location on the local filesystem we have write access to
+     *                       create dirs.
+     * @return The location for HDFS data.
+     */
+    private static String getDFSLocation(String baseFsLocation) {
+        return baseFsLocation + Path.SEPARATOR + "dfs";
+    }
+
+    /**
+     * Returns true if we should format the DFS Cluster. We'll format if clean is
+     * true, or if the dfsFsLocation does not exist.
+     * <p>
+     * Currently not called from within this class; {@link #start(boolean)} takes the
+     * format decision from its caller.
+     *
+     * @param localDFSLocation The location on the local FS to hold the HDFS metadata and block
+     *                         data
+     * @param clean            Specifies if we want to start a clean cluster
+     * @return Returns true if we should format a DFSCluster, otherwise false
+     */
+    private static boolean shouldFormatDFSCluster(String localDFSLocation, boolean clean) {
+        boolean format = true;
+        File f = new File(localDFSLocation);
+        if (f.exists() && f.isDirectory() && !clean) {
+            format = false;
+        }
+        return format;
+    }
+
+    /**
+     * Configure the DFS Cluster before launching it.
+     *
+     * @param config           The already created Hadoop configuration we'll further configure
+     *                         for HDFS
+     * @param localDFSLocation The location on the local filesystem where cluster data is stored
+     * @param bindIP           An IP address we want to force the datanode and namenode to bind
+     *                         to.
+     * @param namenodeRpcPort  Port used in the default filesystem URI
+     *                         ({@code hdfs://bindIP:namenodeRpcPort})
+     * @param namenodeHttpPort Accepted for symmetry; not applied to the configuration here
+     * @param datanodePort     Port of the datanode address
+     * @param datanodeIpcPort  Port of the datanode IPC address
+     * @param datanodeHttpPort Port of the datanode HTTP address
+     * @return The updated Configuration object.
+     */
+    private static Configuration configureDFSCluster(Configuration config, String localDFSLocation,
+        String bindIP, int namenodeRpcPort, int namenodeHttpPort, int datanodePort,
+        int datanodeIpcPort, int datanodeHttpPort) {
+
+        logger.info("HDFS force binding to ip: " + bindIP);
+        config.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "hdfs://" + bindIP + ":" + namenodeRpcPort);
+        config.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, bindIP + ":" + datanodePort);
+        config.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, bindIP + ":" + datanodeIpcPort);
+        config.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, bindIP + ":" + datanodeHttpPort);
+        // When a datanode registers with the namenode, the Namenode do a hostname
+        // check of the datanode which will fail on OpenShift due to reverse DNS
+        // issues with the internal IP addresses. This config disables that check,
+        // and will allow a datanode to connect regardless.
+        config.setBoolean("dfs.namenode.datanode.registration.ip-hostname-check", false);
+        config.set("hdfs.minidfs.basedir", localDFSLocation);
+        // allow current user to impersonate others
+        String user = System.getProperty("user.name");
+        config.set("hadoop.proxyuser." + user + ".groups", "*");
+        config.set("hadoop.proxyuser." + user + ".hosts", "*");
+        return config;
+    }
+
+}
diff --git a/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/HiveTestService.java b/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/HiveTestService.java
new file mode 100644
index 000000000..e9faa4536
--- /dev/null
+++ b/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/HiveTestService.java
@@ -0,0 +1,322 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.hive.util;
+
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStore;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IHMSHandler;
+import org.apache.hadoop.hive.metastore.TSetIpAddressProcessor;
+import org.apache.hadoop.hive.metastore.TUGIBasedProcessor;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.thrift.TUGIContainingTransport;
+import org.apache.hive.service.server.HiveServer2;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketException;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Starts an embedded Hive metastore (Thrift server backed by a local Derby database) and
+ * a HiveServer2 instance for tests.
+ */
+public class HiveTestService {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HiveTestService.class);
+
+    private static final int CONNECTION_TIMEOUT = 30000;
+
+    /**
+     * Configuration settings
+     */
+    private Configuration hadoopConf;
+    private String workDir;
+    private String bindIP = "127.0.0.1";
+    private int metastorePort = 9083;
+    private int serverPort = 9999;
+    private boolean clean = true;
+
+    // Original values of the system properties changed by setSystemProperty, so that
+    // resetSystemProperties can restore them; a null value means "was not set".
+    private Map<String, String> sysProps = Maps.newHashMap();
+    private ExecutorService executorService;
+    private TServer tServer;
+    private HiveServer2 hiveServer;
+
+    /**
+     * Creates the service with a new temporary work dir.
+     *
+     * @param configuration NOTE(review): currently unused - {@link #start()} creates a fresh
+     *                      {@link Configuration} instead. Confirm whether that is intended
+     *                      before wiring it in, since it would change the filesystem
+     *                      settings Hive sees.
+     */
+    public HiveTestService(Configuration configuration) {
+        this.workDir = Files.createTempDir().getAbsolutePath();
+    }
+
+    /**
+     * @return the Hadoop configuration created by {@link #start()}; {@code null} before
+     * start and after {@link #stop()}
+     */
+    public Configuration getHadoopConf() {
+        return hadoopConf;
+    }
+
+    /**
+     * Starts the metastore and HiveServer2 and waits until the metastore accepts a client.
+     *
+     * @return the started HiveServer2
+     * @throws IOException if the local directory cannot be prepared, the metastore cannot be
+     *                     started, or it does not come up within the connection timeout
+     */
+    public HiveServer2 start() throws IOException {
+        Preconditions
+            .checkState(workDir != null, "The work dir must be set before starting cluster.");
+
+        if (hadoopConf == null) {
+            hadoopConf = new Configuration();
+        }
+
+        String localHiveLocation = getHiveLocation(workDir);
+        if (clean) {
+            LOG.info(
+                "Cleaning Hive cluster data at: " + localHiveLocation + " and starting fresh.");
+            File file = new File(localHiveLocation);
+            FileUtils.deleteDirectory(file);
+        }
+
+        HiveConf serverConf = configureHive(hadoopConf, localHiveLocation);
+
+        executorService = Executors.newSingleThreadExecutor();
+        tServer = startMetaStore(bindIP, metastorePort, serverConf);
+
+        hiveServer = startHiveServer(serverConf);
+
+        String serverHostname;
+        if (bindIP.equals("0.0.0.0")) {
+            serverHostname = "localhost";
+        } else {
+            serverHostname = bindIP;
+        }
+        if (!waitForServerUp(serverConf, serverHostname, metastorePort, CONNECTION_TIMEOUT)) {
+            throw new IOException("Waiting for startup of standalone server");
+        }
+
+        LOG.info("Hive Minicluster service started.");
+        return hiveServer;
+    }
+
+    /**
+     * Restores the changed system properties, stops both servers and releases the thread
+     * that served the metastore.
+     */
+    public void stop() throws IOException {
+        resetSystemProperties();
+        if (tServer != null) {
+            tServer.stop();
+        }
+        if (hiveServer != null) {
+            hiveServer.stop();
+        }
+        // The executor's worker thread is non-daemon; without a shutdown it outlives the
+        // service.
+        if (executorService != null) {
+            executorService.shutdownNow();
+        }
+        LOG.info("Hive Minicluster service shut down.");
+        tServer = null;
+        hiveServer = null;
+        executorService = null;
+        hadoopConf = null;
+    }
+
+    /**
+     * Applies the metastore, HiveServer2, Derby and warehouse settings to {@code conf} and
+     * wraps it in a {@link HiveConf}.
+     *
+     * @param conf              configuration to mutate
+     * @param localHiveLocation local directory that holds the Derby metastore and its log
+     */
+    private HiveConf configureHive(Configuration conf, String localHiveLocation)
+        throws IOException {
+        conf.set("hive.metastore.local", "false");
+        conf.set(HiveConf.ConfVars.METASTOREURIS.varname,
+            "thrift://" + bindIP + ":" + metastorePort);
+        conf.set(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname, bindIP);
+        conf.setInt(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, serverPort);
+        // The following line to turn off SASL has no effect since HiveAuthFactory calls
+        // 'new HiveConf()'. This is fixed by https://issues.apache.org/jira/browse/HIVE-6657,
+        // in Hive 0.14.
+        // As a workaround, the property is set in hive-site.xml in this module.
+        //conf.set(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION.varname, "NOSASL");
+        File localHiveDir = new File(localHiveLocation);
+        localHiveDir.mkdirs();
+        File metastoreDbDir = new File(localHiveDir, "metastore_db");
+        conf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname,
+            "jdbc:derby:" + metastoreDbDir.getPath() + ";create=true");
+        File derbyLogFile = new File(localHiveDir, "derby.log");
+        derbyLogFile.createNewFile();
+        setSystemProperty("derby.stream.error.file", derbyLogFile.getPath());
+        conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
+            Files.createTempDir().getAbsolutePath());
+
+        return new HiveConf(conf, this.getClass());
+    }
+
+    /**
+     * Polls every 250 ms until a metastore client can be created or the timeout elapses.
+     *
+     * @param hostname used for log output only
+     * @param port     used for log output only
+     * @param timeout  maximum wait in milliseconds
+     * @return true if a client connected; false on timeout or if the thread was interrupted
+     */
+    private boolean waitForServerUp(HiveConf serverConf, String hostname, int port, int timeout) {
+        long start = System.currentTimeMillis();
+        while (true) {
+            try {
+                // The client is only a connectivity probe; close it so it is not leaked.
+                new HiveMetaStoreClient(serverConf).close();
+                return true;
+            } catch (MetaException e) {
+                // ignore as this is expected
+                LOG.info("server " + hostname + ":" + port + " not up " + e);
+            }
+
+            if (System.currentTimeMillis() > start + timeout) {
+                break;
+            }
+            try {
+                Thread.sleep(250);
+            } catch (InterruptedException e) {
+                // Preserve the interrupt for the caller and stop waiting.
+                Thread.currentThread().interrupt();
+                break;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Sets (or, for a null value, removes) a system property, remembering the value it had
+     * before the first change.
+     */
+    private void setSystemProperty(String name, String value) {
+        if (!sysProps.containsKey(name)) {
+            String currentValue = System.getProperty(name);
+            sysProps.put(name, currentValue);
+        }
+        if (value != null) {
+            System.setProperty(name, value);
+        } else {
+            System.getProperties().remove(name);
+        }
+    }
+
+    /** Restores every property changed through {@link #setSystemProperty(String, String)}. */
+    private void resetSystemProperties() {
+        for (Map.Entry<String, String> entry : sysProps.entrySet()) {
+            if (entry.getValue() != null) {
+                System.setProperty(entry.getKey(), entry.getValue());
+            } else {
+                System.getProperties().remove(entry.getKey());
+            }
+        }
+        sysProps.clear();
+    }
+
+    private static String getHiveLocation(String baseLocation) {
+        return baseLocation + Path.SEPARATOR + "hive";
+    }
+
+    /** Creates, initialises and starts a HiveServer2 with the given configuration. */
+    private HiveServer2 startHiveServer(HiveConf serverConf) {
+        HiveServer2 hiveServer = new HiveServer2();
+        hiveServer.init(serverConf);
+        hiveServer.start();
+        return hiveServer;
+    }
+
+    // XXX: From org.apache.hadoop.hive.metastore.HiveMetaStore,
+    // with changes to support binding to a specified IP address (not only 0.0.0.0)
+
+
+    /** Transport factory that applies the parent factory first and the child on top of it. */
+    private static final class ChainedTTransportFactory extends TTransportFactory {
+        private final TTransportFactory parentTransFactory;
+        private final TTransportFactory childTransFactory;
+
+        private ChainedTTransportFactory(TTransportFactory parentTransFactory,
+            TTransportFactory childTransFactory) {
+            this.parentTransFactory = parentTransFactory;
+            this.childTransFactory = childTransFactory;
+        }
+
+        @Override public TTransport getTransport(TTransport trans) {
+            return childTransFactory.getTransport(parentTransFactory.getTransport(trans));
+        }
+    }
+
+
+    /** Server socket that enables TCP keep-alive on every accepted connection. */
+    private static final class TServerSocketKeepAlive extends TServerSocket {
+        public TServerSocketKeepAlive(int port) throws TTransportException {
+            super(port, 0);
+        }
+
+        public TServerSocketKeepAlive(InetSocketAddress address) throws TTransportException {
+            super(address, 0);
+        }
+
+        @Override protected TSocket acceptImpl() throws TTransportException {
+            TSocket ts = super.acceptImpl();
+            try {
+                ts.getSocket().setKeepAlive(true);
+            } catch (SocketException e) {
+                throw new TTransportException(e);
+            }
+            return ts;
+        }
+    }
+
+    /**
+     * Builds the metastore Thrift server and starts serving it on the executor created by
+     * {@link #start()}.
+     *
+     * @param forceBindIP address to bind to; when null the server binds to {@code port} only
+     * @param port        metastore port
+     * @param conf        Hive configuration supplying thread, keep-alive and transport settings
+     * @return the server, already serving in the background
+     * @throws IOException wrapping any failure during setup
+     */
+    public TServer startMetaStore(String forceBindIP, int port, HiveConf conf) throws IOException {
+        try {
+            // Server will create new threads up to max as necessary. After an idle
+            // period, it will destroy threads to keep the number of threads in the
+            // pool to min.
+            int minWorkerThreads = conf.getIntVar(HiveConf.ConfVars.METASTORESERVERMINTHREADS);
+            int maxWorkerThreads = conf.getIntVar(HiveConf.ConfVars.METASTORESERVERMAXTHREADS);
+            boolean tcpKeepAlive = conf.getBoolVar(HiveConf.ConfVars.METASTORE_TCP_KEEP_ALIVE);
+            boolean useFramedTransport =
+                conf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_FRAMED_TRANSPORT);
+
+            // don't support SASL yet
+            //boolean useSasl = conf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL);
+
+            TServerTransport serverTransport;
+            if (forceBindIP != null) {
+                InetSocketAddress address = new InetSocketAddress(forceBindIP, port);
+                serverTransport =
+                    tcpKeepAlive ? new TServerSocketKeepAlive(address) : new TServerSocket(address);
+
+            } else {
+                serverTransport =
+                    tcpKeepAlive ? new TServerSocketKeepAlive(port) : new TServerSocket(port);
+            }
+
+            TProcessor processor;
+            TTransportFactory transFactory;
+
+            IHMSHandler handler = (IHMSHandler) HiveMetaStore
+                .newRetryingHMSHandler("new db based metaserver", conf, true);
+
+            if (conf.getBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI)) {
+                transFactory = useFramedTransport ?
+                    new ChainedTTransportFactory(new TFramedTransport.Factory(),
+                        new TUGIContainingTransport.Factory()) :
+                    new TUGIContainingTransport.Factory();
+
+                processor = new TUGIBasedProcessor(handler);
+                LOG.info("Starting DB backed MetaStore Server with SetUGI enabled");
+            } else {
+                transFactory =
+                    useFramedTransport ? new TFramedTransport.Factory() : new TTransportFactory();
+                processor = new TSetIpAddressProcessor(handler);
+                LOG.info("Starting DB backed MetaStore Server");
+            }
+
+            TThreadPoolServer.Args args =
+                new TThreadPoolServer.Args(serverTransport).processor(processor)
+                    .transportFactory(transFactory).protocolFactory(new TBinaryProtocol.Factory())
+                    .minWorkerThreads(minWorkerThreads).maxWorkerThreads(maxWorkerThreads);
+
+            final TServer tServer = new TThreadPoolServer(args);
+            // serve() blocks, so it runs on the executor's thread.
+            executorService.submit(new Runnable() {
+                @Override public void run() {
+                    tServer.serve();
+                }
+            });
+            return tServer;
+        } catch (Throwable x) {
+            // Kept broad on purpose: any setup failure is reported as an IOException with
+            // the cause preserved.
+            throw new IOException(x);
+        }
+    }
+}
diff --git a/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/TestUtil.java b/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/TestUtil.java
new file mode 100644
index 000000000..f07bc0c3f
--- /dev/null
+++ b/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/TestUtil.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.hive.util;
+
+import com.google.common.collect.Sets;
+import com.uber.hoodie.hive.HoodieHiveConfiguration;
+import com.uber.hoodie.hive.client.HoodieHiveClient;
+import com.uber.hoodie.hive.model.HoodieDatasetReference;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hive.service.server.HiveServer2;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.junit.runners.model.InitializationError;
+import parquet.schema.MessageType;
+import parquet.schema.MessageTypeParser;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+/**
+ * Shared fixture for the hive tests: lazily starts the HDFS, ZooKeeper and Hive test
+ * services and creates partitioned parquet datasets from the CSV and schema resources.
+ */
+public class TestUtil {
+    private static MiniDFSCluster dfsCluster;
+    private static ZooKeeperServer zkServer;
+    private static HiveServer2 hiveServer;
+    public static Configuration configuration;
+    public static HoodieHiveConfiguration hDroneConfiguration;
+    private static DateTimeFormatter dtfOut;
+    public static final String CSV_DELIMITER = "|";
+    private static FileSystem fileSystem;
+    // Tables registered by createDataset; dropped again on the next setUp().
+    private static Set<String> createdTablesSet = Sets.newHashSet();
+
+    /**
+     * Starts whichever of the three services is not running yet, rebuilds the Hive
+     * configuration, drops the tables created so far and recreates the test database.
+     */
+    public static void setUp() throws IOException, InterruptedException {
+        if (dfsCluster == null) {
+            HdfsTestService service = new HdfsTestService();
+            dfsCluster = service.start(true);
+            configuration = service.getHadoopConf();
+        }
+        if (zkServer == null) {
+            ZookeeperTestService zkService = new ZookeeperTestService(configuration);
+            zkServer = zkService.start();
+        }
+        if (hiveServer == null) {
+            HiveTestService hiveService = new HiveTestService(configuration);
+            hiveServer = hiveService.start();
+        }
+        hDroneConfiguration =
+            HoodieHiveConfiguration.newBuilder().hiveJdbcUrl("jdbc:hive2://127.0.0.1:9999/")
+                .hivedb("hdrone_test").jdbcUsername("").jdbcPassword("")
+                .hadoopConfiguration(hiveServer.getHiveConf()).build();
+        dtfOut = DateTimeFormat.forPattern("yyyy/MM/dd");
+
+        HoodieHiveClient client = new HoodieHiveClient(hDroneConfiguration);
+        for (String tableName : createdTablesSet) {
+            client.updateHiveSQL("drop table if exists " + tableName);
+        }
+        createdTablesSet.clear();
+        client.updateHiveSQL(
+            "drop database if exists " + hDroneConfiguration.getDbName());
+        client.updateHiveSQL("create database " + hDroneConfiguration.getDbName());
+
+        fileSystem = FileSystem.get(configuration);
+    }
+
+    /** Stops the services that were started; the static references are left in place. */
+    public static void shutdown() {
+        if (hiveServer != null) {
+            hiveServer.stop();
+        }
+        if (dfsCluster != null) {
+            dfsCluster.shutdown();
+        }
+        if (zkServer != null) {
+            zkServer.shutdown();
+        }
+    }
+
+    /**
+     * Creates a dataset with one partition per day, counting back from today.
+     *
+     * @param tableName          Hive table name of the dataset
+     * @param hdfsPath           base path of the dataset
+     * @param numberOfPartitions number of day partitions to create
+     * @param schemaFile         classpath resource holding the parquet schema
+     * @return reference describing the created dataset
+     * @throws InitializationError if the base directory cannot be created
+     */
+    public static HoodieDatasetReference createDataset(String tableName, String hdfsPath, int numberOfPartitions,
+        String schemaFile) throws IOException, InitializationError {
+        Path path = new Path(hdfsPath);
+        // java.io.File addresses the local filesystem: this clears the local directory of
+        // the same name, where generateParquetData stages the files.
+        FileUtils.deleteDirectory(new File(hdfsPath));
+
+        boolean result = fileSystem.mkdirs(path);
+        checkResult(result);
+        HoodieDatasetReference metadata =
+            new HoodieDatasetReference(tableName, path.toString(),
+                hDroneConfiguration.getDbName());
+        DateTime dateTime = DateTime.now();
+        createPartitions(metadata, numberOfPartitions, schemaFile, dateTime, 1);
+        createdTablesSet.add(metadata.getDatabaseTableName());
+        return metadata;
+    }
+
+    /**
+     * Creates {@code numberOfPartitions} {@code yyyy/MM/dd} partition directories, starting
+     * at the day of {@code startFrom} and going one day back per partition, and fills each
+     * with test data.
+     */
+    private static void createPartitions(HoodieDatasetReference metadata, int numberOfPartitions,
+        String schemaFile, DateTime startFrom, int schemaVersion) throws IOException {
+        startFrom = startFrom.withTimeAtStartOfDay();
+
+        for (int i = 0; i < numberOfPartitions; i++) {
+            Path partPath = new Path(metadata.getBaseDatasetPath() + "/" + dtfOut.print(startFrom));
+            // NOTE(review): the qualified path is discarded, so partPath stays
+            // scheme-less. generateParquetData relies on that to address the local
+            // staging file; do not assign the result without revisiting it.
+            fileSystem.makeQualified(partPath);
+            fileSystem.mkdirs(partPath);
+            createTestData(partPath, schemaFile, schemaVersion);
+            startFrom = startFrom.minusDays(1);
+        }
+    }
+
+    /** Writes 5 parquet files into the partition. */
+    private static void createTestData(Path partPath, String schemaFile, int schemaVersion)
+        throws IOException {
+        for (int i = 0; i < 5; i++) {
+            // Create 5 files
+            Path filePath =
+                new Path(partPath.toString() + "/" + getParquetFilePath(schemaVersion, i));
+            generateParquetData(filePath, schemaFile);
+        }
+    }
+
+    private static String getParquetFilePath(int version, int iteration) {
+        return "test.topic.name@sjc1@SV_" + version + "@" + iteration + ".parquet";
+    }
+
+    /**
+     * Parses the parquet schema stored in the given classpath resource.
+     */
+    public static MessageType readSchema(String schemaFile) throws IOException {
+        return MessageTypeParser
+            .parseMessageType(IOUtils.toString(TestUtil.class.getResourceAsStream(schemaFile)));
+    }
+
+    /**
+     * Converts the CSV resource that belongs to {@code schemaFile} into a parquet file at
+     * {@code filePath} and then copies that file, byte for byte, to the same path on the
+     * test filesystem.
+     *
+     * @param filePath   path of the parquet file
+     * @param schemaFile classpath resource holding the parquet schema
+     */
+    public static void generateParquetData(Path filePath, String schemaFile) throws IOException {
+        MessageType schema = readSchema(schemaFile);
+        CsvParquetWriter writer = new CsvParquetWriter(filePath, schema);
+
+        BufferedReader br = new BufferedReader(
+            new InputStreamReader(TestUtil.class.getResourceAsStream(getDataFile(schemaFile))));
+        String line;
+        try {
+            while ((line = br.readLine()) != null) {
+                String[] fields = line.split(Pattern.quote(CSV_DELIMITER));
+                writer.write(Arrays.asList(fields));
+            }
+        } finally {
+            // Close the writer even when a row fails, then the reader.
+            try {
+                writer.close();
+            } finally {
+                br.close();
+            }
+        }
+
+        // Parquet is a binary format: copy it as bytes. A Reader would decode and re-encode
+        // the content with the platform charset and corrupt it.
+        InputStream in = null;
+        FSDataOutputStream hdfsPath = null;
+        try {
+            in = new FileInputStream(filePath.toString());
+            hdfsPath = fileSystem.create(filePath);
+            IOUtils.copy(in, hdfsPath);
+        } finally {
+            try {
+                if (in != null) {
+                    in.close();
+                }
+            } finally {
+                if (hdfsPath != null) {
+                    hdfsPath.close();
+                }
+            }
+        }
+    }
+
+    /** Maps a schema resource name to its CSV data resource name. */
+    private static String getDataFile(String schemaFile) {
+        // Literal replacement; replaceAll would treat the leading '.' as a regex wildcard.
+        return schemaFile.replace(".schema", ".csv");
+    }
+
+    private static void checkResult(boolean result) throws InitializationError {
+        if (!result) {
+            throw new InitializationError("Could not initialize");
+        }
+    }
+
+    /**
+     * Adds {@code newPartitionCount} partitions written with {@code newSchema}. The first
+     * one is {@code newPartitionCount + 1} days after {@code startFrom}, the rest go back
+     * one day each.
+     *
+     * @param startFrom reference instant in epoch milliseconds
+     */
+    public static void evolveDataset(HoodieDatasetReference metadata, int newPartitionCount,
+        String newSchema, Long startFrom, int schemaVersion) throws IOException {
+        createPartitions(metadata, newPartitionCount, newSchema,
+            new DateTime(startFrom).plusDays(newPartitionCount + 1), schemaVersion);
+    }
+}
diff --git a/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/ZookeeperTestService.java b/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/ZookeeperTestService.java
new file mode 100644
index 000000000..94a2cb425
--- /dev/null
+++ b/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/ZookeeperTestService.java
@@ -0,0 +1,241 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package com.uber.hoodie.hive.util; + +import com.google.common.base.Preconditions; +import com.google.common.io.Files; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.zookeeper.server.NIOServerCnxnFactory; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.server.persistence.FileTxnLog; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.Reader; +import java.net.InetSocketAddress; +import java.net.Socket; + +/** + * A Zookeeper minicluster service implementation. + *

+ * This class was adapted from MiniZooKeeperCluster in the HBase tests. Changes + * made include: + *

+ * 1. It will now only launch 1 zookeeper server. + *

+ * 2. It will only attempt to bind to the port specified, and will fail if it + * can't. + *

+ * 3. The startup method now takes a bindAddress, which allows us to configure + * which IP the ZK server binds to. This was not configurable in the original + * class. + *

+ * 4. The ZK cluster will re-use a data dir on the local filesystem if it + * already exists instead of blowing it away. + */ +public class ZookeeperTestService { + + private static final Logger logger = LoggerFactory.getLogger(ZookeeperTestService.class); + + private static final int TICK_TIME = 2000; + private static final int CONNECTION_TIMEOUT = 30000; + + /** + * Configuration settings + */ + private Configuration hadoopConf; + private String workDir; + private Integer clientPort = 2828; + private String bindIP = "127.0.0.1"; + private Boolean clean = false; + private int tickTime = 0; + + /** + * Embedded ZooKeeper cluster + */ + private NIOServerCnxnFactory standaloneServerFactory; + private ZooKeeperServer zooKeeperServer; + private boolean started = false; + + public ZookeeperTestService(Configuration config) { + this.workDir = Files.createTempDir().getAbsolutePath(); + this.hadoopConf = config; + } + + public Configuration getHadoopConf() { + return hadoopConf; + } + + public ZooKeeperServer start() throws IOException, InterruptedException { + Preconditions.checkState(workDir != null, + "The localBaseFsLocation must be set before starting cluster."); + + setupTestEnv(); + stop(); + + File dir = new File(workDir, "zookeeper").getAbsoluteFile(); + recreateDir(dir, clean); + int tickTimeToUse; + if (this.tickTime > 0) { + tickTimeToUse = this.tickTime; + } else { + tickTimeToUse = TICK_TIME; + } + this.zooKeeperServer = new ZooKeeperServer(dir, dir, tickTimeToUse); + standaloneServerFactory = new NIOServerCnxnFactory(); + + // NOTE: Changed from the original, where InetSocketAddress was + // originally created to bind to the wildcard IP, we now configure it. 
+ logger.info("Zookeeper force binding to: " + this.bindIP); + standaloneServerFactory.configure(new InetSocketAddress(bindIP, clientPort), 1000); + + // Start up this ZK server + standaloneServerFactory.startup(zooKeeperServer); + + String serverHostname; + if (bindIP.equals("0.0.0.0")) { + serverHostname = "localhost"; + } else { + serverHostname = bindIP; + } + if (!waitForServerUp(serverHostname, clientPort, CONNECTION_TIMEOUT)) { + throw new IOException("Waiting for startup of standalone server"); + } + + started = true; + logger.info("Zookeeper Minicluster service started on client port: " + clientPort); + return zooKeeperServer; + } + + public void stop() throws IOException { + if (!started) { + return; + } + + standaloneServerFactory.shutdown(); + if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) { + throw new IOException("Waiting for shutdown of standalone server"); + } + + // clear everything + started = false; + standaloneServerFactory = null; + zooKeeperServer = null; + + logger.info("Zookeeper Minicluster service shut down."); + } + + private void recreateDir(File dir, boolean clean) throws IOException { + if (dir.exists() && clean) { + FileUtil.fullyDelete(dir); + } else if (dir.exists() && !clean) { + // the directory's exist, and we don't want to clean, so exit + return; + } + try { + dir.mkdirs(); + } catch (SecurityException e) { + throw new IOException("creating dir: " + dir, e); + } + } + + // / XXX: From o.a.zk.t.ClientBase + private static void setupTestEnv() { + // during the tests we run with 100K prealloc in the logs. + // on windows systems prealloc of 64M was seen to take ~15seconds + // resulting in test failure (client timeout on first session). 
+ // set env and directly in order to handle static init/gc issues + System.setProperty("zookeeper.preAllocSize", "100"); + FileTxnLog.setPreallocSize(100 * 1024); + } + + // XXX: From o.a.zk.t.ClientBase + private static boolean waitForServerDown(int port, long timeout) { + long start = System.currentTimeMillis(); + while (true) { + try { + Socket sock = new Socket("localhost", port); + try { + OutputStream outstream = sock.getOutputStream(); + outstream.write("stat".getBytes()); + outstream.flush(); + } finally { + sock.close(); + } + } catch (IOException e) { + return true; + } + + if (System.currentTimeMillis() > start + timeout) { + break; + } + try { + Thread.sleep(250); + } catch (InterruptedException e) { + // ignore + } + } + return false; + } + + // XXX: From o.a.zk.t.ClientBase + private static boolean waitForServerUp(String hostname, int port, long timeout) { + long start = System.currentTimeMillis(); + while (true) { + try { + Socket sock = new Socket(hostname, port); + BufferedReader reader = null; + try { + OutputStream outstream = sock.getOutputStream(); + outstream.write("stat".getBytes()); + outstream.flush(); + + Reader isr = new InputStreamReader(sock.getInputStream()); + reader = new BufferedReader(isr); + String line = reader.readLine(); + if (line != null && line.startsWith("Zookeeper version:")) { + return true; + } + } finally { + sock.close(); + if (reader != null) { + reader.close(); + } + } + } catch (IOException e) { + // ignore as this is expected + logger.info("server " + hostname + ":" + port + " not up " + e); + } + + if (System.currentTimeMillis() > start + timeout) { + break; + } + try { + Thread.sleep(250); + } catch (InterruptedException e) { + // ignore + } + } + return false; + } +} diff --git a/hoodie-hive/src/test/resources/nation.csv b/hoodie-hive/src/test/resources/nation.csv new file mode 100644 index 000000000..ee71b02ea --- /dev/null +++ b/hoodie-hive/src/test/resources/nation.csv @@ -0,0 +1,25 @@ +0|ALGERIA|0| haggle. 
carefully final deposits detect slyly agai +1|ARGENTINA|1|al foxes promise slyly according to the regular accounts. bold requests alon +2|BRAZIL|1|y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special +3|CANADA|1|eas hang ironic, silent packages. slyly regular packages are furiously over the tithes. fluffily bold +4|EGYPT|4|y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d +5|ETHIOPIA|0|ven packages wake quickly. regu +6|FRANCE|3|refully final requests. regular, ironi +7|GERMANY|3|l platelets. regular accounts x-ray: unusual, regular acco +8|INDIA|2|ss excuses cajole slyly across the packages. deposits print aroun +9|INDONESIA|2| slyly express asymptotes. regular deposits haggle slyly. carefully ironic hockey players sleep blithely. carefull +10|IRAN|4|efully alongside of the slyly final dependencies. +11|IRAQ|4|nic deposits boost atop the quickly final requests? quickly regula +12|JAPAN|2|ously. final, express gifts cajole a +13|JORDAN|4|ic deposits are blithely about the carefully regular pa +14|KENYA|0| pending excuses haggle furiously deposits. pending, express pinto beans wake fluffily past t +15|MOROCCO|0|rns. blithely bold courts among the closely regular packages use furiously bold platelets? +16|MOZAMBIQUE|0|s. ironic, unusual asymptotes wake blithely r +17|PERU|1|platelets. blithely pending dependencies use fluffily across the even pinto beans. carefully silent accoun +18|CHINA|2|c dependencies. furiously express notornis sleep slyly regular accounts. ideas sleep. depos +19|ROMANIA|3|ular asymptotes are about the furious multipliers. express dependencies nag above the ironically ironic account +20|SAUDI ARABIA|4|ts. silent requests haggle. closely express packages sleep across the blithely +21|VIETNAM|2|hely enticingly express accounts. 
even, final +22|RUSSIA|3| requests against the platelets use never according to the quickly regular pint +23|UNITED KINGDOM|3|eans boost carefully special requests. accounts are. carefull +24|UNITED STATES|1|y final packages. slow foxes cajole quickly. quickly silent platelets breach ironic accounts. unusual pinto be diff --git a/hoodie-hive/src/test/resources/nation.schema b/hoodie-hive/src/test/resources/nation.schema new file mode 100644 index 000000000..8b57cb478 --- /dev/null +++ b/hoodie-hive/src/test/resources/nation.schema @@ -0,0 +1,6 @@ +message m { + required int32 nation_key; + required binary name; + required int32 region_key; + required binary comment_col; +} diff --git a/hoodie-hive/src/test/resources/nation_evolved.csv b/hoodie-hive/src/test/resources/nation_evolved.csv new file mode 100644 index 000000000..6dc8ce4dc --- /dev/null +++ b/hoodie-hive/src/test/resources/nation_evolved.csv @@ -0,0 +1,25 @@ +0|ALGERIA|0| haggle. carefully final deposits detect slyly agai|desc0 +1|ARGENTINA|1|al foxes promise slyly according to the regular accounts. bold requests alon|desc1 +2|BRAZIL|1|y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special |desc2 +3|CANADA|1|eas hang ironic, silent packages. slyly regular packages are furiously over the tithes. fluffily bold|desc3 +4|EGYPT|4|y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d|desc4 +5|ETHIOPIA|0|ven packages wake quickly. regu|desc5 +6|FRANCE|3|refully final requests. regular, ironi|desc6 +7|GERMANY|3|l platelets. regular accounts x-ray: unusual, regular acco|desc7 +8|INDIA|2|ss excuses cajole slyly across the packages. deposits print aroun|desc8 +9|INDONESIA|2| slyly express asymptotes. regular deposits haggle slyly. carefully ironic hockey players sleep blithely. carefull|desc9 +10|IRAN|4|efully alongside of the slyly final dependencies. 
|desc10 +11|IRAQ|4|nic deposits boost atop the quickly final requests? quickly regula|desc11 +12|JAPAN|2|ously. final, express gifts cajole a|desc12 +13|JORDAN|4|ic deposits are blithely about the carefully regular pa|desc13 +14|KENYA|0| pending excuses haggle furiously deposits. pending, express pinto beans wake fluffily past t|desc14 +15|MOROCCO|0|rns. blithely bold courts among the closely regular packages use furiously bold platelets?|desc15 +16|MOZAMBIQUE|0|s. ironic, unusual asymptotes wake blithely r|desc16 +17|PERU|1|platelets. blithely pending dependencies use fluffily across the even pinto beans. carefully silent accoun|desc17 +18|CHINA|2|c dependencies. furiously express notornis sleep slyly regular accounts. ideas sleep. depos|desc18 +19|ROMANIA|3|ular asymptotes are about the furious multipliers. express dependencies nag above the ironically ironic account|desc19 +20|SAUDI ARABIA|4|ts. silent requests haggle. closely express packages sleep across the blithely|desc20 +21|VIETNAM|2|hely enticingly express accounts. even, final |desc21 +22|RUSSIA|3| requests against the platelets use never according to the quickly regular pint|desc22 +23|UNITED KINGDOM|3|eans boost carefully special requests. accounts are. carefull|desc23 +24|UNITED STATES|1|y final packages. slow foxes cajole quickly. quickly silent platelets breach ironic accounts. 
unusual pinto be|desc24 diff --git a/hoodie-hive/src/test/resources/nation_evolved.schema b/hoodie-hive/src/test/resources/nation_evolved.schema new file mode 100644 index 000000000..b395c27ac --- /dev/null +++ b/hoodie-hive/src/test/resources/nation_evolved.schema @@ -0,0 +1,7 @@ +message m { + required int32 nation_key; + required binary name; + required int64 region_key; + required binary comment_col; + optional binary desc; +} diff --git a/pom.xml b/pom.xml index 4aabe6def..8553fb378 100644 --- a/pom.xml +++ b/pom.xml @@ -27,6 +27,7 @@ hoodie-client hoodie-cli hoodie-hadoop-mr + hoodie-hive @@ -173,6 +174,9 @@ **/.* **/*.txt **/*.sh + **/test/resources/*.avro + **/test/resources/*.schema + **/test/resources/*.csv @@ -360,6 +364,12 @@ slf4j-api 1.7.5 + + org.slf4j + slf4j-log4j12 + 1.7.5 + + org.apache.commons commons-configuration2 @@ -380,9 +390,43 @@ org.apache.hive hive-jdbc - 1.1.0-cdh5.7.2 + ${hive.version}-cdh${cdh.version} + + org.apache.hive + hive-service + ${hive.version}-cdh${cdh.version} + + + org.apache.hive + hive-metastore + ${hive.version}-cdh${cdh.version} + + + + junit + junit + 4.12 + + + org.apache.hadoop + hadoop-hdfs + tests + ${hadoop.version}-cdh${cdh.version} + + + org.apache.hadoop + hadoop-common + tests + ${hadoop.version}-cdh${cdh.version} + + + org.mockito + mockito-all + test + 1.10.19 +