diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java index 92fa99c71..24afc013a 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java @@ -292,6 +292,8 @@ public class DataSourceUtils { props.getString(DataSourceWriteOptions.HIVE_PASS().key(), DataSourceWriteOptions.HIVE_PASS().defaultValue()); hiveSyncConfig.jdbcUrl = props.getString(DataSourceWriteOptions.HIVE_URL().key(), DataSourceWriteOptions.HIVE_URL().defaultValue()); + hiveSyncConfig.metastoreUris = + props.getString(DataSourceWriteOptions.METASTORE_URIS().key(), DataSourceWriteOptions.METASTORE_URIS().defaultValue()); hiveSyncConfig.partitionFields = props.getStringList(DataSourceWriteOptions.HIVE_PARTITION_FIELDS().key(), ",", new ArrayList<>()); hiveSyncConfig.partitionValueExtractorClass = diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index b28c10a81..9811db536 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -427,6 +427,11 @@ object DataSourceWriteOptions { val HIVE_URL: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.hive_sync.jdbcurl") .defaultValue("jdbc:hive2://localhost:10000") + .withDocumentation("Hive jdbc url") + + val METASTORE_URIS: ConfigProperty[String] = ConfigProperty + .key("hoodie.datasource.hive_sync.metastore.uris") + .defaultValue("thrift://localhost:9083") .withDocumentation("Hive metastore url") val hivePartitionFieldsInferFunc = 
DataSourceOptionsHelper.scalaFunctionToJavaFunction((p: HoodieConfig) => { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 81522f77d..5dfefe1ac 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -539,6 +539,9 @@ object HoodieSparkSqlWriter { val hiveSyncConfig: HiveSyncConfig = buildSyncConfig(basePath, hoodieConfig, sqlConf) val hiveConf: HiveConf = new HiveConf() hiveConf.addResource(fs.getConf) + if (StringUtils.isNullOrEmpty(hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname))) { + hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, hiveSyncConfig.metastoreUris) + } new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable() true } @@ -554,6 +557,7 @@ object HoodieSparkSqlWriter { hiveSyncConfig.hiveUser = hoodieConfig.getString(HIVE_USER) hiveSyncConfig.hivePass = hoodieConfig.getString(HIVE_PASS) hiveSyncConfig.jdbcUrl = hoodieConfig.getString(HIVE_URL) + hiveSyncConfig.metastoreUris = hoodieConfig.getStringOrDefault(METASTORE_URIS) hiveSyncConfig.skipROSuffix = hoodieConfig.getStringOrDefault(HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE, DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE.defaultValue).toBoolean hiveSyncConfig.partitionFields = diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala index 9a940ebce..8a4ad9d85 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala @@ -72,6 +72,7 @@ object 
HoodieWriterUtils { hoodieConfig.setDefaultValue(HIVE_USER) hoodieConfig.setDefaultValue(HIVE_PASS) hoodieConfig.setDefaultValue(HIVE_URL) + hoodieConfig.setDefaultValue(METASTORE_URIS) hoodieConfig.setDefaultValue(HIVE_PARTITION_FIELDS) hoodieConfig.setDefaultValue(HIVE_PARTITION_EXTRACTOR_CLASS) hoodieConfig.setDefaultValue(HIVE_STYLE_PARTITIONING) diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java index 8dc3c6e0e..254e8ba53 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java @@ -49,6 +49,9 @@ public class HiveSyncConfig implements Serializable { @Parameter(names = {"--jdbc-url"}, description = "Hive jdbc connect url") public String jdbcUrl; + @Parameter(names = {"--metastore-uris"}, description = "Hive metastore uris") + public String metastoreUris; + @Parameter(names = {"--base-path"}, description = "Basepath of hoodie table to sync", required = true) public String basePath; @@ -137,6 +140,7 @@ public class HiveSyncConfig implements Serializable { newConfig.partitionFields = cfg.partitionFields; newConfig.partitionValueExtractorClass = cfg.partitionValueExtractorClass; newConfig.jdbcUrl = cfg.jdbcUrl; + newConfig.metastoreUris = cfg.metastoreUris; newConfig.tableName = cfg.tableName; newConfig.bucketSpec = cfg.bucketSpec; newConfig.usePreApacheInputFormat = cfg.usePreApacheInputFormat; @@ -164,6 +168,7 @@ public class HiveSyncConfig implements Serializable { + ", hiveUser='" + hiveUser + '\'' + ", hivePass='" + hivePass + '\'' + ", jdbcUrl='" + jdbcUrl + '\'' + + ", metastoreUris='" + metastoreUris + '\'' + ", basePath='" + basePath + '\'' + ", partitionFields=" + partitionFields + ", partitionValueExtractorClass='" + partitionValueExtractorClass + '\'' diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java index 682c2daa1..833fce295 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java @@ -160,10 +160,12 @@ public class BootstrapExecutor implements Serializable { * Sync to Hive. */ private void syncHive() { - if (cfg.enableHiveSync) { + if (cfg.enableHiveSync || cfg.enableMetaSync) { HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(props, cfg.targetBasePath, cfg.baseFileFormat); - LOG.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.tableName + "). Hive metastore URL :" - + hiveSyncConfig.jdbcUrl + ", basePath :" + cfg.targetBasePath); + HiveConf hiveConf = new HiveConf(fs.getConf(), HiveConf.class); + hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, hiveSyncConfig.metastoreUris); + LOG.info("Hive Conf => " + hiveConf.getAllProperties().toString()); + LOG.info("Hive Sync Conf => " + hiveSyncConfig); - new HiveSyncTool(hiveSyncConfig, new HiveConf(configuration, HiveConf.class), fs).syncHoodieTable(); + new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable(); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 828980506..38e862e94 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -680,9 +680,10 @@ public class DeltaSync implements Serializable { public void syncHive() { HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(props, cfg.targetBasePath, cfg.baseFileFormat); - LOG.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.tableName + "). 
Hive metastore URL :" - + hiveSyncConfig.jdbcUrl + ", basePath :" + cfg.targetBasePath); HiveConf hiveConf = new HiveConf(conf, HiveConf.class); + if (StringUtils.isNullOrEmpty(hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname))) { + hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, hiveSyncConfig.metastoreUris); + } LOG.info("Hive Conf => " + hiveConf.getAllProperties().toString()); LOG.info("Hive Sync Conf => " + hiveSyncConfig.toString()); new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable();