[HUDI-1150] Fix "unable to parse input partition field :1" exception when using TimestampBasedKeyGenerator (#1920)
This commit is contained in:
@@ -39,7 +39,7 @@ import org.apache.hudi.hive.HiveSyncConfig;
|
|||||||
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
|
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
|
||||||
import org.apache.hudi.index.HoodieIndex;
|
import org.apache.hudi.index.HoodieIndex;
|
||||||
import org.apache.hudi.keygen.KeyGenerator;
|
import org.apache.hudi.keygen.KeyGenerator;
|
||||||
import org.apache.hudi.keygen.parser.HoodieDateTimeParser;
|
import org.apache.hudi.keygen.parser.AbstractHoodieDateTimeParser;
|
||||||
import org.apache.hudi.table.BulkInsertPartitioner;
|
import org.apache.hudi.table.BulkInsertPartitioner;
|
||||||
|
|
||||||
import org.apache.avro.LogicalTypes;
|
import org.apache.avro.LogicalTypes;
|
||||||
@@ -172,9 +172,9 @@ public class DataSourceUtils {
|
|||||||
/**
|
/**
|
||||||
* Create a date time parser class for TimestampBasedKeyGenerator, passing in any configs needed.
|
* Create a date time parser class for TimestampBasedKeyGenerator, passing in any configs needed.
|
||||||
*/
|
*/
|
||||||
public static HoodieDateTimeParser createDateTimeParser(TypedProperties props, String parserClass) throws IOException {
|
public static AbstractHoodieDateTimeParser createDateTimeParser(TypedProperties props, String parserClass) throws IOException {
|
||||||
try {
|
try {
|
||||||
return (HoodieDateTimeParser) ReflectionUtils.loadClass(parserClass, props);
|
return (AbstractHoodieDateTimeParser) ReflectionUtils.loadClass(parserClass, props);
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
throw new IOException("Could not load date time parser class " + parserClass, e);
|
throw new IOException("Could not load date time parser class " + parserClass, e);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -146,7 +146,7 @@ public class RowKeyGeneratorHelper {
|
|||||||
}
|
}
|
||||||
valueToProcess = (Row) valueToProcess.get(positions.get(index));
|
valueToProcess = (Row) valueToProcess.get(positions.get(index));
|
||||||
} else { // last index
|
} else { // last index
|
||||||
if (valueToProcess.getAs(positions.get(index)).toString().isEmpty()) {
|
if (null != valueToProcess.getAs(positions.get(index)) && valueToProcess.getAs(positions.get(index)).toString().isEmpty()) {
|
||||||
toReturn = EMPTY_RECORDKEY_PLACEHOLDER;
|
toReturn = EMPTY_RECORDKEY_PLACEHOLDER;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -26,7 +26,7 @@ import org.apache.hudi.common.util.Option;
|
|||||||
import org.apache.hudi.exception.HoodieDeltaStreamerException;
|
import org.apache.hudi.exception.HoodieDeltaStreamerException;
|
||||||
import org.apache.hudi.exception.HoodieException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
import org.apache.hudi.exception.HoodieNotSupportedException;
|
import org.apache.hudi.exception.HoodieNotSupportedException;
|
||||||
import org.apache.hudi.keygen.parser.HoodieDateTimeParser;
|
import org.apache.hudi.keygen.parser.AbstractHoodieDateTimeParser;
|
||||||
import org.apache.hudi.keygen.parser.HoodieDateTimeParserImpl;
|
import org.apache.hudi.keygen.parser.HoodieDateTimeParserImpl;
|
||||||
|
|
||||||
import org.apache.avro.generic.GenericRecord;
|
import org.apache.avro.generic.GenericRecord;
|
||||||
@@ -41,6 +41,7 @@ import java.io.Serializable;
|
|||||||
import java.io.UnsupportedEncodingException;
|
import java.io.UnsupportedEncodingException;
|
||||||
import java.net.URLEncoder;
|
import java.net.URLEncoder;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.TimeZone;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||||
@@ -63,10 +64,11 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
|
|||||||
private final String outputDateFormat;
|
private final String outputDateFormat;
|
||||||
private transient Option<DateTimeFormatter> inputFormatter;
|
private transient Option<DateTimeFormatter> inputFormatter;
|
||||||
private transient DateTimeFormatter partitionFormatter;
|
private transient DateTimeFormatter partitionFormatter;
|
||||||
private final HoodieDateTimeParser parser;
|
private final AbstractHoodieDateTimeParser parser;
|
||||||
|
|
||||||
// TimeZone detailed settings reference
|
// TimeZone detailed settings reference
|
||||||
// https://docs.oracle.com/javase/8/docs/api/java/util/TimeZone.html
|
// https://docs.oracle.com/javase/8/docs/api/java/util/TimeZone.html
|
||||||
|
private final DateTimeZone inputDateTimeZone;
|
||||||
private final DateTimeZone outputDateTimeZone;
|
private final DateTimeZone outputDateTimeZone;
|
||||||
|
|
||||||
protected final boolean encodePartitionPath;
|
protected final boolean encodePartitionPath;
|
||||||
@@ -107,6 +109,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
|
|||||||
super(config, recordKeyField, partitionPathField);
|
super(config, recordKeyField, partitionPathField);
|
||||||
String dateTimeParserClass = config.getString(Config.DATE_TIME_PARSER_PROP, HoodieDateTimeParserImpl.class.getName());
|
String dateTimeParserClass = config.getString(Config.DATE_TIME_PARSER_PROP, HoodieDateTimeParserImpl.class.getName());
|
||||||
this.parser = DataSourceUtils.createDateTimeParser(config, dateTimeParserClass);
|
this.parser = DataSourceUtils.createDateTimeParser(config, dateTimeParserClass);
|
||||||
|
this.inputDateTimeZone = parser.getInputDateTimeZone();
|
||||||
this.outputDateTimeZone = parser.getOutputDateTimeZone();
|
this.outputDateTimeZone = parser.getOutputDateTimeZone();
|
||||||
this.outputDateFormat = parser.getOutputDateFormat();
|
this.outputDateFormat = parser.getOutputDateFormat();
|
||||||
this.timestampType = TimestampType.valueOf(config.getString(Config.TIMESTAMP_TYPE_FIELD_PROP));
|
this.timestampType = TimestampType.valueOf(config.getString(Config.TIMESTAMP_TYPE_FIELD_PROP));
|
||||||
@@ -133,7 +136,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
|
|||||||
public String getPartitionPath(GenericRecord record) {
|
public String getPartitionPath(GenericRecord record) {
|
||||||
Object partitionVal = HoodieAvroUtils.getNestedFieldVal(record, getPartitionPathFields().get(0), true);
|
Object partitionVal = HoodieAvroUtils.getNestedFieldVal(record, getPartitionPathFields().get(0), true);
|
||||||
if (partitionVal == null) {
|
if (partitionVal == null) {
|
||||||
partitionVal = 1L;
|
partitionVal = getDefaultPartitionVal();
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
return getPartitionPath(partitionVal);
|
return getPartitionPath(partitionVal);
|
||||||
@@ -142,6 +145,31 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the stand-in partition value used when the partition path field is null: the Long 1L, or, for DATE_STRING and MIXED timestamp types, that value formatted as a date string using the first configured input date format.
|
||||||
|
*/
|
||||||
|
private Object getDefaultPartitionVal() {
|
||||||
|
Object result = 1L;
|
||||||
|
if (timestampType == TimestampType.DATE_STRING || timestampType == TimestampType.MIXED) {
|
||||||
|
// The real value is null, so the default only has to match one of the formats listed in
|
||||||
|
// TIMESTAMP_INPUT_DATE_FORMAT_PROP; the first entry of the delimiter-separated list is used.
|
||||||
|
// NOTE(review): the property is expected to be set here (presumably validated when inputFormatter is
|
||||||
|
// initialised - not visible in this method); if it were absent, the "" fallback passed to getString would be split instead.
|
||||||
|
String delimiter = parser.getConfigInputDateFormatDelimiter();
|
||||||
|
String format = config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, "").split(delimiter)[0];
|
||||||
|
|
||||||
|
// Zone precedence for rendering the default: input zone if configured, else output zone, else GMT.
|
||||||
|
if (null != inputDateTimeZone) {
|
||||||
|
return new DateTime(result, inputDateTimeZone).toString(format);
|
||||||
|
} else if (null != outputDateTimeZone) {
|
||||||
|
return new DateTime(result, outputDateTimeZone).toString(format);
|
||||||
|
} else {
|
||||||
|
return new DateTime(result, DateTimeZone.forTimeZone(TimeZone.getTimeZone("GMT"))).toString(format);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The function takes care of lazily initialising dateTimeFormatter variables only once.
|
* The function takes care of lazily initialising dateTimeFormatter variables only once.
|
||||||
*/
|
*/
|
||||||
@@ -219,9 +247,9 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
|
|||||||
buildFieldPositionMapIfNeeded(row.schema());
|
buildFieldPositionMapIfNeeded(row.schema());
|
||||||
Object partitionPathFieldVal = RowKeyGeneratorHelper.getNestedFieldVal(row, partitionPathPositions.get(getPartitionPathFields().get(0)));
|
Object partitionPathFieldVal = RowKeyGeneratorHelper.getNestedFieldVal(row, partitionPathPositions.get(getPartitionPathFields().get(0)));
|
||||||
try {
|
try {
|
||||||
if (partitionPathFieldVal.toString().contains(DEFAULT_PARTITION_PATH) || partitionPathFieldVal.toString().contains(NULL_RECORDKEY_PLACEHOLDER)
|
if (partitionPathFieldVal == null || partitionPathFieldVal.toString().contains(DEFAULT_PARTITION_PATH) || partitionPathFieldVal.toString().contains(NULL_RECORDKEY_PLACEHOLDER)
|
||||||
|| partitionPathFieldVal.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
|
|| partitionPathFieldVal.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
|
||||||
fieldVal = 1L;
|
fieldVal = getDefaultPartitionVal();
|
||||||
} else {
|
} else {
|
||||||
fieldVal = partitionPathFieldVal;
|
fieldVal = partitionPathFieldVal;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,35 +17,57 @@
|
|||||||
|
|
||||||
package org.apache.hudi.keygen.parser;
|
package org.apache.hudi.keygen.parser;
|
||||||
|
|
||||||
|
import org.apache.hudi.common.config.TypedProperties;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
|
import org.apache.hudi.keygen.TimestampBasedKeyGenerator;
|
||||||
import org.joda.time.DateTimeZone;
|
import org.joda.time.DateTimeZone;
|
||||||
import org.joda.time.format.DateTimeFormatter;
|
import org.joda.time.format.DateTimeFormatter;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
|
||||||
public interface HoodieDateTimeParser extends Serializable {
|
public abstract class AbstractHoodieDateTimeParser implements Serializable {
|
||||||
|
|
||||||
|
protected final TypedProperties config;
|
||||||
|
protected final String configInputDateFormatDelimiter;
|
||||||
|
|
||||||
|
public AbstractHoodieDateTimeParser(TypedProperties config) {
|
||||||
|
this.config = config;
|
||||||
|
this.configInputDateFormatDelimiter = initInputDateFormatDelimiter();
|
||||||
|
}
|
||||||
|
|
||||||
|
private String initInputDateFormatDelimiter() {
|
||||||
|
String inputDateFormatDelimiter = config.getString(TimestampBasedKeyGenerator.Config.TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX_PROP, ",").trim();
|
||||||
|
inputDateFormatDelimiter = inputDateFormatDelimiter.isEmpty() ? "," : inputDateFormatDelimiter;
|
||||||
|
return inputDateFormatDelimiter;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the output date format in which the partition paths will be created for the hudi dataset.
|
* Returns the output date format in which the partition paths will be created for the hudi dataset.
|
||||||
* @return
|
|
||||||
*/
|
*/
|
||||||
String getOutputDateFormat();
|
public String getOutputDateFormat() {
|
||||||
|
return config.getString(TimestampBasedKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns input formats in which datetime based values might be coming in incoming records.
|
* Returns input formats in which datetime based values might be coming in incoming records.
|
||||||
* @return
|
|
||||||
*/
|
*/
|
||||||
Option<DateTimeFormatter> getInputFormatter();
|
public abstract Option<DateTimeFormatter> getInputFormatter();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the datetime zone one should expect the incoming values into.
|
* Returns the datetime zone one should expect the incoming values into.
|
||||||
* @return
|
|
||||||
*/
|
*/
|
||||||
DateTimeZone getInputDateTimeZone();
|
public abstract DateTimeZone getInputDateTimeZone();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the datetime zone using which the final partition paths for hudi dataset are created.
|
* Returns the datetime zone using which the final partition paths for hudi dataset are created.
|
||||||
* @return
|
|
||||||
*/
|
*/
|
||||||
DateTimeZone getOutputDateTimeZone();
|
public abstract DateTimeZone getOutputDateTimeZone();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the input date format delimiter, comma by default.
|
||||||
|
*/
|
||||||
|
public String getConfigInputDateFormatDelimiter() {
|
||||||
|
return this.configInputDateFormatDelimiter;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -28,32 +28,22 @@ import org.joda.time.format.DateTimeFormatter;
|
|||||||
import org.joda.time.format.DateTimeFormatterBuilder;
|
import org.joda.time.format.DateTimeFormatterBuilder;
|
||||||
import org.joda.time.format.DateTimeParser;
|
import org.joda.time.format.DateTimeParser;
|
||||||
|
|
||||||
import java.io.Serializable;
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.TimeZone;
|
import java.util.TimeZone;
|
||||||
|
|
||||||
public class HoodieDateTimeParserImpl implements HoodieDateTimeParser, Serializable {
|
public class HoodieDateTimeParserImpl extends AbstractHoodieDateTimeParser {
|
||||||
|
|
||||||
private String configInputDateFormatList;
|
private String configInputDateFormatList;
|
||||||
private final String configInputDateFormatDelimiter;
|
|
||||||
private final TypedProperties config;
|
|
||||||
|
|
||||||
// TimeZone detailed settings reference
|
// TimeZone detailed settings reference
|
||||||
// https://docs.oracle.com/javase/8/docs/api/java/util/TimeZone.html
|
// https://docs.oracle.com/javase/8/docs/api/java/util/TimeZone.html
|
||||||
private final DateTimeZone inputDateTimeZone;
|
private final DateTimeZone inputDateTimeZone;
|
||||||
|
|
||||||
public HoodieDateTimeParserImpl(TypedProperties config) {
|
public HoodieDateTimeParserImpl(TypedProperties config) {
|
||||||
this.config = config;
|
super(config);
|
||||||
DataSourceUtils.checkRequiredProperties(config, Arrays.asList(Config.TIMESTAMP_TYPE_FIELD_PROP, Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP));
|
DataSourceUtils.checkRequiredProperties(config, Arrays.asList(Config.TIMESTAMP_TYPE_FIELD_PROP, Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP));
|
||||||
this.inputDateTimeZone = getInputDateTimeZone();
|
this.inputDateTimeZone = getInputDateTimeZone();
|
||||||
this.configInputDateFormatDelimiter = getConfigInputDateFormatDelimiter();
|
|
||||||
}
|
|
||||||
|
|
||||||
private String getConfigInputDateFormatDelimiter() {
|
|
||||||
String inputDateFormatDelimiter = config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX_PROP, ",").trim();
|
|
||||||
inputDateFormatDelimiter = inputDateFormatDelimiter.isEmpty() ? "," : inputDateFormatDelimiter;
|
|
||||||
return inputDateFormatDelimiter;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private DateTimeFormatter getInputDateFormatter() {
|
private DateTimeFormatter getInputDateFormatter() {
|
||||||
@@ -65,7 +55,7 @@ public class HoodieDateTimeParserImpl implements HoodieDateTimeParser, Serializa
|
|||||||
.append(
|
.append(
|
||||||
null,
|
null,
|
||||||
Arrays.stream(
|
Arrays.stream(
|
||||||
this.configInputDateFormatList.split(this.configInputDateFormatDelimiter))
|
this.configInputDateFormatList.split(super.configInputDateFormatDelimiter))
|
||||||
.map(String::trim)
|
.map(String::trim)
|
||||||
.map(DateTimeFormat::forPattern)
|
.map(DateTimeFormat::forPattern)
|
||||||
.map(DateTimeFormatter::getParser)
|
.map(DateTimeFormatter::getParser)
|
||||||
@@ -119,4 +109,5 @@ public class HoodieDateTimeParserImpl implements HoodieDateTimeParser, Serializa
|
|||||||
}
|
}
|
||||||
return !outputTimeZone.trim().isEmpty() ? DateTimeZone.forTimeZone(TimeZone.getTimeZone(outputTimeZone)) : null;
|
return !outputTimeZone.trim().isEmpty() ? DateTimeZone.forTimeZone(TimeZone.getTimeZone(outputTimeZone)) : null;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -150,6 +150,29 @@ public class TestTimestampBasedKeyGenerator {
|
|||||||
|
|
||||||
// test w/ Row
|
// test w/ Row
|
||||||
assertEquals("2020-01-06 12", keyGen.getPartitionPath(baseRow));
|
assertEquals("2020-01-06 12", keyGen.getPartitionPath(baseRow));
|
||||||
|
|
||||||
|
// timezone is GMT+8:00, createTime is null
|
||||||
|
baseRecord.put("createTime", null);
|
||||||
|
properties = getBaseKeyConfig("EPOCHMILLISECONDS", "yyyy-MM-dd hh", "GMT+8:00", null);
|
||||||
|
keyGen = new TimestampBasedKeyGenerator(properties);
|
||||||
|
HoodieKey hk5 = keyGen.getKey(baseRecord);
|
||||||
|
assertEquals("1970-01-01 08", hk5.getPartitionPath());
|
||||||
|
|
||||||
|
// test w/ Row
|
||||||
|
baseRow = genericRecordToRow(baseRecord);
|
||||||
|
assertEquals("1970-01-01 08", keyGen.getPartitionPath(baseRow));
|
||||||
|
|
||||||
|
// timestamp is DATE_STRING, timezone is GMT, createTime is null
|
||||||
|
baseRecord.put("createTime", null);
|
||||||
|
properties = getBaseKeyConfig("DATE_STRING", "yyyy-MM-dd hh:mm:ss", "GMT", null);
|
||||||
|
properties.setProperty("hoodie.deltastreamer.keygen.timebased.input.dateformat", "yyyy-MM-dd hh:mm:ss");
|
||||||
|
keyGen = new TimestampBasedKeyGenerator(properties);
|
||||||
|
HoodieKey hk6 = keyGen.getKey(baseRecord);
|
||||||
|
assertEquals("1970-01-01 12:00:00", hk6.getPartitionPath());
|
||||||
|
|
||||||
|
// test w/ Row
|
||||||
|
baseRow = genericRecordToRow(baseRecord);
|
||||||
|
assertEquals("1970-01-01 12:00:00", keyGen.getPartitionPath(baseRow));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -160,12 +183,24 @@ public class TestTimestampBasedKeyGenerator {
|
|||||||
// timezone is GMT
|
// timezone is GMT
|
||||||
properties = getBaseKeyConfig("SCALAR", "yyyy-MM-dd hh", "GMT", "days");
|
properties = getBaseKeyConfig("SCALAR", "yyyy-MM-dd hh", "GMT", "days");
|
||||||
TimestampBasedKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
|
TimestampBasedKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
|
||||||
HoodieKey hk5 = keyGen.getKey(baseRecord);
|
HoodieKey hk1 = keyGen.getKey(baseRecord);
|
||||||
assertEquals(hk5.getPartitionPath(), "2024-10-04 12");
|
assertEquals(hk1.getPartitionPath(), "2024-10-04 12");
|
||||||
|
|
||||||
// test w/ Row
|
// test w/ Row
|
||||||
baseRow = genericRecordToRow(baseRecord);
|
baseRow = genericRecordToRow(baseRecord);
|
||||||
assertEquals("2024-10-04 12", keyGen.getPartitionPath(baseRow));
|
assertEquals("2024-10-04 12", keyGen.getPartitionPath(baseRow));
|
||||||
|
|
||||||
|
// timezone is GMT, createTime is null
|
||||||
|
baseRecord.put("createTime", null);
|
||||||
|
properties = getBaseKeyConfig("SCALAR", "yyyy-MM-dd hh", "GMT", "days");
|
||||||
|
keyGen = new TimestampBasedKeyGenerator(properties);
|
||||||
|
HoodieKey hk2 = keyGen.getKey(baseRecord);
|
||||||
|
assertEquals("1970-01-02 12", hk2.getPartitionPath());
|
||||||
|
|
||||||
|
// test w/ Row
|
||||||
|
baseRow = genericRecordToRow(baseRecord);
|
||||||
|
assertEquals("1970-01-02 12", keyGen.getPartitionPath(baseRow));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|||||||
Reference in New Issue
Block a user