[HUDI-2491] Expose HMS mode metastore uri config option for spark writer (#3962)
This commit is contained in:
@@ -292,6 +292,8 @@ public class DataSourceUtils {
|
||||
props.getString(DataSourceWriteOptions.HIVE_PASS().key(), DataSourceWriteOptions.HIVE_PASS().defaultValue());
|
||||
hiveSyncConfig.jdbcUrl =
|
||||
props.getString(DataSourceWriteOptions.HIVE_URL().key(), DataSourceWriteOptions.HIVE_URL().defaultValue());
|
||||
hiveSyncConfig.metastoreUris =
|
||||
props.getString(DataSourceWriteOptions.METASTORE_URIS().key(), DataSourceWriteOptions.METASTORE_URIS().defaultValue());
|
||||
hiveSyncConfig.partitionFields =
|
||||
props.getStringList(DataSourceWriteOptions.HIVE_PARTITION_FIELDS().key(), ",", new ArrayList<>());
|
||||
hiveSyncConfig.partitionValueExtractorClass =
|
||||
|
||||
@@ -427,6 +427,11 @@ object DataSourceWriteOptions {
|
||||
val HIVE_URL: ConfigProperty[String] = ConfigProperty
|
||||
.key("hoodie.datasource.hive_sync.jdbcurl")
|
||||
.defaultValue("jdbc:hive2://localhost:10000")
|
||||
.withDocumentation("Hive jdbc url")
|
||||
|
||||
val METASTORE_URIS: ConfigProperty[String] = ConfigProperty
|
||||
.key("hoodie.datasource.hive_sync.metastore.uris")
|
||||
.defaultValue("thrift://localhost:9083")
|
||||
.withDocumentation("Hive metastore url")
|
||||
|
||||
val hivePartitionFieldsInferFunc = DataSourceOptionsHelper.scalaFunctionToJavaFunction((p: HoodieConfig) => {
|
||||
|
||||
@@ -539,6 +539,9 @@ object HoodieSparkSqlWriter {
|
||||
val hiveSyncConfig: HiveSyncConfig = buildSyncConfig(basePath, hoodieConfig, sqlConf)
|
||||
val hiveConf: HiveConf = new HiveConf()
|
||||
hiveConf.addResource(fs.getConf)
|
||||
if (StringUtils.isNullOrEmpty(hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname))) {
|
||||
hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, hiveSyncConfig.metastoreUris)
|
||||
}
|
||||
new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable()
|
||||
true
|
||||
}
|
||||
@@ -554,6 +557,7 @@ object HoodieSparkSqlWriter {
|
||||
hiveSyncConfig.hiveUser = hoodieConfig.getString(HIVE_USER)
|
||||
hiveSyncConfig.hivePass = hoodieConfig.getString(HIVE_PASS)
|
||||
hiveSyncConfig.jdbcUrl = hoodieConfig.getString(HIVE_URL)
|
||||
hiveSyncConfig.metastoreUris = hoodieConfig.getStringOrDefault(METASTORE_URIS)
|
||||
hiveSyncConfig.skipROSuffix = hoodieConfig.getStringOrDefault(HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE,
|
||||
DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE.defaultValue).toBoolean
|
||||
hiveSyncConfig.partitionFields =
|
||||
|
||||
@@ -72,6 +72,7 @@ object HoodieWriterUtils {
|
||||
hoodieConfig.setDefaultValue(HIVE_USER)
|
||||
hoodieConfig.setDefaultValue(HIVE_PASS)
|
||||
hoodieConfig.setDefaultValue(HIVE_URL)
|
||||
hoodieConfig.setDefaultValue(METASTORE_URIS)
|
||||
hoodieConfig.setDefaultValue(HIVE_PARTITION_FIELDS)
|
||||
hoodieConfig.setDefaultValue(HIVE_PARTITION_EXTRACTOR_CLASS)
|
||||
hoodieConfig.setDefaultValue(HIVE_STYLE_PARTITIONING)
|
||||
|
||||
@@ -49,6 +49,9 @@ public class HiveSyncConfig implements Serializable {
|
||||
@Parameter(names = {"--jdbc-url"}, description = "Hive jdbc connect url")
|
||||
public String jdbcUrl;
|
||||
|
||||
@Parameter(names = {"--metastore-uris"}, description = "Hive metastore uris")
|
||||
public String metastoreUris;
|
||||
|
||||
@Parameter(names = {"--base-path"}, description = "Basepath of hoodie table to sync", required = true)
|
||||
public String basePath;
|
||||
|
||||
@@ -137,6 +140,7 @@ public class HiveSyncConfig implements Serializable {
|
||||
newConfig.partitionFields = cfg.partitionFields;
|
||||
newConfig.partitionValueExtractorClass = cfg.partitionValueExtractorClass;
|
||||
newConfig.jdbcUrl = cfg.jdbcUrl;
|
||||
newConfig.metastoreUris = cfg.metastoreUris;
|
||||
newConfig.tableName = cfg.tableName;
|
||||
newConfig.bucketSpec = cfg.bucketSpec;
|
||||
newConfig.usePreApacheInputFormat = cfg.usePreApacheInputFormat;
|
||||
@@ -164,6 +168,7 @@ public class HiveSyncConfig implements Serializable {
|
||||
+ ", hiveUser='" + hiveUser + '\''
|
||||
+ ", hivePass='" + hivePass + '\''
|
||||
+ ", jdbcUrl='" + jdbcUrl + '\''
|
||||
+ ", metastoreUris='" + metastoreUris + '\''
|
||||
+ ", basePath='" + basePath + '\''
|
||||
+ ", partitionFields=" + partitionFields
|
||||
+ ", partitionValueExtractorClass='" + partitionValueExtractorClass + '\''
|
||||
|
||||
@@ -160,10 +160,12 @@ public class BootstrapExecutor implements Serializable {
|
||||
* Sync to Hive.
|
||||
*/
|
||||
private void syncHive() {
|
||||
if (cfg.enableHiveSync) {
|
||||
if (cfg.enableHiveSync || cfg.enableMetaSync) {
|
||||
HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(props, cfg.targetBasePath, cfg.baseFileFormat);
|
||||
LOG.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.tableName + "). Hive metastore URL :"
|
||||
+ hiveSyncConfig.jdbcUrl + ", basePath :" + cfg.targetBasePath);
|
||||
HiveConf hiveConf = new HiveConf(fs.getConf(), HiveConf.class);
|
||||
hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname,hiveSyncConfig.metastoreUris);
|
||||
LOG.info("Hive Conf => " + hiveConf.getAllProperties().toString());
|
||||
LOG.info("Hive Sync Conf => " + hiveSyncConfig);
|
||||
new HiveSyncTool(hiveSyncConfig, new HiveConf(configuration, HiveConf.class), fs).syncHoodieTable();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -680,9 +680,10 @@ public class DeltaSync implements Serializable {
|
||||
|
||||
public void syncHive() {
|
||||
HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(props, cfg.targetBasePath, cfg.baseFileFormat);
|
||||
LOG.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.tableName + "). Hive metastore URL :"
|
||||
+ hiveSyncConfig.jdbcUrl + ", basePath :" + cfg.targetBasePath);
|
||||
HiveConf hiveConf = new HiveConf(conf, HiveConf.class);
|
||||
if (StringUtils.isNullOrEmpty(hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname))) {
|
||||
hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, hiveSyncConfig.metastoreUris);
|
||||
}
|
||||
LOG.info("Hive Conf => " + hiveConf.getAllProperties().toString());
|
||||
LOG.info("Hive Sync Conf => " + hiveSyncConfig.toString());
|
||||
new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable();
|
||||
|
||||
Reference in New Issue
Block a user