diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java index 0a915c12c..fe68af907 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java +++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java @@ -38,6 +38,7 @@ import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.keygen.KeyGenerator; +import org.apache.hudi.keygen.parser.HoodieDateTimeParser; import org.apache.hudi.table.UserDefinedBulkInsertPartitioner; import org.apache.avro.LogicalTypes; @@ -155,6 +156,21 @@ public class DataSourceUtils { } } + /** + * Create a date time parser class for TimestampBasedKeyGenerator, passing in any configs needed. + * @param props + * @param parserClass + * @return + * @throws IOException + */ + public static HoodieDateTimeParser createDateTimeParser(TypedProperties props, String parserClass) throws IOException { + try { + return (HoodieDateTimeParser) ReflectionUtils.loadClass(parserClass, props); + } catch (Throwable e) { + throw new IOException("Could not load date time parser class " + parserClass, e); + } + } + /** * Create a UserDefinedBulkInsertPartitioner class via reflection, *
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java index be2d1ef9c..6240f837e 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java +++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java @@ -26,6 +26,7 @@ import org.apache.avro.generic.GenericRecord; import org.apache.hudi.exception.HoodieDeltaStreamerException; import org.apache.hudi.exception.HoodieKeyException; +import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; @@ -103,7 +104,11 @@ public class CustomKeyGenerator extends KeyGenerator { partitionPath.append(new SimpleKeyGenerator(properties).getPartitionPath(record, partitionPathField)); break; case TIMESTAMP: - partitionPath.append(new TimestampBasedKeyGenerator(properties).getPartitionPath(record, partitionPathField)); + try { + partitionPath.append(new TimestampBasedKeyGenerator(properties).getPartitionPath(record, partitionPathField)); + } catch (IOException ioe) { + throw new HoodieDeltaStreamerException("Unable to initialise TimestampBasedKeyGenerator class"); + } break; default: throw new HoodieDeltaStreamerException("Please provide valid PartitionKeyType with fields! You provided: " + keyType); diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java index c0885139d..7c34ef734 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java +++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java @@ -21,18 +21,19 @@ package org.apache.hudi.keygen; import org.apache.hudi.DataSourceUtils; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.config.TypedProperties; -import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.exception.HoodieDeltaStreamerException; +import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.keygen.parser.HoodieDateTimeParser; +import org.apache.hudi.keygen.parser.HoodieDateTimeParserImpl; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import java.io.IOException; import java.io.Serializable; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.Arrays; -import java.util.Collections; -import java.util.Date; -import java.util.TimeZone; import java.util.concurrent.TimeUnit; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -43,52 +44,54 @@ import static java.util.concurrent.TimeUnit.SECONDS; */ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator { - enum TimestampType implements Serializable { + public enum TimestampType implements Serializable { UNIX_TIMESTAMP, DATE_STRING, MIXED, EPOCHMILLISECONDS, SCALAR } private final TimeUnit timeUnit; - private final TimestampType timestampType; - - private SimpleDateFormat inputDateFormat; - private final String outputDateFormat; + private DateTimeFormatter inputFormatter; + private final HoodieDateTimeParser parser; // TimeZone detailed settings reference // https://docs.oracle.com/javase/8/docs/api/java/util/TimeZone.html - private final TimeZone timeZone; + private final DateTimeZone outputDateTimeZone; /** * Supported configs. */ - static class Config { + public static class Config { // One value from TimestampType above - private static final String TIMESTAMP_TYPE_FIELD_PROP = "hoodie.deltastreamer.keygen.timebased.timestamp.type"; - private static final String INPUT_TIME_UNIT = + public static final String TIMESTAMP_TYPE_FIELD_PROP = "hoodie.deltastreamer.keygen.timebased.timestamp.type"; + public static final String INPUT_TIME_UNIT = "hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit"; - private static final String TIMESTAMP_INPUT_DATE_FORMAT_PROP = + //This prop can now accept list of input date formats. + public static final String TIMESTAMP_INPUT_DATE_FORMAT_PROP = "hoodie.deltastreamer.keygen.timebased.input.dateformat"; - private static final String TIMESTAMP_OUTPUT_DATE_FORMAT_PROP = + public static final String TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX_PROP = "hoodie.deltastreamer.keygen.timebased.input.dateformat.list.delimiter.regex"; + public static final String TIMESTAMP_INPUT_TIMEZONE_FORMAT_PROP = "hoodie.deltastreamer.keygen.timebased.input.timezone"; + public static final String TIMESTAMP_OUTPUT_DATE_FORMAT_PROP = "hoodie.deltastreamer.keygen.timebased.output.dateformat"; - private static final String TIMESTAMP_TIMEZONE_FORMAT_PROP = + //still keeping this prop for backward compatibility so that functionality for existing users does not break. + public static final String TIMESTAMP_TIMEZONE_FORMAT_PROP = "hoodie.deltastreamer.keygen.timebased.timezone"; + public static final String TIMESTAMP_OUTPUT_TIMEZONE_FORMAT_PROP = "hoodie.deltastreamer.keygen.timebased.output.timezone"; + static final String DATE_TIME_PARSER_PROP = "hoodie.deltastreamer.keygen.datetime.parser.class"; } - public TimestampBasedKeyGenerator(TypedProperties config) { + public TimestampBasedKeyGenerator(TypedProperties config) throws IOException { super(config); - DataSourceUtils.checkRequiredProperties(config, - Arrays.asList(Config.TIMESTAMP_TYPE_FIELD_PROP, Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP)); + String dateTimeParserClass = config.getString(Config.DATE_TIME_PARSER_PROP, HoodieDateTimeParserImpl.class.getName()); + this.parser = DataSourceUtils.createDateTimeParser(config, dateTimeParserClass); + this.outputDateTimeZone = parser.getOutputDateTimeZone(); + this.outputDateFormat = parser.getOutputDateFormat(); + this.inputFormatter = parser.getInputFormatter(); this.timestampType = TimestampType.valueOf(config.getString(Config.TIMESTAMP_TYPE_FIELD_PROP)); - this.outputDateFormat = config.getString(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP); - this.timeZone = TimeZone.getTimeZone(config.getString(Config.TIMESTAMP_TIMEZONE_FORMAT_PROP, "GMT")); if (timestampType == TimestampType.DATE_STRING || timestampType == TimestampType.MIXED) { - DataSourceUtils.checkRequiredProperties(config, - Collections.singletonList(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP)); - this.inputDateFormat = new SimpleDateFormat(config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP)); - this.inputDateFormat.setTimeZone(timeZone); + this.inputFormatter = parser.getInputFormatter(); } switch (this.timestampType) { @@ -119,8 +122,11 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator { if (partitionVal == null) { partitionVal = 1L; } - SimpleDateFormat partitionPathFormat = new SimpleDateFormat(outputDateFormat); - partitionPathFormat.setTimeZone(timeZone); + + DateTimeFormatter partitionFormatter = DateTimeFormat.forPattern(outputDateFormat); + if (this.outputDateTimeZone != null) { + partitionFormatter = partitionFormatter.withZone(outputDateTimeZone); + } try { long timeMs; @@ -131,17 +137,23 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator { } else if (partitionVal instanceof Long) { timeMs = convertLongTimeToMillis((Long) partitionVal); } else if (partitionVal instanceof CharSequence) { - timeMs = inputDateFormat.parse(partitionVal.toString()).getTime(); + DateTime parsedDateTime = inputFormatter.parseDateTime(partitionVal.toString()); + if (this.outputDateTimeZone == null) { + // Use the timezone that came off the date that was passed in, if it had one + partitionFormatter = partitionFormatter.withZone(parsedDateTime.getZone()); + } + + timeMs = inputFormatter.parseDateTime(partitionVal.toString()).getMillis(); } else { throw new HoodieNotSupportedException( "Unexpected type for partition field: " + partitionVal.getClass().getName()); } - Date timestamp = new Date(timeMs); + DateTime timestamp = new DateTime(timeMs, outputDateTimeZone); - return hiveStylePartitioning ? partitionPathField + "=" + partitionPathFormat.format(timestamp) - : partitionPathFormat.format(timestamp); - } catch (ParseException pe) { - throw new HoodieDeltaStreamerException("Unable to parse input partition field :" + partitionVal, pe); + return hiveStylePartitioning ? partitionPathField + "=" + timestamp.toString(partitionFormatter) + : timestamp.toString(partitionFormatter); + } catch (Exception e) { + throw new HoodieDeltaStreamerException("Unable to parse input partition field :" + partitionVal, e); } } @@ -150,7 +162,6 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator { // should not be possible throw new RuntimeException(Config.INPUT_TIME_UNIT + " is not specified but scalar it supplied as time value"); } - return MILLISECONDS.convert(partitionVal, timeUnit); } } diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParser.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParser.java new file mode 100644 index 000000000..355019333 --- /dev/null +++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParser.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen.parser; + +import org.joda.time.DateTimeZone; +import org.joda.time.format.DateTimeFormatter; + +public interface HoodieDateTimeParser { + + /** + * Returns the output date format in which the partition paths will be created for the hudi dataset. + * @return + */ + String getOutputDateFormat(); + + /** + * Returns input formats in which datetime based values might be coming in incoming records. + * @return + */ + DateTimeFormatter getInputFormatter(); + + /** + * Returns the datetime zone one should expect the incoming values into. + * @return + */ + DateTimeZone getInputDateTimeZone(); + + /** + * Returns the datetime zone using which the final partition paths for hudi dataset are created. + * @return + */ + DateTimeZone getOutputDateTimeZone(); +} diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java new file mode 100644 index 000000000..933e1af17 --- /dev/null +++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen.parser; + +import org.apache.hudi.DataSourceUtils; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.keygen.TimestampBasedKeyGenerator.Config; +import org.apache.hudi.keygen.TimestampBasedKeyGenerator.TimestampType; +import org.joda.time.DateTimeZone; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import org.joda.time.format.DateTimeFormatterBuilder; +import org.joda.time.format.DateTimeParser; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collections; +import java.util.TimeZone; + +public class HoodieDateTimeParserImpl implements HoodieDateTimeParser, Serializable { + + private String configInputDateFormatList; + private final String configInputDateFormatDelimiter; + private final TypedProperties config; + private DateTimeFormatter inputFormatter; + + // TimeZone detailed settings reference + // https://docs.oracle.com/javase/8/docs/api/java/util/TimeZone.html + private final DateTimeZone inputDateTimeZone; + + public HoodieDateTimeParserImpl(TypedProperties config) { + this.config = config; + DataSourceUtils.checkRequiredProperties(config, Arrays.asList(Config.TIMESTAMP_TYPE_FIELD_PROP, Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP)); + this.inputDateTimeZone = getInputDateTimeZone(); + this.configInputDateFormatDelimiter = getConfigInputDateFormatDelimiter(); + + TimestampType timestampType = TimestampType.valueOf(config.getString(Config.TIMESTAMP_TYPE_FIELD_PROP)); + if (timestampType == TimestampType.DATE_STRING || timestampType == TimestampType.MIXED) { + DataSourceUtils.checkRequiredProperties(config, + Collections.singletonList(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP)); + this.configInputDateFormatList = config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, ""); + inputFormatter = getInputDateFormatter(); + } + } + + private String getConfigInputDateFormatDelimiter() { + String inputDateFormatDelimiter = config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX_PROP, ",").trim(); + inputDateFormatDelimiter = inputDateFormatDelimiter.isEmpty() ? "," : inputDateFormatDelimiter; + return inputDateFormatDelimiter; + } + + private DateTimeFormatter getInputDateFormatter() { + if (this.configInputDateFormatList.isEmpty()) { + throw new IllegalArgumentException(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP + " configuration is required"); + } + + DateTimeFormatter formatter = new DateTimeFormatterBuilder() + .append( + null, + Arrays.stream( + this.configInputDateFormatList.split(this.configInputDateFormatDelimiter)) + .map(String::trim) + .map(DateTimeFormat::forPattern) + .map(DateTimeFormatter::getParser) + .toArray(DateTimeParser[]::new)) + .toFormatter(); + if (this.inputDateTimeZone != null) { + formatter = formatter.withZone(this.inputDateTimeZone); + } else { + formatter = formatter.withOffsetParsed(); + } + + return formatter; + } + + @Override + public String getOutputDateFormat() { + return config.getString(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP); + } + + @Override + public DateTimeFormatter getInputFormatter() { + return this.inputFormatter; + } + + @Override + public DateTimeZone getInputDateTimeZone() { + String inputTimeZone; + if (config.containsKey(Config.TIMESTAMP_TIMEZONE_FORMAT_PROP)) { + inputTimeZone = config.getString(Config.TIMESTAMP_TIMEZONE_FORMAT_PROP, "GMT"); + } else { + inputTimeZone = config.getString(Config.TIMESTAMP_INPUT_TIMEZONE_FORMAT_PROP, ""); + } + return !inputTimeZone.trim().isEmpty() ? DateTimeZone.forTimeZone(TimeZone.getTimeZone(inputTimeZone)) : null; + } + + @Override + public DateTimeZone getOutputDateTimeZone() { + String outputTimeZone; + if (config.containsKey(Config.TIMESTAMP_TIMEZONE_FORMAT_PROP)) { + outputTimeZone = config.getString(Config.TIMESTAMP_TIMEZONE_FORMAT_PROP, "GMT"); + } else { + outputTimeZone = config.getString(Config.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT_PROP, ""); + } + return !outputTimeZone.trim().isEmpty() ? DateTimeZone.forTimeZone(TimeZone.getTimeZone(outputTimeZone)) : null; + } +} diff --git a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java index bd8583f2c..b8e0d2921 100644 --- a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java +++ b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java @@ -25,6 +25,8 @@ import org.apache.hudi.common.testutils.SchemaTestUtil; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.exception.HoodieDeltaStreamerException; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -48,18 +50,41 @@ public class TestTimestampBasedKeyGenerator { } private TypedProperties getBaseKeyConfig(String timestampType, String dateFormat, String timezone, String scalarType) { - properties.setProperty("hoodie.deltastreamer.keygen.timebased.timestamp.type", timestampType); - properties.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", dateFormat); - properties.setProperty("hoodie.deltastreamer.keygen.timebased.timezone", timezone); + properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_TYPE_FIELD_PROP, timestampType); + properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, dateFormat); + properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_TIMEZONE_FORMAT_PROP, timezone); if (scalarType != null) { properties.setProperty("hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit", scalarType); } + + return properties; + } + + private TypedProperties getBaseKeyConfig(String timestampType, String inputFormatList, String inputFormatDelimiterRegex, String inputTimezone, String outputFormat, String outputTimezone) { + if (timestampType != null) { + properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_TYPE_FIELD_PROP, timestampType); + } + if (inputFormatList != null) { + properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, inputFormatList); + } + if (inputFormatDelimiterRegex != null) { + properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX_PROP, inputFormatDelimiterRegex); + } + if (inputTimezone != null) { + properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_INPUT_TIMEZONE_FORMAT_PROP, inputTimezone); + } + if (outputFormat != null) { + properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, outputFormat); + } + if (outputTimezone != null) { + properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT_PROP, outputTimezone); + } return properties; } @Test - public void testTimestampBasedKeyGenerator() { + public void testTimestampBasedKeyGenerator() throws IOException { // timezone is GMT+8:00 baseRecord.put("createTime", 1578283932000L); properties = getBaseKeyConfig("EPOCHMILLISECONDS", "yyyy-MM-dd hh", "GMT+8:00", null); @@ -85,7 +110,7 @@ public class TestTimestampBasedKeyGenerator { } @Test - public void testScalar() { + public void testScalar() throws IOException { // timezone is GMT+8:00 baseRecord.put("createTime", 20000L); @@ -94,4 +119,137 @@ public class TestTimestampBasedKeyGenerator { HoodieKey hk5 = new TimestampBasedKeyGenerator(properties).getKey(baseRecord); assertEquals(hk5.getPartitionPath(), "2024-10-04 12"); } -} + + @Test + public void test_ExpectsMatch_SingleInputFormat_ISO8601WithMsZ_OutputTimezoneAsUTC() throws IOException { + baseRecord.put("createTime", "2020-04-01T13:01:33.428Z"); + properties = this.getBaseKeyConfig( + "DATE_STRING", + "yyyy-MM-dd'T'HH:mm:ss.SSSZ", + "", + "", + "yyyyMMddHH", + "GMT"); + HoodieKey hk1 = new TimestampBasedKeyGenerator(properties).getKey(baseRecord); + + Assertions.assertEquals("2020040113", hk1.getPartitionPath()); + } + + @Test + public void test_ExpectsMatch_SingleInputFormats_ISO8601WithMsZ_OutputTimezoneAsInputDateTimeZone() throws IOException { + baseRecord.put("createTime", "2020-04-01T13:01:33.428Z"); + properties = this.getBaseKeyConfig( + "DATE_STRING", + "yyyy-MM-dd'T'HH:mm:ss.SSSZ", + "", + "", + "yyyyMMddHH", + ""); + HoodieKey hk1 = new TimestampBasedKeyGenerator(properties).getKey(baseRecord); + + Assertions.assertEquals("2020040113", hk1.getPartitionPath()); + } + + @Test + public void test_ExpectsMatch_MultipleInputFormats_ISO8601WithMsZ_OutputTimezoneAsUTC() throws IOException { + baseRecord.put("createTime", "2020-04-01T13:01:33.428Z"); + properties = this.getBaseKeyConfig( + "DATE_STRING", + "yyyy-MM-dd'T'HH:mm:ssZ,yyyy-MM-dd'T'HH:mm:ss.SSSZ", + "", + "", + "yyyyMMddHH", + "UTC"); + HoodieKey hk1 = new TimestampBasedKeyGenerator(properties).getKey(baseRecord); + + Assertions.assertEquals("2020040113", hk1.getPartitionPath()); + } + + @Test + public void test_ExpectsMatch_MultipleInputFormats_ISO8601NoMsZ_OutputTimezoneAsUTC() throws IOException { + baseRecord.put("createTime", "2020-04-01T13:01:33Z"); + properties = this.getBaseKeyConfig( + "DATE_STRING", + "yyyy-MM-dd'T'HH:mm:ssZ,yyyy-MM-dd'T'HH:mm:ss.SSSZ", + "", + "", + "yyyyMMddHH", + "UTC"); + HoodieKey hk1 = new TimestampBasedKeyGenerator(properties).getKey(baseRecord); + + Assertions.assertEquals("2020040113", hk1.getPartitionPath()); + } + + @Test + public void test_ExpectsMatch_MultipleInputFormats_ISO8601NoMsWithOffset_OutputTimezoneAsUTC() throws IOException { + baseRecord.put("createTime", "2020-04-01T13:01:33-05:00"); + properties = this.getBaseKeyConfig( + "DATE_STRING", + "yyyy-MM-dd'T'HH:mm:ssZ,yyyy-MM-dd'T'HH:mm:ss.SSSZ", + "", + "", + "yyyyMMddHH", + "UTC"); + HoodieKey hk1 = new TimestampBasedKeyGenerator(properties).getKey(baseRecord); + + Assertions.assertEquals("2020040118", hk1.getPartitionPath()); + } + + @Test + public void test_ExpectsMatch_MultipleInputFormats_ISO8601WithMsWithOffset_OutputTimezoneAsUTC() throws IOException { + baseRecord.put("createTime", "2020-04-01T13:01:33.123-05:00"); + properties = this.getBaseKeyConfig( + "DATE_STRING", + "yyyy-MM-dd'T'HH:mm:ssZ,yyyy-MM-dd'T'HH:mm:ss.SSSZ", + "", + "", + "yyyyMMddHH", + "UTC"); + HoodieKey hk1 = new TimestampBasedKeyGenerator(properties).getKey(baseRecord); + + Assertions.assertEquals("2020040118", hk1.getPartitionPath()); + } + + @Test + public void test_ExpectsMatch_MultipleInputFormats_ISO8601WithMsZ_OutputTimezoneAsEST() throws IOException { + baseRecord.put("createTime", "2020-04-01T13:01:33.123Z"); + properties = this.getBaseKeyConfig( + "DATE_STRING", + "yyyy-MM-dd'T'HH:mm:ssZ,yyyy-MM-dd'T'HH:mm:ss.SSSZ", + "", + "", + "yyyyMMddHH", + "EST"); + HoodieKey hk1 = new TimestampBasedKeyGenerator(properties).getKey(baseRecord); + + Assertions.assertEquals("2020040109", hk1.getPartitionPath()); + } + + @Test + public void test_Throws_MultipleInputFormats_InputDateNotMatchingFormats() { + baseRecord.put("createTime", "2020-04-01 13:01:33.123-05:00"); + properties = this.getBaseKeyConfig( + "DATE_STRING", + "yyyy-MM-dd'T'HH:mm:ssZ,yyyy-MM-dd'T'HH:mm:ss.SSSZ", + "", + "", + "yyyyMMddHH", + "UTC"); + Assertions.assertThrows(HoodieDeltaStreamerException.class, () -> new TimestampBasedKeyGenerator(properties).getKey(baseRecord)); + } + + @Test + public void test_ExpectsMatch_MultipleInputFormats_ShortDate_OutputCustomDate() throws IOException { + baseRecord.put("createTime", "20200401"); + properties = this.getBaseKeyConfig( + "DATE_STRING", + "yyyy-MM-dd'T'HH:mm:ssZ,yyyy-MM-dd'T'HH:mm:ss.SSSZ,yyyyMMdd", + "", + "UTC", + "MM/dd/yyyy", + "UTC"); + HoodieKey hk1 = new TimestampBasedKeyGenerator(properties).getKey(baseRecord); + + Assertions.assertEquals("04/01/2020", hk1.getPartitionPath()); + } +} \ No newline at end of file diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java index fa261d47a..64b0238eb 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java @@ -60,7 +60,6 @@ import org.apache.spark.sql.types.StructType; import java.io.BufferedReader; import java.io.IOException; -import java.io.InputStream; import java.io.StringReader; import java.nio.ByteBuffer; import java.sql.Connection; @@ -267,12 +266,6 @@ public class UtilHelpers { return -1; } - public static TypedProperties readConfig(InputStream in) throws IOException { - TypedProperties defaults = new TypedProperties(); - defaults.load(in); - return defaults; - } - /** * Returns a factory for creating connections to the given JDBC URL. *