1
0

[HUDI-3985] Refactor DLASyncTool to support read hoodie table as spark datasource table (#5532)

This commit is contained in:
huberylee
2022-05-20 22:25:32 +08:00
committed by GitHub
parent c7576f7613
commit 85b146d3d5
26 changed files with 1281 additions and 974 deletions

View File

@@ -26,11 +26,11 @@ import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.util.Option
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
import org.apache.hudi.hive.util.ConfigUtils
import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.{ComplexKeyGenerator, CustomKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator}
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.sync.common.util.ConfigUtils
import org.apache.log4j.LogManager
import org.apache.spark.sql.execution.datasources.{DataSourceUtils => SparkDataSourceUtils}

View File

@@ -21,7 +21,7 @@ import org.apache.hadoop.fs.Path
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.hive.util.ConfigUtils
import org.apache.hudi.sync.common.util.ConfigUtils
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog._

View File

@@ -21,10 +21,10 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.hive.HiveSyncConfig
import org.apache.hudi.hive.util.ConfigUtils
import org.apache.hudi.sql.InsertMode
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, HoodieCatalogTable}
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.needFilterProps
import org.apache.hudi.sync.common.util.ConfigUtils
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils

View File

@@ -20,8 +20,8 @@ package org.apache.spark.sql.hudi.catalog
import org.apache.hadoop.fs.Path
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hive.util.ConfigUtils
import org.apache.hudi.sql.InsertMode
import org.apache.hudi.sync.common.util.ConfigUtils
import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
import org.apache.spark.sql.HoodieSpark3SqlUtils.convertTransforms
import org.apache.spark.sql.catalyst.TableIdentifier

View File

@@ -25,7 +25,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>hudi-dla-sync</artifactId>
<artifactId>hudi-adb-sync</artifactId>
<packaging>jar</packaging>
<properties>

View File

@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sync.adb;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.PartitionValueExtractor;
import org.apache.hudi.hive.SchemaDifference;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public abstract class AbstractAdbSyncHoodieClient extends AbstractSyncHoodieClient {
protected AdbSyncConfig adbSyncConfig;
protected PartitionValueExtractor partitionValueExtractor;
protected HoodieTimeline activeTimeline;
public AbstractAdbSyncHoodieClient(AdbSyncConfig syncConfig, FileSystem fs) {
super(syncConfig.basePath, syncConfig.assumeDatePartitioning,
syncConfig.useFileListingFromMetadata, false, fs);
this.adbSyncConfig = syncConfig;
final String clazz = adbSyncConfig.partitionValueExtractorClass;
try {
this.partitionValueExtractor = (PartitionValueExtractor) Class.forName(clazz).newInstance();
} catch (Exception e) {
throw new HoodieException("Fail to init PartitionValueExtractor class " + clazz, e);
}
activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
}
public List<PartitionEvent> getPartitionEvents(Map<List<String>, String> tablePartitions,
List<String> partitionStoragePartitions) {
Map<String, String> paths = new HashMap<>();
for (Map.Entry<List<String>, String> entry : tablePartitions.entrySet()) {
List<String> partitionValues = entry.getKey();
String fullTablePartitionPath = entry.getValue();
paths.put(String.join(", ", partitionValues), fullTablePartitionPath);
}
List<PartitionEvent> events = new ArrayList<>();
for (String storagePartition : partitionStoragePartitions) {
Path storagePartitionPath = FSUtils.getPartitionPath(adbSyncConfig.basePath, storagePartition);
String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
// Check if the partition values or if hdfs path is the same
List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
if (adbSyncConfig.useHiveStylePartitioning) {
String partition = String.join("/", storagePartitionValues);
storagePartitionPath = FSUtils.getPartitionPath(adbSyncConfig.basePath, partition);
fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
}
if (!storagePartitionValues.isEmpty()) {
String storageValue = String.join(", ", storagePartitionValues);
if (!paths.containsKey(storageValue)) {
events.add(PartitionEvent.newPartitionAddEvent(storagePartition));
} else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) {
events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition));
}
}
}
return events;
}
public void close() {
}
public abstract Map<List<String>, String> scanTablePartitions(String tableName) throws Exception;
public abstract void updateTableDefinition(String tableName, SchemaDifference schemaDiff) throws Exception;
public abstract boolean databaseExists(String databaseName) throws Exception;
public abstract void createDatabase(String databaseName) throws Exception;
public abstract void dropTable(String tableName);
protected String getDatabasePath() {
String dbLocation = adbSyncConfig.dbLocation;
Path dbLocationPath;
if (StringUtils.isNullOrEmpty(dbLocation)) {
if (new Path(adbSyncConfig.basePath).isRoot()) {
dbLocationPath = new Path(adbSyncConfig.basePath);
} else {
dbLocationPath = new Path(adbSyncConfig.basePath).getParent();
}
} else {
dbLocationPath = new Path(dbLocation);
}
return generateAbsolutePathStr(dbLocationPath);
}
protected String generateAbsolutePathStr(Path path) {
String absolutePathStr = path.toString();
if (path.toUri().getScheme() == null) {
absolutePathStr = getDefaultFs() + absolutePathStr;
}
return absolutePathStr.endsWith("/") ? absolutePathStr : absolutePathStr + "/";
}
protected String getDefaultFs() {
return fs.getConf().get("fs.defaultFS");
}
}

View File

@@ -0,0 +1,240 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sync.adb;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import com.beust.jcommander.Parameter;
/**
* Configs needed to sync data into Alibaba Cloud AnalyticDB(ADB).
*/
public class AdbSyncConfig extends HoodieSyncConfig {
@Parameter(names = {"--user"}, description = "Adb username", required = true)
public String adbUser;
@Parameter(names = {"--pass"}, description = "Adb password", required = true)
public String adbPass;
@Parameter(names = {"--jdbc-url"}, description = "Adb jdbc connect url", required = true)
public String jdbcUrl;
@Parameter(names = {"--skip-ro-suffix"}, description = "Whether skip the `_ro` suffix for read optimized table when syncing")
public Boolean skipROSuffix;
@Parameter(names = {"--skip-rt-sync"}, description = "Whether skip the rt table when syncing")
public Boolean skipRTSync;
@Parameter(names = {"--hive-style-partitioning"}, description = "Whether use hive style partitioning, true if like the following style: field1=value1/field2=value2")
public Boolean useHiveStylePartitioning;
@Parameter(names = {"--support-timestamp"}, description = "If true, converts int64(timestamp_micros) to timestamp type")
public Boolean supportTimestamp;
@Parameter(names = {"--spark-datasource"}, description = "Whether sync this table as spark data source table")
public Boolean syncAsSparkDataSourceTable;
@Parameter(names = {"--table-properties"}, description = "Table properties, to support read hoodie table as datasource table", required = true)
public String tableProperties;
@Parameter(names = {"--serde-properties"}, description = "Serde properties, to support read hoodie table as datasource table", required = true)
public String serdeProperties;
@Parameter(names = {"--spark-schema-length-threshold"}, description = "The maximum length allowed in a single cell when storing additional schema information in Hive's metastore")
public int sparkSchemaLengthThreshold;
@Parameter(names = {"--db-location"}, description = "Database location")
public String dbLocation;
@Parameter(names = {"--auto-create-database"}, description = "Whether auto create adb database")
public Boolean autoCreateDatabase = true;
@Parameter(names = {"--skip-last-commit-time-sync"}, description = "Whether skip last commit time syncing")
public Boolean skipLastCommitTimeSync = false;
@Parameter(names = {"--drop-table-before-creation"}, description = "Whether drop table before creation")
public Boolean dropTableBeforeCreation = false;
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;
public static final ConfigProperty<String> ADB_SYNC_USER = ConfigProperty
.key("hoodie.datasource.adb.sync.username")
.noDefaultValue()
.withDocumentation("ADB username");
public static final ConfigProperty<String> ADB_SYNC_PASS = ConfigProperty
.key("hoodie.datasource.adb.sync.password")
.noDefaultValue()
.withDocumentation("ADB user password");
public static final ConfigProperty<String> ADB_SYNC_JDBC_URL = ConfigProperty
.key("hoodie.datasource.adb.sync.jdbc_url")
.noDefaultValue()
.withDocumentation("Adb jdbc connect url");
public static final ConfigProperty<Boolean> ADB_SYNC_SKIP_RO_SUFFIX = ConfigProperty
.key("hoodie.datasource.adb.sync.skip_ro_suffix")
.defaultValue(true)
.withDocumentation("Whether skip the `_ro` suffix for read optimized table when syncing");
public static final ConfigProperty<Boolean> ADB_SYNC_SKIP_RT_SYNC = ConfigProperty
.key("hoodie.datasource.adb.sync.skip_rt_sync")
.defaultValue(true)
.withDocumentation("Whether skip the rt table when syncing");
public static final ConfigProperty<Boolean> ADB_SYNC_USE_HIVE_STYLE_PARTITIONING = ConfigProperty
.key("hoodie.datasource.adb.sync.hive_style_partitioning")
.defaultValue(false)
.withDocumentation("Whether use hive style partitioning, true if like the following style: field1=value1/field2=value2");
public static final ConfigProperty<Boolean> ADB_SYNC_SUPPORT_TIMESTAMP = ConfigProperty
.key("hoodie.datasource.adb.sync.support_timestamp")
.defaultValue(false)
.withDocumentation("If true, converts int64(timestamp_micros) to timestamp type");
public static final ConfigProperty<Boolean> ADB_SYNC_SYNC_AS_SPARK_DATA_SOURCE_TABLE = ConfigProperty
.key("hoodie.datasource.adb.sync.sync_as_spark_datasource")
.defaultValue(true)
.withDocumentation("Whether sync this table as spark data source table");
public static final ConfigProperty<String> ADB_SYNC_TABLE_PROPERTIES = ConfigProperty
.key("hoodie.datasource.adb.sync.table_properties")
.noDefaultValue()
.withDocumentation("Table properties, to support read hoodie table as datasource table");
public static final ConfigProperty<String> ADB_SYNC_SERDE_PROPERTIES = ConfigProperty
.key("hoodie.datasource.adb.sync.serde_properties")
.noDefaultValue()
.withDocumentation("Serde properties, to support read hoodie table as datasource table");
public static final ConfigProperty<Integer> ADB_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD = ConfigProperty
.key("hoodie.datasource.adb.sync.schema_string_length_threshold")
.defaultValue(4000)
.withDocumentation("The maximum length allowed in a single cell when storing additional schema information in Hive's metastore");
public static final ConfigProperty<String> ADB_SYNC_DB_LOCATION = ConfigProperty
.key("hoodie.datasource.adb.sync.db_location")
.noDefaultValue()
.withDocumentation("Database location");
public static final ConfigProperty<Boolean> ADB_SYNC_AUTO_CREATE_DATABASE = ConfigProperty
.key("hoodie.datasource.adb.sync.auto_create_database")
.defaultValue(true)
.withDocumentation("Whether auto create adb database");
public static final ConfigProperty<Boolean> ADB_SYNC_SKIP_LAST_COMMIT_TIME_SYNC = ConfigProperty
.key("hoodie.datasource.adb.sync.skip_last_commit_time_sync")
.defaultValue(false)
.withDocumentation("Whether skip last commit time syncing");
public static final ConfigProperty<Boolean> ADB_SYNC_DROP_TABLE_BEFORE_CREATION = ConfigProperty
.key("hoodie.datasource.adb.sync.drop_table_before_creation")
.defaultValue(false)
.withDocumentation("Whether drop table before creation");
public AdbSyncConfig() {
this(new TypedProperties());
}
public AdbSyncConfig(TypedProperties props) {
super(props);
adbUser = getString(ADB_SYNC_USER);
adbPass = getString(ADB_SYNC_PASS);
jdbcUrl = getString(ADB_SYNC_JDBC_URL);
skipROSuffix = getBooleanOrDefault(ADB_SYNC_SKIP_RO_SUFFIX);
skipRTSync = getBooleanOrDefault(ADB_SYNC_SKIP_RT_SYNC);
useHiveStylePartitioning = getBooleanOrDefault(ADB_SYNC_USE_HIVE_STYLE_PARTITIONING);
supportTimestamp = getBooleanOrDefault(ADB_SYNC_SUPPORT_TIMESTAMP);
syncAsSparkDataSourceTable = getBooleanOrDefault(ADB_SYNC_SYNC_AS_SPARK_DATA_SOURCE_TABLE);
tableProperties = getString(ADB_SYNC_TABLE_PROPERTIES);
serdeProperties = getString(ADB_SYNC_SERDE_PROPERTIES);
sparkSchemaLengthThreshold = getIntOrDefault(ADB_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD);
dbLocation = getString(ADB_SYNC_DB_LOCATION);
autoCreateDatabase = getBooleanOrDefault(ADB_SYNC_AUTO_CREATE_DATABASE);
skipLastCommitTimeSync = getBooleanOrDefault(ADB_SYNC_SKIP_LAST_COMMIT_TIME_SYNC);
dropTableBeforeCreation = getBooleanOrDefault(ADB_SYNC_DROP_TABLE_BEFORE_CREATION);
}
public static TypedProperties toProps(AdbSyncConfig cfg) {
TypedProperties properties = new TypedProperties();
properties.put(META_SYNC_DATABASE_NAME.key(), cfg.databaseName);
properties.put(META_SYNC_TABLE_NAME.key(), cfg.tableName);
properties.put(ADB_SYNC_USER.key(), cfg.adbUser);
properties.put(ADB_SYNC_PASS.key(), cfg.adbPass);
properties.put(ADB_SYNC_JDBC_URL.key(), cfg.jdbcUrl);
properties.put(META_SYNC_BASE_PATH.key(), cfg.basePath);
properties.put(META_SYNC_PARTITION_FIELDS.key(), String.join(",", cfg.partitionFields));
properties.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), cfg.partitionValueExtractorClass);
properties.put(META_SYNC_ASSUME_DATE_PARTITION.key(), String.valueOf(cfg.assumeDatePartitioning));
properties.put(ADB_SYNC_SKIP_RO_SUFFIX.key(), String.valueOf(cfg.skipROSuffix));
properties.put(ADB_SYNC_SKIP_RT_SYNC.key(), String.valueOf(cfg.skipRTSync));
properties.put(ADB_SYNC_USE_HIVE_STYLE_PARTITIONING.key(), String.valueOf(cfg.useHiveStylePartitioning));
properties.put(META_SYNC_USE_FILE_LISTING_FROM_METADATA.key(), String.valueOf(cfg.useFileListingFromMetadata));
properties.put(ADB_SYNC_SUPPORT_TIMESTAMP.key(), String.valueOf(cfg.supportTimestamp));
properties.put(ADB_SYNC_TABLE_PROPERTIES.key(), cfg.tableProperties);
properties.put(ADB_SYNC_SERDE_PROPERTIES.key(), cfg.serdeProperties);
properties.put(ADB_SYNC_SYNC_AS_SPARK_DATA_SOURCE_TABLE.key(), String.valueOf(cfg.syncAsSparkDataSourceTable));
properties.put(ADB_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD.key(), String.valueOf(cfg.sparkSchemaLengthThreshold));
properties.put(META_SYNC_SPARK_VERSION.key(), cfg.sparkVersion);
properties.put(ADB_SYNC_DB_LOCATION.key(), cfg.dbLocation);
properties.put(ADB_SYNC_AUTO_CREATE_DATABASE.key(), String.valueOf(cfg.autoCreateDatabase));
properties.put(ADB_SYNC_SKIP_LAST_COMMIT_TIME_SYNC.key(), String.valueOf(cfg.skipLastCommitTimeSync));
properties.put(ADB_SYNC_DROP_TABLE_BEFORE_CREATION.key(), String.valueOf(cfg.dropTableBeforeCreation));
return properties;
}
@Override
public String toString() {
return "AdbSyncConfig{"
+ "adbUser='" + adbUser + '\''
+ ", adbPass='" + adbPass + '\''
+ ", jdbcUrl='" + jdbcUrl + '\''
+ ", skipROSuffix=" + skipROSuffix
+ ", skipRTSync=" + skipRTSync
+ ", useHiveStylePartitioning=" + useHiveStylePartitioning
+ ", supportTimestamp=" + supportTimestamp
+ ", syncAsSparkDataSourceTable=" + syncAsSparkDataSourceTable
+ ", tableProperties='" + tableProperties + '\''
+ ", serdeProperties='" + serdeProperties + '\''
+ ", sparkSchemaLengthThreshold=" + sparkSchemaLengthThreshold
+ ", dbLocation='" + dbLocation + '\''
+ ", autoCreateDatabase=" + autoCreateDatabase
+ ", skipLastCommitTimeSync=" + skipLastCommitTimeSync
+ ", dropTableBeforeCreation=" + dropTableBeforeCreation
+ ", help=" + help
+ ", databaseName='" + databaseName + '\''
+ ", tableName='" + tableName + '\''
+ ", basePath='" + basePath + '\''
+ ", baseFileFormat='" + baseFileFormat + '\''
+ ", partitionFields=" + partitionFields
+ ", partitionValueExtractorClass='" + partitionValueExtractorClass + '\''
+ ", assumeDatePartitioning=" + assumeDatePartitioning
+ ", decodePartition=" + decodePartition
+ ", useFileListingFromMetadata=" + useFileListingFromMetadata
+ ", isConditionalSync=" + isConditionalSync
+ ", sparkVersion='" + sparkVersion + '\''
+ '}';
}
}

View File

@@ -0,0 +1,283 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sync.adb;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hive.SchemaDifference;
import org.apache.hudi.hive.util.HiveSchemaUtil;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
import org.apache.hudi.sync.common.AbstractSyncTool;
import org.apache.hudi.sync.common.util.ConfigUtils;
import com.beust.jcommander.JCommander;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Adb sync tool is mainly used to sync hoodie tables to Alibaba Cloud AnalyticDB(ADB),
* it can be used as API `AdbSyncTool.syncHoodieTable(AdbSyncConfig)` or as command
* line `java -cp hoodie-hive.jar AdbSyncTool [args]`
*
* <p>
* This utility will get the schema from the latest commit and will sync ADB table schema,
* incremental partitions will be synced as well.
*/
@SuppressWarnings("WeakerAccess")
public class AdbSyncTool extends AbstractSyncTool {
private static final Logger LOG = LoggerFactory.getLogger(AdbSyncTool.class);
public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
private final AdbSyncConfig adbSyncConfig;
private final AbstractAdbSyncHoodieClient hoodieAdbClient;
private final String snapshotTableName;
private final Option<String> roTableTableName;
public AdbSyncTool(TypedProperties props, Configuration conf, FileSystem fs) {
super(props, conf, fs);
this.adbSyncConfig = new AdbSyncConfig(props);
this.hoodieAdbClient = getHoodieAdbClient(adbSyncConfig, fs);
switch (hoodieAdbClient.getTableType()) {
case COPY_ON_WRITE:
this.snapshotTableName = adbSyncConfig.tableName;
this.roTableTableName = Option.empty();
break;
case MERGE_ON_READ:
this.snapshotTableName = adbSyncConfig.tableName + SUFFIX_SNAPSHOT_TABLE;
this.roTableTableName = adbSyncConfig.skipROSuffix ? Option.of(adbSyncConfig.tableName)
: Option.of(adbSyncConfig.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
break;
default:
throw new HoodieAdbSyncException("Unknown table type:" + hoodieAdbClient.getTableType()
+ ", basePath:" + hoodieAdbClient.getBasePath());
}
}
private AbstractAdbSyncHoodieClient getHoodieAdbClient(AdbSyncConfig adbSyncConfig, FileSystem fs) {
return new HoodieAdbJdbcClient(adbSyncConfig, fs);
}
@Override
public void syncHoodieTable() {
try {
switch (hoodieAdbClient.getTableType()) {
case COPY_ON_WRITE:
syncHoodieTable(snapshotTableName, false, false);
break;
case MERGE_ON_READ:
// Sync a ro table for MOR table
syncHoodieTable(roTableTableName.get(), false, true);
// Sync a rt table for MOR table
if (!adbSyncConfig.skipRTSync) {
syncHoodieTable(snapshotTableName, true, false);
}
break;
default:
throw new HoodieAdbSyncException("Unknown table type:" + hoodieAdbClient.getTableType()
+ ", basePath:" + hoodieAdbClient.getBasePath());
}
} catch (Exception re) {
throw new HoodieAdbSyncException("Sync hoodie table to ADB failed, tableName:" + adbSyncConfig.tableName, re);
} finally {
hoodieAdbClient.close();
}
}
private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
boolean readAsOptimized) throws Exception {
LOG.info("Try to sync hoodie table, tableName:{}, path:{}, tableType:{}",
tableName, hoodieAdbClient.getBasePath(), hoodieAdbClient.getTableType());
if (adbSyncConfig.autoCreateDatabase) {
try {
synchronized (AdbSyncTool.class) {
if (!hoodieAdbClient.databaseExists(adbSyncConfig.databaseName)) {
hoodieAdbClient.createDatabase(adbSyncConfig.databaseName);
}
}
} catch (Exception e) {
throw new HoodieAdbSyncException("Failed to create database:" + adbSyncConfig.databaseName
+ ", useRealtimeInputFormat = " + useRealtimeInputFormat, e);
}
} else if (!hoodieAdbClient.databaseExists(adbSyncConfig.databaseName)) {
throw new HoodieAdbSyncException("ADB database does not exists:" + adbSyncConfig.databaseName);
}
// Currently HoodieBootstrapRelation does support reading bootstrap MOR rt table,
// so we disable the syncAsSparkDataSourceTable here to avoid read such kind table
// by the data source way (which will use the HoodieBootstrapRelation).
// TODO after we support bootstrap MOR rt table in HoodieBootstrapRelation[HUDI-2071],
// we can remove this logical.
if (hoodieAdbClient.isBootstrap()
&& hoodieAdbClient.getTableType() == HoodieTableType.MERGE_ON_READ
&& !readAsOptimized) {
adbSyncConfig.syncAsSparkDataSourceTable = false;
LOG.info("Disable sync as spark datasource table for mor rt table:{}", tableName);
}
if (adbSyncConfig.dropTableBeforeCreation) {
LOG.info("Drop table before creation, tableName:{}", tableName);
hoodieAdbClient.dropTable(tableName);
}
boolean tableExists = hoodieAdbClient.tableExists(tableName);
// Get the parquet schema for this table looking at the latest commit
MessageType schema = hoodieAdbClient.getDataSchema();
// Sync schema if needed
syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema);
LOG.info("Sync schema complete, start syncing partitions for table:{}", tableName);
// Get the last time we successfully synced partitions
Option<String> lastCommitTimeSynced = Option.empty();
if (tableExists) {
lastCommitTimeSynced = hoodieAdbClient.getLastCommitTimeSynced(tableName);
}
LOG.info("Last commit time synced was found:{}", lastCommitTimeSynced.orElse("null"));
// Scan synced partitions
List<String> writtenPartitionsSince;
if (adbSyncConfig.partitionFields.isEmpty()) {
writtenPartitionsSince = new ArrayList<>();
} else {
writtenPartitionsSince = hoodieAdbClient.getPartitionsWrittenToSince(lastCommitTimeSynced);
}
LOG.info("Scan partitions complete, partitionNum:{}", writtenPartitionsSince.size());
// Sync the partitions if needed
syncPartitions(tableName, writtenPartitionsSince);
// Update sync commit time
// whether to skip syncing commit time stored in tbl properties, since it is time consuming.
if (!adbSyncConfig.skipLastCommitTimeSync) {
hoodieAdbClient.updateLastCommitTimeSynced(tableName);
}
LOG.info("Sync complete for table:{}", tableName);
}
/**
* Get the latest schema from the last commit and check if its in sync with the ADB
* table schema. If not, evolves the table schema.
*
* @param tableName The table to be synced
* @param tableExists Whether target table exists
* @param useRealTimeInputFormat Whether using realtime input format
* @param readAsOptimized Whether read as optimized table
* @param schema The extracted schema
*/
private void syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat,
boolean readAsOptimized, MessageType schema) throws Exception {
// Append spark table properties & serde properties
Map<String, String> tableProperties = ConfigUtils.toMap(adbSyncConfig.tableProperties);
Map<String, String> serdeProperties = ConfigUtils.toMap(adbSyncConfig.serdeProperties);
if (adbSyncConfig.syncAsSparkDataSourceTable) {
Map<String, String> sparkTableProperties = getSparkTableProperties(adbSyncConfig.partitionFields,
adbSyncConfig.sparkVersion, adbSyncConfig.sparkSchemaLengthThreshold, schema);
Map<String, String> sparkSerdeProperties = getSparkSerdeProperties(readAsOptimized, adbSyncConfig.basePath);
tableProperties.putAll(sparkTableProperties);
serdeProperties.putAll(sparkSerdeProperties);
LOG.info("Sync as spark datasource table, tableName:{}, tableExists:{}, tableProperties:{}, sederProperties:{}",
tableName, tableExists, tableProperties, serdeProperties);
}
// Check and sync schema
if (!tableExists) {
LOG.info("ADB table [{}] is not found, creating it", tableName);
String inputFormatClassName = HoodieInputFormatUtils.getInputFormatClassName(HoodieFileFormat.PARQUET, useRealTimeInputFormat);
// Custom serde will not work with ALTER TABLE REPLACE COLUMNS
// https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive
// /ql/exec/DDLTask.java#L3488
hoodieAdbClient.createTable(tableName, schema, inputFormatClassName, MapredParquetOutputFormat.class.getName(),
ParquetHiveSerDe.class.getName(), serdeProperties, tableProperties);
} else {
// Check if the table schema has evolved
Map<String, String> tableSchema = hoodieAdbClient.getTableSchema(tableName);
SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, adbSyncConfig.partitionFields,
adbSyncConfig.supportTimestamp);
if (!schemaDiff.isEmpty()) {
LOG.info("Schema difference found for table:{}", tableName);
hoodieAdbClient.updateTableDefinition(tableName, schemaDiff);
} else {
LOG.info("No Schema difference for table:{}", tableName);
}
}
}
/**
* Syncs the list of storage partitions passed in (checks if the partition is in adb, if not adds it or if the
* partition path does not match, it updates the partition path).
*/
private void syncPartitions(String tableName, List<String> writtenPartitionsSince) {
try {
if (adbSyncConfig.partitionFields.isEmpty()) {
LOG.info("Not a partitioned table.");
return;
}
Map<List<String>, String> partitions = hoodieAdbClient.scanTablePartitions(tableName);
List<PartitionEvent> partitionEvents = hoodieAdbClient.getPartitionEvents(partitions, writtenPartitionsSince);
List<String> newPartitions = filterPartitions(partitionEvents, PartitionEventType.ADD);
LOG.info("New Partitions:{}", newPartitions);
hoodieAdbClient.addPartitionsToTable(tableName, newPartitions);
List<String> updatePartitions = filterPartitions(partitionEvents, PartitionEventType.UPDATE);
LOG.info("Changed Partitions:{}", updatePartitions);
hoodieAdbClient.updatePartitionsToTable(tableName, updatePartitions);
} catch (Exception e) {
throw new HoodieAdbSyncException("Failed to sync partitions for table:" + tableName, e);
}
}
private List<String> filterPartitions(List<PartitionEvent> events, PartitionEventType eventType) {
return events.stream().filter(s -> s.eventType == eventType)
.map(s -> s.storagePartition).collect(Collectors.toList());
}
public static void main(String[] args) {
// parse the params
final AdbSyncConfig cfg = new AdbSyncConfig();
JCommander cmd = new JCommander(cfg, null, args);
if (cfg.help || args.length == 0) {
cmd.usage();
System.exit(1);
}
Configuration hadoopConf = new Configuration();
FileSystem fs = FSUtils.getFs(cfg.basePath, hadoopConf);
new AdbSyncTool(AdbSyncConfig.toProps(cfg), hadoopConf, fs).syncHoodieTable();
}
}

View File

@@ -0,0 +1,440 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sync.adb;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveSyncException;
import org.apache.hudi.hive.SchemaDifference;
import org.apache.hudi.hive.util.HiveSchemaUtil;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
public class HoodieAdbJdbcClient extends AbstractAdbSyncHoodieClient {
private static final Logger LOG = LoggerFactory.getLogger(HoodieAdbJdbcClient.class);
public static final String HOODIE_LAST_COMMIT_TIME_SYNC = "hoodie_last_sync";
// Make sure we have the jdbc driver in classpath
private static final String DRIVER_NAME = "com.mysql.jdbc.Driver";
public static final String ADB_ESCAPE_CHARACTER = "";
private static final String TBL_PROPERTIES_STR = "TBLPROPERTIES";
static {
try {
Class.forName(DRIVER_NAME);
} catch (ClassNotFoundException e) {
throw new IllegalStateException("Could not find " + DRIVER_NAME + " in classpath. ", e);
}
}
private Connection connection;
public HoodieAdbJdbcClient(AdbSyncConfig syncConfig, FileSystem fs) {
super(syncConfig, fs);
createAdbConnection();
LOG.info("Init adb jdbc client success, jdbcUrl:{}", syncConfig.jdbcUrl);
}
private void createAdbConnection() {
if (connection == null) {
try {
Class.forName(DRIVER_NAME);
} catch (ClassNotFoundException e) {
LOG.error("Unable to load jdbc driver class", e);
return;
}
try {
this.connection = DriverManager.getConnection(
adbSyncConfig.jdbcUrl, adbSyncConfig.adbUser, adbSyncConfig.adbPass);
} catch (SQLException e) {
throw new HoodieException("Cannot create adb connection ", e);
}
}
}
@Override
public void createTable(String tableName, MessageType storageSchema, String inputFormatClass,
String outputFormatClass, String serdeClass,
Map<String, String> serdeProperties, Map<String, String> tableProperties) {
try {
LOG.info("Creating table:{}", tableName);
String createSQLQuery = HiveSchemaUtil.generateCreateDDL(tableName, storageSchema,
getHiveSyncConfig(), inputFormatClass, outputFormatClass, serdeClass, serdeProperties, tableProperties);
executeAdbSql(createSQLQuery);
} catch (IOException e) {
throw new HoodieException("Fail to create table:" + tableName, e);
}
}
@Override
public void dropTable(String tableName) {
LOG.info("Dropping table:{}", tableName);
String dropTable = "drop table if exists `" + adbSyncConfig.databaseName + "`.`" + tableName + "`";
executeAdbSql(dropTable);
}
public Map<String, String> getTableSchema(String tableName) {
Map<String, String> schema = new HashMap<>();
ResultSet result = null;
try {
DatabaseMetaData databaseMetaData = connection.getMetaData();
result = databaseMetaData.getColumns(adbSyncConfig.databaseName,
adbSyncConfig.databaseName, tableName, null);
while (result.next()) {
String columnName = result.getString(4);
String columnType = result.getString(6);
if ("DECIMAL".equals(columnType)) {
int columnSize = result.getInt("COLUMN_SIZE");
int decimalDigits = result.getInt("DECIMAL_DIGITS");
columnType += String.format("(%s,%s)", columnSize, decimalDigits);
}
schema.put(columnName, columnType);
}
return schema;
} catch (SQLException e) {
throw new HoodieException("Fail to get table schema:" + tableName, e);
} finally {
closeQuietly(result, null);
}
}
@Override
public void addPartitionsToTable(String tableName, List<String> partitionsToAdd) {
if (partitionsToAdd.isEmpty()) {
LOG.info("No partitions to add for table:{}", tableName);
return;
}
LOG.info("Adding partitions to table:{}, partitionNum:{}", tableName, partitionsToAdd.size());
String sql = constructAddPartitionsSql(tableName, partitionsToAdd);
executeAdbSql(sql);
}
private void executeAdbSql(String sql) {
Statement stmt = null;
try {
stmt = connection.createStatement();
LOG.info("Executing sql:{}", sql);
stmt.execute(sql);
} catch (SQLException e) {
throw new HoodieException("Fail to execute sql:" + sql, e);
} finally {
closeQuietly(null, stmt);
}
}
private <T> T executeQuerySQL(String sql, Function<ResultSet, T> function) {
Statement stmt = null;
try {
stmt = connection.createStatement();
LOG.info("Executing sql:{}", sql);
return function.apply(stmt.executeQuery(sql));
} catch (SQLException e) {
throw new HoodieException("Fail to execute sql:" + sql, e);
} finally {
closeQuietly(null, stmt);
}
}
public void createDatabase(String databaseName) {
String rootPath = getDatabasePath();
LOG.info("Creating database:{}, databaseLocation:{}", databaseName, rootPath);
String sql = constructCreateDatabaseSql(rootPath);
executeAdbSql(sql);
}
public boolean databaseExists(String databaseName) {
String sql = constructShowCreateDatabaseSql(databaseName);
Function<ResultSet, Boolean> transform = resultSet -> {
try {
return resultSet.next();
} catch (Exception e) {
if (e.getMessage().contains("Unknown database `" + databaseName + "`")) {
return false;
} else {
throw new HoodieException("Fail to execute sql:" + sql, e);
}
}
};
return executeQuerySQL(sql, transform);
}
@Override
public boolean doesTableExist(String tableName) {
String sql = constructShowLikeTableSql(tableName);
Function<ResultSet, Boolean> transform = resultSet -> {
try {
return resultSet.next();
} catch (Exception e) {
throw new HoodieException("Fail to execute sql:" + sql, e);
}
};
return executeQuerySQL(sql, transform);
}
@Override
public boolean tableExists(String tableName) {
return doesTableExist(tableName);
}
@Override
public Option<String> getLastCommitTimeSynced(String tableName) {
String sql = constructShowCreateTableSql(tableName);
Function<ResultSet, Option<String>> transform = resultSet -> {
try {
if (resultSet.next()) {
String table = resultSet.getString(2);
Map<String, String> attr = new HashMap<>();
int index = table.indexOf(TBL_PROPERTIES_STR);
if (index != -1) {
String sub = table.substring(index + TBL_PROPERTIES_STR.length());
sub = sub
.replaceAll("\\(", "")
.replaceAll("\\)", "")
.replaceAll("'", "");
String[] str = sub.split(",");
for (String s : str) {
String key = s.split("=")[0].trim();
String value = s.split("=")[1].trim();
attr.put(key, value);
}
}
return Option.ofNullable(attr.getOrDefault(HOODIE_LAST_COMMIT_TIME_SYNC, null));
}
return Option.empty();
} catch (Exception e) {
throw new HoodieException("Fail to execute sql:" + sql, e);
}
};
return executeQuerySQL(sql, transform);
}
@Override
public void updateLastCommitTimeSynced(String tableName) {
// Set the last commit time from the TBLProperties
String lastCommitSynced = activeTimeline.lastInstant().get().getTimestamp();
try {
String sql = constructUpdateTblPropertiesSql(tableName, lastCommitSynced);
executeAdbSql(sql);
} catch (Exception e) {
throw new HoodieHiveSyncException("Fail to get update last commit time synced:" + lastCommitSynced, e);
}
}
@Override
public Option<String> getLastReplicatedTime(String tableName) {
throw new UnsupportedOperationException("Not support getLastReplicatedTime yet");
}
@Override
public void updateLastReplicatedTimeStamp(String tableName, String timeStamp) {
throw new UnsupportedOperationException("Not support updateLastReplicatedTimeStamp yet");
}
@Override
public void deleteLastReplicatedTimeStamp(String tableName) {
throw new UnsupportedOperationException("Not support deleteLastReplicatedTimeStamp yet");
}
@Override
public void updatePartitionsToTable(String tableName, List<String> changedPartitions) {
if (changedPartitions.isEmpty()) {
LOG.info("No partitions to change for table:{}", tableName);
return;
}
LOG.info("Changing partitions on table:{}, changedPartitionNum:{}", tableName, changedPartitions.size());
List<String> sqlList = constructChangePartitionsSql(tableName, changedPartitions);
for (String sql : sqlList) {
executeAdbSql(sql);
}
}
@Override
public void dropPartitions(String tableName, List<String> partitionsToDrop) {
throw new UnsupportedOperationException("Not support dropPartitions yet.");
}
public Map<List<String>, String> scanTablePartitions(String tableName) {
String sql = constructShowPartitionSql(tableName);
Function<ResultSet, Map<List<String>, String>> transform = resultSet -> {
Map<List<String>, String> partitions = new HashMap<>();
try {
while (resultSet.next()) {
if (resultSet.getMetaData().getColumnCount() > 0) {
String str = resultSet.getString(1);
if (!StringUtils.isNullOrEmpty(str)) {
List<String> values = partitionValueExtractor.extractPartitionValuesInPath(str);
Path storagePartitionPath = FSUtils.getPartitionPath(adbSyncConfig.basePath, String.join("/", values));
String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
partitions.put(values, fullStoragePartitionPath);
}
}
}
} catch (Exception e) {
throw new HoodieException("Fail to execute sql:" + sql, e);
}
return partitions;
};
return executeQuerySQL(sql, transform);
}
public void updateTableDefinition(String tableName, SchemaDifference schemaDiff) {
LOG.info("Adding columns for table:{}", tableName);
schemaDiff.getAddColumnTypes().forEach((columnName, columnType) ->
executeAdbSql(constructAddColumnSql(tableName, columnName, columnType))
);
LOG.info("Updating columns' definition for table:{}", tableName);
schemaDiff.getUpdateColumnTypes().forEach((columnName, columnType) ->
executeAdbSql(constructChangeColumnSql(tableName, columnName, columnType))
);
}
private String constructAddPartitionsSql(String tableName, List<String> partitions) {
StringBuilder sqlBuilder = new StringBuilder("alter table `");
sqlBuilder.append(adbSyncConfig.databaseName).append("`").append(".`")
.append(tableName).append("`").append(" add if not exists ");
for (String partition : partitions) {
String partitionClause = getPartitionClause(partition);
Path partitionPath = FSUtils.getPartitionPath(adbSyncConfig.basePath, partition);
String fullPartitionPathStr = generateAbsolutePathStr(partitionPath);
sqlBuilder.append(" partition (").append(partitionClause).append(") location '")
.append(fullPartitionPathStr).append("' ");
}
return sqlBuilder.toString();
}
private List<String> constructChangePartitionsSql(String tableName, List<String> partitions) {
List<String> changePartitions = new ArrayList<>();
String useDatabase = "use `" + adbSyncConfig.databaseName + "`";
changePartitions.add(useDatabase);
String alterTable = "alter table `" + tableName + "`";
for (String partition : partitions) {
String partitionClause = getPartitionClause(partition);
Path partitionPath = FSUtils.getPartitionPath(adbSyncConfig.basePath, partition);
String fullPartitionPathStr = generateAbsolutePathStr(partitionPath);
String changePartition = alterTable + " add if not exists partition (" + partitionClause
+ ") location '" + fullPartitionPathStr + "'";
changePartitions.add(changePartition);
}
return changePartitions;
}
/**
* Generate Hive Partition from partition values.
*
* @param partition Partition path
* @return partition clause
*/
private String getPartitionClause(String partition) {
List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
ValidationUtils.checkArgument(adbSyncConfig.partitionFields.size() == partitionValues.size(),
"Partition key parts " + adbSyncConfig.partitionFields
+ " does not match with partition values " + partitionValues + ". Check partition strategy. ");
List<String> partBuilder = new ArrayList<>();
for (int i = 0; i < adbSyncConfig.partitionFields.size(); i++) {
partBuilder.add(adbSyncConfig.partitionFields.get(i) + "='" + partitionValues.get(i) + "'");
}
return String.join(",", partBuilder);
}
private String constructShowPartitionSql(String tableName) {
return String.format("show partitions `%s`.`%s`", adbSyncConfig.databaseName, tableName);
}
private String constructShowCreateTableSql(String tableName) {
return String.format("show create table `%s`.`%s`", adbSyncConfig.databaseName, tableName);
}
private String constructShowLikeTableSql(String tableName) {
return String.format("show tables from `%s` like '%s'", adbSyncConfig.databaseName, tableName);
}
private String constructCreateDatabaseSql(String rootPath) {
return String.format("create database if not exists `%s` with dbproperties(catalog = 'oss', location = '%s')",
adbSyncConfig.databaseName, rootPath);
}
private String constructShowCreateDatabaseSql(String databaseName) {
return String.format("show create database `%s`", databaseName);
}
private String constructUpdateTblPropertiesSql(String tableName, String lastCommitSynced) {
return String.format("alter table `%s`.`%s` set tblproperties('%s' = '%s')",
adbSyncConfig.databaseName, tableName, HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitSynced);
}
private String constructAddColumnSql(String tableName, String columnName, String columnType) {
return String.format("alter table `%s`.`%s` add columns(`%s` %s)",
adbSyncConfig.databaseName, tableName, columnName, columnType);
}
private String constructChangeColumnSql(String tableName, String columnName, String columnType) {
return String.format("alter table `%s`.`%s` change `%s` `%s` %s",
adbSyncConfig.databaseName, tableName, columnName, columnName, columnType);
}
private HiveSyncConfig getHiveSyncConfig() {
HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
hiveSyncConfig.partitionFields = adbSyncConfig.partitionFields;
hiveSyncConfig.databaseName = adbSyncConfig.databaseName;
Path basePath = new Path(adbSyncConfig.basePath);
hiveSyncConfig.basePath = generateAbsolutePathStr(basePath);
return hiveSyncConfig;
}
@Override
public void close() {
try {
if (connection != null) {
connection.close();
}
} catch (SQLException e) {
LOG.error("Fail to close connection", e);
}
}
}

View File

@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sync.adb;
public class HoodieAdbSyncException extends RuntimeException {
public HoodieAdbSyncException(String message) {
super(message);
}
public HoodieAdbSyncException(String message, Throwable t) {
super(message, t);
}
}

View File

@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.sync.adb;
import org.apache.hudi.common.config.TypedProperties;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class TestAdbSyncConfig {
@Test
public void testCopy() {
AdbSyncConfig adbSyncConfig = new AdbSyncConfig();
adbSyncConfig.partitionFields = Arrays.asList("a", "b");
adbSyncConfig.basePath = "/tmp";
adbSyncConfig.assumeDatePartitioning = true;
adbSyncConfig.databaseName = "test";
adbSyncConfig.tableName = "test";
adbSyncConfig.adbUser = "adb";
adbSyncConfig.adbPass = "adb";
adbSyncConfig.jdbcUrl = "jdbc:mysql://localhost:3306";
adbSyncConfig.skipROSuffix = false;
adbSyncConfig.tableProperties = "spark.sql.sources.provider= 'hudi'\\n"
+ "spark.sql.sources.schema.numParts = '1'\\n "
+ "spark.sql.sources.schema.part.0 ='xx'\\n "
+ "spark.sql.sources.schema.numPartCols = '1'\\n"
+ "spark.sql.sources.schema.partCol.0 = 'dt'";
adbSyncConfig.serdeProperties = "'path'='/tmp/test_db/tbl'";
adbSyncConfig.dbLocation = "file://tmp/test_db";
TypedProperties props = AdbSyncConfig.toProps(adbSyncConfig);
AdbSyncConfig copied = new AdbSyncConfig(props);
assertEquals(copied.partitionFields, adbSyncConfig.partitionFields);
assertEquals(copied.basePath, adbSyncConfig.basePath);
assertEquals(copied.assumeDatePartitioning, adbSyncConfig.assumeDatePartitioning);
assertEquals(copied.databaseName, adbSyncConfig.databaseName);
assertEquals(copied.tableName, adbSyncConfig.tableName);
assertEquals(copied.adbUser, adbSyncConfig.adbUser);
assertEquals(copied.adbPass, adbSyncConfig.adbPass);
assertEquals(copied.basePath, adbSyncConfig.basePath);
assertEquals(copied.jdbcUrl, adbSyncConfig.jdbcUrl);
assertEquals(copied.skipROSuffix, adbSyncConfig.skipROSuffix);
assertEquals(copied.supportTimestamp, adbSyncConfig.supportTimestamp);
}
}

View File

@@ -1,111 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.dla;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import com.beust.jcommander.Parameter;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
/**
* Configs needed to sync data into DLA.
*/
public class DLASyncConfig implements Serializable {
@Parameter(names = {"--database"}, description = "name of the target database in DLA", required = true)
public String databaseName;
@Parameter(names = {"--table"}, description = "name of the target table in DLA", required = true)
public String tableName;
@Parameter(names = {"--user"}, description = "DLA username", required = true)
public String dlaUser;
@Parameter(names = {"--pass"}, description = "DLA password", required = true)
public String dlaPass;
@Parameter(names = {"--jdbc-url"}, description = "DLA jdbc connect url", required = true)
public String jdbcUrl;
@Parameter(names = {"--base-path"}, description = "Basepath of hoodie table to sync", required = true)
public String basePath;
@Parameter(names = "--partitioned-by", description = "Fields in the schema partitioned by")
public List<String> partitionFields = new ArrayList<>();
@Parameter(names = "--partition-value-extractor", description = "Class which implements PartitionValueExtractor "
+ "to extract the partition values from HDFS path")
public String partitionValueExtractorClass = SlashEncodedDayPartitionValueExtractor.class.getName();
@Parameter(names = {"--assume-date-partitioning"}, description = "Assume standard yyyy/mm/dd partitioning, this"
+ " exists to support backward compatibility. If you use hoodie 0.3.x, do not set this parameter")
public Boolean assumeDatePartitioning = false;
@Parameter(names = {"--skip-ro-suffix"}, description = "Skip the `_ro` suffix for Read optimized table, when registering")
public Boolean skipROSuffix = false;
@Parameter(names = {"--skip-rt-sync"}, description = "Skip the RT table syncing")
public Boolean skipRTSync = false;
@Parameter(names = {"--hive-style-partitioning"}, description = "Use DLA hive style partitioning, true if like the following style: field1=value1/field2=value2")
public Boolean useDLASyncHiveStylePartitioning = false;
@Parameter(names = {"--use-file-listing-from-metadata"}, description = "Fetch file listing from Hudi's metadata")
public Boolean useFileListingFromMetadata = HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;
@Parameter(names = {"--support-timestamp"}, description = "If true, converts int64(timestamp_micros) to timestamp type")
public Boolean supportTimestamp = false;
public static DLASyncConfig copy(DLASyncConfig cfg) {
DLASyncConfig newConfig = new DLASyncConfig();
newConfig.databaseName = cfg.databaseName;
newConfig.tableName = cfg.tableName;
newConfig.dlaUser = cfg.dlaUser;
newConfig.dlaPass = cfg.dlaPass;
newConfig.jdbcUrl = cfg.jdbcUrl;
newConfig.basePath = cfg.basePath;
newConfig.partitionFields = cfg.partitionFields;
newConfig.partitionValueExtractorClass = cfg.partitionValueExtractorClass;
newConfig.assumeDatePartitioning = cfg.assumeDatePartitioning;
newConfig.skipROSuffix = cfg.skipROSuffix;
newConfig.skipRTSync = cfg.skipRTSync;
newConfig.useDLASyncHiveStylePartitioning = cfg.useDLASyncHiveStylePartitioning;
newConfig.useFileListingFromMetadata = cfg.useFileListingFromMetadata;
newConfig.supportTimestamp = cfg.supportTimestamp;
return newConfig;
}
@Override
public String toString() {
return "DLASyncConfig{databaseName='" + databaseName + '\'' + ", tableName='" + tableName + '\''
+ ", dlaUser='" + dlaUser + '\'' + ", dlaPass='" + dlaPass + '\'' + ", jdbcUrl='" + jdbcUrl + '\''
+ ", basePath='" + basePath + '\'' + ", partitionFields=" + partitionFields + ", partitionValueExtractorClass='"
+ partitionValueExtractorClass + '\'' + ", assumeDatePartitioning=" + assumeDatePartitioning
+ ", useDLASyncHiveStylePartitioning=" + useDLASyncHiveStylePartitioning
+ ", useFileListingFromMetadata=" + useFileListingFromMetadata
+ ", help=" + help + '}';
}
}

View File

@@ -1,213 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.dla;
import com.beust.jcommander.JCommander;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.dla.util.Utils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.InvalidTableException;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hive.SchemaDifference;
import org.apache.hudi.hive.util.HiveSchemaUtil;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
import org.apache.hudi.sync.common.AbstractSyncTool;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.MessageType;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Tool to sync a hoodie table with a dla table. Either use it as a api
* DLASyncTool.syncHoodieTable(DLASyncConfig) or as a command line java -cp hoodie-hive.jar DLASyncTool [args]
* <p>
* This utility will get the schema from the latest commit and will sync dla table schema Also this will sync the
* partitions incrementally (all the partitions modified since the last commit)
*/
@SuppressWarnings("WeakerAccess")
public class DLASyncTool extends AbstractSyncTool {
private static final Logger LOG = LogManager.getLogger(DLASyncTool.class);
public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
private final DLASyncConfig cfg;
private final HoodieDLAClient hoodieDLAClient;
private final String snapshotTableName;
private final Option<String> roTableTableName;
public DLASyncTool(TypedProperties properties, Configuration conf, FileSystem fs) {
super(properties, conf, fs);
this.hoodieDLAClient = new HoodieDLAClient(Utils.propertiesToConfig(properties), fs);
this.cfg = Utils.propertiesToConfig(properties);
switch (hoodieDLAClient.getTableType()) {
case COPY_ON_WRITE:
this.snapshotTableName = cfg.tableName;
this.roTableTableName = Option.empty();
break;
case MERGE_ON_READ:
this.snapshotTableName = cfg.tableName + SUFFIX_SNAPSHOT_TABLE;
this.roTableTableName = cfg.skipROSuffix ? Option.of(cfg.tableName) :
Option.of(cfg.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
break;
default:
LOG.error("Unknown table type " + hoodieDLAClient.getTableType());
throw new InvalidTableException(hoodieDLAClient.getBasePath());
}
}
@Override
public void syncHoodieTable() {
try {
switch (hoodieDLAClient.getTableType()) {
case COPY_ON_WRITE:
syncHoodieTable(snapshotTableName, false);
break;
case MERGE_ON_READ:
// sync a RO table for MOR
syncHoodieTable(roTableTableName.get(), false);
// sync a RT table for MOR
if (!cfg.skipRTSync) {
syncHoodieTable(snapshotTableName, true);
}
break;
default:
LOG.error("Unknown table type " + hoodieDLAClient.getTableType());
throw new InvalidTableException(hoodieDLAClient.getBasePath());
}
} catch (RuntimeException re) {
throw new HoodieException("Got runtime exception when dla syncing " + cfg.tableName, re);
} finally {
hoodieDLAClient.close();
}
}
private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat) {
LOG.info("Trying to sync hoodie table " + tableName + " with base path " + hoodieDLAClient.getBasePath()
+ " of type " + hoodieDLAClient.getTableType());
// Check if the necessary table exists
boolean tableExists = hoodieDLAClient.tableExists(tableName);
// Get the parquet schema for this table looking at the latest commit
MessageType schema = hoodieDLAClient.getDataSchema();
// Sync schema if needed
syncSchema(tableName, tableExists, useRealtimeInputFormat, schema);
LOG.info("Schema sync complete. Syncing partitions for " + tableName);
// Get the last time we successfully synced partitions
// TODO : once DLA supports alter table properties
Option<String> lastCommitTimeSynced = Option.empty();
/*if (tableExists) {
lastCommitTimeSynced = hoodieDLAClient.getLastCommitTimeSynced(tableName);
}*/
LOG.info("Last commit time synced was found to be " + lastCommitTimeSynced.orElse("null"));
List<String> writtenPartitionsSince = hoodieDLAClient.getPartitionsWrittenToSince(lastCommitTimeSynced);
LOG.info("Storage partitions scan complete. Found " + writtenPartitionsSince.size());
// Sync the partitions if needed
syncPartitions(tableName, writtenPartitionsSince);
hoodieDLAClient.updateLastCommitTimeSynced(tableName);
LOG.info("Sync complete for " + tableName);
}
/**
* Get the latest schema from the last commit and check if its in sync with the dla table schema. If not, evolves the
* table schema.
*
* @param tableExists - does table exist
* @param schema - extracted schema
*/
private void syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat, MessageType schema) {
// Check and sync schema
if (!tableExists) {
LOG.info("DLA table " + tableName + " is not found. Creating it");
String inputFormatClassName = HoodieInputFormatUtils.getInputFormatClassName(HoodieFileFormat.PARQUET, useRealTimeInputFormat);
// Custom serde will not work with ALTER TABLE REPLACE COLUMNS
// https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive
// /ql/exec/DDLTask.java#L3488
hoodieDLAClient.createTable(tableName, schema, inputFormatClassName, MapredParquetOutputFormat.class.getName(),
ParquetHiveSerDe.class.getName(), new HashMap<>(), new HashMap<>());
} else {
// Check if the table schema has evolved
Map<String, String> tableSchema = hoodieDLAClient.getTableSchema(tableName);
SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, cfg.partitionFields, cfg.supportTimestamp);
if (!schemaDiff.isEmpty()) {
LOG.info("Schema difference found for " + tableName);
hoodieDLAClient.updateTableDefinition(tableName, schemaDiff);
} else {
LOG.info("No Schema difference for " + tableName);
}
}
}
/**
* Syncs the list of storage partitions passed in (checks if the partition is in dla, if not adds it or if the
* partition path does not match, it updates the partition path).
*/
private void syncPartitions(String tableName, List<String> writtenPartitionsSince) {
try {
if (cfg.partitionFields.isEmpty()) {
LOG.info("not a partitioned table.");
return;
}
Map<List<String>, String> partitions = hoodieDLAClient.scanTablePartitions(tableName);
List<AbstractSyncHoodieClient.PartitionEvent> partitionEvents =
hoodieDLAClient.getPartitionEvents(partitions, writtenPartitionsSince);
List<String> newPartitions = filterPartitions(partitionEvents, AbstractSyncHoodieClient.PartitionEvent.PartitionEventType.ADD);
LOG.info("New Partitions " + newPartitions);
hoodieDLAClient.addPartitionsToTable(tableName, newPartitions);
List<String> updatePartitions = filterPartitions(partitionEvents, AbstractSyncHoodieClient.PartitionEvent.PartitionEventType.UPDATE);
LOG.info("Changed Partitions " + updatePartitions);
hoodieDLAClient.updatePartitionsToTable(tableName, updatePartitions);
} catch (Exception e) {
throw new HoodieException("Failed to sync partitions for table " + tableName, e);
}
}
private List<String> filterPartitions(List<AbstractSyncHoodieClient.PartitionEvent> events, AbstractSyncHoodieClient.PartitionEvent.PartitionEventType eventType) {
return events.stream().filter(s -> s.eventType == eventType).map(s -> s.storagePartition)
.collect(Collectors.toList());
}
public static void main(String[] args) {
// parse the params
final DLASyncConfig cfg = new DLASyncConfig();
JCommander cmd = new JCommander(cfg, null, args);
if (cfg.help || args.length == 0) {
cmd.usage();
System.exit(1);
}
Configuration hadoopConf = new Configuration();
FileSystem fs = FSUtils.getFs(cfg.basePath, hadoopConf);
new DLASyncTool(Utils.configToProperties(cfg), hadoopConf, fs).syncHoodieTable();
}
}

View File

@@ -1,428 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.dla;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveSyncException;
import org.apache.hudi.hive.PartitionValueExtractor;
import org.apache.hudi.hive.SchemaDifference;
import org.apache.hudi.hive.util.HiveSchemaUtil;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.MessageType;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class HoodieDLAClient extends AbstractSyncHoodieClient {
private static final Logger LOG = LogManager.getLogger(HoodieDLAClient.class);
private static final String HOODIE_LAST_COMMIT_TIME_SYNC = "hoodie_last_sync";
// Make sure we have the dla JDBC driver in classpath
private static final String DRIVER_NAME = "com.mysql.jdbc.Driver";
private static final String DLA_ESCAPE_CHARACTER = "";
private static final String TBL_PROPERTIES_STR = "TBLPROPERTIES";
static {
try {
Class.forName(DRIVER_NAME);
} catch (ClassNotFoundException e) {
throw new IllegalStateException("Could not find " + DRIVER_NAME + " in classpath. ", e);
}
}
private Connection connection;
private DLASyncConfig dlaConfig;
private PartitionValueExtractor partitionValueExtractor;
public HoodieDLAClient(DLASyncConfig syncConfig, FileSystem fs) {
super(syncConfig.basePath, syncConfig.assumeDatePartitioning, syncConfig.useFileListingFromMetadata,
false, fs);
this.dlaConfig = syncConfig;
try {
this.partitionValueExtractor =
(PartitionValueExtractor) Class.forName(dlaConfig.partitionValueExtractorClass).newInstance();
} catch (Exception e) {
throw new HoodieException(
"Failed to initialize PartitionValueExtractor class " + dlaConfig.partitionValueExtractorClass, e);
}
createDLAConnection();
}
private void createDLAConnection() {
if (connection == null) {
try {
Class.forName(DRIVER_NAME);
} catch (ClassNotFoundException e) {
LOG.error("Unable to load DLA driver class", e);
return;
}
try {
this.connection = DriverManager.getConnection(dlaConfig.jdbcUrl, dlaConfig.dlaUser, dlaConfig.dlaPass);
LOG.info("Successfully established DLA connection to " + dlaConfig.jdbcUrl);
} catch (SQLException e) {
throw new HoodieException("Cannot create dla connection ", e);
}
}
}
@Override
public void createTable(String tableName, MessageType storageSchema, String inputFormatClass,
String outputFormatClass, String serdeClass,
Map<String, String> serdeProperties, Map<String, String> tableProperties) {
try {
String createSQLQuery = HiveSchemaUtil.generateCreateDDL(tableName, storageSchema, toHiveSyncConfig(),
inputFormatClass, outputFormatClass, serdeClass, serdeProperties, tableProperties);
LOG.info("Creating table with " + createSQLQuery);
updateDLASQL(createSQLQuery);
} catch (IOException e) {
throw new HoodieException("Failed to create table " + tableName, e);
}
}
public Map<String, String> getTableSchema(String tableName) {
if (!tableExists(tableName)) {
throw new IllegalArgumentException(
"Failed to get schema for table " + tableName + " does not exist");
}
Map<String, String> schema = new HashMap<>();
ResultSet result = null;
try {
DatabaseMetaData databaseMetaData = connection.getMetaData();
result = databaseMetaData.getColumns(dlaConfig.databaseName, dlaConfig.databaseName, tableName, null);
while (result.next()) {
TYPE_CONVERTOR.doConvert(result, schema);
}
return schema;
} catch (SQLException e) {
throw new HoodieException("Failed to get table schema for " + tableName, e);
} finally {
closeQuietly(result, null);
}
}
@Override
public void addPartitionsToTable(String tableName, List<String> partitionsToAdd) {
if (partitionsToAdd.isEmpty()) {
LOG.info("No partitions to add for " + tableName);
return;
}
LOG.info("Adding partitions " + partitionsToAdd.size() + " to table " + tableName);
String sql = constructAddPartitions(tableName, partitionsToAdd);
updateDLASQL(sql);
}
public String constructAddPartitions(String tableName, List<String> partitions) {
return constructDLAAddPartitions(tableName, partitions);
}
String generateAbsolutePathStr(Path path) {
String absolutePathStr = path.toString();
if (path.toUri().getScheme() == null) {
absolutePathStr = getDefaultFs() + absolutePathStr;
}
return absolutePathStr.endsWith("/") ? absolutePathStr : absolutePathStr + "/";
}
public List<String> constructChangePartitions(String tableName, List<String> partitions) {
List<String> changePartitions = new ArrayList<>();
String useDatabase = "USE " + DLA_ESCAPE_CHARACTER + dlaConfig.databaseName + DLA_ESCAPE_CHARACTER;
changePartitions.add(useDatabase);
String alterTable = "ALTER TABLE " + DLA_ESCAPE_CHARACTER + tableName + DLA_ESCAPE_CHARACTER;
for (String partition : partitions) {
String partitionClause = getPartitionClause(partition);
Path partitionPath = FSUtils.getPartitionPath(dlaConfig.basePath, partition);
String fullPartitionPathStr = generateAbsolutePathStr(partitionPath);
String changePartition =
alterTable + " ADD IF NOT EXISTS PARTITION (" + partitionClause + ") LOCATION '" + fullPartitionPathStr + "'";
changePartitions.add(changePartition);
}
return changePartitions;
}
/**
* Generate Hive Partition from partition values.
*
* @param partition Partition path
* @return
*/
public String getPartitionClause(String partition) {
List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
ValidationUtils.checkArgument(dlaConfig.partitionFields.size() == partitionValues.size(),
"Partition key parts " + dlaConfig.partitionFields + " does not match with partition values " + partitionValues
+ ". Check partition strategy. ");
List<String> partBuilder = new ArrayList<>();
for (int i = 0; i < dlaConfig.partitionFields.size(); i++) {
partBuilder.add(dlaConfig.partitionFields.get(i) + "='" + partitionValues.get(i) + "'");
}
return partBuilder.stream().collect(Collectors.joining(","));
}
private String constructDLAAddPartitions(String tableName, List<String> partitions) {
StringBuilder alterSQL = new StringBuilder("ALTER TABLE ");
alterSQL.append(DLA_ESCAPE_CHARACTER).append(dlaConfig.databaseName)
.append(DLA_ESCAPE_CHARACTER).append(".").append(DLA_ESCAPE_CHARACTER)
.append(tableName).append(DLA_ESCAPE_CHARACTER).append(" ADD IF NOT EXISTS ");
for (String partition : partitions) {
String partitionClause = getPartitionClause(partition);
Path partitionPath = FSUtils.getPartitionPath(dlaConfig.basePath, partition);
String fullPartitionPathStr = generateAbsolutePathStr(partitionPath);
alterSQL.append(" PARTITION (").append(partitionClause).append(") LOCATION '").append(fullPartitionPathStr)
.append("' ");
}
return alterSQL.toString();
}
private void updateDLASQL(String sql) {
Statement stmt = null;
try {
stmt = connection.createStatement();
LOG.info("Executing SQL " + sql);
stmt.execute(sql);
} catch (SQLException e) {
throw new HoodieException("Failed in executing SQL " + sql, e);
} finally {
closeQuietly(null, stmt);
}
}
@Override
public boolean doesTableExist(String tableName) {
return tableExists(tableName);
}
@Override
public boolean tableExists(String tableName) {
String sql = consutructShowCreateTableSQL(tableName);
Statement stmt = null;
ResultSet rs = null;
try {
stmt = connection.createStatement();
rs = stmt.executeQuery(sql);
} catch (SQLException e) {
return false;
} finally {
closeQuietly(rs, stmt);
}
return true;
}
@Override
public Option<String> getLastCommitTimeSynced(String tableName) {
String sql = consutructShowCreateTableSQL(tableName);
Statement stmt = null;
ResultSet rs = null;
try {
stmt = connection.createStatement();
rs = stmt.executeQuery(sql);
if (rs.next()) {
String table = rs.getString(2);
Map<String, String> attr = new HashMap<>();
int index = table.indexOf(TBL_PROPERTIES_STR);
if (index != -1) {
String sub = table.substring(index + TBL_PROPERTIES_STR.length());
sub = sub.replaceAll("\\(", "").replaceAll("\\)", "").replaceAll("'", "");
String[] str = sub.split(",");
for (int i = 0; i < str.length; i++) {
String key = str[i].split("=")[0].trim();
String value = str[i].split("=")[1].trim();
attr.put(key, value);
}
}
return Option.ofNullable(attr.getOrDefault(HOODIE_LAST_COMMIT_TIME_SYNC, null));
}
return Option.empty();
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to get the last commit time synced from the table", e);
} finally {
closeQuietly(rs, stmt);
}
}
@Override
public void updateLastCommitTimeSynced(String tableName) {
// TODO : dla do not support update tblproperties, so do nothing.
}
@Override
public Option<String> getLastReplicatedTime(String tableName) {
// no op; unsupported
return Option.empty();
}
@Override
public void updateLastReplicatedTimeStamp(String tableName, String timeStamp) {
// no op; unsupported
}
@Override
public void deleteLastReplicatedTimeStamp(String tableName) {
// no op; unsupported
}
@Override
public void updatePartitionsToTable(String tableName, List<String> changedPartitions) {
if (changedPartitions.isEmpty()) {
LOG.info("No partitions to change for " + tableName);
return;
}
LOG.info("Changing partitions " + changedPartitions.size() + " on " + tableName);
List<String> sqls = constructChangePartitions(tableName, changedPartitions);
for (String sql : sqls) {
updateDLASQL(sql);
}
}
@Override
public void dropPartitions(String tableName, List<String> partitionsToDrop) {
throw new UnsupportedOperationException("Not support dropPartitions yet.");
}
public Map<List<String>, String> scanTablePartitions(String tableName) {
String sql = constructShowPartitionSQL(tableName);
Statement stmt = null;
ResultSet rs = null;
Map<List<String>, String> partitions = new HashMap<>();
try {
stmt = connection.createStatement();
LOG.info("Executing SQL " + sql);
rs = stmt.executeQuery(sql);
while (rs.next()) {
if (rs.getMetaData().getColumnCount() > 0) {
String str = rs.getString(1);
if (!StringUtils.isNullOrEmpty(str)) {
List<String> values = partitionValueExtractor.extractPartitionValuesInPath(str);
Path storagePartitionPath = FSUtils.getPartitionPath(dlaConfig.basePath, String.join("/", values));
String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
partitions.put(values, fullStoragePartitionPath);
}
}
}
return partitions;
} catch (SQLException e) {
throw new HoodieException("Failed in executing SQL " + sql, e);
} finally {
closeQuietly(rs, stmt);
}
}
public List<PartitionEvent> getPartitionEvents(Map<List<String>, String> tablePartitions, List<String> partitionStoragePartitions) {
Map<String, String> paths = new HashMap<>();
for (Map.Entry<List<String>, String> entry : tablePartitions.entrySet()) {
List<String> partitionValues = entry.getKey();
Collections.sort(partitionValues);
String fullTablePartitionPath = entry.getValue();
paths.put(String.join(", ", partitionValues), fullTablePartitionPath);
}
List<PartitionEvent> events = new ArrayList<>();
for (String storagePartition : partitionStoragePartitions) {
Path storagePartitionPath = FSUtils.getPartitionPath(dlaConfig.basePath, storagePartition);
String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
// Check if the partition values or if hdfs path is the same
List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
if (dlaConfig.useDLASyncHiveStylePartitioning) {
String partition = String.join("/", storagePartitionValues);
storagePartitionPath = FSUtils.getPartitionPath(dlaConfig.basePath, partition);
fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
}
Collections.sort(storagePartitionValues);
if (!storagePartitionValues.isEmpty()) {
String storageValue = String.join(", ", storagePartitionValues);
if (!paths.containsKey(storageValue)) {
events.add(PartitionEvent.newPartitionAddEvent(storagePartition));
} else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) {
events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition));
}
}
}
return events;
}
public void updateTableDefinition(String tableName, SchemaDifference schemaDiff) {
ValidationUtils.checkArgument(schemaDiff.getDeleteColumns().size() == 0, "not support delete columns");
ValidationUtils.checkArgument(schemaDiff.getUpdateColumnTypes().size() == 0, "not support alter column type");
Map<String, String> columns = schemaDiff.getAddColumnTypes();
for (Map.Entry<String, String> entry : columns.entrySet()) {
String columnName = entry.getKey();
String columnType = entry.getValue();
StringBuilder sqlBuilder = new StringBuilder("ALTER TABLE ").append(DLA_ESCAPE_CHARACTER)
.append(dlaConfig.databaseName).append(DLA_ESCAPE_CHARACTER).append(".")
.append(DLA_ESCAPE_CHARACTER).append(tableName)
.append(DLA_ESCAPE_CHARACTER).append(" ADD COLUMNS(")
.append(columnName).append(" ").append(columnType).append(" )");
LOG.info("Updating table definition with " + sqlBuilder);
updateDLASQL(sqlBuilder.toString());
}
}
@Override
public void close() {
try {
if (connection != null) {
connection.close();
}
} catch (SQLException e) {
LOG.error("Could not close connection ", e);
}
}
private String constructShowPartitionSQL(String tableName) {
String sql = "show partitions " + dlaConfig.databaseName + "." + tableName;
return sql;
}
private String consutructShowCreateTableSQL(String tableName) {
String sql = "show create table " + dlaConfig.databaseName + "." + tableName;
return sql;
}
private String getDefaultFs() {
return fs.getConf().get("fs.defaultFS");
}
private HiveSyncConfig toHiveSyncConfig() {
HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
hiveSyncConfig.partitionFields = dlaConfig.partitionFields;
hiveSyncConfig.databaseName = dlaConfig.databaseName;
Path basePath = new Path(dlaConfig.basePath);
hiveSyncConfig.basePath = generateAbsolutePathStr(basePath);
return hiveSyncConfig;
}
}

View File

@@ -1,77 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.dla.util;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.dla.DLASyncConfig;
import java.util.ArrayList;
import java.util.Arrays;
public class Utils {
public static String DLA_DATABASE_OPT_KEY = "hoodie.datasource.dla_sync.database";
public static String DLA_TABLE_OPT_KEY = "hoodie.datasource.dla_sync.table";
public static String DLA_USER_OPT_KEY = "hoodie.datasource.dla_sync.username";
public static String DLA_PASS_OPT_KEY = "hoodie.datasource.dla_sync.password";
public static String DLA_URL_OPT_KEY = "hoodie.datasource.dla_sync.jdbcurl";
public static String BATH_PATH = "basePath";
public static String DLA_PARTITION_FIELDS_OPT_KEY = "hoodie.datasource.dla_sync.partition_fields";
public static String DLA_PARTITION_EXTRACTOR_CLASS_OPT_KEY = "hoodie.datasource.dla_sync.partition_extractor_class";
public static String DLA_ASSUME_DATE_PARTITIONING = "hoodie.datasource.dla_sync.assume_date_partitioning";
public static String DLA_SKIP_RO_SUFFIX = "hoodie.datasource.dla_sync.skip_ro_suffix";
public static String DLA_SKIP_RT_SYNC = "hoodie.datasource.dla_sync.skip_rt_sync";
public static String DLA_SYNC_HIVE_STYLE_PARTITIONING = "hoodie.datasource.dla_sync.hive.style.partitioning";
public static TypedProperties configToProperties(DLASyncConfig cfg) {
TypedProperties properties = new TypedProperties();
properties.put(DLA_DATABASE_OPT_KEY, cfg.databaseName);
properties.put(DLA_TABLE_OPT_KEY, cfg.tableName);
properties.put(DLA_USER_OPT_KEY, cfg.dlaUser);
properties.put(DLA_PASS_OPT_KEY, cfg.dlaPass);
properties.put(DLA_URL_OPT_KEY, cfg.jdbcUrl);
properties.put(BATH_PATH, cfg.basePath);
properties.put(DLA_PARTITION_EXTRACTOR_CLASS_OPT_KEY, cfg.partitionValueExtractorClass);
properties.put(DLA_ASSUME_DATE_PARTITIONING, String.valueOf(cfg.assumeDatePartitioning));
properties.put(DLA_SKIP_RO_SUFFIX, String.valueOf(cfg.skipROSuffix));
properties.put(DLA_SYNC_HIVE_STYLE_PARTITIONING, String.valueOf(cfg.useDLASyncHiveStylePartitioning));
return properties;
}
public static DLASyncConfig propertiesToConfig(TypedProperties properties) {
DLASyncConfig config = new DLASyncConfig();
config.databaseName = properties.getProperty(DLA_DATABASE_OPT_KEY);
config.tableName = properties.getProperty(DLA_TABLE_OPT_KEY);
config.dlaUser = properties.getProperty(DLA_USER_OPT_KEY);
config.dlaPass = properties.getProperty(DLA_PASS_OPT_KEY);
config.jdbcUrl = properties.getProperty(DLA_URL_OPT_KEY);
config.basePath = properties.getProperty(BATH_PATH);
if (StringUtils.isNullOrEmpty(properties.getProperty(DLA_PARTITION_FIELDS_OPT_KEY))) {
config.partitionFields = new ArrayList<>();
} else {
config.partitionFields = Arrays.asList(properties.getProperty(DLA_PARTITION_FIELDS_OPT_KEY).split(","));
}
config.partitionValueExtractorClass = properties.getProperty(DLA_PARTITION_EXTRACTOR_CLASS_OPT_KEY);
config.assumeDatePartitioning = Boolean.parseBoolean(properties.getProperty(DLA_ASSUME_DATE_PARTITIONING, "false"));
config.skipROSuffix = Boolean.parseBoolean(properties.getProperty(DLA_SKIP_RO_SUFFIX, "false"));
config.skipRTSync = Boolean.parseBoolean(properties.getProperty(DLA_SKIP_RT_SYNC, "false"));
config.useDLASyncHiveStylePartitioning = Boolean.parseBoolean(properties.getProperty(DLA_SYNC_HIVE_STYLE_PARTITIONING, "false"));
return config;
}
}

View File

@@ -1,55 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.dla;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class TestDLASyncConfig {
@Test
public void testCopy() {
DLASyncConfig dlaSyncConfig = new DLASyncConfig();
List<String> partitions = Arrays.asList("a", "b");
dlaSyncConfig.partitionFields = partitions;
dlaSyncConfig.basePath = "/tmp";
dlaSyncConfig.assumeDatePartitioning = true;
dlaSyncConfig.databaseName = "test";
dlaSyncConfig.tableName = "test";
dlaSyncConfig.dlaUser = "dla";
dlaSyncConfig.dlaPass = "dla";
dlaSyncConfig.jdbcUrl = "jdbc:mysql://localhost:3306";
dlaSyncConfig.skipROSuffix = false;
DLASyncConfig copied = DLASyncConfig.copy(dlaSyncConfig);
assertEquals(copied.partitionFields, dlaSyncConfig.partitionFields);
assertEquals(copied.basePath, dlaSyncConfig.basePath);
assertEquals(copied.assumeDatePartitioning, dlaSyncConfig.assumeDatePartitioning);
assertEquals(copied.databaseName, dlaSyncConfig.databaseName);
assertEquals(copied.tableName, dlaSyncConfig.tableName);
assertEquals(copied.dlaUser, dlaSyncConfig.dlaUser);
assertEquals(copied.dlaPass, dlaSyncConfig.dlaPass);
assertEquals(copied.basePath, dlaSyncConfig.basePath);
assertEquals(copied.jdbcUrl, dlaSyncConfig.jdbcUrl);
assertEquals(copied.skipROSuffix, dlaSyncConfig.skipROSuffix);
assertEquals(copied.supportTimestamp, dlaSyncConfig.supportTimestamp);
}
}

View File

@@ -27,9 +27,8 @@ import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.InvalidTableException;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hive.util.ConfigUtils;
import org.apache.hudi.sync.common.util.ConfigUtils;
import org.apache.hudi.hive.util.HiveSchemaUtil;
import org.apache.hudi.hive.util.Parquet2SparkSchemaUtils;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
import org.apache.hudi.sync.common.AbstractSyncTool;
@@ -43,20 +42,13 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.parquet.schema.OriginalType.UTF8;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
/**
* Tool to sync a hoodie HDFS table with a hive metastore table. Either use it as a api
* HiveSyncTool.syncHoodieTable(HiveSyncConfig) or as a command line java -cp hoodie-hive-sync.jar HiveSyncTool [args]
@@ -248,8 +240,9 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
Map<String, String> tableProperties = ConfigUtils.toMap(hiveSyncConfig.tableProperties);
Map<String, String> serdeProperties = ConfigUtils.toMap(hiveSyncConfig.serdeProperties);
if (hiveSyncConfig.syncAsSparkDataSourceTable) {
Map<String, String> sparkTableProperties = getSparkTableProperties(hiveSyncConfig.sparkSchemaLengthThreshold, schema);
Map<String, String> sparkSerdeProperties = getSparkSerdeProperties(readAsOptimized);
Map<String, String> sparkTableProperties = getSparkTableProperties(hiveSyncConfig.partitionFields,
hiveSyncConfig.sparkVersion, hiveSyncConfig.sparkSchemaLengthThreshold, schema);
Map<String, String> sparkSerdeProperties = getSparkSerdeProperties(readAsOptimized, hiveSyncConfig.basePath);
tableProperties.putAll(sparkTableProperties);
serdeProperties.putAll(sparkSerdeProperties);
}
@@ -309,75 +302,6 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
return schemaChanged;
}
/**
* Get Spark Sql related table properties. This is used for spark datasource table.
* @param schema The schema to write to the table.
* @return A new parameters added the spark's table properties.
*/
private Map<String, String> getSparkTableProperties(int schemaLengthThreshold, MessageType schema) {
// Convert the schema and partition info used by spark sql to hive table properties.
// The following code refers to the spark code in
// https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
GroupType originGroupType = schema.asGroupType();
List<String> partitionNames = hiveSyncConfig.partitionFields;
List<Type> partitionCols = new ArrayList<>();
List<Type> dataCols = new ArrayList<>();
Map<String, Type> column2Field = new HashMap<>();
for (Type field : originGroupType.getFields()) {
column2Field.put(field.getName(), field);
}
// Get partition columns and data columns.
for (String partitionName : partitionNames) {
// Default the unknown partition fields to be String.
// Keep the same logical with HiveSchemaUtil#getPartitionKeyType.
partitionCols.add(column2Field.getOrDefault(partitionName,
new PrimitiveType(Type.Repetition.REQUIRED, BINARY, partitionName, UTF8)));
}
for (Type field : originGroupType.getFields()) {
if (!partitionNames.contains(field.getName())) {
dataCols.add(field);
}
}
List<Type> reOrderedFields = new ArrayList<>();
reOrderedFields.addAll(dataCols);
reOrderedFields.addAll(partitionCols);
GroupType reOrderedType = new GroupType(originGroupType.getRepetition(), originGroupType.getName(), reOrderedFields);
Map<String, String> sparkProperties = new HashMap<>();
sparkProperties.put("spark.sql.sources.provider", "hudi");
if (!StringUtils.isNullOrEmpty(hiveSyncConfig.sparkVersion)) {
sparkProperties.put("spark.sql.create.version", hiveSyncConfig.sparkVersion);
}
// Split the schema string to multi-parts according the schemaLengthThreshold size.
String schemaString = Parquet2SparkSchemaUtils.convertToSparkSchemaJson(reOrderedType);
int numSchemaPart = (schemaString.length() + schemaLengthThreshold - 1) / schemaLengthThreshold;
sparkProperties.put("spark.sql.sources.schema.numParts", String.valueOf(numSchemaPart));
// Add each part of schema string to sparkProperties
for (int i = 0; i < numSchemaPart; i++) {
int start = i * schemaLengthThreshold;
int end = Math.min(start + schemaLengthThreshold, schemaString.length());
sparkProperties.put("spark.sql.sources.schema.part." + i, schemaString.substring(start, end));
}
// Add partition columns
if (!partitionNames.isEmpty()) {
sparkProperties.put("spark.sql.sources.schema.numPartCols", String.valueOf(partitionNames.size()));
for (int i = 0; i < partitionNames.size(); i++) {
sparkProperties.put("spark.sql.sources.schema.partCol." + i, partitionNames.get(i));
}
}
return sparkProperties;
}
private Map<String, String> getSparkSerdeProperties(boolean readAsOptimized) {
Map<String, String> sparkSerdeProperties = new HashMap<>();
sparkSerdeProperties.put("path", hiveSyncConfig.basePath);
sparkSerdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, String.valueOf(readAsOptimized));
return sparkSerdeProperties;
}
/**
* Syncs the list of storage partitions passed in (checks if the partition is in hive, if not adds it or if the
* partition path does not match, it updates the partition path).

View File

@@ -28,7 +28,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.hive.testutils.HiveTestUtil;
import org.apache.hudi.hive.util.ConfigUtils;
import org.apache.hudi.sync.common.util.ConfigUtils;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;

View File

@@ -18,7 +18,7 @@
package org.apache.hudi.hive;
import org.apache.hudi.hive.util.Parquet2SparkSchemaUtils;
import org.apache.hudi.sync.common.util.Parquet2SparkSchemaUtils;
import org.apache.spark.sql.execution.SparkSqlParser;
import org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter;
import org.apache.spark.sql.internal.SQLConf;

View File

@@ -18,12 +18,26 @@
package org.apache.hudi.sync.common;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.sync.common.util.ConfigUtils;
import org.apache.hudi.sync.common.util.Parquet2SparkSchemaUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static org.apache.parquet.schema.OriginalType.UTF8;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
/**
* Base class to sync Hudi meta data with Metastores to make
* Hudi table queryable through external systems.
@@ -46,4 +60,72 @@ public abstract class AbstractSyncTool {
public abstract void syncHoodieTable();
/**
* Get Spark Sql related table properties. This is used for spark datasource table.
* @param schema The schema to write to the table.
* @return A new parameters added the spark's table properties.
*/
protected Map<String, String> getSparkTableProperties(List<String> partitionNames, String sparkVersion,
int schemaLengthThreshold, MessageType schema) {
// Convert the schema and partition info used by spark sql to hive table properties.
// The following code refers to the spark code in
// https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
GroupType originGroupType = schema.asGroupType();
List<Type> partitionCols = new ArrayList<>();
List<Type> dataCols = new ArrayList<>();
Map<String, Type> column2Field = new HashMap<>();
for (Type field : originGroupType.getFields()) {
column2Field.put(field.getName(), field);
}
// Get partition columns and data columns.
for (String partitionName : partitionNames) {
// Default the unknown partition fields to be String.
// Keep the same logical with HiveSchemaUtil#getPartitionKeyType.
partitionCols.add(column2Field.getOrDefault(partitionName,
new PrimitiveType(Type.Repetition.REQUIRED, BINARY, partitionName, UTF8)));
}
for (Type field : originGroupType.getFields()) {
if (!partitionNames.contains(field.getName())) {
dataCols.add(field);
}
}
List<Type> reOrderedFields = new ArrayList<>();
reOrderedFields.addAll(dataCols);
reOrderedFields.addAll(partitionCols);
GroupType reOrderedType = new GroupType(originGroupType.getRepetition(), originGroupType.getName(), reOrderedFields);
Map<String, String> sparkProperties = new HashMap<>();
sparkProperties.put("spark.sql.sources.provider", "hudi");
if (!StringUtils.isNullOrEmpty(sparkVersion)) {
sparkProperties.put("spark.sql.create.version", sparkVersion);
}
// Split the schema string to multi-parts according the schemaLengthThreshold size.
String schemaString = Parquet2SparkSchemaUtils.convertToSparkSchemaJson(reOrderedType);
int numSchemaPart = (schemaString.length() + schemaLengthThreshold - 1) / schemaLengthThreshold;
sparkProperties.put("spark.sql.sources.schema.numParts", String.valueOf(numSchemaPart));
// Add each part of schema string to sparkProperties
for (int i = 0; i < numSchemaPart; i++) {
int start = i * schemaLengthThreshold;
int end = Math.min(start + schemaLengthThreshold, schemaString.length());
sparkProperties.put("spark.sql.sources.schema.part." + i, schemaString.substring(start, end));
}
// Add partition columns
if (!partitionNames.isEmpty()) {
sparkProperties.put("spark.sql.sources.schema.numPartCols", String.valueOf(partitionNames.size()));
for (int i = 0; i < partitionNames.size(); i++) {
sparkProperties.put("spark.sql.sources.schema.partCol." + i, partitionNames.get(i));
}
}
return sparkProperties;
}
protected Map<String, String> getSparkSerdeProperties(boolean readAsOptimized, String basePath) {
Map<String, String> sparkSerdeProperties = new HashMap<>();
sparkSerdeProperties.put("path", basePath);
sparkSerdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, String.valueOf(readAsOptimized));
return sparkSerdeProperties;
}
}

View File

@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hudi.hive.util;
package org.apache.hudi.sync.common.util;
import java.util.HashMap;
import java.util.Map;

View File

@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hudi.hive.util;
package org.apache.hudi.sync.common.util;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.parquet.schema.GroupType;

View File

@@ -32,7 +32,7 @@
<modules>
<module>hudi-datahub-sync</module>
<module>hudi-dla-sync</module>
<module>hudi-adb-sync</module>
<module>hudi-hive-sync</module>
<module>hudi-sync-common</module>
</modules>