[HUDI-353] Add hive style partitioning path
This commit is contained in:
committed by
Balaji Varadarajan
parent
63e330b17c
commit
e555aa516d
@@ -45,6 +45,8 @@ public class DayBasedCompactionStrategy extends CompactionStrategy {
|
|||||||
// Sorts compaction in LastInFirstCompacted order
|
// Sorts compaction in LastInFirstCompacted order
|
||||||
protected static Comparator<String> comparator = (String leftPartition, String rightPartition) -> {
|
protected static Comparator<String> comparator = (String leftPartition, String rightPartition) -> {
|
||||||
try {
|
try {
|
||||||
|
leftPartition = getPartitionPathWithoutPartitionKeys(leftPartition);
|
||||||
|
rightPartition = getPartitionPathWithoutPartitionKeys(rightPartition);
|
||||||
Date left = new SimpleDateFormat(datePartitionFormat, Locale.ENGLISH).parse(leftPartition);
|
Date left = new SimpleDateFormat(datePartitionFormat, Locale.ENGLISH).parse(leftPartition);
|
||||||
Date right = new SimpleDateFormat(datePartitionFormat, Locale.ENGLISH).parse(rightPartition);
|
Date right = new SimpleDateFormat(datePartitionFormat, Locale.ENGLISH).parse(rightPartition);
|
||||||
return left.after(right) ? -1 : right.after(left) ? 1 : 0;
|
return left.after(right) ? -1 : right.after(left) ? 1 : 0;
|
||||||
@@ -77,4 +79,14 @@ public class DayBasedCompactionStrategy extends CompactionStrategy {
|
|||||||
.collect(Collectors.toList()).subList(0, writeConfig.getTargetPartitionsPerDayBasedCompaction());
|
.collect(Collectors.toList()).subList(0, writeConfig.getTargetPartitionsPerDayBasedCompaction());
|
||||||
return filteredPartitionPaths;
|
return filteredPartitionPaths;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If is Hive style partition path, convert it to regular partition path. e.g. year=2019/month=11/day=24 => 2019/11/24
|
||||||
|
*/
|
||||||
|
protected static String getPartitionPathWithoutPartitionKeys(String partitionPath) {
|
||||||
|
if (partitionPath.contains("=")) {
|
||||||
|
return partitionPath.replaceFirst(".*?=", "").replaceAll("/.*?=", "/");
|
||||||
|
}
|
||||||
|
return partitionPath;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -54,9 +54,9 @@ public class SlashEncodedDayPartitionValueExtractor implements PartitionValueExt
|
|||||||
throw new IllegalArgumentException("Partition path " + partitionPath + " is not in the form yyyy/mm/dd ");
|
throw new IllegalArgumentException("Partition path " + partitionPath + " is not in the form yyyy/mm/dd ");
|
||||||
}
|
}
|
||||||
// Get the partition part and remove the / as well at the end
|
// Get the partition part and remove the / as well at the end
|
||||||
int year = Integer.parseInt(splits[0]);
|
int year = Integer.parseInt(splits[0].contains("=") ? splits[0].split("=")[1] : splits[0]);
|
||||||
int mm = Integer.parseInt(splits[1]);
|
int mm = Integer.parseInt(splits[1].contains("=") ? splits[1].split("=")[1] : splits[1]);
|
||||||
int dd = Integer.parseInt(splits[2]);
|
int dd = Integer.parseInt(splits[2].contains("=") ? splits[2].split("=")[1] : splits[2]);
|
||||||
DateTime dateTime = new DateTime(year, mm, dd, 0, 0);
|
DateTime dateTime = new DateTime(year, mm, dd, 0, 0);
|
||||||
return Lists.newArrayList(getDtfOut().print(dateTime));
|
return Lists.newArrayList(getDtfOut().print(dateTime));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -41,11 +41,15 @@ public class ComplexKeyGenerator extends KeyGenerator {
|
|||||||
|
|
||||||
protected final List<String> partitionPathFields;
|
protected final List<String> partitionPathFields;
|
||||||
|
|
||||||
|
protected final boolean hiveStylePartitioning;
|
||||||
|
|
||||||
public ComplexKeyGenerator(TypedProperties props) {
|
public ComplexKeyGenerator(TypedProperties props) {
|
||||||
super(props);
|
super(props);
|
||||||
this.recordKeyFields = Arrays.asList(props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(","));
|
this.recordKeyFields = Arrays.asList(props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(","));
|
||||||
this.partitionPathFields =
|
this.partitionPathFields =
|
||||||
Arrays.asList(props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY()).split(","));
|
Arrays.asList(props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY()).split(","));
|
||||||
|
this.hiveStylePartitioning = props.getBoolean(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(),
|
||||||
|
Boolean.parseBoolean(DataSourceWriteOptions.DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -77,9 +81,10 @@ public class ComplexKeyGenerator extends KeyGenerator {
|
|||||||
for (String partitionPathField : partitionPathFields) {
|
for (String partitionPathField : partitionPathFields) {
|
||||||
String fieldVal = DataSourceUtils.getNullableNestedFieldValAsString(record, partitionPathField);
|
String fieldVal = DataSourceUtils.getNullableNestedFieldValAsString(record, partitionPathField);
|
||||||
if (fieldVal == null || fieldVal.isEmpty()) {
|
if (fieldVal == null || fieldVal.isEmpty()) {
|
||||||
partitionPath.append(DEFAULT_PARTITION_PATH);
|
partitionPath.append(hiveStylePartitioning ? partitionPathField + "=" + DEFAULT_PARTITION_PATH
|
||||||
|
: DEFAULT_PARTITION_PATH);
|
||||||
} else {
|
} else {
|
||||||
partitionPath.append(fieldVal);
|
partitionPath.append(hiveStylePartitioning ? partitionPathField + "=" + fieldVal : fieldVal);
|
||||||
}
|
}
|
||||||
partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR);
|
partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -35,10 +35,14 @@ public class SimpleKeyGenerator extends KeyGenerator {
|
|||||||
|
|
||||||
protected final String partitionPathField;
|
protected final String partitionPathField;
|
||||||
|
|
||||||
|
protected final boolean hiveStylePartitioning;
|
||||||
|
|
||||||
public SimpleKeyGenerator(TypedProperties props) {
|
public SimpleKeyGenerator(TypedProperties props) {
|
||||||
super(props);
|
super(props);
|
||||||
this.recordKeyField = props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY());
|
this.recordKeyField = props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY());
|
||||||
this.partitionPathField = props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY());
|
this.partitionPathField = props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY());
|
||||||
|
this.hiveStylePartitioning = props.getBoolean(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(),
|
||||||
|
Boolean.parseBoolean(DataSourceWriteOptions.DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -56,6 +60,9 @@ public class SimpleKeyGenerator extends KeyGenerator {
|
|||||||
if (partitionPath == null || partitionPath.isEmpty()) {
|
if (partitionPath == null || partitionPath.isEmpty()) {
|
||||||
partitionPath = DEFAULT_PARTITION_PATH;
|
partitionPath = DEFAULT_PARTITION_PATH;
|
||||||
}
|
}
|
||||||
|
if (hiveStylePartitioning) {
|
||||||
|
partitionPath = partitionPathField + "=" + partitionPath;
|
||||||
|
}
|
||||||
|
|
||||||
return new HoodieKey(recordKey, partitionPath);
|
return new HoodieKey(recordKey, partitionPath);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -138,6 +138,14 @@ object DataSourceWriteOptions {
|
|||||||
val PARTITIONPATH_FIELD_OPT_KEY = "hoodie.datasource.write.partitionpath.field"
|
val PARTITIONPATH_FIELD_OPT_KEY = "hoodie.datasource.write.partitionpath.field"
|
||||||
val DEFAULT_PARTITIONPATH_FIELD_OPT_VAL = "partitionpath"
|
val DEFAULT_PARTITIONPATH_FIELD_OPT_VAL = "partitionpath"
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Flag to indicate whether to use Hive style partitioning.
|
||||||
|
* If set true, the names of partition folders follow <partition_column_name>=<partition_value> format.
|
||||||
|
* By default false (the names of partition folders are only partition values)
|
||||||
|
*/
|
||||||
|
val HIVE_STYLE_PARTITIONING_OPT_KEY = "hoodie.datasource.write.hive_style_partitioning"
|
||||||
|
val DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL = "false"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Key generator class, that implements will extract the key out of incoming record
|
* Key generator class, that implements will extract the key out of incoming record
|
||||||
*
|
*
|
||||||
|
|||||||
@@ -303,7 +303,8 @@ private[hudi] object HoodieSparkSqlWriter {
|
|||||||
HIVE_URL_OPT_KEY -> DEFAULT_HIVE_URL_OPT_VAL,
|
HIVE_URL_OPT_KEY -> DEFAULT_HIVE_URL_OPT_VAL,
|
||||||
HIVE_PARTITION_FIELDS_OPT_KEY -> DEFAULT_HIVE_PARTITION_FIELDS_OPT_VAL,
|
HIVE_PARTITION_FIELDS_OPT_KEY -> DEFAULT_HIVE_PARTITION_FIELDS_OPT_VAL,
|
||||||
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL,
|
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL,
|
||||||
HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> DEFAULT_HIVE_ASSUME_DATE_PARTITION_OPT_VAL
|
HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> DEFAULT_HIVE_ASSUME_DATE_PARTITION_OPT_VAL,
|
||||||
|
HIVE_STYLE_PARTITIONING_OPT_KEY -> DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL
|
||||||
) ++: parameters
|
) ++: parameters
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -38,16 +38,17 @@ class TestDataSourceDefaults extends AssertionsForJUnit {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private def getKeyConfig(recordKeyFieldName: String, partitionPathField: String): TypedProperties = {
|
private def getKeyConfig(recordKeyFieldName: String, partitionPathField: String, hiveStylePartitioning: String): TypedProperties = {
|
||||||
val props = new TypedProperties()
|
val props = new TypedProperties()
|
||||||
props.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, recordKeyFieldName)
|
props.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, recordKeyFieldName)
|
||||||
props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, partitionPathField)
|
props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, partitionPathField)
|
||||||
|
props.setProperty(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, hiveStylePartitioning)
|
||||||
props
|
props
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test def testSimpleKeyGenerator() = {
|
@Test def testSimpleKeyGenerator() = {
|
||||||
// top level, valid fields
|
// top level, valid fields
|
||||||
val hk1 = new SimpleKeyGenerator(getKeyConfig("field1", "name")).getKey(baseRecord)
|
val hk1 = new SimpleKeyGenerator(getKeyConfig("field1", "name", "false")).getKey(baseRecord)
|
||||||
assertEquals("field1", hk1.getRecordKey)
|
assertEquals("field1", hk1.getRecordKey)
|
||||||
assertEquals("name1", hk1.getPartitionPath)
|
assertEquals("name1", hk1.getPartitionPath)
|
||||||
|
|
||||||
@@ -76,14 +77,14 @@ class TestDataSourceDefaults extends AssertionsForJUnit {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// nested field as record key and partition path
|
// nested field as record key and partition path
|
||||||
val hk2 = new SimpleKeyGenerator(getKeyConfig("testNestedRecord.userId", "testNestedRecord.isAdmin"))
|
val hk2 = new SimpleKeyGenerator(getKeyConfig("testNestedRecord.userId", "testNestedRecord.isAdmin", "false"))
|
||||||
.getKey(baseRecord)
|
.getKey(baseRecord)
|
||||||
assertEquals("UserId1@001", hk2.getRecordKey)
|
assertEquals("UserId1@001", hk2.getRecordKey)
|
||||||
assertEquals("false", hk2.getPartitionPath)
|
assertEquals("false", hk2.getPartitionPath)
|
||||||
|
|
||||||
// Nested record key not found
|
// Nested record key not found
|
||||||
try {
|
try {
|
||||||
new SimpleKeyGenerator(getKeyConfig("testNestedRecord.NotThere", "testNestedRecord.isAdmin"))
|
new SimpleKeyGenerator(getKeyConfig("testNestedRecord.NotThere", "testNestedRecord.isAdmin", "false"))
|
||||||
.getKey(baseRecord)
|
.getKey(baseRecord)
|
||||||
fail("Should have errored out")
|
fail("Should have errored out")
|
||||||
} catch {
|
} catch {
|
||||||
@@ -93,21 +94,25 @@ class TestDataSourceDefaults extends AssertionsForJUnit {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// if partition path can't be found, return default partition path
|
// if partition path can't be found, return default partition path
|
||||||
val hk3 = new SimpleKeyGenerator(getKeyConfig("testNestedRecord.userId", "testNestedRecord.notThere"))
|
val hk3 = new SimpleKeyGenerator(getKeyConfig("testNestedRecord.userId", "testNestedRecord.notThere", "false"))
|
||||||
.getKey(baseRecord);
|
.getKey(baseRecord);
|
||||||
assertEquals("default", hk3.getPartitionPath)
|
assertEquals("default", hk3.getPartitionPath)
|
||||||
|
|
||||||
|
// if enable hive style partitioning
|
||||||
|
val hk4 = new SimpleKeyGenerator(getKeyConfig("field1", "name", "true")).getKey(baseRecord)
|
||||||
|
assertEquals("name=name1", hk4.getPartitionPath)
|
||||||
|
|
||||||
// if partition is null, return default partition path
|
// if partition is null, return default partition path
|
||||||
baseRecord.put("name", "")
|
baseRecord.put("name", "")
|
||||||
val hk4 = new SimpleKeyGenerator(getKeyConfig("field1", "name"))
|
val hk5 = new SimpleKeyGenerator(getKeyConfig("field1", "name", "false"))
|
||||||
.getKey(baseRecord)
|
.getKey(baseRecord)
|
||||||
assertEquals("default", hk4.getPartitionPath)
|
assertEquals("default", hk5.getPartitionPath)
|
||||||
|
|
||||||
// if partition is empty, return default partition path
|
// if partition is empty, return default partition path
|
||||||
baseRecord.put("name", null)
|
baseRecord.put("name", null)
|
||||||
val hk5 = new SimpleKeyGenerator(getKeyConfig("field1", "name"))
|
val hk6 = new SimpleKeyGenerator(getKeyConfig("field1", "name", "false"))
|
||||||
.getKey(baseRecord)
|
.getKey(baseRecord)
|
||||||
assertEquals("default", hk5.getPartitionPath)
|
assertEquals("default", hk6.getPartitionPath)
|
||||||
|
|
||||||
// if record key is empty, throw error
|
// if record key is empty, throw error
|
||||||
try {
|
try {
|
||||||
@@ -138,7 +143,7 @@ class TestDataSourceDefaults extends AssertionsForJUnit {
|
|||||||
|
|
||||||
@Test def testComplexKeyGenerator() = {
|
@Test def testComplexKeyGenerator() = {
|
||||||
// top level, valid fields
|
// top level, valid fields
|
||||||
val hk1 = new ComplexKeyGenerator(getKeyConfig("field1,name", "field1,name")).getKey(baseRecord)
|
val hk1 = new ComplexKeyGenerator(getKeyConfig("field1,name", "field1,name", "false")).getKey(baseRecord)
|
||||||
assertEquals("field1:field1,name:name1", hk1.getRecordKey)
|
assertEquals("field1:field1,name:name1", hk1.getRecordKey)
|
||||||
assertEquals("field1/name1", hk1.getPartitionPath)
|
assertEquals("field1/name1", hk1.getPartitionPath)
|
||||||
|
|
||||||
@@ -167,14 +172,14 @@ class TestDataSourceDefaults extends AssertionsForJUnit {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// nested field as record key and partition path
|
// nested field as record key and partition path
|
||||||
val hk2 = new ComplexKeyGenerator(getKeyConfig("testNestedRecord.userId,testNestedRecord.isAdmin", "testNestedRecord.userId,testNestedRecord.isAdmin"))
|
val hk2 = new ComplexKeyGenerator(getKeyConfig("testNestedRecord.userId,testNestedRecord.isAdmin", "testNestedRecord.userId,testNestedRecord.isAdmin", "false"))
|
||||||
.getKey(baseRecord)
|
.getKey(baseRecord)
|
||||||
assertEquals("testNestedRecord.userId:UserId1@001,testNestedRecord.isAdmin:false", hk2.getRecordKey)
|
assertEquals("testNestedRecord.userId:UserId1@001,testNestedRecord.isAdmin:false", hk2.getRecordKey)
|
||||||
assertEquals("UserId1@001/false", hk2.getPartitionPath)
|
assertEquals("UserId1@001/false", hk2.getPartitionPath)
|
||||||
|
|
||||||
// Nested record key not found
|
// Nested record key not found
|
||||||
try {
|
try {
|
||||||
new ComplexKeyGenerator(getKeyConfig("testNestedRecord.NotThere", "testNestedRecord.isAdmin"))
|
new ComplexKeyGenerator(getKeyConfig("testNestedRecord.NotThere", "testNestedRecord.isAdmin", "false"))
|
||||||
.getKey(baseRecord)
|
.getKey(baseRecord)
|
||||||
fail("Should have errored out")
|
fail("Should have errored out")
|
||||||
} catch {
|
} catch {
|
||||||
@@ -184,21 +189,26 @@ class TestDataSourceDefaults extends AssertionsForJUnit {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// if partition path can't be found, return default partition path
|
// if partition path can't be found, return default partition path
|
||||||
val hk3 = new ComplexKeyGenerator(getKeyConfig("testNestedRecord.userId", "testNestedRecord.notThere"))
|
val hk3 = new ComplexKeyGenerator(getKeyConfig("testNestedRecord.userId", "testNestedRecord.notThere", "false"))
|
||||||
.getKey(baseRecord);
|
.getKey(baseRecord);
|
||||||
assertEquals("default", hk3.getPartitionPath)
|
assertEquals("default", hk3.getPartitionPath)
|
||||||
|
|
||||||
|
// if enable hive style partitioning
|
||||||
|
val hk4 = new ComplexKeyGenerator(getKeyConfig("field1,name", "field1,name", "true")).getKey(baseRecord)
|
||||||
|
assertEquals("field1:field1,name:name1", hk4.getRecordKey)
|
||||||
|
assertEquals("field1=field1/name=name1", hk4.getPartitionPath)
|
||||||
|
|
||||||
// if one part of the record key is empty, replace with "__empty__"
|
// if one part of the record key is empty, replace with "__empty__"
|
||||||
baseRecord.put("name", "")
|
baseRecord.put("name", "")
|
||||||
val hk4 = new ComplexKeyGenerator(getKeyConfig("field1,name", "field1,name")).getKey(baseRecord)
|
val hk5 = new ComplexKeyGenerator(getKeyConfig("field1,name", "field1,name", "false")).getKey(baseRecord)
|
||||||
assertEquals("field1:field1,name:__empty__", hk4.getRecordKey)
|
assertEquals("field1:field1,name:__empty__", hk5.getRecordKey)
|
||||||
assertEquals("field1/default", hk4.getPartitionPath)
|
assertEquals("field1/default", hk5.getPartitionPath)
|
||||||
|
|
||||||
// if one part of the record key is null, replace with "__null__"
|
// if one part of the record key is null, replace with "__null__"
|
||||||
baseRecord.put("name", null)
|
baseRecord.put("name", null)
|
||||||
val hk5 = new ComplexKeyGenerator(getKeyConfig("field1,name", "field1,name")).getKey(baseRecord)
|
val hk6 = new ComplexKeyGenerator(getKeyConfig("field1,name", "field1,name", "false")).getKey(baseRecord)
|
||||||
assertEquals("field1:field1,name:__null__", hk5.getRecordKey)
|
assertEquals("field1:field1,name:__null__", hk6.getRecordKey)
|
||||||
assertEquals("field1/default", hk5.getPartitionPath)
|
assertEquals("field1/default", hk6.getPartitionPath)
|
||||||
|
|
||||||
// if all parts of the composite record key are null/empty, throw error
|
// if all parts of the composite record key are null/empty, throw error
|
||||||
try {
|
try {
|
||||||
|
|||||||
@@ -105,7 +105,10 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
|
|||||||
if (recordKey == null || recordKey.isEmpty()) {
|
if (recordKey == null || recordKey.isEmpty()) {
|
||||||
throw new HoodieKeyException("recordKey value: \"" + recordKey + "\" for field: \"" + recordKeyField + "\" cannot be null or empty.");
|
throw new HoodieKeyException("recordKey value: \"" + recordKey + "\" for field: \"" + recordKeyField + "\" cannot be null or empty.");
|
||||||
}
|
}
|
||||||
return new HoodieKey(recordKey, partitionPathFormat.format(timestamp));
|
|
||||||
|
String partitionPath = hiveStylePartitioning ? partitionPathField + "=" + partitionPathFormat.format(timestamp)
|
||||||
|
: partitionPathFormat.format(timestamp);
|
||||||
|
return new HoodieKey(recordKey, partitionPath);
|
||||||
} catch (ParseException pe) {
|
} catch (ParseException pe) {
|
||||||
throw new HoodieDeltaStreamerException("Unable to parse input partition field :" + partitionVal, pe);
|
throw new HoodieDeltaStreamerException("Unable to parse input partition field :" + partitionVal, pe);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user