1
0

[HUDI-1987] Fix non partition table hive meta sync for flink writer (#3049)

This commit is contained in:
Danny Chan
2021-06-09 14:20:04 +08:00
committed by GitHub
parent a6f5fc5967
commit e8fcf04b57
3 changed files with 17 additions and 12 deletions

View File

@@ -22,6 +22,7 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.configuration.Configuration;
@@ -29,7 +30,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import java.util.Arrays;
import java.util.stream.Collectors;
/**
* Hive synchronization context.
@@ -58,7 +58,7 @@ public class HiveSyncContext {
FileSystem fs = FSUtils.getFs(path, hadoopConf);
HiveConf hiveConf = new HiveConf();
if (!FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.HIVE_SYNC_METASTORE_URIS)) {
hadoopConf.set("hive.metastore.uris", conf.getString(FlinkOptions.HIVE_SYNC_METASTORE_URIS));
hadoopConf.set(HiveConf.ConfVars.METASTOREURIS.varname, conf.getString(FlinkOptions.HIVE_SYNC_METASTORE_URIS));
}
hiveConf.addResource(hadoopConf);
return new HiveSyncContext(syncConfig, hiveConf, fs);
@@ -74,8 +74,7 @@ public class HiveSyncContext {
hiveSyncConfig.hiveUser = conf.getString(FlinkOptions.HIVE_SYNC_USERNAME);
hiveSyncConfig.hivePass = conf.getString(FlinkOptions.HIVE_SYNC_PASSWORD);
hiveSyncConfig.jdbcUrl = conf.getString(FlinkOptions.HIVE_SYNC_JDBC_URL);
hiveSyncConfig.partitionFields = Arrays.stream(conf.getString(FlinkOptions.PARTITION_PATH_FIELD)
.split(",")).map(String::trim).collect(Collectors.toList());
hiveSyncConfig.partitionFields = Arrays.asList(FilePathUtils.extractPartitionKeys(conf));
hiveSyncConfig.partitionValueExtractorClass = conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS);
hiveSyncConfig.useJdbc = conf.getBoolean(FlinkOptions.HIVE_SYNC_USE_JDBC);
// needs to support metadata table for flink

View File

@@ -391,4 +391,17 @@ public class FilePathUtils {
/**
 * Converts a Hadoop {@code Path} into the equivalent Flink {@code Path}.
 *
 * @param path The hadoop path to convert
 * @return a Flink path backed by the same URI
 */
public static org.apache.flink.core.fs.Path toFlinkPath(Path path) {
  // Both path types are URI-based, so round-tripping through the URI is lossless.
  final java.net.URI uri = path.toUri();
  return new org.apache.flink.core.fs.Path(uri);
}
/**
 * Extracts the partition keys with given configuration.
 *
 * <p>Returns an empty array when {@link FlinkOptions#PARTITION_PATH_FIELD} is left
 * at its default, which signals a non-partitioned table.
 *
 * @param conf The flink configuration
 * @return array of the partition fields, each trimmed of surrounding whitespace
 */
public static String[] extractPartitionKeys(org.apache.flink.configuration.Configuration conf) {
  if (FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.PARTITION_PATH_FIELD)) {
    return new String[0];
  }
  String[] keys = conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",");
  // Trim each field: the hive-sync code this helper replaced applied String::trim,
  // so "dt, region" must yield ["dt", "region"], not ["dt", " region"].
  for (int i = 0; i < keys.length; i++) {
    keys[i] = keys[i].trim();
  }
  return keys;
}
}

View File

@@ -273,7 +273,7 @@ public class MergeOnReadInputFormat
LinkedHashMap<String, String> partSpec = FilePathUtils.extractPartitionKeyValues(
new org.apache.hadoop.fs.Path(path).getParent(),
this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION),
extractPartitionKeys());
FilePathUtils.extractPartitionKeys(this.conf));
LinkedHashMap<String, Object> partObjects = new LinkedHashMap<>();
partSpec.forEach((k, v) -> partObjects.put(k, restorePartValueFromType(
defaultPartName.equals(v) ? null : v,
@@ -293,13 +293,6 @@ public class MergeOnReadInputFormat
Long.MAX_VALUE); // read the whole file
}
/**
 * Returns the configured partition key names, or an empty array when the
 * partition path field option is still at its default (non-partitioned table).
 */
private String[] extractPartitionKeys() {
  final boolean nonPartitioned =
      FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.PARTITION_PATH_FIELD);
  return nonPartitioned
      ? new String[0]
      : this.conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",");
}
private Iterator<RowData> getLogFileIterator(MergeOnReadInputSplit split) {
final Schema tableSchema = new Schema.Parser().parse(tableState.getAvroSchema());
final Schema requiredSchema = new Schema.Parser().parse(tableState.getRequiredAvroSchema());