[HUDI-3977] Flink hudi table with date type partition path throws HoodieNotSupportedException (#5432)
This commit is contained in:
@@ -37,6 +37,7 @@ import java.io.IOException;
|
|||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.math.BigDecimal;
|
import java.math.BigDecimal;
|
||||||
import java.sql.Timestamp;
|
import java.sql.Timestamp;
|
||||||
|
import java.time.LocalDate;
|
||||||
import java.util.TimeZone;
|
import java.util.TimeZone;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
@@ -175,6 +176,9 @@ public class TimestampBasedAvroKeyGenerator extends SimpleAvroKeyGenerator {
|
|||||||
timeMs = convertLongTimeToMillis(((Integer) partitionVal).longValue());
|
timeMs = convertLongTimeToMillis(((Integer) partitionVal).longValue());
|
||||||
} else if (partitionVal instanceof BigDecimal) {
|
} else if (partitionVal instanceof BigDecimal) {
|
||||||
timeMs = convertLongTimeToMillis(((BigDecimal) partitionVal).longValue());
|
timeMs = convertLongTimeToMillis(((BigDecimal) partitionVal).longValue());
|
||||||
|
} else if (partitionVal instanceof LocalDate) {
|
||||||
|
// Avro uses LocalDate to represent the Date value internal.
|
||||||
|
timeMs = convertLongTimeToMillis(((LocalDate) partitionVal).toEpochDay());
|
||||||
} else if (partitionVal instanceof CharSequence) {
|
} else if (partitionVal instanceof CharSequence) {
|
||||||
if (!inputFormatter.isPresent()) {
|
if (!inputFormatter.isPresent()) {
|
||||||
throw new HoodieException("Missing inputformatter. Ensure " + KeyGeneratorOptions.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP + " config is set when timestampType is DATE_STRING or MIXED!");
|
throw new HoodieException("Missing inputformatter. Ensure " + KeyGeneratorOptions.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP + " config is set when timestampType is DATE_STRING or MIXED!");
|
||||||
|
|||||||
@@ -367,13 +367,14 @@ public class FlinkOptions extends HoodieConfig {
|
|||||||
|
|
||||||
public static final String PARTITION_FORMAT_HOUR = "yyyyMMddHH";
|
public static final String PARTITION_FORMAT_HOUR = "yyyyMMddHH";
|
||||||
public static final String PARTITION_FORMAT_DAY = "yyyyMMdd";
|
public static final String PARTITION_FORMAT_DAY = "yyyyMMdd";
|
||||||
|
public static final String PARTITION_FORMAT_DASHED_DAY = "yyyy-MM-dd";
|
||||||
public static final ConfigOption<String> PARTITION_FORMAT = ConfigOptions
|
public static final ConfigOption<String> PARTITION_FORMAT = ConfigOptions
|
||||||
.key("write.partition.format")
|
.key("write.partition.format")
|
||||||
.stringType()
|
.stringType()
|
||||||
.noDefaultValue()
|
.noDefaultValue()
|
||||||
.withDescription("Partition path format, only valid when 'write.datetime.partitioning' is true, default is:\n"
|
.withDescription("Partition path format, only valid when 'write.datetime.partitioning' is true, default is:\n"
|
||||||
+ "1) 'yyyyMMddHH' for timestamp(3) WITHOUT TIME ZONE, LONG, FLOAT, DOUBLE, DECIMAL;\n"
|
+ "1) 'yyyyMMddHH' for timestamp(3) WITHOUT TIME ZONE, LONG, FLOAT, DOUBLE, DECIMAL;\n"
|
||||||
+ "2) 'yyyyMMdd' for DAY and INT.");
|
+ "2) 'yyyyMMdd' for DATE and INT.");
|
||||||
|
|
||||||
public static final ConfigOption<Integer> INDEX_BOOTSTRAP_TASKS = ConfigOptions
|
public static final ConfigOption<Integer> INDEX_BOOTSTRAP_TASKS = ConfigOptions
|
||||||
.key("write.index_bootstrap.tasks")
|
.key("write.index_bootstrap.tasks")
|
||||||
|
|||||||
@@ -53,6 +53,7 @@ import java.util.Arrays;
|
|||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
|
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
|
||||||
|
|
||||||
@@ -243,6 +244,11 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
|
|||||||
* <p>The UTC timezone is used as default.
|
* <p>The UTC timezone is used as default.
|
||||||
*/
|
*/
|
||||||
public static void setupTimestampKeygenOptions(Configuration conf, DataType fieldType) {
|
public static void setupTimestampKeygenOptions(Configuration conf, DataType fieldType) {
|
||||||
|
if (conf.contains(FlinkOptions.KEYGEN_CLASS_NAME)) {
|
||||||
|
// the keygen clazz has been set up explicitly, skipping
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, TimestampBasedAvroKeyGenerator.class.getName());
|
conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, TimestampBasedAvroKeyGenerator.class.getName());
|
||||||
LOG.info("Table option [{}] is reset to {} because datetime partitioning turns on",
|
LOG.info("Table option [{}] is reset to {} because datetime partitioning turns on",
|
||||||
FlinkOptions.KEYGEN_CLASS_NAME.key(), TimestampBasedAvroKeyGenerator.class.getName());
|
FlinkOptions.KEYGEN_CLASS_NAME.key(), TimestampBasedAvroKeyGenerator.class.getName());
|
||||||
@@ -257,13 +263,17 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
|
|||||||
conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_TYPE_FIELD_PROP,
|
conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_TYPE_FIELD_PROP,
|
||||||
TimestampBasedAvroKeyGenerator.TimestampType.EPOCHMILLISECONDS.name());
|
TimestampBasedAvroKeyGenerator.TimestampType.EPOCHMILLISECONDS.name());
|
||||||
}
|
}
|
||||||
String partitionFormat = conf.getOptional(FlinkOptions.PARTITION_FORMAT).orElse(FlinkOptions.PARTITION_FORMAT_HOUR);
|
String outputPartitionFormat = conf.getOptional(FlinkOptions.PARTITION_FORMAT).orElse(FlinkOptions.PARTITION_FORMAT_HOUR);
|
||||||
conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, partitionFormat);
|
conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, outputPartitionFormat);
|
||||||
} else {
|
} else {
|
||||||
conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_TYPE_FIELD_PROP,
|
conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_TYPE_FIELD_PROP,
|
||||||
TimestampBasedAvroKeyGenerator.TimestampType.DATE_STRING.name());
|
TimestampBasedAvroKeyGenerator.TimestampType.SCALAR.name());
|
||||||
String partitionFormat = conf.getOptional(FlinkOptions.PARTITION_FORMAT).orElse(FlinkOptions.PARTITION_FORMAT_DAY);
|
conf.setString(KeyGeneratorOptions.Config.INPUT_TIME_UNIT, TimeUnit.DAYS.toString());
|
||||||
conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, partitionFormat);
|
|
||||||
|
String outputPartitionFormat = conf.getOptional(FlinkOptions.PARTITION_FORMAT).orElse(FlinkOptions.PARTITION_FORMAT_DAY);
|
||||||
|
conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, outputPartitionFormat);
|
||||||
|
// the option is actually useless, it only works for validation
|
||||||
|
conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, FlinkOptions.PARTITION_FORMAT_DAY);
|
||||||
}
|
}
|
||||||
conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT_PROP, "UTC");
|
conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT_PROP, "UTC");
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -29,6 +29,8 @@ import org.apache.flink.table.data.RowData;
|
|||||||
import org.apache.flink.table.data.StringData;
|
import org.apache.flink.table.data.StringData;
|
||||||
import org.apache.flink.table.data.TimestampData;
|
import org.apache.flink.table.data.TimestampData;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.params.ParameterizedTest;
|
||||||
|
import org.junit.jupiter.params.provider.ValueSource;
|
||||||
|
|
||||||
import static org.apache.hudi.utils.TestData.insertRow;
|
import static org.apache.hudi.utils.TestData.insertRow;
|
||||||
import static org.hamcrest.CoreMatchers.is;
|
import static org.hamcrest.CoreMatchers.is;
|
||||||
@@ -126,4 +128,40 @@ public class TestRowDataKeyGen {
|
|||||||
assertThat(keyGen2.getPartitionPath(rowData2), is("ts=1970010100"));
|
assertThat(keyGen2.getPartitionPath(rowData2), is("ts=1970010100"));
|
||||||
assertThat(keyGen2.getPartitionPath(rowData3), is("ts=1970010100"));
|
assertThat(keyGen2.getPartitionPath(rowData3), is("ts=1970010100"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@ValueSource(strings = {FlinkOptions.PARTITION_FORMAT_DASHED_DAY, FlinkOptions.PARTITION_FORMAT_DAY})
|
||||||
|
void testDateBasedKeyGenerator(String partitionFormat) {
|
||||||
|
boolean dashed = partitionFormat.equals(FlinkOptions.PARTITION_FORMAT_DASHED_DAY);
|
||||||
|
Configuration conf = TestConfigurations.getDefaultConf("path1", TestConfigurations.ROW_DATA_TYPE_DATE);
|
||||||
|
conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "dt");
|
||||||
|
conf.setString(FlinkOptions.PARTITION_FORMAT, partitionFormat);
|
||||||
|
HoodieTableFactory.setupTimestampKeygenOptions(conf, DataTypes.DATE());
|
||||||
|
final RowData rowData1 = insertRow(TestConfigurations.ROW_TYPE_DATE,
|
||||||
|
StringData.fromString("id1"), StringData.fromString("Danny"), 23, 1);
|
||||||
|
final RowDataKeyGen keyGen1 = RowDataKeyGen.instance(conf, TestConfigurations.ROW_TYPE_DATE);
|
||||||
|
|
||||||
|
assertThat(keyGen1.getRecordKey(rowData1), is("id1"));
|
||||||
|
String expectedPartition1 = dashed ? "1970-01-02" : "19700102";
|
||||||
|
assertThat(keyGen1.getPartitionPath(rowData1), is(expectedPartition1));
|
||||||
|
|
||||||
|
// null record key and partition path
|
||||||
|
final RowData rowData2 = insertRow(TestConfigurations.ROW_TYPE_DATE, null, StringData.fromString("Danny"), 23, null);
|
||||||
|
assertThrows(HoodieKeyException.class, () -> keyGen1.getRecordKey(rowData2));
|
||||||
|
String expectedPartition2 = dashed ? "1970-01-02" : "19700102";
|
||||||
|
assertThat(keyGen1.getPartitionPath(rowData2), is(expectedPartition2));
|
||||||
|
|
||||||
|
// empty record key
|
||||||
|
String expectedPartition3 = dashed ? "1970-01-03" : "19700103";
|
||||||
|
final RowData rowData3 = insertRow(TestConfigurations.ROW_TYPE_DATE, StringData.fromString(""), StringData.fromString("Danny"), 23, 2);
|
||||||
|
assertThrows(HoodieKeyException.class, () -> keyGen1.getRecordKey(rowData3));
|
||||||
|
assertThat(keyGen1.getPartitionPath(rowData3), is(expectedPartition3));
|
||||||
|
|
||||||
|
// hive style partitioning
|
||||||
|
conf.set(FlinkOptions.HIVE_STYLE_PARTITIONING, true);
|
||||||
|
final RowDataKeyGen keyGen2 = RowDataKeyGen.instance(conf, TestConfigurations.ROW_TYPE_DATE);
|
||||||
|
assertThat(keyGen2.getPartitionPath(rowData1), is("dt=" + expectedPartition1));
|
||||||
|
assertThat(keyGen2.getPartitionPath(rowData2), is("dt=" + expectedPartition2));
|
||||||
|
assertThat(keyGen2.getPartitionPath(rowData3), is("dt=" + expectedPartition3));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1028,6 +1028,37 @@ public class ITTestHoodieDataSource extends AbstractTestBase {
|
|||||||
+ "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
|
+ "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@ValueSource(strings = {FlinkOptions.PARTITION_FORMAT_DAY, FlinkOptions.PARTITION_FORMAT_DASHED_DAY})
|
||||||
|
void testWriteAndReadWithDatePartitioning(String partitionFormat) {
|
||||||
|
TableEnvironment tableEnv = batchTableEnv;
|
||||||
|
String hoodieTableDDL = sql("t1")
|
||||||
|
.field("uuid varchar(20)")
|
||||||
|
.field("name varchar(10)")
|
||||||
|
.field("age int")
|
||||||
|
.field("ts date")
|
||||||
|
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
|
||||||
|
.option(FlinkOptions.PARTITION_FORMAT, partitionFormat)
|
||||||
|
.partitionField("ts") // use date as partition path field
|
||||||
|
.end();
|
||||||
|
tableEnv.executeSql(hoodieTableDDL);
|
||||||
|
|
||||||
|
execInsertSql(tableEnv, TestSQL.INSERT_DATE_PARTITION_T1);
|
||||||
|
|
||||||
|
List<Row> result = CollectionUtil.iterableToList(
|
||||||
|
() -> tableEnv.sqlQuery("select * from t1").execute().collect());
|
||||||
|
String expected = "["
|
||||||
|
+ "+I[id1, Danny, 23, 1970-01-01], "
|
||||||
|
+ "+I[id2, Stephen, 33, 1970-01-01], "
|
||||||
|
+ "+I[id3, Julian, 53, 1970-01-01], "
|
||||||
|
+ "+I[id4, Fabian, 31, 1970-01-01], "
|
||||||
|
+ "+I[id5, Sophia, 18, 1970-01-01], "
|
||||||
|
+ "+I[id6, Emma, 20, 1970-01-01], "
|
||||||
|
+ "+I[id7, Bob, 44, 1970-01-01], "
|
||||||
|
+ "+I[id8, Han, 56, 1970-01-01]]";
|
||||||
|
assertRowsEquals(result, expected);
|
||||||
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@ValueSource(strings = {"bulk_insert", "upsert"})
|
@ValueSource(strings = {"bulk_insert", "upsert"})
|
||||||
void testWriteReadDecimals(String operation) {
|
void testWriteReadDecimals(String operation) {
|
||||||
|
|||||||
@@ -419,7 +419,6 @@ public class TestHoodieTableFactory {
|
|||||||
@Test
|
@Test
|
||||||
void testSetupTimestampBasedKeyGenForSink() {
|
void testSetupTimestampBasedKeyGenForSink() {
|
||||||
this.conf.setString(FlinkOptions.RECORD_KEY_FIELD, "dummyField");
|
this.conf.setString(FlinkOptions.RECORD_KEY_FIELD, "dummyField");
|
||||||
this.conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, "dummyKeyGenClass");
|
|
||||||
// definition with simple primary key and partition path
|
// definition with simple primary key and partition path
|
||||||
ResolvedSchema schema1 = SchemaBuilder.instance()
|
ResolvedSchema schema1 = SchemaBuilder.instance()
|
||||||
.field("f0", DataTypes.INT().notNull())
|
.field("f0", DataTypes.INT().notNull())
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ package org.apache.hudi.utils;
|
|||||||
|
|
||||||
import org.apache.hudi.configuration.FlinkOptions;
|
import org.apache.hudi.configuration.FlinkOptions;
|
||||||
import org.apache.hudi.streamer.FlinkStreamerConfig;
|
import org.apache.hudi.streamer.FlinkStreamerConfig;
|
||||||
|
import org.apache.hudi.util.AvroSchemaConverter;
|
||||||
import org.apache.hudi.utils.factory.CollectSinkTableFactory;
|
import org.apache.hudi.utils.factory.CollectSinkTableFactory;
|
||||||
import org.apache.hudi.utils.factory.ContinuousFileSourceFactory;
|
import org.apache.hudi.utils.factory.ContinuousFileSourceFactory;
|
||||||
|
|
||||||
@@ -74,6 +75,15 @@ public class TestConfigurations {
|
|||||||
|
|
||||||
public static final RowType ROW_TYPE_WIDER = (RowType) ROW_DATA_TYPE_WIDER.getLogicalType();
|
public static final RowType ROW_TYPE_WIDER = (RowType) ROW_DATA_TYPE_WIDER.getLogicalType();
|
||||||
|
|
||||||
|
public static final DataType ROW_DATA_TYPE_DATE = DataTypes.ROW(
|
||||||
|
DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)),// record key
|
||||||
|
DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
|
||||||
|
DataTypes.FIELD("age", DataTypes.INT()),
|
||||||
|
DataTypes.FIELD("dt", DataTypes.DATE()))
|
||||||
|
.notNull();
|
||||||
|
|
||||||
|
public static final RowType ROW_TYPE_DATE = (RowType) ROW_DATA_TYPE_DATE.getLogicalType();
|
||||||
|
|
||||||
public static String getCreateHoodieTableDDL(String tableName, Map<String, String> options) {
|
public static String getCreateHoodieTableDDL(String tableName, Map<String, String> options) {
|
||||||
return getCreateHoodieTableDDL(tableName, options, true, "partition");
|
return getCreateHoodieTableDDL(tableName, options, true, "partition");
|
||||||
}
|
}
|
||||||
@@ -212,6 +222,15 @@ public class TestConfigurations {
|
|||||||
return conf;
|
return conf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static Configuration getDefaultConf(String tablePath, DataType dataType) {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setString(FlinkOptions.PATH, tablePath);
|
||||||
|
conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, AvroSchemaConverter.convertToSchema(dataType.getLogicalType()).toString());
|
||||||
|
conf.setString(FlinkOptions.TABLE_NAME, "TestHoodieTable");
|
||||||
|
conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
|
||||||
public static FlinkStreamerConfig getDefaultStreamerConf(String tablePath) {
|
public static FlinkStreamerConfig getDefaultStreamerConf(String tablePath) {
|
||||||
FlinkStreamerConfig streamerConf = new FlinkStreamerConfig();
|
FlinkStreamerConfig streamerConf = new FlinkStreamerConfig();
|
||||||
streamerConf.targetBasePath = tablePath;
|
streamerConf.targetBasePath = tablePath;
|
||||||
|
|||||||
@@ -61,4 +61,14 @@ public class TestSQL {
|
|||||||
+ "(1, array['abc1', 'def1'], array[1, 1], map['abc1', 1, 'def1', 3], row(array['abc1', 'def1'], row(1, 'abc1'))),\n"
|
+ "(1, array['abc1', 'def1'], array[1, 1], map['abc1', 1, 'def1', 3], row(array['abc1', 'def1'], row(1, 'abc1'))),\n"
|
||||||
+ "(2, array['abc2', 'def2'], array[2, 2], map['abc2', 1, 'def2', 3], row(array['abc2', 'def2'], row(2, 'abc2'))),\n"
|
+ "(2, array['abc2', 'def2'], array[2, 2], map['abc2', 1, 'def2', 3], row(array['abc2', 'def2'], row(2, 'abc2'))),\n"
|
||||||
+ "(3, array['abc3', 'def3'], array[3, 3], map['abc3', 1, 'def3', 3], row(array['abc3', 'def3'], row(3, 'abc3')))";
|
+ "(3, array['abc3', 'def3'], array[3, 3], map['abc3', 1, 'def3', 3], row(array['abc3', 'def3'], row(3, 'abc3')))";
|
||||||
|
|
||||||
|
public static final String INSERT_DATE_PARTITION_T1 = "insert into t1 values\n"
|
||||||
|
+ "('id1','Danny',23,DATE '1970-01-01'),\n"
|
||||||
|
+ "('id2','Stephen',33,DATE '1970-01-01'),\n"
|
||||||
|
+ "('id3','Julian',53,DATE '1970-01-01'),\n"
|
||||||
|
+ "('id4','Fabian',31,DATE '1970-01-01'),\n"
|
||||||
|
+ "('id5','Sophia',18,DATE '1970-01-01'),\n"
|
||||||
|
+ "('id6','Emma',20,DATE '1970-01-01'),\n"
|
||||||
|
+ "('id7','Bob',44,DATE '1970-01-01'),\n"
|
||||||
|
+ "('id8','Han',56,DATE '1970-01-01')";
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user