From b30c5bdaef77aee9f564ac24f80f5c364014bb17 Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Sat, 11 Sep 2021 13:17:16 +0800 Subject: [PATCH] [HUDI-2412] Add timestamp based partitioning for flink writer (#3638) --- .../hudi/configuration/FlinkOptions.java | 16 ++- .../apache/hudi/sink/bulk/RowDataKeyGen.java | 42 ++++++- .../org/apache/hudi/source/FileIndex.java | 16 ++- .../hudi/streamer/FlinkStreamerConfig.java | 2 +- .../apache/hudi/table/HoodieTableFactory.java | 57 +++++++++- .../org/apache/hudi/util/DataTypeUtils.java | 61 ++++++++++ .../org/apache/hudi/util/StreamerUtil.java | 13 ++- .../hudi/sink/bulk/TestRowDataKeyGen.java | 33 ++++++ .../org/apache/hudi/source/TestFileIndex.java | 19 ++++ .../hudi/table/HoodieDataSourceITCase.java | 104 ++++++++++++------ .../hudi/table/TestHoodieTableFactory.java | 30 ++++- .../apache/hudi/utils/TestConfigurations.java | 27 +++-- 12 files changed, 355 insertions(+), 65 deletions(-) create mode 100644 hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 64b308d6f..bdb58e424 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -321,6 +321,16 @@ public class FlinkOptions extends HoodieConfig { .defaultValue(KeyGeneratorType.SIMPLE.name()) .withDescription("Key generator type, that implements will extract the key out of incoming record"); + public static final String PARTITION_FORMAT_HOUR = "yyyyMMddHH"; + public static final String PARTITION_FORMAT_DAY = "yyyyMMdd"; + public static final ConfigOption PARTITION_FORMAT = ConfigOptions + .key("write.partition.format") + .stringType() + .noDefaultValue() + .withDescription("Partition path format, only valid when 'write.datetime.partitioning' is true, default is:\n" + + "1) 'yyyyMMddHH' 
for timestamp(3) WITHOUT TIME ZONE, LONG, FLOAT, DOUBLE, DECIMAL;\n" + + "2) 'yyyyMMdd' for DAY and INT."); + public static final ConfigOption INDEX_BOOTSTRAP_TASKS = ConfigOptions .key("write.index_bootstrap.tasks") .intType() @@ -364,10 +374,10 @@ public class FlinkOptions extends HoodieConfig { .defaultValue(128) .withDescription("Max log block size in MB for log file, default 128MB"); - public static final ConfigOption WRITE_LOG_MAX_SIZE = ConfigOptions + public static final ConfigOption WRITE_LOG_MAX_SIZE = ConfigOptions .key("write.log.max.size") - .intType() - .defaultValue(1024) + .longType() + .defaultValue(1024L) .withDescription("Maximum size allowed in MB for a log file before it is rolled over to the next version, default 1GB"); public static final ConfigOption WRITE_PARQUET_BLOCK_SIZE = ConfigOptions diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java index 5b73f91d0..b6fecff20 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java @@ -18,17 +18,22 @@ package org.apache.hudi.sink.bulk; +import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.PartitionPathEncodeUtils; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieKeyException; +import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator; import org.apache.hudi.util.RowDataProjection; +import org.apache.hudi.util.StreamerUtil; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; +import java.io.IOException; import java.io.Serializable; import java.util.Arrays; import java.util.List; @@ 
-58,6 +63,8 @@ public class RowDataKeyGen implements Serializable { private final boolean hiveStylePartitioning; private final boolean encodePartitionPath; + private final Option keyGenOpt; + // efficient code path private boolean simpleRecordKey = false; private RowData.FieldGetter recordKeyFieldGetter; @@ -72,7 +79,8 @@ public class RowDataKeyGen implements Serializable { String partitionFields, RowType rowType, boolean hiveStylePartitioning, - boolean encodePartitionPath) { + boolean encodePartitionPath, + Option keyGenOpt) { this.recordKeyFields = recordKeys.split(","); this.partitionPathFields = partitionFields.split(","); List fieldNames = rowType.getFieldNames(); @@ -102,11 +110,21 @@ public class RowDataKeyGen implements Serializable { } else { this.partitionPathProjection = getProjection(this.partitionPathFields, fieldNames, fieldTypes); } + this.keyGenOpt = keyGenOpt; } public static RowDataKeyGen instance(Configuration conf, RowType rowType) { + Option keyGeneratorOpt = Option.empty(); + if (conf.getString(FlinkOptions.KEYGEN_CLASS_NAME).equals(TimestampBasedAvroKeyGenerator.class.getName())) { + try { + keyGeneratorOpt = Option.of(new TimestampBasedAvroKeyGenerator(StreamerUtil.flinkConf2TypedProperties(conf))); + } catch (IOException e) { + throw new HoodieKeyException("Initialize TimestampBasedAvroKeyGenerator error", e); + } + } return new RowDataKeyGen(conf.getString(FlinkOptions.RECORD_KEY_FIELD), conf.getString(FlinkOptions.PARTITION_PATH_FIELD), - rowType, conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING), conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING)); + rowType, conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING), conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING), + keyGeneratorOpt); } public String getRecordKey(RowData rowData) { @@ -121,7 +139,7 @@ public class RowDataKeyGen implements Serializable { public String getPartitionPath(RowData rowData) { if (this.simplePartitionPath) { return 
getPartitionPath(partitionPathFieldGetter.getFieldOrNull(rowData), - this.partitionPathFields[0], this.hiveStylePartitioning, this.encodePartitionPath); + this.partitionPathFields[0], this.hiveStylePartitioning, this.encodePartitionPath, this.keyGenOpt); } else if (this.nonPartitioned) { return EMPTY_PARTITION; } else { @@ -193,7 +211,12 @@ public class RowDataKeyGen implements Serializable { Object partValue, String partField, boolean hiveStylePartitioning, - boolean encodePartitionPath) { + boolean encodePartitionPath, + Option keyGenOpt) { + if (keyGenOpt.isPresent()) { + TimestampBasedAvroKeyGenerator keyGenerator = keyGenOpt.get(); + return keyGenerator.getPartitionPath(toEpochMilli(partValue, keyGenerator)); + } String partitionPath = StringUtils.objToString(partValue); if (partitionPath == null || partitionPath.isEmpty()) { partitionPath = DEFAULT_PARTITION_PATH; @@ -207,6 +230,17 @@ public class RowDataKeyGen implements Serializable { return partitionPath; } + private static Object toEpochMilli(Object val, TimestampBasedAvroKeyGenerator keyGenerator) { + if (val instanceof TimestampData) { + return ((TimestampData) val).toInstant().toEpochMilli(); + } + if (val == null) { + // should match the default partition path when STRING partition path re-format is supported + return keyGenerator.getDefaultPartitionVal(); + } + return val; + } + /** * Returns the row data projection for the given field names and table schema. * diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java b/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java index bf37f1204..be02fc404 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java +++ b/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java @@ -109,12 +109,26 @@ public class FileIndex { * Returns all the file statuses under the table base path. 
*/ public FileStatus[] getFilesInPartitions() { - String[] partitions = getOrBuildPartitionPaths().stream().map(p -> new Path(path, p).toString()).toArray(String[]::new); + String[] partitions = getOrBuildPartitionPaths().stream().map(p -> fullPartitionPath(path, p)).toArray(String[]::new); return FSUtils.getFilesInPartitions(HoodieFlinkEngineContext.DEFAULT, metadataConfig, path.toString(), partitions, "/tmp/") .values().stream().flatMap(Arrays::stream).toArray(FileStatus[]::new); } + /** + * Returns the full partition path. + * + * @param basePath The base path. + * @param partitionPath The relative partition path, may be empty if the table is non-partitioned. + * @return The full partition path string + */ + private static String fullPartitionPath(Path basePath, String partitionPath) { + if (partitionPath.isEmpty()) { + return basePath.toString(); + } + return new Path(basePath, partitionPath).toString(); + } + /** * Reset the state of the file index. */ diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java index 3dc8cb27a..898ba88fd 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java @@ -339,7 +339,7 @@ public class FlinkStreamerConfig extends Configuration { conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, config.writeTaskMaxSize); conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, config.writeBatchSize); conf.setInteger(FlinkOptions.WRITE_LOG_BLOCK_SIZE, config.writeLogBlockSize); - conf.setInteger(FlinkOptions.WRITE_LOG_MAX_SIZE, config.writeLogMaxSize); + conf.setLong(FlinkOptions.WRITE_LOG_MAX_SIZE, config.writeLogMaxSize); conf.setInteger(FlinkOptions.WRITE_MERGE_MAX_MEMORY, config.writeMergeMaxMemory); conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, config.compactionAsyncEnabled); conf.setInteger(FlinkOptions.COMPACTION_TASKS, 
config.compactionTasks); diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index 034485746..cf1cbd58f 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -19,10 +19,13 @@ package org.apache.hudi.table; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.exception.HoodieValidationException; import org.apache.hudi.hive.MultiPartKeysValueExtractor; import org.apache.hudi.keygen.ComplexAvroKeyGenerator; import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator; +import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator; import org.apache.hudi.util.AvroSchemaConverter; +import org.apache.hudi.util.DataTypeUtils; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.configuration.ConfigOption; @@ -36,6 +39,7 @@ import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.factories.DynamicTableSinkFactory; import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; @@ -173,13 +177,23 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab } // tweak the key gen class if possible final String[] partitions = conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(","); - if (partitions.length == 1 && partitions[0].equals("")) { - conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, NonpartitionedAvroKeyGenerator.class.getName()); - LOG.info("Table option [{}] is reset to {} because this is a non-partitioned table", - FlinkOptions.KEYGEN_CLASS_NAME.key(), NonpartitionedAvroKeyGenerator.class.getName()); - return; - } final String[] pks = 
conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","); + if (partitions.length == 1) { + final String partitionField = partitions[0]; + if (partitionField.isEmpty()) { + conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, NonpartitionedAvroKeyGenerator.class.getName()); + LOG.info("Table option [{}] is reset to {} because this is a non-partitioned table", + FlinkOptions.KEYGEN_CLASS_NAME.key(), NonpartitionedAvroKeyGenerator.class.getName()); + return; + } + DataType partitionFieldType = table.getSchema().getFieldDataType(partitionField) + .orElseThrow(() -> new HoodieValidationException("Field " + partitionField + " does not exist")); + if (pks.length <= 1 && DataTypeUtils.isDatetimeType(partitionFieldType)) { + // timestamp based key gen only supports simple primary key + setupTimestampKeygenOptions(conf, partitionFieldType); + return; + } + } boolean complexHoodieKey = pks.length > 1 || partitions.length > 1; if (complexHoodieKey && FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.KEYGEN_CLASS_NAME)) { conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, ComplexAvroKeyGenerator.class.getName()); @@ -188,6 +202,37 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab } } + /** + * Sets up the keygen options when the partition path is datetime type. + * + *

The UTC timezone is used as default. + */ + public static void setupTimestampKeygenOptions(Configuration conf, DataType fieldType) { + conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, TimestampBasedAvroKeyGenerator.class.getName()); + LOG.info("Table option [{}] is reset to {} because datetime partitioning turns on", + FlinkOptions.KEYGEN_CLASS_NAME.key(), TimestampBasedAvroKeyGenerator.class.getName()); + if (DataTypeUtils.isTimestampType(fieldType)) { + int precision = DataTypeUtils.precision(fieldType.getLogicalType()); + if (precision == 0) { + // seconds + conf.setString(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_TYPE_FIELD_PROP, + TimestampBasedAvroKeyGenerator.TimestampType.UNIX_TIMESTAMP.name()); + } else if (precision == 3) { + // milliseconds + conf.setString(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_TYPE_FIELD_PROP, + TimestampBasedAvroKeyGenerator.TimestampType.EPOCHMILLISECONDS.name()); + } + String partitionFormat = conf.getOptional(FlinkOptions.PARTITION_FORMAT).orElse(FlinkOptions.PARTITION_FORMAT_HOUR); + conf.setString(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, partitionFormat); + } else { + conf.setString(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_TYPE_FIELD_PROP, + TimestampBasedAvroKeyGenerator.TimestampType.DATE_STRING.name()); + String partitionFormat = conf.getOptional(FlinkOptions.PARTITION_FORMAT).orElse(FlinkOptions.PARTITION_FORMAT_DAY); + conf.setString(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, partitionFormat); + } + conf.setString(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT_PROP, "UTC"); + } + /** * Sets up the compaction options from the table definition. 
*/ diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java b/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java new file mode 100644 index 000000000..eb188ad55 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.util; + +import org.apache.hudi.common.util.ValidationUtils; + +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.TimestampType; + +/** + * Utilities for {@link org.apache.flink.table.types.DataType}. + */ +public class DataTypeUtils { + /** + * Returns whether the given type is TIMESTAMP type. + */ + public static boolean isTimestampType(DataType type) { + return type.getLogicalType().getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE; + } + + /** + * Returns the precision of the given TIMESTAMP type. 
+ */ + public static int precision(LogicalType logicalType) { + ValidationUtils.checkArgument(logicalType instanceof TimestampType); + TimestampType timestampType = (TimestampType) logicalType; + return timestampType.getPrecision(); + } + + /** + * Returns whether the given type is DATE type. + */ + public static boolean isDateType(DataType type) { + return type.getLogicalType().getTypeRoot() == LogicalTypeRoot.DATE; + } + + /** + * Returns whether the given type is DATETIME type. + */ + public static boolean isDatetimeType(DataType type) { + return isTimestampType(type) || isDateType(type); + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index b95c9e1d3..04eeab8b3 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -143,7 +143,7 @@ public class StreamerUtil { return conf; } - // Keep to avoid to much modifications. + // Keep the redundant to avoid too many modifications. 
public static org.apache.hadoop.conf.Configuration getHadoopConf() { return FlinkClientUtil.getHadoopConf(); } @@ -180,7 +180,7 @@ public class StreamerUtil { .forTable(conf.getString(FlinkOptions.TABLE_NAME)) .withStorageConfig(HoodieStorageConfig.newBuilder() .logFileDataBlockMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_BLOCK_SIZE) * 1024 * 1024) - .logFileMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_MAX_SIZE) * 1024 * 1024) + .logFileMaxSize(conf.getLong(FlinkOptions.WRITE_LOG_MAX_SIZE) * 1024 * 1024) .parquetBlockSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_BLOCK_SIZE) * 1024 * 1024) .parquetPageSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_PAGE_SIZE) * 1024 * 1024) .parquetMaxFileSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE) * 1024 * 1024L) @@ -192,7 +192,7 @@ public class StreamerUtil { .withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton .withAutoCommit(false) .withAllowOperationMetadataField(conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) - .withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf))); + .withProps(flinkConf2TypedProperties(conf)); builder = builder.withSchema(getSourceSchema(conf).toString()); return builder.build(); @@ -206,12 +206,13 @@ public class StreamerUtil { * @return a TypedProperties instance */ public static TypedProperties flinkConf2TypedProperties(Configuration conf) { + Configuration flatConf = FlinkOptions.flatOptions(conf); Properties properties = new Properties(); - // put all the set up options - conf.addAllToProperties(properties); + // put all the set options + flatConf.addAllToProperties(properties); // put all the default options for (ConfigOption option : FlinkOptions.optionalOptions()) { - if (!conf.contains(option) && option.hasDefaultValue()) { + if (!flatConf.contains(option) && option.hasDefaultValue()) { properties.put(option.key(), option.defaultValue()); } } diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java b/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java index 7a1eaeecf..822df063b 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java @@ -20,9 +20,11 @@ package org.apache.hudi.sink.bulk; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieKeyException; +import org.apache.hudi.table.HoodieTableFactory; import org.apache.hudi.utils.TestConfigurations; import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; @@ -93,4 +95,35 @@ public class TestRowDataKeyGen { assertThat(keyGen2.getPartitionPath(rowData2), is("partition=default/ts=default")); assertThat(keyGen2.getPartitionPath(rowData3), is("partition=default/ts=1970-01-01T00:00:00.001")); } + + @Test + void testTimestampBasedKeyGenerator() { + Configuration conf = TestConfigurations.getDefaultConf("path1"); + conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "ts"); + HoodieTableFactory.setupTimestampKeygenOptions(conf, DataTypes.TIMESTAMP(3)); + final RowData rowData1 = insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, + TimestampData.fromEpochMillis(7200000), StringData.fromString("par1")); + final RowDataKeyGen keyGen1 = RowDataKeyGen.instance(conf, TestConfigurations.ROW_TYPE); + + assertThat(keyGen1.getRecordKey(rowData1), is("id1")); + assertThat(keyGen1.getPartitionPath(rowData1), is("1970010102")); + + // null record key and partition path + final RowData rowData2 = insertRow(TestConfigurations.ROW_TYPE, null, StringData.fromString("Danny"), 23, + null, StringData.fromString("par1")); + assertThrows(HoodieKeyException.class, () -> 
keyGen1.getRecordKey(rowData2)); + assertThat(keyGen1.getPartitionPath(rowData2), is("1970010100")); + // empty record key and partition path + final RowData rowData3 = insertRow(StringData.fromString(""), StringData.fromString("Danny"), 23, + TimestampData.fromEpochMillis(1), StringData.fromString("par1")); + assertThrows(HoodieKeyException.class, () -> keyGen1.getRecordKey(rowData3)); + assertThat(keyGen1.getPartitionPath(rowData3), is("1970010100")); + + // hive style partitioning + conf.set(FlinkOptions.HIVE_STYLE_PARTITIONING, true); + final RowDataKeyGen keyGen2 = RowDataKeyGen.instance(conf, TestConfigurations.ROW_TYPE); + assertThat(keyGen2.getPartitionPath(rowData1), is("ts=1970010102")); + assertThat(keyGen2.getPartitionPath(rowData2), is("ts=1970010100")); + assertThat(keyGen2.getPartitionPath(rowData3), is("ts=1970010100")); + } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java index b1f442fa6..f229f2de8 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java +++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java @@ -20,12 +20,14 @@ package org.apache.hudi.source; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator; import org.apache.hudi.utils.TestConfigurations; import org.apache.hudi.utils.TestData; import org.apache.flink.configuration.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -69,4 +71,21 @@ public class TestFileIndex { assertTrue(Arrays.stream(fileStatuses) .allMatch(fileStatus -> 
fileStatus.getPath().toString().endsWith(HoodieFileFormat.PARQUET.getFileExtension()))); } + + @Test + void testFileListingUsingMetadataNonPartitionedTable() throws Exception { + Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); + conf.setString(FlinkOptions.PARTITION_PATH_FIELD, ""); + conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, NonpartitionedAvroKeyGenerator.class.getName()); + conf.setBoolean(FlinkOptions.METADATA_ENABLED, true); + TestData.writeData(TestData.DATA_SET_INSERT, conf); + FileIndex fileIndex = FileIndex.instance(new Path(tempFile.getAbsolutePath()), conf); + List partitionKeys = Collections.singletonList(""); + List> partitions = fileIndex.getPartitions(partitionKeys, "default", false); + assertThat(partitions.size(), is(0)); + + FileStatus[] fileStatuses = fileIndex.getFilesInPartitions(); + assertThat(fileStatuses.length, is(1)); + assertTrue(fileStatuses[0].getPath().toString().endsWith(HoodieFileFormat.PARQUET.getFileExtension())); + } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java index a04e7bb99..5be603f78 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java @@ -100,8 +100,8 @@ public class HoodieDataSourceITCase extends AbstractTestBase { String hoodieTableDDL = sql("t1") .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) - .option(FlinkOptions.READ_AS_STREAMING, "true") - .option(FlinkOptions.TABLE_TYPE, tableType.name()) + .option(FlinkOptions.READ_AS_STREAMING, true) + .option(FlinkOptions.TABLE_TYPE, tableType) .end(); streamTableEnv.executeSql(hoodieTableDDL); String insertInto = "insert into t1 select * from source"; @@ -111,8 +111,8 @@ public class HoodieDataSourceITCase extends AbstractTestBase { streamTableEnv.executeSql("drop table t1"); hoodieTableDDL = 
sql("t1") .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) - .option(FlinkOptions.READ_AS_STREAMING, "true") - .option(FlinkOptions.TABLE_TYPE, tableType.name()) + .option(FlinkOptions.READ_AS_STREAMING, true) + .option(FlinkOptions.TABLE_TYPE, tableType) .option(FlinkOptions.READ_STREAMING_START_COMMIT, firstCommit) .end(); streamTableEnv.executeSql(hoodieTableDDL); @@ -141,14 +141,14 @@ public class HoodieDataSourceITCase extends AbstractTestBase { String hoodieTableDDL = sql("t1") .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) - .option(FlinkOptions.READ_AS_STREAMING, "true") - .option(FlinkOptions.TABLE_TYPE, tableType.name()) + .option(FlinkOptions.READ_AS_STREAMING, true) + .option(FlinkOptions.TABLE_TYPE, tableType) .end(); streamTableEnv.executeSql(hoodieTableDDL); String insertInto = "insert into t1 select * from source"; execInsertSql(streamTableEnv, insertInto); - // reading from latest commit instance. + // reading from the latest commit instance. List rows = execSelectSql(streamTableEnv, "select * from t1", 10); assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT); @@ -169,8 +169,8 @@ public class HoodieDataSourceITCase extends AbstractTestBase { String createHoodieTable = sql("t1") .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) - .option(FlinkOptions.READ_AS_STREAMING, "true") - .option(FlinkOptions.TABLE_TYPE, tableType.name()) + .option(FlinkOptions.READ_AS_STREAMING, true) + .option(FlinkOptions.TABLE_TYPE, tableType) .end(); streamTableEnv.executeSql(createHoodieTable); String insertInto = "insert into t1 select * from source"; @@ -184,8 +184,8 @@ public class HoodieDataSourceITCase extends AbstractTestBase { // now we consume starting from the oldest commit String createHoodieTable2 = sql("t2") .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) - .option(FlinkOptions.READ_AS_STREAMING, "true") - .option(FlinkOptions.TABLE_TYPE, tableType.name()) + .option(FlinkOptions.READ_AS_STREAMING, true) + 
.option(FlinkOptions.TABLE_TYPE, tableType) .option(FlinkOptions.READ_STREAMING_START_COMMIT, specifiedCommit) .end(); streamTableEnv.executeSql(createHoodieTable2); @@ -226,8 +226,8 @@ public class HoodieDataSourceITCase extends AbstractTestBase { // test MOR streaming write with compaction then reads as // query type 'read_optimized'. .option(FlinkOptions.QUERY_TYPE, FlinkOptions.QUERY_TYPE_READ_OPTIMIZED) - .option(FlinkOptions.COMPACTION_DELTA_COMMITS, "1") - .option(FlinkOptions.COMPACTION_TASKS, "1") + .option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1) + .option(FlinkOptions.COMPACTION_TASKS, 1) .end(); streamTableEnv.executeSql(hoodieTableDDL); String insertInto = "insert into t1 select * from source"; @@ -250,7 +250,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase { String hoodieTableDDL = sql("t1") .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) - .option(FlinkOptions.CLEAN_RETAIN_COMMITS, "1") + .option(FlinkOptions.CLEAN_RETAIN_COMMITS, 1) .end(); streamTableEnv.executeSql(hoodieTableDDL); String insertInto = "insert into t1 select * from source"; @@ -287,10 +287,10 @@ public class HoodieDataSourceITCase extends AbstractTestBase { String hoodieTableDDL = sql("t1") .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ) - .option(FlinkOptions.READ_AS_STREAMING, "true") - .option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, "2") + .option(FlinkOptions.READ_AS_STREAMING, true) + .option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 2) .option(FlinkOptions.READ_STREAMING_START_COMMIT, latestCommit) - .option(FlinkOptions.CHANGELOG_ENABLED, "true") + .option(FlinkOptions.CHANGELOG_ENABLED, true) .end(); streamTableEnv.executeSql(hoodieTableDDL); @@ -319,9 +319,9 @@ public class HoodieDataSourceITCase extends AbstractTestBase { String hoodieTableDDL = sql("t1") .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) - .option(FlinkOptions.TABLE_TYPE, tableType.name()) - 
.option(FlinkOptions.READ_AS_STREAMING, "true") - .option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, "2") + .option(FlinkOptions.TABLE_TYPE, tableType) + .option(FlinkOptions.READ_AS_STREAMING, true) + .option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 2) .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning) .end(); streamTableEnv.executeSql(hoodieTableDDL); @@ -342,13 +342,13 @@ public class HoodieDataSourceITCase extends AbstractTestBase { String hoodieTableDDL = sql("t1") .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ) - .option(FlinkOptions.READ_AS_STREAMING, "true") + .option(FlinkOptions.READ_AS_STREAMING, true) .option(FlinkOptions.READ_STREAMING_START_COMMIT, FlinkOptions.START_COMMIT_EARLIEST) - .option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, "2") + .option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 2) // close the async compaction .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, false) // generate compaction plan for each commit - .option(FlinkOptions.COMPACTION_DELTA_COMMITS, "1") + .option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1) .withPartition(false) .end(); streamTableEnv.executeSql(hoodieTableDDL); @@ -422,7 +422,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase { TableEnvironment tableEnv = batchTableEnv; String hoodieTableDDL = sql("t1") .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) - .option(FlinkOptions.TABLE_NAME, tableType.name()) + .option(FlinkOptions.TABLE_NAME, tableType) .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning) .end(); tableEnv.executeSql(hoodieTableDDL); @@ -557,11 +557,12 @@ public class HoodieDataSourceITCase extends AbstractTestBase { } @ParameterizedTest - @EnumSource(value = ExecMode.class) - void testWriteNonPartitionedTable(ExecMode execMode) { + @MethodSource("executionModeAndTableTypeParams") + void testWriteNonPartitionedTable(ExecMode execMode, HoodieTableType tableType) { 
TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv; String hoodieTableDDL = sql("t1") .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) + .option(FlinkOptions.TABLE_TYPE, tableType) .withPartition(false) .end(); tableEnv.executeSql(hoodieTableDDL); @@ -593,8 +594,8 @@ public class HoodieDataSourceITCase extends AbstractTestBase { String hoodieTableDDL = sql("t1") .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) - .option(FlinkOptions.INDEX_GLOBAL_ENABLED, "true") - .option(FlinkOptions.INSERT_DROP_DUPS, "true") + .option(FlinkOptions.INDEX_GLOBAL_ENABLED, true) + .option(FlinkOptions.INSERT_DROP_DUPS, true) .end(); streamTableEnv.executeSql(hoodieTableDDL); @@ -615,8 +616,8 @@ public class HoodieDataSourceITCase extends AbstractTestBase { streamTableEnv.executeSql(createSource); String hoodieTableDDL = sql("t1") .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) - .option(FlinkOptions.INDEX_GLOBAL_ENABLED, "false") - .option(FlinkOptions.INSERT_DROP_DUPS, "true") + .option(FlinkOptions.INDEX_GLOBAL_ENABLED, false) + .option(FlinkOptions.INSERT_DROP_DUPS, true) .end(); streamTableEnv.executeSql(hoodieTableDDL); @@ -743,7 +744,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase { String hoodieTableDDL = sql("hoodie_sink") .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) .option(FlinkOptions.OPERATION, "bulk_insert") - .option(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION, "true") + .option(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION, true) .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning) .end(); tableEnv.executeSql(hoodieTableDDL); @@ -825,6 +826,34 @@ public class HoodieDataSourceITCase extends AbstractTestBase { + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]"); } + @ParameterizedTest + @EnumSource(value = ExecMode.class) + void testWriteAndReadWithTimestampPartitioning(ExecMode execMode) { + // can not read the hive style and timestamp based partitioning table 
+ // in batch mode, the code path in CopyOnWriteInputFormat relies on + // the value on the partition path to recover the partition value, + // but the date format has changed(milliseconds switch to hours). + TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv; + String hoodieTableDDL = sql("t1") + .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) + .partitionField("ts") // use timestamp as partition path field + .end(); + tableEnv.executeSql(hoodieTableDDL); + + execInsertSql(tableEnv, TestSQL.INSERT_T1); + + List<Row> result1 = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1").execute().collect()); + assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT); + // apply filters + List<Row> result2 = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1 where uuid > 'id5'").execute().collect()); + assertRowsEquals(result2, "[" + + "+I[id6, Emma, 20, 1970-01-01T00:00:06, par3], " + + "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], " + + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]"); + } + // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- @@ -832,6 +861,19 @@ public class HoodieDataSourceITCase extends AbstractTestBase { BATCH, STREAM } + /** + * Return test params => (execution mode, table type). + */ + private static Stream<Arguments> executionModeAndTableTypeParams() { + Object[][] data = + new Object[][] { + {ExecMode.BATCH, HoodieTableType.MERGE_ON_READ}, + {ExecMode.BATCH, HoodieTableType.COPY_ON_WRITE}, + {ExecMode.STREAM, HoodieTableType.MERGE_ON_READ}, + {ExecMode.STREAM, HoodieTableType.COPY_ON_WRITE}}; + return Stream.of(data).map(Arguments::of); + } + /** * Return test params => (execution mode, hive style partitioning).
*/ diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java index 6e4b21598..1572dd446 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java @@ -23,6 +23,7 @@ import org.apache.hudi.hive.MultiPartKeysValueExtractor; import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor; import org.apache.hudi.keygen.ComplexAvroKeyGenerator; import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator; +import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.SchemaBuilder; import org.apache.hudi.utils.TestConfigurations; @@ -142,7 +143,7 @@ public class TestHoodieTableFactory { ResolvedSchema schema1 = SchemaBuilder.instance() .field("f0", DataTypes.INT().notNull()) .field("f1", DataTypes.VARCHAR(20)) - .field("f2", DataTypes.TIMESTAMP(3)) + .field("f2", DataTypes.BIGINT()) .field("ts", DataTypes.TIMESTAMP(3)) .primaryKey("f0") .build(); @@ -254,7 +255,7 @@ public class TestHoodieTableFactory { ResolvedSchema schema1 = SchemaBuilder.instance() .field("f0", DataTypes.INT().notNull()) .field("f1", DataTypes.VARCHAR(20)) - .field("f2", DataTypes.TIMESTAMP(3)) + .field("f2", DataTypes.BIGINT()) .field("ts", DataTypes.TIMESTAMP(3)) .primaryKey("f0") .build(); @@ -342,6 +343,31 @@ public class TestHoodieTableFactory { assertThat(conf2.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS), is(45)); } + @Test + void testSetupTimestampBasedKeyGenForSink() { + this.conf.setString(FlinkOptions.RECORD_KEY_FIELD, "dummyField"); + this.conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, "dummyKeyGenClass"); + // definition with simple primary key and partition path + ResolvedSchema schema1 = SchemaBuilder.instance() + .field("f0", DataTypes.INT().notNull()) + .field("f1", DataTypes.VARCHAR(20)) + 
.field("f2", DataTypes.TIMESTAMP(3)) + .field("ts", DataTypes.TIMESTAMP(3)) + .primaryKey("f0") + .build(); + final MockContext sourceContext1 = MockContext.getInstance(this.conf, schema1, "ts"); + final HoodieTableSource tableSource1 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext1); + final Configuration conf1 = tableSource1.getConf(); + assertThat(conf1.get(FlinkOptions.RECORD_KEY_FIELD), is("f0")); + assertThat(conf1.get(FlinkOptions.KEYGEN_CLASS_NAME), is(TimestampBasedAvroKeyGenerator.class.getName())); + assertThat(conf1.getString(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_TYPE_FIELD_PROP, "dummy"), + is("EPOCHMILLISECONDS")); + assertThat(conf1.getString(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "dummy"), + is(FlinkOptions.PARTITION_FORMAT_HOUR)); + assertThat(conf1.getString(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT_PROP, "dummy"), + is("UTC")); + } + // ------------------------------------------------------------------------- // Inner Class // ------------------------------------------------------------------------- diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java index 231d42c26..b66d55a77 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java @@ -58,10 +58,14 @@ public class TestConfigurations { .build(); public static String getCreateHoodieTableDDL(String tableName, Map<String, String> options) { - return getCreateHoodieTableDDL(tableName, options, true); + return getCreateHoodieTableDDL(tableName, options, true, "partition"); } - public static String getCreateHoodieTableDDL(String tableName, Map<String, String> options, boolean havePartition) { + public static String getCreateHoodieTableDDL( + String tableName, + Map<String, String> options, + boolean havePartition, + String
partitionField) { StringBuilder builder = new StringBuilder(); builder.append("create table " + tableName + "(\n" + " uuid varchar(20),\n" @@ -72,7 +76,7 @@ public class TestConfigurations { + " PRIMARY KEY(uuid) NOT ENFORCED\n" + ")\n"); if (havePartition) { - builder.append("PARTITIONED BY (`partition`)\n"); + builder.append("PARTITIONED BY (`").append(partitionField).append("`)\n"); } builder.append("with (\n" + " 'connector' = 'hudi'"); @@ -203,19 +207,15 @@ private final Map<String, String> options; private String tableName; private boolean withPartition = true; + private String partitionField = "partition"; public Sql(String tableName) { options = new HashMap<>(); this.tableName = tableName; } - public Sql option(ConfigOption<?> option, String val) { - this.options.put(option.key(), val); - return this; - } - - public Sql option(ConfigOption<?> option, boolean val) { - this.options.put(option.key(), val + ""); + public Sql option(ConfigOption<?> option, Object val) { + this.options.put(option.key(), val.toString()); return this; } @@ -224,8 +224,13 @@ return this; } + public Sql partitionField(String partitionField) { + this.partitionField = partitionField; + return this; + } + public String end() { - return TestConfigurations.getCreateHoodieTableDDL(this.tableName, options, this.withPartition); + return TestConfigurations.getCreateHoodieTableDDL(this.tableName, options, this.withPartition, this.partitionField); } } }