From 55dca969f9d25ea352dc0e343ef1f1e8539630fd Mon Sep 17 00:00:00 2001 From: todd5167 <313328862@qq.com> Date: Wed, 16 Mar 2022 03:56:37 +0800 Subject: [PATCH] [HUDI-3589] flink sync hive metadata supports table properties and serde properties (#4995) --- .../org/apache/hudi/configuration/FlinkOptions.java | 12 ++++++++++++ .../org/apache/hudi/sink/utils/HiveSyncContext.java | 2 ++ 2 files changed, 14 insertions(+) diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 34bab1285..90ed73a3d 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -675,6 +675,18 @@ public class FlinkOptions extends HoodieConfig { .withDescription("INT64 with original type TIMESTAMP_MICROS is converted to hive timestamp type.\n" + "Disabled by default for backward compatibility."); + public static final ConfigOption HIVE_SYNC_TABLE_PROPERTIES = ConfigOptions + .key("hive_sync.table_properties") + .stringType() + .noDefaultValue() + .withDescription("Additional properties to store with table, the data format is k1=v1\nk2=v2"); + + public static final ConfigOption HIVE_SYNC_TABLE_SERDE_PROPERTIES = ConfigOptions + .key("hive_sync.serde_properties") + .stringType() + .noDefaultValue() + .withDescription("Serde properties to hive table, the data format is k1=v1\nk2=v2"); + // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java index 1c051c8cd..768d36e0a 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java @@ -74,6 +74,8 @@ public class HiveSyncContext { hiveSyncConfig.syncMode = conf.getString(FlinkOptions.HIVE_SYNC_MODE); hiveSyncConfig.hiveUser = conf.getString(FlinkOptions.HIVE_SYNC_USERNAME); hiveSyncConfig.hivePass = conf.getString(FlinkOptions.HIVE_SYNC_PASSWORD); + hiveSyncConfig.tableProperties = conf.getString(FlinkOptions.HIVE_SYNC_TABLE_PROPERTIES); + hiveSyncConfig.serdeProperties = conf.getString(FlinkOptions.HIVE_SYNC_TABLE_SERDE_PROPERTIES); hiveSyncConfig.jdbcUrl = conf.getString(FlinkOptions.HIVE_SYNC_JDBC_URL); hiveSyncConfig.partitionFields = Arrays.asList(FilePathUtils.extractPartitionKeys(conf)); hiveSyncConfig.partitionValueExtractorClass = conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME);