[HUDI-1192] Make create hive database automatically configurable (#1968)

@@ -254,6 +254,8 @@ public class DataSourceUtils {
         SlashEncodedDayPartitionValueExtractor.class.getName());
     hiveSyncConfig.useJdbc = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_USE_JDBC_OPT_KEY(),
         DataSourceWriteOptions.DEFAULT_HIVE_USE_JDBC_OPT_VAL()));
+    hiveSyncConfig.autoCreateDatabase = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_AUTO_CREATE_DATABASE_OPT_KEY(),
+        DataSourceWriteOptions.DEFAULT_HIVE_AUTO_CREATE_DATABASE_OPT_KEY()));
     hiveSyncConfig.skipROSuffix = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX(),
         DataSourceWriteOptions.DEFAULT_HIVE_SKIP_RO_SUFFIX_VAL()));
     return hiveSyncConfig;

@@ -290,6 +290,7 @@ object DataSourceWriteOptions {
   val HIVE_ASSUME_DATE_PARTITION_OPT_KEY = "hoodie.datasource.hive_sync.assume_date_partitioning"
   val HIVE_USE_PRE_APACHE_INPUT_FORMAT_OPT_KEY = "hoodie.datasource.hive_sync.use_pre_apache_input_format"
   val HIVE_USE_JDBC_OPT_KEY = "hoodie.datasource.hive_sync.use_jdbc"
+  val HIVE_AUTO_CREATE_DATABASE_OPT_KEY = "hoodie.datasource.hive_sync.auto_create_database"
   val HIVE_SKIP_RO_SUFFIX = "hoodie.datasource.hive_sync.skip_ro_suffix"

   // DEFAULT FOR HIVE SPECIFIC CONFIGS

@@ -306,6 +307,7 @@ object DataSourceWriteOptions {
   val DEFAULT_HIVE_ASSUME_DATE_PARTITION_OPT_VAL = "false"
   val DEFAULT_USE_PRE_APACHE_INPUT_FORMAT_OPT_VAL = "false"
   val DEFAULT_HIVE_USE_JDBC_OPT_VAL = "true"
+  val DEFAULT_HIVE_AUTO_CREATE_DATABASE_OPT_KEY = "true"
   val DEFAULT_HIVE_SKIP_RO_SUFFIX_VAL = "false"

   // Async Compaction - Enabled by default for MOR

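The new key is read like any other hive_sync option, so it can be set straight from a Spark datasource write. A minimal usage sketch (the table name, key field, database name, and path are placeholders; the other options shown are pre-existing Hudi sync options):

    // Hypothetical sketch: disable automatic database creation so the sync
    // fails fast when "analytics_db" has not been provisioned in advance.
    df.write.format("hudi")
      .option("hoodie.table.name", "trips")                                 // placeholder table name
      .option("hoodie.datasource.write.recordkey.field", "uuid")            // placeholder key field
      .option("hoodie.datasource.hive_sync.enable", "true")
      .option("hoodie.datasource.hive_sync.database", "analytics_db")       // placeholder database
      .option("hoodie.datasource.hive_sync.table", "trips")
      .option("hoodie.datasource.hive_sync.auto_create_database", "false")  // new option; default is "true"
      .mode("append")
      .save("/tmp/hudi/trips")                                              // placeholder base path
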
@@ -71,6 +71,9 @@ public class HiveSyncConfig implements Serializable {
   @Parameter(names = {"--use-jdbc"}, description = "Hive jdbc connect url")
   public Boolean useJdbc = true;

+  @Parameter(names = {"--auto-create-database"}, description = "Auto create hive database")
+  public Boolean autoCreateDatabase = true;
+
   @Parameter(names = {"--skip-ro-suffix"}, description = "Skip the `_ro` suffix for Read optimized table, when registering")
   public Boolean skipROSuffix = false;

@@ -117,11 +117,17 @@ public class HiveSyncTool extends AbstractSyncTool {
     boolean tableExists = hoodieHiveClient.doesTableExist(tableName);

     // check if the database exists else create it
-    try {
-      hoodieHiveClient.updateHiveSQL("create database if not exists " + cfg.databaseName);
-    } catch (Exception e) {
-      // this is harmless since table creation will fail anyways, creation of DB is needed for in-memory testing
-      LOG.warn("Unable to create database", e);
+    if (cfg.autoCreateDatabase) {
+      try {
+        hoodieHiveClient.updateHiveSQL("create database if not exists " + cfg.databaseName);
+      } catch (Exception e) {
+        // this is harmless since table creation will fail anyways, creation of DB is needed for in-memory testing
+        LOG.warn("Unable to create database", e);
+      }
+    } else {
+      if (!hoodieHiveClient.doesDataBaseExist(cfg.databaseName)) {
+        throw new HoodieHiveSyncException("hive database does not exist " + cfg.databaseName);
+      }
     }

     // Get the parquet schema for this table looking at the latest commit

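Net behavior of the hunk above: with autoCreateDatabase left at its default of true, the tool keeps issuing `create database if not exists` (and failures stay a warning, as before); with it set to false, the tool probes the metastore and aborts with HoodieHiveSyncException if the database is missing. A rough programmatic sketch of the fail-fast path — the public config fields and the `--auto-create-database` flag come from this branch, while the HiveSyncTool(cfg, hiveConf, fs) constructor and the in-scope hiveConf/fs are assumptions about the surrounding code:

    // Hypothetical sketch: run Hive sync with auto-create disabled.
    val cfg = new HiveSyncConfig()
    cfg.databaseName = "analytics_db"  // must already exist when autoCreateDatabase = false
    cfg.tableName = "trips"            // placeholder
    cfg.basePath = "/tmp/hudi/trips"   // placeholder
    cfg.autoCreateDatabase = false     // new flag; CLI equivalent is --auto-create-database
    new HiveSyncTool(cfg, hiveConf, fs).syncHoodieTable()
    // With the database missing, sync now throws
    // HoodieHiveSyncException("hive database does not exist analytics_db")
    // instead of silently creating the database.
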
@@ -18,6 +18,11 @@

 package org.apache.hudi.hive;

+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.StorageSchemes;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;

@@ -29,10 +34,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;

@@ -336,6 +337,22 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
     }
   }

+  /**
+   * @param databaseName
+   * @return true if the configured database exists
+   */
+  public boolean doesDataBaseExist(String databaseName) {
+    try {
+      Database database = client.getDatabase(databaseName);
+      if (database != null && databaseName.equals(database.getName())) {
+        return true;
+      }
+    } catch (TException e) {
+      throw new HoodieHiveSyncException("Failed to check if database exists " + databaseName, e);
+    }
+    return false;
+  }
+
   /**
    * Execute a update in hive metastore with this SQL.
    *

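doesDataBaseExist is what the else-branch in HiveSyncTool calls: it looks the database up through the metastore client's getDatabase (hence the new Database import above) and wraps any thrift-level TException in a HoodieHiveSyncException. A minimal standalone sketch of the probe, assuming the same cfg/hiveConf/fs as in the earlier example and that HoodieHiveClient is constructed the way HiveSyncTool constructs it internally:

    // Hypothetical sketch: probe the metastore directly, mirroring the else-branch.
    val client = new HoodieHiveClient(cfg, hiveConf, fs)
    if (!client.doesDataBaseExist(cfg.databaseName)) {
      throw new HoodieHiveSyncException("hive database does not exist " + cfg.databaseName)
    }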