From f094f4285782e1253b52fef788fa7461c67d95a7 Mon Sep 17 00:00:00 2001 From: Alexander Filipchik Date: Thu, 14 May 2020 13:37:59 -0700 Subject: [PATCH] [HUDI-843] Add ability to specify time unit for TimestampBasedKeyGenerator (#1541) Co-authored-by: Alex Filipchik Co-authored-by: Vinoth Chandar --- .../keygen/TimestampBasedKeyGenerator.java | 47 +++++++++++++++---- .../TestTimestampBasedKeyGenerator.java | 26 ++++++++-- 2 files changed, 60 insertions(+), 13 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java index 919a2ef88..e5bdc6456 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java @@ -35,6 +35,10 @@ import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.TimeZone; +import java.util.concurrent.TimeUnit; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; /** * Key generator, that relies on timestamps for partitioning field. Still picks record key by name. @@ -42,9 +46,11 @@ import java.util.TimeZone; public class TimestampBasedKeyGenerator extends SimpleKeyGenerator { enum TimestampType implements Serializable { - UNIX_TIMESTAMP, DATE_STRING, MIXED, EPOCHMILLISECONDS + UNIX_TIMESTAMP, DATE_STRING, MIXED, EPOCHMILLISECONDS, SCALAR } + private final TimeUnit timeUnit; + private final TimestampType timestampType; private SimpleDateFormat inputDateFormat; @@ -62,6 +68,8 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator { // One value from TimestampType above private static final String TIMESTAMP_TYPE_FIELD_PROP = "hoodie.deltastreamer.keygen.timebased.timestamp.type"; + private static final String INPUT_TIME_UNIT = + "hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit"; private static final String TIMESTAMP_INPUT_DATE_FORMAT_PROP = "hoodie.deltastreamer.keygen.timebased.input.dateformat"; private static final String TIMESTAMP_OUTPUT_DATE_FORMAT_PROP = @@ -84,6 +92,21 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator { this.inputDateFormat = new SimpleDateFormat(config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP)); this.inputDateFormat.setTimeZone(timeZone); } + + switch (this.timestampType) { + case EPOCHMILLISECONDS: + timeUnit = MILLISECONDS; + break; + case UNIX_TIMESTAMP: + timeUnit = SECONDS; + break; + case SCALAR: + String timeUnitStr = config.getString(Config.INPUT_TIME_UNIT, TimeUnit.SECONDS.toString()); + timeUnit = TimeUnit.valueOf(timeUnitStr.toUpperCase()); + break; + default: + timeUnit = null; + } } @Override @@ -96,21 +119,20 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator { partitionPathFormat.setTimeZone(timeZone); try { - long unixTime; + long timeMs; if (partitionVal instanceof Double) { - unixTime = ((Double) partitionVal).longValue(); + timeMs = convertLongTimeToMillis(((Double) partitionVal).longValue()); } else if (partitionVal instanceof Float) { - unixTime = ((Float) partitionVal).longValue(); + timeMs = convertLongTimeToMillis(((Float) partitionVal).longValue()); } else if (partitionVal instanceof Long) { - unixTime = (Long) partitionVal; + timeMs = convertLongTimeToMillis((Long) partitionVal); } else if (partitionVal instanceof CharSequence) { - unixTime = inputDateFormat.parse(partitionVal.toString()).getTime() / 1000; + timeMs = inputDateFormat.parse(partitionVal.toString()).getTime(); } else { throw new HoodieNotSupportedException( "Unexpected type for partition field: " + partitionVal.getClass().getName()); } - Date timestamp = this.timestampType == TimestampType.EPOCHMILLISECONDS ? new Date(unixTime) : new Date(unixTime * 1000); - + Date timestamp = new Date(timeMs); String recordKey = DataSourceUtils.getNestedFieldValAsString(record, recordKeyField, true); if (recordKey == null || recordKey.isEmpty()) { throw new HoodieKeyException("recordKey value: \"" + recordKey + "\" for field: \"" + recordKeyField + "\" cannot be null or empty."); @@ -123,4 +145,13 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator { throw new HoodieDeltaStreamerException("Unable to parse input partition field :" + partitionVal, pe); } } + + private long convertLongTimeToMillis(Long partitionVal) { + if (timeUnit == null) { + // should not be possible + throw new RuntimeException(Config.INPUT_TIME_UNIT + " is not specified but scalar it supplied as time value"); + } + + return MILLISECONDS.convert(partitionVal, timeUnit); + } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/keygen/TestTimestampBasedKeyGenerator.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/keygen/TestTimestampBasedKeyGenerator.java index ba090a885..05cb6f9fa 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/keygen/TestTimestampBasedKeyGenerator.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/keygen/TestTimestampBasedKeyGenerator.java @@ -47,10 +47,15 @@ public class TestTimestampBasedKeyGenerator { properties.setProperty(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), "false"); } - private TypedProperties getBaseKeyConfig(String timestampType, String dateFormat, String timezone) { + private TypedProperties getBaseKeyConfig(String timestampType, String dateFormat, String timezone, String scalarType) { properties.setProperty("hoodie.deltastreamer.keygen.timebased.timestamp.type", timestampType); properties.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", dateFormat); properties.setProperty("hoodie.deltastreamer.keygen.timebased.timezone", timezone); + + if (scalarType != null) { + properties.setProperty("hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit", scalarType); + } + return properties; } @@ -58,25 +63,36 @@ public class TestTimestampBasedKeyGenerator { public void testTimestampBasedKeyGenerator() { // timezone is GMT+8:00 baseRecord.put("createTime", 1578283932000L); - properties = getBaseKeyConfig("EPOCHMILLISECONDS", "yyyy-MM-dd hh", "GMT+8:00"); + properties = getBaseKeyConfig("EPOCHMILLISECONDS", "yyyy-MM-dd hh", "GMT+8:00", null); HoodieKey hk1 = new TimestampBasedKeyGenerator(properties).getKey(baseRecord); assertEquals("2020-01-06 12", hk1.getPartitionPath()); // timezone is GMT - properties = getBaseKeyConfig("EPOCHMILLISECONDS", "yyyy-MM-dd hh", "GMT"); + properties = getBaseKeyConfig("EPOCHMILLISECONDS", "yyyy-MM-dd hh", "GMT", null); HoodieKey hk2 = new TimestampBasedKeyGenerator(properties).getKey(baseRecord); assertEquals("2020-01-06 04", hk2.getPartitionPath()); // timestamp is DATE_STRING, timezone is GMT+8:00 baseRecord.put("createTime", "2020-01-06 12:12:12"); - properties = getBaseKeyConfig("DATE_STRING", "yyyy-MM-dd hh", "GMT+8:00"); + properties = getBaseKeyConfig("DATE_STRING", "yyyy-MM-dd hh", "GMT+8:00", null); properties.setProperty("hoodie.deltastreamer.keygen.timebased.input.dateformat", "yyyy-MM-dd hh:mm:ss"); HoodieKey hk3 = new TimestampBasedKeyGenerator(properties).getKey(baseRecord); assertEquals("2020-01-06 12", hk3.getPartitionPath()); // timezone is GMT - properties = getBaseKeyConfig("DATE_STRING", "yyyy-MM-dd hh", "GMT"); + properties = getBaseKeyConfig("DATE_STRING", "yyyy-MM-dd hh", "GMT", null); HoodieKey hk4 = new TimestampBasedKeyGenerator(properties).getKey(baseRecord); assertEquals("2020-01-06 12", hk4.getPartitionPath()); } + + @Test + public void testScalar() { + // timezone is GMT+8:00 + baseRecord.put("createTime", 20000L); + + // timezone is GMT + properties = getBaseKeyConfig("SCALAR", "yyyy-MM-dd hh", "GMT", "days"); + HoodieKey hk5 = new TimestampBasedKeyGenerator(properties).getKey(baseRecord); + assertEquals(hk5.getPartitionPath(), "2024-10-04 12"); + } }