From e4fd195d9fd0cc1128b8c6797d88e56402b166bd Mon Sep 17 00:00:00 2001 From: Roc Marshal <64569824+RocMarshal@users.noreply.github.com> Date: Tue, 27 Apr 2021 21:32:32 +0800 Subject: [PATCH] [MINOR] Refactor method up to parent-class (#2822) --- .../org/apache/hudi/dla/HoodieDLAClient.java | 9 +--- .../apache/hudi/hive/HoodieHiveClient.java | 9 +--- .../apache/hudi/hive/TestHiveSyncTool.java | 39 ++++++++++++++++++ .../sync/common/AbstractSyncHoodieClient.java | 41 +++++++++++++++++++ 4 files changed, 82 insertions(+), 16 deletions(-) diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java index c5f1a7cf4..cd84c3b76 100644 --- a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java +++ b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java @@ -125,14 +125,7 @@ public class HoodieDLAClient extends AbstractSyncHoodieClient { DatabaseMetaData databaseMetaData = connection.getMetaData(); result = databaseMetaData.getColumns(dlaConfig.databaseName, dlaConfig.databaseName, tableName, null); while (result.next()) { - String columnName = result.getString(4); - String columnType = result.getString(6); - if ("DECIMAL".equals(columnType)) { - int columnSize = result.getInt("COLUMN_SIZE"); - int decimalDigits = result.getInt("DECIMAL_DIGITS"); - columnType += String.format("(%s,%s)", columnSize, decimalDigits); - } - schema.put(columnName, columnType); + TYPE_CONVERTOR.doConvert(result, schema); } return schema; } catch (SQLException e) { diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java index aa7719aee..d5d668b1c 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java @@ -305,14 +305,7 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient { DatabaseMetaData databaseMetaData = connection.getMetaData(); result = databaseMetaData.getColumns(null, syncConfig.databaseName, tableName, null); while (result.next()) { - String columnName = result.getString(4); - String columnType = result.getString(6); - if ("DECIMAL".equals(columnType)) { - int columnSize = result.getInt("COLUMN_SIZE"); - int decimalDigits = result.getInt("DECIMAL_DIGITS"); - columnType += String.format("(%s,%s)", columnSize, decimalDigits); - } - schema.put(columnName, columnType); + TYPE_CONVERTOR.doConvert(result, schema); } return schema; } catch (SQLException e) { diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index 300e9378a..4ec4f0d3d 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -689,4 +689,43 @@ public class TestHiveSyncTool { "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially"); } + @ParameterizedTest + @MethodSource("useJdbc") + public void testTypeConverter(boolean useJdbc) throws Exception { + HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc; + HiveTestUtil.createCOWTable("100", 5, true); + HoodieHiveClient hiveClient = + new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + String tableName = HiveTestUtil.hiveSyncConfig.tableName; + String tableAbsoluteName = String.format(" `%s.%s` ", HiveTestUtil.hiveSyncConfig.databaseName, tableName); + String dropTableSql = String.format("DROP TABLE IF EXISTS %s ", tableAbsoluteName); + String createTableSqlPrefix = String.format("CREATE TABLE IF NOT EXISTS %s ", tableAbsoluteName); + String errorMsg = "An error occurred in decimal type converting."; + hiveClient.updateHiveSQL(dropTableSql); + + // test one column in DECIMAL + String oneTargetColumnSql = createTableSqlPrefix + "(`decimal_col` DECIMAL(9,8), `bigint_col` BIGINT)"; + hiveClient.updateHiveSQL(oneTargetColumnSql); + System.out.println(hiveClient.getTableSchema(tableName)); + assertTrue(hiveClient.getTableSchema(tableName).containsValue("DECIMAL(9,8)"), errorMsg); + hiveClient.updateHiveSQL(dropTableSql); + + // test multiple columns in DECIMAL + String multipleTargetColumnSql = + createTableSqlPrefix + "(`decimal_col1` DECIMAL(9,8), `bigint_col` BIGINT, `decimal_col2` DECIMAL(7,4))"; + hiveClient.updateHiveSQL(multipleTargetColumnSql); + System.out.println(hiveClient.getTableSchema(tableName)); + assertTrue(hiveClient.getTableSchema(tableName).containsValue("DECIMAL(9,8)") + && hiveClient.getTableSchema(tableName).containsValue("DECIMAL(7,4)"), errorMsg); + hiveClient.updateHiveSQL(dropTableSql); + + // test no columns in DECIMAL + String noTargetColumnsSql = createTableSqlPrefix + "(`bigint_col` BIGINT)"; + hiveClient.updateHiveSQL(noTargetColumnsSql); + System.out.println(hiveClient.getTableSchema(tableName)); + assertTrue(hiveClient.getTableSchema(tableName).size() == 1 && hiveClient.getTableSchema(tableName) + .containsValue("BIGINT"), errorMsg); + hiveClient.updateHiveSQL(dropTableSql); + } + } diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java index f9ada2fa8..90f6017f1 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java @@ -29,20 +29,25 @@ import org.apache.hudi.common.util.Option; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.util.ValidationUtils; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.parquet.schema.MessageType; +import java.io.Serializable; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.List; import java.util.Map; +import java.util.Objects; public abstract class AbstractSyncHoodieClient { private static final Logger LOG = LogManager.getLogger(AbstractSyncHoodieClient.class); + public static final TypeConverter TYPE_CONVERTOR = new TypeConverter() {}; + protected final HoodieTableMetaClient metaClient; protected final HoodieTableType tableType; protected final FileSystem fs; @@ -150,6 +155,42 @@ public abstract class AbstractSyncHoodieClient { } } + public abstract static class TypeConverter implements Serializable { + + static final String DEFAULT_TARGET_TYPE = "DECIMAL"; + + protected String targetType; + + public TypeConverter() { + this.targetType = DEFAULT_TARGET_TYPE; + } + + public TypeConverter(String targetType) { + ValidationUtils.checkArgument(Objects.nonNull(targetType)); + this.targetType = targetType; + } + + public void doConvert(ResultSet resultSet, Map schema) throws SQLException { + schema.put(getColumnName(resultSet), targetType.equalsIgnoreCase(getColumnType(resultSet)) + ? convert(resultSet) : getColumnType(resultSet)); + } + + public String convert(ResultSet resultSet) throws SQLException { + String columnType = getColumnType(resultSet); + int columnSize = resultSet.getInt("COLUMN_SIZE"); + int decimalDigits = resultSet.getInt("DECIMAL_DIGITS"); + return columnType + String.format("(%s,%s)", columnSize, decimalDigits); + } + + public String getColumnName(ResultSet resultSet) throws SQLException { + return resultSet.getString(4); + } + + public String getColumnType(ResultSet resultSet) throws SQLException { + return resultSet.getString(6); + } + } + /** * Read the schema from the log file on path. */