From 35b21855da209c812e006c1afff3d940d5ac2a18 Mon Sep 17 00:00:00 2001 From: Mathieu Date: Sun, 23 Aug 2020 19:56:50 +0800 Subject: [PATCH] [HUDI-1150] Fix unable to parse input partition field :1 exception when using TimestampBasedKeyGenerator(#1920) --- .../java/org/apache/hudi/DataSourceUtils.java | 6 +-- .../hudi/keygen/RowKeyGeneratorHelper.java | 2 +- .../keygen/TimestampBasedKeyGenerator.java | 38 +++++++++++++++--- ...java => AbstractHoodieDateTimeParser.java} | 40 ++++++++++++++----- .../parser/HoodieDateTimeParserImpl.java | 17 ++------ .../TestTimestampBasedKeyGenerator.java | 39 +++++++++++++++++- 6 files changed, 109 insertions(+), 33 deletions(-) rename hudi-spark/src/main/java/org/apache/hudi/keygen/parser/{HoodieDateTimeParser.java => AbstractHoodieDateTimeParser.java} (52%) diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java index ea2cc5c0e..19316d5e7 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java +++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java @@ -39,7 +39,7 @@ import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.keygen.KeyGenerator; -import org.apache.hudi.keygen.parser.HoodieDateTimeParser; +import org.apache.hudi.keygen.parser.AbstractHoodieDateTimeParser; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.avro.LogicalTypes; @@ -172,9 +172,9 @@ public class DataSourceUtils { /** * Create a date time parser class for TimestampBasedKeyGenerator, passing in any configs needed. */ - public static HoodieDateTimeParser createDateTimeParser(TypedProperties props, String parserClass) throws IOException { + public static AbstractHoodieDateTimeParser createDateTimeParser(TypedProperties props, String parserClass) throws IOException { try { - return (HoodieDateTimeParser) ReflectionUtils.loadClass(parserClass, props); + return (AbstractHoodieDateTimeParser) ReflectionUtils.loadClass(parserClass, props); } catch (Throwable e) { throw new IOException("Could not load date time parser class " + parserClass, e); } diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java index 02b8492f1..4c05489ce 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java +++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java @@ -146,7 +146,7 @@ public class RowKeyGeneratorHelper { } valueToProcess = (Row) valueToProcess.get(positions.get(index)); } else { // last index - if (valueToProcess.getAs(positions.get(index)).toString().isEmpty()) { + if (null != valueToProcess.getAs(positions.get(index)) && valueToProcess.getAs(positions.get(index)).toString().isEmpty()) { toReturn = EMPTY_RECORDKEY_PLACEHOLDER; break; } diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java index 25a52fefd..97a7d2ef6 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java +++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java @@ -26,7 +26,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieDeltaStreamerException; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieNotSupportedException; -import org.apache.hudi.keygen.parser.HoodieDateTimeParser; +import org.apache.hudi.keygen.parser.AbstractHoodieDateTimeParser; import org.apache.hudi.keygen.parser.HoodieDateTimeParserImpl; import org.apache.avro.generic.GenericRecord; @@ -41,6 +41,7 @@ import java.io.Serializable; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; +import java.util.TimeZone; import java.util.concurrent.TimeUnit; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -63,10 +64,11 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator { private final String outputDateFormat; private transient Option inputFormatter; private transient DateTimeFormatter partitionFormatter; - private final HoodieDateTimeParser parser; + private final AbstractHoodieDateTimeParser parser; // TimeZone detailed settings reference // https://docs.oracle.com/javase/8/docs/api/java/util/TimeZone.html + private final DateTimeZone inputDateTimeZone; private final DateTimeZone outputDateTimeZone; protected final boolean encodePartitionPath; @@ -107,6 +109,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator { super(config, recordKeyField, partitionPathField); String dateTimeParserClass = config.getString(Config.DATE_TIME_PARSER_PROP, HoodieDateTimeParserImpl.class.getName()); this.parser = DataSourceUtils.createDateTimeParser(config, dateTimeParserClass); + this.inputDateTimeZone = parser.getInputDateTimeZone(); this.outputDateTimeZone = parser.getOutputDateTimeZone(); this.outputDateFormat = parser.getOutputDateFormat(); this.timestampType = TimestampType.valueOf(config.getString(Config.TIMESTAMP_TYPE_FIELD_PROP)); @@ -133,7 +136,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator { public String getPartitionPath(GenericRecord record) { Object partitionVal = HoodieAvroUtils.getNestedFieldVal(record, getPartitionPathFields().get(0), true); if (partitionVal == null) { - partitionVal = 1L; + partitionVal = getDefaultPartitionVal(); } try { return getPartitionPath(partitionVal); @@ -142,6 +145,31 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator { } } + /** + * Set default value to partitionVal if the input value of partitionPathField is null. + */ + private Object getDefaultPartitionVal() { + Object result = 1L; + if (timestampType == TimestampType.DATE_STRING || timestampType == TimestampType.MIXED) { + // since partitionVal is null, we can set a default value of any format as TIMESTAMP_INPUT_DATE_FORMAT_PROP + // configured, here we take the first. + // {Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP} won't be null, it has been checked in the initialization process of + // inputFormatter + String delimiter = parser.getConfigInputDateFormatDelimiter(); + String format = config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, "").split(delimiter)[0]; + + // if both input and output timeZone are not configured, use GMT. + if (null != inputDateTimeZone) { + return new DateTime(result, inputDateTimeZone).toString(format); + } else if (null != outputDateTimeZone) { + return new DateTime(result, outputDateTimeZone).toString(format); + } else { + return new DateTime(result, DateTimeZone.forTimeZone(TimeZone.getTimeZone("GMT"))).toString(format); + } + } + return result; + } + /** * The function takes care of lazily initialising dateTimeFormatter variables only once. */ @@ -219,9 +247,9 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator { buildFieldPositionMapIfNeeded(row.schema()); Object partitionPathFieldVal = RowKeyGeneratorHelper.getNestedFieldVal(row, partitionPathPositions.get(getPartitionPathFields().get(0))); try { - if (partitionPathFieldVal.toString().contains(DEFAULT_PARTITION_PATH) || partitionPathFieldVal.toString().contains(NULL_RECORDKEY_PLACEHOLDER) + if (partitionPathFieldVal == null || partitionPathFieldVal.toString().contains(DEFAULT_PARTITION_PATH) || partitionPathFieldVal.toString().contains(NULL_RECORDKEY_PLACEHOLDER) || partitionPathFieldVal.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) { - fieldVal = 1L; + fieldVal = getDefaultPartitionVal(); } else { fieldVal = partitionPathFieldVal; } diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParser.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/AbstractHoodieDateTimeParser.java similarity index 52% rename from hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParser.java rename to hudi-spark/src/main/java/org/apache/hudi/keygen/parser/AbstractHoodieDateTimeParser.java index 6612f4cd2..80e26ccb8 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParser.java +++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/AbstractHoodieDateTimeParser.java @@ -17,35 +17,57 @@ package org.apache.hudi.keygen.parser; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; +import org.apache.hudi.keygen.TimestampBasedKeyGenerator; import org.joda.time.DateTimeZone; import org.joda.time.format.DateTimeFormatter; import java.io.Serializable; -public interface HoodieDateTimeParser extends Serializable { +public abstract class AbstractHoodieDateTimeParser implements Serializable { + + protected final TypedProperties config; + protected final String configInputDateFormatDelimiter; + + public AbstractHoodieDateTimeParser(TypedProperties config) { + this.config = config; + this.configInputDateFormatDelimiter = initInputDateFormatDelimiter(); + } + + private String initInputDateFormatDelimiter() { + String inputDateFormatDelimiter = config.getString(TimestampBasedKeyGenerator.Config.TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX_PROP, ",").trim(); + inputDateFormatDelimiter = inputDateFormatDelimiter.isEmpty() ? "," : inputDateFormatDelimiter; + return inputDateFormatDelimiter; + } /** * Returns the output date format in which the partition paths will be created for the hudi dataset. - * @return */ - String getOutputDateFormat(); + public String getOutputDateFormat() { + return config.getString(TimestampBasedKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP); + } /** * Returns input formats in which datetime based values might be coming in incoming records. - * @return */ - Option getInputFormatter(); + public abstract Option getInputFormatter(); /** * Returns the datetime zone one should expect the incoming values into. - * @return */ - DateTimeZone getInputDateTimeZone(); + public abstract DateTimeZone getInputDateTimeZone(); /** * Returns the datetime zone using which the final partition paths for hudi dataset are created. - * @return */ - DateTimeZone getOutputDateTimeZone(); + public abstract DateTimeZone getOutputDateTimeZone(); + + /** + * Returns the input date format delimiter, comma by default. + */ + public String getConfigInputDateFormatDelimiter() { + return this.configInputDateFormatDelimiter; + } + } diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java index 11790cb42..41452d004 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java +++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java @@ -28,32 +28,22 @@ import org.joda.time.format.DateTimeFormatter; import org.joda.time.format.DateTimeFormatterBuilder; import org.joda.time.format.DateTimeParser; -import java.io.Serializable; import java.util.Arrays; import java.util.Collections; import java.util.TimeZone; -public class HoodieDateTimeParserImpl implements HoodieDateTimeParser, Serializable { +public class HoodieDateTimeParserImpl extends AbstractHoodieDateTimeParser { private String configInputDateFormatList; - private final String configInputDateFormatDelimiter; - private final TypedProperties config; // TimeZone detailed settings reference // https://docs.oracle.com/javase/8/docs/api/java/util/TimeZone.html private final DateTimeZone inputDateTimeZone; public HoodieDateTimeParserImpl(TypedProperties config) { - this.config = config; + super(config); DataSourceUtils.checkRequiredProperties(config, Arrays.asList(Config.TIMESTAMP_TYPE_FIELD_PROP, Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP)); this.inputDateTimeZone = getInputDateTimeZone(); - this.configInputDateFormatDelimiter = getConfigInputDateFormatDelimiter(); - } - - private String getConfigInputDateFormatDelimiter() { - String inputDateFormatDelimiter = config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX_PROP, ",").trim(); - inputDateFormatDelimiter = inputDateFormatDelimiter.isEmpty() ? "," : inputDateFormatDelimiter; - return inputDateFormatDelimiter; } private DateTimeFormatter getInputDateFormatter() { @@ -65,7 +55,7 @@ public class HoodieDateTimeParserImpl implements HoodieDateTimeParser, Serializa .append( null, Arrays.stream( - this.configInputDateFormatList.split(this.configInputDateFormatDelimiter)) + this.configInputDateFormatList.split(super.configInputDateFormatDelimiter)) .map(String::trim) .map(DateTimeFormat::forPattern) .map(DateTimeFormatter::getParser) @@ -119,4 +109,5 @@ public class HoodieDateTimeParserImpl implements HoodieDateTimeParser, Serializa } return !outputTimeZone.trim().isEmpty() ? DateTimeZone.forTimeZone(TimeZone.getTimeZone(outputTimeZone)) : null; } + } diff --git a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java index 6afc6ebb0..78674153e 100644 --- a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java +++ b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java @@ -150,6 +150,29 @@ public class TestTimestampBasedKeyGenerator { // test w/ Row assertEquals("2020-01-06 12", keyGen.getPartitionPath(baseRow)); + + // timezone is GMT+8:00, createTime is null + baseRecord.put("createTime", null); + properties = getBaseKeyConfig("EPOCHMILLISECONDS", "yyyy-MM-dd hh", "GMT+8:00", null); + keyGen = new TimestampBasedKeyGenerator(properties); + HoodieKey hk5 = keyGen.getKey(baseRecord); + assertEquals("1970-01-01 08", hk5.getPartitionPath()); + + // test w/ Row + baseRow = genericRecordToRow(baseRecord); + assertEquals("1970-01-01 08", keyGen.getPartitionPath(baseRow)); + + // timestamp is DATE_STRING, timezone is GMT, createTime is null + baseRecord.put("createTime", null); + properties = getBaseKeyConfig("DATE_STRING", "yyyy-MM-dd hh:mm:ss", "GMT", null); + properties.setProperty("hoodie.deltastreamer.keygen.timebased.input.dateformat", "yyyy-MM-dd hh:mm:ss"); + keyGen = new TimestampBasedKeyGenerator(properties); + HoodieKey hk6 = keyGen.getKey(baseRecord); + assertEquals("1970-01-01 12:00:00", hk6.getPartitionPath()); + + // test w/ Row + baseRow = genericRecordToRow(baseRecord); + assertEquals("1970-01-01 12:00:00", keyGen.getPartitionPath(baseRow)); } @Test @@ -160,12 +183,24 @@ public class TestTimestampBasedKeyGenerator { // timezone is GMT properties = getBaseKeyConfig("SCALAR", "yyyy-MM-dd hh", "GMT", "days"); TimestampBasedKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties); - HoodieKey hk5 = keyGen.getKey(baseRecord); - assertEquals(hk5.getPartitionPath(), "2024-10-04 12"); + HoodieKey hk1 = keyGen.getKey(baseRecord); + assertEquals(hk1.getPartitionPath(), "2024-10-04 12"); // test w/ Row baseRow = genericRecordToRow(baseRecord); assertEquals("2024-10-04 12", keyGen.getPartitionPath(baseRow)); + + // timezone is GMT, createTime is null + baseRecord.put("createTime", null); + properties = getBaseKeyConfig("SCALAR", "yyyy-MM-dd hh", "GMT", "days"); + keyGen = new TimestampBasedKeyGenerator(properties); + HoodieKey hk2 = keyGen.getKey(baseRecord); + assertEquals("1970-01-02 12", hk2.getPartitionPath()); + + // test w/ Row + baseRow = genericRecordToRow(baseRecord); + assertEquals("1970-01-02 12", keyGen.getPartitionPath(baseRow)); + } @Test