1
0

[HUDI-2883] Refactor hive sync tool / config to use reflection and standardize configs (#4175)

- Refactor hive sync tool / config to use reflection and standardize configs

Co-authored-by: sivabalan <n.siva.b@gmail.com>
Co-authored-by: Rajesh Mahindra <rmahindra@Rajeshs-MacBook-Pro.local>
Co-authored-by: Raymond Xu <2701446+xushiyan@users.noreply.github.com>
This commit is contained in:
Rajesh Mahindra
2022-03-21 19:56:31 -07:00
committed by GitHub
parent 9b6e138af2
commit 5f570ea151
43 changed files with 1521 additions and 1217 deletions

View File

@@ -18,27 +18,16 @@
package org.apache.hudi.hive;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import com.beust.jcommander.Parameter;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
/**
* Configs needed to sync data into Hive.
* Configs needed to sync data into the Hive Metastore.
*/
public class HiveSyncConfig implements Serializable {
@Parameter(names = {"--database"}, description = "name of the target database in Hive", required = true)
public String databaseName;
@Parameter(names = {"--table"}, description = "name of the target table in Hive", required = true)
public String tableName;
@Parameter(names = {"--base-file-format"}, description = "Format of the base files (PARQUET (or) HFILE)")
public String baseFileFormat = "PARQUET";
public class HiveSyncConfig extends HoodieSyncConfig {
@Parameter(names = {"--user"}, description = "Hive username")
public String hiveUser;
@@ -52,48 +41,31 @@ public class HiveSyncConfig implements Serializable {
@Parameter(names = {"--metastore-uris"}, description = "Hive metastore uris")
public String metastoreUris;
@Parameter(names = {"--base-path"}, description = "Basepath of hoodie table to sync", required = true)
public String basePath;
@Parameter(names = "--partitioned-by", description = "Fields in the schema partitioned by")
public List<String> partitionFields = new ArrayList<>();
@Parameter(names = "--partition-value-extractor", description = "Class which implements PartitionValueExtractor "
+ "to extract the partition values from HDFS path")
public String partitionValueExtractorClass = SlashEncodedDayPartitionValueExtractor.class.getName();
@Parameter(names = {"--assume-date-partitioning"}, description = "Assume standard yyyy/mm/dd partitioning, this"
+ " exists to support backward compatibility. If you use hoodie 0.3.x, do not set this parameter")
public Boolean assumeDatePartitioning = false;
@Parameter(names = {"--use-pre-apache-input-format"},
description = "Use InputFormat under com.uber.hoodie package "
+ "instead of org.apache.hudi package. Use this when you are in the process of migrating from "
+ "com.uber.hoodie to org.apache.hudi. Stop using this after you migrated the table definition to "
+ "org.apache.hudi input format.")
public Boolean usePreApacheInputFormat = false;
public Boolean usePreApacheInputFormat;
@Parameter(names = {"--bucket-spec"}, description = "bucket spec stored in metastore", required = false)
public String bucketSpec;
// Deprecated boolean switch for JDBC-based sync; the "--sync-mode" option is the newer way to pick the mode.
// The description previously read "Hive jdbc connect url", which describes a URL rather than this flag.
@Deprecated
@Parameter(names = {"--use-jdbc"}, description = "Use JDBC when hive synchronization is enabled")
public Boolean useJdbc = true;
public Boolean useJdbc;
@Parameter(names = {"--sync-mode"}, description = "Mode to choose for Hive ops. Valid values are hms, jdbc and hiveql")
public String syncMode;
@Parameter(names = {"--auto-create-database"}, description = "Auto create hive database")
public Boolean autoCreateDatabase = true;
public Boolean autoCreateDatabase;
@Parameter(names = {"--ignore-exceptions"}, description = "Ignore hive exceptions")
public Boolean ignoreExceptions = false;
public Boolean ignoreExceptions;
@Parameter(names = {"--skip-ro-suffix"}, description = "Skip the `_ro` suffix for Read optimized table, when registering")
public Boolean skipROSuffix = false;
@Parameter(names = {"--use-file-listing-from-metadata"}, description = "Fetch file listing from Hudi's metadata")
public Boolean useFileListingFromMetadata = HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
public Boolean skipROSuffix;
@Parameter(names = {"--table-properties"}, description = "Table properties to hive table")
public String tableProperties;
@@ -106,64 +78,170 @@ public class HiveSyncConfig implements Serializable {
@Parameter(names = {"--support-timestamp"}, description = "'INT64' with original type TIMESTAMP_MICROS is converted to hive 'timestamp' type."
+ "Disabled by default for backward compatibility.")
public Boolean supportTimestamp = false;
@Parameter(names = {"--decode-partition"}, description = "Decode the partition value if the partition has encoded during writing")
public Boolean decodePartition = false;
public Boolean supportTimestamp;
@Parameter(names = {"--managed-table"}, description = "Create a managed table")
public Boolean createManagedTable = false;
public Boolean createManagedTable;
@Parameter(names = {"--batch-sync-num"}, description = "The number of partitions one batch when synchronous partitions to hive")
public Integer batchSyncNum = 1000;
public Integer batchSyncNum;
@Parameter(names = {"--spark-datasource"}, description = "Whether sync this table as spark data source table.")
public Boolean syncAsSparkDataSourceTable = true;
public Boolean syncAsSparkDataSourceTable;
@Parameter(names = {"--spark-schema-length-threshold"}, description = "The maximum length allowed in a single cell when storing additional schema information in Hive's metastore.")
public int sparkSchemaLengthThreshold = 4000;
public int sparkSchemaLengthThreshold;
@Parameter(names = {"--with-operation-field"}, description = "Whether to include the '_hoodie_operation' field in the metadata fields")
public Boolean withOperationField = false;
@Parameter(names = {"--conditional-sync"}, description = "If true, only sync on conditions like schema change or partition change.")
public Boolean isConditionalSync = false;
@Parameter(names = {"--spark-version"}, description = "The spark version", required = false)
public String sparkVersion;
@Parameter(names = {"--sync-comment"}, description = "synchronize table comments to hive")
public boolean syncComment = false;
// enhance the similar function in child class
public static HiveSyncConfig copy(HiveSyncConfig cfg) {
HiveSyncConfig newConfig = new HiveSyncConfig();
newConfig.basePath = cfg.basePath;
newConfig.assumeDatePartitioning = cfg.assumeDatePartitioning;
newConfig.databaseName = cfg.databaseName;
newConfig.hivePass = cfg.hivePass;
newConfig.hiveUser = cfg.hiveUser;
newConfig.partitionFields = cfg.partitionFields;
newConfig.partitionValueExtractorClass = cfg.partitionValueExtractorClass;
newConfig.jdbcUrl = cfg.jdbcUrl;
newConfig.metastoreUris = cfg.metastoreUris;
newConfig.tableName = cfg.tableName;
newConfig.bucketSpec = cfg.bucketSpec;
newConfig.usePreApacheInputFormat = cfg.usePreApacheInputFormat;
newConfig.useFileListingFromMetadata = cfg.useFileListingFromMetadata;
newConfig.supportTimestamp = cfg.supportTimestamp;
newConfig.decodePartition = cfg.decodePartition;
newConfig.tableProperties = cfg.tableProperties;
newConfig.serdeProperties = cfg.serdeProperties;
newConfig.createManagedTable = cfg.createManagedTable;
newConfig.batchSyncNum = cfg.batchSyncNum;
newConfig.syncAsSparkDataSourceTable = cfg.syncAsSparkDataSourceTable;
newConfig.sparkSchemaLengthThreshold = cfg.sparkSchemaLengthThreshold;
newConfig.withOperationField = cfg.withOperationField;
newConfig.isConditionalSync = cfg.isConditionalSync;
newConfig.sparkVersion = cfg.sparkVersion;
newConfig.syncComment = cfg.syncComment;
return newConfig;
// HIVE SYNC SPECIFIC CONFIGS
// NOTE: Do not use uppercase characters in the keys, as keys are lower-cased internally. Uppercase keys cause
// unexpected issues with configs getting reset.
/** Switch for Hive sync; a String-typed flag whose default is "false". */
public static final ConfigProperty<String> HIVE_SYNC_ENABLED = ConfigProperty
.key("hoodie.datasource.hive_sync.enable")
.defaultValue("false")
.withDocumentation("When set to true, register/sync the table to Apache Hive metastore.");
/** Hive user name; the HiveSyncConfig(TypedProperties) constructor loads it into hiveUser. */
public static final ConfigProperty<String> HIVE_USER = ConfigProperty
.key("hoodie.datasource.hive_sync.username")
.defaultValue("hive")
.withDocumentation("hive user name to use");
/**
* Hive password; the HiveSyncConfig(TypedProperties) constructor loads it into hivePass.
* NOTE(review): the default is the literal "hive" - confirm this is only meant for local/test setups.
*/
public static final ConfigProperty<String> HIVE_PASS = ConfigProperty
.key("hoodie.datasource.hive_sync.password")
.defaultValue("hive")
.withDocumentation("hive password to use");
/**
 * Hive JDBC connection URL; the HiveSyncConfig(TypedProperties) constructor loads it into jdbcUrl.
 * The documentation text previously said "Hive metastore url", which duplicated METASTORE_URIS and
 * contradicted both the key name ("jdbcurl") and the "jdbc:hive2://" default.
 */
public static final ConfigProperty<String> HIVE_URL = ConfigProperty
    .key("hoodie.datasource.hive_sync.jdbcurl")
    .defaultValue("jdbc:hive2://localhost:10000")
    .withDocumentation("Hive JDBC connection url");
/** Migration flag for the legacy com.uber.hoodie input format; loaded into usePreApacheInputFormat by the constructor. */
public static final ConfigProperty<String> HIVE_USE_PRE_APACHE_INPUT_FORMAT = ConfigProperty
.key("hoodie.datasource.hive_sync.use_pre_apache_input_format")
.defaultValue("false")
.withDocumentation("Flag to choose InputFormat under com.uber.hoodie package instead of org.apache.hudi package. "
+ "Use this when you are in the process of migrating from "
+ "com.uber.hoodie to org.apache.hudi. Stop using this after you migrated the table definition to org.apache.hudi input format");
/**
* @deprecated Use {@link #HIVE_SYNC_MODE} instead of this config from 0.9.0
*/
// Still read by the HiveSyncConfig(TypedProperties) constructor into useJdbc; the default is "true".
@Deprecated
public static final ConfigProperty<String> HIVE_USE_JDBC = ConfigProperty
.key("hoodie.datasource.hive_sync.use_jdbc")
.defaultValue("true")
.deprecatedAfter("0.9.0")
.withDocumentation("Use JDBC when hive synchronization is enabled");
/**
* Hive metastore URI(s); the HiveSyncConfig(TypedProperties) constructor loads the value into metastoreUris.
* HiveSyncTool#init copies that value into the HiveConf only when the HiveConf has no metastore URIs set.
*/
public static final ConfigProperty<String> METASTORE_URIS = ConfigProperty
.key("hoodie.datasource.hive_sync.metastore.uris")
.defaultValue("thrift://localhost:9083")
.withDocumentation("Hive metastore url");
/** Loaded into autoCreateDatabase; String-typed flag defaulting to "true". */
public static final ConfigProperty<String> HIVE_AUTO_CREATE_DATABASE = ConfigProperty
.key("hoodie.datasource.hive_sync.auto_create_database")
.defaultValue("true")
.withDocumentation("Auto create hive database if does not exists");
/**
* Loaded into ignoreExceptions. When set, HiveSyncTool#init logs a RuntimeException raised while creating
* the Hive client instead of rethrowing it.
*/
public static final ConfigProperty<String> HIVE_IGNORE_EXCEPTIONS = ConfigProperty
.key("hoodie.datasource.hive_sync.ignore_exceptions")
.defaultValue("false")
.withDocumentation("Ignore exceptions when syncing with Hive.");
/** Loaded into skipROSuffix; HiveSyncTool uses it to decide whether the read-optimized table name gets the "_ro" suffix. */
public static final ConfigProperty<String> HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE = ConfigProperty
.key("hoodie.datasource.hive_sync.skip_ro_suffix")
.defaultValue("false")
.withDocumentation("Skip the _ro suffix for Read optimized table, when registering");
/** Loaded into supportTimestamp; String-typed flag defaulting to "false". */
public static final ConfigProperty<String> HIVE_SUPPORT_TIMESTAMP_TYPE = ConfigProperty
.key("hoodie.datasource.hive_sync.support_timestamp")
.defaultValue("false")
.withDocumentation("INT64 with original type TIMESTAMP_MICROS is converted to hive timestamp type. "
+ "Disabled by default for backward compatibility.");
/** Has no default; the constructor reads it with getString into tableProperties. */
public static final ConfigProperty<String> HIVE_TABLE_PROPERTIES = ConfigProperty
.key("hoodie.datasource.hive_sync.table_properties")
.noDefaultValue()
.withDocumentation("Additional properties to store with table.");
/** Has no default; the constructor reads it with getString into serdeProperties. */
public static final ConfigProperty<String> HIVE_TABLE_SERDE_PROPERTIES = ConfigProperty
.key("hoodie.datasource.hive_sync.serde_properties")
.noDefaultValue()
.withDocumentation("Serde properties to hive table.");
/**
 * Loaded into syncAsSparkDataSourceTable by the HiveSyncConfig(TypedProperties) constructor.
 * The documentation string was previously empty; it now mirrors the "--spark-datasource" CLI description.
 */
public static final ConfigProperty<String> HIVE_SYNC_AS_DATA_SOURCE_TABLE = ConfigProperty
    .key("hoodie.datasource.hive_sync.sync_as_datasource")
    .defaultValue("true")
    .withDocumentation("Whether to sync this table as a Spark data source table.");
/**
 * Loaded into sparkSchemaLengthThreshold by the HiveSyncConfig(TypedProperties) constructor.
 * The documentation string was previously empty; it now mirrors the "--spark-schema-length-threshold"
 * CLI description.
 */
public static final ConfigProperty<Integer> HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD = ConfigProperty
    .key("hoodie.datasource.hive_sync.schema_string_length_thresh")
    .defaultValue(4000)
    .withDocumentation("The maximum length allowed in a single cell when storing additional schema "
        + "information in Hive's metastore.");
// Create table as managed table
// Loaded into createManagedTable. Note this property is Boolean-typed, while most flags in this class are String-typed.
public static final ConfigProperty<Boolean> HIVE_CREATE_MANAGED_TABLE = ConfigProperty
.key("hoodie.datasource.hive_sync.create_managed_table")
.defaultValue(false)
.withDocumentation("Whether to sync the table as managed table.");
/** Loaded into batchSyncNum via getIntOrDefault; defaults to 1000. */
public static final ConfigProperty<Integer> HIVE_BATCH_SYNC_PARTITION_NUM = ConfigProperty
.key("hoodie.datasource.hive_sync.batch_num")
.defaultValue(1000)
.withDocumentation("The number of partitions one batch when synchronous partitions to hive.");
/** Has no default; the constructor reads it with getString into syncMode. */
public static final ConfigProperty<String> HIVE_SYNC_MODE = ConfigProperty
.key("hoodie.datasource.hive_sync.mode")
.noDefaultValue()
.withDocumentation("Mode to choose for Hive ops. Valid values are hms, jdbc and hiveql.");
/**
 * Boolean switch for syncing the bucket specification to the Hive metastore.
 * The two documentation fragments were previously concatenated without a separating space
 * ("bucket index.The specification"); a space is now included.
 */
public static final ConfigProperty<Boolean> HIVE_SYNC_BUCKET_SYNC = ConfigProperty
    .key("hoodie.datasource.hive_sync.bucket_sync")
    .defaultValue(false)
    .withDocumentation("Whether to sync hive metastore bucket specification when using bucket index. "
        + "The specification is 'CLUSTERED BY (trace_id) SORTED BY (trace_id ASC) INTO 65536 BUCKETS'");
/**
 * Bucket specification text; the HiveSyncConfig(TypedProperties) constructor loads it into bucketSpec
 * (empty string by default).
 * The two documentation fragments were previously concatenated without a separating space
 * ("bucket index.The specification"); a space is now included.
 */
public static final ConfigProperty<String> HIVE_SYNC_BUCKET_SYNC_SPEC = ConfigProperty
    .key("hoodie.datasource.hive_sync.bucket_sync_spec")
    .defaultValue("")
    .withDocumentation("The hive metastore bucket specification when using bucket index. "
        + "The specification is 'CLUSTERED BY (trace_id) SORTED BY (trace_id ASC) INTO 65536 BUCKETS'");
/** Loaded into syncComment by the HiveSyncConfig(TypedProperties) constructor; String-typed flag defaulting to "false". */
public static final ConfigProperty<String> HIVE_SYNC_COMMENT = ConfigProperty
.key("hoodie.datasource.hive_sync.sync_comment")
.defaultValue("false")
.withDocumentation("Whether to sync the table column comments while syncing the table.");
/**
* Creates a config backed by an empty {@link TypedProperties}, so every field populated through an
* "OrDefault" getter in the TypedProperties-based constructor receives its declared default.
*/
public HiveSyncConfig() {
this(new TypedProperties());
}
/**
 * Populates the Hive-specific public fields from {@code props}.
 *
 * <p>Properties declared with a default are read through the {@code get*OrDefault} accessors; the three
 * declared without a default (sync mode, table properties, serde properties) are read with
 * {@code getString}.
 *
 * @param props properties handed to the parent config and used as the source for every field below
 */
public HiveSyncConfig(TypedProperties props) {
  super(props);

  // Connection and credentials.
  this.jdbcUrl = getStringOrDefault(HIVE_URL);
  this.metastoreUris = getStringOrDefault(METASTORE_URIS);
  this.hiveUser = getStringOrDefault(HIVE_USER);
  this.hivePass = getStringOrDefault(HIVE_PASS);

  // How the sync is carried out.
  this.syncMode = getString(HIVE_SYNC_MODE);
  this.useJdbc = getBooleanOrDefault(HIVE_USE_JDBC);
  this.batchSyncNum = getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM);
  this.ignoreExceptions = getBooleanOrDefault(HIVE_IGNORE_EXCEPTIONS);
  this.autoCreateDatabase = getBooleanOrDefault(HIVE_AUTO_CREATE_DATABASE);

  // Table registration options.
  this.usePreApacheInputFormat = getBooleanOrDefault(HIVE_USE_PRE_APACHE_INPUT_FORMAT);
  this.skipROSuffix = getBooleanOrDefault(HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE);
  this.createManagedTable = getBooleanOrDefault(HIVE_CREATE_MANAGED_TABLE);
  this.supportTimestamp = getBooleanOrDefault(HIVE_SUPPORT_TIMESTAMP_TYPE);
  this.bucketSpec = getStringOrDefault(HIVE_SYNC_BUCKET_SYNC_SPEC);
  this.syncComment = getBooleanOrDefault(HIVE_SYNC_COMMENT);

  // Extra table / serde properties and Spark data source metadata.
  this.tableProperties = getString(HIVE_TABLE_PROPERTIES);
  this.serdeProperties = getString(HIVE_TABLE_SERDE_PROPERTIES);
  this.syncAsSparkDataSourceTable = getBooleanOrDefault(HIVE_SYNC_AS_DATA_SOURCE_TABLE);
  this.sparkSchemaLengthThreshold = getIntOrDefault(HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD);
}
@Override
@@ -197,6 +275,7 @@ public class HiveSyncConfig implements Serializable {
+ ", sparkSchemaLengthThreshold=" + sparkSchemaLengthThreshold
+ ", withOperationField=" + withOperationField
+ ", isConditionalSync=" + isConditionalSync
+ ", sparkVersion=" + sparkVersion
+ ", syncComment=" + syncComment
+ '}';
}

View File

@@ -25,6 +25,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieTableType;
@@ -70,40 +72,53 @@ public class HiveSyncTool extends AbstractSyncTool {
public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
protected final HiveSyncConfig cfg;
protected final HiveSyncConfig hiveSyncConfig;
protected HoodieHiveClient hoodieHiveClient = null;
protected String snapshotTableName = null;
protected Option<String> roTableName = null;
public HiveSyncTool(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
super(configuration.getAllProperties(), fs);
public HiveSyncTool(TypedProperties props, Configuration conf, FileSystem fs) {
super(props, conf, fs);
this.hiveSyncConfig = new HiveSyncConfig(props);
init(hiveSyncConfig, new HiveConf(conf, HiveConf.class));
}
/**
* Creates the sync tool from an already-built {@link HiveSyncConfig} and {@link HiveConf}; the config's
* properties are forwarded to the parent tool.
* NOTE(review): marked deprecated - the TypedProperties-based constructor looks like the intended
* replacement; confirm before documenting it as such.
*/
@Deprecated
public HiveSyncTool(HiveSyncConfig hiveSyncConfig, HiveConf hiveConf, FileSystem fs) {
super(hiveSyncConfig.getProps(), hiveConf, fs);
this.hiveSyncConfig = hiveSyncConfig;
init(hiveSyncConfig, hiveConf);
}
private void init(HiveSyncConfig hiveSyncConfig, HiveConf hiveConf) {
try {
this.hoodieHiveClient = new HoodieHiveClient(cfg, configuration, fs);
if (StringUtils.isNullOrEmpty(hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname))) {
hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, hiveSyncConfig.metastoreUris);
}
this.hoodieHiveClient = new HoodieHiveClient(hiveSyncConfig, hiveConf, fs);
} catch (RuntimeException e) {
if (cfg.ignoreExceptions) {
if (hiveSyncConfig.ignoreExceptions) {
LOG.error("Got runtime exception when hive syncing, but continuing as ignoreExceptions config is set ", e);
} else {
throw new HoodieHiveSyncException("Got runtime exception when hive syncing", e);
}
}
this.cfg = cfg;
// Set partitionFields to empty, when the NonPartitionedExtractor is used
if (NonPartitionedExtractor.class.getName().equals(cfg.partitionValueExtractorClass)) {
if (NonPartitionedExtractor.class.getName().equals(hiveSyncConfig.partitionValueExtractorClass)) {
LOG.warn("Set partitionFields to empty, since the NonPartitionedExtractor is used");
cfg.partitionFields = new ArrayList<>();
hiveSyncConfig.partitionFields = new ArrayList<>();
}
if (hoodieHiveClient != null) {
switch (hoodieHiveClient.getTableType()) {
case COPY_ON_WRITE:
this.snapshotTableName = cfg.tableName;
this.snapshotTableName = hiveSyncConfig.tableName;
this.roTableName = Option.empty();
break;
case MERGE_ON_READ:
this.snapshotTableName = cfg.tableName + SUFFIX_SNAPSHOT_TABLE;
this.roTableName = cfg.skipROSuffix ? Option.of(cfg.tableName) :
Option.of(cfg.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
this.snapshotTableName = hiveSyncConfig.tableName + SUFFIX_SNAPSHOT_TABLE;
this.roTableName = hiveSyncConfig.skipROSuffix ? Option.of(hiveSyncConfig.tableName) :
Option.of(hiveSyncConfig.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
break;
default:
LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
@@ -116,10 +131,13 @@ public class HiveSyncTool extends AbstractSyncTool {
public void syncHoodieTable() {
try {
if (hoodieHiveClient != null) {
LOG.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.tableName + "). Hive metastore URL :"
+ hiveSyncConfig.jdbcUrl + ", basePath :" + hiveSyncConfig.basePath);
doSync();
}
} catch (RuntimeException re) {
throw new HoodieException("Got runtime exception when hive syncing " + cfg.tableName, re);
throw new HoodieException("Got runtime exception when hive syncing " + hiveSyncConfig.tableName, re);
} finally {
if (hoodieHiveClient != null) {
hoodieHiveClient.close();
@@ -150,18 +168,19 @@ public class HiveSyncTool extends AbstractSyncTool {
+ " of type " + hoodieHiveClient.getTableType());
// check if the database exists else create it
if (cfg.autoCreateDatabase) {
if (hiveSyncConfig.autoCreateDatabase) {
try {
if (!hoodieHiveClient.doesDataBaseExist(cfg.databaseName)) {
hoodieHiveClient.createDatabase(cfg.databaseName);
if (!hoodieHiveClient.doesDataBaseExist(hiveSyncConfig.databaseName)) {
hoodieHiveClient.createDatabase(hiveSyncConfig.databaseName);
}
} catch (Exception e) {
// this is harmless since table creation will fail anyways, creation of DB is needed for in-memory testing
LOG.warn("Unable to create database", e);
}
} else {
if (!hoodieHiveClient.doesDataBaseExist(cfg.databaseName)) {
throw new HoodieHiveSyncException("hive database does not exist " + cfg.databaseName);
if (!hoodieHiveClient.doesDataBaseExist(hiveSyncConfig.databaseName)) {
LOG.error("Hive database does not exist " + hiveSyncConfig.databaseName);
throw new HoodieHiveSyncException("hive database does not exist " + hiveSyncConfig.databaseName);
}
}
@@ -181,7 +200,7 @@ public class HiveSyncTool extends AbstractSyncTool {
if (hoodieHiveClient.isBootstrap()
&& hoodieHiveClient.getTableType() == HoodieTableType.MERGE_ON_READ
&& !readAsOptimized) {
cfg.syncAsSparkDataSourceTable = false;
hiveSyncConfig.syncAsSparkDataSourceTable = false;
}
// Sync schema if needed
@@ -200,7 +219,7 @@ public class HiveSyncTool extends AbstractSyncTool {
// Sync the partitions if needed
boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince, isDropPartition);
boolean meetSyncConditions = schemaChanged || partitionsChanged;
if (!cfg.isConditionalSync || meetSyncConditions) {
if (!hiveSyncConfig.isConditionalSync || meetSyncConditions) {
hoodieHiveClient.updateLastCommitTimeSynced(tableName);
}
LOG.info("Sync complete for " + tableName);
@@ -216,10 +235,10 @@ public class HiveSyncTool extends AbstractSyncTool {
private boolean syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat,
boolean readAsOptimized, MessageType schema) {
// Append spark table properties & serde properties
Map<String, String> tableProperties = ConfigUtils.toMap(cfg.tableProperties);
Map<String, String> serdeProperties = ConfigUtils.toMap(cfg.serdeProperties);
if (cfg.syncAsSparkDataSourceTable) {
Map<String, String> sparkTableProperties = getSparkTableProperties(cfg.sparkSchemaLengthThreshold, schema);
Map<String, String> tableProperties = ConfigUtils.toMap(hiveSyncConfig.tableProperties);
Map<String, String> serdeProperties = ConfigUtils.toMap(hiveSyncConfig.serdeProperties);
if (hiveSyncConfig.syncAsSparkDataSourceTable) {
Map<String, String> sparkTableProperties = getSparkTableProperties(hiveSyncConfig.sparkSchemaLengthThreshold, schema);
Map<String, String> sparkSerdeProperties = getSparkSerdeProperties(readAsOptimized);
tableProperties.putAll(sparkTableProperties);
serdeProperties.putAll(sparkSerdeProperties);
@@ -228,10 +247,10 @@ public class HiveSyncTool extends AbstractSyncTool {
// Check and sync schema
if (!tableExists) {
LOG.info("Hive table " + tableName + " is not found. Creating it");
HoodieFileFormat baseFileFormat = HoodieFileFormat.valueOf(cfg.baseFileFormat.toUpperCase());
HoodieFileFormat baseFileFormat = HoodieFileFormat.valueOf(hiveSyncConfig.baseFileFormat.toUpperCase());
String inputFormatClassName = HoodieInputFormatUtils.getInputFormatClassName(baseFileFormat, useRealTimeInputFormat);
if (baseFileFormat.equals(HoodieFileFormat.PARQUET) && cfg.usePreApacheInputFormat) {
if (baseFileFormat.equals(HoodieFileFormat.PARQUET) && hiveSyncConfig.usePreApacheInputFormat) {
// Parquet input format had an InputFormat class visible under the old naming scheme.
inputFormatClassName = useRealTimeInputFormat
? com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat.class.getName()
@@ -250,12 +269,12 @@ public class HiveSyncTool extends AbstractSyncTool {
} else {
// Check if the table schema has evolved
Map<String, String> tableSchema = hoodieHiveClient.getTableSchema(tableName);
SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, cfg.partitionFields, cfg.supportTimestamp);
SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, hiveSyncConfig.partitionFields, hiveSyncConfig.supportTimestamp);
if (!schemaDiff.isEmpty()) {
LOG.info("Schema difference found for " + tableName);
hoodieHiveClient.updateTableDefinition(tableName, schema);
// Sync the table properties if the schema has changed
if (cfg.tableProperties != null || cfg.syncAsSparkDataSourceTable) {
if (hiveSyncConfig.tableProperties != null || hiveSyncConfig.syncAsSparkDataSourceTable) {
hoodieHiveClient.updateTableProperties(tableName, tableProperties);
LOG.info("Sync table properties for " + tableName + ", table properties is: " + tableProperties);
}
@@ -265,7 +284,7 @@ public class HiveSyncTool extends AbstractSyncTool {
}
}
if (cfg.syncComment) {
if (hiveSyncConfig.syncComment) {
Schema avroSchemaWithoutMetadataFields = hoodieHiveClient.getAvroSchemaWithoutMetadataFields();
Map<String, String> newComments = avroSchemaWithoutMetadataFields.getFields()
.stream().collect(Collectors.toMap(Schema.Field::name, field -> StringUtils.isNullOrEmpty(field.doc()) ? "" : field.doc()));
@@ -290,7 +309,7 @@ public class HiveSyncTool extends AbstractSyncTool {
// The following code refers to the spark code in
// https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
GroupType originGroupType = schema.asGroupType();
List<String> partitionNames = cfg.partitionFields;
List<String> partitionNames = hiveSyncConfig.partitionFields;
List<Type> partitionCols = new ArrayList<>();
List<Type> dataCols = new ArrayList<>();
Map<String, Type> column2Field = new HashMap<>();
@@ -319,8 +338,8 @@ public class HiveSyncTool extends AbstractSyncTool {
Map<String, String> sparkProperties = new HashMap<>();
sparkProperties.put("spark.sql.sources.provider", "hudi");
if (!StringUtils.isNullOrEmpty(cfg.sparkVersion)) {
sparkProperties.put("spark.sql.create.version", cfg.sparkVersion);
if (!StringUtils.isNullOrEmpty(hiveSyncConfig.sparkVersion)) {
sparkProperties.put("spark.sql.create.version", hiveSyncConfig.sparkVersion);
}
// Split the schema string to multi-parts according the schemaLengthThreshold size.
String schemaString = Parquet2SparkSchemaUtils.convertToSparkSchemaJson(reOrderedType);
@@ -344,7 +363,7 @@ public class HiveSyncTool extends AbstractSyncTool {
private Map<String, String> getSparkSerdeProperties(boolean readAsOptimized) {
Map<String, String> sparkSerdeProperties = new HashMap<>();
sparkSerdeProperties.put("path", cfg.basePath);
sparkSerdeProperties.put("path", hiveSyncConfig.basePath);
sparkSerdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, String.valueOf(readAsOptimized));
return sparkSerdeProperties;
}

View File

@@ -18,15 +18,24 @@
package org.apache.hudi.hive.replication;
import com.beust.jcommander.Parameter;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.hive.HiveSyncConfig;
import com.beust.jcommander.Parameter;
public class GlobalHiveSyncConfig extends HiveSyncConfig {
@Parameter(names = {"--replicated-timestamp"}, description = "Add globally replicated timestamp to enable consistent reads across clusters")
public String globallyReplicatedTimeStamp;
/** Creates a config through the implicit no-arg {@link HiveSyncConfig} constructor; globallyReplicatedTimeStamp stays unset. */
public GlobalHiveSyncConfig() {
}
/** Creates a config whose inherited fields are populated from {@code props} by the parent constructor. */
public GlobalHiveSyncConfig(TypedProperties props) {
super(props);
}
public static GlobalHiveSyncConfig copy(GlobalHiveSyncConfig cfg) {
GlobalHiveSyncConfig newConfig = new GlobalHiveSyncConfig();
GlobalHiveSyncConfig newConfig = new GlobalHiveSyncConfig(cfg.getProps());
newConfig.basePath = cfg.basePath;
newConfig.assumeDatePartitioning = cfg.assumeDatePartitioning;
newConfig.databaseName = cfg.databaseName;

View File

@@ -48,9 +48,9 @@ public class GlobalHiveSyncTool extends HiveSyncTool {
@Override
protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, boolean readAsOptimized) {
super.syncHoodieTable(tableName, useRealtimeInputFormat, readAsOptimized);
if (((GlobalHiveSyncConfig)cfg).globallyReplicatedTimeStamp != null) {
if (((GlobalHiveSyncConfig) hiveSyncConfig).globallyReplicatedTimeStamp != null) {
hoodieHiveClient.updateLastReplicatedTimeStamp(tableName,
((GlobalHiveSyncConfig) cfg).globallyReplicatedTimeStamp);
((GlobalHiveSyncConfig) hiveSyncConfig).globallyReplicatedTimeStamp);
}
LOG.info("Sync complete for " + tableName);
}

View File

@@ -22,6 +22,7 @@ import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieBaseFile;
@@ -75,7 +76,6 @@ import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -89,16 +89,21 @@ import static org.junit.jupiter.api.Assertions.fail;
@SuppressWarnings("SameParameterValue")
public class HiveTestUtil {
public static final String DB_NAME = "testdb";
public static final String TABLE_NAME = "test1";
public static String basePath;
public static TypedProperties hiveSyncProps;
public static HiveTestService hiveTestService;
public static FileSystem fileSystem;
public static QueryBasedDDLExecutor ddlExecutor;
private static ZooKeeperServer zkServer;
private static HiveServer2 hiveServer;
public static HiveTestService hiveTestService;
private static ZookeeperTestService zkService;
private static Configuration configuration;
public static HiveSyncConfig hiveSyncConfig;
private static HiveSyncConfig hiveSyncConfig;
private static DateTimeFormatter dtfOut;
public static FileSystem fileSystem;
private static Set<String> createdTablesSet = new HashSet<>();
public static QueryBasedDDLExecutor ddlExecutor;
public static void setUp() throws IOException, InterruptedException, HiveException, MetaException {
configuration = new Configuration();
@@ -112,16 +117,21 @@ public class HiveTestUtil {
}
fileSystem = FileSystem.get(configuration);
hiveSyncConfig = new HiveSyncConfig();
hiveSyncConfig.jdbcUrl = hiveTestService.getJdbcHive2Url();
hiveSyncConfig.hiveUser = "";
hiveSyncConfig.hivePass = "";
hiveSyncConfig.databaseName = "testdb";
hiveSyncConfig.tableName = "test1";
hiveSyncConfig.basePath = Files.createTempDirectory("hivesynctest" + Instant.now().toEpochMilli()).toUri().toString();
hiveSyncConfig.assumeDatePartitioning = true;
hiveSyncConfig.usePreApacheInputFormat = false;
hiveSyncConfig.partitionFields = Collections.singletonList("datestr");
basePath = Files.createTempDirectory("hivesynctest" + Instant.now().toEpochMilli()).toUri().toString();
hiveSyncProps = new TypedProperties();
hiveSyncProps.setProperty(HiveSyncConfig.HIVE_URL.key(), hiveTestService.getJdbcHive2Url());
hiveSyncProps.setProperty(HiveSyncConfig.HIVE_USER.key(), "");
hiveSyncProps.setProperty(HiveSyncConfig.HIVE_PASS.key(), "");
hiveSyncProps.setProperty(HiveSyncConfig.META_SYNC_DATABASE_NAME.key(), DB_NAME);
hiveSyncProps.setProperty(HiveSyncConfig.META_SYNC_TABLE_NAME.key(), TABLE_NAME);
hiveSyncProps.setProperty(HiveSyncConfig.META_SYNC_BASE_PATH.key(), basePath);
hiveSyncProps.setProperty(HiveSyncConfig.META_SYNC_ASSUME_DATE_PARTITION.key(), "true");
hiveSyncProps.setProperty(HiveSyncConfig.HIVE_USE_PRE_APACHE_INPUT_FORMAT.key(), "false");
hiveSyncProps.setProperty(HiveSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr");
hiveSyncProps.setProperty(HiveSyncConfig.HIVE_BATCH_SYNC_PARTITION_NUM.key(), "3");
hiveSyncConfig = new HiveSyncConfig(hiveSyncProps);
dtfOut = DateTimeFormatter.ofPattern("yyyy/MM/dd");
ddlExecutor = new HiveQueryDDLExecutor(hiveSyncConfig, fileSystem, getHiveConf());
@@ -138,18 +148,18 @@ public class HiveTestUtil {
}
public static void clear() throws IOException, HiveException, MetaException {
fileSystem.delete(new Path(hiveSyncConfig.basePath), true);
fileSystem.delete(new Path(basePath), true);
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(HoodieTableType.COPY_ON_WRITE)
.setTableName(hiveSyncConfig.tableName)
.setTableName(TABLE_NAME)
.setPayloadClass(HoodieAvroPayload.class)
.initTable(configuration, hiveSyncConfig.basePath);
.initTable(configuration, basePath);
for (String tableName : createdTablesSet) {
ddlExecutor.runSQL("drop table if exists " + tableName);
}
createdTablesSet.clear();
ddlExecutor.runSQL("drop database if exists " + hiveSyncConfig.databaseName + " cascade");
ddlExecutor.runSQL("drop database if exists " + DB_NAME + " cascade");
}
public static HiveConf getHiveConf() {
@@ -189,7 +199,7 @@ public class HiveTestUtil {
/**
 * Convenience overload that forwards to the six-argument {@code createCOWTable}, supplying the
 * shared base path, database name and table name of this test utility.
 *
 * NOTE(review): this excerpt is rendered diff output; the call using {@code hiveSyncConfig.*}
 * appears to be the pre-change form of the call below it - confirm against the real file.
 *
 * @param instantTime                 forwarded unchanged to the six-argument overload
 * @param numberOfPartitions          forwarded unchanged to the six-argument overload
 * @param useSchemaFromCommitMetadata forwarded unchanged to the six-argument overload
 */
public static void createCOWTable(String instantTime, int numberOfPartitions, boolean useSchemaFromCommitMetadata)
throws IOException, URISyntaxException {
createCOWTable(instantTime, numberOfPartitions, useSchemaFromCommitMetadata, hiveSyncConfig.basePath, hiveSyncConfig.databaseName, hiveSyncConfig.tableName);
createCOWTable(instantTime, numberOfPartitions, useSchemaFromCommitMetadata, basePath, DB_NAME, TABLE_NAME);
}
public static void createReplaceCommit(String instantTime, String partitions, WriteOperationType type, boolean isParquetSchemaSimple, boolean useSchemaFromCommitMetadata)
@@ -205,13 +215,13 @@ public class HiveTestUtil {
public static void createCOWTableWithSchema(String instantTime, String schemaFileName)
throws IOException, URISyntaxException {
Path path = new Path(hiveSyncConfig.basePath);
FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath));
Path path = new Path(basePath);
FileIOUtils.deleteDirectory(new File(basePath));
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(HoodieTableType.COPY_ON_WRITE)
.setTableName(hiveSyncConfig.tableName)
.setTableName(TABLE_NAME)
.setPayloadClass(HoodieAvroPayload.class)
.initTable(configuration, hiveSyncConfig.basePath);
.initTable(configuration, basePath);
boolean result = fileSystem.mkdirs(path);
checkResult(result);
@@ -219,7 +229,7 @@ public class HiveTestUtil {
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
String partitionPath = dateTime.format(dtfOut);
Path partPath = new Path(hiveSyncConfig.basePath + "/" + partitionPath);
Path partPath = new Path(basePath + "/" + partitionPath);
fileSystem.makeQualified(partPath);
fileSystem.mkdirs(partPath);
List<HoodieWriteStat> writeStats = new ArrayList<>();
@@ -233,30 +243,30 @@ public class HiveTestUtil {
writeStats.add(writeStat);
writeStats.forEach(s -> commitMetadata.addWriteStat(partitionPath, s));
commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schema.toString());
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
createCommitFile(commitMetadata, instantTime, hiveSyncConfig.basePath);
createdTablesSet.add(DB_NAME + "." + TABLE_NAME);
createCommitFile(commitMetadata, instantTime, basePath);
}
public static void createMORTable(String commitTime, String deltaCommitTime, int numberOfPartitions,
boolean createDeltaCommit, boolean useSchemaFromCommitMetadata)
throws IOException, URISyntaxException, InterruptedException {
Path path = new Path(hiveSyncConfig.basePath);
FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath));
Path path = new Path(basePath);
FileIOUtils.deleteDirectory(new File(basePath));
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(HoodieTableType.MERGE_ON_READ)
.setTableName(hiveSyncConfig.tableName)
.setTableName(TABLE_NAME)
.setPayloadClass(HoodieAvroPayload.class)
.initTable(configuration, hiveSyncConfig.basePath);
.initTable(configuration, basePath);
boolean result = fileSystem.mkdirs(path);
checkResult(result);
ZonedDateTime dateTime = ZonedDateTime.now();
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true,
useSchemaFromCommitMetadata, dateTime, commitTime, hiveSyncConfig.basePath);
useSchemaFromCommitMetadata, dateTime, commitTime, basePath);
createdTablesSet
.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE);
.add(DB_NAME + "." + TABLE_NAME + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE);
createdTablesSet
.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE);
.add(DB_NAME + "." + TABLE_NAME + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE);
HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata();
commitMetadata.getPartitionToWriteStats()
.forEach((key, value) -> value.forEach(l -> compactionMetadata.addWriteStat(key, l)));
@@ -274,26 +284,26 @@ public class HiveTestUtil {
/**
 * Adds partitions to the test table: builds commit metadata via {@code createPartitions},
 * records the qualified "database.table" name in {@code createdTablesSet} (the set that
 * {@code clear()} iterates to drop tables), and writes the commit file for {@code instantTime}
 * through {@code createCommitFile}.
 *
 * NOTE(review): this excerpt is rendered diff output; each {@code hiveSyncConfig.*} line appears
 * to be the pre-change form of the matching line in the second group - confirm against the real file.
 *
 * @param numberOfPartitions          passed to {@code createPartitions}
 * @param isParquetSchemaSimple       passed to {@code createPartitions}
 * @param useSchemaFromCommitMetadata passed to {@code createPartitions}
 * @param startFrom                   passed to {@code createPartitions}
 * @param instantTime                 passed to both {@code createPartitions} and {@code createCommitFile}
 */
public static void addCOWPartitions(int numberOfPartitions, boolean isParquetSchemaSimple,
boolean useSchemaFromCommitMetadata, ZonedDateTime startFrom, String instantTime) throws IOException, URISyntaxException {
HoodieCommitMetadata commitMetadata =
createPartitions(numberOfPartitions, isParquetSchemaSimple, useSchemaFromCommitMetadata, startFrom, instantTime, hiveSyncConfig.basePath);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
createCommitFile(commitMetadata, instantTime, hiveSyncConfig.basePath);
createPartitions(numberOfPartitions, isParquetSchemaSimple, useSchemaFromCommitMetadata, startFrom, instantTime, basePath);
createdTablesSet.add(DB_NAME + "." + TABLE_NAME);
createCommitFile(commitMetadata, instantTime, basePath);
}
/**
 * Adds a single, explicitly named partition to the test table: builds commit metadata via
 * {@code createPartition}, records the qualified "database.table" name in
 * {@code createdTablesSet}, and writes the commit file for {@code instantTime} through
 * {@code createCommitFile}.
 *
 * NOTE(review): this excerpt is rendered diff output; each {@code hiveSyncConfig.*} line appears
 * to be the pre-change form of the matching line in the second group - confirm against the real file.
 *
 * @param partitionPath               passed to {@code createPartition}
 * @param isParquetSchemaSimple       passed to {@code createPartition}
 * @param useSchemaFromCommitMetadata passed to {@code createPartition}
 * @param instantTime                 passed to both {@code createPartition} and {@code createCommitFile}
 */
public static void addCOWPartition(String partitionPath, boolean isParquetSchemaSimple,
boolean useSchemaFromCommitMetadata, String instantTime) throws IOException, URISyntaxException {
HoodieCommitMetadata commitMetadata =
createPartition(partitionPath, isParquetSchemaSimple, useSchemaFromCommitMetadata, instantTime);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
createCommitFile(commitMetadata, instantTime, hiveSyncConfig.basePath);
createdTablesSet.add(DB_NAME + "." + TABLE_NAME);
createCommitFile(commitMetadata, instantTime, basePath);
}
public static void addMORPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, boolean isLogSchemaSimple,
boolean useSchemaFromCommitMetadata, ZonedDateTime startFrom, String instantTime, String deltaCommitTime)
throws IOException, URISyntaxException, InterruptedException {
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, isParquetSchemaSimple,
useSchemaFromCommitMetadata, startFrom, instantTime, hiveSyncConfig.basePath);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE);
useSchemaFromCommitMetadata, startFrom, instantTime, basePath);
createdTablesSet.add(DB_NAME + "." + TABLE_NAME + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE);
createdTablesSet.add(DB_NAME + "." + TABLE_NAME + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE);
HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata();
commitMetadata.getPartitionToWriteStats()
.forEach((key, value) -> value.forEach(l -> compactionMetadata.addWriteStat(key, l)));
@@ -346,7 +356,7 @@ public class HiveTestUtil {
private static HoodieCommitMetadata createPartition(String partitionPath, boolean isParquetSchemaSimple,
boolean useSchemaFromCommitMetadata, String instantTime) throws IOException, URISyntaxException {
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
Path partPath = new Path(hiveSyncConfig.basePath + "/" + partitionPath);
Path partPath = new Path(basePath + "/" + partitionPath);
fileSystem.makeQualified(partPath);
fileSystem.mkdirs(partPath);
List<HoodieWriteStat> writeStats = createTestData(partPath, isParquetSchemaSimple, instantTime);
@@ -471,7 +481,7 @@ public class HiveTestUtil {
public static void createReplaceCommitFile(HoodieCommitMetadata commitMetadata, String instantTime) throws IOException {
byte[] bytes = commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
Path fullPath = new Path(hiveSyncConfig.basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
Path fullPath = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ HoodieTimeline.makeReplaceFileName(instantTime));
FSDataOutputStream fsout = fileSystem.create(fullPath, true);
fsout.write(bytes);
@@ -480,13 +490,13 @@ public class HiveTestUtil {
/**
 * Calls {@code addSchemaToCommitMetadata} on the given commit metadata (with the third argument
 * fixed to {@code true}) and then writes the commit file for {@code instantTime} under the
 * shared base path via {@code createCommitFile}.
 *
 * NOTE(review): this excerpt is rendered diff output; the {@code hiveSyncConfig.basePath} call
 * appears to be the pre-change form of the call below it - confirm against the real file.
 *
 * @param commitMetadata metadata handed to {@code addSchemaToCommitMetadata} and then written out
 * @param instantTime    instant time passed to {@code createCommitFile}
 * @param isSimpleSchema passed to {@code addSchemaToCommitMetadata}
 */
public static void createCommitFileWithSchema(HoodieCommitMetadata commitMetadata, String instantTime, boolean isSimpleSchema) throws IOException {
addSchemaToCommitMetadata(commitMetadata, isSimpleSchema, true);
createCommitFile(commitMetadata, instantTime, hiveSyncConfig.basePath);
createCommitFile(commitMetadata, instantTime, basePath);
}
private static void createCompactionCommitFile(HoodieCommitMetadata commitMetadata, String instantTime)
throws IOException {
byte[] bytes = commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
Path fullPath = new Path(hiveSyncConfig.basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
Path fullPath = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ HoodieTimeline.makeCommitFileName(instantTime));
FSDataOutputStream fsout = fileSystem.create(fullPath, true);
fsout.write(bytes);
@@ -496,7 +506,7 @@ public class HiveTestUtil {
private static void createDeltaCommitFile(HoodieCommitMetadata deltaCommitMetadata, String deltaCommitTime)
throws IOException {
byte[] bytes = deltaCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
Path fullPath = new Path(hiveSyncConfig.basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
Path fullPath = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ HoodieTimeline.makeDeltaFileName(deltaCommitTime));
FSDataOutputStream fsout = fileSystem.create(fullPath, true);
fsout.write(bytes);