[HUDI-1177]: fixed TaskNotSerializableException in TimestampBasedKeyGenerator (#1987)
Co-authored-by: Bhavani Sudha Saktheeswaran <bhavanisudhas@gmail.com>
This commit is contained in:
@@ -22,6 +22,7 @@ import org.apache.hudi.DataSourceUtils;
|
|||||||
import org.apache.hudi.DataSourceWriteOptions;
|
import org.apache.hudi.DataSourceWriteOptions;
|
||||||
import org.apache.hudi.avro.HoodieAvroUtils;
|
import org.apache.hudi.avro.HoodieAvroUtils;
|
||||||
import org.apache.hudi.common.config.TypedProperties;
|
import org.apache.hudi.common.config.TypedProperties;
|
||||||
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.exception.HoodieDeltaStreamerException;
|
import org.apache.hudi.exception.HoodieDeltaStreamerException;
|
||||||
import org.apache.hudi.exception.HoodieException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
import org.apache.hudi.exception.HoodieNotSupportedException;
|
import org.apache.hudi.exception.HoodieNotSupportedException;
|
||||||
@@ -40,7 +41,6 @@ import java.io.Serializable;
|
|||||||
import java.io.UnsupportedEncodingException;
|
import java.io.UnsupportedEncodingException;
|
||||||
import java.net.URLEncoder;
|
import java.net.URLEncoder;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.text.ParseException;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||||
@@ -61,7 +61,8 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
|
|||||||
private final TimeUnit timeUnit;
|
private final TimeUnit timeUnit;
|
||||||
private final TimestampType timestampType;
|
private final TimestampType timestampType;
|
||||||
private final String outputDateFormat;
|
private final String outputDateFormat;
|
||||||
private DateTimeFormatter inputFormatter;
|
private transient Option<DateTimeFormatter> inputFormatter;
|
||||||
|
private transient DateTimeFormatter partitionFormatter;
|
||||||
private final HoodieDateTimeParser parser;
|
private final HoodieDateTimeParser parser;
|
||||||
|
|
||||||
// TimeZone detailed settings reference
|
// TimeZone detailed settings reference
|
||||||
@@ -108,13 +109,8 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
|
|||||||
this.parser = DataSourceUtils.createDateTimeParser(config, dateTimeParserClass);
|
this.parser = DataSourceUtils.createDateTimeParser(config, dateTimeParserClass);
|
||||||
this.outputDateTimeZone = parser.getOutputDateTimeZone();
|
this.outputDateTimeZone = parser.getOutputDateTimeZone();
|
||||||
this.outputDateFormat = parser.getOutputDateFormat();
|
this.outputDateFormat = parser.getOutputDateFormat();
|
||||||
this.inputFormatter = parser.getInputFormatter();
|
|
||||||
this.timestampType = TimestampType.valueOf(config.getString(Config.TIMESTAMP_TYPE_FIELD_PROP));
|
this.timestampType = TimestampType.valueOf(config.getString(Config.TIMESTAMP_TYPE_FIELD_PROP));
|
||||||
|
|
||||||
if (timestampType == TimestampType.DATE_STRING || timestampType == TimestampType.MIXED) {
|
|
||||||
this.inputFormatter = parser.getInputFormatter();
|
|
||||||
}
|
|
||||||
|
|
||||||
switch (this.timestampType) {
|
switch (this.timestampType) {
|
||||||
case EPOCHMILLISECONDS:
|
case EPOCHMILLISECONDS:
|
||||||
timeUnit = MILLISECONDS;
|
timeUnit = MILLISECONDS;
|
||||||
@@ -146,18 +142,29 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Lazily initialises the transient DateTimeFormatter fields on first use; being transient, they are null after deserialization.
|
||||||
|
*/
|
||||||
|
private void initIfNeeded() {
|
||||||
|
if (this.inputFormatter == null) {
|
||||||
|
this.inputFormatter = parser.getInputFormatter();
|
||||||
|
}
|
||||||
|
if (this.partitionFormatter == null) {
|
||||||
|
this.partitionFormatter = DateTimeFormat.forPattern(outputDateFormat);
|
||||||
|
if (this.outputDateTimeZone != null) {
|
||||||
|
partitionFormatter = partitionFormatter.withZone(outputDateTimeZone);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Parse and fetch partition path based on data type.
|
* Parse and fetch partition path based on data type.
|
||||||
*
|
*
|
||||||
* @param partitionVal partition path object value fetched from record/row
|
* @param partitionVal partition path object value fetched from record/row
|
||||||
* @return the parsed partition path based on data type
|
* @return the parsed partition path based on data type
|
||||||
* @throws ParseException on any parse exception
|
|
||||||
*/
|
*/
|
||||||
private String getPartitionPath(Object partitionVal) throws ParseException {
|
private String getPartitionPath(Object partitionVal) {
|
||||||
DateTimeFormatter partitionFormatter = DateTimeFormat.forPattern(outputDateFormat);
|
initIfNeeded();
|
||||||
if (this.outputDateTimeZone != null) {
|
|
||||||
partitionFormatter = partitionFormatter.withZone(outputDateTimeZone);
|
|
||||||
}
|
|
||||||
long timeMs;
|
long timeMs;
|
||||||
if (partitionVal instanceof Double) {
|
if (partitionVal instanceof Double) {
|
||||||
timeMs = convertLongTimeToMillis(((Double) partitionVal).longValue());
|
timeMs = convertLongTimeToMillis(((Double) partitionVal).longValue());
|
||||||
@@ -166,13 +173,16 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
|
|||||||
} else if (partitionVal instanceof Long) {
|
} else if (partitionVal instanceof Long) {
|
||||||
timeMs = convertLongTimeToMillis((Long) partitionVal);
|
timeMs = convertLongTimeToMillis((Long) partitionVal);
|
||||||
} else if (partitionVal instanceof CharSequence) {
|
} else if (partitionVal instanceof CharSequence) {
|
||||||
DateTime parsedDateTime = inputFormatter.parseDateTime(partitionVal.toString());
|
if (!inputFormatter.isPresent()) {
|
||||||
|
throw new HoodieException("Missing inputformatter. Ensure " + Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP + " config is set when timestampType is DATE_STRING or MIXED!");
|
||||||
|
}
|
||||||
|
DateTime parsedDateTime = inputFormatter.get().parseDateTime(partitionVal.toString());
|
||||||
if (this.outputDateTimeZone == null) {
|
if (this.outputDateTimeZone == null) {
|
||||||
// Use the timezone that came off the date that was passed in, if it had one
|
// Use the timezone that came off the date that was passed in, if it had one
|
||||||
partitionFormatter = partitionFormatter.withZone(parsedDateTime.getZone());
|
partitionFormatter = partitionFormatter.withZone(parsedDateTime.getZone());
|
||||||
}
|
}
|
||||||
|
|
||||||
timeMs = inputFormatter.parseDateTime(partitionVal.toString()).getMillis();
|
timeMs = inputFormatter.get().parseDateTime(partitionVal.toString()).getMillis();
|
||||||
} else {
|
} else {
|
||||||
throw new HoodieNotSupportedException(
|
throw new HoodieNotSupportedException(
|
||||||
"Unexpected type for partition field: " + partitionVal.getClass().getName());
|
"Unexpected type for partition field: " + partitionVal.getClass().getName());
|
||||||
|
|||||||
@@ -17,10 +17,13 @@
|
|||||||
|
|
||||||
package org.apache.hudi.keygen.parser;
|
package org.apache.hudi.keygen.parser;
|
||||||
|
|
||||||
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.joda.time.DateTimeZone;
|
import org.joda.time.DateTimeZone;
|
||||||
import org.joda.time.format.DateTimeFormatter;
|
import org.joda.time.format.DateTimeFormatter;
|
||||||
|
|
||||||
public interface HoodieDateTimeParser {
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
public interface HoodieDateTimeParser extends Serializable {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the output date format in which the partition paths will be created for the hudi dataset.
|
* Returns the output date format in which the partition paths will be created for the hudi dataset.
|
||||||
@@ -32,7 +35,7 @@ public interface HoodieDateTimeParser {
|
|||||||
* Returns the formatter for the input formats in which date-time values may arrive in incoming records.
|
* Returns the formatter for the input formats in which date-time values may arrive in incoming records.
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
DateTimeFormatter getInputFormatter();
|
Option<DateTimeFormatter> getInputFormatter();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the date-time zone in which incoming values are expected.
|
* Returns the date-time zone in which incoming values are expected.
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ package org.apache.hudi.keygen.parser;
|
|||||||
|
|
||||||
import org.apache.hudi.DataSourceUtils;
|
import org.apache.hudi.DataSourceUtils;
|
||||||
import org.apache.hudi.common.config.TypedProperties;
|
import org.apache.hudi.common.config.TypedProperties;
|
||||||
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.keygen.TimestampBasedKeyGenerator.Config;
|
import org.apache.hudi.keygen.TimestampBasedKeyGenerator.Config;
|
||||||
import org.apache.hudi.keygen.TimestampBasedKeyGenerator.TimestampType;
|
import org.apache.hudi.keygen.TimestampBasedKeyGenerator.TimestampType;
|
||||||
import org.joda.time.DateTimeZone;
|
import org.joda.time.DateTimeZone;
|
||||||
@@ -37,7 +38,6 @@ public class HoodieDateTimeParserImpl implements HoodieDateTimeParser, Serializa
|
|||||||
private String configInputDateFormatList;
|
private String configInputDateFormatList;
|
||||||
private final String configInputDateFormatDelimiter;
|
private final String configInputDateFormatDelimiter;
|
||||||
private final TypedProperties config;
|
private final TypedProperties config;
|
||||||
private DateTimeFormatter inputFormatter;
|
|
||||||
|
|
||||||
// TimeZone detailed settings reference
|
// TimeZone detailed settings reference
|
||||||
// https://docs.oracle.com/javase/8/docs/api/java/util/TimeZone.html
|
// https://docs.oracle.com/javase/8/docs/api/java/util/TimeZone.html
|
||||||
@@ -48,14 +48,6 @@ public class HoodieDateTimeParserImpl implements HoodieDateTimeParser, Serializa
|
|||||||
DataSourceUtils.checkRequiredProperties(config, Arrays.asList(Config.TIMESTAMP_TYPE_FIELD_PROP, Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP));
|
DataSourceUtils.checkRequiredProperties(config, Arrays.asList(Config.TIMESTAMP_TYPE_FIELD_PROP, Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP));
|
||||||
this.inputDateTimeZone = getInputDateTimeZone();
|
this.inputDateTimeZone = getInputDateTimeZone();
|
||||||
this.configInputDateFormatDelimiter = getConfigInputDateFormatDelimiter();
|
this.configInputDateFormatDelimiter = getConfigInputDateFormatDelimiter();
|
||||||
|
|
||||||
TimestampType timestampType = TimestampType.valueOf(config.getString(Config.TIMESTAMP_TYPE_FIELD_PROP));
|
|
||||||
if (timestampType == TimestampType.DATE_STRING || timestampType == TimestampType.MIXED) {
|
|
||||||
DataSourceUtils.checkRequiredProperties(config,
|
|
||||||
Collections.singletonList(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP));
|
|
||||||
this.configInputDateFormatList = config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, "");
|
|
||||||
inputFormatter = getInputDateFormatter();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getConfigInputDateFormatDelimiter() {
|
private String getConfigInputDateFormatDelimiter() {
|
||||||
@@ -94,8 +86,16 @@ public class HoodieDateTimeParserImpl implements HoodieDateTimeParser, Serializa
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public DateTimeFormatter getInputFormatter() {
|
public Option<DateTimeFormatter> getInputFormatter() {
|
||||||
return this.inputFormatter;
|
TimestampType timestampType = TimestampType.valueOf(config.getString(Config.TIMESTAMP_TYPE_FIELD_PROP));
|
||||||
|
if (timestampType == TimestampType.DATE_STRING || timestampType == TimestampType.MIXED) {
|
||||||
|
DataSourceUtils.checkRequiredProperties(config,
|
||||||
|
Collections.singletonList(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP));
|
||||||
|
this.configInputDateFormatList = config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, "");
|
||||||
|
return Option.of(getInputDateFormatter());
|
||||||
|
}
|
||||||
|
|
||||||
|
return Option.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
Reference in New Issue
Block a user