1
0

Adding HiveSyncTool to sync hoodie dataset schema/partitions to Hive

- Designed to be run by your workflow manager after hoodie upsert
- Assumes jdbc connectivity via HiveServer2, which should work with all major distros
This commit is contained in:
Vinoth Chandar
2017-04-03 14:44:43 -07:00
committed by vinoth chandar
parent 2b6322318c
commit 542d622e49
11 changed files with 313 additions and 114 deletions

View File

@@ -24,12 +24,17 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>hoodie-hive</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
@@ -63,6 +68,7 @@
<artifactId>libthrift</artifactId>
<version>0.9.2</version>
</dependency>
<!-- Apache commons -->
<dependency>
<groupId>commons-dbcp</groupId>
@@ -79,6 +85,11 @@
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
</dependency>
<!-- Hadoop Testing -->
<dependency>
<groupId>junit</groupId>
@@ -136,7 +147,54 @@
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4.1</version>
<configuration>
<descriptors>
<descriptor>src/assembly/src.xml</descriptor>
</descriptors>
<archive>
<manifest>
<mainClass>com.uber.hoodie.hive.example.HoodieHiveSyncExample</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<!-- bind to the packaging phase -->
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/jars</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,44 @@
<!--
~ Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 http://maven.apache.org/xsd/assembly-1.1.3.xsd">
<id>jar-with-dependencies</id>
<formats>
<format>jar</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<dependencySets>
<dependencySet>
<outputDirectory>/</outputDirectory>
<unpack>true</unpack>
<scope>runtime</scope>
<excludes>
<exclude>junit:junit</exclude>
<exclude>com.google.code.findbugs:*</exclude>
<exclude>org.apache.hbase:*</exclude>
</excludes>
</dependencySet>
<dependencySet>
<unpack>true</unpack>
<scope>provided</scope>
</dependencySet>
</dependencySets>
</assembly>

View File

@@ -0,0 +1,50 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.hive;
import com.beust.jcommander.Parameter;
import java.io.Serializable;
/**
* Configs needed to sync data into Hive.
*/
/**
 * Configs needed to sync a hoodie dataset into Hive.
 *
 * <p>Fields are public (not final, no accessors) by design: JCommander populates them
 * directly from the annotated command line parameters.
 */
public class HiveSyncConfig implements Serializable {

  @Parameter(names = {"--database"}, description = "name of the target database in Hive", required = true)
  public String databaseName;

  @Parameter(names = {"--table"}, description = "name of the target table in Hive", required = true)
  public String tableName;

  @Parameter(names = {"--user"}, description = "Hive username", required = true)
  public String hiveUser;

  @Parameter(names = {"--pass"}, description = "Hive password", required = true)
  public String hivePass;

  @Parameter(names = {"--jdbc-url"}, description = "Hive jdbc connect url", required = true)
  public String jdbcUrl;

  @Parameter(names = {"--base-path"}, description = "Basepath of hoodie dataset to sync", required = true)
  public String basePath;

  @Parameter(names = {"--help", "-h"}, help = true)
  public Boolean help = false;

  /**
   * Renders the config for logging/debugging. The Hive password is masked so it can
   * never leak into log files or error messages.
   */
  @Override
  public String toString() {
    return "HiveSyncConfig{databaseName=" + databaseName
        + ", tableName=" + tableName
        + ", hiveUser=" + hiveUser
        + ", hivePass=***"
        + ", jdbcUrl=" + jdbcUrl
        + ", basePath=" + basePath
        + ", help=" + help + '}';
  }
}

View File

@@ -0,0 +1,82 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.hive;
import com.beust.jcommander.JCommander;
import com.uber.hoodie.hive.impl.DayBasedPartitionStrategy;
import com.uber.hoodie.hive.impl.ParseSchemaFromDataStrategy;
import com.uber.hoodie.hive.model.HoodieDatasetReference;
import org.apache.hadoop.conf.Configuration;
/**
* Tool to sync new data from commits, into Hive in terms of
*
* - New table/partitions
* - Updated schema for table/partitions
*/
/**
 * Tool to sync new data from commits, into Hive in terms of
 *
 *  - New table/partitions
 *  - Updated schema for table/partitions
 */
public class HiveSyncTool {

  /**
   * Sync the hoodie dataset described by the config into Hive, assuming day based
   * partitioning on storage and a schema parsed from the data files.
   *
   * @param cfg hive connection settings plus the dataset's table name and base path
   */
  public static void sync(HiveSyncConfig cfg) {
    // Configure to point to which metastore and database to connect to
    HoodieHiveConfiguration apiConfig =
        HoodieHiveConfiguration.newBuilder().hadoopConfiguration(new Configuration())
            .hivedb(cfg.databaseName)
            .hiveJdbcUrl(cfg.jdbcUrl)
            .jdbcUsername(cfg.hiveUser)
            .jdbcPassword(cfg.hivePass)
            .build();

    HoodieDatasetReference datasetReference =
        new HoodieDatasetReference(cfg.tableName, cfg.basePath, cfg.databaseName);

    // initialize the strategies
    PartitionStrategy partitionStrategy = new DayBasedPartitionStrategy();
    SchemaStrategy schemaStrategy = new ParseSchemaFromDataStrategy();

    // Creates a new dataset which reflects the state at the time of creation
    HoodieHiveDatasetSyncTask datasetSyncTask =
        HoodieHiveDatasetSyncTask.newBuilder().withReference(datasetReference)
            .withConfiguration(apiConfig).partitionStrategy(partitionStrategy)
            .schemaStrategy(schemaStrategy).build();

    // Sync dataset
    datasetSyncTask.sync();
  }

  public static void main(String[] args) throws Exception {
    // parse the params
    // NOTE(review): JCommander throws on missing required params inside the
    // constructor, so the no-arg usage message below is only reached when all
    // required params are absent checks pass — TODO confirm desired UX.
    final HiveSyncConfig cfg = new HiveSyncConfig();
    JCommander cmd = new JCommander(cfg, args);
    if (cfg.help || args.length == 0) {
      cmd.usage();
      // Exit 0 when help was explicitly requested (success, per CLI convention);
      // non-zero only when the tool was invoked with no arguments at all.
      System.exit(cfg.help ? 0 : 1);
    }
    sync(cfg);
  }
}

View File

@@ -25,6 +25,7 @@ import com.uber.hoodie.hive.model.StoragePartition;
import com.uber.hoodie.hive.model.TablePartition;
import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -151,11 +152,11 @@ public class HoodieHiveDatasetSyncTask {
.withFSClient(fsClient).build();
List<StoragePartition> storagePartitions = Lists.newArrayList();
FileStatus[] storagePartitionPaths = schemaSyncTask.getPartitionStrategy()
List<String> storagePartitionPaths = schemaSyncTask.getPartitionStrategy()
.scanAllPartitions(schemaSyncTask.getReference(), schemaSyncTask.getFsClient());
for (FileStatus fileStatus : storagePartitionPaths) {
for (String path : storagePartitionPaths) {
storagePartitions.add(new StoragePartition(schemaSyncTask.getReference(),
schemaSyncTask.getPartitionStrategy(), fileStatus));
schemaSyncTask.getPartitionStrategy(), path));
}
LOG.info("Storage partitions scan complete. Found " + storagePartitions.size());

View File

@@ -21,6 +21,8 @@ import com.uber.hoodie.hive.model.HoodieDatasetReference;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import java.util.List;
/**
* Abstraction to define HDFS partition strategies.
* Strategy provides hookups to map partitions on to physical layout
@@ -29,13 +31,14 @@ import org.apache.hadoop.fs.Path;
*/
public interface PartitionStrategy {
/**
* Scans the file system for all partitions and returns FileStatus[] which are the available partitions
* Scans the file system for all partitions and returns String[] which are the available partitions, relative to
* the base path
*
* @param basePath
* @param fsClient
* @return
*/
FileStatus[] scanAllPartitions(HoodieDatasetReference basePath, HoodieFSClient fsClient);
List<String> scanAllPartitions(HoodieDatasetReference basePath, HoodieFSClient fsClient);
/**
* Get the list of hive field names the dataset will be partitioned on.
@@ -47,10 +50,10 @@ public interface PartitionStrategy {
/**
* Convert a Partition path (returned in scanAllPartitions) to values for column names returned in getHivePartitionFieldNames
* e.g. /data/topic/2016/12/12/ will return [2016, 12, 12]
* e.g. 2016/12/12/ will return [2016, 12, 12]
*
* @param partition storage path
* @param partitionPath storage path
* @return List of partitions field values
*/
String[] convertPartitionToValues(HoodieDatasetReference metadata, Path partition);
String[] convertPartitionToValues(HoodieDatasetReference metadata, String partitionPath);
}

View File

@@ -50,6 +50,7 @@ public class HoodieFSClient {
final public static String PARQUET_EXTENSION_ZIPPED = ".parquet.gz";
private final static Logger LOG = LoggerFactory.getLogger(HoodieFSClient.class);
private final HoodieHiveConfiguration conf;
private final FileSystem fs;
public HoodieFSClient(HoodieHiveConfiguration configuration) {
@@ -123,32 +124,6 @@ public class HoodieFSClient {
}
}
/**
* Finds all the files/directories that match the pattern under the {@link HoodieDatasetReference} basePath
*
* @param metadata
* @param pattern
* @return
*/
public FileStatus[] getDirectoriesMatchingPattern(HoodieDatasetReference metadata, String pattern) {
try {
Path path = new Path(metadata.getBaseDatasetPath() + pattern);
FileStatus[] status = fs.globStatus(path);
List<FileStatus> returns = Lists.newArrayList();
for(FileStatus st:status) {
if(!st.getPath().toString().contains(".distcp")) {
// Ignore temporary directories created by distcp
returns.add(st);
}
}
return returns.toArray(new FileStatus[returns.size()]);
} catch (IOException e) {
throw new HoodieHiveDatasetException(
"IOException when reading directories under dataset " + metadata + " with pattern "
+ pattern, e);
}
}
/**
* Get the list of storage partitions which does not have its equivalent hive partitions
*
@@ -205,4 +180,7 @@ public class HoodieFSClient {
return Objects.hashCode(paths);
}
public FileSystem getFs() {
return fs;
}
}

View File

@@ -1,56 +0,0 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.hive.example;
import com.uber.hoodie.hive.HoodieHiveConfiguration;
import com.uber.hoodie.hive.HoodieHiveDatasetSyncTask;
import com.uber.hoodie.hive.PartitionStrategy;
import com.uber.hoodie.hive.SchemaStrategy;
import com.uber.hoodie.hive.impl.DayBasedPartitionStrategy;
import com.uber.hoodie.hive.impl.ParseSchemaFromDataStrategy;
import com.uber.hoodie.hive.model.HoodieDatasetReference;
import org.apache.hadoop.conf.Configuration;
/**
* Example showing basic usage of Hoodie Hive API
*/
/**
 * Example showing basic usage of Hoodie Hive API
 */
public class HoodieDatasetExample {

  public static void main(String[] args) {
    // Point the API at the local HiveServer2 instance and the "tmp" database
    HoodieHiveConfiguration hiveConfig =
        HoodieHiveConfiguration.newBuilder()
            .hadoopConfiguration(new Configuration())
            .hivedb("tmp")
            .hiveJdbcUrl("jdbc:hive2://localhost:10010/")
            .jdbcUsername("hive")
            .jdbcPassword("hive")
            .build();

    // The dataset to sync: table name, HDFS base path, and target database
    HoodieDatasetReference reference =
        new HoodieDatasetReference("clickstream", "hdfs:///data/tables/user.clickstream", "raw");

    // Strategies controlling schema derivation and partition discovery
    SchemaStrategy schemaStrategy = new ParseSchemaFromDataStrategy();
    PartitionStrategy partitionStrategy = new DayBasedPartitionStrategy();

    // Build a sync task snapshotting the dataset state at creation time, then sync it
    HoodieHiveDatasetSyncTask
        .newBuilder()
        .withReference(reference)
        .withConfiguration(hiveConfig)
        .partitionStrategy(partitionStrategy)
        .schemaStrategy(schemaStrategy)
        .build()
        .sync();
  }
}

View File

@@ -0,0 +1,39 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.hive.example;
import com.uber.hoodie.hive.HiveSyncTool;
import com.uber.hoodie.hive.HiveSyncConfig;
/**
* Example showing how to sync the dataset, written by `HoodieClientExample`
*/
/**
 * Example showing how to sync the dataset, written by `HoodieClientExample`
 */
public class HoodieHiveSyncExample {

  public static void main(String[] args) {
    // Hand-build the sync config (normally parsed from the command line) and run the sync
    HiveSyncConfig syncConfig = new HiveSyncConfig();
    syncConfig.jdbcUrl = "jdbc:hive2://localhost:10010/";
    syncConfig.hiveUser = "hive";
    syncConfig.hivePass = "hive";
    syncConfig.databaseName = "default";
    syncConfig.tableName = "uber_trips";
    syncConfig.basePath = "/tmp/hoodie/sample-table/";
    HiveSyncTool.sync(syncConfig);
  }
}

View File

@@ -16,6 +16,8 @@
package com.uber.hoodie.hive.impl;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.hive.HoodieHiveDatasetException;
import com.uber.hoodie.hive.PartitionStrategy;
import com.uber.hoodie.hive.client.HoodieFSClient;
import com.uber.hoodie.hive.model.HoodieDatasetReference;
@@ -27,6 +29,9 @@ import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
/**
* Simple day based partitions.
* Storage is of this format yyyy/mm/dd
@@ -42,8 +47,13 @@ public class DayBasedPartitionStrategy implements PartitionStrategy {
this.dtfOut = DateTimeFormat.forPattern("yyyy-MM-dd");
}
@Override public FileStatus[] scanAllPartitions(HoodieDatasetReference ref, HoodieFSClient fsClient) {
return fsClient.getDirectoriesMatchingPattern(ref, "/*/*/*");
@Override public List<String> scanAllPartitions(HoodieDatasetReference ref, HoodieFSClient fsClient) {
try {
return FSUtils.getAllPartitionPaths(fsClient.getFs(), ref.getBaseDatasetPath());
} catch (IOException ioe) {
throw new HoodieHiveDatasetException(
"IOException when listing partitions under dataset " + ref , ioe);
}
}
@Override public String[] getHivePartitionFieldNames() {
@@ -51,28 +61,18 @@ public class DayBasedPartitionStrategy implements PartitionStrategy {
}
@Override
public String[] convertPartitionToValues(HoodieDatasetReference metadata, Path partition) {
public String[] convertPartitionToValues(HoodieDatasetReference metadata, String partitionPath) {
//yyyy/mm/dd
String basePath = metadata.getBaseDatasetPath();
String partitionPath = partition.toUri().getPath();
if (!partitionPath.contains(basePath)) {
String[] splits = partitionPath.split("/");
if (splits.length != 3) {
throw new IllegalArgumentException(
"Partition path " + partitionPath + " is not part of the dataset " + metadata);
"Partition path " + partitionPath + " is not in the form yyyy/mm/dd ");
}
// Get the partition part and remove the / as well at the end
String partitionPart = partitionPath.substring(basePath.length() + 1);
LOG.info("Extracting parts from " + partitionPart);
int year = extractPart(partitionPart, 0);
int mm = extractPart(partitionPart, 1);
int dd = extractPart(partitionPart, 2);
int year = Integer.parseInt(splits[0]);
int mm = Integer.parseInt(splits[1]);
int dd = Integer.parseInt(splits[2]);
DateTime dateTime = new DateTime(year, mm, dd, 0, 0);
return new String[] {dtfOut.print(dateTime)};
}
private int extractPart(String pathString, int index) {
String[] parts = pathString.split("/");
String part = parts[index];
return Integer.parseInt(part);
}
}

View File

@@ -26,13 +26,12 @@ import org.slf4j.LoggerFactory;
public class StoragePartition {
private static Logger LOG = LoggerFactory.getLogger(StoragePartition.class);
private final PartitionStrategy partitionStrategy;
private final Path partitionPath;
private final String partitionPath;
private final HoodieDatasetReference metadata;
public StoragePartition(HoodieDatasetReference metadata, PartitionStrategy partitionStrategy,
FileStatus input) {
public StoragePartition(HoodieDatasetReference metadata, PartitionStrategy partitionStrategy, String partitionPath) {
this.metadata = metadata;
this.partitionPath = Path.getPathWithoutSchemeAndAuthority(input.getPath());
this.partitionPath = partitionPath;
this.partitionStrategy = partitionStrategy;
}
@@ -41,7 +40,8 @@ public class StoragePartition {
}
public Path getPartitionPath() {
return partitionPath;
return new Path(metadata.getBaseDatasetPath(), partitionPath);
//return Path.getPathWithoutSchemeAndAuthority(new Path(metadata.getBaseDatasetPath(), partitionPath));
}
@Override public String toString() {