
[HUDI-4174] Add hive conf dir option for flink sink (#5725)

Danny Chan authored on 2022-06-01 16:17:36 +08:00 (committed by GitHub)
parent 795a99ba73
commit 0d069b5e57
4 changed files with 32 additions and 2 deletions

.../org/apache/hudi/configuration/FlinkOptions.java

@@ -769,6 +769,12 @@ public class FlinkOptions extends HoodieConfig {
       .noDefaultValue()
       .withDescription("Serde properties to hive table, the data format is k1=v1\nk2=v2");
+
+  public static final ConfigOption<String> HIVE_SYNC_CONF_DIR = ConfigOptions
+      .key("hive_sync.conf.dir")
+      .stringType()
+      .noDefaultValue()
+      .withDescription("The hive configuration directory, where the hive-site.xml lies in, the file should be put on the client machine");
 
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
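In SQL DDL this key would be passed like any other Hudi table option ('hive_sync.conf.dir' = '...'). From the Java side, a minimal sketch of wiring it up; the class name, table path, and conf dir below are hypothetical:

import org.apache.flink.configuration.Configuration;

import org.apache.hudi.configuration.FlinkOptions;

public class HiveConfDirOptionSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Hypothetical values: the conf dir must exist on the client machine
    // and contain hive-site.xml, per the option description above.
    conf.setString(FlinkOptions.PATH, "file:///tmp/hudi_table");
    conf.setString(FlinkOptions.HIVE_SYNC_CONF_DIR, "/etc/hive/conf");
  }
}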

.../org/apache/hudi/configuration/HadoopConfigurations.java

@@ -19,6 +19,8 @@
 package org.apache.hudi.configuration;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.hadoop.fs.Path;
+
 import org.apache.hudi.util.FlinkClientUtil;
 
 import java.util.Map;
@@ -51,4 +53,16 @@ public class HadoopConfigurations {
     options.forEach(hadoopConf::set);
     return hadoopConf;
   }
+
+  /**
+   * Creates a Hive configuration with configured dir path or empty if no Hive conf dir is set.
+   */
+  public static org.apache.hadoop.conf.Configuration getHiveConf(Configuration conf) {
+    String explicitDir = conf.getString(FlinkOptions.HIVE_SYNC_CONF_DIR, System.getenv("HIVE_CONF_DIR"));
+    org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
+    if (explicitDir != null) {
+      hadoopConf.addResource(new Path(explicitDir, "hive-site.xml"));
+    }
+    return hadoopConf;
+  }
 }
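The lookup order above is: the explicit hive_sync.conf.dir option first, then the HIVE_CONF_DIR environment variable, and an empty configuration when neither is set. A caller-side sketch, assuming a hypothetical /etc/hive/conf directory that contains hive-site.xml (the class name is illustrative):

import org.apache.flink.configuration.Configuration;

import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;

public class GetHiveConfSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.HIVE_SYNC_CONF_DIR, "/etc/hive/conf"); // hypothetical dir
    org.apache.hadoop.conf.Configuration hiveConf = HadoopConfigurations.getHiveConf(conf);
    // hive-site.xml is registered as a resource and parsed lazily on the
    // first lookup; absent keys resolve to null.
    System.out.println(hiveConf.get("hive.metastore.uris"));
  }
}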

.../org/apache/hudi/sink/StreamWriteOperatorCoordinator.java

@@ -20,6 +20,7 @@ package org.apache.hudi.sink;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -28,6 +29,7 @@ import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.event.CommitAckEvent;
@@ -82,6 +84,11 @@ public class StreamWriteOperatorCoordinator
    */
   private final Configuration conf;
 
+  /**
+   * Hive config options.
+   */
+  private final SerializableConfiguration hiveConf;
+
   /**
    * Coordinator context.
    */
@@ -160,6 +167,7 @@ public class StreamWriteOperatorCoordinator
     this.conf = conf;
     this.context = context;
     this.parallelism = context.currentParallelism();
+    this.hiveConf = new SerializableConfiguration(HadoopConfigurations.getHiveConf(conf));
   }
 
   @Override
@@ -314,7 +322,7 @@ public class StreamWriteOperatorCoordinator
   private void initHiveSync() {
     this.hiveSyncExecutor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
-    this.hiveSyncContext = HiveSyncContext.create(conf);
+    this.hiveSyncContext = HiveSyncContext.create(conf, this.hiveConf);
   }
 
   private void syncHiveAsync() {
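The SerializableConfiguration wrapper is needed because Hadoop's Configuration does not implement java.io.Serializable, while the coordinator's fields may have to cross a Java serialization boundary. A minimal sketch of the wrap/unwrap round trip, with a hypothetical metastore URI and an illustrative class name:

import org.apache.hudi.common.config.SerializableConfiguration;

public class SerializableConfSketch {
  public static void main(String[] args) {
    org.apache.hadoop.conf.Configuration raw = new org.apache.hadoop.conf.Configuration();
    raw.set("hive.metastore.uris", "thrift://localhost:9083"); // hypothetical value
    SerializableConfiguration ser = new SerializableConfiguration(raw);
    // ser can be serialized and shipped; unwrap the Hadoop conf with get().
    System.out.println(ser.get().get("hive.metastore.uris"));
  }
}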

.../org/apache/hudi/sink/utils/HiveSyncContext.java

@@ -20,6 +20,7 @@ package org.apache.hudi.sink.utils;
 import org.apache.flink.annotation.VisibleForTesting;
 
 import org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool;
+import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
@@ -58,7 +59,7 @@ public class HiveSyncContext {
     return new HiveSyncTool(this.syncConfig, this.hiveConf, this.fs);
   }
 
-  public static HiveSyncContext create(Configuration conf) {
+  public static HiveSyncContext create(Configuration conf, SerializableConfiguration serConf) {
     HiveSyncConfig syncConfig = buildSyncConfig(conf);
     org.apache.hadoop.conf.Configuration hadoopConf = HadoopConfigurations.getHadoopConf(conf);
     String path = conf.getString(FlinkOptions.PATH);
@@ -67,6 +68,7 @@ public class HiveSyncContext {
     if (!FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.HIVE_SYNC_METASTORE_URIS)) {
       hadoopConf.set(HiveConf.ConfVars.METASTOREURIS.varname, conf.getString(FlinkOptions.HIVE_SYNC_METASTORE_URIS));
     }
+    hiveConf.addResource(serConf.get());
     hiveConf.addResource(hadoopConf);
     return new HiveSyncContext(syncConfig, hiveConf, fs);
   }
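The order of the two addResource calls is deliberate: for non-final keys, Hadoop resolves later-added resources over earlier ones, so values derived from Flink options (such as the metastore URIs set just above) override the same keys coming from the hive-site.xml carried in serConf. A standalone sketch with hypothetical values and an illustrative class name:

import org.apache.hadoop.conf.Configuration;

public class ResourceOrderSketch {
  public static void main(String[] args) {
    Configuration fromHiveSite = new Configuration(false);
    fromHiveSite.set("hive.metastore.uris", "thrift://from-hive-site:9083");

    Configuration fromFlinkOptions = new Configuration(false);
    fromFlinkOptions.set("hive.metastore.uris", "thrift://from-flink-options:9083");

    Configuration merged = new Configuration(false);
    merged.addResource(fromHiveSite);
    merged.addResource(fromFlinkOptions);
    // Prints thrift://from-flink-options:9083 -- the later resource wins.
    System.out.println(merged.get("hive.metastore.uris"));
  }
}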