1
0

[HUDI-2412] Add timestamp based partitioning for flink writer (#3638)

This commit is contained in:
Danny Chan
2021-09-11 13:17:16 +08:00
committed by GitHub
parent 06240417e9
commit b30c5bdaef
12 changed files with 355 additions and 65 deletions

View File

@@ -321,6 +321,16 @@ public class FlinkOptions extends HoodieConfig {
.defaultValue(KeyGeneratorType.SIMPLE.name())
.withDescription("Key generator type, that implements will extract the key out of incoming record");
public static final String PARTITION_FORMAT_HOUR = "yyyyMMddHH";
public static final String PARTITION_FORMAT_DAY = "yyyyMMdd";
public static final ConfigOption<String> PARTITION_FORMAT = ConfigOptions
.key("write.partition.format")
.stringType()
.noDefaultValue()
.withDescription("Partition path format, only valid when 'write.datetime.partitioning' is true, default is:\n"
+ "1) 'yyyyMMddHH' for timestamp(3) WITHOUT TIME ZONE, LONG, FLOAT, DOUBLE, DECIMAL;\n"
+ "2) 'yyyyMMdd' for DAY and INT.");
public static final ConfigOption<Integer> INDEX_BOOTSTRAP_TASKS = ConfigOptions
.key("write.index_bootstrap.tasks")
.intType()
@@ -364,10 +374,10 @@ public class FlinkOptions extends HoodieConfig {
.defaultValue(128)
.withDescription("Max log block size in MB for log file, default 128MB");
public static final ConfigOption<Integer> WRITE_LOG_MAX_SIZE = ConfigOptions
public static final ConfigOption<Long> WRITE_LOG_MAX_SIZE = ConfigOptions
.key("write.log.max.size")
.intType()
.defaultValue(1024)
.longType()
.defaultValue(1024L)
.withDescription("Maximum size allowed in MB for a log file before it is rolled over to the next version, default 1GB");
public static final ConfigOption<Integer> WRITE_PARQUET_BLOCK_SIZE = ConfigOptions

View File

@@ -18,17 +18,22 @@
package org.apache.hudi.sink.bulk;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.PartitionPathEncodeUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;
import org.apache.hudi.util.RowDataProjection;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
@@ -58,6 +63,8 @@ public class RowDataKeyGen implements Serializable {
private final boolean hiveStylePartitioning;
private final boolean encodePartitionPath;
private final Option<TimestampBasedAvroKeyGenerator> keyGenOpt;
// efficient code path
private boolean simpleRecordKey = false;
private RowData.FieldGetter recordKeyFieldGetter;
@@ -72,7 +79,8 @@ public class RowDataKeyGen implements Serializable {
String partitionFields,
RowType rowType,
boolean hiveStylePartitioning,
boolean encodePartitionPath) {
boolean encodePartitionPath,
Option<TimestampBasedAvroKeyGenerator> keyGenOpt) {
this.recordKeyFields = recordKeys.split(",");
this.partitionPathFields = partitionFields.split(",");
List<String> fieldNames = rowType.getFieldNames();
@@ -102,11 +110,21 @@ public class RowDataKeyGen implements Serializable {
} else {
this.partitionPathProjection = getProjection(this.partitionPathFields, fieldNames, fieldTypes);
}
this.keyGenOpt = keyGenOpt;
}
public static RowDataKeyGen instance(Configuration conf, RowType rowType) {
Option<TimestampBasedAvroKeyGenerator> keyGeneratorOpt = Option.empty();
if (conf.getString(FlinkOptions.KEYGEN_CLASS_NAME).equals(TimestampBasedAvroKeyGenerator.class.getName())) {
try {
keyGeneratorOpt = Option.of(new TimestampBasedAvroKeyGenerator(StreamerUtil.flinkConf2TypedProperties(conf)));
} catch (IOException e) {
throw new HoodieKeyException("Initialize TimestampBasedAvroKeyGenerator error", e);
}
}
return new RowDataKeyGen(conf.getString(FlinkOptions.RECORD_KEY_FIELD), conf.getString(FlinkOptions.PARTITION_PATH_FIELD),
rowType, conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING), conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING));
rowType, conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING), conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING),
keyGeneratorOpt);
}
public String getRecordKey(RowData rowData) {
@@ -121,7 +139,7 @@ public class RowDataKeyGen implements Serializable {
public String getPartitionPath(RowData rowData) {
if (this.simplePartitionPath) {
return getPartitionPath(partitionPathFieldGetter.getFieldOrNull(rowData),
this.partitionPathFields[0], this.hiveStylePartitioning, this.encodePartitionPath);
this.partitionPathFields[0], this.hiveStylePartitioning, this.encodePartitionPath, this.keyGenOpt);
} else if (this.nonPartitioned) {
return EMPTY_PARTITION;
} else {
@@ -193,7 +211,12 @@ public class RowDataKeyGen implements Serializable {
Object partValue,
String partField,
boolean hiveStylePartitioning,
boolean encodePartitionPath) {
boolean encodePartitionPath,
Option<TimestampBasedAvroKeyGenerator> keyGenOpt) {
if (keyGenOpt.isPresent()) {
TimestampBasedAvroKeyGenerator keyGenerator = keyGenOpt.get();
return keyGenerator.getPartitionPath(toEpochMilli(partValue, keyGenerator));
}
String partitionPath = StringUtils.objToString(partValue);
if (partitionPath == null || partitionPath.isEmpty()) {
partitionPath = DEFAULT_PARTITION_PATH;
@@ -207,6 +230,17 @@ public class RowDataKeyGen implements Serializable {
return partitionPath;
}
/**
 * Normalizes a raw partition field value into a form the
 * {@link TimestampBasedAvroKeyGenerator} can consume: Flink internal timestamps
 * are converted to epoch milliseconds, {@code null} falls back to the key
 * generator's default partition value, and any other value passes through unchanged.
 *
 * @param val          the raw partition field value, possibly null
 * @param keyGenerator the timestamp based key generator supplying the default partition value
 * @return the normalized value handed to the key generator
 */
private static Object toEpochMilli(Object val, TimestampBasedAvroKeyGenerator keyGenerator) {
  if (val == null) {
    // should match the default partition path when STRING partition path re-format is supported
    return keyGenerator.getDefaultPartitionVal();
  }
  if (val instanceof TimestampData) {
    return ((TimestampData) val).toInstant().toEpochMilli();
  }
  return val;
}
/**
* Returns the row data projection for the given field names and table schema.
*

View File

@@ -109,12 +109,26 @@ public class FileIndex {
* Returns all the file statuses under the table base path.
*/
public FileStatus[] getFilesInPartitions() {
String[] partitions = getOrBuildPartitionPaths().stream().map(p -> new Path(path, p).toString()).toArray(String[]::new);
String[] partitions = getOrBuildPartitionPaths().stream().map(p -> fullPartitionPath(path, p)).toArray(String[]::new);
return FSUtils.getFilesInPartitions(HoodieFlinkEngineContext.DEFAULT, metadataConfig, path.toString(),
partitions, "/tmp/")
.values().stream().flatMap(Arrays::stream).toArray(FileStatus[]::new);
}
/**
 * Resolves the absolute path of a partition under the given base path.
 *
 * @param basePath      The table base path.
 * @param partitionPath The relative partition path, may be empty if the table is non-partitioned.
 * @return The full partition path string
 */
private static String fullPartitionPath(Path basePath, String partitionPath) {
  // a non-partitioned table keeps its files directly under the base path
  return partitionPath.isEmpty()
      ? basePath.toString()
      : new Path(basePath, partitionPath).toString();
}
/**
* Reset the state of the file index.
*/

View File

@@ -339,7 +339,7 @@ public class FlinkStreamerConfig extends Configuration {
conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, config.writeTaskMaxSize);
conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, config.writeBatchSize);
conf.setInteger(FlinkOptions.WRITE_LOG_BLOCK_SIZE, config.writeLogBlockSize);
conf.setInteger(FlinkOptions.WRITE_LOG_MAX_SIZE, config.writeLogMaxSize);
conf.setLong(FlinkOptions.WRITE_LOG_MAX_SIZE, config.writeLogMaxSize);
conf.setInteger(FlinkOptions.WRITE_MERGE_MAX_MEMORY, config.writeMergeMaxMemory);
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, config.compactionAsyncEnabled);
conf.setInteger(FlinkOptions.COMPACTION_TASKS, config.compactionTasks);

View File

@@ -19,10 +19,13 @@
package org.apache.hudi.table;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.DataTypeUtils;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.configuration.ConfigOption;
@@ -36,6 +39,7 @@ import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
@@ -173,13 +177,23 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
}
// tweak the key gen class if possible
final String[] partitions = conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",");
if (partitions.length == 1 && partitions[0].equals("")) {
conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, NonpartitionedAvroKeyGenerator.class.getName());
LOG.info("Table option [{}] is reset to {} because this is a non-partitioned table",
FlinkOptions.KEYGEN_CLASS_NAME.key(), NonpartitionedAvroKeyGenerator.class.getName());
return;
}
final String[] pks = conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",");
if (partitions.length == 1) {
final String partitionField = partitions[0];
if (partitionField.isEmpty()) {
conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, NonpartitionedAvroKeyGenerator.class.getName());
LOG.info("Table option [{}] is reset to {} because this is a non-partitioned table",
FlinkOptions.KEYGEN_CLASS_NAME.key(), NonpartitionedAvroKeyGenerator.class.getName());
return;
}
DataType partitionFieldType = table.getSchema().getFieldDataType(partitionField)
.orElseThrow(() -> new HoodieValidationException("Field " + partitionField + " does not exist"));
if (pks.length <= 1 && DataTypeUtils.isDatetimeType(partitionFieldType)) {
// timestamp based key gen only supports simple primary key
setupTimestampKeygenOptions(conf, partitionFieldType);
return;
}
}
boolean complexHoodieKey = pks.length > 1 || partitions.length > 1;
if (complexHoodieKey && FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.KEYGEN_CLASS_NAME)) {
conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, ComplexAvroKeyGenerator.class.getName());
@@ -188,6 +202,37 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
}
}
/**
 * Sets up the keygen options when the partition path is datetime type.
 *
 * <p>The UTC timezone is used as default.
 *
 * @param conf      the flink configuration to mutate in place
 * @param fieldType the data type of the (single) partition field
 */
public static void setupTimestampKeygenOptions(Configuration conf, DataType fieldType) {
  conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, TimestampBasedAvroKeyGenerator.class.getName());
  LOG.info("Table option [{}] is reset to {} because datetime partitioning turns on",
      FlinkOptions.KEYGEN_CLASS_NAME.key(), TimestampBasedAvroKeyGenerator.class.getName());
  final String defaultFormat;
  if (DataTypeUtils.isTimestampType(fieldType)) {
    // TIMESTAMP column: the timestamp encoding follows the declared precision
    switch (DataTypeUtils.precision(fieldType.getLogicalType())) {
      case 0:
        // second precision
        conf.setString(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_TYPE_FIELD_PROP,
            TimestampBasedAvroKeyGenerator.TimestampType.UNIX_TIMESTAMP.name());
        break;
      case 3:
        // millisecond precision
        conf.setString(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_TYPE_FIELD_PROP,
            TimestampBasedAvroKeyGenerator.TimestampType.EPOCHMILLISECONDS.name());
        break;
      default:
        // other precisions: the timestamp type property is intentionally left unset
        break;
    }
    defaultFormat = FlinkOptions.PARTITION_FORMAT_HOUR;
  } else {
    // DATE (or other datetime) column: partition value is rendered as a date string
    conf.setString(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_TYPE_FIELD_PROP,
        TimestampBasedAvroKeyGenerator.TimestampType.DATE_STRING.name());
    defaultFormat = FlinkOptions.PARTITION_FORMAT_DAY;
  }
  conf.setString(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP,
      conf.getOptional(FlinkOptions.PARTITION_FORMAT).orElse(defaultFormat));
  conf.setString(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT_PROP, "UTC");
}
/**
* Sets up the compaction options from the table definition.
*/

View File

@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.util;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.TimestampType;
/**
 * Utilities for {@link org.apache.flink.table.types.DataType}.
 *
 * <p>All members are static; the class is noninstantiable.
 */
public class DataTypeUtils {

  private DataTypeUtils() {
    // utility class: prevent instantiation
  }

  /**
   * Returns whether the given type is TIMESTAMP type (without time zone).
   */
  public static boolean isTimestampType(DataType type) {
    return type.getLogicalType().getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE;
  }

  /**
   * Returns the precision of the given TIMESTAMP type.
   *
   * @param logicalType the logical type, must be a {@link TimestampType}
   * @return the timestamp precision (number of fractional-second digits)
   */
  public static int precision(LogicalType logicalType) {
    // guards against callers passing a non-timestamp type before the cast below
    ValidationUtils.checkArgument(logicalType instanceof TimestampType);
    TimestampType timestampType = (TimestampType) logicalType;
    return timestampType.getPrecision();
  }

  /**
   * Returns whether the given type is DATE type.
   */
  public static boolean isDateType(DataType type) {
    return type.getLogicalType().getTypeRoot() == LogicalTypeRoot.DATE;
  }

  /**
   * Returns whether the given type is DATETIME type (TIMESTAMP or DATE).
   */
  public static boolean isDatetimeType(DataType type) {
    return isTimestampType(type) || isDateType(type);
  }
}

View File

@@ -143,7 +143,7 @@ public class StreamerUtil {
return conf;
}
// Keep to avoid to much modifications.
// Keep the redundant to avoid too many modifications.
public static org.apache.hadoop.conf.Configuration getHadoopConf() {
return FlinkClientUtil.getHadoopConf();
}
@@ -180,7 +180,7 @@ public class StreamerUtil {
.forTable(conf.getString(FlinkOptions.TABLE_NAME))
.withStorageConfig(HoodieStorageConfig.newBuilder()
.logFileDataBlockMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_BLOCK_SIZE) * 1024 * 1024)
.logFileMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_MAX_SIZE) * 1024 * 1024)
.logFileMaxSize(conf.getLong(FlinkOptions.WRITE_LOG_MAX_SIZE) * 1024 * 1024)
.parquetBlockSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_BLOCK_SIZE) * 1024 * 1024)
.parquetPageSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_PAGE_SIZE) * 1024 * 1024)
.parquetMaxFileSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE) * 1024 * 1024L)
@@ -192,7 +192,7 @@ public class StreamerUtil {
.withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton
.withAutoCommit(false)
.withAllowOperationMetadataField(conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
.withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf)));
.withProps(flinkConf2TypedProperties(conf));
builder = builder.withSchema(getSourceSchema(conf).toString());
return builder.build();
@@ -206,12 +206,13 @@ public class StreamerUtil {
* @return a TypedProperties instance
*/
public static TypedProperties flinkConf2TypedProperties(Configuration conf) {
Configuration flatConf = FlinkOptions.flatOptions(conf);
Properties properties = new Properties();
// put all the set up options
conf.addAllToProperties(properties);
// put all the set options
flatConf.addAllToProperties(properties);
// put all the default options
for (ConfigOption<?> option : FlinkOptions.optionalOptions()) {
if (!conf.contains(option) && option.hasDefaultValue()) {
if (!flatConf.contains(option) && option.hasDefaultValue()) {
properties.put(option.key(), option.defaultValue());
}
}