diff --git a/hoodie-hive/pom.xml b/hoodie-hive/pom.xml index 5ddd3f549..d7a1eb5ba 100644 --- a/hoodie-hive/pom.xml +++ b/hoodie-hive/pom.xml @@ -24,12 +24,17 @@ 4.0.0 hoodie-hive + jar org.apache.hadoop hadoop-common + + org.apache.hadoop + hadoop-client + org.apache.hadoop hadoop-hdfs @@ -63,6 +68,7 @@ libthrift 0.9.2 + commons-dbcp @@ -79,6 +85,11 @@ slf4j-log4j12 + + com.beust + jcommander + + junit @@ -136,7 +147,54 @@ org.apache.rat apache-rat-plugin + + org.apache.maven.plugins + maven-assembly-plugin + 2.4.1 + + + src/assembly/src.xml + + + + com.uber.hoodie.hive.example.HoodieHiveSyncExample + + + + + + + make-assembly + + package + + single + + + + + + org.apache.maven.plugins + maven-dependency-plugin + 2.4 + + + copy-dependencies + package + + copy-dependencies + + + ${project.build.directory}/jars + false + false + true + + + + + diff --git a/hoodie-hive/src/assembly/src.xml b/hoodie-hive/src/assembly/src.xml new file mode 100644 index 000000000..adb5044b1 --- /dev/null +++ b/hoodie-hive/src/assembly/src.xml @@ -0,0 +1,44 @@ + + + + jar-with-dependencies + + jar + + + false + + + + / + true + runtime + + junit:junit + com.google.code.findbugs:* + org.apache.hbase:* + + + + + true + provided + + + diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HiveSyncConfig.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HiveSyncConfig.java new file mode 100644 index 000000000..9a69033db --- /dev/null +++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HiveSyncConfig.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.uber.hoodie.hive; + +import com.beust.jcommander.Parameter; + +import java.io.Serializable; + +/** + * Configs needed to sync data into Hive. + */ +public class HiveSyncConfig implements Serializable { + + @Parameter(names = {"--database"}, description = "name of the target database in Hive", required = true) + public String databaseName; + + @Parameter(names = {"--table"}, description = "name of the target table in Hive", required = true) + public String tableName; + + @Parameter(names = {"--user"}, description = "Hive username", required = true) + public String hiveUser; + + @Parameter(names = {"--pass"}, description = "Hive password", required = true) + public String hivePass; + + @Parameter(names = {"--jdbc-url"}, description = "Hive jdbc connect url", required = true) + public String jdbcUrl; + + @Parameter(names = {"--base-path"}, description = "Basepath of hoodie dataset to sync", required = true) + public String basePath; + + @Parameter(names = {"--help", "-h"}, help = true) + public Boolean help = false; +} diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HiveSyncTool.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HiveSyncTool.java new file mode 100644 index 000000000..98819f538 --- /dev/null +++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HiveSyncTool.java @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.uber.hoodie.hive; + +import com.beust.jcommander.JCommander; +import com.uber.hoodie.hive.impl.DayBasedPartitionStrategy; +import com.uber.hoodie.hive.impl.ParseSchemaFromDataStrategy; +import com.uber.hoodie.hive.model.HoodieDatasetReference; + +import org.apache.hadoop.conf.Configuration; + +/** + * Tool to sync new data from commits, into Hive in terms of + * + * - New table/partitions + * - Updated schema for table/partitions + */ +public class HiveSyncTool { + + + /** + * Sync to Hive, based on day based partitioning + * + * @param cfg + */ + public static void sync(HiveSyncConfig cfg) { + // Configure to point to which metastore and database to connect to + HoodieHiveConfiguration apiConfig = + HoodieHiveConfiguration.newBuilder().hadoopConfiguration(new Configuration()) + .hivedb(cfg.databaseName) + .hiveJdbcUrl(cfg.jdbcUrl) + .jdbcUsername(cfg.hiveUser) + .jdbcPassword(cfg.hivePass) + .build(); + + HoodieDatasetReference datasetReference = + new HoodieDatasetReference(cfg.tableName, cfg.basePath, cfg.databaseName); + + // initialize the strategies + PartitionStrategy partitionStrategy = new DayBasedPartitionStrategy(); + SchemaStrategy schemaStrategy = new ParseSchemaFromDataStrategy(); + + // Creates a new dataset which reflects the state at the time of creation + HoodieHiveDatasetSyncTask datasetSyncTask = + HoodieHiveDatasetSyncTask.newBuilder().withReference(datasetReference) + .withConfiguration(apiConfig).partitionStrategy(partitionStrategy) + .schemaStrategy(schemaStrategy).build(); + + // Sync dataset + datasetSyncTask.sync(); + } + + + public static void main(String[] args) throws Exception { + + // parse the params + final HiveSyncConfig cfg = new HiveSyncConfig(); + JCommander cmd = new JCommander(cfg, args); + if (cfg.help || args.length == 0) { + cmd.usage(); + System.exit(1); + } + + sync(cfg); + } +} diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveDatasetSyncTask.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveDatasetSyncTask.java index 184917a1c..a07695acb 100644 --- a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveDatasetSyncTask.java +++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveDatasetSyncTask.java @@ -25,6 +25,7 @@ import com.uber.hoodie.hive.model.StoragePartition; import com.uber.hoodie.hive.model.TablePartition; import org.apache.commons.lang.ArrayUtils; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -151,11 +152,11 @@ public class HoodieHiveDatasetSyncTask { .withFSClient(fsClient).build(); List storagePartitions = Lists.newArrayList(); - FileStatus[] storagePartitionPaths = schemaSyncTask.getPartitionStrategy() + List storagePartitionPaths = schemaSyncTask.getPartitionStrategy() .scanAllPartitions(schemaSyncTask.getReference(), schemaSyncTask.getFsClient()); - for (FileStatus fileStatus : storagePartitionPaths) { + for (String path : storagePartitionPaths) { storagePartitions.add(new StoragePartition(schemaSyncTask.getReference(), - schemaSyncTask.getPartitionStrategy(), fileStatus)); + schemaSyncTask.getPartitionStrategy(), path)); } LOG.info("Storage partitions scan complete. Found " + storagePartitions.size()); diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/PartitionStrategy.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/PartitionStrategy.java index e48023b34..793703ffd 100644 --- a/hoodie-hive/src/main/java/com/uber/hoodie/hive/PartitionStrategy.java +++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/PartitionStrategy.java @@ -21,6 +21,8 @@ import com.uber.hoodie.hive.model.HoodieDatasetReference; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import java.util.List; + /** * Abstraction to define HDFS partition strategies. * Strategy provides hookups to map partitions on to physical layout @@ -29,13 +31,14 @@ import org.apache.hadoop.fs.Path; */ public interface PartitionStrategy { /** - * Scans the file system for all partitions and returns FileStatus[] which are the available partitions + * Scans the file system for all partitions and returns String[] which are the available partitions, relative to + * the base path * * @param basePath * @param fsClient * @return */ - FileStatus[] scanAllPartitions(HoodieDatasetReference basePath, HoodieFSClient fsClient); + List scanAllPartitions(HoodieDatasetReference basePath, HoodieFSClient fsClient); /** * Get the list of hive field names the dataset will be partitioned on. @@ -47,10 +50,10 @@ public interface PartitionStrategy { /** * Convert a Partition path (returned in scanAllPartitions) to values for column names returned in getHivePartitionFieldNames - * e.g. /data/topic/2016/12/12/ will return [2016, 12, 12] + * e.g. 2016/12/12/ will return [2016, 12, 12] * - * @param partition storage path + * @param partitionPath storage path * @return List of partitions field values */ - String[] convertPartitionToValues(HoodieDatasetReference metadata, Path partition); + String[] convertPartitionToValues(HoodieDatasetReference metadata, String partitionPath); } diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/client/HoodieFSClient.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/client/HoodieFSClient.java index 9fd95c004..2bdd36d73 100644 --- a/hoodie-hive/src/main/java/com/uber/hoodie/hive/client/HoodieFSClient.java +++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/client/HoodieFSClient.java @@ -50,6 +50,7 @@ public class HoodieFSClient { final public static String PARQUET_EXTENSION_ZIPPED = ".parquet.gz"; private final static Logger LOG = LoggerFactory.getLogger(HoodieFSClient.class); private final HoodieHiveConfiguration conf; + private final FileSystem fs; public HoodieFSClient(HoodieHiveConfiguration configuration) { @@ -123,32 +124,6 @@ public class HoodieFSClient { } } - /** - * Finds all the files/directories that match the pattern under the {@link HoodieDatasetReference} basePath - * - * @param metadata - * @param pattern - * @return - */ - public FileStatus[] getDirectoriesMatchingPattern(HoodieDatasetReference metadata, String pattern) { - try { - Path path = new Path(metadata.getBaseDatasetPath() + pattern); - FileStatus[] status = fs.globStatus(path); - List returns = Lists.newArrayList(); - for(FileStatus st:status) { - if(!st.getPath().toString().contains(".distcp")) { - // Ignore temporary directories created by distcp - returns.add(st); - } - } - return returns.toArray(new FileStatus[returns.size()]); - } catch (IOException e) { - throw new HoodieHiveDatasetException( - "IOException when reading directories under dataset " + metadata + " with pattern " - + pattern, e); - } - } - /** * Get the list of storage partitions which does not have its equivalent hive partitions * @@ -205,4 +180,7 @@ public class HoodieFSClient { return Objects.hashCode(paths); } + public FileSystem getFs() { + return fs; + } } diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/example/HoodieDatasetExample.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/example/HoodieDatasetExample.java deleted file mode 100644 index 1b23e870c..000000000 --- a/hoodie-hive/src/main/java/com/uber/hoodie/hive/example/HoodieDatasetExample.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.uber.hoodie.hive.example; - -import com.uber.hoodie.hive.HoodieHiveConfiguration; -import com.uber.hoodie.hive.HoodieHiveDatasetSyncTask; -import com.uber.hoodie.hive.PartitionStrategy; -import com.uber.hoodie.hive.SchemaStrategy; -import com.uber.hoodie.hive.impl.DayBasedPartitionStrategy; -import com.uber.hoodie.hive.impl.ParseSchemaFromDataStrategy; -import com.uber.hoodie.hive.model.HoodieDatasetReference; -import org.apache.hadoop.conf.Configuration; - -/** - * Example showing basic usage of Hoodie Hive API - */ -public class HoodieDatasetExample { - public static void main(String[] args) { - // Configure to point to which metastore and database to connect to - HoodieHiveConfiguration apiConfig = - HoodieHiveConfiguration.newBuilder().hadoopConfiguration(new Configuration()) - .hivedb("tmp").hiveJdbcUrl("jdbc:hive2://localhost:10010/").jdbcUsername("hive") - .jdbcPassword("hive").build(); - - HoodieDatasetReference datasetReference = - new HoodieDatasetReference("clickstream", "hdfs:///data/tables/user.clickstream", - "raw"); - - // initialize the strategies - PartitionStrategy partitionStrategy = new DayBasedPartitionStrategy(); - SchemaStrategy schemaStrategy = new ParseSchemaFromDataStrategy(); - - // Creates a new dataset which reflects the state at the time of creation - HoodieHiveDatasetSyncTask datasetSyncTask = - HoodieHiveDatasetSyncTask.newBuilder().withReference(datasetReference) - .withConfiguration(apiConfig).partitionStrategy(partitionStrategy) - .schemaStrategy(schemaStrategy).build(); - - // Sync dataset - datasetSyncTask.sync(); - } -} diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/example/HoodieHiveSyncExample.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/example/HoodieHiveSyncExample.java new file mode 100644 index 000000000..9f83ba586 --- /dev/null +++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/example/HoodieHiveSyncExample.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.hive.example; + +import com.uber.hoodie.hive.HiveSyncTool; +import com.uber.hoodie.hive.HiveSyncConfig; + +/** + * Example showing how to sync the dataset, written by `HoodieClientExample` + */ +public class HoodieHiveSyncExample { + + public static void main(String[] args) { + + HiveSyncConfig cfg = new HiveSyncConfig(); + cfg.databaseName = "default"; + cfg.tableName = "uber_trips"; + cfg.basePath = "/tmp/hoodie/sample-table/"; + cfg.hiveUser = "hive"; + cfg.hivePass = "hive"; + cfg.jdbcUrl = "jdbc:hive2://localhost:10010/"; + + HiveSyncTool.sync(cfg); + } +} diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/impl/DayBasedPartitionStrategy.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/impl/DayBasedPartitionStrategy.java index 5300cb045..b20d6ee99 100644 --- a/hoodie-hive/src/main/java/com/uber/hoodie/hive/impl/DayBasedPartitionStrategy.java +++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/impl/DayBasedPartitionStrategy.java @@ -16,6 +16,8 @@ package com.uber.hoodie.hive.impl; +import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.hive.HoodieHiveDatasetException; import com.uber.hoodie.hive.PartitionStrategy; import com.uber.hoodie.hive.client.HoodieFSClient; import com.uber.hoodie.hive.model.HoodieDatasetReference; @@ -27,6 +29,9 @@ import org.joda.time.format.DateTimeFormatter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.List; + /** * Simple day based partitions. * Storage is of this format yyyy/mm/dd @@ -42,8 +47,13 @@ public class DayBasedPartitionStrategy implements PartitionStrategy { this.dtfOut = DateTimeFormat.forPattern("yyyy-MM-dd"); } - @Override public FileStatus[] scanAllPartitions(HoodieDatasetReference ref, HoodieFSClient fsClient) { - return fsClient.getDirectoriesMatchingPattern(ref, "/*/*/*"); + @Override public List scanAllPartitions(HoodieDatasetReference ref, HoodieFSClient fsClient) { + try { + return FSUtils.getAllPartitionPaths(fsClient.getFs(), ref.getBaseDatasetPath()); + } catch (IOException ioe) { + throw new HoodieHiveDatasetException( + "IOException when listing partitions under dataset " + ref , ioe); + } } @Override public String[] getHivePartitionFieldNames() { @@ -51,28 +61,18 @@ public class DayBasedPartitionStrategy implements PartitionStrategy { } @Override - public String[] convertPartitionToValues(HoodieDatasetReference metadata, Path partition) { + public String[] convertPartitionToValues(HoodieDatasetReference metadata, String partitionPath) { //yyyy/mm/dd - String basePath = metadata.getBaseDatasetPath(); - String partitionPath = partition.toUri().getPath(); - if (!partitionPath.contains(basePath)) { + String[] splits = partitionPath.split("/"); + if (splits.length != 3) { throw new IllegalArgumentException( - "Partition path " + partitionPath + " is not part of the dataset " + metadata); + "Partition path " + partitionPath + " is not in the form yyyy/mm/dd "); } // Get the partition part and remove the / as well at the end - String partitionPart = partitionPath.substring(basePath.length() + 1); - LOG.info("Extracting parts from " + partitionPart); - int year = extractPart(partitionPart, 0); - int mm = extractPart(partitionPart, 1); - int dd = extractPart(partitionPart, 2); + int year = Integer.parseInt(splits[0]); + int mm = Integer.parseInt(splits[1]); + int dd = Integer.parseInt(splits[2]); DateTime dateTime = new DateTime(year, mm, dd, 0, 0); return new String[] {dtfOut.print(dateTime)}; } - - private int extractPart(String pathString, int index) { - String[] parts = pathString.split("/"); - String part = parts[index]; - return Integer.parseInt(part); - } - } diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/model/StoragePartition.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/model/StoragePartition.java index 4558b78c4..c08bbf3b2 100644 --- a/hoodie-hive/src/main/java/com/uber/hoodie/hive/model/StoragePartition.java +++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/model/StoragePartition.java @@ -26,13 +26,12 @@ import org.slf4j.LoggerFactory; public class StoragePartition { private static Logger LOG = LoggerFactory.getLogger(StoragePartition.class); private final PartitionStrategy partitionStrategy; - private final Path partitionPath; + private final String partitionPath; private final HoodieDatasetReference metadata; - public StoragePartition(HoodieDatasetReference metadata, PartitionStrategy partitionStrategy, - FileStatus input) { + public StoragePartition(HoodieDatasetReference metadata, PartitionStrategy partitionStrategy, String partitionPath) { this.metadata = metadata; - this.partitionPath = Path.getPathWithoutSchemeAndAuthority(input.getPath()); + this.partitionPath = partitionPath; this.partitionStrategy = partitionStrategy; } @@ -41,7 +40,8 @@ public class StoragePartition { } public Path getPartitionPath() { - return partitionPath; + return new Path(metadata.getBaseDatasetPath(), partitionPath); + //return Path.getPathWithoutSchemeAndAuthority(new Path(metadata.getBaseDatasetPath(), partitionPath)); } @Override public String toString() {