
[HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync, hudi-dla-sync (#1810)

- Generalize the hive-sync module for syncing to multiple metastores
- Add new options for the datasource
- Add new command-line options for the delta streamer

Co-authored-by: Vinoth Chandar <vinoth@apache.org>
Author:    lw0090
Date:      2020-08-06 12:34:55 +08:00
Committed: GitHub
Parent:    c21209cb58
Commit:    51ea27d665
44 changed files with 1663 additions and 145 deletions


@@ -0,0 +1,46 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 http://maven.apache.org/xsd/assembly-1.1.3.xsd">
<id>jar-with-dependencies</id>
<formats>
<format>jar</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<dependencySets>
<dependencySet>
<outputDirectory>/</outputDirectory>
<unpack>true</unpack>
<scope>runtime</scope>
<excludes>
<exclude>junit:junit</exclude>
<exclude>com.google.code.findbugs:*</exclude>
<exclude>org.apache.hbase:*</exclude>
</excludes>
</dependencySet>
<dependencySet>
<unpack>true</unpack>
<scope>provided</scope>
</dependencySet>
</dependencySets>
</assembly>

org/apache/hudi/dla/DLASyncConfig.java

@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.dla;
import com.beust.jcommander.Parameter;
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
/**
* Configs needed to sync data into DLA.
*/
public class DLASyncConfig implements Serializable {
@Parameter(names = {"--database"}, description = "name of the target database in DLA", required = true)
public String databaseName;
@Parameter(names = {"--table"}, description = "name of the target table in DLA", required = true)
public String tableName;
@Parameter(names = {"--user"}, description = "DLA username", required = true)
public String dlaUser;
@Parameter(names = {"--pass"}, description = "DLA password", required = true)
public String dlaPass;
@Parameter(names = {"--jdbc-url"}, description = "DLA jdbc connect url", required = true)
public String jdbcUrl;
@Parameter(names = {"--base-path"}, description = "Basepath of hoodie table to sync", required = true)
public String basePath;
@Parameter(names = "--partitioned-by", description = "Fields in the schema partitioned by")
public List<String> partitionFields = new ArrayList<>();
@Parameter(names = "--partition-value-extractor", description = "Class which implements PartitionValueExtractor "
+ "to extract the partition values from HDFS path")
public String partitionValueExtractorClass = SlashEncodedDayPartitionValueExtractor.class.getName();
@Parameter(names = {"--assume-date-partitioning"}, description = "Assume standard yyyy/mm/dd partitioning, this"
+ " exists to support backward compatibility. If you use hoodie 0.3.x, do not set this parameter")
public Boolean assumeDatePartitioning = false;
@Parameter(names = {"--skip-ro-suffix"}, description = "Skip the `_ro` suffix for Read optimized table, when registering")
public Boolean skipROSuffix = false;
@Parameter(names = {"--hive-style-partitioning"}, description = "Use DLA hive style partitioning, true if like the following style: field1=value1/field2=value2")
public Boolean useDLASyncHiveStylePartitioning = false;
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;
public static DLASyncConfig copy(DLASyncConfig cfg) {
DLASyncConfig newConfig = new DLASyncConfig();
newConfig.databaseName = cfg.databaseName;
newConfig.tableName = cfg.tableName;
newConfig.dlaUser = cfg.dlaUser;
newConfig.dlaPass = cfg.dlaPass;
newConfig.jdbcUrl = cfg.jdbcUrl;
newConfig.basePath = cfg.basePath;
newConfig.partitionFields = cfg.partitionFields;
newConfig.partitionValueExtractorClass = cfg.partitionValueExtractorClass;
newConfig.assumeDatePartitioning = cfg.assumeDatePartitioning;
newConfig.skipROSuffix = cfg.skipROSuffix;
newConfig.useDLASyncHiveStylePartitioning = cfg.useDLASyncHiveStylePartitioning;
return newConfig;
}
@Override
public String toString() {
return "DLASyncConfig{databaseName='" + databaseName + '\'' + ", tableName='" + tableName + '\''
+ ", dlaUser='" + dlaUser + '\'' + ", dlaPass='" + dlaPass + '\'' + ", jdbcUrl='" + jdbcUrl + '\''
+ ", basePath='" + basePath + '\'' + ", partitionFields=" + partitionFields + ", partitionValueExtractorClass='"
+ partitionValueExtractorClass + '\'' + ", assumeDatePartitioning=" + assumeDatePartitioning
+ ", useDLASyncHiveStylePartitioning=" + useDLASyncHiveStylePartitioning
+ ", help=" + help + '}';
}
}
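
A minimal sketch of how these JCommander parameters map onto command-line flags. The values below are hypothetical, and imports of JCommander and DLASyncConfig are assumed:

String[] args = new String[] {
    "--database", "demo_db",
    "--table", "demo_table",
    "--user", "dla_user",
    "--pass", "dla_pass",
    "--jdbc-url", "jdbc:mysql://localhost:3306",
    "--base-path", "/tmp/hoodie/demo_table",
    "--partitioned-by", "dt"
};
DLASyncConfig cfg = new DLASyncConfig();
// same constructor that DLASyncTool.main uses further below
new JCommander(cfg, null, args);
// cfg.databaseName is now "demo_db" and cfg.partitionFields is ["dt"]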

org/apache/hudi/dla/DLASyncTool.java

@@ -0,0 +1,212 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.dla;
import com.beust.jcommander.JCommander;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.dla.util.Utils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.InvalidTableException;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.hudi.hive.SchemaDifference;
import org.apache.hudi.hive.util.HiveSchemaUtil;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
import org.apache.hudi.sync.common.AbstractSyncTool;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.MessageType;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
/**
 * Tool to sync a hoodie table with a DLA table. Use it either as an API,
 * DLASyncTool.syncHoodieTable(DLASyncConfig), or from the command line:
 * java -cp hoodie-hive.jar DLASyncTool [args]
 * <p>
 * This utility fetches the schema from the latest commit and syncs it to the DLA table. It also syncs the
 * partitions incrementally (all partitions modified since the last commit).
 */
@SuppressWarnings("WeakerAccess")
public class DLASyncTool extends AbstractSyncTool {
private static final Logger LOG = LogManager.getLogger(DLASyncTool.class);
public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
private final DLASyncConfig cfg;
private final HoodieDLAClient hoodieDLAClient;
private final String snapshotTableName;
private final Option<String> roTableTableName;
public DLASyncTool(Properties properties, FileSystem fs) {
super(properties, fs);
this.hoodieDLAClient = new HoodieDLAClient(Utils.propertiesToConfig(properties), fs);
this.cfg = Utils.propertiesToConfig(properties);
switch (hoodieDLAClient.getTableType()) {
case COPY_ON_WRITE:
this.snapshotTableName = cfg.tableName;
this.roTableTableName = Option.empty();
break;
case MERGE_ON_READ:
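// MOR registers two logical tables: a `_rt` snapshot table and a `_ro` read-optimized table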
this.snapshotTableName = cfg.tableName + SUFFIX_SNAPSHOT_TABLE;
this.roTableTableName = cfg.skipROSuffix ? Option.of(cfg.tableName) :
Option.of(cfg.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
break;
default:
LOG.error("Unknown table type " + hoodieDLAClient.getTableType());
throw new InvalidTableException(hoodieDLAClient.getBasePath());
}
}
@Override
public void syncHoodieTable() {
try {
switch (hoodieDLAClient.getTableType()) {
case COPY_ON_WRITE:
syncHoodieTable(snapshotTableName, false);
break;
case MERGE_ON_READ:
// sync a RO table for MOR
syncHoodieTable(roTableTableName.get(), false);
// sync a RT table for MOR
syncHoodieTable(snapshotTableName, true);
break;
default:
LOG.error("Unknown table type " + hoodieDLAClient.getTableType());
throw new InvalidTableException(hoodieDLAClient.getBasePath());
}
} catch (RuntimeException re) {
LOG.error("Got runtime exception when dla syncing", re);
} finally {
hoodieDLAClient.close();
}
}
private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat) {
LOG.info("Trying to sync hoodie table " + tableName + " with base path " + hoodieDLAClient.getBasePath()
+ " of type " + hoodieDLAClient.getTableType());
// Check if the necessary table exists
boolean tableExists = hoodieDLAClient.doesTableExist(tableName);
// Get the parquet schema for this table looking at the latest commit
MessageType schema = hoodieDLAClient.getDataSchema();
// Sync schema if needed
syncSchema(tableName, tableExists, useRealtimeInputFormat, schema);
LOG.info("Schema sync complete. Syncing partitions for " + tableName);
// Get the last time we successfully synced partitions
// TODO: re-enable once DLA supports altering table properties
Option<String> lastCommitTimeSynced = Option.empty();
/*if (tableExists) {
lastCommitTimeSynced = hoodieDLAClient.getLastCommitTimeSynced(tableName);
}*/
LOG.info("Last commit time synced was found to be " + lastCommitTimeSynced.orElse("null"));
List<String> writtenPartitionsSince = hoodieDLAClient.getPartitionsWrittenToSince(lastCommitTimeSynced);
LOG.info("Storage partitions scan complete. Found " + writtenPartitionsSince.size());
// Sync the partitions if needed
syncPartitions(tableName, writtenPartitionsSince);
hoodieDLAClient.updateLastCommitTimeSynced(tableName);
LOG.info("Sync complete for " + tableName);
}
/**
 * Get the latest schema from the last commit and check whether it is in sync with the DLA table schema. If not,
 * evolve the table schema.
 *
 * @param tableName              name of the table to sync
 * @param tableExists            whether the table already exists
 * @param useRealTimeInputFormat whether to register the realtime input format
 * @param schema                 extracted schema
 */
private void syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat, MessageType schema) {
// Check and sync schema
if (!tableExists) {
LOG.info("DLA table " + tableName + " is not found. Creating it");
if (!useRealTimeInputFormat) {
String inputFormatClassName = HoodieParquetInputFormat.class.getName();
hoodieDLAClient.createTable(tableName, schema, inputFormatClassName, MapredParquetOutputFormat.class.getName(),
ParquetHiveSerDe.class.getName());
} else {
// Custom serde will not work with ALTER TABLE REPLACE COLUMNS
// https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive
// /ql/exec/DDLTask.java#L3488
String inputFormatClassName = HoodieParquetRealtimeInputFormat.class.getName();
hoodieDLAClient.createTable(tableName, schema, inputFormatClassName, MapredParquetOutputFormat.class.getName(),
ParquetHiveSerDe.class.getName());
}
} else {
// Check if the table schema has evolved
Map<String, String> tableSchema = hoodieDLAClient.getTableSchema(tableName);
SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, cfg.partitionFields);
if (!schemaDiff.isEmpty()) {
LOG.info("Schema difference found for " + tableName);
hoodieDLAClient.updateTableDefinition(tableName, schemaDiff);
} else {
LOG.info("No Schema difference for " + tableName);
}
}
}
/**
 * Syncs the list of storage partitions passed in: checks whether each partition exists in DLA and adds it if not,
 * or updates the partition path if the stored path does not match.
 */
private void syncPartitions(String tableName, List<String> writtenPartitionsSince) {
try {
if (cfg.partitionFields.isEmpty()) {
LOG.info("not a partitioned table.");
return;
}
Map<List<String>, String> partitions = hoodieDLAClient.scanTablePartitions(tableName);
List<AbstractSyncHoodieClient.PartitionEvent> partitionEvents =
hoodieDLAClient.getPartitionEvents(partitions, writtenPartitionsSince);
List<String> newPartitions = filterPartitions(partitionEvents, AbstractSyncHoodieClient.PartitionEvent.PartitionEventType.ADD);
LOG.info("New Partitions " + newPartitions);
hoodieDLAClient.addPartitionsToTable(tableName, newPartitions);
List<String> updatePartitions = filterPartitions(partitionEvents, AbstractSyncHoodieClient.PartitionEvent.PartitionEventType.UPDATE);
LOG.info("Changed Partitions " + updatePartitions);
hoodieDLAClient.updatePartitionsToTable(tableName, updatePartitions);
} catch (Exception e) {
throw new HoodieException("Failed to sync partitions for table " + tableName, e);
}
}
private List<String> filterPartitions(List<AbstractSyncHoodieClient.PartitionEvent> events, AbstractSyncHoodieClient.PartitionEvent.PartitionEventType eventType) {
return events.stream().filter(s -> s.eventType == eventType).map(s -> s.storagePartition)
.collect(Collectors.toList());
}
public static void main(String[] args) {
// parse the params
final DLASyncConfig cfg = new DLASyncConfig();
JCommander cmd = new JCommander(cfg, null, args);
if (cfg.help || args.length == 0) {
cmd.usage();
System.exit(1);
}
FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
new DLASyncTool(Utils.configToProperties(cfg), fs).syncHoodieTable();
}
}
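
As the javadoc above notes, the tool can also be driven programmatically instead of through main. A minimal sketch mirroring what main does (all values are illustrative):

DLASyncConfig cfg = new DLASyncConfig();
cfg.databaseName = "demo_db";
cfg.tableName = "demo_table";
cfg.dlaUser = "dla_user";
cfg.dlaPass = "dla_pass";
cfg.jdbcUrl = "jdbc:mysql://localhost:3306";
cfg.basePath = "/tmp/hoodie/demo_table";
FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
new DLASyncTool(Utils.configToProperties(cfg), fs).syncHoodieTable();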

org/apache/hudi/dla/HoodieDLAClient.java

@@ -0,0 +1,403 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.dla;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveSyncException;
import org.apache.hudi.hive.PartitionValueExtractor;
import org.apache.hudi.hive.SchemaDifference;
import org.apache.hudi.hive.util.HiveSchemaUtil;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.MessageType;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class HoodieDLAClient extends AbstractSyncHoodieClient {
private static final Logger LOG = LogManager.getLogger(HoodieDLAClient.class);
private static final String HOODIE_LAST_COMMIT_TIME_SYNC = "hoodie_last_sync";
// DLA is accessed over the MySQL wire protocol, so make sure the MySQL JDBC driver is on the classpath
private static final String DRIVER_NAME = "com.mysql.jdbc.Driver";
private static final String DLA_ESCAPE_CHARACTER = "";
private static final String TBL_PROPERTIES_STR = "TBLPROPERTIES";
static {
try {
Class.forName(DRIVER_NAME);
} catch (ClassNotFoundException e) {
throw new IllegalStateException("Could not find " + DRIVER_NAME + " in classpath. ", e);
}
}
private Connection connection;
private DLASyncConfig dlaConfig;
private PartitionValueExtractor partitionValueExtractor;
public HoodieDLAClient(DLASyncConfig syncConfig, FileSystem fs) {
super(syncConfig.basePath, syncConfig.assumeDatePartitioning, fs);
this.dlaConfig = syncConfig;
try {
this.partitionValueExtractor =
(PartitionValueExtractor) Class.forName(dlaConfig.partitionValueExtractorClass).newInstance();
} catch (Exception e) {
throw new HoodieException(
"Failed to initialize PartitionValueExtractor class " + dlaConfig.partitionValueExtractorClass, e);
}
createDLAConnection();
}
private void createDLAConnection() {
if (connection == null) {
try {
Class.forName(DRIVER_NAME);
} catch (ClassNotFoundException e) {
LOG.error("Unable to load DLA driver class", e);
return;
}
try {
this.connection = DriverManager.getConnection(dlaConfig.jdbcUrl, dlaConfig.dlaUser, dlaConfig.dlaPass);
LOG.info("Successfully established DLA connection to " + dlaConfig.jdbcUrl);
} catch (SQLException e) {
throw new HoodieException("Cannot create dla connection ", e);
}
}
}
@Override
public void createTable(String tableName, MessageType storageSchema, String inputFormatClass, String outputFormatClass, String serdeClass) {
try {
String createSQLQuery = HiveSchemaUtil.generateCreateDDL(tableName, storageSchema, toHiveSyncConfig(), inputFormatClass, outputFormatClass, serdeClass);
LOG.info("Creating table with " + createSQLQuery);
updateDLASQL(createSQLQuery);
} catch (IOException e) {
throw new HoodieException("Failed to create table " + tableName, e);
}
}
public Map<String, String> getTableSchema(String tableName) {
if (!doesTableExist(tableName)) {
throw new IllegalArgumentException(
"Failed to get schema for table " + tableName + ": table does not exist");
}
Map<String, String> schema = new HashMap<>();
ResultSet result = null;
try {
DatabaseMetaData databaseMetaData = connection.getMetaData();
result = databaseMetaData.getColumns(dlaConfig.databaseName, dlaConfig.databaseName, tableName, null);
while (result.next()) {
String columnName = result.getString(4);
String columnType = result.getString(6);
if ("DECIMAL".equals(columnType)) {
int columnSize = result.getInt("COLUMN_SIZE");
int decimalDigits = result.getInt("DECIMAL_DIGITS");
columnType += String.format("(%s,%s)", columnSize, decimalDigits);
}
schema.put(columnName, columnType);
}
return schema;
} catch (SQLException e) {
throw new HoodieException("Failed to get table schema for " + tableName, e);
} finally {
closeQuietly(result, null);
}
}
@Override
public void addPartitionsToTable(String tableName, List<String> partitionsToAdd) {
if (partitionsToAdd.isEmpty()) {
LOG.info("No partitions to add for " + tableName);
return;
}
LOG.info("Adding partitions " + partitionsToAdd.size() + " to table " + tableName);
String sql = constructAddPartitions(tableName, partitionsToAdd);
updateDLASQL(sql);
}
public String constructAddPartitions(String tableName, List<String> partitions) {
return constructDLAAddPartitions(tableName, partitions);
}
String generateAbsolutePathStr(Path path) {
String absolutePathStr = path.toString();
if (path.toUri().getScheme() == null) {
absolutePathStr = getDefaultFs() + absolutePathStr;
}
return absolutePathStr.endsWith("/") ? absolutePathStr : absolutePathStr + "/";
}
public List<String> constructChangePartitions(String tableName, List<String> partitions) {
List<String> changePartitions = new ArrayList<>();
String useDatabase = "USE " + DLA_ESCAPE_CHARACTER + dlaConfig.databaseName + DLA_ESCAPE_CHARACTER;
changePartitions.add(useDatabase);
String alterTable = "ALTER TABLE " + DLA_ESCAPE_CHARACTER + tableName + DLA_ESCAPE_CHARACTER;
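// Emits a USE statement followed by one "ALTER TABLE ... ADD IF NOT EXISTS PARTITION ... LOCATION ..."
// statement per changed partition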
for (String partition : partitions) {
String partitionClause = getPartitionClause(partition);
Path partitionPath = FSUtils.getPartitionPath(dlaConfig.basePath, partition);
String fullPartitionPathStr = generateAbsolutePathStr(partitionPath);
String changePartition =
alterTable + " ADD IF NOT EXISTS PARTITION (" + partitionClause + ") LOCATION '" + fullPartitionPathStr + "'";
changePartitions.add(changePartition);
}
return changePartitions;
}
/**
 * Generate the DLA partition clause from a partition path.
 *
 * @param partition partition path
 * @return partition clause, e.g. "field1='value1',field2='value2'"
 */
public String getPartitionClause(String partition) {
List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
ValidationUtils.checkArgument(dlaConfig.partitionFields.size() == partitionValues.size(),
"Partition key parts " + dlaConfig.partitionFields + " does not match with partition values " + partitionValues
+ ". Check partition strategy. ");
List<String> partBuilder = new ArrayList<>();
for (int i = 0; i < dlaConfig.partitionFields.size(); i++) {
partBuilder.add(dlaConfig.partitionFields.get(i) + "='" + partitionValues.get(i) + "'");
}
return partBuilder.stream().collect(Collectors.joining(","));
}
private String constructDLAAddPartitions(String tableName, List<String> partitions) {
StringBuilder alterSQL = new StringBuilder("ALTER TABLE ");
alterSQL.append(DLA_ESCAPE_CHARACTER).append(dlaConfig.databaseName)
.append(DLA_ESCAPE_CHARACTER).append(".").append(DLA_ESCAPE_CHARACTER)
.append(tableName).append(DLA_ESCAPE_CHARACTER).append(" ADD IF NOT EXISTS ");
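// Illustrative example of the generated statement, for a single partition field `dt`
// extracted from the path 2020/08/06:
//   ALTER TABLE demo_db.demo_table ADD IF NOT EXISTS
//     PARTITION (dt='2020-08-06') LOCATION 'hdfs://namenode/tmp/hoodie/demo_table/2020/08/06/'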
for (String partition : partitions) {
String partitionClause = getPartitionClause(partition);
Path partitionPath = FSUtils.getPartitionPath(dlaConfig.basePath, partition);
String fullPartitionPathStr = generateAbsolutePathStr(partitionPath);
alterSQL.append(" PARTITION (").append(partitionClause).append(") LOCATION '").append(fullPartitionPathStr)
.append("' ");
}
return alterSQL.toString();
}
private void updateDLASQL(String sql) {
Statement stmt = null;
try {
stmt = connection.createStatement();
LOG.info("Executing SQL " + sql);
stmt.execute(sql);
} catch (SQLException e) {
throw new HoodieException("Failed in executing SQL " + sql, e);
} finally {
closeQuietly(null, stmt);
}
}
@Override
public boolean doesTableExist(String tableName) {
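// Probe with SHOW CREATE TABLE; any SQLException is treated as "table does not exist"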
String sql = constructShowCreateTableSQL(tableName);
Statement stmt = null;
ResultSet rs = null;
try {
stmt = connection.createStatement();
rs = stmt.executeQuery(sql);
} catch (SQLException e) {
return false;
} finally {
closeQuietly(rs, stmt);
}
return true;
}
@Override
public Option<String> getLastCommitTimeSynced(String tableName) {
String sql = constructShowCreateTableSQL(tableName);
Statement stmt = null;
ResultSet rs = null;
try {
stmt = connection.createStatement();
rs = stmt.executeQuery(sql);
if (rs.next()) {
String table = rs.getString(2);
Map<String, String> attr = new HashMap<>();
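// Pull key/value pairs out of the trailing TBLPROPERTIES clause of the
// SHOW CREATE TABLE output, e.g. (illustrative):
//   TBLPROPERTIES ('hoodie_last_sync' = '20200806123455')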
int index = table.indexOf(TBL_PROPERTIES_STR);
if (index != -1) {
String sub = table.substring(index + TBL_PROPERTIES_STR.length());
sub = sub.replaceAll("\\(", "").replaceAll("\\)", "").replaceAll("'", "");
String[] str = sub.split(",");
for (int i = 0; i < str.length; i++) {
String key = str[i].split("=")[0].trim();
String value = str[i].split("=")[1].trim();
attr.put(key, value);
}
}
return Option.ofNullable(attr.getOrDefault(HOODIE_LAST_COMMIT_TIME_SYNC, null));
}
return Option.empty();
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to get the last commit time synced from the table", e);
} finally {
closeQuietly(rs, stmt);
}
}
@Override
public void updateLastCommitTimeSynced(String tableName) {
// TODO: DLA does not support updating TBLPROPERTIES yet, so this is a no-op
}
@Override
public void updatePartitionsToTable(String tableName, List<String> changedPartitions) {
if (changedPartitions.isEmpty()) {
LOG.info("No partitions to change for " + tableName);
return;
}
LOG.info("Changing partitions " + changedPartitions.size() + " on " + tableName);
List<String> sqls = constructChangePartitions(tableName, changedPartitions);
for (String sql : sqls) {
updateDLASQL(sql);
}
}
public Map<List<String>, String> scanTablePartitions(String tableName) {
String sql = constructShowPartitionSQL(tableName);
Statement stmt = null;
ResultSet rs = null;
Map<List<String>, String> partitions = new HashMap<>();
try {
stmt = connection.createStatement();
LOG.info("Executing SQL " + sql);
rs = stmt.executeQuery(sql);
while (rs.next()) {
if (rs.getMetaData().getColumnCount() > 0) {
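// The first column of each SHOW PARTITIONS row is expected to hold the
// partition path, e.g. (illustrative) "2020/08/06"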
String str = rs.getString(1);
if (!StringUtils.isNullOrEmpty(str)) {
List<String> values = partitionValueExtractor.extractPartitionValuesInPath(str);
Path storagePartitionPath = FSUtils.getPartitionPath(dlaConfig.basePath, String.join("/", values));
String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
partitions.put(values, fullStoragePartitionPath);
}
}
}
return partitions;
} catch (SQLException e) {
throw new HoodieException("Failed in executing SQL " + sql, e);
} finally {
closeQuietly(rs, stmt);
}
}
public List<PartitionEvent> getPartitionEvents(Map<List<String>, String> tablePartitions, List<String> partitionStoragePartitions) {
Map<String, String> paths = new HashMap<>();
for (Map.Entry<List<String>, String> entry : tablePartitions.entrySet()) {
List<String> partitionValues = entry.getKey();
Collections.sort(partitionValues);
String fullTablePartitionPath = entry.getValue();
paths.put(String.join(", ", partitionValues), fullTablePartitionPath);
}
List<PartitionEvent> events = new ArrayList<>();
for (String storagePartition : partitionStoragePartitions) {
Path storagePartitionPath = FSUtils.getPartitionPath(dlaConfig.basePath, storagePartition);
String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
// Check whether the partition values and the HDFS path are both the same
List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
if (dlaConfig.useDLASyncHiveStylePartitioning) {
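// With hive-style partitioning, rebuild the comparison path from the raw extracted
// values so it lines up with the paths recorded by scanTablePartitions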
String partition = String.join("/", storagePartitionValues);
storagePartitionPath = FSUtils.getPartitionPath(dlaConfig.basePath, partition);
fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
}
Collections.sort(storagePartitionValues);
if (!storagePartitionValues.isEmpty()) {
String storageValue = String.join(", ", storagePartitionValues);
if (!paths.containsKey(storageValue)) {
events.add(PartitionEvent.newPartitionAddEvent(storagePartition));
} else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) {
events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition));
}
}
}
return events;
}
public void updateTableDefinition(String tableName, SchemaDifference schemaDiff) {
ValidationUtils.checkArgument(schemaDiff.getDeleteColumns().isEmpty(), "DLA does not support deleting columns");
ValidationUtils.checkArgument(schemaDiff.getUpdateColumnTypes().isEmpty(), "DLA does not support altering column types");
Map<String, String> columns = schemaDiff.getAddColumnTypes();
for (Map.Entry<String, String> entry : columns.entrySet()) {
String columnName = entry.getKey();
String columnType = entry.getValue();
StringBuilder sqlBuilder = new StringBuilder("ALTER TABLE ").append(DLA_ESCAPE_CHARACTER)
.append(dlaConfig.databaseName).append(DLA_ESCAPE_CHARACTER).append(".")
.append(DLA_ESCAPE_CHARACTER).append(tableName)
.append(DLA_ESCAPE_CHARACTER).append(" ADD COLUMNS(")
.append(columnName).append(" ").append(columnType).append(" )");
LOG.info("Updating table definition with " + sqlBuilder);
updateDLASQL(sqlBuilder.toString());
}
}
public void close() {
try {
if (connection != null) {
connection.close();
}
} catch (SQLException e) {
LOG.error("Could not close connection ", e);
}
}
private String constructShowPartitionSQL(String tableName) {
String sql = "show partitions " + dlaConfig.databaseName + "." + tableName;
return sql;
}
private String constructShowCreateTableSQL(String tableName) {
String sql = "show create table " + dlaConfig.databaseName + "." + tableName;
return sql;
}
private String getDefaultFs() {
return fs.getConf().get("fs.defaultFS");
}
private HiveSyncConfig toHiveSyncConfig() {
HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
hiveSyncConfig.partitionFields = dlaConfig.partitionFields;
hiveSyncConfig.databaseName = dlaConfig.databaseName;
Path basePath = new Path(dlaConfig.basePath);
hiveSyncConfig.basePath = generateAbsolutePathStr(basePath);
return hiveSyncConfig;
}
}

org/apache/hudi/dla/util/Utils.java

@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.dla.util;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.dla.DLASyncConfig;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Properties;
public class Utils {
public static String DLA_DATABASE_OPT_KEY = "hoodie.datasource.dla_sync.database";
public static String DLA_TABLE_OPT_KEY = "hoodie.datasource.dla_sync.table";
public static String DLA_USER_OPT_KEY = "hoodie.datasource.dla_sync.username";
public static String DLA_PASS_OPT_KEY = "hoodie.datasource.dla_sync.password";
public static String DLA_URL_OPT_KEY = "hoodie.datasource.dla_sync.jdbcurl";
public static String BASE_PATH = "basePath";
public static String DLA_PARTITION_FIELDS_OPT_KEY = "hoodie.datasource.dla_sync.partition_fields";
public static String DLA_PARTITION_EXTRACTOR_CLASS_OPT_KEY = "hoodie.datasource.dla_sync.partition_extractor_class";
public static String DLA_ASSUME_DATE_PARTITIONING = "hoodie.datasource.dla_sync.assume_date_partitioning";
public static String DLA_SKIP_RO_SUFFIX = "hoodie.datasource.dla_sync.skip_ro_suffix";
public static String DLA_SYNC_HIVE_STYLE_PARTITIONING = "hoodie.datasource.dla_sync.hive.style.partitioning";
public static Properties configToProperties(DLASyncConfig cfg) {
Properties properties = new Properties();
properties.put(DLA_DATABASE_OPT_KEY, cfg.databaseName);
properties.put(DLA_TABLE_OPT_KEY, cfg.tableName);
properties.put(DLA_USER_OPT_KEY, cfg.dlaUser);
properties.put(DLA_PASS_OPT_KEY, cfg.dlaPass);
properties.put(DLA_URL_OPT_KEY, cfg.jdbcUrl);
properties.put(BASE_PATH, cfg.basePath);
// Also write out the partition fields so that propertiesToConfig() can round-trip them
if (cfg.partitionFields != null && !cfg.partitionFields.isEmpty()) {
properties.put(DLA_PARTITION_FIELDS_OPT_KEY, String.join(",", cfg.partitionFields));
}
properties.put(DLA_PARTITION_EXTRACTOR_CLASS_OPT_KEY, cfg.partitionValueExtractorClass);
properties.put(DLA_ASSUME_DATE_PARTITIONING, String.valueOf(cfg.assumeDatePartitioning));
properties.put(DLA_SKIP_RO_SUFFIX, String.valueOf(cfg.skipROSuffix));
properties.put(DLA_SYNC_HIVE_STYLE_PARTITIONING, String.valueOf(cfg.useDLASyncHiveStylePartitioning));
return properties;
}
public static DLASyncConfig propertiesToConfig(Properties properties) {
DLASyncConfig config = new DLASyncConfig();
config.databaseName = properties.getProperty(DLA_DATABASE_OPT_KEY);
config.tableName = properties.getProperty(DLA_TABLE_OPT_KEY);
config.dlaUser = properties.getProperty(DLA_USER_OPT_KEY);
config.dlaPass = properties.getProperty(DLA_PASS_OPT_KEY);
config.jdbcUrl = properties.getProperty(DLA_URL_OPT_KEY);
config.basePath = properties.getProperty(BASE_PATH);
if (StringUtils.isNullOrEmpty(properties.getProperty(DLA_PARTITION_FIELDS_OPT_KEY))) {
config.partitionFields = new ArrayList<>();
} else {
config.partitionFields = Arrays.asList(properties.getProperty(DLA_PARTITION_FIELDS_OPT_KEY).split(","));
}
config.partitionValueExtractorClass = properties.getProperty(DLA_PARTITION_EXTRACTOR_CLASS_OPT_KEY);
config.assumeDatePartitioning = Boolean.parseBoolean(properties.getProperty(DLA_ASSUME_DATE_PARTITIONING, "false"));
config.skipROSuffix = Boolean.parseBoolean(properties.getProperty(DLA_SKIP_RO_SUFFIX, "false"));
config.useDLASyncHiveStylePartitioning = Boolean.parseBoolean(properties.getProperty(DLA_SYNC_HIVE_STYLE_PARTITIONING, "false"));
return config;
}
}
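
Taken together, configToProperties and propertiesToConfig are intended to round-trip a config (with the partition-fields write-out added above). A minimal sketch with illustrative values:

DLASyncConfig cfg = new DLASyncConfig();
cfg.databaseName = "demo_db";
cfg.tableName = "demo_table";
cfg.dlaUser = "dla_user";
cfg.dlaPass = "dla_pass";
cfg.jdbcUrl = "jdbc:mysql://localhost:3306";
cfg.basePath = "/tmp/hoodie/demo_table";
cfg.partitionFields = Arrays.asList("dt");
Properties props = Utils.configToProperties(cfg);
DLASyncConfig roundTripped = Utils.propertiesToConfig(props);
// roundTripped.partitionFields is ["dt"] again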

org/apache/hudi/dla/TestDLASyncConfig.java

@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.dla;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class TestDLASyncConfig {
@Test
public void testCopy() {
DLASyncConfig dlaSyncConfig = new DLASyncConfig();
List<String> partitions = Arrays.asList("a", "b");
dlaSyncConfig.partitionFields = partitions;
dlaSyncConfig.basePath = "/tmp";
dlaSyncConfig.assumeDatePartitioning = true;
dlaSyncConfig.databaseName = "test";
dlaSyncConfig.tableName = "test";
dlaSyncConfig.dlaUser = "dla";
dlaSyncConfig.dlaPass = "dla";
dlaSyncConfig.jdbcUrl = "jdbc:mysql://localhost:3306";
dlaSyncConfig.skipROSuffix = false;
DLASyncConfig copied = DLASyncConfig.copy(dlaSyncConfig);
assertEquals(copied.partitionFields, dlaSyncConfig.partitionFields);
assertEquals(copied.basePath, dlaSyncConfig.basePath);
assertEquals(copied.assumeDatePartitioning, dlaSyncConfig.assumeDatePartitioning);
assertEquals(copied.databaseName, dlaSyncConfig.databaseName);
assertEquals(copied.tableName, dlaSyncConfig.tableName);
assertEquals(copied.dlaUser, dlaSyncConfig.dlaUser);
assertEquals(copied.dlaPass, dlaSyncConfig.dlaPass);
assertEquals(copied.jdbcUrl, dlaSyncConfig.jdbcUrl);
assertEquals(copied.skipROSuffix, dlaSyncConfig.skipROSuffix);
}
}


@@ -0,0 +1,29 @@
###
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###
log4j.rootLogger=WARN, CONSOLE
log4j.logger.org.apache.hudi=DEBUG
# CONSOLE is set to be a ConsoleAppender.
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
# CONSOLE uses PatternLayout.
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=[%-5p] %d %c %x - %m%n
log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter
log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true
log4j.appender.CONSOLE.filter.a.LevelMin=WARN
log4j.appender.CONSOLE.filter.a.LevelMax=FATAL


@@ -0,0 +1,30 @@
###
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###
log4j.rootLogger=WARN, CONSOLE
log4j.logger.org.apache=INFO
log4j.logger.org.apache.hudi=DEBUG
# CONSOLE is set to be a ConsoleAppender.
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
# CONSOLE uses PatternLayout.
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter
log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true
log4j.appender.CONSOLE.filter.a.LevelMin=WARN
log4j.appender.CONSOLE.filter.a.LevelMax=FATAL