1
0

[HUDI-3383] Sync column comments while syncing a hive table (#4960)

Desc: Add a hive sync config(hoodie.datasource.hive_sync.sync_comment). This config defaults to false.
When syncing a data source to Hudi, column comments are carried in the source Avro schema; if sync_comment is true, those column comments are synced to the Hive table as well.
This commit is contained in:
MrSleeping123
2022-03-10 09:44:39 +08:00
committed by GitHub
parent 548000b0d6
commit 8859b48b2a
13 changed files with 467 additions and 14 deletions

View File

@@ -132,6 +132,9 @@ public class HiveSyncConfig implements Serializable {
@Parameter(names = {"--spark-version"}, description = "The spark version", required = false)
public String sparkVersion;
@Parameter(names = {"--sync-comment"}, description = "synchronize table comments to hive")
public boolean syncComment = false;
// NOTE: when adding a field here, also update the corresponding copy logic in child classes
public static HiveSyncConfig copy(HiveSyncConfig cfg) {
HiveSyncConfig newConfig = new HiveSyncConfig();
@@ -159,6 +162,7 @@ public class HiveSyncConfig implements Serializable {
newConfig.withOperationField = cfg.withOperationField;
newConfig.isConditionalSync = cfg.isConditionalSync;
newConfig.sparkVersion = cfg.sparkVersion;
newConfig.syncComment = cfg.syncComment;
return newConfig;
}
@@ -193,6 +197,7 @@ public class HiveSyncConfig implements Serializable {
+ ", sparkSchemaLengthThreshold=" + sparkSchemaLengthThreshold
+ ", withOperationField=" + withOperationField
+ ", isConditionalSync=" + isConditionalSync
+ ", syncComment=" + syncComment
+ '}';
}

View File

@@ -19,9 +19,11 @@
package org.apache.hudi.hive;
import com.beust.jcommander.JCommander;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -37,6 +39,7 @@ import org.apache.hudi.hive.util.Parquet2SparkSchemaUtils;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
import org.apache.hudi.sync.common.AbstractSyncTool;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.GroupType;
@@ -261,6 +264,19 @@ public class HiveSyncTool extends AbstractSyncTool {
LOG.info("No Schema difference for " + tableName);
}
}
if (cfg.syncComment) {
Schema avroSchemaWithoutMetadataFields = hoodieHiveClient.getAvroSchemaWithoutMetadataFields();
Map<String, String> newComments = avroSchemaWithoutMetadataFields.getFields()
.stream().collect(Collectors.toMap(Schema.Field::name, field -> StringUtils.isNullOrEmpty(field.doc()) ? "" : field.doc()));
boolean allEmpty = newComments.values().stream().allMatch(StringUtils::isNullOrEmpty);
if (!allEmpty) {
List<FieldSchema> hiveSchema = hoodieHiveClient.getTableCommentUsingMetastoreClient(tableName);
hoodieHiveClient.updateTableComments(tableName, hiveSchema, avroSchemaWithoutMetadataFields.getFields());
} else {
LOG.info(String.format("No comment %s need to add", tableName));
}
}
return schemaChanged;
}

View File

@@ -19,21 +19,27 @@
package org.apache.hudi.hive;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.hive.util.HiveSchemaUtil;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.hive.ddl.DDLExecutor;
import org.apache.hudi.hive.ddl.HMSDDLExecutor;
import org.apache.hudi.hive.ddl.HiveQueryDDLExecutor;
import org.apache.hudi.hive.ddl.HiveSyncMode;
import org.apache.hudi.hive.ddl.JDBCExecutor;
import org.apache.hudi.hive.util.HiveSchemaUtil;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
import org.apache.hudi.sync.common.HoodieSyncException;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
@@ -46,7 +52,9 @@ import org.apache.thrift.TException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.hudi.hadoop.utils.HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP;
@@ -343,4 +351,43 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
}
}
}
/**
 * Resolves the table's Avro schema from the meta client, excluding Hudi metadata fields.
 *
 * @return the table's Avro schema without Hudi metadata columns
 * @throws HoodieSyncException if the schema cannot be resolved
 */
public Schema getAvroSchemaWithoutMetadataFields() {
  TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
  try {
    return schemaResolver.getTableAvroSchemaWithoutMetadataFields();
  } catch (Exception e) {
    throw new HoodieSyncException("Failed to read avro schema", e);
  }
}
/**
 * Fetches the table's field schemas (which carry the column comments) straight
 * from the Hive metastore client.
 *
 * @param tableName the Hive table to inspect
 * @return the table's field schemas, including any column comments
 * @throws HoodieHiveSyncException if the metastore lookup fails
 */
public List<FieldSchema> getTableCommentUsingMetastoreClient(String tableName) {
  String databaseName = syncConfig.databaseName;
  try {
    return client.getSchema(databaseName, tableName);
  } catch (Exception e) {
    throw new HoodieHiveSyncException("Failed to get table comments for : " + tableName, e);
  }
}
/**
 * Syncs Avro field docs to Hive column comments. Field names are lower-cased to
 * match Hive's case-insensitive column naming; a missing doc maps to the empty string.
 *
 * @param tableName the Hive table whose comments are updated
 * @param oldSchema the current Hive field schemas (existing comments and types)
 * @param newSchema the Avro fields carrying the desired docs
 */
public void updateTableComments(String tableName, List<FieldSchema> oldSchema, List<Schema.Field> newSchema) {
  Map<String, String> docByFieldName = newSchema.stream()
      .collect(Collectors.toMap(
          field -> field.name().toLowerCase(Locale.ROOT),
          field -> StringUtils.isNullOrEmpty(field.doc()) ? "" : field.doc()));
  updateTableComments(tableName, oldSchema, docByFieldName);
}
/**
 * Diffs the table's existing Hive column comments against the desired comments
 * and pushes only the changed columns to the DDL executor.
 *
 * @param tableName   the Hive table whose comments are updated
 * @param oldSchema   the current Hive field schemas (existing comments and types)
 * @param newComments desired comment keyed by lower-cased column name
 */
public void updateTableComments(String tableName, List<FieldSchema> oldSchema, Map<String, String> newComments) {
  Map<String, String> oldComments = oldSchema.stream().collect(Collectors.toMap(
      fieldSchema -> fieldSchema.getName().toLowerCase(Locale.ROOT),
      fieldSchema -> StringUtils.isNullOrEmpty(fieldSchema.getComment()) ? "" : fieldSchema.getComment()));
  // FIX: key the type map by the lower-cased name as well; the lookup below uses
  // lower-cased keys, and a case mismatch would yield a null column type.
  Map<String, String> types = oldSchema.stream().collect(Collectors.toMap(
      fieldSchema -> fieldSchema.getName().toLowerCase(Locale.ROOT), FieldSchema::getType));
  Map<String, ImmutablePair<String, String>> alterComments = new HashMap<>();
  oldComments.forEach((name, comment) -> {
    String newComment = newComments.getOrDefault(name, "");
    if (!newComment.equals(comment)) {
      alterComments.put(name, new ImmutablePair<>(types.get(name), newComment));
    }
  });
  if (alterComments.isEmpty()) {
    LOG.info(String.format("No comment difference of %s ", tableName));
  } else {
    ddlExecutor.updateTableComments(tableName, alterComments);
  }
}
}

View File

@@ -18,6 +18,8 @@
package org.apache.hudi.hive.ddl;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.parquet.schema.MessageType;
import java.util.List;
@@ -89,5 +91,13 @@ public interface DDLExecutor {
*/
public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop);
/**
 * Updates the column comments of a Hive table.
 *
 * @param tableName name of the table whose column comments are updated
 * @param newSchema map from column name to a (column type, new comment) pair;
 *                  only the listed columns are altered
 */
public void updateTableComments(String tableName, Map<String, ImmutablePair<String,String>> newSchema);
public void close();
}

View File

@@ -18,9 +18,9 @@
package org.apache.hudi.hive.ddl;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.StorageSchemes;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveSyncException;
import org.apache.hudi.hive.PartitionValueExtractor;
@@ -43,6 +43,7 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.MessageType;
@@ -247,6 +248,27 @@ public class HMSDDLExecutor implements DDLExecutor {
}
}
/**
 * Updates column comments by rewriting the table's storage descriptor through
 * the metastore client.
 *
 * @param tableName   name of the table to alter
 * @param alterSchema map from column name to a (column type, new comment) pair
 * @throws HoodieHiveSyncException if the metastore update fails
 */
@Override
public void updateTableComments(String tableName, Map<String, ImmutablePair<String,String>> alterSchema) {
  try {
    Table table = client.getTable(syncConfig.databaseName, tableName);
    StorageDescriptor sd = new StorageDescriptor(table.getSd());
    for (FieldSchema fieldSchema : sd.getCols()) {
      if (alterSchema.containsKey(fieldSchema.getName())) {
        String comment = alterSchema.get(fieldSchema.getName()).getRight();
        fieldSchema.setComment(comment);
      }
    }
    table.setSd(sd);
    EnvironmentContext environmentContext = new EnvironmentContext();
    client.alter_table_with_environmentContext(syncConfig.databaseName, tableName, table, environmentContext);
    // FIX: removed sd.clear() — `table` still holds a reference to `sd`, so
    // clearing it would leave the local Table object inconsistent after the call.
  } catch (Exception e) {
    LOG.error("Failed to update table comments for " + tableName, e);
    throw new HoodieHiveSyncException("Failed to update table comments for " + tableName, e);
  }
}
@Override
public void close() {
if (client != null) {

View File

@@ -22,6 +22,7 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.StorageSchemes;
import org.apache.hudi.common.util.PartitionPathEncodeUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveSyncException;
import org.apache.hudi.hive.PartitionValueExtractor;
@@ -128,6 +129,24 @@ public abstract class QueryBasedDDLExecutor implements DDLExecutor {
}
}
/**
 * Applies column comments via {@code ALTER TABLE ... CHANGE COLUMN} statements,
 * one statement per column to alter.
 *
 * @param tableName the table to alter
 * @param newSchema map from column name to a (column type, comment) pair
 */
@Override
public void updateTableComments(String tableName, Map<String, ImmutablePair<String,String>> newSchema) {
  for (Map.Entry<String, ImmutablePair<String,String>> field : newSchema.entrySet()) {
    String name = field.getKey();
    String type = field.getValue().getLeft();
    // FIX: escape single quotes instead of deleting them so the comment text
    // is preserved verbatim (and the quoted SQL literal stays well-formed).
    String comment = field.getValue().getRight().replace("'", "\\'");
    StringBuilder sql = new StringBuilder();
    sql.append("ALTER TABLE ").append(HIVE_ESCAPE_CHARACTER)
        .append(config.databaseName).append(HIVE_ESCAPE_CHARACTER).append(".")
        .append(HIVE_ESCAPE_CHARACTER).append(tableName)
        .append(HIVE_ESCAPE_CHARACTER)
        .append(" CHANGE COLUMN `").append(name).append("` `").append(name)
        .append("` ").append(type).append(" comment '").append(comment).append("' ");
    runSQL(sql.toString());
  }
}
private List<String> constructAddPartitions(String tableName, List<String> partitions) {
if (config.batchSyncNum <= 0) {
throw new HoodieHiveSyncException("batch-sync-num for sync hive table must be greater than 0, pls check your parameter");

View File

@@ -25,6 +25,8 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.NetworkTestUtils;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.hive.testutils.HiveTestUtil;
import org.apache.hudi.hive.util.ConfigUtils;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
@@ -33,6 +35,7 @@ import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.Parti
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.ql.Driver;
@@ -52,7 +55,9 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.hudi.hive.testutils.HiveTestUtil.ddlExecutor;
import static org.apache.hudi.hive.testutils.HiveTestUtil.fileSystem;
@@ -524,6 +529,77 @@ public class TestHiveSyncTool {
"The last commit that was synced should be 101");
}
/**
 * Verifies that comments pushed through the DDL executor show up on the Hive table:
 * diffs a plain schema against its documented variant and expects every doc to
 * land as a Hive column comment.
 */
@ParameterizedTest
@MethodSource("syncMode")
public void testUpdateTableComments(String syncMode) throws Exception {
  hiveSyncConfig.syncMode = syncMode;
  String commitTime = "100";
  HiveTestUtil.createCOWTableWithSchema(commitTime, "/simple-test.avsc");
  HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
  tool.syncHoodieTable();
  HoodieHiveClient hiveClient =
      new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
  // Collect the columns whose doc differs between the plain and documented schemas.
  Schema plainSchema = SchemaTestUtil.getSchemaFromResource(HiveTestUtil.class, "/simple-test.avsc");
  Schema docedSchema = SchemaTestUtil.getSchemaFromResource(HiveTestUtil.class, "/simple-test-doced.avsc");
  Map<String, String> docByName = docedSchema.getFields().stream()
      .collect(Collectors.toMap(
          field -> field.name().toLowerCase(Locale.ROOT),
          field -> StringUtils.isNullOrEmpty(field.doc()) ? "" : field.doc()));
  Map<String, ImmutablePair<String,String>> alterCommentSchema = new HashMap<>();
  for (Field field : plainSchema.getFields()) {
    String name = field.name().toLowerCase(Locale.ROOT);
    String comment = docByName.get(name);
    if (docByName.containsKey(name) && !comment.equals(field.doc())) {
      alterCommentSchema.put(name, new ImmutablePair<>(field.schema().getType().name(), comment));
    }
  }
  ddlExecutor.updateTableComments(hiveSyncConfig.tableName, alterCommentSchema);
  // Each avro doc should now appear as a non-empty hive column comment.
  List<FieldSchema> fieldSchemas = hiveClient.getTableCommentUsingMetastoreClient(hiveSyncConfig.tableName);
  int commentCnt = (int) fieldSchemas.stream()
      .filter(fieldSchema -> !StringUtils.isNullOrEmpty(fieldSchema.getComment()))
      .count();
  assertEquals(2, commentCnt, "hive schema field comment numbers should match the avro schema field doc numbers");
}
/**
 * Verifies the syncComment flag end-to-end: with the flag off no avro docs reach
 * Hive; re-syncing with the flag on applies them as column comments.
 */
@ParameterizedTest
@MethodSource("syncMode")
public void testSyncWithCommentedSchema(String syncMode) throws Exception {
  hiveSyncConfig.syncMode = syncMode;
  hiveSyncConfig.syncComment = false;
  String commitTime = "100";
  HiveTestUtil.createCOWTableWithSchema(commitTime, "/simple-test-doced.avsc");
  // First sync with comment syncing disabled: no comments should reach hive.
  HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
  tool.syncHoodieTable();
  HoodieHiveClient hiveClient =
      new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
  List<FieldSchema> fieldSchemas = hiveClient.getTableCommentUsingMetastoreClient(hiveSyncConfig.tableName);
  int commentCnt = (int) fieldSchemas.stream()
      .filter(fieldSchema -> !StringUtils.isNullOrEmpty(fieldSchema.getComment()))
      .count();
  assertEquals(0, commentCnt, "hive schema field comment numbers should match the avro schema field doc numbers");
  // Re-sync with comment syncing enabled: the avro docs should now be applied.
  hiveSyncConfig.syncComment = true;
  tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
  tool.syncHoodieTable();
  fieldSchemas = hiveClient.getTableCommentUsingMetastoreClient(hiveSyncConfig.tableName);
  commentCnt = (int) fieldSchemas.stream()
      .filter(fieldSchema -> !StringUtils.isNullOrEmpty(fieldSchema.getComment()))
      .count();
  assertEquals(2, commentCnt, "hive schema field comment numbers should match the avro schema field doc numbers");
}
@ParameterizedTest
@MethodSource("syncModeAndSchemaFromCommitMetadata")
public void testSyncMergeOnRead(boolean useSchemaFromCommitMetadata, String syncMode) throws Exception {