diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java index 3f51be4e3..bce7e24c5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java @@ -65,29 +65,6 @@ public class TimestampBasedAvroKeyGenerator extends SimpleAvroKeyGenerator { protected final boolean encodePartitionPath; - /** - * Supported configs. - */ - public static class Config { - - // One value from TimestampType above - public static final String TIMESTAMP_TYPE_FIELD_PROP = "hoodie.deltastreamer.keygen.timebased.timestamp.type"; - public static final String INPUT_TIME_UNIT = - "hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit"; - //This prop can now accept list of input date formats. - public static final String TIMESTAMP_INPUT_DATE_FORMAT_PROP = - "hoodie.deltastreamer.keygen.timebased.input.dateformat"; - public static final String TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX_PROP = "hoodie.deltastreamer.keygen.timebased.input.dateformat.list.delimiter.regex"; - public static final String TIMESTAMP_INPUT_TIMEZONE_FORMAT_PROP = "hoodie.deltastreamer.keygen.timebased.input.timezone"; - public static final String TIMESTAMP_OUTPUT_DATE_FORMAT_PROP = - "hoodie.deltastreamer.keygen.timebased.output.dateformat"; - //still keeping this prop for backward compatibility so that functionality for existing users does not break. 
- public static final String TIMESTAMP_TIMEZONE_FORMAT_PROP = - "hoodie.deltastreamer.keygen.timebased.timezone"; - public static final String TIMESTAMP_OUTPUT_TIMEZONE_FORMAT_PROP = "hoodie.deltastreamer.keygen.timebased.output.timezone"; - static final String DATE_TIME_PARSER_PROP = "hoodie.deltastreamer.keygen.datetime.parser.class"; - } - public TimestampBasedAvroKeyGenerator(TypedProperties config) throws IOException { this(config, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()), config.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())); @@ -99,12 +76,12 @@ public class TimestampBasedAvroKeyGenerator extends SimpleAvroKeyGenerator { TimestampBasedAvroKeyGenerator(TypedProperties config, String recordKeyField, String partitionPathField) throws IOException { super(config, recordKeyField, partitionPathField); - String dateTimeParserClass = config.getString(Config.DATE_TIME_PARSER_PROP, HoodieDateTimeParser.class.getName()); + String dateTimeParserClass = config.getString(KeyGeneratorOptions.Config.DATE_TIME_PARSER_PROP, HoodieDateTimeParser.class.getName()); this.parser = KeyGenUtils.createDateTimeParser(config, dateTimeParserClass); this.inputDateTimeZone = parser.getInputDateTimeZone(); this.outputDateTimeZone = parser.getOutputDateTimeZone(); this.outputDateFormat = parser.getOutputDateFormat(); - this.timestampType = TimestampType.valueOf(config.getString(Config.TIMESTAMP_TYPE_FIELD_PROP)); + this.timestampType = TimestampType.valueOf(config.getString(KeyGeneratorOptions.Config.TIMESTAMP_TYPE_FIELD_PROP)); switch (this.timestampType) { case EPOCHMILLISECONDS: @@ -114,7 +91,7 @@ public class TimestampBasedAvroKeyGenerator extends SimpleAvroKeyGenerator { timeUnit = SECONDS; break; case SCALAR: - String timeUnitStr = config.getString(Config.INPUT_TIME_UNIT, TimeUnit.SECONDS.toString()); + String timeUnitStr = config.getString(KeyGeneratorOptions.Config.INPUT_TIME_UNIT, TimeUnit.SECONDS.toString()); timeUnit = 
TimeUnit.valueOf(timeUnitStr.toUpperCase()); break; default: @@ -148,7 +125,7 @@ public class TimestampBasedAvroKeyGenerator extends SimpleAvroKeyGenerator { // {Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP} won't be null, it has been checked in the initialization process of // inputFormatter String delimiter = parser.getConfigInputDateFormatDelimiter(); - String format = config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, "").split(delimiter)[0]; + String format = config.getString(KeyGeneratorOptions.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, "").split(delimiter)[0]; // if both input and output timeZone are not configured, use GMT. if (null != inputDateTimeZone) { @@ -200,7 +177,7 @@ public class TimestampBasedAvroKeyGenerator extends SimpleAvroKeyGenerator { timeMs = convertLongTimeToMillis(((BigDecimal) partitionVal).longValue()); } else if (partitionVal instanceof CharSequence) { if (!inputFormatter.isPresent()) { - throw new HoodieException("Missing inputformatter. Ensure " + Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP + " config is set when timestampType is DATE_STRING or MIXED!"); + throw new HoodieException("Missing inputformatter. 
Ensure " + KeyGeneratorOptions.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP + " config is set when timestampType is DATE_STRING or MIXED!"); } DateTime parsedDateTime = inputFormatter.get().parseDateTime(partitionVal.toString()); if (this.outputDateTimeZone == null) { @@ -224,7 +201,7 @@ private long convertLongTimeToMillis(Long partitionVal) { if (timeUnit == null) { // should not be possible - throw new RuntimeException(Config.INPUT_TIME_UNIT + " is not specified but scalar it supplied as time value"); + throw new RuntimeException(KeyGeneratorOptions.Config.INPUT_TIME_UNIT + " is not specified but scalar is supplied as time value"); } return MILLISECONDS.convert(partitionVal, timeUnit); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/parser/BaseHoodieDateTimeParser.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/parser/BaseHoodieDateTimeParser.java index d45f97447..74c62fc63 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/parser/BaseHoodieDateTimeParser.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/parser/BaseHoodieDateTimeParser.java @@ -19,7 +19,7 @@ package org.apache.hudi.keygen.parser; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; -import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.Config; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.joda.time.DateTimeZone; import org.joda.time.format.DateTimeFormatter; @@ -36,7 +36,7 @@ public abstract class BaseHoodieDateTimeParser implements Serializable { } private String initInputDateFormatDelimiter() { - String inputDateFormatDelimiter = config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX_PROP, ",").trim(); + String inputDateFormatDelimiter = 
config.getString(KeyGeneratorOptions.Config.TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX_PROP, ",").trim(); inputDateFormatDelimiter = inputDateFormatDelimiter.isEmpty() ? "," : inputDateFormatDelimiter; return inputDateFormatDelimiter; } @@ -45,7 +45,7 @@ public abstract class BaseHoodieDateTimeParser implements Serializable { * Returns the output date format in which the partition paths will be created for the hudi dataset. */ public String getOutputDateFormat() { - return config.getString(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP); + return config.getString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP); } /** diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParser.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParser.java index 64f79f0e6..c15d484df 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParser.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParser.java @@ -20,8 +20,8 @@ package org.apache.hudi.keygen.parser; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.TimestampType; -import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.Config; import org.apache.hudi.keygen.KeyGenUtils; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.joda.time.DateTimeZone; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; @@ -42,13 +42,13 @@ public class HoodieDateTimeParser extends BaseHoodieDateTimeParser { public HoodieDateTimeParser(TypedProperties config) { super(config); - KeyGenUtils.checkRequiredProperties(config, Arrays.asList(Config.TIMESTAMP_TYPE_FIELD_PROP, Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP)); + KeyGenUtils.checkRequiredProperties(config, 
Arrays.asList(KeyGeneratorOptions.Config.TIMESTAMP_TYPE_FIELD_PROP, KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP)); this.inputDateTimeZone = getInputDateTimeZone(); } private DateTimeFormatter getInputDateFormatter() { if (this.configInputDateFormatList.isEmpty()) { - throw new IllegalArgumentException(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP + " configuration is required"); + throw new IllegalArgumentException(KeyGeneratorOptions.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP + " configuration is required"); } DateTimeFormatter formatter = new DateTimeFormatterBuilder() @@ -72,16 +72,16 @@ public class HoodieDateTimeParser extends BaseHoodieDateTimeParser { @Override public String getOutputDateFormat() { - return config.getString(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP); + return config.getString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP); } @Override public Option getInputFormatter() { - TimestampType timestampType = TimestampType.valueOf(config.getString(Config.TIMESTAMP_TYPE_FIELD_PROP)); + TimestampType timestampType = TimestampType.valueOf(config.getString(KeyGeneratorOptions.Config.TIMESTAMP_TYPE_FIELD_PROP)); if (timestampType == TimestampType.DATE_STRING || timestampType == TimestampType.MIXED) { KeyGenUtils.checkRequiredProperties(config, - Collections.singletonList(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP)); - this.configInputDateFormatList = config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, ""); + Collections.singletonList(KeyGeneratorOptions.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP)); + this.configInputDateFormatList = config.getString(KeyGeneratorOptions.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, ""); return Option.of(getInputDateFormatter()); } @@ -91,10 +91,10 @@ public class HoodieDateTimeParser extends BaseHoodieDateTimeParser { @Override public DateTimeZone getInputDateTimeZone() { String inputTimeZone; - if (config.containsKey(Config.TIMESTAMP_TIMEZONE_FORMAT_PROP)) { - inputTimeZone = 
config.getString(Config.TIMESTAMP_TIMEZONE_FORMAT_PROP, "GMT"); + if (config.containsKey(KeyGeneratorOptions.Config.TIMESTAMP_TIMEZONE_FORMAT_PROP)) { + inputTimeZone = config.getString(KeyGeneratorOptions.Config.TIMESTAMP_TIMEZONE_FORMAT_PROP, "GMT"); } else { - inputTimeZone = config.getString(Config.TIMESTAMP_INPUT_TIMEZONE_FORMAT_PROP, ""); + inputTimeZone = config.getString(KeyGeneratorOptions.Config.TIMESTAMP_INPUT_TIMEZONE_FORMAT_PROP, ""); } return !inputTimeZone.trim().isEmpty() ? DateTimeZone.forTimeZone(TimeZone.getTimeZone(inputTimeZone)) : null; } @@ -102,10 +102,10 @@ public class HoodieDateTimeParser extends BaseHoodieDateTimeParser { @Override public DateTimeZone getOutputDateTimeZone() { String outputTimeZone; - if (config.containsKey(Config.TIMESTAMP_TIMEZONE_FORMAT_PROP)) { - outputTimeZone = config.getString(Config.TIMESTAMP_TIMEZONE_FORMAT_PROP, "GMT"); + if (config.containsKey(KeyGeneratorOptions.Config.TIMESTAMP_TIMEZONE_FORMAT_PROP)) { + outputTimeZone = config.getString(KeyGeneratorOptions.Config.TIMESTAMP_TIMEZONE_FORMAT_PROP, "GMT"); } else { - outputTimeZone = config.getString(Config.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT_PROP, ""); + outputTimeZone = config.getString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT_PROP, ""); } return !outputTimeZone.trim().isEmpty() ? 
DateTimeZone.forTimeZone(TimeZone.getTimeZone(outputTimeZone)) : null; } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java index 6f3c1a39f..d6d0879cf 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java @@ -70,9 +70,9 @@ public class TestTimestampBasedKeyGenerator { } private TypedProperties getBaseKeyConfig(String timestampType, String dateFormat, String timezone, String scalarType) { - properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_TYPE_FIELD_PROP, timestampType); - properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, dateFormat); - properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_TIMEZONE_FORMAT_PROP, timezone); + properties.setProperty(KeyGeneratorOptions.Config.TIMESTAMP_TYPE_FIELD_PROP, timestampType); + properties.setProperty(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, dateFormat); + properties.setProperty(KeyGeneratorOptions.Config.TIMESTAMP_TIMEZONE_FORMAT_PROP, timezone); if (scalarType != null) { properties.setProperty("hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit", scalarType); @@ -94,22 +94,22 @@ public class TestTimestampBasedKeyGenerator { private TypedProperties getBaseKeyConfig(String timestampType, String inputFormatList, String inputFormatDelimiterRegex, String inputTimezone, String outputFormat, String outputTimezone) { if (timestampType != null) { - properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_TYPE_FIELD_PROP, timestampType); + properties.setProperty(KeyGeneratorOptions.Config.TIMESTAMP_TYPE_FIELD_PROP, timestampType); } if (inputFormatList != null) { - 
properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, inputFormatList); + properties.setProperty(KeyGeneratorOptions.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, inputFormatList); } if (inputFormatDelimiterRegex != null) { - properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX_PROP, inputFormatDelimiterRegex); + properties.setProperty(KeyGeneratorOptions.Config.TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX_PROP, inputFormatDelimiterRegex); } if (inputTimezone != null) { - properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_INPUT_TIMEZONE_FORMAT_PROP, inputTimezone); + properties.setProperty(KeyGeneratorOptions.Config.TIMESTAMP_INPUT_TIMEZONE_FORMAT_PROP, inputTimezone); } if (outputFormat != null) { - properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, outputFormat); + properties.setProperty(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, outputFormat); } if (outputTimezone != null) { - properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT_PROP, outputTimezone); + properties.setProperty(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT_PROP, outputTimezone); } return properties; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index 97a66e2c4..b26de60f2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -19,10 +19,12 @@ package org.apache.hudi.common.table; import org.apache.avro.Schema; + import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; + import 
org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex; import org.apache.hudi.common.bootstrap.index.NoOpBootstrapIndex; import org.apache.hudi.common.config.ConfigClassProperty; @@ -41,12 +43,15 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; import java.util.Arrays; import java.util.Date; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; @@ -182,6 +187,14 @@ public class HoodieTableConfig extends HoodieConfig { public static final ConfigProperty URL_ENCODE_PARTITIONING = KeyGeneratorOptions.URL_ENCODE_PARTITIONING; public static final ConfigProperty HIVE_STYLE_PARTITIONING_ENABLE = KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE; + public static final List PERSISTED_CONFIG_LIST = Arrays.asList( + Config.DATE_TIME_PARSER_PROP, + Config.INPUT_TIME_UNIT, Config.TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX_PROP, + Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, Config.TIMESTAMP_INPUT_TIMEZONE_FORMAT_PROP, + Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, Config.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT_PROP, + Config.TIMESTAMP_TIMEZONE_FORMAT_PROP + ); + public static final String NO_OP_BOOTSTRAP_INDEX_CLASS = NoOpBootstrapIndex.class.getName(); public HoodieTableConfig(FileSystem fs, String metaPath, String payloadClassName) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index b9a367396..5ad3b329a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -54,6 +54,7 @@ import java.io.IOException; import java.io.Serializable; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Properties; import java.util.Set; @@ -643,6 +644,12 @@ public class HoodieTableMetaClient implements Serializable { private Boolean urlEncodePartitioning; private HoodieTimelineTimeZone commitTimeZone; + /** + * Persist the configs that is written at the first time, and should not be changed. + * Like KeyGenerator's configs. + */ + private Properties others = new Properties(); + private PropertyBuilder() { } @@ -750,6 +757,23 @@ public class HoodieTableMetaClient implements Serializable { return this; } + public PropertyBuilder set(String key, Object value) { + if (HoodieTableConfig.PERSISTED_CONFIG_LIST.contains(key)) { + this.others.put(key, value); + } + return this; + } + + public PropertyBuilder set(Map props) { + for (String key: HoodieTableConfig.PERSISTED_CONFIG_LIST) { + Object value = props.get(key); + if (value != null) { + set(key, value); + } + } + return this; + } + public PropertyBuilder fromMetaClient(HoodieTableMetaClient metaClient) { return setTableType(metaClient.getTableType()) .setTableName(metaClient.getTableConfig().getTableName()) @@ -759,6 +783,14 @@ public class HoodieTableMetaClient implements Serializable { public PropertyBuilder fromProperties(Properties properties) { HoodieConfig hoodieConfig = new HoodieConfig(properties); + + for (String key: HoodieTableConfig.PERSISTED_CONFIG_LIST) { + Object value = hoodieConfig.getString(key); + if (value != null) { + set(key, value); + } + } + if (hoodieConfig.contains(HoodieTableConfig.DATABASE_NAME)) { setDatabaseName(hoodieConfig.getString(HoodieTableConfig.DATABASE_NAME)); } @@ -828,6 +860,9 @@ public class HoodieTableMetaClient implements Serializable { ValidationUtils.checkArgument(tableName != null, "tableName is null"); 
HoodieTableConfig tableConfig = new HoodieTableConfig(); + + tableConfig.setAll(others); + if (databaseName != null) { tableConfig.setValue(HoodieTableConfig.DATABASE_NAME, databaseName); } diff --git a/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java b/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java index 6a1f76121..ff182c4c1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java +++ b/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java @@ -96,5 +96,28 @@ public class KeyGeneratorOptions extends HoodieConfig { */ @Deprecated public static final String PARTITIONPATH_FIELD_OPT_KEY = PARTITIONPATH_FIELD_NAME.key(); + + /** + * Supported configs. + */ + public static class Config { + + // One value from TimestampType above + public static final String TIMESTAMP_TYPE_FIELD_PROP = "hoodie.deltastreamer.keygen.timebased.timestamp.type"; + public static final String INPUT_TIME_UNIT = + "hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit"; + //This prop can now accept list of input date formats. + public static final String TIMESTAMP_INPUT_DATE_FORMAT_PROP = + "hoodie.deltastreamer.keygen.timebased.input.dateformat"; + public static final String TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX_PROP = "hoodie.deltastreamer.keygen.timebased.input.dateformat.list.delimiter.regex"; + public static final String TIMESTAMP_INPUT_TIMEZONE_FORMAT_PROP = "hoodie.deltastreamer.keygen.timebased.input.timezone"; + public static final String TIMESTAMP_OUTPUT_DATE_FORMAT_PROP = + "hoodie.deltastreamer.keygen.timebased.output.dateformat"; + //still keeping this prop for backward compatibility so that functionality for existing users does not break. 
+ public static final String TIMESTAMP_TIMEZONE_FORMAT_PROP = + "hoodie.deltastreamer.keygen.timebased.timezone"; + public static final String TIMESTAMP_OUTPUT_TIMEZONE_FORMAT_PROP = "hoodie.deltastreamer.keygen.timebased.output.timezone"; + public static final String DATE_TIME_PARSER_PROP = "hoodie.deltastreamer.keygen.datetime.parser.class"; + } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index 5299551fc..b11c44af4 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -27,6 +27,7 @@ import org.apache.hudi.hive.MultiPartKeysValueExtractor; import org.apache.hudi.keygen.ComplexAvroKeyGenerator; import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator; import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.hudi.util.AvroSchemaConverter; import org.apache.hudi.util.DataTypeUtils; @@ -238,22 +239,22 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab int precision = DataTypeUtils.precision(fieldType.getLogicalType()); if (precision == 0) { // seconds - conf.setString(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_TYPE_FIELD_PROP, + conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_TYPE_FIELD_PROP, TimestampBasedAvroKeyGenerator.TimestampType.UNIX_TIMESTAMP.name()); } else if (precision == 3) { // milliseconds - conf.setString(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_TYPE_FIELD_PROP, + conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_TYPE_FIELD_PROP, TimestampBasedAvroKeyGenerator.TimestampType.EPOCHMILLISECONDS.name()); } String partitionFormat = conf.getOptional(FlinkOptions.PARTITION_FORMAT).orElse(FlinkOptions.PARTITION_FORMAT_HOUR); - 
conf.setString(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, partitionFormat); + conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, partitionFormat); } else { - conf.setString(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_TYPE_FIELD_PROP, + conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_TYPE_FIELD_PROP, TimestampBasedAvroKeyGenerator.TimestampType.DATE_STRING.name()); String partitionFormat = conf.getOptional(FlinkOptions.PARTITION_FORMAT).orElse(FlinkOptions.PARTITION_FORMAT_DAY); - conf.setString(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, partitionFormat); + conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, partitionFormat); } - conf.setString(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT_PROP, "UTC"); + conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT_PROP, "UTC"); } /** diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java index cbdffe360..6d8be2153 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java @@ -27,6 +27,7 @@ import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor; import org.apache.hudi.keygen.ComplexAvroKeyGenerator; import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator; import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.SchemaBuilder; import org.apache.hudi.utils.TestConfigurations; @@ -419,11 +420,11 @@ public class TestHoodieTableFactory { final Configuration conf1 = tableSource1.getConf(); assertThat(conf1.get(FlinkOptions.RECORD_KEY_FIELD), is("f0")); 
assertThat(conf1.get(FlinkOptions.KEYGEN_CLASS_NAME), is(TimestampBasedAvroKeyGenerator.class.getName())); - assertThat(conf1.getString(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_TYPE_FIELD_PROP, "dummy"), + assertThat(conf1.getString(KeyGeneratorOptions.Config.TIMESTAMP_TYPE_FIELD_PROP, "dummy"), is("EPOCHMILLISECONDS")); - assertThat(conf1.getString(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "dummy"), + assertThat(conf1.getString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "dummy"), is(FlinkOptions.PARTITION_FORMAT_HOUR)); - assertThat(conf1.getString(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT_PROP, "dummy"), + assertThat(conf1.getString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT_PROP, "dummy"), is("UTC")); } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala index 67f110801..9cdf5cc63 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -18,20 +18,30 @@ package org.apache.hudi import org.apache.hadoop.fs.{FileStatus, Path} + import org.apache.hudi.HoodieFileIndex.getConfigProperties import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties} import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.util.StringUtils +import org.apache.hudi.keygen.constant.KeyGeneratorOptions +import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator} + +import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{And, Expression} +import org.apache.spark.sql.catalyst.expressions.{And, Expression, Literal} import 
org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, NoopCache, PartitionDirectory} import org.apache.spark.sql.hudi.DataSkippingUtils.createColumnStatsIndexFilterExpr import org.apache.spark.sql.hudi.HoodieSqlCommonUtils import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.sql.{AnalysisException, Column, SparkSession} +import org.apache.spark.unsafe.types.UTF8String import scala.collection.JavaConverters._ import scala.util.{Failure, Success, Try} +import scala.util.control.NonFatal + +import java.text.SimpleDateFormat /** * A file index which support partition prune for hoodie snapshot and read-optimized query. @@ -102,6 +112,10 @@ case class HoodieFileIndex(spark: SparkSession, * @return list of PartitionDirectory containing partition to base files mapping */ override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { + + val convertedPartitionFilters = + HoodieFileIndex.convertFilterForTimestampKeyGenerator(metaClient, partitionFilters) + // Look up candidate files names in the col-stats index, if all of the following conditions are true // - Data-skipping is enabled // - Col-Stats Index is present @@ -135,7 +149,7 @@ case class HoodieFileIndex(spark: SparkSession, Seq(PartitionDirectory(InternalRow.empty, candidateFiles)) } else { // Prune the partition path by the partition filters - val prunedPartitions = prunePartition(cachedAllInputFileSlices.keySet.asScala.toSeq, partitionFilters) + val prunedPartitions = prunePartition(cachedAllInputFileSlices.keySet.asScala.toSeq, convertedPartitionFilters) var totalFileSize = 0 var candidateFileSize = 0 @@ -266,7 +280,7 @@ case class HoodieFileIndex(spark: SparkSession, } } -object HoodieFileIndex { +object HoodieFileIndex extends Logging { def getConfigProperties(spark: SparkSession, options: Map[String, String]) = { val 
sqlConf: SQLConf = spark.sessionState.conf @@ -281,6 +295,41 @@ properties } + def convertFilterForTimestampKeyGenerator(metaClient: HoodieTableMetaClient, + partitionFilters: Seq[Expression]): Seq[Expression] = { + + val tableConfig = metaClient.getTableConfig + val keyGenerator = tableConfig.getKeyGeneratorClassName + + if (keyGenerator != null && (keyGenerator.equals(classOf[TimestampBasedKeyGenerator].getCanonicalName) || + keyGenerator.equals(classOf[TimestampBasedAvroKeyGenerator].getCanonicalName))) { + val inputFormat = tableConfig.getString(KeyGeneratorOptions.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP) + val outputFormat = tableConfig.getString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP) + if (StringUtils.isNullOrEmpty(inputFormat) || StringUtils.isNullOrEmpty(outputFormat) || + inputFormat.equals(outputFormat)) { + partitionFilters + } else { + try { + val inDateFormat = new SimpleDateFormat(inputFormat) + val outDateFormat = new SimpleDateFormat(outputFormat) + partitionFilters.toArray.map { + _.transformDown { + case Literal(value, dataType) if dataType.isInstanceOf[StringType] => + val converted = outDateFormat.format(inDateFormat.parse(value.toString)) + Literal(UTF8String.fromString(converted), StringType) + } + } + } catch { + case NonFatal(e) => + logWarning("Failed to convert filters for TimestampBasedAvroKeyGenerator.") + partitionFilters + } + } + } else { + partitionFilters + } + } + private def getQueryPath(options: Map[String, String]) = { new Path(options.getOrElse("path", "'path' option required")) } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 510102af7..d5b03f5be 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -41,6 +41,7 @@ import org.apache.hudi.execution.bulkinsert.{BulkInsertInternalPartitionerWithRo import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool} import org.apache.hudi.index.SparkHoodieIndexFactory import org.apache.hudi.internal.DataSourceInternalWriterHelper +import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator} import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory import org.apache.hudi.sync.common.AbstractSyncTool import org.apache.hudi.table.BulkInsertPartitioner @@ -91,6 +92,9 @@ object HoodieSparkSqlWriter { validateTableConfig(sqlContext.sparkSession, optParams, tableConfig) val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig) + val originKeyGeneratorClassName = HoodieWriterUtils.getOriginKeyGenerator(parameters) + val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestmapBasedKeyGenerator( + originKeyGeneratorClassName, parameters) val databaseName = hoodieConfig.getStringOrDefault(HoodieTableConfig.DATABASE_NAME, "") val tblName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME, s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.").trim @@ -153,7 +157,8 @@ object HoodieSparkSqlWriter { .setPartitionFields(partitionColumns) .setPopulateMetaFields(populateMetaFields) .setRecordKeyFields(hoodieConfig.getString(RECORDKEY_FIELD)) - .setKeyGeneratorClassProp(HoodieWriterUtils.getOriginKeyGenerator(parameters)) + .setKeyGeneratorClassProp(originKeyGeneratorClassName) + .set(timestampKeyGeneratorConfigs) .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING)) .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING)) .setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE))) @@ -757,4 +762,14 @@ object HoodieSparkSqlWriter { val params = 
mergedParams.toMap (params, HoodieWriterUtils.convertMapToHoodieConfig(params)) } + + private def extractConfigsRelatedToTimestmapBasedKeyGenerator(keyGenerator: String, + params: Map[String, String]): Map[String, String] = { + if (keyGenerator.equals(classOf[TimestampBasedKeyGenerator].getCanonicalName) || + keyGenerator.equals(classOf[TimestampBasedAvroKeyGenerator].getCanonicalName)) { + params.filterKeys(HoodieTableConfig.PERSISTED_CONFIG_LIST.contains) + } else { + Map.empty + } + } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index c18ca8737..2e7dcf8d7 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -187,10 +187,12 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, val partitionFilters = filters.filter(f => f.references.forall(p => partitionColumns.contains(p))) val partitionFilterExpression = HoodieSparkUtils.convertToCatalystExpressions(partitionFilters, tableStructSchema) + val convertedPartitionFilterExpression = + HoodieFileIndex.convertFilterForTimestampKeyGenerator(metaClient, partitionFilterExpression.toSeq) // If convert success to catalyst expression, use the partition prune - val fileSlices = if (partitionFilterExpression.isDefined) { - hoodieFileIndex.listFileSlices(Seq(partitionFilterExpression.get)) + val fileSlices = if (convertedPartitionFilterExpression.nonEmpty) { + hoodieFileIndex.listFileSlices(convertedPartitionFilterExpression) } else { hoodieFileIndex.listFileSlices(Seq.empty[Expression]) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala index 3c474f76e..fa07c573f 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala @@ -18,6 +18,7 @@ package org.apache.hudi import org.apache.hadoop.conf.Configuration + import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.client.HoodieJavaWriteClient import org.apache.hudi.client.common.HoodieJavaEngineContext @@ -33,20 +34,25 @@ import org.apache.hudi.common.util.PartitionPathEncodeUtils import org.apache.hudi.common.util.StringUtils.isNullOrEmpty import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.keygen.ComplexKeyGenerator -import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.{Config, TimestampType} +import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.TimestampType +import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config import org.apache.hudi.testutils.HoodieClientTestBase + import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, GreaterThanOrEqual, LessThan, Literal} import org.apache.spark.sql.execution.datasources.PartitionDirectory import org.apache.spark.sql.functions.{lit, struct} import org.apache.spark.sql.types.StringType import org.apache.spark.sql.{DataFrameWriter, Row, SaveMode, SparkSession} + import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.{BeforeEach, Test} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{Arguments, CsvSource, MethodSource, ValueSource} import java.util.Properties + + import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index 8e6c6b27e..8b7b3622b 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -18,6 +18,7 @@ package org.apache.hudi.functional import org.apache.hadoop.fs.FileSystem + import org.apache.hudi.common.config.HoodieMetadataConfig import org.apache.hudi.common.table.timeline.HoodieInstant import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} @@ -25,13 +26,15 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings} import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.exception.HoodieUpsertException -import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.Config import org.apache.hudi.keygen._ +import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config import org.apache.hudi.testutils.HoodieClientTestBase import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} + import org.apache.spark.sql._ import org.apache.spark.sql.functions.{col, concat, lit, udf} import org.apache.spark.sql.types._ + import org.joda.time.DateTime import org.joda.time.format.DateTimeFormat import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue, fail} @@ -41,6 +44,7 @@ import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{CsvSource, ValueSource} import java.sql.{Date, Timestamp} + import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ @@ -93,6 +97,62 @@ class TestCOWDataSource extends HoodieClientTestBase { assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000")) } + /** + * This tests the case that query 
by with a specified partition condition on hudi table which is + * different between the value of the partition field and the actual partition path, + * like hudi table written by TimestampBasedKeyGenerator. + * + * For COW table, test the snapshot query mode and incremental query mode. + */ + @Test + def testPrunePartitionForTimestampBasedKeyGenerator(): Unit = { + val options = commonOpts ++ Map( + "hoodie.compact.inline" -> "false", + DataSourceWriteOptions.TABLE_TYPE.key -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, + DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.TimestampBasedKeyGenerator", + Config.TIMESTAMP_TYPE_FIELD_PROP -> "DATE_STRING", + Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP -> "yyyy/MM/dd", + Config.TIMESTAMP_TIMEZONE_FORMAT_PROP -> "GMT+8:00", + Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP -> "yyyy-MM-dd" + ) + + val dataGen1 = new HoodieTestDataGenerator(Array("2022-01-01")) + val records1 = recordsToStrings(dataGen1.generateInserts("001", 20)).toList + val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) + inputDF1.write.format("org.apache.hudi") + .options(options) + .mode(SaveMode.Overwrite) + .save(basePath) + metaClient = HoodieTableMetaClient.builder() + .setBasePath(basePath) + .setConf(spark.sessionState.newHadoopConf) + .build() + val commit1Time = metaClient.getActiveTimeline.lastInstant().get().getTimestamp + + val dataGen2 = new HoodieTestDataGenerator(Array("2022-01-02")) + val records2 = recordsToStrings(dataGen2.generateInserts("002", 30)).toList + val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2)) + inputDF2.write.format("org.apache.hudi") + .options(options) + .mode(SaveMode.Append) + .save(basePath) + val commit2Time = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp + + // snapshot query + val snapshotQueryRes = spark.read.format("hudi").load(basePath) + assertEquals(snapshotQueryRes.where("partition = '2022-01-01'").count, 20) + 
assertEquals(snapshotQueryRes.where("partition = '2022-01-02'").count, 30) + + // incremental query + val incrementalQueryRes = spark.read.format("hudi") + .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commit1Time) + .option(DataSourceReadOptions.END_INSTANTTIME.key, commit2Time) + .load(basePath) + assertEquals(incrementalQueryRes.where("partition = '2022-01-01'").count, 0) + assertEquals(incrementalQueryRes.where("partition = '2022-01-02'").count, 30) + } + /** * Test for https://issues.apache.org/jira/browse/HUDI-1615. Null Schema in BulkInsert row writer flow. * This was reported by customer when archival kicks in as the schema in commit metadata is not set for bulk_insert diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala index 0d7d62618..c5c659a45 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala @@ -26,12 +26,14 @@ import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} import org.apache.hudi.common.testutils.HoodieTestDataGenerator import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings import org.apache.hudi.config.HoodieWriteConfig -import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.Config +import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config import org.apache.hudi.keygen.{ComplexKeyGenerator, TimestampBasedKeyGenerator} import org.apache.hudi.testutils.SparkClientFunctionalTestHarness import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} + import org.apache.spark.sql._ import 
org.apache.spark.sql.functions.{col, lit} + import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} import org.junit.jupiter.api.Tag import org.junit.jupiter.params.ParameterizedTest diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala index f420b296e..7b51e5a62 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala @@ -18,6 +18,7 @@ package org.apache.hudi.functional import org.apache.hadoop.fs.Path + import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.common.config.HoodieMetadataConfig import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieTableType} @@ -27,12 +28,16 @@ import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig} import org.apache.hudi.index.HoodieIndex.IndexType import org.apache.hudi.keygen.NonpartitionedKeyGenerator +import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestBase} import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} + import org.apache.log4j.LogManager + import org.apache.spark.sql._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.BooleanType + import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.params.ParameterizedTest @@ -770,4 +775,79 @@ class TestMORDataSource extends HoodieClientTestBase { .load(basePath + "/*/*/*/*") assertEquals(numRecords - numRecordsToDelete, snapshotDF2.count()) } + + /** + * This tests the case that 
query by with a specified partition condition on hudi table which is + * different between the value of the partition field and the actual partition path, + * like hudi table written by TimestampBasedKeyGenerator. + * + * For MOR table, test all the three query modes. + */ + @Test + def testPrunePartitionForTimestampBasedKeyGenerator(): Unit = { + val options = commonOpts ++ Map( + "hoodie.compact.inline" -> "false", + DataSourceWriteOptions.TABLE_TYPE.key -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, + DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.TimestampBasedKeyGenerator", + Config.TIMESTAMP_TYPE_FIELD_PROP -> "DATE_STRING", + Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP -> "yyyy/MM/dd", + Config.TIMESTAMP_TIMEZONE_FORMAT_PROP -> "GMT+8:00", + Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP -> "yyyy-MM-dd" + ) + + val dataGen1 = new HoodieTestDataGenerator(Array("2022-01-01")) + val records1 = recordsToStrings(dataGen1.generateInserts("001", 50)).toList + val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) + inputDF1.write.format("org.apache.hudi") + .options(options) + .mode(SaveMode.Overwrite) + .save(basePath) + metaClient = HoodieTableMetaClient.builder() + .setBasePath(basePath) + .setConf(spark.sessionState.newHadoopConf) + .build() + val commit1Time = metaClient.getActiveTimeline.lastInstant().get().getTimestamp + + val dataGen2 = new HoodieTestDataGenerator(Array("2022-01-02")) + val records2 = recordsToStrings(dataGen2.generateInserts("002", 60)).toList + val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2)) + inputDF2.write.format("org.apache.hudi") + .options(options) + .mode(SaveMode.Append) + .save(basePath) + val commit2Time = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp + + val records3 = recordsToStrings(dataGen2.generateUniqueUpdates("003", 20)).toList + val inputDF3 = spark.read.json(spark.sparkContext.parallelize(records3, 2)) + 
inputDF3.write.format("org.apache.hudi") + .options(options) + .mode(SaveMode.Append) + .save(basePath) + val commit3Time = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp + + // snapshot query + val snapshotQueryRes = spark.read.format("hudi").load(basePath) + assertEquals(snapshotQueryRes.where(s"_hoodie_commit_time = '$commit1Time'").count, 50) + assertEquals(snapshotQueryRes.where(s"_hoodie_commit_time = '$commit2Time'").count, 40) + assertEquals(snapshotQueryRes.where(s"_hoodie_commit_time = '$commit3Time'").count, 20) + + assertEquals(snapshotQueryRes.where("partition = '2022-01-01'").count, 50) + assertEquals(snapshotQueryRes.where("partition = '2022-01-02'").count, 60) + + // read_optimized query + val readOptimizedQueryRes = spark.read.format("hudi") + .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) + .load(basePath) + assertEquals(readOptimizedQueryRes.where("partition = '2022-01-01'").count, 50) + assertEquals(readOptimizedQueryRes.where("partition = '2022-01-02'").count, 60) + + // incremental query + val incrementalQueryRes = spark.read.format("hudi") + .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commit2Time) + .option(DataSourceReadOptions.END_INSTANTTIME.key, commit3Time) + .load(basePath) + assertEquals(incrementalQueryRes.where("partition = '2022-01-01'").count, 0) + assertEquals(incrementalQueryRes.where("partition = '2022-01-02'").count, 20) + } }