[HUDI-4335] Bug fixes in AWSGlueCatalogSyncClient post schema evolution. (#5995)
* Fix updateTableParameters, which was not excluding partition columns, and fix the boolean check in updateTableProperties * Fix serde parameters getting overridden on table property update * Remove stale syncConfig
This commit is contained in:
committed by
GitHub
parent
f20acb8dc3
commit
fc8d96246a
@@ -64,9 +64,9 @@ import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.hudi.aws.utils.S3Utils.s3aToS3;
|
||||
import static org.apache.hudi.common.util.MapUtils.nonEmpty;
|
||||
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE;
|
||||
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE;
|
||||
import static org.apache.hudi.common.util.MapUtils.isNullOrEmpty;
|
||||
import static org.apache.hudi.hive.util.HiveSchemaUtil.getPartitionKeyType;
|
||||
import static org.apache.hudi.hive.util.HiveSchemaUtil.parquetSchemaToMapSchema;
|
||||
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
|
||||
@@ -193,11 +193,11 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient {
|
||||
*/
|
||||
@Override
|
||||
public void updateTableProperties(String tableName, Map<String, String> tableProperties) {
|
||||
if (nonEmpty(tableProperties)) {
|
||||
if (isNullOrEmpty(tableProperties)) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
updateTableParameters(awsGlue, databaseName, tableName, tableProperties, true);
|
||||
updateTableParameters(awsGlue, databaseName, tableName, tableProperties, false);
|
||||
} catch (Exception e) {
|
||||
throw new HoodieGlueSyncException("Fail to update properties for table " + tableId(databaseName, tableName), e);
|
||||
}
|
||||
@@ -210,10 +210,7 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient {
|
||||
try {
|
||||
Table table = getTable(awsGlue, databaseName, tableName);
|
||||
Map<String, String> newSchemaMap = parquetSchemaToMapSchema(newSchema, config.getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE), false);
|
||||
List<Column> newColumns = newSchemaMap.keySet().stream().map(key -> {
|
||||
String keyType = getPartitionKeyType(newSchemaMap, key);
|
||||
return new Column().withName(key).withType(keyType.toLowerCase()).withComment("");
|
||||
}).collect(Collectors.toList());
|
||||
List<Column> newColumns = getColumnsFromSchema(newSchemaMap);
|
||||
StorageDescriptor sd = table.getStorageDescriptor();
|
||||
sd.setColumns(newColumns);
|
||||
|
||||
@@ -258,15 +255,7 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient {
|
||||
try {
|
||||
Map<String, String> mapSchema = parquetSchemaToMapSchema(storageSchema, config.getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE), false);
|
||||
|
||||
List<Column> schemaWithoutPartitionKeys = new ArrayList<>();
|
||||
for (String key : mapSchema.keySet()) {
|
||||
String keyType = getPartitionKeyType(mapSchema, key);
|
||||
Column column = new Column().withName(key).withType(keyType.toLowerCase()).withComment("");
|
||||
// In Glue, the full schema should exclude the partition keys
|
||||
if (!config.getSplitStrings(META_SYNC_PARTITION_FIELDS).contains(key)) {
|
||||
schemaWithoutPartitionKeys.add(column);
|
||||
}
|
||||
}
|
||||
List<Column> schemaWithoutPartitionKeys = getColumnsFromSchema(mapSchema);
|
||||
|
||||
// now create the schema partition
|
||||
List<Column> schemaPartitionKeys = config.getSplitStrings(META_SYNC_PARTITION_FIELDS).stream().map(partitionKey -> {
|
||||
@@ -419,6 +408,19 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient {
|
||||
throw new UnsupportedOperationException("Not supported: `deleteLastReplicatedTimeStamp`");
|
||||
}
|
||||
|
||||
private List<Column> getColumnsFromSchema(Map<String, String> mapSchema) {
|
||||
List<Column> cols = new ArrayList<>();
|
||||
for (String key : mapSchema.keySet()) {
|
||||
// In Glue, the full schema should exclude the partition keys
|
||||
if (!config.getSplitStrings(META_SYNC_PARTITION_FIELDS).contains(key)) {
|
||||
String keyType = getPartitionKeyType(mapSchema, key);
|
||||
Column column = new Column().withName(key).withType(keyType.toLowerCase()).withComment("");
|
||||
cols.add(column);
|
||||
}
|
||||
}
|
||||
return cols;
|
||||
}
|
||||
|
||||
private enum TableType {
|
||||
MANAGED_TABLE,
|
||||
EXTERNAL_TABLE,
|
||||
|
||||
Reference in New Issue
Block a user