1
0

[HUDI-3730] Improve meta sync class design and hierarchies (#5854)

* [HUDI-3730] Improve meta sync class design and hierarchies (#5754)
* Implements class design proposed in RFC-55

Co-authored-by: jian.feng <fengjian428@gmial.com>
Co-authored-by: jian.feng <jian.feng@shopee.com>
This commit is contained in:
Shiyan Xu
2022-07-03 04:17:25 -05:00
committed by GitHub
parent c00ea84985
commit c0e1587966
86 changed files with 2977 additions and 2877 deletions

View File

@@ -21,8 +21,8 @@ package org.apache.hudi.aws.sync;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hive.AbstractHiveSyncHoodieClient;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.sync.common.HoodieSyncClient;
import org.apache.hudi.sync.common.model.Partition;
import com.amazonaws.services.glue.AWSGlue;
@@ -50,10 +50,6 @@ import com.amazonaws.services.glue.model.StorageDescriptor;
import com.amazonaws.services.glue.model.Table;
import com.amazonaws.services.glue.model.TableInput;
import com.amazonaws.services.glue.model.UpdateTableRequest;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.MessageType;
@@ -69,8 +65,12 @@ import java.util.stream.Collectors;
import static org.apache.hudi.aws.utils.S3Utils.s3aToS3;
import static org.apache.hudi.common.util.MapUtils.nonEmpty;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE;
import static org.apache.hudi.hive.util.HiveSchemaUtil.getPartitionKeyType;
import static org.apache.hudi.hive.util.HiveSchemaUtil.parquetSchemaToMapSchema;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
import static org.apache.hudi.sync.common.util.TableUtils.tableId;
/**
@@ -79,7 +79,7 @@ import static org.apache.hudi.sync.common.util.TableUtils.tableId;
*
* @Experimental
*/
public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient {
public class AWSGlueCatalogSyncClient extends HoodieSyncClient {
private static final Logger LOG = LogManager.getLogger(AWSGlueCatalogSyncClient.class);
private static final int MAX_PARTITIONS_PER_REQUEST = 100;
@@ -87,10 +87,10 @@ public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient {
private final AWSGlue awsGlue;
private final String databaseName;
public AWSGlueCatalogSyncClient(HiveSyncConfig syncConfig, Configuration hadoopConf, FileSystem fs) {
super(syncConfig, hadoopConf, fs);
public AWSGlueCatalogSyncClient(HiveSyncConfig config) {
super(config);
this.awsGlue = AWSGlueClientBuilder.standard().build();
this.databaseName = syncConfig.databaseName;
this.databaseName = config.getStringOrDefault(META_SYNC_DATABASE_NAME);
}
@Override
@@ -126,7 +126,7 @@ public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient {
StorageDescriptor sd = table.getStorageDescriptor();
List<PartitionInput> partitionInputs = partitionsToAdd.stream().map(partition -> {
StorageDescriptor partitionSd = sd.clone();
String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition).toString();
String fullPartitionPath = FSUtils.getPartitionPath(getBasePath(), partition).toString();
List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
partitionSd.setLocation(fullPartitionPath);
return new PartitionInput().withValues(partitionValues).withStorageDescriptor(partitionSd);
@@ -160,7 +160,7 @@ public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient {
StorageDescriptor sd = table.getStorageDescriptor();
List<BatchUpdatePartitionRequestEntry> updatePartitionEntries = changedPartitions.stream().map(partition -> {
StorageDescriptor partitionSd = sd.clone();
String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition).toString();
String fullPartitionPath = FSUtils.getPartitionPath(getBasePath(), partition).toString();
List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
sd.setLocation(fullPartitionPath);
PartitionInput partitionInput = new PartitionInput().withValues(partitionValues).withStorageDescriptor(partitionSd);
@@ -204,12 +204,12 @@ public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient {
}
@Override
public void updateTableDefinition(String tableName, MessageType newSchema) {
public void updateTableSchema(String tableName, MessageType newSchema) {
// ToDo Cascade is set in Hive meta sync, but need to investigate how to configure it for Glue meta
boolean cascade = syncConfig.partitionFields.size() > 0;
boolean cascade = config.getSplitStrings(META_SYNC_PARTITION_FIELDS).size() > 0;
try {
Table table = getTable(awsGlue, databaseName, tableName);
Map<String, String> newSchemaMap = parquetSchemaToMapSchema(newSchema, syncConfig.supportTimestamp, false);
Map<String, String> newSchemaMap = parquetSchemaToMapSchema(newSchema, config.getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE), false);
List<Column> newColumns = newSchemaMap.keySet().stream().map(key -> {
String keyType = getPartitionKeyType(newSchemaMap, key);
return new Column().withName(key).withType(keyType.toLowerCase()).withComment("");
@@ -237,21 +237,6 @@ public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient {
}
}
@Override
public List<FieldSchema> getTableCommentUsingMetastoreClient(String tableName) {
throw new UnsupportedOperationException("Not supported: `getTableCommentUsingMetastoreClient`");
}
@Override
public void updateTableComments(String tableName, List<FieldSchema> oldSchema, List<Schema.Field> newSchema) {
throw new UnsupportedOperationException("Not supported: `updateTableComments`");
}
@Override
public void updateTableComments(String tableName, List<FieldSchema> oldSchema, Map<String, String> newComments) {
throw new UnsupportedOperationException("Not supported: `updateTableComments`");
}
@Override
public void createTable(String tableName,
MessageType storageSchema,
@@ -265,26 +250,26 @@ public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient {
}
CreateTableRequest request = new CreateTableRequest();
Map<String, String> params = new HashMap<>();
if (!syncConfig.createManagedTable) {
if (!config.getBoolean(HIVE_CREATE_MANAGED_TABLE)) {
params.put("EXTERNAL", "TRUE");
}
params.putAll(tableProperties);
try {
Map<String, String> mapSchema = parquetSchemaToMapSchema(storageSchema, syncConfig.supportTimestamp, false);
Map<String, String> mapSchema = parquetSchemaToMapSchema(storageSchema, config.getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE), false);
List<Column> schemaWithoutPartitionKeys = new ArrayList<>();
for (String key : mapSchema.keySet()) {
String keyType = getPartitionKeyType(mapSchema, key);
Column column = new Column().withName(key).withType(keyType.toLowerCase()).withComment("");
// In Glue, the full schema should exclude the partition keys
if (!syncConfig.partitionFields.contains(key)) {
if (!config.getSplitStrings(META_SYNC_PARTITION_FIELDS).contains(key)) {
schemaWithoutPartitionKeys.add(column);
}
}
// now create the schema partition
List<Column> schemaPartitionKeys = syncConfig.partitionFields.stream().map(partitionKey -> {
List<Column> schemaPartitionKeys = config.getSplitStrings(META_SYNC_PARTITION_FIELDS).stream().map(partitionKey -> {
String keyType = getPartitionKeyType(mapSchema, partitionKey);
return new Column().withName(partitionKey).withType(keyType.toLowerCase()).withComment("");
}).collect(Collectors.toList());
@@ -293,7 +278,7 @@ public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient {
serdeProperties.put("serialization.format", "1");
storageDescriptor
.withSerdeInfo(new SerDeInfo().withSerializationLibrary(serdeClass).withParameters(serdeProperties))
.withLocation(s3aToS3(syncConfig.basePath))
.withLocation(s3aToS3(getBasePath()))
.withInputFormat(inputFormatClass)
.withOutputFormat(outputFormatClass)
.withColumns(schemaWithoutPartitionKeys);
@@ -320,7 +305,7 @@ public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient {
}
@Override
public Map<String, String> getTableSchema(String tableName) {
public Map<String, String> getMetastoreSchema(String tableName) {
try {
// GlueMetastoreClient returns partition keys separate from Columns, hence get both and merge to
// get the Schema of the table.
@@ -340,11 +325,6 @@ public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient {
}
}
@Override
public boolean doesTableExist(String tableName) {
return tableExists(tableName);
}
@Override
public boolean tableExists(String tableName) {
GetTableRequest request = new GetTableRequest()
@@ -412,11 +392,11 @@ public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient {
@Override
public void updateLastCommitTimeSynced(String tableName) {
if (!activeTimeline.lastInstant().isPresent()) {
if (!getActiveTimeline().lastInstant().isPresent()) {
LOG.warn("No commit in active timeline.");
return;
}
final String lastCommitTimestamp = activeTimeline.lastInstant().get().getTimestamp();
final String lastCommitTimestamp = getActiveTimeline().lastInstant().get().getTimestamp();
try {
updateTableParameters(awsGlue, databaseName, tableName, Collections.singletonMap(HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitTimestamp), false);
} catch (Exception e) {

View File

@@ -18,53 +18,44 @@
package org.apache.hudi.aws.sync;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
import com.beust.jcommander.JCommander;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import java.util.Properties;
/**
* Currently Experimental. Utility class that implements syncing a Hudi Table with the
* AWS Glue Data Catalog (https://docs.aws.amazon.com/glue/latest/dg/populate-data-catalog.html)
* to enable querying via Glue ETLs, Athena etc.
*
* <p>
* Extends HiveSyncTool since most logic is similar to Hive syncing,
* expect using a different client {@link AWSGlueCatalogSyncClient} that implements
* the necessary functionality using Glue APIs.
*
* @Experimental
*/
public class AwsGlueCatalogSyncTool extends HiveSyncTool {
public class AWSGlueCatalogSyncTool extends HiveSyncTool {
public AwsGlueCatalogSyncTool(TypedProperties props, Configuration conf, FileSystem fs) {
super(props, new HiveConf(conf, HiveConf.class), fs);
}
public AwsGlueCatalogSyncTool(HiveSyncConfig hiveSyncConfig, HiveConf hiveConf, FileSystem fs) {
super(hiveSyncConfig, hiveConf, fs);
public AWSGlueCatalogSyncTool(Properties props, Configuration hadoopConf) {
super(props, hadoopConf);
}
@Override
protected void initClient(HiveSyncConfig hiveSyncConfig, HiveConf hiveConf) {
hoodieHiveClient = new AWSGlueCatalogSyncClient(hiveSyncConfig, hiveConf, fs);
protected void initSyncClient(HiveSyncConfig hiveSyncConfig) {
syncClient = new AWSGlueCatalogSyncClient(hiveSyncConfig);
}
public static void main(String[] args) {
// parse the params
final HiveSyncConfig cfg = new HiveSyncConfig();
JCommander cmd = new JCommander(cfg, null, args);
if (cfg.help || args.length == 0) {
final HiveSyncConfig.HiveSyncConfigParams params = new HiveSyncConfig.HiveSyncConfigParams();
JCommander cmd = JCommander.newBuilder().addObject(params).build();
cmd.parse(args);
if (params.isHelp()) {
cmd.usage();
System.exit(1);
System.exit(0);
}
FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
HiveConf hiveConf = new HiveConf();
hiveConf.addResource(fs.getConf());
new AwsGlueCatalogSyncTool(cfg, hiveConf, fs).syncHoodieTable();
new AWSGlueCatalogSyncTool(params.toProps(), new Configuration()).syncHoodieTable();
}
}