
[HUDI-1302] Add support for timestamp field in HiveSync (#2129)

Author:    satishkotha
Date:      2020-10-13 22:58:00 -07:00
Committer: GitHub
Parent:    c7d962efff
Commit:    7fa641ea9a
11 changed files with 86 additions and 24 deletions

DataSourceUtils.java

@@ -290,6 +290,8 @@ public class DataSourceUtils {
         DataSourceWriteOptions.DEFAULT_HIVE_AUTO_CREATE_DATABASE_OPT_KEY()));
     hiveSyncConfig.skipROSuffix = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX(),
         DataSourceWriteOptions.DEFAULT_HIVE_SKIP_RO_SUFFIX_VAL()));
+    hiveSyncConfig.supportTimestamp = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SUPPORT_TIMESTAMP(),
+        DataSourceWriteOptions.DEFAULT_HIVE_SUPPORT_TIMESTAMP()));
     return hiveSyncConfig;
   }
 }

DataSourceWriteOptions.scala

@@ -302,6 +302,7 @@ object DataSourceWriteOptions {
   val HIVE_USE_JDBC_OPT_KEY = "hoodie.datasource.hive_sync.use_jdbc"
   val HIVE_AUTO_CREATE_DATABASE_OPT_KEY = "hoodie.datasource.hive_sync.auto_create_database"
   val HIVE_SKIP_RO_SUFFIX = "hoodie.datasource.hive_sync.skip_ro_suffix"
+  val HIVE_SUPPORT_TIMESTAMP = "hoodie.datasource.hive_sync.support_timestamp"

   // DEFAULT FOR HIVE SPECIFIC CONFIGS
   val DEFAULT_HIVE_SYNC_ENABLED_OPT_VAL = "false"
@@ -319,6 +320,7 @@ object DataSourceWriteOptions {
   val DEFAULT_HIVE_USE_JDBC_OPT_VAL = "true"
   val DEFAULT_HIVE_AUTO_CREATE_DATABASE_OPT_KEY = "true"
   val DEFAULT_HIVE_SKIP_RO_SUFFIX_VAL = "false"
+  val DEFAULT_HIVE_SUPPORT_TIMESTAMP = "false"

   // Async Compaction - Enabled by default for MOR
   val ASYNC_COMPACT_ENABLE_OPT_KEY = "hoodie.datasource.compaction.async.enable"
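For orientation, a minimal sketch of how a job could opt in from the Spark datasource write path once these keys exist. The table name and paths below are hypothetical, and the other required Hudi write options (record key, precombine field, partition path, hive sync connection settings) are elided:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class TimestampSyncOptIn {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("timestamp-sync-opt-in").getOrCreate();
    Dataset<Row> df = spark.read().parquet("/tmp/source_parquet"); // hypothetical input

    df.write().format("hudi")
        .option("hoodie.table.name", "example_table")             // hypothetical name
        .option("hoodie.datasource.hive_sync.enable", "true")
        // New key from this patch; defaults to "false" so existing tables keep bigint columns.
        .option("hoodie.datasource.hive_sync.support_timestamp", "true")
        .mode(SaveMode.Append)
        .save("/tmp/hudi/example_table");                         // hypothetical base path
  }
}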

HoodieSparkSqlWriter.scala

@@ -348,6 +348,7 @@ private[hudi] object HoodieSparkSqlWriter {
       ListBuffer(parameters(HIVE_PARTITION_FIELDS_OPT_KEY).split(",").map(_.trim).filter(!_.isEmpty).toList: _*)
     hiveSyncConfig.partitionValueExtractorClass = parameters(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY)
     hiveSyncConfig.useJdbc = parameters(HIVE_USE_JDBC_OPT_KEY).toBoolean
+    hiveSyncConfig.supportTimestamp = parameters.get(HIVE_SUPPORT_TIMESTAMP).exists(r => r.toBoolean)
     hiveSyncConfig
   }

DLASyncConfig.java

@@ -68,6 +68,9 @@ public class DLASyncConfig implements Serializable {
   @Parameter(names = {"--help", "-h"}, help = true)
   public Boolean help = false;

+  @Parameter(names = {"--support-timestamp"}, description = "If true, converts int64(timestamp_micros) to timestamp type")
+  public Boolean supportTimestamp = false;
+
   public static DLASyncConfig copy(DLASyncConfig cfg) {
     DLASyncConfig newConfig = new DLASyncConfig();
     newConfig.databaseName = cfg.databaseName;
@@ -81,6 +84,7 @@ public class DLASyncConfig implements Serializable {
     newConfig.assumeDatePartitioning = cfg.assumeDatePartitioning;
     newConfig.skipROSuffix = cfg.skipROSuffix;
     newConfig.useDLASyncHiveStylePartitioning = cfg.useDLASyncHiveStylePartitioning;
+    newConfig.supportTimestamp = cfg.supportTimestamp;
     return newConfig;
   }

DLASyncTool.java

@@ -159,7 +159,7 @@ public class DLASyncTool extends AbstractSyncTool {
     } else {
       // Check if the table schema has evolved
       Map<String, String> tableSchema = hoodieDLAClient.getTableSchema(tableName);
-      SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, cfg.partitionFields);
+      SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, cfg.partitionFields, cfg.supportTimestamp);
       if (!schemaDiff.isEmpty()) {
         LOG.info("Schema difference found for " + tableName);
         hoodieDLAClient.updateTableDefinition(tableName, schemaDiff);

TestDLASyncConfig.java

@@ -50,5 +50,6 @@ public class TestDLASyncConfig {
     assertEquals(copied.basePath, dlaSyncConfig.basePath);
     assertEquals(copied.jdbcUrl, dlaSyncConfig.jdbcUrl);
     assertEquals(copied.skipROSuffix, dlaSyncConfig.skipROSuffix);
+    assertEquals(copied.supportTimestamp, dlaSyncConfig.supportTimestamp);
   }
 }

HiveSyncConfig.java

@@ -80,6 +80,10 @@ public class HiveSyncConfig implements Serializable {
   @Parameter(names = {"--help", "-h"}, help = true)
   public Boolean help = false;

+  @Parameter(names = {"--support-timestamp"}, description = "'INT64' with original type TIMESTAMP_MICROS is converted to hive 'timestamp' type."
+      + " Disabled by default for backward compatibility.")
+  public Boolean supportTimestamp = false;
+
   public static HiveSyncConfig copy(HiveSyncConfig cfg) {
     HiveSyncConfig newConfig = new HiveSyncConfig();
     newConfig.basePath = cfg.basePath;
@@ -92,6 +96,7 @@ public class HiveSyncConfig implements Serializable {
     newConfig.jdbcUrl = cfg.jdbcUrl;
     newConfig.tableName = cfg.tableName;
     newConfig.usePreApacheInputFormat = cfg.usePreApacheInputFormat;
+    newConfig.supportTimestamp = cfg.supportTimestamp;
     return newConfig;
   }
@@ -100,7 +105,7 @@ public class HiveSyncConfig implements Serializable {
     return "HiveSyncConfig{databaseName='" + databaseName + '\'' + ", tableName='" + tableName + '\''
         + ", hiveUser='" + hiveUser + '\'' + ", hivePass='" + hivePass + '\'' + ", jdbcUrl='" + jdbcUrl + '\''
         + ", basePath='" + basePath + '\'' + ", partitionFields=" + partitionFields + ", partitionValueExtractorClass='"
-        + partitionValueExtractorClass + '\'' + ", assumeDatePartitioning=" + assumeDatePartitioning
+        + partitionValueExtractorClass + '\'' + ", assumeDatePartitioning=" + assumeDatePartitioning + '\'' + ", supportTimestamp='" + supportTimestamp + '\''
         + ", usePreApacheInputFormat=" + usePreApacheInputFormat + ", useJdbc=" + useJdbc + ", help=" + help + '}';
   }
 }
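As a quick sanity check of the config plumbing above, a sketch of setting the new flag programmatically; all values are hypothetical, the import assumes the org.apache.hudi.hive package, and the standalone HiveSyncTool sets the same field via the new --support-timestamp parameter:

import org.apache.hudi.hive.HiveSyncConfig;

public class SupportTimestampConfigSketch {
  public static void main(String[] args) {
    HiveSyncConfig cfg = new HiveSyncConfig();
    cfg.databaseName = "default";              // hypothetical values throughout
    cfg.tableName = "example_table";
    cfg.basePath = "/tmp/hudi/example_table";
    cfg.supportTimestamp = true;               // new field; false by default

    // After this patch, copy() and toString() both carry the flag.
    HiveSyncConfig copied = HiveSyncConfig.copy(cfg);
    System.out.println(copied);
  }
}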

HiveSyncTool.java

@@ -182,7 +182,7 @@ public class HiveSyncTool extends AbstractSyncTool {
     } else {
       // Check if the table schema has evolved
       Map<String, String> tableSchema = hoodieHiveClient.getTableSchema(tableName);
-      SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, cfg.partitionFields);
+      SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, cfg.partitionFields, cfg.supportTimestamp);
       if (!schemaDiff.isEmpty()) {
         LOG.info("Schema difference found for " + tableName);
         hoodieHiveClient.updateTableDefinition(tableName, schema);

HoodieHiveClient.java

@@ -239,7 +239,7 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
   void updateTableDefinition(String tableName, MessageType newSchema) {
     try {
-      String newSchemaStr = HiveSchemaUtil.generateSchemaString(newSchema, syncConfig.partitionFields);
+      String newSchemaStr = HiveSchemaUtil.generateSchemaString(newSchema, syncConfig.partitionFields, syncConfig.supportTimestamp);
       // Cascade clause should not be present for non-partitioned tables
       String cascadeClause = syncConfig.partitionFields.size() > 0 ? " cascade" : "";
       StringBuilder sqlBuilder = new StringBuilder("ALTER TABLE ").append(HIVE_ESCAPE_CHARACTER)

HiveSchemaUtil.java

@@ -33,6 +33,7 @@ import org.apache.parquet.schema.Type;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -51,10 +52,15 @@ public class HiveSchemaUtil {
   /**
    * Get the schema difference between the storage schema and hive table schema.
    */
   public static SchemaDifference getSchemaDifference(MessageType storageSchema, Map<String, String> tableSchema,
-                                                     List<String> partitionKeys) {
+                                                     List<String> partitionKeys) {
+    return getSchemaDifference(storageSchema, tableSchema, partitionKeys, false);
+  }
+
+  public static SchemaDifference getSchemaDifference(MessageType storageSchema, Map<String, String> tableSchema,
+                                                     List<String> partitionKeys, boolean supportTimestamp) {
     Map<String, String> newTableSchema;
     try {
-      newTableSchema = convertParquetSchemaToHiveSchema(storageSchema);
+      newTableSchema = convertParquetSchemaToHiveSchema(storageSchema, supportTimestamp);
     } catch (IOException e) {
       throw new HoodieHiveSyncException("Failed to convert parquet schema to hive schema", e);
     }
@@ -132,16 +138,16 @@ public class HiveSchemaUtil {
    * @param messageType : Parquet Schema
    * @return : Hive Table schema read from parquet file MAP[String,String]
    */
-  public static Map<String, String> convertParquetSchemaToHiveSchema(MessageType messageType) throws IOException {
+  private static Map<String, String> convertParquetSchemaToHiveSchema(MessageType messageType, boolean supportTimestamp) throws IOException {
     Map<String, String> schema = new LinkedHashMap<>();
     List<Type> parquetFields = messageType.getFields();
     for (Type parquetType : parquetFields) {
       StringBuilder result = new StringBuilder();
       String key = parquetType.getName();
       if (parquetType.isRepetition(Type.Repetition.REPEATED)) {
-        result.append(createHiveArray(parquetType, ""));
+        result.append(createHiveArray(parquetType, "", supportTimestamp));
       } else {
-        result.append(convertField(parquetType));
+        result.append(convertField(parquetType, supportTimestamp));
       }
       schema.put(hiveCompatibleFieldName(key, false), result.toString());
@@ -155,7 +161,7 @@ public class HiveSchemaUtil {
    * @param parquetType : Single parquet field
    * @return : Equivalent Hive schema
    */
-  private static String convertField(final Type parquetType) {
+  private static String convertField(final Type parquetType, boolean supportTimestamp) {
     StringBuilder field = new StringBuilder();
     if (parquetType.isPrimitive()) {
       final PrimitiveType.PrimitiveTypeName parquetPrimitiveTypeName =
@@ -167,7 +173,10 @@ public class HiveSchemaUtil {
             .append(decimalMetadata.getScale()).append(")").toString();
       } else if (originalType == OriginalType.DATE) {
         return field.append("DATE").toString();
+      } else if (supportTimestamp && originalType == OriginalType.TIMESTAMP_MICROS) {
+        return field.append("TIMESTAMP").toString();
       }
+
       // TODO - fix the method naming here
       return parquetPrimitiveTypeName.convert(new PrimitiveType.PrimitiveTypeNameConverter<String, RuntimeException>() {
         @Override
@@ -227,7 +236,7 @@ public class HiveSchemaUtil {
           if (!elementType.isRepetition(Type.Repetition.REPEATED)) {
             throw new UnsupportedOperationException("Invalid list type " + parquetGroupType);
           }
-          return createHiveArray(elementType, parquetGroupType.getName());
+          return createHiveArray(elementType, parquetGroupType.getName(), supportTimestamp);
         case MAP:
           if (parquetGroupType.getFieldCount() != 1 || parquetGroupType.getType(0).isPrimitive()) {
             throw new UnsupportedOperationException("Invalid map type " + parquetGroupType);
@@ -245,7 +254,7 @@ public class HiveSchemaUtil {
             throw new UnsupportedOperationException("Map key type must be binary (UTF8): " + keyType);
           }
           Type valueType = mapKeyValType.getType(1);
-          return createHiveMap(convertField(keyType), convertField(valueType));
+          return createHiveMap(convertField(keyType, supportTimestamp), convertField(valueType, supportTimestamp));
         case ENUM:
         case UTF8:
           return "string";
@@ -260,7 +269,7 @@ public class HiveSchemaUtil {
       }
     } else {
       // if no original type then it's a record
-      return createHiveStruct(parquetGroupType.getFields());
+      return createHiveStruct(parquetGroupType.getFields(), supportTimestamp);
     }
   }
 }
@@ -271,14 +280,14 @@ public class HiveSchemaUtil {
    * @param parquetFields : list of parquet fields
    * @return : Equivalent 'struct' Hive schema
    */
-  private static String createHiveStruct(List<Type> parquetFields) {
+  private static String createHiveStruct(List<Type> parquetFields, boolean supportTimestamp) {
     StringBuilder struct = new StringBuilder();
     struct.append("STRUCT< ");
     for (Type field : parquetFields) {
       // TODO: struct field name is only translated to support special char($)
       // We will need to extend it to other collection type
       struct.append(hiveCompatibleFieldName(field.getName(), true)).append(" : ");
-      struct.append(convertField(field)).append(", ");
+      struct.append(convertField(field, supportTimestamp)).append(", ");
     }
     struct.delete(struct.length() - 2, struct.length()); // Remove the last
     // ", "
@@ -327,19 +336,19 @@ public class HiveSchemaUtil {
   /**
    * Create an Array Hive schema from equivalent parquet list type.
    */
-  private static String createHiveArray(Type elementType, String elementName) {
+  private static String createHiveArray(Type elementType, String elementName, boolean supportTimestamp) {
     StringBuilder array = new StringBuilder();
     array.append("ARRAY< ");
     if (elementType.isPrimitive()) {
-      array.append(convertField(elementType));
+      array.append(convertField(elementType, supportTimestamp));
     } else {
       final GroupType groupType = elementType.asGroupType();
       final List<Type> groupFields = groupType.getFields();
       if (groupFields.size() > 1 || (groupFields.size() == 1
           && (elementType.getName().equals("array") || elementType.getName().equals(elementName + "_tuple")))) {
-        array.append(convertField(elementType));
+        array.append(convertField(elementType, supportTimestamp));
       } else {
-        array.append(convertField(groupType.getFields().get(0)));
+        array.append(convertField(groupType.getFields().get(0), supportTimestamp));
       }
     }
     array.append(">");
@@ -364,11 +373,15 @@ public class HiveSchemaUtil {
   }

   public static String generateSchemaString(MessageType storageSchema) throws IOException {
-    return generateSchemaString(storageSchema, new ArrayList<>());
+    return generateSchemaString(storageSchema, Collections.EMPTY_LIST);
   }

   public static String generateSchemaString(MessageType storageSchema, List<String> colsToSkip) throws IOException {
-    Map<String, String> hiveSchema = convertParquetSchemaToHiveSchema(storageSchema);
+    return generateSchemaString(storageSchema, colsToSkip, false);
+  }
+
+  public static String generateSchemaString(MessageType storageSchema, List<String> colsToSkip, boolean supportTimestamp) throws IOException {
+    Map<String, String> hiveSchema = convertParquetSchemaToHiveSchema(storageSchema, supportTimestamp);
     StringBuilder columns = new StringBuilder();
     for (Map.Entry<String, String> hiveSchemaEntry : hiveSchema.entrySet()) {
       if (!colsToSkip.contains(removeSurroundingTick(hiveSchemaEntry.getKey()))) {
@@ -382,9 +395,9 @@ public class HiveSchemaUtil {
   }

   public static String generateCreateDDL(String tableName, MessageType storageSchema, HiveSyncConfig config, String inputFormatClass,
-                                          String outputFormatClass, String serdeClass) throws IOException {
-    Map<String, String> hiveSchema = convertParquetSchemaToHiveSchema(storageSchema);
-    String columns = generateSchemaString(storageSchema, config.partitionFields);
+                                          String outputFormatClass, String serdeClass) throws IOException {
+    Map<String, String> hiveSchema = convertParquetSchemaToHiveSchema(storageSchema, config.supportTimestamp);
+    String columns = generateSchemaString(storageSchema, config.partitionFields, config.supportTimestamp);
     List<String> partitionFields = new ArrayList<>();
     for (String partitionKey : config.partitionFields) {
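To see both code paths of the conversion end to end, a small sketch driving the public entry point with and without the flag. Field and message names are made up, and the HiveSchemaUtil import assumes the org.apache.hudi.hive.util package at this commit:

import java.util.Collections;

import org.apache.hudi.hive.util.HiveSchemaUtil;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;

public class TimestampConversionSketch {
  public static void main(String[] args) throws Exception {
    // An INT64 field annotated as TIMESTAMP_MICROS (names are hypothetical).
    MessageType schema = Types.buildMessage()
        .optional(PrimitiveType.PrimitiveTypeName.INT64)
        .as(OriginalType.TIMESTAMP_MICROS).named("event_time")
        .named("example_schema");

    // Default path, prints "`event_time` bigint" (backward compatible).
    System.out.println(HiveSchemaUtil.generateSchemaString(schema));

    // Opt-in path, prints "`event_time` TIMESTAMP".
    System.out.println(HiveSchemaUtil.generateSchemaString(schema, Collections.emptyList(), true));
  }
}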

TestHiveSyncTool.java

@@ -41,6 +41,7 @@ import org.junit.jupiter.params.provider.MethodSource;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Stream;
@@ -151,6 +152,39 @@ public class TestHiveSyncTool {
     assertEquals("`map_list` ARRAY< MAP< string, int>>", schemaString);
   }

+  @Test
+  public void testSchemaConvertTimestampMicros() throws IOException {
+    MessageType schema = Types.buildMessage().optional(PrimitiveType.PrimitiveTypeName.INT64)
+        .as(OriginalType.TIMESTAMP_MICROS).named("my_element").named("my_timestamp");
+    String schemaString = HiveSchemaUtil.generateSchemaString(schema);
+    // verify backward compatibility - int64 is converted to bigint type
+    assertEquals("`my_element` bigint", schemaString);
+    // verify new functionality - int64 is converted to timestamp type when 'supportTimestamp' is enabled
+    schemaString = HiveSchemaUtil.generateSchemaString(schema, Collections.emptyList(), true);
+    assertEquals("`my_element` TIMESTAMP", schemaString);
+  }
+
+  @Test
+  public void testSchemaDiffForTimestampMicros() {
+    MessageType schema = Types.buildMessage().optional(PrimitiveType.PrimitiveTypeName.INT64)
+        .as(OriginalType.TIMESTAMP_MICROS).named("my_element").named("my_timestamp");
+    // verify backward compatibility - int64 is converted to bigint type
+    SchemaDifference schemaDifference = HiveSchemaUtil.getSchemaDifference(schema,
+        Collections.emptyMap(), Collections.emptyList(), false);
+    assertEquals("bigint", schemaDifference.getAddColumnTypes().get("`my_element`"));
+    schemaDifference = HiveSchemaUtil.getSchemaDifference(schema,
+        schemaDifference.getAddColumnTypes(), Collections.emptyList(), false);
+    assertTrue(schemaDifference.isEmpty());
+
+    // verify schema difference is calculated correctly when supportTimestamp is enabled
+    schemaDifference = HiveSchemaUtil.getSchemaDifference(schema,
+        Collections.emptyMap(), Collections.emptyList(), true);
+    assertEquals("TIMESTAMP", schemaDifference.getAddColumnTypes().get("`my_element`"));
+    schemaDifference = HiveSchemaUtil.getSchemaDifference(schema,
+        schemaDifference.getAddColumnTypes(), Collections.emptyList(), true);
+    assertTrue(schemaDifference.isEmpty());
+  }
+
   @ParameterizedTest
   @MethodSource({"useJdbcAndSchemaFromCommitMetadata"})
   public void testBasicSync(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception {