[HUDI-2184] Support setting hive sync partition extractor class based on flink configuration (#3284)
This commit is contained in:
@@ -19,6 +19,7 @@
|
||||
package org.apache.hudi.table;
|
||||
|
||||
import org.apache.hudi.configuration.FlinkOptions;
|
||||
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
|
||||
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
|
||||
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
|
||||
import org.apache.hudi.util.AvroSchemaConverter;
|
||||
@@ -148,6 +149,8 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
|
||||
setupHoodieKeyOptions(conf, table);
|
||||
// compaction options
|
||||
setupCompactionOptions(conf);
|
||||
// hive options
|
||||
setupHiveOptions(conf);
|
||||
// infer avro schema from physical DDL schema
|
||||
inferAvroSchema(conf, schema.toRowDataType().notNull().getLogicalType());
|
||||
}
|
||||
@@ -207,6 +210,16 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets up the hive options from the table definition.
|
||||
* */
|
||||
private static void setupHiveOptions(Configuration conf) {
|
||||
if (!conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING)
|
||||
&& FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS)) {
|
||||
conf.setString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS, MultiPartKeysValueExtractor.class.getName());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Inferences the deserialization Avro schema from the table schema (e.g. the DDL)
|
||||
* if both options {@link FlinkOptions#SOURCE_AVRO_SCHEMA_PATH} and
|
||||
|
||||
@@ -19,6 +19,8 @@
|
||||
package org.apache.hudi.table;
|
||||
|
||||
import org.apache.hudi.configuration.FlinkOptions;
|
||||
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
|
||||
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
|
||||
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
|
||||
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
|
||||
import org.apache.hudi.util.StreamerUtil;
|
||||
@@ -172,6 +174,31 @@ public class TestHoodieTableFactory {
|
||||
assertThat(conf3.get(FlinkOptions.KEYGEN_CLASS), is(NonpartitionedAvroKeyGenerator.class.getName()));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSetupHiveOptionsForSource() {
|
||||
// definition with simple primary key and partition path
|
||||
TableSchema schema1 = TableSchema.builder()
|
||||
.field("f0", DataTypes.INT().notNull())
|
||||
.field("f1", DataTypes.VARCHAR(20))
|
||||
.field("f2", DataTypes.TIMESTAMP(3))
|
||||
.field("ts", DataTypes.TIMESTAMP(3))
|
||||
.primaryKey("f0")
|
||||
.build();
|
||||
|
||||
final MockContext sourceContext1 = MockContext.getInstance(this.conf, schema1, "f2");
|
||||
final HoodieTableSource tableSource1 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext1);
|
||||
final Configuration conf1 = tableSource1.getConf();
|
||||
assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS), is(MultiPartKeysValueExtractor.class.getName()));
|
||||
|
||||
// set up hive style partitioning is true.
|
||||
this.conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, true);
|
||||
|
||||
final MockContext sourceContext2 = MockContext.getInstance(this.conf, schema1, "f2");
|
||||
final HoodieTableSource tableSource2 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext2);
|
||||
final Configuration conf2 = tableSource2.getConf();
|
||||
assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS), is(SlashEncodedDayPartitionValueExtractor.class.getName()));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSetupCleaningOptionsForSource() {
|
||||
// definition with simple primary key and partition path
|
||||
@@ -259,6 +286,31 @@ public class TestHoodieTableFactory {
|
||||
assertThat(conf3.get(FlinkOptions.KEYGEN_CLASS), is(NonpartitionedAvroKeyGenerator.class.getName()));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSetupHiveOptionsForSink() {
|
||||
// definition with simple primary key and partition path
|
||||
TableSchema schema1 = TableSchema.builder()
|
||||
.field("f0", DataTypes.INT().notNull())
|
||||
.field("f1", DataTypes.VARCHAR(20))
|
||||
.field("f2", DataTypes.TIMESTAMP(3))
|
||||
.field("ts", DataTypes.TIMESTAMP(3))
|
||||
.primaryKey("f0")
|
||||
.build();
|
||||
|
||||
final MockContext sinkContext1 = MockContext.getInstance(this.conf, schema1, "f2");
|
||||
final HoodieTableSink tableSink1 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sinkContext1);
|
||||
final Configuration conf1 = tableSink1.getConf();
|
||||
assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS), is(MultiPartKeysValueExtractor.class.getName()));
|
||||
|
||||
// set up hive style partitioning is true.
|
||||
this.conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, true);
|
||||
|
||||
final MockContext sinkContext2 = MockContext.getInstance(this.conf, schema1, "f2");
|
||||
final HoodieTableSink tableSink2 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sinkContext2);
|
||||
final Configuration conf2 = tableSink2.getConf();
|
||||
assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS), is(SlashEncodedDayPartitionValueExtractor.class.getName()));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSetupCleaningOptionsForSink() {
|
||||
// definition with simple primary key and partition path
|
||||
|
||||
Reference in New Issue
Block a user