1
0

[HUDI-2228] Add option 'hive_sync.mode' for flink writer (#3352)

This commit is contained in:
swuferhong
2021-07-28 19:45:50 +08:00
committed by GitHub
parent eedfadeb46
commit 7739518879
3 changed files with 11 additions and 0 deletions

View File

@@ -478,6 +478,12 @@ public class FlinkOptions extends HoodieConfig {
.defaultValue("PARQUET")
.withDescription("File format for hive sync, default 'PARQUET'");
public static final ConfigOption<String> HIVE_SYNC_MODE = ConfigOptions
.key("hive_sync.mode")
.stringType()
.defaultValue("jdbc")
.withDescription("Mode to choose for Hive ops. Valid values are hms, jdbc and hiveql, default 'jdbc'");
public static final ConfigOption<String> HIVE_SYNC_USERNAME = ConfigOptions
.key("hive_sync.username")
.stringType()

View File

@@ -71,6 +71,7 @@ public class HiveSyncContext {
hiveSyncConfig.usePreApacheInputFormat = false;
hiveSyncConfig.databaseName = conf.getString(FlinkOptions.HIVE_SYNC_DB);
hiveSyncConfig.tableName = conf.getString(FlinkOptions.HIVE_SYNC_TABLE);
hiveSyncConfig.syncMode = conf.getString(FlinkOptions.HIVE_SYNC_MODE);
hiveSyncConfig.hiveUser = conf.getString(FlinkOptions.HIVE_SYNC_USERNAME);
hiveSyncConfig.hivePass = conf.getString(FlinkOptions.HIVE_SYNC_PASSWORD);
hiveSyncConfig.jdbcUrl = conf.getString(FlinkOptions.HIVE_SYNC_JDBC_URL);

View File

@@ -248,6 +248,9 @@ public class FlinkStreamerConfig extends Configuration {
@Parameter(names = {"--hive-sync-file-format"}, description = "File format for hive sync, default 'PARQUET'")
public String hiveSyncFileFormat = "PARQUET";
@Parameter(names = {"--hive-sync-mode"}, description = "Mode to choose for Hive ops. Valid values are hms, jdbc and hiveql, default 'jdbc'")
public String hiveSyncMode = "jdbc";
@Parameter(names = {"--hive-sync-username"}, description = "Username for hive sync, default 'hive'")
public String hiveSyncUsername = "hive";
@@ -349,6 +352,7 @@ public class FlinkStreamerConfig extends Configuration {
conf.setString(FlinkOptions.HIVE_SYNC_DB, config.hiveSyncDb);
conf.setString(FlinkOptions.HIVE_SYNC_TABLE, config.hiveSyncTable);
conf.setString(FlinkOptions.HIVE_SYNC_FILE_FORMAT, config.hiveSyncFileFormat);
conf.setString(FlinkOptions.HIVE_SYNC_MODE, config.hiveSyncMode);
conf.setString(FlinkOptions.HIVE_SYNC_USERNAME, config.hiveSyncUsername);
conf.setString(FlinkOptions.HIVE_SYNC_PASSWORD, config.hiveSyncPassword);
conf.setString(FlinkOptions.HIVE_SYNC_JDBC_URL, config.hiveSyncJdbcUrl);