[MINOR] Refactor method up to parent-class (#2822)
This commit is contained in:
@@ -125,14 +125,7 @@ public class HoodieDLAClient extends AbstractSyncHoodieClient {
|
||||
DatabaseMetaData databaseMetaData = connection.getMetaData();
|
||||
result = databaseMetaData.getColumns(dlaConfig.databaseName, dlaConfig.databaseName, tableName, null);
|
||||
while (result.next()) {
|
||||
String columnName = result.getString(4);
|
||||
String columnType = result.getString(6);
|
||||
if ("DECIMAL".equals(columnType)) {
|
||||
int columnSize = result.getInt("COLUMN_SIZE");
|
||||
int decimalDigits = result.getInt("DECIMAL_DIGITS");
|
||||
columnType += String.format("(%s,%s)", columnSize, decimalDigits);
|
||||
}
|
||||
schema.put(columnName, columnType);
|
||||
TYPE_CONVERTOR.doConvert(result, schema);
|
||||
}
|
||||
return schema;
|
||||
} catch (SQLException e) {
|
||||
|
||||
@@ -305,14 +305,7 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
|
||||
DatabaseMetaData databaseMetaData = connection.getMetaData();
|
||||
result = databaseMetaData.getColumns(null, syncConfig.databaseName, tableName, null);
|
||||
while (result.next()) {
|
||||
String columnName = result.getString(4);
|
||||
String columnType = result.getString(6);
|
||||
if ("DECIMAL".equals(columnType)) {
|
||||
int columnSize = result.getInt("COLUMN_SIZE");
|
||||
int decimalDigits = result.getInt("DECIMAL_DIGITS");
|
||||
columnType += String.format("(%s,%s)", columnSize, decimalDigits);
|
||||
}
|
||||
schema.put(columnName, columnType);
|
||||
TYPE_CONVERTOR.doConvert(result, schema);
|
||||
}
|
||||
return schema;
|
||||
} catch (SQLException e) {
|
||||
|
||||
@@ -689,4 +689,43 @@ public class TestHiveSyncTool {
|
||||
"Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially");
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("useJdbc")
|
||||
public void testTypeConverter(boolean useJdbc) throws Exception {
|
||||
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
|
||||
HiveTestUtil.createCOWTable("100", 5, true);
|
||||
HoodieHiveClient hiveClient =
|
||||
new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
|
||||
String tableName = HiveTestUtil.hiveSyncConfig.tableName;
|
||||
String tableAbsoluteName = String.format(" `%s.%s` ", HiveTestUtil.hiveSyncConfig.databaseName, tableName);
|
||||
String dropTableSql = String.format("DROP TABLE IF EXISTS %s ", tableAbsoluteName);
|
||||
String createTableSqlPrefix = String.format("CREATE TABLE IF NOT EXISTS %s ", tableAbsoluteName);
|
||||
String errorMsg = "An error occurred in decimal type converting.";
|
||||
hiveClient.updateHiveSQL(dropTableSql);
|
||||
|
||||
// test one column in DECIMAL
|
||||
String oneTargetColumnSql = createTableSqlPrefix + "(`decimal_col` DECIMAL(9,8), `bigint_col` BIGINT)";
|
||||
hiveClient.updateHiveSQL(oneTargetColumnSql);
|
||||
System.out.println(hiveClient.getTableSchema(tableName));
|
||||
assertTrue(hiveClient.getTableSchema(tableName).containsValue("DECIMAL(9,8)"), errorMsg);
|
||||
hiveClient.updateHiveSQL(dropTableSql);
|
||||
|
||||
// test multiple columns in DECIMAL
|
||||
String multipleTargetColumnSql =
|
||||
createTableSqlPrefix + "(`decimal_col1` DECIMAL(9,8), `bigint_col` BIGINT, `decimal_col2` DECIMAL(7,4))";
|
||||
hiveClient.updateHiveSQL(multipleTargetColumnSql);
|
||||
System.out.println(hiveClient.getTableSchema(tableName));
|
||||
assertTrue(hiveClient.getTableSchema(tableName).containsValue("DECIMAL(9,8)")
|
||||
&& hiveClient.getTableSchema(tableName).containsValue("DECIMAL(7,4)"), errorMsg);
|
||||
hiveClient.updateHiveSQL(dropTableSql);
|
||||
|
||||
// test no columns in DECIMAL
|
||||
String noTargetColumnsSql = createTableSqlPrefix + "(`bigint_col` BIGINT)";
|
||||
hiveClient.updateHiveSQL(noTargetColumnsSql);
|
||||
System.out.println(hiveClient.getTableSchema(tableName));
|
||||
assertTrue(hiveClient.getTableSchema(tableName).size() == 1 && hiveClient.getTableSchema(tableName)
|
||||
.containsValue("BIGINT"), errorMsg);
|
||||
hiveClient.updateHiveSQL(dropTableSql);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -29,20 +29,25 @@ import org.apache.hudi.common.util.Option;
|
||||
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.parquet.schema.MessageType;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
public abstract class AbstractSyncHoodieClient {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(AbstractSyncHoodieClient.class);
|
||||
|
||||
public static final TypeConverter TYPE_CONVERTOR = new TypeConverter() {};
|
||||
|
||||
protected final HoodieTableMetaClient metaClient;
|
||||
protected final HoodieTableType tableType;
|
||||
protected final FileSystem fs;
|
||||
@@ -150,6 +155,42 @@ public abstract class AbstractSyncHoodieClient {
|
||||
}
|
||||
}
|
||||
|
||||
public abstract static class TypeConverter implements Serializable {
|
||||
|
||||
static final String DEFAULT_TARGET_TYPE = "DECIMAL";
|
||||
|
||||
protected String targetType;
|
||||
|
||||
public TypeConverter() {
|
||||
this.targetType = DEFAULT_TARGET_TYPE;
|
||||
}
|
||||
|
||||
public TypeConverter(String targetType) {
|
||||
ValidationUtils.checkArgument(Objects.nonNull(targetType));
|
||||
this.targetType = targetType;
|
||||
}
|
||||
|
||||
public void doConvert(ResultSet resultSet, Map<String, String> schema) throws SQLException {
|
||||
schema.put(getColumnName(resultSet), targetType.equalsIgnoreCase(getColumnType(resultSet))
|
||||
? convert(resultSet) : getColumnType(resultSet));
|
||||
}
|
||||
|
||||
public String convert(ResultSet resultSet) throws SQLException {
|
||||
String columnType = getColumnType(resultSet);
|
||||
int columnSize = resultSet.getInt("COLUMN_SIZE");
|
||||
int decimalDigits = resultSet.getInt("DECIMAL_DIGITS");
|
||||
return columnType + String.format("(%s,%s)", columnSize, decimalDigits);
|
||||
}
|
||||
|
||||
public String getColumnName(ResultSet resultSet) throws SQLException {
|
||||
return resultSet.getString(4);
|
||||
}
|
||||
|
||||
public String getColumnType(ResultSet resultSet) throws SQLException {
|
||||
return resultSet.getString(6);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Read the schema from the log file on path.
|
||||
*/
|
||||
|
||||
Reference in New Issue
Block a user