[HUDI-4324] Remove use_jdbc config from hudi sync (#6072)
* [HUDI-4324] Remove use_jdbc config from hudi sync
* Users should use HIVE_SYNC_MODE instead
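For anyone migrating, a minimal sketch of the replacement, assuming a Spark datasource write (the DataFrame, table name, and target path are placeholders, not part of this commit; the sync options are the ones the diff below touches):

    import org.apache.hudi.config.HoodieWriteConfig
    import org.apache.hudi.hive.HiveSyncConfigHolder
    import org.apache.spark.sql.SaveMode

    // Before (removed by this commit):
    //   .option("hoodie.datasource.hive_sync.use_jdbc", "true")
    // After: pick the sync mode explicitly; "jdbc" becomes the default.
    df.write.format("hudi").
      option(HoodieWriteConfig.TBL_NAME.key(), "my_table").
      option("hoodie.datasource.hive_sync.enable", "true").
      option(HiveSyncConfigHolder.HIVE_SYNC_MODE.key(), "jdbc").  // or "hms" / "hiveql"
      option(HiveSyncConfigHolder.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
      mode(SaveMode.Append).
      save("/tmp/my_table")
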
@@ -20,7 +20,7 @@
 
 # Example:
 # hoodie.datasource.hive_sync.jdbcurl jdbc:hive2://localhost:10000
-# hoodie.datasource.hive_sync.use_jdbc true
+# hoodie.datasource.hive_sync.mode jdbc
 # hoodie.datasource.hive_sync.support_timestamp false
 # hoodie.index.type BLOOM
 # hoodie.metadata.enable false

@@ -28,5 +28,6 @@ hoodie.deltastreamer.source.hoodieincr.path=/docker_hoodie_sync_valid_test
 hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=true
 # hive sync
 hoodie.datasource.hive_sync.table=docker_hoodie_sync_valid_test_2
+hoodie.datasource.hive_sync.mode=jdbc
 hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000
 hoodie.datasource.hive_sync.partition_fields=partition

@@ -47,6 +47,7 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
   option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor").
   option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_derived_mor").
   option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default").
+  option(HiveSyncConfigHolder.HIVE_SYNC_MODE.key(), "jdbc").
   option(HiveSyncConfigHolder.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
   option(HiveSyncConfigHolder.HIVE_USER.key(), "hive").
   option(HiveSyncConfigHolder.HIVE_PASS.key(), "hive").

@@ -79,6 +80,7 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
   option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor_bs").
   option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_derived_mor_bs").
   option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default").
+  option(HiveSyncConfigHolder.HIVE_SYNC_MODE.key(), "jdbc").
   option(HiveSyncConfigHolder.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
   option(HiveSyncConfigHolder.HIVE_USER.key(), "hive").
   option(HiveSyncConfigHolder.HIVE_PASS.key(), "hive").

@@ -185,7 +185,7 @@ public class TestDFSPropertiesConfiguration {
     DFSPropertiesConfiguration.refreshGlobalProps();
     assertEquals(5, DFSPropertiesConfiguration.getGlobalProps().size());
     assertEquals("jdbc:hive2://localhost:10000", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.datasource.hive_sync.jdbcurl"));
-    assertEquals("true", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.datasource.hive_sync.use_jdbc"));
+    assertEquals("jdbc", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.datasource.hive_sync.mode"));
     assertEquals("false", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.datasource.hive_sync.support_timestamp"));
     assertEquals("BLOOM", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.index.type"));
     assertEquals("true", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.metadata.enable"));

@@ -20,7 +20,7 @@
 
 # Example:
 hoodie.datasource.hive_sync.jdbcurl jdbc:hive2://localhost:10000
-hoodie.datasource.hive_sync.use_jdbc true
+hoodie.datasource.hive_sync.mode jdbc
 hoodie.datasource.hive_sync.support_timestamp false
 hoodie.index.type BLOOM
 hoodie.metadata.enable true

@@ -755,12 +755,6 @@ public class FlinkOptions extends HoodieConfig {
       .defaultValue(false)
       .withDescription("Assume partitioning is yyyy/mm/dd, default false");
 
-  public static final ConfigOption<Boolean> HIVE_SYNC_USE_JDBC = ConfigOptions
-      .key("hive_sync.use_jdbc")
-      .booleanType()
-      .defaultValue(true)
-      .withDescription("Use JDBC when hive synchronization is enabled, default true");
-
   public static final ConfigOption<Boolean> HIVE_SYNC_AUTO_CREATE_DB = ConfigOptions
       .key("hive_sync.auto_create_db")
      .booleanType()

@@ -43,7 +43,6 @@ import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_TABLE_PROPERTIES;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_TABLE_SERDE_PROPERTIES;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USER;
-import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USE_JDBC;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USE_PRE_APACHE_INPUT_FORMAT;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.METASTORE_URIS;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION;

@@ -105,7 +104,6 @@ public class HiveSyncContext {
     props.setPropertyIfNonNull(HIVE_TABLE_SERDE_PROPERTIES.key(), conf.getString(FlinkOptions.HIVE_SYNC_TABLE_SERDE_PROPERTIES));
     props.setPropertyIfNonNull(META_SYNC_PARTITION_FIELDS.key(), String.join(",", FilePathUtils.extractHivePartitionFields(conf)));
     props.setPropertyIfNonNull(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME));
-    props.setPropertyIfNonNull(HIVE_USE_JDBC.key(), String.valueOf(conf.getBoolean(FlinkOptions.HIVE_SYNC_USE_JDBC)));
     props.setPropertyIfNonNull(META_SYNC_USE_FILE_LISTING_FROM_METADATA.key(), String.valueOf(conf.getBoolean(FlinkOptions.METADATA_ENABLED)));
     props.setPropertyIfNonNull(HIVE_IGNORE_EXCEPTIONS.key(), String.valueOf(conf.getBoolean(FlinkOptions.HIVE_SYNC_IGNORE_EXCEPTIONS)));
     props.setPropertyIfNonNull(HIVE_SUPPORT_TIMESTAMP_TYPE.key(), String.valueOf(conf.getBoolean(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP)));

@@ -320,9 +320,6 @@ public class FlinkStreamerConfig extends Configuration {
   @Parameter(names = {"--hive-sync-assume-date-partitioning"}, description = "Assume partitioning is yyyy/mm/dd, default false")
   public Boolean hiveSyncAssumeDatePartition = false;
 
-  @Parameter(names = {"--hive-sync-use-jdbc"}, description = "Use JDBC when hive synchronization is enabled, default true")
-  public Boolean hiveSyncUseJdbc = true;
-
   @Parameter(names = {"--hive-sync-auto-create-db"}, description = "Auto create hive database if it does not exists, default true")
   public Boolean hiveSyncAutoCreateDb = true;
 

@@ -419,7 +416,6 @@ public class FlinkStreamerConfig extends Configuration {
     conf.setString(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS, config.hiveSyncPartitionFields);
     conf.setString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME, config.hiveSyncPartitionExtractorClass);
     conf.setBoolean(FlinkOptions.HIVE_SYNC_ASSUME_DATE_PARTITION, config.hiveSyncAssumeDatePartition);
-    conf.setBoolean(FlinkOptions.HIVE_SYNC_USE_JDBC, config.hiveSyncUseJdbc);
     conf.setBoolean(FlinkOptions.HIVE_SYNC_AUTO_CREATE_DB, config.hiveSyncAutoCreateDb);
     conf.setBoolean(FlinkOptions.HIVE_SYNC_IGNORE_EXCEPTIONS, config.hiveSyncIgnoreExceptions);
     conf.setBoolean(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX, config.hiveSyncSkipRoSuffix);

@@ -21,7 +21,6 @@
     "hoodie.datasource.hive_sync.table": "huditesttopic",
     "hoodie.datasource.hive_sync.partition_fields": "date",
     "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
-    "hoodie.datasource.hive_sync.use_jdbc": "false",
     "hoodie.datasource.hive_sync.mode": "hms",
     "dfs.client.use.datanode.hostname": "true",
     "hive.metastore.uris": "thrift://hivemetastore:9083",

@@ -172,7 +172,6 @@ public class KafkaConnectConfigs extends HoodieConfig {
   public static final String HIVE_URL = "hoodie.datasource.hive_sync.jdbcurl";
   public static final String HIVE_PARTITION_FIELDS = "hoodie.datasource.hive_sync.partition_fields";
   public static final String HIVE_PARTITION_EXTRACTOR_CLASS = "hoodie.datasource.hive_sync.partition_extractor_class";
-  public static final String HIVE_USE_JDBC = "hoodie.datasource.hive_sync.use_jdbc";
   public static final String HIVE_SYNC_MODE = "hoodie.datasource.hive_sync.mode";
   public static final String HIVE_AUTO_CREATE_DATABASE = "hoodie.datasource.hive_sync.auto_create_database";
   public static final String HIVE_IGNORE_EXCEPTIONS = "hoodie.datasource.hive_sync.ignore_exceptions";

@@ -439,10 +439,6 @@ object DataSourceWriteOptions {
   val HIVE_ASSUME_DATE_PARTITION: ConfigProperty[String] = HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION
   @Deprecated
   val HIVE_USE_PRE_APACHE_INPUT_FORMAT: ConfigProperty[String] = HiveSyncConfigHolder.HIVE_USE_PRE_APACHE_INPUT_FORMAT
-
-  /** @deprecated Use {@link HIVE_SYNC_MODE} instead of this config from 0.9.0 */
-  @Deprecated
-  val HIVE_USE_JDBC: ConfigProperty[String] = HiveSyncConfigHolder.HIVE_USE_JDBC
   @Deprecated
   val HIVE_AUTO_CREATE_DATABASE: ConfigProperty[String] = HiveSyncConfigHolder.HIVE_AUTO_CREATE_DATABASE
   @Deprecated

@@ -502,9 +498,6 @@ object DataSourceWriteOptions {
   /** @deprecated Use {@link HIVE_USE_PRE_APACHE_INPUT_FORMAT} and its methods instead */
   @Deprecated
   val HIVE_USE_PRE_APACHE_INPUT_FORMAT_OPT_KEY = HiveSyncConfigHolder.HIVE_USE_PRE_APACHE_INPUT_FORMAT.key()
-  /** @deprecated Use {@link HIVE_USE_JDBC} and its methods instead */
-  @Deprecated
-  val HIVE_USE_JDBC_OPT_KEY = HiveSyncConfigHolder.HIVE_USE_JDBC.key()
   /** @deprecated Use {@link HIVE_AUTO_CREATE_DATABASE} and its methods instead */
   @Deprecated
   val HIVE_AUTO_CREATE_DATABASE_OPT_KEY = HiveSyncConfigHolder.HIVE_AUTO_CREATE_DATABASE.key()

@@ -695,9 +688,6 @@ object DataSourceWriteOptions {
   val DEFAULT_HIVE_ASSUME_DATE_PARTITION_OPT_VAL = HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION.defaultValue()
   @Deprecated
   val DEFAULT_USE_PRE_APACHE_INPUT_FORMAT_OPT_VAL = "false"
-  /** @deprecated Use {@link HIVE_USE_JDBC} and its methods instead */
-  @Deprecated
-  val DEFAULT_HIVE_USE_JDBC_OPT_VAL = HiveSyncConfigHolder.HIVE_USE_JDBC.defaultValue()
   /** @deprecated Use {@link HIVE_AUTO_CREATE_DATABASE} and its methods instead */
   @Deprecated
   val DEFAULT_HIVE_AUTO_CREATE_DATABASE_OPT_KEY = HiveSyncConfigHolder.HIVE_AUTO_CREATE_DATABASE.defaultValue()

@@ -78,7 +78,6 @@ object HoodieWriterUtils {
     hoodieConfig.setDefaultValue(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS)
     hoodieConfig.setDefaultValue(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS)
     hoodieConfig.setDefaultValue(HIVE_STYLE_PARTITIONING)
-    hoodieConfig.setDefaultValue(HiveSyncConfigHolder.HIVE_USE_JDBC)
     hoodieConfig.setDefaultValue(HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE)
     hoodieConfig.setDefaultValue(HiveSyncConfigHolder.HIVE_SYNC_AS_DATA_SOURCE_TABLE)
     hoodieConfig.setDefaultValue(ASYNC_COMPACT_ENABLE)

@@ -45,7 +45,6 @@ import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_TABLE_PROPERTIES;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_TABLE_SERDE_PROPERTIES;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USER;
-import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USE_JDBC;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USE_PRE_APACHE_INPUT_FORMAT;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.METASTORE_URIS;
 

@@ -95,9 +94,6 @@ public class HiveSyncConfig extends HoodieSyncConfig {
       + "com.uber.hoodie to org.apache.hudi. Stop using this after you migrated the table definition to "
       + "org.apache.hudi input format.")
   public Boolean usePreApacheInputFormat;
-  @Deprecated
-  @Parameter(names = {"--use-jdbc"}, description = "Hive jdbc connect url")
-  public Boolean useJdbc;
   @Parameter(names = {"--metastore-uris"}, description = "Hive metastore uris")
   public String metastoreUris;
   @Parameter(names = {"--sync-mode"}, description = "Mode to choose for Hive ops. Valid values are hms,glue,jdbc and hiveql")

@@ -142,7 +138,6 @@ public class HiveSyncConfig extends HoodieSyncConfig {
     props.setPropertyIfNonNull(HIVE_PASS.key(), hivePass);
     props.setPropertyIfNonNull(HIVE_URL.key(), jdbcUrl);
     props.setPropertyIfNonNull(HIVE_USE_PRE_APACHE_INPUT_FORMAT.key(), usePreApacheInputFormat);
-    props.setPropertyIfNonNull(HIVE_USE_JDBC.key(), useJdbc);
     props.setPropertyIfNonNull(HIVE_SYNC_MODE.key(), syncMode);
     props.setPropertyIfNonNull(METASTORE_URIS.key(), metastoreUris);
     props.setPropertyIfNonNull(HIVE_AUTO_CREATE_DATABASE.key(), autoCreateDatabase);

@@ -52,15 +52,6 @@ public class HiveSyncConfigHolder {
       .withDocumentation("Flag to choose InputFormat under com.uber.hoodie package instead of org.apache.hudi package. "
           + "Use this when you are in the process of migrating from "
           + "com.uber.hoodie to org.apache.hudi. Stop using this after you migrated the table definition to org.apache.hudi input format");
-  /**
-   * @deprecated Use {@link #HIVE_SYNC_MODE} instead of this config from 0.9.0
-   */
-  @Deprecated
-  public static final ConfigProperty<String> HIVE_USE_JDBC = ConfigProperty
-      .key("hoodie.datasource.hive_sync.use_jdbc")
-      .defaultValue("true")
-      .deprecatedAfter("0.9.0")
-      .withDocumentation("Use JDBC when hive synchronization is enabled");
   public static final ConfigProperty<String> METASTORE_URIS = ConfigProperty
       .key("hoodie.datasource.hive_sync.metastore.uris")
       .defaultValue("thrift://localhost:9083")

@@ -109,7 +100,7 @@ public class HiveSyncConfigHolder {
       .withDocumentation("The number of partitions one batch when synchronous partitions to hive.");
   public static final ConfigProperty<String> HIVE_SYNC_MODE = ConfigProperty
       .key("hoodie.datasource.hive_sync.mode")
-      .noDefaultValue()
+      .defaultValue("jdbc")
       .withDocumentation("Mode to choose for Hive ops. Valid values are hms, jdbc and hiveql.");
   public static final ConfigProperty<Boolean> HIVE_SYNC_BUCKET_SYNC = ConfigProperty
       .key("hoodie.datasource.hive_sync.bucket_sync")

@@ -21,7 +21,6 @@ package org.apache.hudi.hive;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.hive.ddl.DDLExecutor;
 import org.apache.hudi.hive.ddl.HMSDDLExecutor;

@@ -49,7 +48,6 @@ import java.util.stream.Collectors;
 
 import static org.apache.hudi.hadoop.utils.HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_MODE;
-import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USE_JDBC;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
 import static org.apache.hudi.sync.common.util.TableUtils.tableId;
 

@@ -72,23 +70,19 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
     // Support JDBC, HiveQL and metastore based implementations for backwards compatibility. Future users should
     // disable jdbc and depend on metastore client for all hive registrations
     try {
-      if (!StringUtils.isNullOrEmpty(config.getString(HIVE_SYNC_MODE))) {
-        HiveSyncMode syncMode = HiveSyncMode.of(config.getString(HIVE_SYNC_MODE));
-        switch (syncMode) {
-          case HMS:
-            ddlExecutor = new HMSDDLExecutor(config);
-            break;
-          case HIVEQL:
-            ddlExecutor = new HiveQueryDDLExecutor(config);
-            break;
-          case JDBC:
-            ddlExecutor = new JDBCExecutor(config);
-            break;
-          default:
-            throw new HoodieHiveSyncException("Invalid sync mode given " + config.getString(HIVE_SYNC_MODE));
-        }
-      } else {
-        ddlExecutor = config.getBoolean(HIVE_USE_JDBC) ? new JDBCExecutor(config) : new HiveQueryDDLExecutor(config);
+      HiveSyncMode syncMode = HiveSyncMode.of(config.getStringOrDefault(HIVE_SYNC_MODE));
+      switch (syncMode) {
+        case HMS:
+          ddlExecutor = new HMSDDLExecutor(config);
+          break;
+        case HIVEQL:
+          ddlExecutor = new HiveQueryDDLExecutor(config);
+          break;
+        case JDBC:
+          ddlExecutor = new JDBCExecutor(config);
+          break;
+        default:
+          throw new HoodieHiveSyncException("Invalid sync mode given " + config.getString(HIVE_SYNC_MODE));
       }
       this.client = Hive.get(config.getHiveConf()).getMSC();
     } catch (Exception e) {

@@ -47,7 +47,7 @@ import java.util.stream.Collectors;
 import static org.apache.hudi.sync.common.util.TableUtils.tableId;
 
 /**
- * This class offers DDL executor backed by the hive.ql Driver This class preserves the old useJDBC = false way of doing things.
+ * This class offers DDL executor backed by the HiveQL Driver.
  */
 public class HiveQueryDDLExecutor extends QueryBasedDDLExecutor {
 

@@ -43,7 +43,7 @@ import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USER;
 import static org.apache.hudi.hive.util.HiveSchemaUtil.HIVE_ESCAPE_CHARACTER;
 
 /**
- * This class offers DDL executor backed by the jdbc This class preserves the old useJDBC = true way of doing things.
+ * This class offers DDL executor backed by the jdbc.
  */
 public class JDBCExecutor extends QueryBasedDDLExecutor {
 

@@ -32,6 +32,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.Properties;
 
+import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_MODE;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
 

@@ -92,6 +93,7 @@ public class HiveSyncGlobalCommitParams {
     String jdbcUrl = forRemote ? loadedProps.getProperty(REMOTE_HIVE_SERVER_JDBC_URLS)
         : loadedProps.getProperty(LOCAL_HIVE_SERVER_JDBC_URLS, loadedProps.getProperty(HIVE_URL.key()));
     props.setPropertyIfNonNull(HIVE_URL.key(), jdbcUrl);
+    props.setProperty(HIVE_SYNC_MODE.key(), "jdbc");
     LOG.info("building hivesync config forRemote: " + forRemote + " " + jdbcUrl + " "
         + basePath);
     return props;

@@ -120,7 +120,7 @@ public class TestHiveSyncTool {
     return opts;
   }
 
-  // (useJdbc, useSchemaFromCommitMetadata, syncAsDataSource)
+  // (useSchemaFromCommitMetadata, syncAsDataSource, syncMode)
   private static Iterable<Object[]> syncDataSourceTableParams() {
     List<Object[]> opts = new ArrayList<>();
     for (Object mode : SYNC_MODES) {

@@ -174,8 +174,6 @@ public class HoodieDropPartitionsTool implements Serializable {
   public String hiveURL = "jdbc:hive2://localhost:10000";
   @Parameter(names = {"--hive-partition-field"}, description = "Comma separated list of field in the hive table to use for determining hive partition columns.", required = false)
   public String hivePartitionsField = "";
-  @Parameter(names = {"--hive-sync-use-jdbc"}, description = "Use JDBC when hive synchronization.", required = false)
-  public boolean hiveUseJdbc = true;
   @Parameter(names = {"--hive-metastore-uris"}, description = "hive meta store uris to use.", required = false)
   public String hiveHMSUris = null;
   @Parameter(names = {"--hive-sync-mode"}, description = "Mode to choose for Hive ops. Valid values are hms, jdbc and hiveql.", required = false)

@@ -215,7 +213,7 @@ public class HoodieDropPartitionsTool implements Serializable {
         + " --hive-pass-word " + "Masked" + ", \n"
         + " --hive-jdbc-url " + hiveURL + ", \n"
         + " --hive-partition-field " + hivePartitionsField + ", \n"
-        + " --hive-sync-use-jdbc " + hiveUseJdbc + ", \n"
+        + " --hive-sync-mode " + hiveSyncMode + ", \n"
         + " --hive-metastore-uris " + hiveHMSUris + ", \n"
         + " --hive-sync-ignore-exception " + hiveSyncIgnoreException + ", \n"
         + " --hive-partition-value-extractor-class " + partitionValueExtractorClass + ", \n"

@@ -247,7 +245,7 @@ public class HoodieDropPartitionsTool implements Serializable {
           && Objects.equals(hivePassWord, config.hivePassWord)
           && Objects.equals(hiveURL, config.hiveURL)
           && Objects.equals(hivePartitionsField, config.hivePartitionsField)
-          && Objects.equals(hiveUseJdbc, config.hiveUseJdbc)
+          && Objects.equals(hiveSyncMode, config.hiveSyncMode)
           && Objects.equals(hiveHMSUris, config.hiveHMSUris)
           && Objects.equals(partitionValueExtractorClass, config.partitionValueExtractorClass)
           && Objects.equals(sparkMaster, config.sparkMaster)

@@ -261,7 +259,7 @@ public class HoodieDropPartitionsTool implements Serializable {
     public int hashCode() {
       return Objects.hash(basePath, runningMode, tableName, partitions, instantTime,
           syncToHive, hiveDataBase, hiveTableName, hiveUserName, hivePassWord, hiveURL,
-          hivePartitionsField, hiveUseJdbc, hiveHMSUris, partitionValueExtractorClass,
+          hivePartitionsField, hiveSyncMode, hiveHMSUris, partitionValueExtractorClass,
           sparkMaster, sparkMemory, propsFilePath, configs, hiveSyncIgnoreException, help);
     }
   }

@@ -350,7 +348,6 @@ public class HoodieDropPartitionsTool implements Serializable {
     props.put(DataSourceWriteOptions.HIVE_PASS().key(), cfg.hivePassWord);
     props.put(DataSourceWriteOptions.HIVE_URL().key(), cfg.hiveURL);
     props.put(DataSourceWriteOptions.HIVE_PARTITION_FIELDS().key(), cfg.hivePartitionsField);
-    props.put(DataSourceWriteOptions.HIVE_USE_JDBC().key(), cfg.hiveUseJdbc);
     props.put(DataSourceWriteOptions.HIVE_SYNC_MODE().key(), cfg.hiveSyncMode);
     props.put(DataSourceWriteOptions.HIVE_IGNORE_EXCEPTIONS().key(), cfg.hiveSyncIgnoreException);
     props.put(DataSourceWriteOptions.HIVE_PASS().key(), cfg.hivePassWord);