From 928b09ea0b2d97bc5055170c8c87fa0103615b4c Mon Sep 17 00:00:00 2001 From: swuferhong <337361684@qq.com> Date: Thu, 20 May 2021 17:03:52 +0800 Subject: [PATCH] [HUDI-1871] Fix hive conf for Flink writer hive meta sync (#2968) --- .../hudi/configuration/FlinkOptions.java | 6 +++++ .../hudi/sink/utils/HiveSyncContext.java | 7 ++++-- packaging/hudi-flink-bundle/pom.xml | 22 +++++++++++++++---- 3 files changed, 29 insertions(+), 6 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 4f1be0bdb..852639293 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -423,6 +423,12 @@ public class FlinkOptions { .defaultValue("jdbc:hive2://localhost:10000") .withDescription("Jdbc URL for hive sync, default 'jdbc:hive2://localhost:10000'"); + public static final ConfigOption HIVE_SYNC_METASTORE_URIS = ConfigOptions + .key("hive_sync.metastore.uris") + .stringType() + .defaultValue("") + .withDescription("Metastore uris for hive sync, default ''"); + public static final ConfigOption HIVE_SYNC_PARTITION_FIELDS = ConfigOptions .key("hive_sync.partition_fields") .stringType() diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java index a791076c1..79fbd19db 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java @@ -57,7 +57,10 @@ public class HiveSyncContext { String path = conf.getString(FlinkOptions.PATH); FileSystem fs = FSUtils.getFs(path, hadoopConf); HiveConf hiveConf = new HiveConf(); - hiveConf.addResource(fs.getConf()); + if (!FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.HIVE_SYNC_METASTORE_URIS)) { + hadoopConf.set("hive.metastore.uris", conf.getString(FlinkOptions.HIVE_SYNC_METASTORE_URIS)); + } + hiveConf.addResource(hadoopConf); return new HiveSyncContext(syncConfig, hiveConf, fs); } @@ -71,7 +74,7 @@ public class HiveSyncContext { hiveSyncConfig.hiveUser = conf.getString(FlinkOptions.HIVE_SYNC_USERNAME); hiveSyncConfig.hivePass = conf.getString(FlinkOptions.HIVE_SYNC_PASSWORD); hiveSyncConfig.jdbcUrl = conf.getString(FlinkOptions.HIVE_SYNC_JDBC_URL); - hiveSyncConfig.partitionFields = Arrays.stream(conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS) + hiveSyncConfig.partitionFields = Arrays.stream(conf.getString(FlinkOptions.PARTITION_PATH_FIELD) .split(",")).map(String::trim).collect(Collectors.toList()); hiveSyncConfig.partitionValueExtractorClass = conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS); hiveSyncConfig.useJdbc = conf.getBoolean(FlinkOptions.HIVE_SYNC_USE_JDBC); diff --git a/packaging/hudi-flink-bundle/pom.xml b/packaging/hudi-flink-bundle/pom.xml index ce106d7c7..9e2b5f975 100644 --- a/packaging/hudi-flink-bundle/pom.xml +++ b/packaging/hudi-flink-bundle/pom.xml @@ -131,6 +131,8 @@ org.apache.hive:hive-exec org.apache.hive:hive-metastore org.apache.hive:hive-jdbc + org.datanucleus:datanucleus-core + org.datanucleus:datanucleus-api-jdo org.apache.hbase:hbase-common commons-codec:commons-codec @@ -161,10 +163,6 @@ org.apache.hadoop.hive.metastore. ${flink.bundle.hive.shade.prefix}org.apache.hadoop.hive.metastore. - - org.apache.hadoop.hive.ql. - ${flink.bundle.hive.shade.prefix}org.apache.hadoop.hive.ql. - org.apache.hive.common. ${flink.bundle.hive.shade.prefix}org.apache.hive.common. @@ -439,6 +437,10 @@ javax.servlet * + + org.datanucleus + datanucleus-core + javax.servlet.jsp * @@ -477,6 +479,18 @@ + + org.datanucleus + datanucleus-core + ${flink.bundle.hive.scope} + 5.0.1 + + + org.datanucleus + datanucleus-api-jdo + ${flink.bundle.hive.scope} + 5.0.1 + joda-time