From e555aa516de867a4faf0322e79defa1f52d887ef Mon Sep 17 00:00:00 2001 From: Wenning Ding Date: Wed, 30 Oct 2019 10:41:04 -0700 Subject: [PATCH] [HUDI-353] Add hive style partitioning path --- .../strategy/DayBasedCompactionStrategy.java | 12 +++++ ...lashEncodedDayPartitionValueExtractor.java | 6 +-- .../org/apache/hudi/ComplexKeyGenerator.java | 9 +++- .../org/apache/hudi/SimpleKeyGenerator.java | 7 +++ .../org/apache/hudi/DataSourceOptions.scala | 8 ++++ .../apache/hudi/HoodieSparkSqlWriter.scala | 3 +- .../test/scala/TestDataSourceDefaults.scala | 48 +++++++++++-------- .../keygen/TimestampBasedKeyGenerator.java | 5 +- 8 files changed, 72 insertions(+), 26 deletions(-) diff --git a/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/DayBasedCompactionStrategy.java b/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/DayBasedCompactionStrategy.java index 3a1f425b2..0de88bf65 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/DayBasedCompactionStrategy.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/DayBasedCompactionStrategy.java @@ -45,6 +45,8 @@ public class DayBasedCompactionStrategy extends CompactionStrategy { // Sorts compaction in LastInFirstCompacted order protected static Comparator comparator = (String leftPartition, String rightPartition) -> { try { + leftPartition = getPartitionPathWithoutPartitionKeys(leftPartition); + rightPartition = getPartitionPathWithoutPartitionKeys(rightPartition); Date left = new SimpleDateFormat(datePartitionFormat, Locale.ENGLISH).parse(leftPartition); Date right = new SimpleDateFormat(datePartitionFormat, Locale.ENGLISH).parse(rightPartition); return left.after(right) ? -1 : right.after(left) ? 
1 : 0; @@ -77,4 +79,14 @@ public class DayBasedCompactionStrategy extends CompactionStrategy { .collect(Collectors.toList()).subList(0, writeConfig.getTargetPartitionsPerDayBasedCompaction()); return filteredPartitionPaths; } + + /** + * If it is a Hive-style partition path, convert it to a regular partition path. e.g. year=2019/month=11/day=24 => 2019/11/24 + */ + protected static String getPartitionPathWithoutPartitionKeys(String partitionPath) { + if (partitionPath.contains("=")) { + return partitionPath.replaceFirst(".*?=", "").replaceAll("/.*?=", "/"); + } + return partitionPath; + } } diff --git a/hudi-hive/src/main/java/org/apache/hudi/hive/SlashEncodedDayPartitionValueExtractor.java b/hudi-hive/src/main/java/org/apache/hudi/hive/SlashEncodedDayPartitionValueExtractor.java index 334bb7932..2ecad289c 100644 --- a/hudi-hive/src/main/java/org/apache/hudi/hive/SlashEncodedDayPartitionValueExtractor.java +++ b/hudi-hive/src/main/java/org/apache/hudi/hive/SlashEncodedDayPartitionValueExtractor.java @@ -54,9 +54,9 @@ public class SlashEncodedDayPartitionValueExtractor implements PartitionValueExt throw new IllegalArgumentException("Partition path " + partitionPath + " is not in the form yyyy/mm/dd "); } // Get the partition part and remove the / as well at the end - int year = Integer.parseInt(splits[0]); - int mm = Integer.parseInt(splits[1]); - int dd = Integer.parseInt(splits[2]); + int year = Integer.parseInt(splits[0].contains("=") ? splits[0].split("=")[1] : splits[0]); + int mm = Integer.parseInt(splits[1].contains("=") ? splits[1].split("=")[1] : splits[1]); + int dd = Integer.parseInt(splits[2].contains("=") ? 
splits[2].split("=")[1] : splits[2]); DateTime dateTime = new DateTime(year, mm, dd, 0, 0); return Lists.newArrayList(getDtfOut().print(dateTime)); } diff --git a/hudi-spark/src/main/java/org/apache/hudi/ComplexKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/ComplexKeyGenerator.java index 944194765..a2a70a12b 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/ComplexKeyGenerator.java +++ b/hudi-spark/src/main/java/org/apache/hudi/ComplexKeyGenerator.java @@ -41,11 +41,15 @@ public class ComplexKeyGenerator extends KeyGenerator { protected final List partitionPathFields; + protected final boolean hiveStylePartitioning; + public ComplexKeyGenerator(TypedProperties props) { super(props); this.recordKeyFields = Arrays.asList(props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(",")); this.partitionPathFields = Arrays.asList(props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY()).split(",")); + this.hiveStylePartitioning = props.getBoolean(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), + Boolean.parseBoolean(DataSourceWriteOptions.DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL())); } @Override @@ -77,9 +81,10 @@ public class ComplexKeyGenerator extends KeyGenerator { for (String partitionPathField : partitionPathFields) { String fieldVal = DataSourceUtils.getNullableNestedFieldValAsString(record, partitionPathField); if (fieldVal == null || fieldVal.isEmpty()) { - partitionPath.append(DEFAULT_PARTITION_PATH); + partitionPath.append(hiveStylePartitioning ? partitionPathField + "=" + DEFAULT_PARTITION_PATH + : DEFAULT_PARTITION_PATH); } else { - partitionPath.append(fieldVal); + partitionPath.append(hiveStylePartitioning ? 
partitionPathField + "=" + fieldVal : fieldVal); } partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR); } diff --git a/hudi-spark/src/main/java/org/apache/hudi/SimpleKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/SimpleKeyGenerator.java index f45890640..02f41e35b 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/SimpleKeyGenerator.java +++ b/hudi-spark/src/main/java/org/apache/hudi/SimpleKeyGenerator.java @@ -35,10 +35,14 @@ public class SimpleKeyGenerator extends KeyGenerator { protected final String partitionPathField; + protected final boolean hiveStylePartitioning; + public SimpleKeyGenerator(TypedProperties props) { super(props); this.recordKeyField = props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()); this.partitionPathField = props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY()); + this.hiveStylePartitioning = props.getBoolean(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), + Boolean.parseBoolean(DataSourceWriteOptions.DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL())); } @Override @@ -56,6 +60,9 @@ public class SimpleKeyGenerator extends KeyGenerator { if (partitionPath == null || partitionPath.isEmpty()) { partitionPath = DEFAULT_PARTITION_PATH; } + if (hiveStylePartitioning) { + partitionPath = partitionPathField + "=" + partitionPath; + } return new HoodieKey(recordKey, partitionPath); } diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 81d13ada8..b943a0e50 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -138,6 +138,14 @@ object DataSourceWriteOptions { val PARTITIONPATH_FIELD_OPT_KEY = "hoodie.datasource.write.partitionpath.field" val DEFAULT_PARTITIONPATH_FIELD_OPT_VAL = "partitionpath" + /** + * Flag to indicate whether to use Hive style partitioning. 
+ * If set true, the names of partition folders follow <partition_column_name>=<partition_value> format. + * By default false (the names of partition folders are only partition values) + */ + val HIVE_STYLE_PARTITIONING_OPT_KEY = "hoodie.datasource.write.hive_style_partitioning" + val DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL = "false" + /** * Key generator class, that implements will extract the key out of incoming record * diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index eeb40ca69..a43354382 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -303,7 +303,8 @@ private[hudi] object HoodieSparkSqlWriter { HIVE_URL_OPT_KEY -> DEFAULT_HIVE_URL_OPT_VAL, HIVE_PARTITION_FIELDS_OPT_KEY -> DEFAULT_HIVE_PARTITION_FIELDS_OPT_VAL, HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL, - HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> DEFAULT_HIVE_ASSUME_DATE_PARTITION_OPT_VAL + HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> DEFAULT_HIVE_ASSUME_DATE_PARTITION_OPT_VAL, + HIVE_STYLE_PARTITIONING_OPT_KEY -> DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL ) ++: parameters } diff --git a/hudi-spark/src/test/scala/TestDataSourceDefaults.scala b/hudi-spark/src/test/scala/TestDataSourceDefaults.scala index d4e3f1414..0b550dc8c 100644 --- a/hudi-spark/src/test/scala/TestDataSourceDefaults.scala +++ b/hudi-spark/src/test/scala/TestDataSourceDefaults.scala @@ -38,16 +38,17 @@ class TestDataSourceDefaults extends AssertionsForJUnit { } - private def getKeyConfig(recordKeyFieldName: String, partitionPathField: String): TypedProperties = { + private def getKeyConfig(recordKeyFieldName: String, partitionPathField: String, hiveStylePartitioning: String): TypedProperties = { val props = new TypedProperties() props.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, recordKeyFieldName) 
props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, partitionPathField) + props.setProperty(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, hiveStylePartitioning) props } @Test def testSimpleKeyGenerator() = { // top level, valid fields - val hk1 = new SimpleKeyGenerator(getKeyConfig("field1", "name")).getKey(baseRecord) + val hk1 = new SimpleKeyGenerator(getKeyConfig("field1", "name", "false")).getKey(baseRecord) assertEquals("field1", hk1.getRecordKey) assertEquals("name1", hk1.getPartitionPath) @@ -76,14 +77,14 @@ class TestDataSourceDefaults extends AssertionsForJUnit { }; // nested field as record key and partition path - val hk2 = new SimpleKeyGenerator(getKeyConfig("testNestedRecord.userId", "testNestedRecord.isAdmin")) + val hk2 = new SimpleKeyGenerator(getKeyConfig("testNestedRecord.userId", "testNestedRecord.isAdmin", "false")) .getKey(baseRecord) assertEquals("UserId1@001", hk2.getRecordKey) assertEquals("false", hk2.getPartitionPath) // Nested record key not found try { - new SimpleKeyGenerator(getKeyConfig("testNestedRecord.NotThere", "testNestedRecord.isAdmin")) + new SimpleKeyGenerator(getKeyConfig("testNestedRecord.NotThere", "testNestedRecord.isAdmin", "false")) .getKey(baseRecord) fail("Should have errored out") } catch { @@ -93,21 +94,25 @@ class TestDataSourceDefaults extends AssertionsForJUnit { }; // if partition path can't be found, return default partition path - val hk3 = new SimpleKeyGenerator(getKeyConfig("testNestedRecord.userId", "testNestedRecord.notThere")) + val hk3 = new SimpleKeyGenerator(getKeyConfig("testNestedRecord.userId", "testNestedRecord.notThere", "false")) .getKey(baseRecord); assertEquals("default", hk3.getPartitionPath) + // if enable hive style partitioning + val hk4 = new SimpleKeyGenerator(getKeyConfig("field1", "name", "true")).getKey(baseRecord) + assertEquals("name=name1", hk4.getPartitionPath) + // if partition is null, return default partition path baseRecord.put("name", "") - val hk4 = 
new SimpleKeyGenerator(getKeyConfig("field1", "name")) + val hk5 = new SimpleKeyGenerator(getKeyConfig("field1", "name", "false")) .getKey(baseRecord) - assertEquals("default", hk4.getPartitionPath) + assertEquals("default", hk5.getPartitionPath) // if partition is empty, return default partition path baseRecord.put("name", null) - val hk5 = new SimpleKeyGenerator(getKeyConfig("field1", "name")) + val hk6 = new SimpleKeyGenerator(getKeyConfig("field1", "name", "false")) .getKey(baseRecord) - assertEquals("default", hk5.getPartitionPath) + assertEquals("default", hk6.getPartitionPath) // if record key is empty, throw error try { @@ -138,7 +143,7 @@ class TestDataSourceDefaults extends AssertionsForJUnit { @Test def testComplexKeyGenerator() = { // top level, valid fields - val hk1 = new ComplexKeyGenerator(getKeyConfig("field1,name", "field1,name")).getKey(baseRecord) + val hk1 = new ComplexKeyGenerator(getKeyConfig("field1,name", "field1,name", "false")).getKey(baseRecord) assertEquals("field1:field1,name:name1", hk1.getRecordKey) assertEquals("field1/name1", hk1.getPartitionPath) @@ -167,14 +172,14 @@ class TestDataSourceDefaults extends AssertionsForJUnit { }; // nested field as record key and partition path - val hk2 = new ComplexKeyGenerator(getKeyConfig("testNestedRecord.userId,testNestedRecord.isAdmin", "testNestedRecord.userId,testNestedRecord.isAdmin")) + val hk2 = new ComplexKeyGenerator(getKeyConfig("testNestedRecord.userId,testNestedRecord.isAdmin", "testNestedRecord.userId,testNestedRecord.isAdmin", "false")) .getKey(baseRecord) assertEquals("testNestedRecord.userId:UserId1@001,testNestedRecord.isAdmin:false", hk2.getRecordKey) assertEquals("UserId1@001/false", hk2.getPartitionPath) // Nested record key not found try { - new ComplexKeyGenerator(getKeyConfig("testNestedRecord.NotThere", "testNestedRecord.isAdmin")) + new ComplexKeyGenerator(getKeyConfig("testNestedRecord.NotThere", "testNestedRecord.isAdmin", "false")) .getKey(baseRecord) fail("Should 
have errored out") } catch { @@ -184,21 +189,26 @@ class TestDataSourceDefaults extends AssertionsForJUnit { }; // if partition path can't be found, return default partition path - val hk3 = new ComplexKeyGenerator(getKeyConfig("testNestedRecord.userId", "testNestedRecord.notThere")) + val hk3 = new ComplexKeyGenerator(getKeyConfig("testNestedRecord.userId", "testNestedRecord.notThere", "false")) .getKey(baseRecord); assertEquals("default", hk3.getPartitionPath) + // if enable hive style partitioning + val hk4 = new ComplexKeyGenerator(getKeyConfig("field1,name", "field1,name", "true")).getKey(baseRecord) + assertEquals("field1:field1,name:name1", hk4.getRecordKey) + assertEquals("field1=field1/name=name1", hk4.getPartitionPath) + // if one part of the record key is empty, replace with "__empty__" baseRecord.put("name", "") - val hk4 = new ComplexKeyGenerator(getKeyConfig("field1,name", "field1,name")).getKey(baseRecord) - assertEquals("field1:field1,name:__empty__", hk4.getRecordKey) - assertEquals("field1/default", hk4.getPartitionPath) + val hk5 = new ComplexKeyGenerator(getKeyConfig("field1,name", "field1,name", "false")).getKey(baseRecord) + assertEquals("field1:field1,name:__empty__", hk5.getRecordKey) + assertEquals("field1/default", hk5.getPartitionPath) // if one part of the record key is null, replace with "__null__" baseRecord.put("name", null) - val hk5 = new ComplexKeyGenerator(getKeyConfig("field1,name", "field1,name")).getKey(baseRecord) - assertEquals("field1:field1,name:__null__", hk5.getRecordKey) - assertEquals("field1/default", hk5.getPartitionPath) + val hk6 = new ComplexKeyGenerator(getKeyConfig("field1,name", "field1,name", "false")).getKey(baseRecord) + assertEquals("field1:field1,name:__null__", hk6.getRecordKey) + assertEquals("field1/default", hk6.getPartitionPath) // if all parts of the composite record key are null/empty, throw error try { diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java index 140c7097d..4e997549e 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java @@ -105,7 +105,10 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator { if (recordKey == null || recordKey.isEmpty()) { throw new HoodieKeyException("recordKey value: \"" + recordKey + "\" for field: \"" + recordKeyField + "\" cannot be null or empty."); } - return new HoodieKey(recordKey, partitionPathFormat.format(timestamp)); + + String partitionPath = hiveStylePartitioning ? partitionPathField + "=" + partitionPathFormat.format(timestamp) + : partitionPathFormat.format(timestamp); + return new HoodieKey(recordKey, partitionPath); } catch (ParseException pe) { throw new HoodieDeltaStreamerException("Unable to parse input partition field :" + partitionVal, pe); }