From e8fcf04b5709d795a1de086824b6eff462c8c835 Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Wed, 9 Jun 2021 14:20:04 +0800 Subject: [PATCH] [HUDI-1987] Fix non partition table hive meta sync for flink writer (#3049) --- .../org/apache/hudi/sink/utils/HiveSyncContext.java | 7 +++---- .../org/apache/hudi/table/format/FilePathUtils.java | 13 +++++++++++++ .../table/format/mor/MergeOnReadInputFormat.java | 9 +-------- 3 files changed, 17 insertions(+), 12 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java index 79fbd19db..62a67980d 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HiveSyncTool; +import org.apache.hudi.table.format.FilePathUtils; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.configuration.Configuration; @@ -29,7 +30,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.conf.HiveConf; import java.util.Arrays; -import java.util.stream.Collectors; /** * Hive synchronization context. @@ -58,7 +58,7 @@ public class HiveSyncContext { FileSystem fs = FSUtils.getFs(path, hadoopConf); HiveConf hiveConf = new HiveConf(); if (!FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.HIVE_SYNC_METASTORE_URIS)) { - hadoopConf.set("hive.metastore.uris", conf.getString(FlinkOptions.HIVE_SYNC_METASTORE_URIS)); + hadoopConf.set(HiveConf.ConfVars.METASTOREURIS.varname, conf.getString(FlinkOptions.HIVE_SYNC_METASTORE_URIS)); } hiveConf.addResource(hadoopConf); return new HiveSyncContext(syncConfig, hiveConf, fs); @@ -74,8 +74,7 @@ public class HiveSyncContext { hiveSyncConfig.hiveUser = conf.getString(FlinkOptions.HIVE_SYNC_USERNAME); hiveSyncConfig.hivePass = conf.getString(FlinkOptions.HIVE_SYNC_PASSWORD); hiveSyncConfig.jdbcUrl = conf.getString(FlinkOptions.HIVE_SYNC_JDBC_URL); - hiveSyncConfig.partitionFields = Arrays.stream(conf.getString(FlinkOptions.PARTITION_PATH_FIELD) - .split(",")).map(String::trim).collect(Collectors.toList()); + hiveSyncConfig.partitionFields = Arrays.asList(FilePathUtils.extractPartitionKeys(conf)); hiveSyncConfig.partitionValueExtractorClass = conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS); hiveSyncConfig.useJdbc = conf.getBoolean(FlinkOptions.HIVE_SYNC_USE_JDBC); // needs to support metadata table for flink diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java index e35a69371..e8ee5a244 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java @@ -391,4 +391,17 @@ public class FilePathUtils { public static org.apache.flink.core.fs.Path toFlinkPath(Path path) { return new org.apache.flink.core.fs.Path(path.toUri()); } + + /** + * Extracts the partition keys with given configuration. + * + * @param conf The flink configuration + * @return array of the partition fields + */ + public static String[] extractPartitionKeys(org.apache.flink.configuration.Configuration conf) { + if (FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.PARTITION_PATH_FIELD)) { + return new String[0]; + } + return conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(","); + } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java index 7375c3018..c7a910608 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java @@ -273,7 +273,7 @@ public class MergeOnReadInputFormat LinkedHashMap partSpec = FilePathUtils.extractPartitionKeyValues( new org.apache.hadoop.fs.Path(path).getParent(), this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION), - extractPartitionKeys()); + FilePathUtils.extractPartitionKeys(this.conf)); LinkedHashMap partObjects = new LinkedHashMap<>(); partSpec.forEach((k, v) -> partObjects.put(k, restorePartValueFromType( defaultPartName.equals(v) ? null : v, @@ -293,13 +293,6 @@ public class MergeOnReadInputFormat Long.MAX_VALUE); // read the whole file } - private String[] extractPartitionKeys() { - if (FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.PARTITION_PATH_FIELD)) { - return new String[0]; - } - return this.conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(","); - } - private Iterator getLogFileIterator(MergeOnReadInputSplit split) { final Schema tableSchema = new Schema.Parser().parse(tableState.getAvroSchema()); final Schema requiredSchema = new Schema.Parser().parse(tableState.getRequiredAvroSchema());