diff --git a/hoodie-hadoop-mr/pom.xml b/hoodie-hadoop-mr/pom.xml
index 59137bf04..d0bce106e 100644
--- a/hoodie-hadoop-mr/pom.xml
+++ b/hoodie-hadoop-mr/pom.xml
@@ -1,4 +1,20 @@
+
+
@@ -66,4 +82,14 @@
avro
+
+
+
+
+ org.apache.rat
+ apache-rat-plugin
+
+
+
+
diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieHiveUtil.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieHiveUtil.java
index ee38cb2d7..b1666d4c6 100644
--- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieHiveUtil.java
+++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieHiveUtil.java
@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java
index 5e81c6231..29a3dd9fa 100644
--- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java
+++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java
@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
diff --git a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/HoodieInputFormatTest.java b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/HoodieInputFormatTest.java
index 3ca847f6a..997c91f22 100644
--- a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/HoodieInputFormatTest.java
+++ b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/HoodieInputFormatTest.java
@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
diff --git a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java
index d2e5e3edb..6a016c4a6 100644
--- a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java
+++ b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java
@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
diff --git a/hoodie-hive/pom.xml b/hoodie-hive/pom.xml
new file mode 100644
index 000000000..153eeb1e1
--- /dev/null
+++ b/hoodie-hive/pom.xml
@@ -0,0 +1,127 @@
+
+
+
+
+
+ hoodie
+ com.uber.hoodie
+ 0.2.5-SNAPSHOT
+
+ 4.0.0
+
+ hoodie-hive
+
+
+
+ org.apache.hadoop
+ hadoop-common
+
+
+ org.apache.hadoop
+ hadoop-hdfs
+
+
+ org.apache.hadoop
+ hadoop-auth
+
+
+ org.apache.hive
+ hive-common
+
+
+ org.apache.hive
+ hive-jdbc
+
+
+ com.google.guava
+ guava
+
+
+ org.apache.hive
+ hive-service
+
+
+ org.apache.hive
+ hive-metastore
+
+
+ org.apache.thrift
+ libthrift
+ 0.9.2
+
+
+
+ commons-dbcp
+ commons-dbcp
+
+
+
+
+ org.slf4j
+ slf4j-api
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+
+
+ junit
+ junit
+
+
+ org.apache.hadoop
+ hadoop-hdfs
+ tests
+
+
+ org.apache.hadoop
+ hadoop-common
+ tests
+
+
+ org.mockito
+ mockito-all
+ test
+
+
+
+ com.uber.hoodie
+ hoodie-hadoop-mr
+ ${project.version}
+
+
+ com.uber.hoodie
+ hoodie-common
+ ${project.version}
+
+
+
+
+
+
+
+ org.apache.rat
+ apache-rat-plugin
+
+
+
+
+
diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveConfiguration.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveConfiguration.java
new file mode 100644
index 000000000..34c49d31a
--- /dev/null
+++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveConfiguration.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.hive;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Configurations for registering a hoodie dataset into hive metastore
+ */
+public class HoodieHiveConfiguration {
+ private final String hiveJdbcUrl;
+ private final String dbName;
+ private final String hiveUsername;
+ private final String hivePassword;
+ private final Configuration configuration;
+
+ private HoodieHiveConfiguration(String hiveJdbcUrl, String defaultDatabaseName,
+ String hiveUsername, String hivePassword, Configuration configuration) {
+ this.hiveJdbcUrl = hiveJdbcUrl;
+ this.dbName = defaultDatabaseName;
+ this.hiveUsername = hiveUsername;
+ this.hivePassword = hivePassword;
+ this.configuration = configuration;
+ }
+
+ public String getHiveJdbcUrl() {
+ return hiveJdbcUrl;
+ }
+
+ public String getDbName() {
+ return dbName;
+ }
+
+ public String getHiveUsername() {
+ return hiveUsername;
+ }
+
+ public String getHivePassword() {
+ return hivePassword;
+ }
+
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("HoodieHiveConfiguration{");
+ sb.append("hiveJdbcUrl='").append(hiveJdbcUrl).append('\'');
+ sb.append(", dbName='").append(dbName).append('\'');
+ sb.append(", hiveUsername='").append(hiveUsername).append('\'');
+ sb.append(", hivePassword='").append(hivePassword).append('\'');
+ sb.append(", configuration=").append(configuration);
+ sb.append('}');
+ return sb.toString();
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private static Logger LOG = LoggerFactory.getLogger(Builder.class);
+ private String hiveJdbcUrl;
+ private String dbName;
+ private String jdbcUsername;
+ private String jdbcPassword;
+ private Configuration configuration;
+
+ public Builder hiveJdbcUrl(String hiveJdbcUrl) {
+ this.hiveJdbcUrl = hiveJdbcUrl;
+ return this;
+ }
+
+ public Builder hivedb(String hiveDatabase) {
+ this.dbName = hiveDatabase;
+ return this;
+ }
+
+ public Builder jdbcUsername(String jdbcUsername) {
+ this.jdbcUsername = jdbcUsername;
+ return this;
+ }
+
+ public Builder jdbcPassword(String jdbcPassword) {
+ this.jdbcPassword = jdbcPassword;
+ return this;
+ }
+
+ public Builder hadoopConfiguration(Configuration configuration) {
+ this.configuration = configuration;
+ return this;
+ }
+
+ public HoodieHiveConfiguration build() {
+ HoodieHiveConfiguration config =
+ new HoodieHiveConfiguration(hiveJdbcUrl, dbName, jdbcUsername, jdbcPassword,
+ configuration);
+ LOG.info("Hoodie Hive Configuration - " + config);
+ return config;
+ }
+ }
+}
diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveDatasetException.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveDatasetException.java
new file mode 100644
index 000000000..4dc06e645
--- /dev/null
+++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveDatasetException.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.hive;
+
+public class HoodieHiveDatasetException extends RuntimeException {
+
+ public HoodieHiveDatasetException() {
+ super();
+ }
+
+ public HoodieHiveDatasetException(String message) {
+ super(message);
+ }
+
+ public HoodieHiveDatasetException(String message, Throwable t) {
+ super(message, t);
+ }
+
+ public HoodieHiveDatasetException(Throwable t) {
+ super(t);
+ }
+
+ protected static String format(String message, Object... args) {
+ return String.format(String.valueOf(message), (Object[]) args);
+ }
+}
diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveDatasetSyncTask.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveDatasetSyncTask.java
new file mode 100644
index 000000000..184917a1c
--- /dev/null
+++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveDatasetSyncTask.java
@@ -0,0 +1,181 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.hive;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.uber.hoodie.hive.client.HoodieFSClient;
+import com.uber.hoodie.hive.client.HoodieHiveClient;
+import com.uber.hoodie.hive.model.HoodieDatasetReference;
+import com.uber.hoodie.hive.model.StoragePartition;
+import com.uber.hoodie.hive.model.TablePartition;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Represents a Hive External Dataset.
+ * Contains metadata for storage and table partitions.
+ */
+public class HoodieHiveDatasetSyncTask {
+ private static Logger LOG = LoggerFactory.getLogger(HoodieHiveDatasetSyncTask.class);
+ private final HoodieHiveSchemaSyncTask schemaSyncTask;
+ private final List newPartitions;
+ private final List changedPartitions;
+
+ public HoodieHiveDatasetSyncTask(HoodieHiveSchemaSyncTask schemaSyncTask,
+ List newPartitions, List changedPartitions) {
+ this.schemaSyncTask = schemaSyncTask;
+ this.newPartitions = ImmutableList.copyOf(newPartitions);
+ this.changedPartitions = ImmutableList.copyOf(changedPartitions);
+ }
+
+ public HoodieHiveSchemaSyncTask getSchemaSyncTask() {
+ return schemaSyncTask;
+ }
+
+ public List getNewPartitions() {
+ return newPartitions;
+ }
+
+ public List getChangedPartitions() {
+ return changedPartitions;
+ }
+
+ /**
+ * Sync this dataset
+ * 1. If any schema difference is found, then sync the table schema
+ * 2. If any new partitions are found, adds partitions to the table (which uses the table schema by default)
+ * 3. If any partition path has changed, modify the partition to the new path (which does not change the partition schema)
+ */
+ public void sync() {
+ LOG.info("Starting Sync for " + schemaSyncTask.getReference());
+ try {
+ // First sync the table schema
+ schemaSyncTask.sync();
+
+ // Add all the new partitions
+ schemaSyncTask.getHiveClient()
+ .addPartitionsToTable(schemaSyncTask.getReference(), newPartitions,
+ schemaSyncTask.getPartitionStrategy());
+ // Update all the changed partitions
+ schemaSyncTask.getHiveClient()
+ .updatePartitionsToTable(schemaSyncTask.getReference(), changedPartitions,
+ schemaSyncTask.getPartitionStrategy());
+ } catch (Exception e) {
+ throw new HoodieHiveDatasetException(
+ "Failed to sync dataset " + schemaSyncTask.getReference(), e);
+ }
+ LOG.info("Sync for " + schemaSyncTask.getReference() + " complete.");
+ }
+
+ public static Builder newBuilder(HoodieHiveDatasetSyncTask dataset) {
+ return newBuilder().withConfiguration(dataset.schemaSyncTask.getConf())
+ .withReference(dataset.schemaSyncTask.getReference())
+ .withFSClient(dataset.schemaSyncTask.getFsClient())
+ .withHiveClient(dataset.schemaSyncTask.getHiveClient())
+ .schemaStrategy(dataset.schemaSyncTask.getSchemaStrategy())
+ .partitionStrategy(dataset.schemaSyncTask.getPartitionStrategy());
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private static Logger LOG = LoggerFactory.getLogger(Builder.class);
+ private HoodieHiveConfiguration configuration;
+ private HoodieDatasetReference datasetReference;
+ private SchemaStrategy schemaStrategy;
+ private PartitionStrategy partitionStrategy;
+ private HoodieHiveClient hiveClient;
+ private HoodieFSClient fsClient;
+
+ public Builder withReference(HoodieDatasetReference reference) {
+ this.datasetReference = reference;
+ return this;
+ }
+
+ public Builder withConfiguration(HoodieHiveConfiguration configuration) {
+ this.configuration = configuration;
+ return this;
+ }
+
+ public Builder schemaStrategy(SchemaStrategy schemaStrategy) {
+ this.schemaStrategy = schemaStrategy;
+ return this;
+ }
+
+ public Builder partitionStrategy(PartitionStrategy partitionStrategy) {
+ if(partitionStrategy != null) {
+ LOG.info("Partitioning the dataset with keys " + ArrayUtils
+ .toString(partitionStrategy.getHivePartitionFieldNames()));
+ }
+ this.partitionStrategy = partitionStrategy;
+ return this;
+ }
+
+ public Builder withHiveClient(HoodieHiveClient hiveClient) {
+ this.hiveClient = hiveClient;
+ return this;
+ }
+
+ public Builder withFSClient(HoodieFSClient fsClient) {
+ this.fsClient = fsClient;
+ return this;
+ }
+
+ public HoodieHiveDatasetSyncTask build() {
+ LOG.info("Building dataset for " + datasetReference);
+ HoodieHiveSchemaSyncTask schemaSyncTask =
+ HoodieHiveSchemaSyncTask.newBuilder().withReference(datasetReference)
+ .withConfiguration(configuration).schemaStrategy(schemaStrategy)
+ .partitionStrategy(partitionStrategy).withHiveClient(hiveClient)
+ .withFSClient(fsClient).build();
+
+ List storagePartitions = Lists.newArrayList();
+ FileStatus[] storagePartitionPaths = schemaSyncTask.getPartitionStrategy()
+ .scanAllPartitions(schemaSyncTask.getReference(), schemaSyncTask.getFsClient());
+ for (FileStatus fileStatus : storagePartitionPaths) {
+ storagePartitions.add(new StoragePartition(schemaSyncTask.getReference(),
+ schemaSyncTask.getPartitionStrategy(), fileStatus));
+ }
+ LOG.info("Storage partitions scan complete. Found " + storagePartitions.size());
+
+ List newPartitions;
+ List changedPartitions;
+
+ // Check if table exists
+ if (schemaSyncTask.getHiveClient().checkTableExists(schemaSyncTask.getReference())) {
+ List partitions =
+ schemaSyncTask.getHiveClient().scanPartitions(schemaSyncTask.getReference());
+ LOG.info("Table partition scan complete. Found " + partitions.size());
+ newPartitions = schemaSyncTask.getFsClient()
+ .getUnregisteredStoragePartitions(partitions, storagePartitions);
+ changedPartitions = schemaSyncTask.getFsClient()
+ .getChangedStoragePartitions(partitions, storagePartitions);
+ } else {
+ newPartitions = storagePartitions;
+ changedPartitions = Lists.newArrayList();
+ }
+ return new HoodieHiveDatasetSyncTask(schemaSyncTask, newPartitions, changedPartitions);
+ }
+ }
+}
diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveSchemaSyncTask.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveSchemaSyncTask.java
new file mode 100644
index 000000000..ffabcd3ea
--- /dev/null
+++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveSchemaSyncTask.java
@@ -0,0 +1,243 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.hive;
+
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Maps;
+import com.uber.hoodie.hadoop.HoodieInputFormat;
+import com.uber.hoodie.hive.impl.DayBasedPartitionStrategy;
+import com.uber.hoodie.hive.client.HoodieFSClient;
+import com.uber.hoodie.hive.client.HoodieHiveClient;
+import com.uber.hoodie.hive.impl.ParseSchemaFromDataStrategy;
+import com.uber.hoodie.hive.client.SchemaUtil;
+import com.uber.hoodie.hive.model.HoodieDatasetReference;
+import com.uber.hoodie.hive.model.SchemaDifference;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import parquet.schema.MessageType;
+
+import java.util.Map;
+
+/**
+ * Represents the Schema sync task for the dataset.
+ * Execute sync() on this task to sync up the HDFS dataset schema and hive table schema
+ */
+public class HoodieHiveSchemaSyncTask {
+ private static Logger LOG = LoggerFactory.getLogger(HoodieHiveSchemaSyncTask.class);
+
+ private static final String DEFAULT_INPUTFORMAT = HoodieInputFormat.class.getName();
+ private static final String DEFAULT_OUTPUTFORMAT = MapredParquetOutputFormat.class.getName();
+
+ private final HoodieDatasetReference reference;
+ private final MessageType storageSchema;
+ private final Map tableSchema;
+ private final PartitionStrategy partitionStrategy;
+ private final SchemaStrategy schemaStrategy;
+ private final HoodieHiveClient hiveClient;
+ private final HoodieHiveConfiguration conf;
+ private final HoodieFSClient fsClient;
+
+ public HoodieHiveSchemaSyncTask(HoodieDatasetReference datasetReference,
+ MessageType schemaInferred, Map fieldsSchema,
+ PartitionStrategy partitionStrategy, SchemaStrategy schemaStrategy,
+ HoodieHiveConfiguration configuration, HoodieHiveClient hiveClient,
+ HoodieFSClient fsClient) {
+ this.reference = datasetReference;
+ this.storageSchema = schemaInferred;
+ this.tableSchema = fieldsSchema;
+ this.partitionStrategy = partitionStrategy;
+ this.schemaStrategy = schemaStrategy;
+ this.hiveClient = hiveClient;
+ this.conf = configuration;
+ this.fsClient = fsClient;
+ }
+
+ public SchemaDifference getSchemaDifference() {
+ return SchemaUtil.getSchemaDifference(storageSchema, tableSchema,
+ partitionStrategy.getHivePartitionFieldNames());
+ }
+
+ /**
+ * Checks if the table schema is present. If not, creates one.
+ * If already exists, computes the schema difference and if there is any difference
+ * it generates a alter table and syncs up the schema to hive metastore.
+ */
+ public void sync() {
+ try {
+ // Check if the table needs to be created
+ if (tableSchema.isEmpty()) {
+ // create the database
+ LOG.info("Schema not found. Creating for " + reference);
+ hiveClient.createTable(storageSchema, reference,
+ partitionStrategy.getHivePartitionFieldNames(), DEFAULT_INPUTFORMAT,
+ DEFAULT_OUTPUTFORMAT);
+ } else {
+ if (!getSchemaDifference().isEmpty()) {
+ LOG.info("Schema sync required for " + reference);
+ hiveClient.updateTableDefinition(reference,
+ partitionStrategy.getHivePartitionFieldNames(), storageSchema);
+ } else {
+ LOG.info("Schema sync not required for " + reference);
+ }
+ }
+ } catch (Exception e) {
+ throw new HoodieHiveDatasetException("Failed to sync dataset " + reference,
+ e);
+ }
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ public MessageType getStorageSchema() {
+ return storageSchema;
+ }
+
+ public Map getTableSchema() {
+ return tableSchema;
+ }
+
+ public PartitionStrategy getPartitionStrategy() {
+ return partitionStrategy;
+ }
+
+ public SchemaStrategy getSchemaStrategy() {
+ return schemaStrategy;
+ }
+
+ public HoodieHiveClient getHiveClient() {
+ return hiveClient;
+ }
+
+ public HoodieHiveConfiguration getConf() {
+ return conf;
+ }
+
+ public HoodieDatasetReference getReference() {
+ return reference;
+ }
+
+ public HoodieFSClient getFsClient() {
+ return fsClient;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ HoodieHiveSchemaSyncTask that = (HoodieHiveSchemaSyncTask) o;
+ return Objects.equal(storageSchema, that.storageSchema) && Objects
+ .equal(tableSchema, that.tableSchema);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(storageSchema, tableSchema);
+ }
+
+ public static class Builder {
+ private static Logger LOG = LoggerFactory.getLogger(Builder.class);
+ private HoodieHiveConfiguration configuration;
+ private HoodieDatasetReference datasetReference;
+ private SchemaStrategy schemaStrategy;
+ private PartitionStrategy partitionStrategy;
+ private HoodieHiveClient hiveClient;
+ private HoodieFSClient fsClient;
+
+ public Builder withReference(HoodieDatasetReference reference) {
+ this.datasetReference = reference;
+ return this;
+ }
+
+ public Builder withConfiguration(HoodieHiveConfiguration configuration) {
+ this.configuration = configuration;
+ return this;
+ }
+
+ public Builder schemaStrategy(SchemaStrategy schemaStrategy) {
+ this.schemaStrategy = schemaStrategy;
+ return this;
+ }
+
+ public Builder partitionStrategy(PartitionStrategy partitionStrategy) {
+ if(partitionStrategy != null) {
+ LOG.info("Partitioning the dataset with keys " + ArrayUtils
+ .toString(partitionStrategy.getHivePartitionFieldNames()));
+ }
+ this.partitionStrategy = partitionStrategy;
+ return this;
+ }
+
+ public Builder withHiveClient(HoodieHiveClient hiveClient) {
+ this.hiveClient = hiveClient;
+ return this;
+ }
+
+ public Builder withFSClient(HoodieFSClient fsClient) {
+ this.fsClient = fsClient;
+ return this;
+ }
+
+ public HoodieHiveSchemaSyncTask build() {
+ LOG.info("Building dataset schema for " + datasetReference);
+ createDefaults();
+
+ MessageType schemaInferred =
+ schemaStrategy.getDatasetSchema(datasetReference, fsClient);
+ LOG.info("Storage Schema inferred for dataset " + datasetReference);
+ LOG.debug("Inferred Storage Schema " + schemaInferred);
+
+ Map fieldsSchema;
+ if (!hiveClient.checkTableExists(datasetReference)) {
+ fieldsSchema = Maps.newHashMap();
+ } else {
+ fieldsSchema = hiveClient.getTableSchema(datasetReference);
+ }
+ LOG.info("Table Schema inferred for dataset " + datasetReference);
+ LOG.debug("Inferred Table Schema " + fieldsSchema);
+
+ return new HoodieHiveSchemaSyncTask(datasetReference, schemaInferred, fieldsSchema,
+ partitionStrategy, schemaStrategy, configuration, hiveClient, fsClient);
+ }
+
+ private void createDefaults() {
+ if (partitionStrategy == null) {
+ LOG.info("Partition strategy is not set. Selecting the default strategy");
+ partitionStrategy = new DayBasedPartitionStrategy();
+ }
+ if (schemaStrategy == null) {
+ LOG.info(
+ "Schema strategy not specified. Selecting the default based on the dataset type");
+ schemaStrategy = new ParseSchemaFromDataStrategy();
+ }
+ if (fsClient == null) {
+ LOG.info("Creating a new FS Client as none has been passed in");
+ fsClient = new HoodieFSClient(configuration);
+ }
+ if (hiveClient == null) {
+ LOG.info("Creating a new Hive Client as none has been passed in");
+ hiveClient = new HoodieHiveClient(configuration);
+ }
+ }
+ }
+}
diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/PartitionStrategy.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/PartitionStrategy.java
new file mode 100644
index 000000000..e48023b34
--- /dev/null
+++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/PartitionStrategy.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.hive;
+
+import com.uber.hoodie.hive.client.HoodieFSClient;
+import com.uber.hoodie.hive.model.HoodieDatasetReference;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Abstraction to define HDFS partition strategies.
+ * Strategy provides hookups to map partitions on to physical layout
+ *
+ * @see SchemaStrategy
+ */
+public interface PartitionStrategy {
+ /**
+ * Scans the file system for all partitions and returns FileStatus[] which are the available partitions
+ *
+ * @param basePath
+ * @param fsClient
+ * @return
+ */
+ FileStatus[] scanAllPartitions(HoodieDatasetReference basePath, HoodieFSClient fsClient);
+
+ /**
+ * Get the list of hive field names the dataset will be partitioned on.
+ * The field name should be present in the storage schema.
+ *
+ * @return List of partitions field names
+ */
+ String[] getHivePartitionFieldNames();
+
+ /**
+ * Convert a Partition path (returned in scanAllPartitions) to values for column names returned in getHivePartitionFieldNames
+ * e.g. /data/topic/2016/12/12/ will return [2016, 12, 12]
+ *
+ * @param partition storage path
+ * @return List of partitions field values
+ */
+ String[] convertPartitionToValues(HoodieDatasetReference metadata, Path partition);
+}
diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/SchemaStrategy.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/SchemaStrategy.java
new file mode 100644
index 000000000..c6d7a38ff
--- /dev/null
+++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/SchemaStrategy.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.hive;
+
+import com.uber.hoodie.hive.client.HoodieFSClient;
+import com.uber.hoodie.hive.model.HoodieDatasetReference;
+import parquet.schema.MessageType;
+
+/**
+ * Abstraction to get the Parquet schema for a {@link HoodieDatasetReference}
+ * If you are managing the schemas externally, connect to the system and get the schema.
+ *
+ * @see PartitionStrategy
+ */
+public interface SchemaStrategy {
+ MessageType getDatasetSchema(HoodieDatasetReference metadata, HoodieFSClient fsClient);
+}
diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/client/ColumnNameXLator.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/client/ColumnNameXLator.java
new file mode 100644
index 000000000..977133963
--- /dev/null
+++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/client/ColumnNameXLator.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.hive.client;
+
+import com.google.common.collect.Maps;
+
+import java.util.Iterator;
+import java.util.Map;
+
+public class ColumnNameXLator {
+ private static Map xformMap = Maps.newHashMap();
+
+ public static String translateNestedColumn(String colName) {
+ Map.Entry entry;
+ for (Iterator i$ = xformMap.entrySet().iterator(); i$.hasNext();
+ colName = colName.replaceAll((String) entry.getKey(), (String) entry.getValue())) {
+ entry = (Map.Entry) i$.next();
+ }
+
+ return colName;
+ }
+
+ public static String translateColumn(String colName) {
+ return colName;
+ }
+
+ public static String translate(String colName, boolean nestedColumn) {
+ return !nestedColumn ? translateColumn(colName) : translateNestedColumn(colName);
+ }
+
+ static {
+ xformMap.put("\\$", "_dollar_");
+ }
+}
diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/client/HoodieFSClient.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/client/HoodieFSClient.java
new file mode 100644
index 000000000..0436d5285
--- /dev/null
+++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/client/HoodieFSClient.java
@@ -0,0 +1,206 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.hive.client;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.uber.hoodie.hive.HoodieHiveConfiguration;
+import com.uber.hoodie.hive.HoodieHiveDatasetException;
+import com.uber.hoodie.hive.model.HoodieDatasetReference;
+import com.uber.hoodie.hive.model.StoragePartition;
+import com.uber.hoodie.hive.model.TablePartition;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import parquet.hadoop.ParquetFileReader;
+import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Client to access HDFS
+ */
+public class HoodieFSClient {
+ final public static String PARQUET_EXTENSION = ".parquet";
+ final public static String PARQUET_EXTENSION_ZIPPED = ".parquet.gz";
+ private final static Logger LOG = LoggerFactory.getLogger(HoodieFSClient.class);
+ private final HoodieHiveConfiguration conf;
+ private final FileSystem fs;
+
+ public HoodieFSClient(HoodieHiveConfiguration configuration) {
+ this.conf = configuration;
+ try {
+ this.fs = FileSystem.get(configuration.getConfiguration());
+ } catch (IOException e) {
+ throw new HoodieHiveDatasetException(
+ "Could not initialize file system from configuration", e);
+ }
+ }
+
+ /**
+ * Read the parquet schema from a parquet File
+ *
+ * @param parquetFilePath
+ * @return
+ * @throws IOException
+ */
+ public MessageType readSchemaFromDataFile(Path parquetFilePath) throws IOException {
+ LOG.info("Reading schema from " + parquetFilePath);
+
+ if (!fs.exists(parquetFilePath)) {
+ throw new IllegalArgumentException(
+ "Failed to read schema from data file " + parquetFilePath
+ + ". File does not exist.");
+ }
+ ParquetMetadata fileFooter =
+ ParquetFileReader.readFooter(conf.getConfiguration(), parquetFilePath);
+ return fileFooter.getFileMetaData().getSchema();
+ }
+
+ /**
+ * Find the last data file under the partition path.
+ *
+ * @param metadata
+ * @param partitionPathString
+ * @return
+ */
+ public Path lastDataFileForDataset(HoodieDatasetReference metadata,
+ String partitionPathString) {
+ try {
+ Path partitionPath = new Path(partitionPathString);
+ if (!fs.exists(partitionPath)) {
+ throw new HoodieHiveDatasetException(
+ "Partition path " + partitionPath + " not found in Dataset " + metadata);
+ }
+
+ RemoteIterator files = fs.listFiles(partitionPath, true);
+ // Iterate over the list. List is generally is listed in chronological order becasue of the date partitions
+ // Get the latest schema
+ Path returnPath = null;
+ while (files.hasNext()) {
+ Path path = files.next().getPath();
+ if (path.getName().endsWith(PARQUET_EXTENSION) || path.getName()
+ .endsWith(PARQUET_EXTENSION_ZIPPED)) {
+ returnPath = path;
+ }
+ }
+ if (returnPath != null) {
+ return returnPath;
+ }
+ throw new HoodieHiveDatasetException(
+ "No data file found in path " + partitionPath + " for dataset " + metadata);
+ } catch (IOException e) {
+ throw new HoodieHiveDatasetException(
+ "Failed to get data file in path " + partitionPathString + " for dataset "
+ + metadata, e);
+ }
+ }
+
+ /**
+ * Finds all the files/directories that match the pattern under the {@link HoodieDatasetReference} basePath
+ *
+ * @param metadata
+ * @param pattern
+ * @return
+ */
+ public FileStatus[] getDirectoriesMatchingPattern(HoodieDatasetReference metadata, String pattern) {
+ try {
+ Path path = new Path(metadata.getBaseDatasetPath() + pattern);
+ FileStatus[] status = fs.globStatus(path);
+ List returns = Lists.newArrayList();
+ for(FileStatus st:status) {
+ if(!st.getPath().toString().contains(".distcp")) {
+ // Ignore temporary directories created by distcp
+ returns.add(st);
+ }
+ }
+ return returns.toArray(new FileStatus[returns.size()]);
+ } catch (IOException e) {
+ throw new HoodieHiveDatasetException(
+ "IOException when reading directories under dataset " + metadata + " with pattern "
+ + pattern, e);
+ }
+ }
+
+ /**
+ * Get the list of storage partitions which does not have its equivalent hive partitions
+ *
+ * @param tablePartitions
+ * @param storagePartitions
+ * @return
+ */
+ public List getUnregisteredStoragePartitions(
+ List tablePartitions, List storagePartitions) {
+ Set paths = Sets.newHashSet();
+ for (TablePartition tablePartition : tablePartitions) {
+ paths.add(tablePartition.getLocation().toUri().getPath());
+ }
+ List missing = Lists.newArrayList();
+ for (StoragePartition storagePartition : storagePartitions) {
+ String hdfsPath = storagePartition.getPartitionPath().toUri().getPath();
+ if (!paths.contains(hdfsPath)) {
+ missing.add(storagePartition);
+ }
+ }
+ return missing;
+ }
+
+ /**
+ * Get the list of storage partitions which does not have its equivalent hive partitions
+ *
+ * @param tablePartitions
+ * @param storagePartitions
+ * @return
+ */
+ public List getChangedStoragePartitions(
+ List tablePartitions, List storagePartitions) {
+ Map paths = Maps.newHashMap();
+ for (TablePartition tablePartition : tablePartitions) {
+ String[] partitionKeyValueStr = tablePartition.getPartitionFieldValues();
+ Arrays.sort(partitionKeyValueStr);
+ paths.put(Arrays.toString(partitionKeyValueStr), tablePartition.getLocation().toUri().getPath());
+ }
+
+ List changed = Lists.newArrayList();
+ for (StoragePartition storagePartition : storagePartitions) {
+ String[] partitionKeyValues = storagePartition.getPartitionFieldValues();
+ Arrays.sort(partitionKeyValues);
+ String partitionKeyValueStr = Arrays.toString(partitionKeyValues);
+ String hdfsPath = storagePartition.getPartitionPath().toUri().getPath();
+ if (paths.containsKey(partitionKeyValueStr) && !paths.get(partitionKeyValueStr).equals(hdfsPath)) {
+ changed.add(storagePartition);
+ }
+ }
+ return changed;
+ }
+
+ public int calculateStorageHash(FileStatus[] paths) {
+ return Objects.hashCode(paths);
+ }
+
+}
diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/client/HoodieHiveClient.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/client/HoodieHiveClient.java
new file mode 100644
index 000000000..8fafb96f4
--- /dev/null
+++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/client/HoodieHiveClient.java
@@ -0,0 +1,365 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.hive.client;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.uber.hoodie.hive.HoodieHiveConfiguration;
+import com.uber.hoodie.hive.HoodieHiveDatasetException;
+import com.uber.hoodie.hive.PartitionStrategy;
+import com.uber.hoodie.hive.model.HoodieDatasetReference;
+import com.uber.hoodie.hive.model.SchemaDifference;
+import com.uber.hoodie.hive.model.StoragePartition;
+import com.uber.hoodie.hive.model.TablePartition;
+import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import parquet.schema.MessageType;
+
+import javax.sql.DataSource;
+import java.io.Closeable;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Client to access Hive
+ */
+public class HoodieHiveClient implements Closeable {
+ private static Logger LOG = LoggerFactory.getLogger(HoodieHiveClient.class);
+ private static String driverName = "org.apache.hive.jdbc.HiveDriver";
+
+ static {
+ try {
+ Class.forName(driverName);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException("Could not find " + driverName + " in classpath. ", e);
+ }
+ }
+
+ private final HoodieHiveConfiguration configuration;
+ private Connection connection;
+ private HiveConf hiveConf;
+
+ public HoodieHiveClient(HoodieHiveConfiguration configuration) {
+ this.configuration = configuration;
+ this.hiveConf = new HiveConf();
+ this.hiveConf.addResource(configuration.getConfiguration());
+ try {
+ this.connection = getConnection();
+ } catch (SQLException e) {
+ throw new HoodieHiveDatasetException("Failed to connect to hive metastore ", e);
+ }
+ }
+
+ /**
+ * Scan all the partitions for the given {@link HoodieDatasetReference} with the given {@link PartitionStrategy}
+ *
+ * @param metadata
+ * @return
+ */
+ public List scanPartitions(HoodieDatasetReference metadata) {
+ if (!checkTableExists(metadata)) {
+ throw new IllegalArgumentException(
+ "Failed to scan partitions as table " + metadata.getDatabaseTableName()
+ + " does not exist");
+ }
+ List partitions = Lists.newArrayList();
+ HiveMetaStoreClient client = null;
+ try {
+ client = new HiveMetaStoreClient(hiveConf);
+ List hivePartitions = client
+ .listPartitions(metadata.getDatabaseName(), metadata.getTableName(), (short) -1);
+ for (Partition partition : hivePartitions) {
+ partitions.add(new TablePartition(metadata, partition));
+ }
+ return partitions;
+ } catch (Exception e) {
+ throw new HoodieHiveDatasetException("Failed to scan partitions for " + metadata, e);
+ } finally {
+ if (client != null) {
+ client.close();
+ }
+ }
+ }
+
+ /**
+ * Check if table exists
+ *
+ * @param metadata
+ * @return
+ */
+ public boolean checkTableExists(HoodieDatasetReference metadata) {
+ ResultSet resultSet = null;
+ try {
+ Connection conn = getConnection();
+ resultSet = conn.getMetaData()
+ .getTables(null, metadata.getDatabaseName(), metadata.getTableName(), null);
+ return resultSet.next();
+ } catch (SQLException e) {
+ throw new HoodieHiveDatasetException("Failed to check if table exists " + metadata, e);
+ } finally {
+ closeQuietly(resultSet, null);
+ }
+ }
+
+ /**
+ * Update the hive metastore pointed to by {@link HoodieDatasetReference} with the difference
+ * in schema {@link SchemaDifference}
+ *
+ * @param metadata
+ * @param hivePartitionFieldNames
+ * @param newSchema @return
+ */
+ public boolean updateTableDefinition(HoodieDatasetReference metadata,
+ String[] hivePartitionFieldNames, MessageType newSchema) {
+ try {
+ String newSchemaStr = SchemaUtil.generateSchemaString(newSchema);
+ // Cascade clause should not be present for non-partitioned tables
+ String cascadeClause = hivePartitionFieldNames.length > 0 ? " cascade" : "";
+ StringBuilder sqlBuilder = new StringBuilder("ALTER TABLE ").append("`")
+ .append(metadata.getDatabaseTableName()).append("`").append(" REPLACE COLUMNS(")
+ .append(newSchemaStr).append(" )").append(cascadeClause);
+ LOG.info("Creating table with " + sqlBuilder);
+ return updateHiveSQL(sqlBuilder.toString());
+ } catch (IOException e) {
+ throw new HoodieHiveDatasetException("Failed to update table for " + metadata, e);
+ }
+ }
+
+ /**
+ * Execute a update in hive metastore with this SQL
+ *
+ * @param s SQL to execute
+ * @return
+ */
+ public boolean updateHiveSQL(String s) {
+ Statement stmt = null;
+ try {
+ Connection conn = getConnection();
+ stmt = conn.createStatement();
+ LOG.info("Executing SQL " + s);
+ return stmt.execute(s);
+ } catch (SQLException e) {
+ throw new HoodieHiveDatasetException("Failed in executing SQL " + s, e);
+ } finally {
+ closeQuietly(null, stmt);
+ }
+ }
+
+ /**
+ * Get the table schema
+ *
+ * @param datasetReference
+ * @return
+ */
+ public Map getTableSchema(HoodieDatasetReference datasetReference) {
+ if (!checkTableExists(datasetReference)) {
+ throw new IllegalArgumentException(
+ "Failed to get schema as table " + datasetReference.getDatabaseTableName()
+ + " does not exist");
+ }
+ Map schema = Maps.newHashMap();
+ ResultSet result = null;
+ try {
+ Connection connection = getConnection();
+ DatabaseMetaData databaseMetaData = connection.getMetaData();
+ result = databaseMetaData.getColumns(null, datasetReference.getDatabaseName(),
+ datasetReference.getTableName(), null);
+ while (result.next()) {
+ String columnName = result.getString(4);
+ String columnType = result.getString(6);
+ schema.put(columnName, columnType);
+ }
+ return schema;
+ } catch (SQLException e) {
+ throw new HoodieHiveDatasetException(
+ "Failed to get table schema for " + datasetReference, e);
+ } finally {
+ closeQuietly(result, null);
+ }
+ }
+
+ public void addPartitionsToTable(HoodieDatasetReference datasetReference,
+ List partitionsToAdd, PartitionStrategy strategy) {
+ if (partitionsToAdd.isEmpty()) {
+ LOG.info("No partitions to add for " + datasetReference);
+ return;
+ }
+ LOG.info("Adding partitions " + partitionsToAdd.size() + " to dataset " + datasetReference);
+ String sql = constructAddPartitions(datasetReference, partitionsToAdd, strategy);
+ updateHiveSQL(sql);
+ }
+
+ public void updatePartitionsToTable(HoodieDatasetReference datasetReference,
+ List changedPartitions, PartitionStrategy partitionStrategy) {
+ if (changedPartitions.isEmpty()) {
+ LOG.info("No partitions to change for " + datasetReference);
+ return;
+ }
+ LOG.info(
+ "Changing partitions " + changedPartitions.size() + " on dataset " + datasetReference);
+ List sqls =
+ constructChangePartitions(datasetReference, changedPartitions, partitionStrategy);
+ for (String sql : sqls) {
+ updateHiveSQL(sql);
+ }
+ }
+
+ public void createTable(MessageType storageSchema, HoodieDatasetReference metadata,
+ String[] partitionKeys, String inputFormatClass, String outputFormatClass) {
+ try {
+ String createSQLQuery = SchemaUtil
+ .generateCreateDDL(storageSchema, metadata, partitionKeys, inputFormatClass,
+ outputFormatClass);
+ LOG.info("Creating table with " + createSQLQuery);
+ updateHiveSQL(createSQLQuery);
+ } catch (IOException e) {
+ throw new HoodieHiveDatasetException("Failed to create table for " + metadata, e);
+ }
+ }
+
+ private static void closeQuietly(ResultSet resultSet, Statement stmt) {
+ try {
+ if (stmt != null)
+ stmt.close();
+ if (resultSet != null)
+ resultSet.close();
+ } catch (SQLException e) {
+ LOG.error("Could not close the resultset opened ", e);
+ }
+ }
+
+ private Connection getConnection() throws SQLException {
+ int count = 0;
+ int maxTries = 3;
+ if (connection == null) {
+ Configuration conf = configuration.getConfiguration();
+ DataSource ds = getDatasource();
+ LOG.info("Getting Hive Connection from Datasource " + ds);
+ while (true) {
+ try {
+ this.connection = ds.getConnection();
+ break;
+ } catch (SQLException e) {
+ if (++count == maxTries)
+ throw e;
+ }
+ }
+ }
+ return connection;
+ }
+
+ private DataSource getDatasource() {
+ BasicDataSource ds = new BasicDataSource();
+ ds.setDriverClassName(driverName);
+ ds.setUrl(getHiveJdbcUrlWithDefaultDBName());
+ ds.setUsername(configuration.getHiveUsername());
+ ds.setPassword(configuration.getHivePassword());
+ return ds;
+ }
+
+ public String getHiveJdbcUrlWithDefaultDBName() {
+ String hiveJdbcUrl = configuration.getHiveJdbcUrl();
+ String urlAppend = null;
+ // If the hive url contains addition properties like ;transportMode=http;httpPath=hs2
+ if (hiveJdbcUrl.contains(";")) {
+ urlAppend = hiveJdbcUrl.substring(hiveJdbcUrl.indexOf(";"));
+ hiveJdbcUrl = hiveJdbcUrl.substring(0, hiveJdbcUrl.indexOf(";"));
+ }
+ if (!hiveJdbcUrl.endsWith("/")) {
+ hiveJdbcUrl = hiveJdbcUrl + "/";
+ }
+ return hiveJdbcUrl + configuration.getDbName() + (urlAppend == null ? "" : urlAppend);
+ }
+
+ private static List constructChangePartitions(HoodieDatasetReference metadata,
+ List partitions, PartitionStrategy partitionStrategy) {
+ String[] partitionFieldNames = partitionStrategy.getHivePartitionFieldNames();
+
+ List changePartitions = Lists.newArrayList();
+ String alterTable = "ALTER TABLE " + metadata.getDatabaseTableName();
+ for (StoragePartition partition : partitions) {
+ StringBuilder partBuilder = new StringBuilder();
+ String[] partitionValues = partition.getPartitionFieldValues();
+ Preconditions.checkArgument(partitionFieldNames.length == partitionValues.length,
+ "Partition key parts " + Arrays.toString(partitionFieldNames)
+ + " does not match with partition values " + Arrays.toString(partitionValues)
+ + ". Check partition strategy. ");
+ for (int i = 0; i < partitionFieldNames.length; i++) {
+ partBuilder.append(partitionFieldNames[i]).append("=").append("'")
+ .append(partitionValues[i]).append("'");
+ }
+ String changePartition =
+ alterTable + " PARTITION (" + partBuilder.toString() + ") SET LOCATION '"
+ + "hdfs://nameservice1" + partition.getPartitionPath() + "'";
+ changePartitions.add(changePartition);
+ }
+ return changePartitions;
+ }
+
+ private static String constructAddPartitions(HoodieDatasetReference metadata,
+ List partitions, PartitionStrategy partitionStrategy) {
+ return constructAddPartitions(metadata.getDatabaseTableName(), partitions,
+ partitionStrategy);
+ }
+
+ private static String constructAddPartitions(String newDbTableName,
+ List partitions, PartitionStrategy partitionStrategy) {
+ String[] partitionFieldNames = partitionStrategy.getHivePartitionFieldNames();
+ StringBuilder alterSQL = new StringBuilder("ALTER TABLE ");
+ alterSQL.append(newDbTableName).append(" ADD IF NOT EXISTS ");
+ for (StoragePartition partition : partitions) {
+ StringBuilder partBuilder = new StringBuilder();
+ String[] partitionValues = partition.getPartitionFieldValues();
+ Preconditions.checkArgument(partitionFieldNames.length == partitionValues.length,
+ "Partition key parts " + Arrays.toString(partitionFieldNames)
+ + " does not match with partition values " + Arrays.toString(partitionValues)
+ + ". Check partition strategy. ");
+ for (int i = 0; i < partitionFieldNames.length; i++) {
+ partBuilder.append(partitionFieldNames[i]).append("=").append("'")
+ .append(partitionValues[i]).append("'");
+ }
+ alterSQL.append(" PARTITION (").append(partBuilder.toString()).append(") LOCATION '")
+ .append(partition.getPartitionPath()).append("' ");
+ }
+
+ return alterSQL.toString();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (SQLException e) {
+ LOG.error("Could not close the connection opened ", e);
+ }
+ }
+ }
+}
diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/client/SchemaUtil.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/client/SchemaUtil.java
new file mode 100644
index 000000000..e11aeaf99
--- /dev/null
+++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/client/SchemaUtil.java
@@ -0,0 +1,436 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.hive.client;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.uber.hoodie.hive.HoodieHiveDatasetException;
+import com.uber.hoodie.hive.model.HoodieDatasetReference;
+import com.uber.hoodie.hive.model.SchemaDifference;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import parquet.schema.DecimalMetadata;
+import parquet.schema.GroupType;
+import parquet.schema.MessageType;
+import parquet.schema.OriginalType;
+import parquet.schema.PrimitiveType;
+import parquet.schema.Type;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Schema Utilities
+ */
+public class SchemaUtil {
+ private static Logger LOG = LoggerFactory.getLogger(SchemaUtil.class);
+
+ /**
+ * Get the schema difference between the storage schema and hive table schema
+ *
+ * @param storageSchema
+ * @param tableSchema
+ * @param partitionKeys
+ * @return
+ */
+ public static SchemaDifference getSchemaDifference(MessageType storageSchema,
+ Map tableSchema, String[] partitionKeys) {
+ Map newTableSchema;
+ try {
+ newTableSchema = convertParquetSchemaToHiveSchema(storageSchema);
+ } catch (IOException e) {
+ throw new HoodieHiveDatasetException("Failed to convert parquet schema to hive schema",
+ e);
+ }
+ LOG.info("Getting schema difference for " + tableSchema + "\r\n\r\n" + newTableSchema);
+ SchemaDifference.Builder schemaDiffBuilder =
+ SchemaDifference.newBuilder(storageSchema, tableSchema);
+ Set tableColumns = Sets.newHashSet();
+
+ for (Map.Entry field : tableSchema.entrySet()) {
+ String fieldName = field.getKey().toLowerCase();
+ String tickSurroundedFieldName = tickSurround(fieldName);
+ if (!isFieldExistsInSchema(newTableSchema, tickSurroundedFieldName) && !ArrayUtils
+ .contains(partitionKeys, fieldName)) {
+ schemaDiffBuilder.deleteTableColumn(fieldName);
+ } else {
+ // check type
+ String tableColumnType = field.getValue();
+ if (!isFieldExistsInSchema(newTableSchema, tickSurroundedFieldName)) {
+ if (ArrayUtils.contains(partitionKeys, fieldName)) {
+ // Partition key does not have to be part of the storage schema
+ continue;
+ }
+ // We will log this and continue. Hive schema is a superset of all parquet schemas
+ LOG.warn("Ignoring table column " + fieldName
+ + " as its not present in the parquet schema");
+ continue;
+ }
+ tableColumnType = tableColumnType.replaceAll("\\s+", "");
+
+ String expectedType = getExpectedType(newTableSchema, tickSurroundedFieldName);
+ expectedType = expectedType.replaceAll("\\s+", "");
+ expectedType = expectedType.replaceAll("`", "");
+
+ if (!tableColumnType.equalsIgnoreCase(expectedType)) {
+ // check for incremental datasets, the schema type change is allowed as per evolution rules
+ if (!isSchemaTypeUpdateAllowed(tableColumnType, expectedType)) {
+ throw new HoodieHiveDatasetException(
+ "Could not convert field Type from " + tableColumnType + " to "
+ + expectedType + " for field " + fieldName);
+ }
+ schemaDiffBuilder.updateTableColumn(fieldName,
+ getExpectedType(newTableSchema, tickSurroundedFieldName));
+ }
+ }
+ tableColumns.add(tickSurroundedFieldName);
+ }
+
+ for (Map.Entry entry : newTableSchema.entrySet()) {
+ if (!tableColumns.contains(entry.getKey().toLowerCase())) {
+ schemaDiffBuilder.addTableColumn(entry.getKey(), entry.getValue());
+ }
+ }
+ LOG.info("Difference between schemas: " + schemaDiffBuilder.build().toString());
+
+ return schemaDiffBuilder.build();
+ }
+
+ private static String getExpectedType(Map newTableSchema, String fieldName) {
+ for (Map.Entry entry : newTableSchema.entrySet()) {
+ if (entry.getKey().toLowerCase().equals(fieldName)) {
+ return entry.getValue();
+ }
+ }
+ return null;
+ }
+
+ private static boolean isFieldExistsInSchema(Map newTableSchema,
+ String fieldName) {
+ for (String entry : newTableSchema.keySet()) {
+ if (entry.toLowerCase().equals(fieldName)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+
+ /**
+ * Returns equivalent Hive table schema read from a parquet file
+ *
+ * @param messageType : Parquet Schema
+ * @return : Hive Table schema read from parquet file MAP
+ * @throws IOException
+ */
+ public static Map convertParquetSchemaToHiveSchema(MessageType messageType)
+ throws IOException {
+ Map schema = Maps.newLinkedHashMap();
+ List parquetFields = messageType.getFields();
+ for (Type parquetType : parquetFields) {
+ StringBuilder result = new StringBuilder();
+ String key = parquetType.getName();
+ if (parquetType.isRepetition(Type.Repetition.REPEATED)) {
+ result.append(createHiveArray(parquetType, ""));
+ } else {
+ result.append(convertField(parquetType));
+ }
+
+ schema.put(hiveCompatibleFieldName(key, false), result.toString());
+ }
+ return schema;
+ }
+
+ /**
+ * Convert one field data type of parquet schema into an equivalent Hive
+ * schema
+ *
+ * @param parquetType : Single paruet field
+ * @return : Equivalent sHive schema
+ */
+ private static String convertField(final Type parquetType) {
+ StringBuilder field = new StringBuilder();
+ if (parquetType.isPrimitive()) {
+ final PrimitiveType.PrimitiveTypeName parquetPrimitiveTypeName =
+ parquetType.asPrimitiveType().getPrimitiveTypeName();
+ final OriginalType originalType = parquetType.getOriginalType();
+ if (originalType == OriginalType.DECIMAL) {
+ final DecimalMetadata decimalMetadata =
+ parquetType.asPrimitiveType().getDecimalMetadata();
+ return field.append("DECIMAL(").append(decimalMetadata.getPrecision()).
+ append(" , ").append(decimalMetadata.getScale()).append(")").toString();
+ }
+ // TODO - fix the method naming here
+ return parquetPrimitiveTypeName
+ .convert(new PrimitiveType.PrimitiveTypeNameConverter() {
+ @Override
+ public String convertBOOLEAN(
+ PrimitiveType.PrimitiveTypeName primitiveTypeName) {
+ return "boolean";
+ }
+
+ @Override
+ public String convertINT32(PrimitiveType.PrimitiveTypeName primitiveTypeName) {
+ return "int";
+ }
+
+ @Override
+ public String convertINT64(PrimitiveType.PrimitiveTypeName primitiveTypeName) {
+ return "bigint";
+ }
+
+ @Override
+ public String convertINT96(PrimitiveType.PrimitiveTypeName primitiveTypeName) {
+ return "timestamp-millis";
+ }
+
+ @Override
+ public String convertFLOAT(PrimitiveType.PrimitiveTypeName primitiveTypeName) {
+ return "float";
+ }
+
+ @Override
+ public String convertDOUBLE(PrimitiveType.PrimitiveTypeName primitiveTypeName) {
+ return "double";
+ }
+
+ @Override
+ public String convertFIXED_LEN_BYTE_ARRAY(
+ PrimitiveType.PrimitiveTypeName primitiveTypeName) {
+ return "binary";
+ }
+
+ @Override
+ public String convertBINARY(PrimitiveType.PrimitiveTypeName primitiveTypeName) {
+ if (originalType == OriginalType.UTF8
+ || originalType == OriginalType.ENUM) {
+ return "string";
+ } else {
+ return "binary";
+ }
+ }
+ });
+ } else {
+ GroupType parquetGroupType = parquetType.asGroupType();
+ OriginalType originalType = parquetGroupType.getOriginalType();
+ if (originalType != null) {
+ switch (originalType) {
+ case LIST:
+ if (parquetGroupType.getFieldCount() != 1) {
+ throw new UnsupportedOperationException(
+ "Invalid list type " + parquetGroupType);
+ }
+ Type elementType = parquetGroupType.getType(0);
+ if (!elementType.isRepetition(Type.Repetition.REPEATED)) {
+ throw new UnsupportedOperationException(
+ "Invalid list type " + parquetGroupType);
+ }
+ return createHiveArray(elementType, parquetGroupType.getName());
+ case MAP:
+ if (parquetGroupType.getFieldCount() != 1 || parquetGroupType.getType(0)
+ .isPrimitive()) {
+ throw new UnsupportedOperationException(
+ "Invalid map type " + parquetGroupType);
+ }
+ GroupType mapKeyValType = parquetGroupType.getType(0).asGroupType();
+ if (!mapKeyValType.isRepetition(Type.Repetition.REPEATED) ||
+ !mapKeyValType.getOriginalType().equals(OriginalType.MAP_KEY_VALUE) ||
+ mapKeyValType.getFieldCount() != 2) {
+ throw new UnsupportedOperationException(
+ "Invalid map type " + parquetGroupType);
+ }
+ Type keyType = mapKeyValType.getType(0);
+ if (!keyType.isPrimitive() ||
+ !keyType.asPrimitiveType().getPrimitiveTypeName()
+ .equals(PrimitiveType.PrimitiveTypeName.BINARY) ||
+ !keyType.getOriginalType().equals(OriginalType.UTF8)) {
+ throw new UnsupportedOperationException(
+ "Map key type must be binary (UTF8): " + keyType);
+ }
+ Type valueType = mapKeyValType.getType(1);
+ return createHiveMap(convertField(keyType), convertField(valueType));
+ case ENUM:
+ case UTF8:
+ return "string";
+ case MAP_KEY_VALUE:
+ // MAP_KEY_VALUE was supposed to be used to annotate key and
+ // value group levels in a
+ // MAP. However, that is always implied by the structure of
+ // MAP. Hence, PARQUET-113
+ // dropped the requirement for having MAP_KEY_VALUE.
+ default:
+ throw new UnsupportedOperationException(
+ "Cannot convert Parquet type " + parquetType);
+ }
+ } else {
+ // if no original type then it's a record
+ return createHiveStruct(parquetGroupType.getFields());
+ }
+ }
+ }
+
+ /**
+ * Return a 'struct' Hive schema from a list of Parquet fields
+ *
+ * @param parquetFields : list of parquet fields
+ * @return : Equivalent 'struct' Hive schema
+ */
+ private static String createHiveStruct(List parquetFields) {
+ StringBuilder struct = new StringBuilder();
+ struct.append("STRUCT< ");
+ for (Type field : parquetFields) {
+ //TODO: struct field name is only translated to support special char($)
+ //We will need to extend it to other collection type
+ struct.append(hiveCompatibleFieldName(field.getName(), true)).append(" : ");
+ struct.append(convertField(field)).append(", ");
+ }
+ struct.delete(struct.length() - 2, struct.length()); // Remove the last
+ // ", "
+ struct.append(">");
+ String finalStr = struct.toString();
+ // Struct cannot have - in them. userstore_udr_entities has uuid in struct. This breaks the schema.
+ // HDrone sync should not fail because of this.
+ finalStr = finalStr.replaceAll("-", "_");
+ return finalStr;
+ }
+
+
+ private static String hiveCompatibleFieldName(String fieldName, boolean isNested) {
+ String result = fieldName;
+ if (isNested) {
+ result = ColumnNameXLator.translateNestedColumn(fieldName);
+ }
+ return tickSurround(result);
+ }
+
+ private static String tickSurround(String result) {
+ if (!result.startsWith("`")) {
+ result = "`" + result;
+ }
+ if (!result.endsWith("`")) {
+ result = result + "`";
+ }
+ return result;
+ }
+
+ /**
+ * Create a 'Map' schema from Parquet map field
+ *
+ * @param keyType
+ * @param valueType
+ * @return
+ */
+ private static String createHiveMap(String keyType, String valueType) {
+ return "MAP< " + keyType + ", " + valueType + ">";
+ }
+
+ /**
+ * Create an Array Hive schema from equivalent parquet list type
+ *
+ * @param elementType
+ * @param elementName
+ * @return
+ */
+ private static String createHiveArray(Type elementType, String elementName) {
+ StringBuilder array = new StringBuilder();
+ array.append("ARRAY< ");
+ if (elementType.isPrimitive()) {
+ array.append(convertField(elementType));
+ } else {
+ final GroupType groupType = elementType.asGroupType();
+ final List groupFields = groupType.getFields();
+ if (groupFields.size() > 1 || (groupFields.size() == 1 && (
+ elementType.getName().equals("array") || elementType.getName()
+ .equals(elementName + "_tuple")))) {
+ array.append(convertField(elementType));
+ } else {
+ array.append(convertField(groupType.getFields().get(0)));
+ }
+ }
+ array.append(">");
+ return array.toString();
+ }
+
+ public static boolean isSchemaTypeUpdateAllowed(String prevType, String newType) {
+ if (prevType == null || prevType.trim().isEmpty() ||
+ newType == null || newType.trim().isEmpty()) {
+ return false;
+ }
+ prevType = prevType.toLowerCase();
+ newType = newType.toLowerCase();
+ if (prevType.equals(newType)) {
+ return true;
+ } else if (prevType.equalsIgnoreCase("int") && newType.equalsIgnoreCase("bigint")) {
+ return true;
+ } else if (prevType.equalsIgnoreCase("float") && newType.equalsIgnoreCase("double")) {
+ return true;
+ } else if (prevType.contains("struct") && newType.toLowerCase().contains("struct")) {
+ return true;
+ }
+ return false;
+ }
+
+ public static String generateSchemaString(MessageType storageSchema) throws IOException {
+ Map hiveSchema = convertParquetSchemaToHiveSchema(storageSchema);
+ StringBuilder columns = new StringBuilder();
+ for (Map.Entry hiveSchemaEntry : hiveSchema.entrySet()) {
+ columns.append(hiveSchemaEntry.getKey()).append(" ");
+ columns.append(hiveSchemaEntry.getValue()).append(", ");
+ }
+ // Remove the last ", "
+ columns.delete(columns.length() - 2, columns.length());
+ return columns.toString();
+ }
+
+ public static String generateCreateDDL(MessageType storageSchema,
+ HoodieDatasetReference metadata, String[] partitionKeys, String inputFormatClass,
+ String outputFormatClass) throws IOException {
+ Map hiveSchema = convertParquetSchemaToHiveSchema(storageSchema);
+ String columns = generateSchemaString(storageSchema);
+
+ StringBuilder partitionFields = new StringBuilder();
+ for (String partitionKey : partitionKeys) {
+ partitionFields.append(partitionKey).append(" ")
+ .append(getPartitionKeyType(hiveSchema, partitionKey));
+ }
+
+ StringBuilder sb = new StringBuilder("CREATE EXTERNAL TABLE IF NOT EXISTS ");
+ sb = sb.append(metadata.getDatabaseTableName());
+ sb = sb.append("( ").append(columns).append(")");
+ if (partitionKeys.length > 0) {
+ sb = sb.append(" PARTITIONED BY (").append(partitionFields).append(")");
+ }
+ sb = sb.append(" ROW FORMAT SERDE '").append(ParquetHiveSerDe.class.getName()).append("'");
+ sb = sb.append(" STORED AS INPUTFORMAT '").append(inputFormatClass).append("'");
+ sb = sb.append(" OUTPUTFORMAT '").append(outputFormatClass).append("' LOCATION '")
+ .append(metadata.getBaseDatasetPath()).append("'");
+ return sb.toString();
+ }
+
+ private static String getPartitionKeyType(Map hiveSchema, String partitionKey) {
+ if (hiveSchema.containsKey(partitionKey)) {
+ return hiveSchema.get(partitionKey);
+ }
+ // Default the unknown partition fields to be String
+ // TODO - all partition fields should be part of the schema. datestr is treated as special. Dont do that
+ return "String";
+ }
+}
diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/example/HoodieDatasetExample.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/example/HoodieDatasetExample.java
new file mode 100644
index 000000000..1b23e870c
--- /dev/null
+++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/example/HoodieDatasetExample.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.hive.example;
+
+import com.uber.hoodie.hive.HoodieHiveConfiguration;
+import com.uber.hoodie.hive.HoodieHiveDatasetSyncTask;
+import com.uber.hoodie.hive.PartitionStrategy;
+import com.uber.hoodie.hive.SchemaStrategy;
+import com.uber.hoodie.hive.impl.DayBasedPartitionStrategy;
+import com.uber.hoodie.hive.impl.ParseSchemaFromDataStrategy;
+import com.uber.hoodie.hive.model.HoodieDatasetReference;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Example showing basic usage of Hoodie Hive API
+ */
+public class HoodieDatasetExample {
+ public static void main(String[] args) {
+ // Configure to point to which metastore and database to connect to
+ HoodieHiveConfiguration apiConfig =
+ HoodieHiveConfiguration.newBuilder().hadoopConfiguration(new Configuration())
+ .hivedb("tmp").hiveJdbcUrl("jdbc:hive2://localhost:10010/").jdbcUsername("hive")
+ .jdbcPassword("hive").build();
+
+ HoodieDatasetReference datasetReference =
+ new HoodieDatasetReference("clickstream", "hdfs:///data/tables/user.clickstream",
+ "raw");
+
+ // initialize the strategies
+ PartitionStrategy partitionStrategy = new DayBasedPartitionStrategy();
+ SchemaStrategy schemaStrategy = new ParseSchemaFromDataStrategy();
+
+ // Creates a new dataset which reflects the state at the time of creation
+ HoodieHiveDatasetSyncTask datasetSyncTask =
+ HoodieHiveDatasetSyncTask.newBuilder().withReference(datasetReference)
+ .withConfiguration(apiConfig).partitionStrategy(partitionStrategy)
+ .schemaStrategy(schemaStrategy).build();
+
+ // Sync dataset
+ datasetSyncTask.sync();
+ }
+}
diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/impl/DayBasedPartitionStrategy.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/impl/DayBasedPartitionStrategy.java
new file mode 100644
index 000000000..5300cb045
--- /dev/null
+++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/impl/DayBasedPartitionStrategy.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.hive.impl;
+
+import com.uber.hoodie.hive.PartitionStrategy;
+import com.uber.hoodie.hive.client.HoodieFSClient;
+import com.uber.hoodie.hive.model.HoodieDatasetReference;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple day based partitions.
+ * Storage is of this format yyyy/mm/dd
+ * Table is partitioned by dateStringFieldName=MM/dd/yyyy
+ */
+public class DayBasedPartitionStrategy implements PartitionStrategy {
+ private Logger LOG = LoggerFactory.getLogger(DayBasedPartitionStrategy.class);
+ private final String dateStringFieldName;
+ private final DateTimeFormatter dtfOut;
+
+ public DayBasedPartitionStrategy() {
+ this.dateStringFieldName = "datestr";
+ this.dtfOut = DateTimeFormat.forPattern("yyyy-MM-dd");
+ }
+
+ @Override public FileStatus[] scanAllPartitions(HoodieDatasetReference ref, HoodieFSClient fsClient) {
+ return fsClient.getDirectoriesMatchingPattern(ref, "/*/*/*");
+ }
+
+ @Override public String[] getHivePartitionFieldNames() {
+ return new String[] {dateStringFieldName};
+ }
+
+ @Override
+ public String[] convertPartitionToValues(HoodieDatasetReference metadata, Path partition) {
+ //yyyy/mm/dd
+ String basePath = metadata.getBaseDatasetPath();
+ String partitionPath = partition.toUri().getPath();
+ if (!partitionPath.contains(basePath)) {
+ throw new IllegalArgumentException(
+ "Partition path " + partitionPath + " is not part of the dataset " + metadata);
+ }
+ // Get the partition part and remove the / as well at the end
+ String partitionPart = partitionPath.substring(basePath.length() + 1);
+ LOG.info("Extracting parts from " + partitionPart);
+ int year = extractPart(partitionPart, 0);
+ int mm = extractPart(partitionPart, 1);
+ int dd = extractPart(partitionPart, 2);
+ DateTime dateTime = new DateTime(year, mm, dd, 0, 0);
+ return new String[] {dtfOut.print(dateTime)};
+ }
+
+ private int extractPart(String pathString, int index) {
+ String[] parts = pathString.split("/");
+ String part = parts[index];
+ return Integer.parseInt(part);
+ }
+
+}
diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/impl/ParseSchemaFromDataStrategy.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/impl/ParseSchemaFromDataStrategy.java
new file mode 100644
index 000000000..424b64d52
--- /dev/null
+++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/impl/ParseSchemaFromDataStrategy.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.hive.impl;
+
+import com.uber.hoodie.hive.HoodieHiveDatasetException;
+import com.uber.hoodie.hive.SchemaStrategy;
+import com.uber.hoodie.hive.client.HoodieFSClient;
+import com.uber.hoodie.hive.model.HoodieDatasetReference;
+import org.apache.hadoop.fs.Path;
+import parquet.schema.MessageType;
+
+import java.io.IOException;
+
+/**
+ * Schema strategy to read the parquet schema from any of the data file
+ */
+public class ParseSchemaFromDataStrategy implements SchemaStrategy {
+ @Override
+ public MessageType getDatasetSchema(HoodieDatasetReference metadata, HoodieFSClient fsClient) {
+ Path anyDataFile = fsClient.lastDataFileForDataset(metadata, metadata.getBaseDatasetPath());
+ try {
+ return fsClient.readSchemaFromDataFile(anyDataFile);
+ } catch (IOException e) {
+ throw new HoodieHiveDatasetException(
+ "Could not read schema for " + metadata + ", tried to read schema from "
+ + anyDataFile, e);
+ }
+ }
+}
diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/model/HoodieDatasetReference.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/model/HoodieDatasetReference.java
new file mode 100644
index 000000000..41598bc7d
--- /dev/null
+++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/model/HoodieDatasetReference.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.hive.model;
+
+
+import java.util.Objects;
+
+/**
+ * A reference to a Dataset. Each dataset will have a hadoop configuration, table name,
+ * base path in HDFS. {@link HoodieDatasetReference} is immutable.
+ */
+public class HoodieDatasetReference {
+ private String tableName;
+ private String baseDatasetPath;
+ private String databaseName;
+
+ public HoodieDatasetReference(String tableName, String baseDatasetPath, String databaseName) {
+ this.tableName = tableName;
+ this.baseDatasetPath = baseDatasetPath;
+ this.databaseName = databaseName;
+ }
+
+ public String getDatabaseTableName() {
+ return databaseName + "." + tableName;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public String getBaseDatasetPath() {
+ return baseDatasetPath;
+ }
+
+ public String getDatabaseName() {
+ return databaseName;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ HoodieDatasetReference that = (HoodieDatasetReference) o;
+ return Objects.equals(tableName, that.tableName) &&
+ Objects.equals(baseDatasetPath, that.baseDatasetPath) &&
+ Objects.equals(databaseName, that.databaseName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(tableName, baseDatasetPath, databaseName);
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("HoodieDatasetReference{");
+ sb.append("tableName='").append(tableName).append('\'');
+ sb.append(", baseDatasetPath='").append(baseDatasetPath).append('\'');
+ sb.append(", databaseName='").append(databaseName).append('\'');
+ sb.append('}');
+ return sb.toString();
+ }
+}
diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/model/SchemaDifference.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/model/SchemaDifference.java
new file mode 100644
index 000000000..a6ad1a5c0
--- /dev/null
+++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/model/SchemaDifference.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.hive.model;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import parquet.schema.MessageType;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents the schema difference between the storage schema and hive table schema
+ */
+public class SchemaDifference {
+ private final MessageType storageSchema;
+ private final Map tableSchema;
+ private final List deleteColumns;
+ private final Map updateColumnTypes;
+ private final Map addColumnTypes;
+
+ private SchemaDifference(MessageType storageSchema, Map tableSchema,
+ List deleteColumns, Map updateColumnTypes, Map addColumnTypes) {
+ this.storageSchema = storageSchema;
+ this.tableSchema = tableSchema;
+ this.deleteColumns = ImmutableList.copyOf(deleteColumns);
+ this.updateColumnTypes = ImmutableMap.copyOf(updateColumnTypes);
+ this.addColumnTypes = ImmutableMap.copyOf(addColumnTypes);
+ }
+
+ public List getDeleteColumns() {
+ return deleteColumns;
+ }
+
+ public Map getUpdateColumnTypes() {
+ return updateColumnTypes;
+ }
+
+ public Map getAddColumnTypes() {
+ return addColumnTypes;
+ }
+
+ @Override public String toString() {
+ return Objects.toStringHelper(this).add("deleteColumns", deleteColumns)
+ .add("updateColumnTypes", updateColumnTypes).add("addColumnTypes", addColumnTypes)
+ .toString();
+ }
+
+ public static Builder newBuilder(MessageType storageSchema, Map tableSchema) {
+ return new Builder(storageSchema, tableSchema);
+ }
+
+ public boolean isEmpty() {
+ return deleteColumns.isEmpty() && updateColumnTypes.isEmpty() && addColumnTypes.isEmpty();
+ }
+
+ public static class Builder {
+ private final MessageType storageSchema;
+ private final Map tableSchema;
+ private List deleteColumns;
+ private Map updateColumnTypes;
+ private Map addColumnTypes;
+
+ public Builder(MessageType storageSchema, Map tableSchema) {
+ this.storageSchema = storageSchema;
+ this.tableSchema = tableSchema;
+ deleteColumns = Lists.newArrayList();
+ updateColumnTypes = Maps.newHashMap();
+ addColumnTypes = Maps.newHashMap();
+ }
+
+ public Builder deleteTableColumn(String column) {
+ deleteColumns.add(column);
+ return this;
+ }
+
+ public Builder updateTableColumn(String column, String storageColumnType) {
+ updateColumnTypes.put(column, storageColumnType);
+ return this;
+ }
+
+ public Builder addTableColumn(String name, String type) {
+ addColumnTypes.put(name, type);
+ return this;
+ }
+
+ public SchemaDifference build() {
+ return new SchemaDifference(storageSchema, tableSchema, deleteColumns, updateColumnTypes, addColumnTypes);
+ }
+ }
+}
diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/model/StoragePartition.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/model/StoragePartition.java
new file mode 100644
index 000000000..4558b78c4
--- /dev/null
+++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/model/StoragePartition.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.hive.model;
+
+import com.google.common.base.Objects;
+import com.uber.hoodie.hive.PartitionStrategy;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StoragePartition {
+ private static Logger LOG = LoggerFactory.getLogger(StoragePartition.class);
+ private final PartitionStrategy partitionStrategy;
+ private final Path partitionPath;
+ private final HoodieDatasetReference metadata;
+
+ public StoragePartition(HoodieDatasetReference metadata, PartitionStrategy partitionStrategy,
+ FileStatus input) {
+ this.metadata = metadata;
+ this.partitionPath = Path.getPathWithoutSchemeAndAuthority(input.getPath());
+ this.partitionStrategy = partitionStrategy;
+ }
+
+ public String[] getPartitionFieldValues() {
+ return partitionStrategy.convertPartitionToValues(metadata, partitionPath);
+ }
+
+ public Path getPartitionPath() {
+ return partitionPath;
+ }
+
+ @Override public String toString() {
+ return Objects.toStringHelper(this).add("partitionPath", partitionPath)
+ .add("metadata", metadata).toString();
+ }
+}
diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/model/TablePartition.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/model/TablePartition.java
new file mode 100644
index 000000000..480463d7f
--- /dev/null
+++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/model/TablePartition.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.hive.model;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.Partition;
+
+public class TablePartition {
+ private final HoodieDatasetReference metadata;
+ private final Partition partition;
+
+ public TablePartition(HoodieDatasetReference metadata, Partition partition) {
+ this.metadata = metadata;
+ this.partition = partition;
+ }
+
+ public Path getLocation() {
+ return Path.getPathWithoutSchemeAndAuthority(new Path(partition.getSd().getLocation()));
+ }
+
+ public String[] getPartitionFieldValues() {
+ return partition.getValues().toArray(new String[partition.getValuesSize()]);
+ }
+}
diff --git a/hoodie-hive/src/test/java/com/uber/hoodie/hive/DatasetSchemaTest.java b/hoodie-hive/src/test/java/com/uber/hoodie/hive/DatasetSchemaTest.java
new file mode 100644
index 000000000..e00e5f6b4
--- /dev/null
+++ b/hoodie-hive/src/test/java/com/uber/hoodie/hive/DatasetSchemaTest.java
@@ -0,0 +1,186 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.hive;
+
+import com.uber.hoodie.hive.client.SchemaUtil;
+import com.uber.hoodie.hive.model.HoodieDatasetReference;
+import com.uber.hoodie.hive.model.SchemaDifference;
+import com.uber.hoodie.hive.util.TestUtil;
+import org.joda.time.DateTime;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runners.model.InitializationError;
+import parquet.schema.MessageType;
+import parquet.schema.OriginalType;
+import parquet.schema.PrimitiveType;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+public class DatasetSchemaTest {
+ @Before
+ public void setUp() throws IOException, InterruptedException {
+ TestUtil.setUp();
+ }
+
+ @Test
+ public void testSchemaDiff() throws IOException, InitializationError {
+ HoodieDatasetReference metadata = TestUtil
+ .createDataset("test1", "/tmp/hdfs/DatasetSchemaTest/testSchema/", 5, "/nation.schema");
+ HoodieHiveSchemaSyncTask schema =
+ HoodieHiveSchemaSyncTask.newBuilder().withReference(metadata)
+ .withConfiguration(TestUtil.hDroneConfiguration).build();
+ SchemaDifference diff = schema.getSchemaDifference();
+ assertEquals("There should be 4 columns to be added", 4, diff.getAddColumnTypes().size());
+ assertEquals("No update columns expected", 0, diff.getUpdateColumnTypes().size());
+ assertEquals("No delete columns expected", 0, diff.getDeleteColumns().size());
+ schema.sync();
+
+ schema = HoodieHiveSchemaSyncTask.newBuilder().withReference(metadata)
+ .withConfiguration(TestUtil.hDroneConfiguration).build();
+ diff = schema.getSchemaDifference();
+ assertEquals("After sync, there should not be any new columns to add", 0,
+ diff.getAddColumnTypes().size());
+ assertEquals("After sync, there should not be any new columns to update", 0,
+ diff.getUpdateColumnTypes().size());
+ assertEquals("After sync, there should not be any new columns to delete", 0,
+ diff.getDeleteColumns().size());
+ }
+
+ @Test
+ public void testSchemaEvolution() throws IOException, InitializationError {
+ int initialPartitionsCount = 5;
+ HoodieDatasetReference metadata = TestUtil
+ .createDataset("test1", "/tmp/hdfs/DatasetSchemaTest/testSchema/",
+ initialPartitionsCount, "/nation.schema");
+ HoodieHiveSchemaSyncTask schema =
+ HoodieHiveSchemaSyncTask.newBuilder().withReference(metadata)
+ .withConfiguration(TestUtil.hDroneConfiguration).build();
+ schema.sync();
+
+ schema = HoodieHiveSchemaSyncTask.newBuilder().withReference(metadata)
+ .withConfiguration(TestUtil.hDroneConfiguration).build();
+ SchemaDifference diff = schema.getSchemaDifference();
+ assertEquals("After sync, diff should be empty", true, diff.isEmpty());
+ int newSchemaversion = 2;
+ int newPartitionsCount = 2;
+ TestUtil.evolveDataset(metadata, newPartitionsCount, "/nation_evolved.schema",
+ DateTime.now().getMillis(), newSchemaversion);
+ schema = HoodieHiveSchemaSyncTask.newBuilder().withReference(metadata)
+ .withConfiguration(TestUtil.hDroneConfiguration).build();
+ diff = schema.getSchemaDifference();
+ assertEquals("Schema has evolved, there should be a diff", false, diff.isEmpty());
+ assertEquals("Schema has evolved, there should be 1 column to add", 1,
+ diff.getAddColumnTypes().size());
+ assertEquals("Schema has evolved, there should be 1 column to update", 1,
+ diff.getUpdateColumnTypes().size());
+ assertEquals(0, diff.getDeleteColumns().size());
+ }
+
+ /**
+ * Testing converting array types to Hive field declaration strings,
+ * according to the Parquet-113 spec:
+ * https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
+ */
+ @Test
+ public void testSchemaConvertArray() throws IOException {
+ // Testing the 3-level annotation structure
+ MessageType schema =
+ parquet.schema.Types.buildMessage().optionalGroup().as(parquet.schema.OriginalType.LIST)
+ .repeatedGroup().optional(PrimitiveType.PrimitiveTypeName.INT32).named("element")
+ .named("list").named("int_list").named("ArrayOfInts");
+
+ String schemaString = SchemaUtil.generateSchemaString(schema);
+ assertEquals("`int_list` ARRAY< int>", schemaString);
+
+ // A array of arrays
+ schema =
+ parquet.schema.Types.buildMessage().optionalGroup().as(parquet.schema.OriginalType.LIST)
+ .repeatedGroup().requiredGroup().as(OriginalType.LIST).repeatedGroup()
+ .required(PrimitiveType.PrimitiveTypeName.INT32).named("element").named("list")
+ .named("element").named("list").named("int_list_list").named("ArrayOfArrayOfInts");
+
+ schemaString = SchemaUtil.generateSchemaString(schema);
+ assertEquals("`int_list_list` ARRAY< ARRAY< int>>", schemaString);
+
+ // A list of integers
+ schema =
+ parquet.schema.Types.buildMessage().optionalGroup().as(parquet.schema.OriginalType.LIST)
+ .repeated(PrimitiveType.PrimitiveTypeName.INT32).named("element").named("int_list")
+ .named("ArrayOfInts");
+
+ schemaString = SchemaUtil.generateSchemaString(schema);
+ assertEquals("`int_list` ARRAY< int>", schemaString);
+
+ // A list of structs with two fields
+ schema =
+ parquet.schema.Types.buildMessage().optionalGroup().as(parquet.schema.OriginalType.LIST)
+ .repeatedGroup().required(PrimitiveType.PrimitiveTypeName.BINARY).named("str")
+ .required(PrimitiveType.PrimitiveTypeName.INT32).named("num").named("element")
+ .named("tuple_list").named("ArrayOfTuples");
+
+ schemaString = SchemaUtil.generateSchemaString(schema);
+ assertEquals("`tuple_list` ARRAY< STRUCT< `str` : binary, `num` : int>>", schemaString);
+
+ // A list of structs with a single field
+ // For this case, since the inner group name is "array", we treat the
+ // element type as a one-element struct.
+ schema =
+ parquet.schema.Types.buildMessage().optionalGroup().as(parquet.schema.OriginalType.LIST)
+ .repeatedGroup().required(PrimitiveType.PrimitiveTypeName.BINARY).named("str")
+ .named("array").named("one_tuple_list").named("ArrayOfOneTuples");
+
+ schemaString = SchemaUtil.generateSchemaString(schema);
+ assertEquals("`one_tuple_list` ARRAY< STRUCT< `str` : binary>>", schemaString);
+
+ // A list of structs with a single field
+ // For this case, since the inner group name ends with "_tuple", we also treat the
+ // element type as a one-element struct.
+ schema =
+ parquet.schema.Types.buildMessage().optionalGroup().as(parquet.schema.OriginalType.LIST)
+ .repeatedGroup().required(PrimitiveType.PrimitiveTypeName.BINARY).named("str")
+ .named("one_tuple_list_tuple").named("one_tuple_list").named("ArrayOfOneTuples2");
+
+ schemaString = SchemaUtil.generateSchemaString(schema);
+ assertEquals("`one_tuple_list` ARRAY< STRUCT< `str` : binary>>", schemaString);
+
+ // A list of structs with a single field
+ // Unlike the above two cases, for this the element type is the type of the
+ // only field in the struct.
+ schema =
+ parquet.schema.Types.buildMessage().optionalGroup().as(parquet.schema.OriginalType.LIST)
+ .repeatedGroup().required(PrimitiveType.PrimitiveTypeName.BINARY).named("str")
+ .named("one_tuple_list").named("one_tuple_list").named("ArrayOfOneTuples3");
+
+ schemaString = SchemaUtil.generateSchemaString(schema);
+ assertEquals("`one_tuple_list` ARRAY< binary>", schemaString);
+
+ // A list of maps
+ schema =
+ parquet.schema.Types.buildMessage().optionalGroup().as(parquet.schema.OriginalType.LIST)
+ .repeatedGroup().as(OriginalType.MAP).repeatedGroup().as(OriginalType.MAP_KEY_VALUE)
+ .required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8)
+ .named("string_key").required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("int_value").named("key_value").named("array").named("map_list")
+ .named("ArrayOfMaps");
+
+ schemaString = SchemaUtil.generateSchemaString(schema);
+ assertEquals("`map_list` ARRAY< MAP< string, int>>", schemaString);
+ }
+}
diff --git a/hoodie-hive/src/test/java/com/uber/hoodie/hive/HDroneDatasetTest.java b/hoodie-hive/src/test/java/com/uber/hoodie/hive/HDroneDatasetTest.java
new file mode 100644
index 000000000..cdcb528d1
--- /dev/null
+++ b/hoodie-hive/src/test/java/com/uber/hoodie/hive/HDroneDatasetTest.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.hive;
+
+import com.uber.hoodie.hive.client.HoodieHiveClient;
+import com.uber.hoodie.hive.model.HoodieDatasetReference;
+import com.uber.hoodie.hive.util.TestUtil;
+import org.joda.time.DateTime;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runners.model.InitializationError;
+import parquet.schema.MessageType;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class HDroneDatasetTest {
+ private HoodieHiveClient hiveClient;
+
+ @Before
+ public void setUp() throws IOException, InterruptedException {
+ TestUtil.setUp();
+ hiveClient = new HoodieHiveClient(TestUtil.hDroneConfiguration);
+ }
+
+ @Test
+ public void testDatasetCreation() throws IOException, InitializationError {
+ HoodieDatasetReference metadata = TestUtil
+ .createDataset("test1", "/tmp/hdfs/DatasetSchemaTest/testSchema/", 5, "/nation.schema");
+ HoodieHiveDatasetSyncTask dataset =
+ HoodieHiveDatasetSyncTask.newBuilder().withReference(metadata)
+ .withConfiguration(TestUtil.hDroneConfiguration).build();
+ assertEquals("There should be 5 new partitions", 5, dataset.getNewPartitions().size());
+ assertEquals("There should not be any changed partitions", 0,
+ dataset.getChangedPartitions().size());
+ assertFalse("Table should not exist", hiveClient.checkTableExists(metadata));
+ dataset.sync();
+
+ dataset = HoodieHiveDatasetSyncTask.newBuilder().withReference(metadata)
+ .withConfiguration(TestUtil.hDroneConfiguration).build();
+ assertTrue("Table should exist after sync", hiveClient.checkTableExists(metadata));
+ assertEquals("After sync, There should not be any new partitions to sync", 0,
+ dataset.getNewPartitions().size());
+ assertEquals("After sync, There should not be any modified partitions to sync", 0,
+ dataset.getChangedPartitions().size());
+
+ assertEquals("Table Schema should have 5 fields", 5,
+ hiveClient.getTableSchema(metadata).size());
+ }
+
+ @Test
+ public void testDatasetEvolution() throws IOException, InitializationError {
+ int initialPartitionsCount = 5;
+ HoodieDatasetReference metadata = TestUtil
+ .createDataset("test1", "/tmp/hdfs/DatasetSchemaTest/testSchema/",
+ initialPartitionsCount, "/nation.schema");
+ HoodieHiveDatasetSyncTask dataset =
+ HoodieHiveDatasetSyncTask.newBuilder().withReference(metadata)
+ .withConfiguration(TestUtil.hDroneConfiguration).build();
+ dataset.sync();
+
+ dataset = HoodieHiveDatasetSyncTask.newBuilder(dataset).build();
+ int newSchemaversion = 2;
+ int newPartitionsCount = 2;
+ TestUtil.evolveDataset(metadata, newPartitionsCount, "/nation_evolved.schema",
+ DateTime.now().getMillis(), newSchemaversion);
+ dataset = HoodieHiveDatasetSyncTask.newBuilder(dataset).build();
+ assertEquals("There should be " + newPartitionsCount + " partitions to be added",
+ newPartitionsCount, dataset.getNewPartitions().size());
+ dataset.sync();
+
+ dataset = HoodieHiveDatasetSyncTask.newBuilder(dataset).build();
+ MessageType newDatasetSchema = dataset.getSchemaSyncTask().getStorageSchema();
+ MessageType expectedSchema = TestUtil.readSchema("/nation_evolved.schema");
+ assertEquals("Table schema should be evolved schema", expectedSchema, newDatasetSchema);
+ assertEquals("Table schema should have 6 fields", 6,
+ hiveClient.getTableSchema(metadata).size());
+ assertEquals("", "BIGINT", hiveClient.getTableSchema(metadata).get("region_key"));
+ }
+
+}
diff --git a/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/CsvParquetWriter.java b/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/CsvParquetWriter.java
new file mode 100644
index 000000000..321d836c0
--- /dev/null
+++ b/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/CsvParquetWriter.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.hive.util;
+
+
+import org.apache.hadoop.fs.Path;
+import parquet.hadoop.ParquetWriter;
+import parquet.hadoop.metadata.CompressionCodecName;
+import parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.util.List;
+
+public class CsvParquetWriter extends ParquetWriter> {
+
+ public CsvParquetWriter(Path file, MessageType schema) throws IOException {
+ this(file, schema, false);
+ }
+
+ public CsvParquetWriter(Path file, MessageType schema, boolean enableDictionary)
+ throws IOException {
+ this(file, schema, CompressionCodecName.UNCOMPRESSED, enableDictionary);
+ }
+
+ public CsvParquetWriter(Path file, MessageType schema, CompressionCodecName codecName,
+ boolean enableDictionary) throws IOException {
+ super(file, new CsvWriteSupport(schema), codecName,
+ DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE, enableDictionary, false);
+ }
+}
diff --git a/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/CsvWriteSupport.java b/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/CsvWriteSupport.java
new file mode 100644
index 000000000..49982f0bb
--- /dev/null
+++ b/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/CsvWriteSupport.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.hive.util;
+
+import org.apache.hadoop.conf.Configuration;
+import parquet.column.ColumnDescriptor;
+import parquet.hadoop.api.WriteSupport;
+import parquet.io.ParquetEncodingException;
+import parquet.io.api.Binary;
+import parquet.io.api.RecordConsumer;
+import parquet.schema.MessageType;
+
+import java.util.HashMap;
+import java.util.List;
+
+public class CsvWriteSupport extends WriteSupport> {
+ MessageType schema;
+ RecordConsumer recordConsumer;
+ List cols;
+
+ // TODO: support specifying encodings and compression
+ public CsvWriteSupport(MessageType schema) {
+ this.schema = schema;
+ this.cols = schema.getColumns();
+ }
+
+ @Override public WriteContext init(Configuration config) {
+ return new WriteContext(schema, new HashMap());
+ }
+
+ @Override public void prepareForWrite(RecordConsumer r) {
+ recordConsumer = r;
+ }
+
+ @Override public void write(List values) {
+ if (values.size() != cols.size()) {
+ throw new ParquetEncodingException("Invalid input data. Expecting " +
+ cols.size() + " columns. Input had " + values.size() + " columns (" + cols + ") : "
+ + values);
+ }
+
+ recordConsumer.startMessage();
+ for (int i = 0; i < cols.size(); ++i) {
+ String val = values.get(i);
+ // val.length() == 0 indicates a NULL value.
+ if (val.length() > 0) {
+ recordConsumer.startField(cols.get(i).getPath()[0], i);
+ switch (cols.get(i).getType()) {
+ case BOOLEAN:
+ recordConsumer.addBoolean(Boolean.parseBoolean(val));
+ break;
+ case FLOAT:
+ recordConsumer.addFloat(Float.parseFloat(val));
+ break;
+ case DOUBLE:
+ recordConsumer.addDouble(Double.parseDouble(val));
+ break;
+ case INT32:
+ recordConsumer.addInteger(Integer.parseInt(val));
+ break;
+ case INT64:
+ recordConsumer.addLong(Long.parseLong(val));
+ break;
+ case BINARY:
+ recordConsumer.addBinary(stringToBinary(val));
+ break;
+ default:
+ throw new ParquetEncodingException(
+ "Unsupported column type: " + cols.get(i).getType());
+ }
+ recordConsumer.endField(cols.get(i).getPath()[0], i);
+ }
+ }
+ recordConsumer.endMessage();
+ }
+
+ private Binary stringToBinary(Object value) {
+ return Binary.fromString(value.toString());
+ }
+}
diff --git a/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/HdfsTestService.java b/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/HdfsTestService.java
new file mode 100644
index 000000000..a96d682ae
--- /dev/null
+++ b/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/HdfsTestService.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.hive.util;
+
+
+import com.google.common.base.Preconditions;
+import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * An HDFS minicluster service implementation.
+ */
+public class HdfsTestService {
+
+ private static final Logger logger = LoggerFactory.getLogger(HdfsTestService.class);
+
+ /**
+ * Configuration settings
+ */
+ private Configuration hadoopConf;
+ private String workDir;
+ private String bindIP = "127.0.0.1";
+ private int namenodeRpcPort = 8020;
+ private int namenodeHttpPort = 50070;
+ private int datanodePort = 50010;
+ private int datanodeIpcPort = 50020;
+ private int datanodeHttpPort = 50075;
+
+ /**
+ * Embedded HDFS cluster
+ */
+ private MiniDFSCluster miniDfsCluster;
+
+ public HdfsTestService() {
+ hadoopConf = new Configuration();
+ workDir = Files.createTempDir().getAbsolutePath();
+ }
+
+ public Configuration getHadoopConf() {
+ return hadoopConf;
+ }
+
+ public MiniDFSCluster start(boolean format) throws IOException {
+ Preconditions
+ .checkState(workDir != null, "The work dir must be set before starting cluster.");
+
+ if (hadoopConf == null) {
+ hadoopConf = new Configuration();
+ }
+
+ // If clean, then remove the work dir so we can start fresh.
+ String localDFSLocation = getDFSLocation(workDir);
+ if (format) {
+ logger.info(
+ "Cleaning HDFS cluster data at: " + localDFSLocation + " and starting fresh.");
+ File file = new File(localDFSLocation);
+ FileUtils.deleteDirectory(file);
+ }
+
+ // Configure and start the HDFS cluster
+ // boolean format = shouldFormatDFSCluster(localDFSLocation, clean);
+ hadoopConf = configureDFSCluster(hadoopConf, localDFSLocation, bindIP, namenodeRpcPort,
+ namenodeHttpPort, datanodePort, datanodeIpcPort, datanodeHttpPort);
+ miniDfsCluster = new MiniDFSCluster.Builder(hadoopConf).numDataNodes(1).format(format)
+ .checkDataNodeAddrConfig(true).checkDataNodeHostConfig(true).build();
+ logger.info("HDFS Minicluster service started.");
+ return miniDfsCluster;
+ }
+
+ public void stop() throws IOException {
+ miniDfsCluster.shutdown();
+ logger.info("HDFS Minicluster service shut down.");
+ miniDfsCluster = null;
+ hadoopConf = null;
+ }
+
+ /**
+ * Get the location on the local FS where we store the HDFS data.
+ *
+ * @param baseFsLocation The base location on the local filesystem we have write access to
+ * create dirs.
+ * @return The location for HDFS data.
+ */
+ private static String getDFSLocation(String baseFsLocation) {
+ return baseFsLocation + Path.SEPARATOR + "dfs";
+ }
+
+ /**
+ * Returns true if we should format the DFS Cluster. We'll format if clean is
+ * true, or if the dfsFsLocation does not exist.
+ *
+ * @param localDFSLocation The location on the local FS to hold the HDFS metadata and block
+ * data
+ * @param clean Specifies if we want to start a clean cluster
+ * @return Returns true if we should format a DFSCluster, otherwise false
+ */
+ private static boolean shouldFormatDFSCluster(String localDFSLocation, boolean clean) {
+ boolean format = true;
+ File f = new File(localDFSLocation);
+ if (f.exists() && f.isDirectory() && !clean) {
+ format = false;
+ }
+ return format;
+ }
+
+ /**
+ * Configure the DFS Cluster before launching it.
+ *
+ * @param config The already created Hadoop configuration we'll further configure
+ * for HDFS
+ * @param localDFSLocation The location on the local filesystem where cluster data is stored
+ * @param bindIP An IP address we want to force the datanode and namenode to bind
+ * to.
+ * @param namenodeRpcPort
+ * @param namenodeHttpPort
+ * @param datanodePort
+ * @param datanodeIpcPort
+ * @param datanodeHttpPort
+ * @return The updated Configuration object.
+ */
+ private static Configuration configureDFSCluster(Configuration config, String localDFSLocation,
+ String bindIP, int namenodeRpcPort, int namenodeHttpPort, int datanodePort,
+ int datanodeIpcPort, int datanodeHttpPort) {
+
+ logger.info("HDFS force binding to ip: " + bindIP);
+ config.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "hdfs://" + bindIP + ":" + namenodeRpcPort);
+ config.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, bindIP + ":" + datanodePort);
+ config.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, bindIP + ":" + datanodeIpcPort);
+ config.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, bindIP + ":" + datanodeHttpPort);
+ // When a datanode registers with the namenode, the Namenode do a hostname
+ // check of the datanode which will fail on OpenShift due to reverse DNS
+ // issues with the internal IP addresses. This config disables that check,
+ // and will allow a datanode to connect regardless.
+ config.setBoolean("dfs.namenode.datanode.registration.ip-hostname-check", false);
+ config.set("hdfs.minidfs.basedir", localDFSLocation);
+ // allow current user to impersonate others
+ String user = System.getProperty("user.name");
+ config.set("hadoop.proxyuser." + user + ".groups", "*");
+ config.set("hadoop.proxyuser." + user + ".hosts", "*");
+ return config;
+ }
+
+}
diff --git a/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/HiveTestService.java b/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/HiveTestService.java
new file mode 100644
index 000000000..e9faa4536
--- /dev/null
+++ b/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/HiveTestService.java
@@ -0,0 +1,322 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.hive.util;
+
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStore;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IHMSHandler;
+import org.apache.hadoop.hive.metastore.TSetIpAddressProcessor;
+import org.apache.hadoop.hive.metastore.TUGIBasedProcessor;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.thrift.TUGIContainingTransport;
+import org.apache.hive.service.server.HiveServer2;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketException;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class HiveTestService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HiveTestService.class);
+
+ private static final int CONNECTION_TIMEOUT = 30000;
+
+ /**
+ * Configuration settings
+ */
+ private Configuration hadoopConf;
+ private String workDir;
+ private String bindIP = "127.0.0.1";
+ private int metastorePort = 9083;
+ private int serverPort = 9999;
+ private boolean clean = true;
+
+ private Map sysProps = Maps.newHashMap();
+ private ExecutorService executorService;
+ private TServer tServer;
+ private HiveServer2 hiveServer;
+
+ public HiveTestService(Configuration configuration) {
+ this.workDir = Files.createTempDir().getAbsolutePath();
+ }
+
+ public Configuration getHadoopConf() {
+ return hadoopConf;
+ }
+
+ public HiveServer2 start() throws IOException {
+ Preconditions
+ .checkState(workDir != null, "The work dir must be set before starting cluster.");
+
+ if (hadoopConf == null) {
+ hadoopConf = new Configuration();
+ }
+
+ String localHiveLocation = getHiveLocation(workDir);
+ if (clean) {
+ LOG.info(
+ "Cleaning Hive cluster data at: " + localHiveLocation + " and starting fresh.");
+ File file = new File(localHiveLocation);
+ FileUtils.deleteDirectory(file);
+ }
+
+ HiveConf serverConf = configureHive(hadoopConf, localHiveLocation);
+
+ executorService = Executors.newSingleThreadExecutor();
+ tServer = startMetaStore(bindIP, metastorePort, serverConf);
+
+ hiveServer = startHiveServer(serverConf);
+
+ String serverHostname;
+ if (bindIP.equals("0.0.0.0")) {
+ serverHostname = "localhost";
+ } else {
+ serverHostname = bindIP;
+ }
+ if (!waitForServerUp(serverConf, serverHostname, metastorePort, CONNECTION_TIMEOUT)) {
+ throw new IOException("Waiting for startup of standalone server");
+ }
+
+ LOG.info("Hive Minicluster service started.");
+ return hiveServer;
+ }
+
+ public void stop() throws IOException {
+ resetSystemProperties();
+ if (tServer != null) {
+ tServer.stop();
+ }
+ if (hiveServer != null) {
+ hiveServer.stop();
+ }
+ LOG.info("Hive Minicluster service shut down.");
+ tServer = null;
+ hiveServer = null;
+ hadoopConf = null;
+ }
+
+ private HiveConf configureHive(Configuration conf, String localHiveLocation)
+ throws IOException {
+ conf.set("hive.metastore.local", "false");
+ conf.set(HiveConf.ConfVars.METASTOREURIS.varname,
+ "thrift://" + bindIP + ":" + metastorePort);
+ conf.set(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname, bindIP);
+ conf.setInt(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, serverPort);
+ // The following line to turn of SASL has no effect since HiveAuthFactory calls
+ // 'new HiveConf()'. This is fixed by https://issues.apache.org/jira/browse/HIVE-6657,
+ // in Hive 0.14.
+ // As a workaround, the property is set in hive-site.xml in this module.
+ //conf.set(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION.varname, "NOSASL");
+ File localHiveDir = new File(localHiveLocation);
+ localHiveDir.mkdirs();
+ File metastoreDbDir = new File(localHiveDir, "metastore_db");
+ conf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname,
+ "jdbc:derby:" + metastoreDbDir.getPath() + ";create=true");
+ File derbyLogFile = new File(localHiveDir, "derby.log");
+ derbyLogFile.createNewFile();
+ setSystemProperty("derby.stream.error.file", derbyLogFile.getPath());
+ conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
+ Files.createTempDir().getAbsolutePath());
+
+ return new HiveConf(conf, this.getClass());
+ }
+
+ private boolean waitForServerUp(HiveConf serverConf, String hostname, int port, int timeout) {
+ long start = System.currentTimeMillis();
+ while (true) {
+ try {
+ new HiveMetaStoreClient(serverConf);
+ return true;
+ } catch (MetaException e) {
+ // ignore as this is expected
+ LOG.info("server " + hostname + ":" + port + " not up " + e);
+ }
+
+ if (System.currentTimeMillis() > start + timeout) {
+ break;
+ }
+ try {
+ Thread.sleep(250);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ return false;
+ }
+
+ private void setSystemProperty(String name, String value) {
+ if (!sysProps.containsKey(name)) {
+ String currentValue = System.getProperty(name);
+ sysProps.put(name, currentValue);
+ }
+ if (value != null) {
+ System.setProperty(name, value);
+ } else {
+ System.getProperties().remove(name);
+ }
+ }
+
+ private void resetSystemProperties() {
+ for (Map.Entry entry : sysProps.entrySet()) {
+ if (entry.getValue() != null) {
+ System.setProperty(entry.getKey(), entry.getValue());
+ } else {
+ System.getProperties().remove(entry.getKey());
+ }
+ }
+ sysProps.clear();
+ }
+
+ private static String getHiveLocation(String baseLocation) {
+ return baseLocation + Path.SEPARATOR + "hive";
+ }
+
+ private HiveServer2 startHiveServer(HiveConf serverConf) {
+ HiveServer2 hiveServer = new HiveServer2();
+ hiveServer.init(serverConf);
+ hiveServer.start();
+ return hiveServer;
+ }
+
+ // XXX: From org.apache.hadoop.hive.metastore.HiveMetaStore,
+ // with changes to support binding to a specified IP address (not only 0.0.0.0)
+
+
+ private static final class ChainedTTransportFactory extends TTransportFactory {
+ private final TTransportFactory parentTransFactory;
+ private final TTransportFactory childTransFactory;
+
+ private ChainedTTransportFactory(TTransportFactory parentTransFactory,
+ TTransportFactory childTransFactory) {
+ this.parentTransFactory = parentTransFactory;
+ this.childTransFactory = childTransFactory;
+ }
+
+ @Override public TTransport getTransport(TTransport trans) {
+ return childTransFactory.getTransport(parentTransFactory.getTransport(trans));
+ }
+ }
+
+
+ private static final class TServerSocketKeepAlive extends TServerSocket {
+ public TServerSocketKeepAlive(int port) throws TTransportException {
+ super(port, 0);
+ }
+
+ public TServerSocketKeepAlive(InetSocketAddress address) throws TTransportException {
+ super(address, 0);
+ }
+
+ @Override protected TSocket acceptImpl() throws TTransportException {
+ TSocket ts = super.acceptImpl();
+ try {
+ ts.getSocket().setKeepAlive(true);
+ } catch (SocketException e) {
+ throw new TTransportException(e);
+ }
+ return ts;
+ }
+ }
+
+ public TServer startMetaStore(String forceBindIP, int port, HiveConf conf) throws IOException {
+ try {
+ // Server will create new threads up to max as necessary. After an idle
+ // period, it will destory threads to keep the number of threads in the
+ // pool to min.
+ int minWorkerThreads = conf.getIntVar(HiveConf.ConfVars.METASTORESERVERMINTHREADS);
+ int maxWorkerThreads = conf.getIntVar(HiveConf.ConfVars.METASTORESERVERMAXTHREADS);
+ boolean tcpKeepAlive = conf.getBoolVar(HiveConf.ConfVars.METASTORE_TCP_KEEP_ALIVE);
+ boolean useFramedTransport =
+ conf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_FRAMED_TRANSPORT);
+
+ // don't support SASL yet
+ //boolean useSasl = conf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL);
+
+ TServerTransport serverTransport;
+ if (forceBindIP != null) {
+ InetSocketAddress address = new InetSocketAddress(forceBindIP, port);
+ serverTransport =
+ tcpKeepAlive ? new TServerSocketKeepAlive(address) : new TServerSocket(address);
+
+ } else {
+ serverTransport =
+ tcpKeepAlive ? new TServerSocketKeepAlive(port) : new TServerSocket(port);
+ }
+
+ TProcessor processor;
+ TTransportFactory transFactory;
+
+ IHMSHandler handler = (IHMSHandler) HiveMetaStore
+ .newRetryingHMSHandler("new db based metaserver", conf, true);
+
+ if (conf.getBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI)) {
+ transFactory = useFramedTransport ?
+ new ChainedTTransportFactory(new TFramedTransport.Factory(),
+ new TUGIContainingTransport.Factory()) :
+ new TUGIContainingTransport.Factory();
+
+ processor = new TUGIBasedProcessor(handler);
+ LOG.info("Starting DB backed MetaStore Server with SetUGI enabled");
+ } else {
+ transFactory =
+ useFramedTransport ? new TFramedTransport.Factory() : new TTransportFactory();
+ processor = new TSetIpAddressProcessor(handler);
+ LOG.info("Starting DB backed MetaStore Server");
+ }
+
+ TThreadPoolServer.Args args =
+ new TThreadPoolServer.Args(serverTransport).processor(processor)
+ .transportFactory(transFactory).protocolFactory(new TBinaryProtocol.Factory())
+ .minWorkerThreads(minWorkerThreads).maxWorkerThreads(maxWorkerThreads);
+
+ final TServer tServer = new TThreadPoolServer(args);
+ executorService.submit(new Runnable() {
+ @Override public void run() {
+ tServer.serve();
+ }
+ });
+ return tServer;
+ } catch (Throwable x) {
+ throw new IOException(x);
+ }
+ }
+}
diff --git a/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/TestUtil.java b/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/TestUtil.java
new file mode 100644
index 000000000..f07bc0c3f
--- /dev/null
+++ b/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/TestUtil.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.hive.util;
+
+import com.google.common.collect.Sets;
+import com.uber.hoodie.hive.HoodieHiveConfiguration;
+import com.uber.hoodie.hive.client.HoodieHiveClient;
+import com.uber.hoodie.hive.model.HoodieDatasetReference;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hive.service.server.HiveServer2;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.junit.runners.model.InitializationError;
+import parquet.schema.MessageType;
+import parquet.schema.MessageTypeParser;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+public class TestUtil {
+ private static MiniDFSCluster dfsCluster;
+ private static ZooKeeperServer zkServer;
+ private static HiveServer2 hiveServer;
+ public static Configuration configuration;
+ public static HoodieHiveConfiguration hDroneConfiguration;
+ private static DateTimeFormatter dtfOut;
+ public static final String CSV_DELIMITER = "|";
+ private static FileSystem fileSystem;
+ private static Set createdTablesSet = Sets.newHashSet();
+
+ public static void setUp() throws IOException, InterruptedException {
+ if (dfsCluster == null) {
+ HdfsTestService service = new HdfsTestService();
+ dfsCluster = service.start(true);
+ configuration = service.getHadoopConf();
+ }
+ if (zkServer == null) {
+ ZookeeperTestService zkService = new ZookeeperTestService(configuration);
+ zkServer = zkService.start();
+ }
+ if (hiveServer == null) {
+ HiveTestService hiveService = new HiveTestService(configuration);
+ hiveServer = hiveService.start();
+ }
+ hDroneConfiguration =
+ HoodieHiveConfiguration.newBuilder().hiveJdbcUrl("jdbc:hive2://127.0.0.1:9999/")
+ .hivedb("hdrone_test").jdbcUsername("").jdbcPassword("")
+ .hadoopConfiguration(hiveServer.getHiveConf()).build();
+ dtfOut = DateTimeFormat.forPattern("yyyy/MM/dd");
+
+ HoodieHiveClient client = new HoodieHiveClient(hDroneConfiguration);
+ for (String tableName : createdTablesSet) {
+ client.updateHiveSQL("drop table if exists " + tableName);
+ }
+ createdTablesSet.clear();
+ client.updateHiveSQL(
+ "drop database if exists " + hDroneConfiguration.getDbName());
+ client.updateHiveSQL("create database " + hDroneConfiguration.getDbName());
+
+ fileSystem = FileSystem.get(configuration);
+ }
+
+ public static void shutdown() {
+ if (hiveServer != null) {
+ hiveServer.stop();
+ }
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ }
+ if (zkServer != null) {
+ zkServer.shutdown();
+ }
+ }
+
+ public static HoodieDatasetReference createDataset(String tableName, String hdfsPath, int numberOfPartitions,
+ String schemaFile) throws IOException, InitializationError {
+ Path path = new Path(hdfsPath);
+ FileUtils.deleteDirectory(new File(hdfsPath));
+
+ boolean result = fileSystem.mkdirs(path);
+ checkResult(result);
+ HoodieDatasetReference metadata =
+ new HoodieDatasetReference(tableName, path.toString(),
+ hDroneConfiguration.getDbName());
+ DateTime dateTime = DateTime.now();
+ createPartitions(metadata, numberOfPartitions, schemaFile, dateTime, 1);
+ createdTablesSet.add(metadata.getDatabaseTableName());
+ return metadata;
+ }
+
+ private static void createPartitions(HoodieDatasetReference metadata, int numberOfPartitions,
+ String schemaFile, DateTime startFrom, int schemaVersion) throws IOException {
+ startFrom = startFrom.withTimeAtStartOfDay();
+
+ for (int i = 0; i < numberOfPartitions; i++) {
+ Path partPath = new Path(metadata.getBaseDatasetPath() + "/" + dtfOut.print(startFrom));
+ fileSystem.makeQualified(partPath);
+ fileSystem.mkdirs(partPath);
+ createTestData(partPath, schemaFile, schemaVersion);
+ startFrom = startFrom.minusDays(1);
+ }
+ }
+
+ private static void createTestData(Path partPath, String schemaFile, int schemaVersion)
+ throws IOException {
+ for (int i = 0; i < 5; i++) {
+ // Create 5 files
+ Path filePath =
+ new Path(partPath.toString() + "/" + getParquetFilePath(schemaVersion, i));
+ generateParquetData(filePath, schemaFile);
+ }
+ }
+
+ private static String getParquetFilePath(int version, int iteration) {
+ return "test.topic.name@sjc1@SV_" + version + "@" + iteration + ".parquet";
+ }
+
+ public static MessageType readSchema(String schemaFile) throws IOException {
+ return MessageTypeParser
+ .parseMessageType(IOUtils.toString(TestUtil.class.getResourceAsStream(schemaFile)));
+ }
+
+ public static void generateParquetData(Path filePath, String schemaFile) throws IOException {
+ MessageType schema = readSchema(schemaFile);
+ CsvParquetWriter writer = new CsvParquetWriter(filePath, schema);
+
+ BufferedReader br = new BufferedReader(
+ new InputStreamReader(TestUtil.class.getResourceAsStream(getDataFile(schemaFile))));
+ String line;
+ try {
+ while ((line = br.readLine()) != null) {
+ String[] fields = line.split(Pattern.quote(CSV_DELIMITER));
+ writer.write(Arrays.asList(fields));
+ }
+ writer.close();
+ } finally {
+ br.close();
+ }
+
+ InputStreamReader io = null;
+ FSDataOutputStream hdfsPath = null;
+ try {
+ io = new FileReader(filePath.toString());
+ hdfsPath = fileSystem.create(filePath);
+ IOUtils.copy(io, hdfsPath);
+ } finally {
+ if (io != null) {
+ io.close();
+ }
+ if (hdfsPath != null) {
+ hdfsPath.close();
+ }
+ }
+ }
+
+ private static String getDataFile(String schemaFile) {
+ return schemaFile.replaceAll(".schema", ".csv");
+ }
+
+ private static void checkResult(boolean result) throws InitializationError {
+ if (!result) {
+ throw new InitializationError("Could not initialize");
+ }
+ }
+
+ public static void evolveDataset(HoodieDatasetReference metadata, int newPartitionCount,
+ String newSchema, Long startFrom, int schemaVersion) throws IOException {
+ createPartitions(metadata, newPartitionCount, newSchema,
+ new DateTime(startFrom).plusDays(newPartitionCount + 1), schemaVersion);
+ }
+}
diff --git a/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/ZookeeperTestService.java b/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/ZookeeperTestService.java
new file mode 100644
index 000000000..94a2cb425
--- /dev/null
+++ b/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/ZookeeperTestService.java
@@ -0,0 +1,241 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.hive.util;
+
+import com.google.common.base.Preconditions;
+import com.google.common.io.Files;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.Reader;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+/**
+ * A Zookeeper minicluster service implementation.
+ *
+ * This class was ripped from MiniZooKeeperCluster from the HBase tests. Changes
+ * made include:
+ *
+ * 1. It will now only launch 1 zookeeper server.
+ *
+ * 2. It will only attempt to bind to the port specified, and will fail if it
+ * can't.
+ *
+ * 3. The startup method now takes a bindAddress, which allows us to configure
+ * which IP the ZK server binds to. This was not configurable in the original
+ * class.
+ *
+ * 4. The ZK cluster will re-use a data dir on the local filesystem if it
+ * already exists instead of blowing it away.
+ */
+public class ZookeeperTestService {
+
+ private static final Logger logger = LoggerFactory.getLogger(ZookeeperTestService.class);
+
+ private static final int TICK_TIME = 2000;
+ private static final int CONNECTION_TIMEOUT = 30000;
+
+ /**
+ * Configuration settings
+ */
+ private Configuration hadoopConf;
+ private String workDir;
+ private Integer clientPort = 2828;
+ private String bindIP = "127.0.0.1";
+ private Boolean clean = false;
+ private int tickTime = 0;
+
+ /**
+ * Embedded ZooKeeper cluster
+ */
+ private NIOServerCnxnFactory standaloneServerFactory;
+ private ZooKeeperServer zooKeeperServer;
+ private boolean started = false;
+
+ public ZookeeperTestService(Configuration config) {
+ this.workDir = Files.createTempDir().getAbsolutePath();
+ this.hadoopConf = config;
+ }
+
+ public Configuration getHadoopConf() {
+ return hadoopConf;
+ }
+
+ public ZooKeeperServer start() throws IOException, InterruptedException {
+ Preconditions.checkState(workDir != null,
+ "The localBaseFsLocation must be set before starting cluster.");
+
+ setupTestEnv();
+ stop();
+
+ File dir = new File(workDir, "zookeeper").getAbsoluteFile();
+ recreateDir(dir, clean);
+ int tickTimeToUse;
+ if (this.tickTime > 0) {
+ tickTimeToUse = this.tickTime;
+ } else {
+ tickTimeToUse = TICK_TIME;
+ }
+ this.zooKeeperServer = new ZooKeeperServer(dir, dir, tickTimeToUse);
+ standaloneServerFactory = new NIOServerCnxnFactory();
+
+ // NOTE: Changed from the original, where InetSocketAddress was
+ // originally created to bind to the wildcard IP, we now configure it.
+ logger.info("Zookeeper force binding to: " + this.bindIP);
+ standaloneServerFactory.configure(new InetSocketAddress(bindIP, clientPort), 1000);
+
+ // Start up this ZK server
+ standaloneServerFactory.startup(zooKeeperServer);
+
+ String serverHostname;
+ if (bindIP.equals("0.0.0.0")) {
+ serverHostname = "localhost";
+ } else {
+ serverHostname = bindIP;
+ }
+ if (!waitForServerUp(serverHostname, clientPort, CONNECTION_TIMEOUT)) {
+ throw new IOException("Waiting for startup of standalone server");
+ }
+
+ started = true;
+ logger.info("Zookeeper Minicluster service started on client port: " + clientPort);
+ return zooKeeperServer;
+ }
+
+ public void stop() throws IOException {
+ if (!started) {
+ return;
+ }
+
+ standaloneServerFactory.shutdown();
+ if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
+ throw new IOException("Waiting for shutdown of standalone server");
+ }
+
+ // clear everything
+ started = false;
+ standaloneServerFactory = null;
+ zooKeeperServer = null;
+
+ logger.info("Zookeeper Minicluster service shut down.");
+ }
+
+ private void recreateDir(File dir, boolean clean) throws IOException {
+ if (dir.exists() && clean) {
+ FileUtil.fullyDelete(dir);
+ } else if (dir.exists() && !clean) {
+ // the directory's exist, and we don't want to clean, so exit
+ return;
+ }
+ try {
+ dir.mkdirs();
+ } catch (SecurityException e) {
+ throw new IOException("creating dir: " + dir, e);
+ }
+ }
+
+ // / XXX: From o.a.zk.t.ClientBase
+ private static void setupTestEnv() {
+ // during the tests we run with 100K prealloc in the logs.
+ // on windows systems prealloc of 64M was seen to take ~15seconds
+ // resulting in test failure (client timeout on first session).
+ // set env and directly in order to handle static init/gc issues
+ System.setProperty("zookeeper.preAllocSize", "100");
+ FileTxnLog.setPreallocSize(100 * 1024);
+ }
+
+ // XXX: From o.a.zk.t.ClientBase
+ private static boolean waitForServerDown(int port, long timeout) {
+ long start = System.currentTimeMillis();
+ while (true) {
+ try {
+ Socket sock = new Socket("localhost", port);
+ try {
+ OutputStream outstream = sock.getOutputStream();
+ outstream.write("stat".getBytes());
+ outstream.flush();
+ } finally {
+ sock.close();
+ }
+ } catch (IOException e) {
+ return true;
+ }
+
+ if (System.currentTimeMillis() > start + timeout) {
+ break;
+ }
+ try {
+ Thread.sleep(250);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ return false;
+ }
+
+ // XXX: From o.a.zk.t.ClientBase
+ private static boolean waitForServerUp(String hostname, int port, long timeout) {
+ long start = System.currentTimeMillis();
+ while (true) {
+ try {
+ Socket sock = new Socket(hostname, port);
+ BufferedReader reader = null;
+ try {
+ OutputStream outstream = sock.getOutputStream();
+ outstream.write("stat".getBytes());
+ outstream.flush();
+
+ Reader isr = new InputStreamReader(sock.getInputStream());
+ reader = new BufferedReader(isr);
+ String line = reader.readLine();
+ if (line != null && line.startsWith("Zookeeper version:")) {
+ return true;
+ }
+ } finally {
+ sock.close();
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ } catch (IOException e) {
+ // ignore as this is expected
+ logger.info("server " + hostname + ":" + port + " not up " + e);
+ }
+
+ if (System.currentTimeMillis() > start + timeout) {
+ break;
+ }
+ try {
+ Thread.sleep(250);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ return false;
+ }
+}
diff --git a/hoodie-hive/src/test/resources/nation.csv b/hoodie-hive/src/test/resources/nation.csv
new file mode 100644
index 000000000..ee71b02ea
--- /dev/null
+++ b/hoodie-hive/src/test/resources/nation.csv
@@ -0,0 +1,25 @@
+0|ALGERIA|0| haggle. carefully final deposits detect slyly agai
+1|ARGENTINA|1|al foxes promise slyly according to the regular accounts. bold requests alon
+2|BRAZIL|1|y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special
+3|CANADA|1|eas hang ironic, silent packages. slyly regular packages are furiously over the tithes. fluffily bold
+4|EGYPT|4|y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d
+5|ETHIOPIA|0|ven packages wake quickly. regu
+6|FRANCE|3|refully final requests. regular, ironi
+7|GERMANY|3|l platelets. regular accounts x-ray: unusual, regular acco
+8|INDIA|2|ss excuses cajole slyly across the packages. deposits print aroun
+9|INDONESIA|2| slyly express asymptotes. regular deposits haggle slyly. carefully ironic hockey players sleep blithely. carefull
+10|IRAN|4|efully alongside of the slyly final dependencies.
+11|IRAQ|4|nic deposits boost atop the quickly final requests? quickly regula
+12|JAPAN|2|ously. final, express gifts cajole a
+13|JORDAN|4|ic deposits are blithely about the carefully regular pa
+14|KENYA|0| pending excuses haggle furiously deposits. pending, express pinto beans wake fluffily past t
+15|MOROCCO|0|rns. blithely bold courts among the closely regular packages use furiously bold platelets?
+16|MOZAMBIQUE|0|s. ironic, unusual asymptotes wake blithely r
+17|PERU|1|platelets. blithely pending dependencies use fluffily across the even pinto beans. carefully silent accoun
+18|CHINA|2|c dependencies. furiously express notornis sleep slyly regular accounts. ideas sleep. depos
+19|ROMANIA|3|ular asymptotes are about the furious multipliers. express dependencies nag above the ironically ironic account
+20|SAUDI ARABIA|4|ts. silent requests haggle. closely express packages sleep across the blithely
+21|VIETNAM|2|hely enticingly express accounts. even, final
+22|RUSSIA|3| requests against the platelets use never according to the quickly regular pint
+23|UNITED KINGDOM|3|eans boost carefully special requests. accounts are. carefull
+24|UNITED STATES|1|y final packages. slow foxes cajole quickly. quickly silent platelets breach ironic accounts. unusual pinto be
diff --git a/hoodie-hive/src/test/resources/nation.schema b/hoodie-hive/src/test/resources/nation.schema
new file mode 100644
index 000000000..8b57cb478
--- /dev/null
+++ b/hoodie-hive/src/test/resources/nation.schema
@@ -0,0 +1,6 @@
+message m {
+ required int32 nation_key;
+ required binary name;
+ required int32 region_key;
+ required binary comment_col;
+}
diff --git a/hoodie-hive/src/test/resources/nation_evolved.csv b/hoodie-hive/src/test/resources/nation_evolved.csv
new file mode 100644
index 000000000..6dc8ce4dc
--- /dev/null
+++ b/hoodie-hive/src/test/resources/nation_evolved.csv
@@ -0,0 +1,25 @@
+0|ALGERIA|0| haggle. carefully final deposits detect slyly agai|desc0
+1|ARGENTINA|1|al foxes promise slyly according to the regular accounts. bold requests alon|desc1
+2|BRAZIL|1|y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special |desc2
+3|CANADA|1|eas hang ironic, silent packages. slyly regular packages are furiously over the tithes. fluffily bold|desc3
+4|EGYPT|4|y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d|desc4
+5|ETHIOPIA|0|ven packages wake quickly. regu|desc5
+6|FRANCE|3|refully final requests. regular, ironi|desc6
+7|GERMANY|3|l platelets. regular accounts x-ray: unusual, regular acco|desc7
+8|INDIA|2|ss excuses cajole slyly across the packages. deposits print aroun|desc8
+9|INDONESIA|2| slyly express asymptotes. regular deposits haggle slyly. carefully ironic hockey players sleep blithely. carefull|desc9
+10|IRAN|4|efully alongside of the slyly final dependencies. |desc10
+11|IRAQ|4|nic deposits boost atop the quickly final requests? quickly regula|desc11
+12|JAPAN|2|ously. final, express gifts cajole a|desc12
+13|JORDAN|4|ic deposits are blithely about the carefully regular pa|desc13
+14|KENYA|0| pending excuses haggle furiously deposits. pending, express pinto beans wake fluffily past t|desc14
+15|MOROCCO|0|rns. blithely bold courts among the closely regular packages use furiously bold platelets?|desc15
+16|MOZAMBIQUE|0|s. ironic, unusual asymptotes wake blithely r|desc16
+17|PERU|1|platelets. blithely pending dependencies use fluffily across the even pinto beans. carefully silent accoun|desc17
+18|CHINA|2|c dependencies. furiously express notornis sleep slyly regular accounts. ideas sleep. depos|desc18
+19|ROMANIA|3|ular asymptotes are about the furious multipliers. express dependencies nag above the ironically ironic account|desc19
+20|SAUDI ARABIA|4|ts. silent requests haggle. closely express packages sleep across the blithely|desc20
+21|VIETNAM|2|hely enticingly express accounts. even, final |desc21
+22|RUSSIA|3| requests against the platelets use never according to the quickly regular pint|desc22
+23|UNITED KINGDOM|3|eans boost carefully special requests. accounts are. carefull|desc23
+24|UNITED STATES|1|y final packages. slow foxes cajole quickly. quickly silent platelets breach ironic accounts. unusual pinto be|desc24
diff --git a/hoodie-hive/src/test/resources/nation_evolved.schema b/hoodie-hive/src/test/resources/nation_evolved.schema
new file mode 100644
index 000000000..b395c27ac
--- /dev/null
+++ b/hoodie-hive/src/test/resources/nation_evolved.schema
@@ -0,0 +1,7 @@
+message m {
+ required int32 nation_key;
+ required binary name;
+ required int64 region_key;
+ required binary comment_col;
+ optional binary desc;
+}
diff --git a/pom.xml b/pom.xml
index 4aabe6def..8553fb378 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,6 +27,7 @@
hoodie-client
hoodie-cli
hoodie-hadoop-mr
+ hoodie-hive
@@ -173,6 +174,9 @@
**/.*
**/*.txt
**/*.sh
+ **/test/resources/*.avro
+ **/test/resources/*.schema
+ **/test/resources/*.csv
@@ -360,6 +364,12 @@
slf4j-api
1.7.5
+
+ org.slf4j
+ slf4j-log4j12
+ 1.7.5
+
+
org.apache.commons
commons-configuration2
@@ -380,9 +390,43 @@
org.apache.hive
hive-jdbc
- 1.1.0-cdh5.7.2
+ ${hive.version}-cdh${cdh.version}
+
+ org.apache.hive
+ hive-service
+ ${hive.version}-cdh${cdh.version}
+
+
+ org.apache.hive
+ hive-metastore
+ ${hive.version}-cdh${cdh.version}
+
+
+
+ junit
+ junit
+ 4.12
+
+
+ org.apache.hadoop
+ hadoop-hdfs
+ tests
+ ${hadoop.version}-cdh${cdh.version}
+
+
+ org.apache.hadoop
+ hadoop-common
+ tests
+ ${hadoop.version}-cdh${cdh.version}
+
+
+ org.mockito
+ mockito-all
+ test
+ 1.10.19
+