1
0

[HUDI-1871] Fix hive conf for Flink writer hive meta sync (#2968)

This commit is contained in:
swuferhong
2021-05-20 17:03:52 +08:00
committed by GitHub
parent 9b01d2f864
commit 928b09ea0b
3 changed files with 29 additions and 6 deletions

View File

@@ -423,6 +423,12 @@ public class FlinkOptions {
     .defaultValue("jdbc:hive2://localhost:10000")
     .withDescription("Jdbc URL for hive sync, default 'jdbc:hive2://localhost:10000'");
+
+  public static final ConfigOption<String> HIVE_SYNC_METASTORE_URIS = ConfigOptions
+      .key("hive_sync.metastore.uris")
+      .stringType()
+      .defaultValue("")
+      .withDescription("Metastore uris for hive sync, default ''");
 
   public static final ConfigOption<String> HIVE_SYNC_PARTITION_FIELDS = ConfigOptions
     .key("hive_sync.partition_fields")
     .stringType()

View File

@@ -57,7 +57,10 @@ public class HiveSyncContext {
     String path = conf.getString(FlinkOptions.PATH);
     FileSystem fs = FSUtils.getFs(path, hadoopConf);
     HiveConf hiveConf = new HiveConf();
-    hiveConf.addResource(fs.getConf());
+    if (!FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.HIVE_SYNC_METASTORE_URIS)) {
+      hadoopConf.set("hive.metastore.uris", conf.getString(FlinkOptions.HIVE_SYNC_METASTORE_URIS));
+    }
+    hiveConf.addResource(hadoopConf);
     return new HiveSyncContext(syncConfig, hiveConf, fs);
   }
@@ -71,7 +74,7 @@ public class HiveSyncContext {
     hiveSyncConfig.hiveUser = conf.getString(FlinkOptions.HIVE_SYNC_USERNAME);
     hiveSyncConfig.hivePass = conf.getString(FlinkOptions.HIVE_SYNC_PASSWORD);
     hiveSyncConfig.jdbcUrl = conf.getString(FlinkOptions.HIVE_SYNC_JDBC_URL);
-    hiveSyncConfig.partitionFields = Arrays.stream(conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS)
+    hiveSyncConfig.partitionFields = Arrays.stream(conf.getString(FlinkOptions.PARTITION_PATH_FIELD)
         .split(",")).map(String::trim).collect(Collectors.toList());
     hiveSyncConfig.partitionValueExtractorClass = conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS);
     hiveSyncConfig.useJdbc = conf.getBoolean(FlinkOptions.HIVE_SYNC_USE_JDBC);

View File

@@ -131,6 +131,8 @@
                   <include>org.apache.hive:hive-exec</include>
                   <include>org.apache.hive:hive-metastore</include>
                   <include>org.apache.hive:hive-jdbc</include>
+                  <include>org.datanucleus:datanucleus-core</include>
+                  <include>org.datanucleus:datanucleus-api-jdo</include>
                   <include>org.apache.hbase:hbase-common</include>
                   <include>commons-codec:commons-codec</include>
@@ -161,10 +163,6 @@
                   <pattern>org.apache.hadoop.hive.metastore.</pattern>
                   <shadedPattern>${flink.bundle.hive.shade.prefix}org.apache.hadoop.hive.metastore.</shadedPattern>
                 </relocation>
-                <relocation>
-                  <pattern>org.apache.hadoop.hive.ql.</pattern>
-                  <shadedPattern>${flink.bundle.hive.shade.prefix}org.apache.hadoop.hive.ql.</shadedPattern>
-                </relocation>
                 <relocation>
                   <pattern>org.apache.hive.common.</pattern>
                   <shadedPattern>${flink.bundle.hive.shade.prefix}org.apache.hive.common.</shadedPattern>
@@ -439,6 +437,10 @@
           <groupId>javax.servlet</groupId>
           <artifactId>*</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.datanucleus</groupId>
+          <artifactId>datanucleus-core</artifactId>
+        </exclusion>
         <exclusion>
           <groupId>javax.servlet.jsp</groupId>
           <artifactId>*</artifactId>
@@ -477,6 +479,18 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.datanucleus</groupId>
+      <artifactId>datanucleus-core</artifactId>
+      <scope>${flink.bundle.hive.scope}</scope>
+      <version>5.0.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.datanucleus</groupId>
+      <artifactId>datanucleus-api-jdo</artifactId>
+      <scope>${flink.bundle.hive.scope}</scope>
+      <version>5.0.1</version>
+    </dependency>
     <dependency>
       <groupId>joda-time</groupId>