From 460e24e84bad37a51602b0de6b49962e05d18106 Mon Sep 17 00:00:00 2001 From: Balaji Varadarajan Date: Sun, 16 Sep 2018 08:06:30 -0700 Subject: [PATCH] Hive Sync handling must work for datasets with multi-partition keys --- .../com/uber/hoodie/hive/HiveSyncConfig.java | 14 +++++ .../uber/hoodie/hive/HoodieHiveClient.java | 51 +++++++++---------- .../uber/hoodie/hive/HiveSyncToolTest.java | 31 +++++++++++ .../java/com/uber/hoodie/hive/TestUtil.java | 4 ++ .../util/MultiPartKeysValueExtractor.java | 33 ++++++++++++ 5 files changed, 106 insertions(+), 27 deletions(-) create mode 100644 hoodie-hive/src/test/java/com/uber/hoodie/hive/util/MultiPartKeysValueExtractor.java diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HiveSyncConfig.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HiveSyncConfig.java index 5e81ad9af..d040a5e79 100644 --- a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HiveSyncConfig.java +++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HiveSyncConfig.java @@ -85,4 +85,18 @@ public class HiveSyncConfig implements Serializable { + ", help=" + help + '}'; } + + public static HiveSyncConfig copy(HiveSyncConfig cfg) { + HiveSyncConfig newConfig = new HiveSyncConfig(); + newConfig.basePath = cfg.basePath; + newConfig.assumeDatePartitioning = cfg.assumeDatePartitioning; + newConfig.databaseName = cfg.databaseName; + newConfig.hivePass = cfg.hivePass; + newConfig.hiveUser = cfg.hiveUser; + newConfig.partitionFields = cfg.partitionFields; + newConfig.partitionValueExtractorClass = cfg.partitionValueExtractorClass; + newConfig.jdbcUrl = cfg.jdbcUrl; + newConfig.tableName = cfg.tableName; + return newConfig; + } } diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java index 53c865265..de7c7aed3 100644 --- a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java +++ 
b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java @@ -40,6 +40,7 @@ import java.sql.Driver; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -152,45 +153,41 @@ public class HoodieHiveClient { alterSQL.append(syncConfig.databaseName).append(".").append(syncConfig.tableName) .append(" ADD IF NOT EXISTS "); for (String partition : partitions) { - - StringBuilder partBuilder = new StringBuilder(); - List partitionValues = partitionValueExtractor - .extractPartitionValuesInPath(partition); - Preconditions.checkArgument(syncConfig.partitionFields.size() == partitionValues.size(), - "Partition key parts " + syncConfig.partitionFields - + " does not match with partition values " + partitionValues - + ". Check partition strategy. "); - for (int i = 0; i < syncConfig.partitionFields.size(); i++) { - partBuilder.append(syncConfig.partitionFields.get(i)).append("=").append("'") - .append(partitionValues.get(i)).append("'"); - } - + String partitionClause = getPartitionClause(partition); String fullPartitionPath = new Path(syncConfig.basePath, partition).toString(); - alterSQL.append(" PARTITION (").append(partBuilder.toString()).append(") LOCATION '") + alterSQL.append(" PARTITION (").append(partitionClause).append(") LOCATION '") .append(fullPartitionPath).append("' "); } return alterSQL.toString(); } + /** + * Generate Hive Partition from partition values + * @param partition Partition path + * @return + */ + private String getPartitionClause(String partition) { + List partitionValues = partitionValueExtractor + .extractPartitionValuesInPath(partition); + Preconditions.checkArgument(syncConfig.partitionFields.size() == partitionValues.size(), + "Partition key parts " + syncConfig.partitionFields + + " does not match with partition values " + partitionValues + + ". Check partition strategy. 
"); + List partBuilder = new ArrayList<>(); + for (int i = 0; i < syncConfig.partitionFields.size(); i++) { + partBuilder.add(syncConfig.partitionFields.get(i) + "=" + "'" + partitionValues.get(i) + "'"); + } + return partBuilder.stream().collect(Collectors.joining(",")); + } + private List constructChangePartitions(List partitions) { List changePartitions = Lists.newArrayList(); String alterTable = "ALTER TABLE " + syncConfig.databaseName + "." + syncConfig.tableName; for (String partition : partitions) { - StringBuilder partBuilder = new StringBuilder(); - List partitionValues = partitionValueExtractor - .extractPartitionValuesInPath(partition); - Preconditions.checkArgument(syncConfig.partitionFields.size() == partitionValues.size(), - "Partition key parts " + syncConfig.partitionFields - + " does not match with partition values " + partitionValues - + ". Check partition strategy. "); - for (int i = 0; i < syncConfig.partitionFields.size(); i++) { - partBuilder.append(syncConfig.partitionFields.get(i)).append("=").append("'") - .append(partitionValues.get(i)).append("'"); - } - + String partitionClause = getPartitionClause(partition); String fullPartitionPath = new Path(syncConfig.basePath, partition).toString(); String changePartition = - alterTable + " PARTITION (" + partBuilder.toString() + ") SET LOCATION '" + fullPartitionPath + "'"; + alterTable + " PARTITION (" + partitionClause + ") SET LOCATION '" + fullPartitionPath + "'"; changePartitions.add(changePartition); } return changePartitions; diff --git a/hoodie-hive/src/test/java/com/uber/hoodie/hive/HiveSyncToolTest.java b/hoodie-hive/src/test/java/com/uber/hoodie/hive/HiveSyncToolTest.java index 844b6c364..be5a04e5d 100644 --- a/hoodie-hive/src/test/java/com/uber/hoodie/hive/HiveSyncToolTest.java +++ b/hoodie-hive/src/test/java/com/uber/hoodie/hive/HiveSyncToolTest.java @@ -22,9 +22,11 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static 
org.junit.Assert.assertTrue; +import com.google.common.collect.Lists; import com.uber.hoodie.common.util.SchemaTestUtil; import com.uber.hoodie.hive.HoodieHiveClient.PartitionEvent; import com.uber.hoodie.hive.HoodieHiveClient.PartitionEvent.PartitionEventType; +import com.uber.hoodie.hive.util.MultiPartKeysValueExtractor; import com.uber.hoodie.hive.util.SchemaUtil; import java.io.IOException; import java.net.URISyntaxException; @@ -356,4 +358,33 @@ public class HiveSyncToolTest { TestUtil.hiveSyncConfig.tableName = roTablename; } + @Test + public void testMultiPartitionKeySync() + throws IOException, InitializationError, URISyntaxException, TException, + InterruptedException { + String commitTime = "100"; + TestUtil.createCOWDataset(commitTime, 5); + + HiveSyncConfig hiveSyncConfig = HiveSyncConfig.copy(TestUtil.hiveSyncConfig); + hiveSyncConfig.partitionValueExtractorClass = MultiPartKeysValueExtractor.class.getCanonicalName(); + hiveSyncConfig.tableName = "multi_part_key"; + hiveSyncConfig.partitionFields = Lists.newArrayList("year", "month", "day"); + TestUtil.getCreatedTablesSet().add(hiveSyncConfig.databaseName + "." 
+ hiveSyncConfig.tableName); + + HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, + TestUtil.getHiveConf(), TestUtil.fileSystem); + assertFalse("Table " + hiveSyncConfig.tableName + " should not exist initially", + hiveClient.doesTableExist()); + // Lets do the sync + HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem); + tool.syncHoodieTable(); + assertTrue("Table " + hiveSyncConfig.tableName + " should exist after sync completes", + hiveClient.doesTableExist()); + assertEquals("Hive Schema should match the dataset schema + partition fields", + hiveClient.getTableSchema().size(), hiveClient.getDataSchema().getColumns().size() + 3); + assertEquals("Table partitions should match the number of partitions we wrote", 5, + hiveClient.scanTablePartitions().size()); + assertEquals("The last commit that was sycned should be updated in the TBLPROPERTIES", + commitTime, hiveClient.getLastCommitTimeSynced().get()); + } } diff --git a/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java b/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java index a2a3ab2bb..7093bcf9c 100644 --- a/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java +++ b/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java @@ -358,4 +358,8 @@ public class TestUtil { fsout.write(bytes); fsout.close(); } + + public static Set getCreatedTablesSet() { + return createdTablesSet; + } } diff --git a/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/MultiPartKeysValueExtractor.java b/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/MultiPartKeysValueExtractor.java new file mode 100644 index 000000000..610126c46 --- /dev/null +++ b/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/MultiPartKeysValueExtractor.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. 
(hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.hive.util; + +import com.uber.hoodie.hive.PartitionValueExtractor; +import java.util.Arrays; +import java.util.List; + +/** + * Partition key extractor that treats each slash-delimited segment of the partition path as the value of a separate partition key. + */ +public class MultiPartKeysValueExtractor implements PartitionValueExtractor { + + @Override + public List extractPartitionValuesInPath(String partitionPath) { + String[] splits = partitionPath.split("/"); + return Arrays.asList(splits); + } +} \ No newline at end of file