1
0

[HUDI-1814] Non partitioned table for Flink writer (#2859)

This commit is contained in:
Danny Chan
2021-04-21 20:07:27 +08:00
committed by GitHub
parent aacb8be521
commit ac3589f006
4 changed files with 68 additions and 6 deletions

View File

@@ -239,9 +239,9 @@ public class FlinkOptions {
public static final ConfigOption<String> PARTITION_PATH_FIELD = ConfigOptions
.key(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY)
.stringType()
.defaultValue("partition-path")
.defaultValue("")
.withDescription("Partition path field. Value to be used at the `partitionPath` component of `HoodieKey`.\n"
+ "Actual value obtained by invoking .toString()");
+ "Actual value obtained by invoking .toString(), default ''");
public static final ConfigOption<Boolean> PARTITION_PATH_URL_ENCODE = ConfigOptions
.key("write.partition.url_encode")

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.table;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.flink.configuration.ConfigOption;
@@ -129,13 +130,21 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
String recordKey = String.join(",", pkColumns);
conf.setString(FlinkOptions.RECORD_KEY_FIELD, recordKey);
}
List<String> partitions = table.getPartitionKeys();
if (partitions.size() > 0) {
List<String> partitionKeys = table.getPartitionKeys();
if (partitionKeys.size() > 0) {
// the PARTITIONED BY syntax always has higher priority than option FlinkOptions#PARTITION_PATH_FIELD
conf.setString(FlinkOptions.PARTITION_PATH_FIELD, String.join(",", partitions));
conf.setString(FlinkOptions.PARTITION_PATH_FIELD, String.join(",", partitionKeys));
}
// tweak the key gen class if possible
boolean complexHoodieKey = pkColumns.size() > 1 || partitions.size() > 1;
final String[] partitions = conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",");
if (partitions.length == 1 && partitions[0].equals("")) {
conf.setString(FlinkOptions.KEYGEN_CLASS, NonpartitionedAvroKeyGenerator.class.getName());
LOG.info("Table option [{}] is reset to {} because this is a non-partitioned table",
FlinkOptions.KEYGEN_CLASS.key(), NonpartitionedAvroKeyGenerator.class.getName());
return;
}
final String[] pks = conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",");
boolean complexHoodieKey = pks.length > 1 || partitions.length > 1;
if (complexHoodieKey && FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.KEYGEN_CLASS)) {
conf.setString(FlinkOptions.KEYGEN_CLASS, ComplexAvroKeyGenerator.class.getName());
LOG.info("Table option [{}] is reset to {} because record key or partition path has two or more fields",

View File

@@ -352,6 +352,42 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
assertRowsEquals(result, "[id1,Sophia,18,1970-01-01T00:00:05,par1]");
}
@ParameterizedTest
@EnumSource(value = ExecMode.class)
void testWriteNonPartitionedTable(ExecMode execMode) {
TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
String hoodieTableDDL = "create table t1(\n"
+ " uuid varchar(20),\n"
+ " name varchar(10),\n"
+ " age int,\n"
+ " ts timestamp(3),\n"
+ " `partition` varchar(20),\n"
+ " PRIMARY KEY(uuid) NOT ENFORCED\n"
+ ")\n"
+ "with (\n"
+ " 'connector' = 'hudi',\n"
+ " 'path' = '" + tempFile.getAbsolutePath() + "'\n"
+ ")";
tableEnv.executeSql(hoodieTableDDL);
final String insertInto1 = "insert into t1 values\n"
+ "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')";
execInsertSql(tableEnv, insertInto1);
final String insertInto2 = "insert into t1 values\n"
+ "('id1','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par2'),\n"
+ "('id1','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par3'),\n"
+ "('id1','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par4'),\n"
+ "('id1','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par5')";
execInsertSql(tableEnv, insertInto2);
List<Row> result = CollectionUtil.iterableToList(
() -> tableEnv.sqlQuery("select * from t1").execute().collect());
assertRowsEquals(result, "[id1,Sophia,18,1970-01-01T00:00:05,par5]");
}
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.table;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
@@ -119,6 +120,14 @@ public class TestHoodieTableFactory {
final Configuration conf2 = tableSource2.getConf();
assertThat(conf2.get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1"));
assertThat(conf2.get(FlinkOptions.KEYGEN_CLASS), is(ComplexAvroKeyGenerator.class.getName()));
// definition with complex primary keys and empty partition paths
this.conf.setString(FlinkOptions.KEYGEN_CLASS, FlinkOptions.KEYGEN_CLASS.defaultValue());
final MockContext sourceContext3 = MockContext.getInstance(this.conf, schema2, "");
final HoodieTableSource tableSource3 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext3);
final Configuration conf3 = tableSource3.getConf();
assertThat(conf3.get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1"));
assertThat(conf3.get(FlinkOptions.KEYGEN_CLASS), is(NonpartitionedAvroKeyGenerator.class.getName()));
}
@Test
@@ -167,6 +176,14 @@ public class TestHoodieTableFactory {
final Configuration conf2 = tableSink2.getConf();
assertThat(conf2.get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1"));
assertThat(conf2.get(FlinkOptions.KEYGEN_CLASS), is(ComplexAvroKeyGenerator.class.getName()));
// definition with complex primary keys and empty partition paths
this.conf.setString(FlinkOptions.KEYGEN_CLASS, FlinkOptions.KEYGEN_CLASS.defaultValue());
final MockContext sinkContext3 = MockContext.getInstance(this.conf, schema2, "");
final HoodieTableSink tableSink3 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sinkContext3);
final Configuration conf3 = tableSink3.getConf();
assertThat(conf3.get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1"));
assertThat(conf3.get(FlinkOptions.KEYGEN_CLASS), is(NonpartitionedAvroKeyGenerator.class.getName()));
}
// -------------------------------------------------------------------------