diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java index c2688b8f0..c82f8a2f2 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java @@ -68,7 +68,7 @@ public class HoodieCreateHandle extends HoodieIOH try { HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, commitTime, - new Path(config.getBasePath()), new Path(config.getBasePath(), partitionPath)); + new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath)); partitionMetadata.trySave(TaskContext.getPartitionId()); this.storageWriter = HoodieStorageWriterFactory .getStorageWriter(commitTime, getStorageWriterPath(), hoodieTable, config, schema); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java index 3e3318161..07bc368e4 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java @@ -85,7 +85,7 @@ public abstract class HoodieIOHandle { } public Path makeNewPath(String partitionPath, int taskPartitionId, String fileName) { - Path path = new Path(config.getBasePath(), partitionPath); + Path path = FSUtils.getPartitionPath(config.getBasePath(), partitionPath); try { fs.mkdirs(path); // create a new partition as needed. 
} catch (IOException e) { diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java index 580975f92..bf575d416 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java @@ -101,12 +101,12 @@ public class HoodieMergeHandle extends HoodieIOHa writeStatus.getStat().setPrevCommit(FSUtils.getCommitTime(latestValidFilePath)); HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, commitTime, - new Path(config.getBasePath()), new Path(config.getBasePath(), partitionPath)); + new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath)); partitionMetadata.trySave(TaskContext.getPartitionId()); oldFilePath = new Path( config.getBasePath() + "/" + partitionPath + "/" + latestValidFilePath); - String relativePath = new Path(partitionPath + "/" + FSUtils + String relativePath = new Path((partitionPath.isEmpty() ? 
"" : partitionPath + "/") + FSUtils .makeDataFileName(commitTime, TaskContext.getPartitionId(), fileId)).toString(); newFilePath = new Path(config.getBasePath(), relativePath); if (config.shouldUseTempFolderForCopyOnWriteForMerge()) { diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java index 7e7792d3d..91520b725 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java @@ -426,7 +426,7 @@ public class HoodieTableFileSystemView implements TableFileSystemView, try { // Create the path if it does not exist already - Path partitionPath = new Path(metaClient.getBasePath(), partitionPathStr); + Path partitionPath = FSUtils.getPartitionPath(metaClient.getBasePath(), partitionPathStr); FSUtils.createPathIfNotExists(metaClient.getFs(), partitionPath); FileStatus[] statuses = metaClient.getFs().listStatus(partitionPath); List fileGroups = addFilesToView(statuses); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java index f71f6b4cc..ba9cef9ed 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java @@ -147,7 +147,9 @@ public class FSUtils { public static String getRelativePartitionPath(Path basePath, Path partitionPath) { String partitionFullPath = partitionPath.toString(); int partitionStartIndex = partitionFullPath.lastIndexOf(basePath.getName()); - return partitionFullPath.substring(partitionStartIndex + basePath.getName().length() + 1); + // Partition-Path could be empty for non-partitioned tables + return partitionStartIndex + basePath.getName().length() == partitionFullPath.length() 
? "" : + partitionFullPath.substring(partitionStartIndex + basePath.getName().length() + 1); } /** @@ -396,4 +398,10 @@ public class FSUtils { public static Long getSizeInMB(long sizeInBytes) { return sizeInBytes / (1024 * 1024); } + + public static Path getPartitionPath(String basePath, String partitionPath) { + // For non-partitioned table, return only base-path + return ((partitionPath == null) || (partitionPath.isEmpty())) ? new Path(basePath) : + new Path(basePath, partitionPath); + } } diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HiveSyncConfig.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HiveSyncConfig.java index d040a5e79..14b01ffec 100644 --- a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HiveSyncConfig.java +++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HiveSyncConfig.java @@ -49,7 +49,7 @@ public class HiveSyncConfig implements Serializable { public String basePath; @Parameter(names = "--partitioned-by", description = "Fields in the schema partitioned by", - required = true) + required = false) public List<String> partitionFields = new ArrayList<>(); @Parameter(names = "-partition-value-extractor", description = "Class which implements " diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java index de7c7aed3..e3f9cb0fe 100644 --- a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java +++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java @@ -154,7 +154,7 @@ public class HoodieHiveClient { .append(" ADD IF NOT EXISTS "); for (String partition : partitions) { String partitionClause = getPartitionClause(partition); - String fullPartitionPath = new Path(syncConfig.basePath, partition).toString(); + String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition).toString(); alterSQL.append(" PARTITION (").append(partitionClause).append(") LOCATION '")
.append(fullPartitionPath).append("' "); } @@ -185,7 +185,7 @@ public class HoodieHiveClient { String alterTable = "ALTER TABLE " + syncConfig.databaseName + "." + syncConfig.tableName; for (String partition : partitions) { String partitionClause = getPartitionClause(partition); - String fullPartitionPath = new Path(syncConfig.basePath, partition).toString(); + String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition).toString(); String changePartition = alterTable + " PARTITION (" + partitionClause + ") SET LOCATION '" + fullPartitionPath + "'"; changePartitions.add(changePartition); @@ -210,16 +210,18 @@ public class HoodieHiveClient { List events = Lists.newArrayList(); for (String storagePartition : partitionStoragePartitions) { - String fullStoragePartitionPath = new Path(syncConfig.basePath, storagePartition).toString(); + String fullStoragePartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, storagePartition).toString(); // Check if the partition values or if hdfs path is the same List storagePartitionValues = partitionValueExtractor .extractPartitionValuesInPath(storagePartition); Collections.sort(storagePartitionValues); - String storageValue = String.join(", ", storagePartitionValues); - if (!paths.containsKey(storageValue)) { - events.add(PartitionEvent.newPartitionAddEvent(storagePartition)); - } else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) { - events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition)); + if (!storagePartitionValues.isEmpty()) { + String storageValue = String.join(", ", storagePartitionValues); + if (!paths.containsKey(storageValue)) { + events.add(PartitionEvent.newPartitionAddEvent(storagePartition)); + } else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) { + events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition)); + } } } return events; diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/NonPartitionedExtractor.java 
b/hoodie-hive/src/main/java/com/uber/hoodie/hive/NonPartitionedExtractor.java new file mode 100644 index 000000000..ce610360d --- /dev/null +++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/NonPartitionedExtractor.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.hive; + +import java.util.ArrayList; +import java.util.List; + +/** + * Extractor for Non-partitioned hive tables + */ +public class NonPartitionedExtractor implements PartitionValueExtractor { + + @Override + public List<String> extractPartitionValuesInPath(String partitionPath) { + return new ArrayList<>(); + } +} diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/util/SchemaUtil.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/util/SchemaUtil.java index 00337c5b9..6ed51b732 100644 --- a/hoodie-hive/src/main/java/com/uber/hoodie/hive/util/SchemaUtil.java +++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/util/SchemaUtil.java @@ -406,8 +406,9 @@ public class SchemaUtil { List<String> partitionFields = new ArrayList<>(); for (String partitionKey : config.partitionFields) { + String partitionKeyWithTicks = tickSurround(partitionKey); partitionFields.add(new StringBuilder().append(partitionKey).append(" ") - .append(getPartitionKeyType(hiveSchema, partitionKeyWithTicks)).toString()); + .append(getPartitionKeyType(hiveSchema, partitionKeyWithTicks)).toString()); } String
partitionsStr = partitionFields.stream().collect(Collectors.joining(",")); diff --git a/hoodie-integ-test/src/test/java/com/uber/hoodie/integ/ITTestHoodieSanity.java b/hoodie-integ-test/src/test/java/com/uber/hoodie/integ/ITTestHoodieSanity.java index 9a1b694ff..6dc48515f 100644 --- a/hoodie-integ-test/src/test/java/com/uber/hoodie/integ/ITTestHoodieSanity.java +++ b/hoodie-integ-test/src/test/java/com/uber/hoodie/integ/ITTestHoodieSanity.java @@ -25,6 +25,12 @@ import org.junit.Test; */ public class ITTestHoodieSanity extends ITTestBase { + enum PartitionType { + SINGLE_KEY_PARTITIONED, + MULTI_KEYS_PARTITIONED, + NON_PARTITIONED, + } + @Test public void testRunEcho() throws Exception { String[] cmd = new String[]{"echo", "Happy Testing"}; @@ -44,7 +50,7 @@ public class ITTestHoodieSanity extends ITTestBase { */ public void testRunHoodieJavaAppOnSinglePartitionKeyCOWTable() throws Exception { String hiveTableName = "docker_hoodie_single_partition_key_cow_test"; - testRunHoodieJavaAppOnCOWTable(hiveTableName, true); + testRunHoodieJavaAppOnCOWTable(hiveTableName, PartitionType.SINGLE_KEY_PARTITIONED); } @Test @@ -55,7 +61,18 @@ public class ITTestHoodieSanity extends ITTestBase { */ public void testRunHoodieJavaAppOnMultiPartitionKeysCOWTable() throws Exception { String hiveTableName = "docker_hoodie_multi_partition_key_cow_test"; - testRunHoodieJavaAppOnCOWTable(hiveTableName, false); + testRunHoodieJavaAppOnCOWTable(hiveTableName, PartitionType.MULTI_KEYS_PARTITIONED); + } + + @Test + /** + * A basic integration test that runs HoodieJavaApp to create a sample non-partitioned COW Hoodie + * data-set and performs upserts on it. Hive integration and upsert functionality is checked by running a count + * query in hive console. 
+ */ + public void testRunHoodieJavaAppOnNonPartitionedCOWTable() throws Exception { + String hiveTableName = "docker_hoodie_non_partition_key_cow_test"; + testRunHoodieJavaAppOnCOWTable(hiveTableName, PartitionType.NON_PARTITIONED); } /** @@ -64,7 +81,7 @@ public class ITTestHoodieSanity extends ITTestBase { * query in hive console. * TODO: Add spark-shell test-case */ - public void testRunHoodieJavaAppOnCOWTable(String hiveTableName, boolean singlePartitionKey) throws Exception { + public void testRunHoodieJavaAppOnCOWTable(String hiveTableName, PartitionType partitionType) throws Exception { String hdfsPath = "/" + hiveTableName; String hdfsUrl = "hdfs://namenode" + hdfsPath; @@ -90,7 +107,7 @@ public class ITTestHoodieSanity extends ITTestBase { // Run Hoodie Java App { String[] cmd = null; - if (singlePartitionKey) { + if (partitionType == PartitionType.SINGLE_KEY_PARTITIONED) { cmd = new String[]{ HOODIE_JAVA_APP, "--hive-sync", @@ -98,13 +115,22 @@ public class ITTestHoodieSanity extends ITTestBase { "--hive-url", HIVE_SERVER_JDBC_URL, "--hive-table", hiveTableName }; + } else if (partitionType == PartitionType.MULTI_KEYS_PARTITIONED) { + cmd = new String[]{ + HOODIE_JAVA_APP, + "--hive-sync", + "--table-path", hdfsUrl, + "--hive-url", HIVE_SERVER_JDBC_URL, + "--use-multi-partition-keys", + "--hive-table", hiveTableName + }; } else { cmd = new String[]{ HOODIE_JAVA_APP, "--hive-sync", "--table-path", hdfsUrl, "--hive-url", HIVE_SERVER_JDBC_URL, - "--use-multi-partition-keys", + "--non-partitioned", "--hive-table", hiveTableName }; } diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/NonpartitionedKeyGenerator.java b/hoodie-spark/src/main/java/com/uber/hoodie/NonpartitionedKeyGenerator.java new file mode 100644 index 000000000..59ca5d453 --- /dev/null +++ b/hoodie-spark/src/main/java/com/uber/hoodie/NonpartitionedKeyGenerator.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. 
(hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.uber.hoodie; + +import com.uber.hoodie.common.model.HoodieKey; +import com.uber.hoodie.common.util.TypedProperties; +import org.apache.avro.generic.GenericRecord; + +/** + * Simple Key generator for unpartitioned Hive Tables + */ +public class NonpartitionedKeyGenerator extends SimpleKeyGenerator { + + private static final String EMPTY_PARTITION = ""; + + public NonpartitionedKeyGenerator(TypedProperties props) { + super(props); + } + + @Override + public HoodieKey getKey(GenericRecord record) { + String recordKey = DataSourceUtils.getNestedFieldValAsString(record, recordKeyField); + return new HoodieKey(recordKey, EMPTY_PARTITION); + } +} diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala index 93ad54878..fe3507e08 100644 --- a/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala +++ b/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala @@ -258,9 +258,8 @@ private[hoodie] object HoodieSparkSqlWriter { hiveSyncConfig.hivePass = parameters(HIVE_PASS_OPT_KEY) hiveSyncConfig.jdbcUrl = parameters(HIVE_URL_OPT_KEY) hiveSyncConfig.partitionFields = - ListBuffer(parameters(HIVE_PARTITION_FIELDS_OPT_KEY).split(",").map(_.trim).toList: _*) + 
ListBuffer(parameters(HIVE_PARTITION_FIELDS_OPT_KEY).split(",").map(_.trim).filter(!_.isEmpty).toList: _*) hiveSyncConfig.partitionValueExtractorClass = parameters(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY) hiveSyncConfig } - } \ No newline at end of file diff --git a/hoodie-spark/src/test/java/HoodieJavaApp.java b/hoodie-spark/src/test/java/HoodieJavaApp.java index 3cdc55136..f6f17f73f 100644 --- a/hoodie-spark/src/test/java/HoodieJavaApp.java +++ b/hoodie-spark/src/test/java/HoodieJavaApp.java @@ -21,10 +21,13 @@ import com.beust.jcommander.Parameter; import com.uber.hoodie.DataSourceReadOptions; import com.uber.hoodie.DataSourceWriteOptions; import com.uber.hoodie.HoodieDataSourceHelpers; +import com.uber.hoodie.NonpartitionedKeyGenerator; +import com.uber.hoodie.SimpleKeyGenerator; import com.uber.hoodie.common.HoodieTestDataGenerator; import com.uber.hoodie.common.model.HoodieTableType; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.hive.MultiPartKeysValueExtractor; +import com.uber.hoodie.hive.NonPartitionedExtractor; import java.util.List; import org.apache.hadoop.fs.FileSystem; import org.apache.log4j.LogManager; @@ -68,6 +71,9 @@ public class HoodieJavaApp { @Parameter(names = {"--hive-url", "-hl"}, description = "hive JDBC URL") private String hiveJdbcUrl = "jdbc:hive2://localhost:10000"; + @Parameter(names = {"--non-partitioned", "-np"}, description = "Use non-partitioned Table") + private Boolean nonPartitionedTable = false; + @Parameter(names = {"--use-multi-partition-keys", "-mp"}, description = "Use Multiple Partition Keys") private Boolean useMultiPartitionKeys = false; @@ -98,7 +104,13 @@ public class HoodieJavaApp { FileSystem fs = FileSystem.get(jssc.hadoopConfiguration()); // Generator of some records to be loaded in. 
- HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); + HoodieTestDataGenerator dataGen = null; + if (nonPartitionedTable) { + // All data goes to base-path + dataGen = new HoodieTestDataGenerator(new String[]{""}); + } else { + dataGen = new HoodieTestDataGenerator(); + } /** * Commit with only inserts @@ -124,6 +136,9 @@ public class HoodieJavaApp { .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp") // use to combine duplicate records in input/with disk val .option(HoodieWriteConfig.TABLE_NAME, tableName) // Used by hive sync and queries + .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY(), + nonPartitionedTable ? NonpartitionedKeyGenerator.class.getCanonicalName() : + SimpleKeyGenerator.class.getCanonicalName()) // Add Key Extractor .mode( SaveMode.Overwrite); // This will remove any existing data at path below, and create a @@ -145,7 +160,11 @@ public class HoodieJavaApp { .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key") .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition") .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp") + .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY(), + nonPartitionedTable ? NonpartitionedKeyGenerator.class.getCanonicalName() : + SimpleKeyGenerator.class.getCanonicalName()) // Add Key Extractor .option(HoodieWriteConfig.TABLE_NAME, tableName).mode(SaveMode.Append); + updateHiveSyncConfig(writer); writer.save(tablePath); String commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath); @@ -157,7 +176,7 @@ public class HoodieJavaApp { Dataset hoodieROViewDF = spark.read().format("com.uber.hoodie") // pass any path glob, can include hoodie & non-hoodie // datasets - .load(tablePath + "/*/*/*/*"); + .load(tablePath + (nonPartitionedTable ? "/*" : "/*/*/*/*")); hoodieROViewDF.registerTempTable("hoodie_ro"); spark.sql("describe hoodie_ro").show(); // all trips whose fare was greater than 2. 
@@ -195,7 +214,11 @@ public class HoodieJavaApp { .option(DataSourceWriteOptions.HIVE_USER_OPT_KEY(), hiveUser) .option(DataSourceWriteOptions.HIVE_PASS_OPT_KEY(), hivePass) .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY(), "true"); - if (useMultiPartitionKeys) { + if (nonPartitionedTable) { + writer = writer.option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(), + NonPartitionedExtractor.class.getCanonicalName()) + .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), ""); + } else if (useMultiPartitionKeys) { writer = writer.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "year,month,day") .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(), MultiPartKeysValueExtractor.class.getCanonicalName());