From ac3589f00659985c39ef29e5edd089279f6c2f70 Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Wed, 21 Apr 2021 20:07:27 +0800 Subject: [PATCH] [HUDI-1814] Non partitioned table for Flink writer (#2859) --- .../hudi/configuration/FlinkOptions.java | 4 +-- .../apache/hudi/table/HoodieTableFactory.java | 17 ++++++--- .../hudi/table/HoodieDataSourceITCase.java | 36 +++++++++++++++++++ .../hudi/table/TestHoodieTableFactory.java | 17 +++++++++ 4 files changed, 68 insertions(+), 6 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index c47ea9584..9925fc7bb 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -239,9 +239,9 @@ public class FlinkOptions { public static final ConfigOption PARTITION_PATH_FIELD = ConfigOptions .key(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY) .stringType() - .defaultValue("partition-path") + .defaultValue("") .withDescription("Partition path field. Value to be used at the `partitionPath` component of `HoodieKey`.\n" - + "Actual value obtained by invoking .toString()"); + + "Actual value obtained by invoking .toString(), default ''"); public static final ConfigOption PARTITION_PATH_URL_ENCODE = ConfigOptions .key("write.partition.url_encode") diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index 22dcd3e3a..abdfdcbb3 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -20,6 +20,7 @@ package org.apache.hudi.table; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.keygen.ComplexAvroKeyGenerator; +import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator; import org.apache.hudi.util.AvroSchemaConverter; import org.apache.flink.configuration.ConfigOption; @@ -129,13 +130,21 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab String recordKey = String.join(",", pkColumns); conf.setString(FlinkOptions.RECORD_KEY_FIELD, recordKey); } - List partitions = table.getPartitionKeys(); - if (partitions.size() > 0) { + List partitionKeys = table.getPartitionKeys(); + if (partitionKeys.size() > 0) { // the PARTITIONED BY syntax always has higher priority than option FlinkOptions#PARTITION_PATH_FIELD - conf.setString(FlinkOptions.PARTITION_PATH_FIELD, String.join(",", partitions)); + conf.setString(FlinkOptions.PARTITION_PATH_FIELD, String.join(",", partitionKeys)); } // tweak the key gen class if possible - boolean complexHoodieKey = pkColumns.size() > 1 || partitions.size() > 1; + final String[] partitions = conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(","); + if (partitions.length == 1 && partitions[0].equals("")) { + conf.setString(FlinkOptions.KEYGEN_CLASS, NonpartitionedAvroKeyGenerator.class.getName()); + LOG.info("Table option [{}] is reset to {} because this is a non-partitioned table", + FlinkOptions.KEYGEN_CLASS.key(), NonpartitionedAvroKeyGenerator.class.getName()); + return; + } + final String[] pks = conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","); + boolean complexHoodieKey = pks.length > 1 || partitions.length > 1; if (complexHoodieKey && FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.KEYGEN_CLASS)) { conf.setString(FlinkOptions.KEYGEN_CLASS, ComplexAvroKeyGenerator.class.getName()); LOG.info("Table option [{}] is reset to {} because record key or partition path has two or more fields", diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java index 29a7d7d0a..fe652c5ff 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java @@ -352,6 +352,42 @@ public class HoodieDataSourceITCase extends AbstractTestBase { assertRowsEquals(result, "[id1,Sophia,18,1970-01-01T00:00:05,par1]"); } + @ParameterizedTest + @EnumSource(value = ExecMode.class) + void testWriteNonPartitionedTable(ExecMode execMode) { + TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv; + String hoodieTableDDL = "create table t1(\n" + + " uuid varchar(20),\n" + + " name varchar(10),\n" + + " age int,\n" + + " ts timestamp(3),\n" + + " `partition` varchar(20),\n" + + " PRIMARY KEY(uuid) NOT ENFORCED\n" + + ")\n" + + "with (\n" + + " 'connector' = 'hudi',\n" + + " 'path' = '" + tempFile.getAbsolutePath() + "'\n" + + ")"; + tableEnv.executeSql(hoodieTableDDL); + + final String insertInto1 = "insert into t1 values\n" + + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')"; + + execInsertSql(tableEnv, insertInto1); + + final String insertInto2 = "insert into t1 values\n" + + "('id1','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par2'),\n" + + "('id1','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par3'),\n" + + "('id1','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par4'),\n" + + "('id1','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par5')"; + + execInsertSql(tableEnv, insertInto2); + + List result = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1").execute().collect()); + assertRowsEquals(result, "[id1,Sophia,18,1970-01-01T00:00:05,par5]"); + } + // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java index bbb59640f..1f2059e2c 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java @@ -20,6 +20,7 @@ package org.apache.hudi.table; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.keygen.ComplexAvroKeyGenerator; +import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestConfigurations; @@ -119,6 +120,14 @@ public class TestHoodieTableFactory { final Configuration conf2 = tableSource2.getConf(); assertThat(conf2.get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1")); assertThat(conf2.get(FlinkOptions.KEYGEN_CLASS), is(ComplexAvroKeyGenerator.class.getName())); + + // definition with complex primary keys and empty partition paths + this.conf.setString(FlinkOptions.KEYGEN_CLASS, FlinkOptions.KEYGEN_CLASS.defaultValue()); + final MockContext sourceContext3 = MockContext.getInstance(this.conf, schema2, ""); + final HoodieTableSource tableSource3 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext3); + final Configuration conf3 = tableSource3.getConf(); + assertThat(conf3.get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1")); + assertThat(conf3.get(FlinkOptions.KEYGEN_CLASS), is(NonpartitionedAvroKeyGenerator.class.getName())); } @Test @@ -167,6 +176,14 @@ public class TestHoodieTableFactory { final Configuration conf2 = tableSink2.getConf(); assertThat(conf2.get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1")); assertThat(conf2.get(FlinkOptions.KEYGEN_CLASS), is(ComplexAvroKeyGenerator.class.getName())); + + // definition with complex primary keys and empty partition paths + this.conf.setString(FlinkOptions.KEYGEN_CLASS, FlinkOptions.KEYGEN_CLASS.defaultValue()); + final MockContext sinkContext3 = MockContext.getInstance(this.conf, schema2, ""); + final HoodieTableSink tableSink3 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sinkContext3); + final Configuration conf3 = tableSink3.getConf(); + assertThat(conf3.get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1")); + assertThat(conf3.get(FlinkOptions.KEYGEN_CLASS), is(NonpartitionedAvroKeyGenerator.class.getName())); } // -------------------------------------------------------------------------