From f7f5d4cc6db1230e653346953d87c6ff480c56eb Mon Sep 17 00:00:00 2001 From: swuferhong <337361684@qq.com> Date: Fri, 30 Jul 2021 17:24:00 +0800 Subject: [PATCH] [HUDI-2184] Support setting hive sync partition extractor class based on flink configuration (#3284) --- .../apache/hudi/table/HoodieTableFactory.java | 13 +++++ .../hudi/table/TestHoodieTableFactory.java | 52 +++++++++++++++++++ 2 files changed, 65 insertions(+) diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index 753ced46a..a8fe93b12 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -19,6 +19,7 @@ package org.apache.hudi.table; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.hive.MultiPartKeysValueExtractor; import org.apache.hudi.keygen.ComplexAvroKeyGenerator; import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator; import org.apache.hudi.util.AvroSchemaConverter; @@ -148,6 +149,8 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab setupHoodieKeyOptions(conf, table); // compaction options setupCompactionOptions(conf); + // hive options + setupHiveOptions(conf); // infer avro schema from physical DDL schema inferAvroSchema(conf, schema.toRowDataType().notNull().getLogicalType()); } @@ -207,6 +210,16 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab } } + /** + * Sets up the hive options from the table definition. 
+ */ + private static void setupHiveOptions(Configuration conf) { + if (!conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING) + && FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS)) { + conf.setString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS, MultiPartKeysValueExtractor.class.getName()); + } + } + /** * Inferences the deserialization Avro schema from the table schema (e.g. the DDL) * if both options {@link FlinkOptions#SOURCE_AVRO_SCHEMA_PATH} and diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java index e40741ce1..799739cfc 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java @@ -19,6 +19,8 @@ package org.apache.hudi.table; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.hive.MultiPartKeysValueExtractor; +import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor; import org.apache.hudi.keygen.ComplexAvroKeyGenerator; import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator; import org.apache.hudi.util.StreamerUtil; @@ -172,6 +174,31 @@ public class TestHoodieTableFactory { assertThat(conf3.get(FlinkOptions.KEYGEN_CLASS), is(NonpartitionedAvroKeyGenerator.class.getName())); } + @Test + void testSetupHiveOptionsForSource() { + // definition with simple primary key and partition path + TableSchema schema1 = TableSchema.builder() + .field("f0", DataTypes.INT().notNull()) + .field("f1", DataTypes.VARCHAR(20)) + .field("f2", DataTypes.TIMESTAMP(3)) + .field("ts", DataTypes.TIMESTAMP(3)) + .primaryKey("f0") + .build(); + + final MockContext sourceContext1 = MockContext.getInstance(this.conf, schema1, "f2"); + final HoodieTableSource tableSource1 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext1); + final Configuration 
conf1 = tableSource1.getConf(); + assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS), is(MultiPartKeysValueExtractor.class.getName())); + + // set hive style partitioning to true. + this.conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, true); + + final MockContext sourceContext2 = MockContext.getInstance(this.conf, schema1, "f2"); + final HoodieTableSource tableSource2 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext2); + final Configuration conf2 = tableSource2.getConf(); + assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS), is(SlashEncodedDayPartitionValueExtractor.class.getName())); + } + @Test void testSetupCleaningOptionsForSource() { // definition with simple primary key and partition path @@ -259,6 +286,31 @@ public class TestHoodieTableFactory { assertThat(conf3.get(FlinkOptions.KEYGEN_CLASS), is(NonpartitionedAvroKeyGenerator.class.getName())); } + @Test + void testSetupHiveOptionsForSink() { + // definition with simple primary key and partition path + TableSchema schema1 = TableSchema.builder() + .field("f0", DataTypes.INT().notNull()) + .field("f1", DataTypes.VARCHAR(20)) + .field("f2", DataTypes.TIMESTAMP(3)) + .field("ts", DataTypes.TIMESTAMP(3)) + .primaryKey("f0") + .build(); + + final MockContext sinkContext1 = MockContext.getInstance(this.conf, schema1, "f2"); + final HoodieTableSink tableSink1 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sinkContext1); + final Configuration conf1 = tableSink1.getConf(); + assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS), is(MultiPartKeysValueExtractor.class.getName())); + + // set hive style partitioning to true. 
+ this.conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, true); + + final MockContext sinkContext2 = MockContext.getInstance(this.conf, schema1, "f2"); + final HoodieTableSink tableSink2 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sinkContext2); + final Configuration conf2 = tableSink2.getConf(); + assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS), is(SlashEncodedDayPartitionValueExtractor.class.getName())); + } + @Test void testSetupCleaningOptionsForSink() { // definition with simple primary key and partition path