1
0

[HUDI-2990] Sync to HMS when deleting partitions (#4291)

This commit is contained in:
ForwardXu
2021-12-13 20:40:06 +08:00
committed by GitHub
parent b22c2c611b
commit dd96129191
11 changed files with 243 additions and 35 deletions

View File

@@ -28,7 +28,6 @@ import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hive.util.ConfigUtils;
import org.apache.hudi.hive.util.HiveSchemaUtil;
import org.apache.hudi.hive.util.Parquet2SparkSchemaUtils;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
import org.apache.hudi.sync.common.AbstractSyncTool;
@@ -166,20 +165,28 @@ public class HiveSyncTool extends AbstractSyncTool {
// Check if the necessary table exists
boolean tableExists = hoodieHiveClient.doesTableExist(tableName);
// Get the parquet schema for this table looking at the latest commit
MessageType schema = hoodieHiveClient.getDataSchema();
// check if isDropPartition
boolean isDropPartition = hoodieHiveClient.isDropPartition();
// Currently HoodieBootstrapRelation does not support reading bootstrap MOR rt table,
// so we disable the syncAsSparkDataSourceTable here to avoid reading such a table
// by the data source way (which will use the HoodieBootstrapRelation).
// TODO after we support bootstrap MOR rt table in HoodieBootstrapRelation[HUDI-2071], we can remove this logical.
if (hoodieHiveClient.isBootstrap()
&& hoodieHiveClient.getTableType() == HoodieTableType.MERGE_ON_READ
&& !readAsOptimized) {
cfg.syncAsSparkDataSourceTable = false;
// check if schemaChanged
boolean schemaChanged = false;
if (!isDropPartition) {
// Get the parquet schema for this table looking at the latest commit
MessageType schema = hoodieHiveClient.getDataSchema();
// Currently HoodieBootstrapRelation does not support reading bootstrap MOR rt table,
// so we disable the syncAsSparkDataSourceTable here to avoid reading such a table
// by the data source way (which will use the HoodieBootstrapRelation).
// TODO after we support bootstrap MOR rt table in HoodieBootstrapRelation[HUDI-2071], we can remove this logical.
if (hoodieHiveClient.isBootstrap()
&& hoodieHiveClient.getTableType() == HoodieTableType.MERGE_ON_READ
&& !readAsOptimized) {
cfg.syncAsSparkDataSourceTable = false;
}
// Sync schema if needed
schemaChanged = syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema);
}
// Sync schema if needed
boolean schemaChanged = syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema);
LOG.info("Schema sync complete. Syncing partitions for " + tableName);
// Get the last time we successfully synced partitions
@@ -192,7 +199,7 @@ public class HiveSyncTool extends AbstractSyncTool {
LOG.info("Storage partitions scan complete. Found " + writtenPartitionsSince.size());
// Sync the partitions if needed
boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince);
boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince, isDropPartition);
boolean meetSyncConditions = schemaChanged || partitionsChanged;
if (!cfg.isConditionalSync || meetSyncConditions) {
hoodieHiveClient.updateLastCommitTimeSynced(tableName);
@@ -331,19 +338,32 @@ public class HiveSyncTool extends AbstractSyncTool {
* Syncs the list of storage partitions passed in (checks if the partition is in hive, if not adds it or if the
* partition path does not match, it updates the partition path).
*/
private boolean syncPartitions(String tableName, List<String> writtenPartitionsSince) {
private boolean syncPartitions(String tableName, List<String> writtenPartitionsSince, boolean isDropPartition) {
boolean partitionsChanged;
try {
List<Partition> hivePartitions = hoodieHiveClient.scanTablePartitions(tableName);
List<PartitionEvent> partitionEvents =
hoodieHiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
hoodieHiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, isDropPartition);
List<String> newPartitions = filterPartitions(partitionEvents, PartitionEventType.ADD);
LOG.info("New Partitions " + newPartitions);
hoodieHiveClient.addPartitionsToTable(tableName, newPartitions);
if (!newPartitions.isEmpty()) {
LOG.info("New Partitions " + newPartitions);
hoodieHiveClient.addPartitionsToTable(tableName, newPartitions);
}
List<String> updatePartitions = filterPartitions(partitionEvents, PartitionEventType.UPDATE);
LOG.info("Changed Partitions " + updatePartitions);
hoodieHiveClient.updatePartitionsToTable(tableName, updatePartitions);
partitionsChanged = !updatePartitions.isEmpty() || !newPartitions.isEmpty();
if (!updatePartitions.isEmpty()) {
LOG.info("Changed Partitions " + updatePartitions);
hoodieHiveClient.updatePartitionsToTable(tableName, updatePartitions);
}
List<String> dropPartitions = filterPartitions(partitionEvents, PartitionEventType.DROP);
if (!dropPartitions.isEmpty()) {
LOG.info("Drop Partitions " + dropPartitions);
hoodieHiveClient.dropPartitionsToTable(tableName, dropPartitions);
}
partitionsChanged = !updatePartitions.isEmpty() || !newPartitions.isEmpty() || !dropPartitions.isEmpty();
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to sync partitions for table " + tableName, e);
}

View File

@@ -122,6 +122,14 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
ddlExecutor.updatePartitionsToTable(tableName, changedPartitions);
}
/**
 * Drop the given partitions from the Hive table.
 *
 * <p>Delegates to the configured DDL executor. Called from HiveSyncTool when the sync
 * detects DROP partition events (see the drop-partition branch of the partition sync).
 *
 * @param tableName        name of the Hive table to drop partitions from
 * @param partitionsToDrop partitions to drop; an empty list results in no DDL being issued
 */
@Override
public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) {
ddlExecutor.dropPartitionsToTable(tableName, partitionsToDrop);
}
/**
* Update the table properties to the table.
*/
@@ -147,6 +155,14 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
* Generate a list of PartitionEvent based on the changes required.
*/
List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<String> partitionStoragePartitions) {
// Default overload for a regular (non-drop) sync: isDropPartition is false,
// so only ADD/UPDATE events can be generated, never DROP events.
return getPartitionEvents(tablePartitions, partitionStoragePartitions, false);
}
/**
* Iterate over the storage partitions and find if there are any new partitions that need to be added or updated.
* Generate a list of PartitionEvent based on the changes required.
*/
List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<String> partitionStoragePartitions, boolean isDropPartition) {
Map<String, String> paths = new HashMap<>();
for (Partition tablePartition : tablePartitions) {
List<String> hivePartitionValues = tablePartition.getValues();
@@ -161,12 +177,17 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
// Check if the partition values or if hdfs path is the same
List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
if (!storagePartitionValues.isEmpty()) {
String storageValue = String.join(", ", storagePartitionValues);
if (!paths.containsKey(storageValue)) {
events.add(PartitionEvent.newPartitionAddEvent(storagePartition));
} else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) {
events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition));
if (isDropPartition) {
events.add(PartitionEvent.newPartitionDropEvent(storagePartition));
} else {
if (!storagePartitionValues.isEmpty()) {
String storageValue = String.join(", ", storagePartitionValues);
if (!paths.containsKey(storageValue)) {
events.add(PartitionEvent.newPartitionAddEvent(storagePartition));
} else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) {
events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition));
}
}
}
}

View File

@@ -81,5 +81,13 @@ public interface DDLExecutor {
*/
public void updatePartitionsToTable(String tableName, List<String> changedPartitions);
/**
 * Drop partitions for a given table.
 *
 * @param tableName        name of the table whose partitions should be dropped
 * @param partitionsToDrop partitions to drop; implementations should treat an empty
 *                         list as a no-op rather than an error
 */
public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop);
public void close();
}

View File

@@ -226,6 +226,25 @@ public class HMSDDLExecutor implements DDLExecutor {
}
}
/**
 * Drops the given partitions from {@code tableName} via the Hive metastore client.
 *
 * <p>An empty list is a no-op. Each partition is dropped and logged individually;
 * any metastore failure is logged and rethrown as a {@link HoodieHiveSyncException}
 * with the original {@link TException} as its cause.
 */
@Override
public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) {
  // Fast exit when there is nothing to drop.
  if (partitionsToDrop.isEmpty()) {
    LOG.info("No partitions to drop for " + tableName);
    return;
  }
  LOG.info("Drop partitions " + partitionsToDrop.size() + " on " + tableName);
  try {
    // Drop one at a time so every successful drop leaves its own log line.
    for (String partition : partitionsToDrop) {
      client.dropPartition(syncConfig.databaseName, tableName, partition, false);
      LOG.info("Drop partition " + partition + " on " + tableName);
    }
  } catch (TException e) {
    // Build the message once; it is used both for the log entry and the wrapped exception.
    String message = syncConfig.databaseName + "." + tableName + " drop partition failed";
    LOG.error(message, e);
    throw new HoodieHiveSyncException(message, e);
  }
}
@Override
public void close() {
if (client != null) {

View File

@@ -126,6 +126,25 @@ public class HiveQueryDDLExecutor extends QueryBasedDDLExecutor {
}
}
/**
 * Drops the given partitions from {@code tableName} through the embedded metastore client.
 *
 * <p>An empty list is a no-op. Each partition is dropped and logged individually;
 * any failure is logged and rethrown as a {@link HoodieHiveSyncException} with the
 * original exception as its cause.
 */
@Override
public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) {
  // Fast exit when there is nothing to drop.
  if (partitionsToDrop.isEmpty()) {
    LOG.info("No partitions to drop for " + tableName);
    return;
  }
  LOG.info("Drop partitions " + partitionsToDrop.size() + " on " + tableName);
  try {
    // Drop one at a time so every successful drop leaves its own log line.
    for (String partition : partitionsToDrop) {
      metaStoreClient.dropPartition(config.databaseName, tableName, partition, false);
      LOG.info("Drop partition " + partition + " on " + tableName);
    }
  } catch (Exception e) {
    // Build the message once; it is used both for the log entry and the wrapped exception.
    String message = config.databaseName + "." + tableName + " drop partition failed";
    LOG.error(message, e);
    throw new HoodieHiveSyncException(message, e);
  }
}
@Override
public void close() {
if (metaStoreClient != null) {

View File

@@ -32,6 +32,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -141,6 +142,13 @@ public class JDBCExecutor extends QueryBasedDDLExecutor {
}
}
/**
 * Drops the given partitions by issuing one {@code ALTER TABLE ... DROP PARTITION}
 * statement per entry over the JDBC connection.
 *
 * <p>An empty list issues no statements. Each entry in {@code partitionsToDrop} is
 * interpolated as the partition spec of the generated DDL statement.
 */
@Override
public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) {
  for (String partition : partitionsToDrop) {
    runSQL(String.format("ALTER TABLE `%s` DROP PARTITION (%s)", tableName, partition));
  }
}
@Override
public void close() {
try {

View File

@@ -737,6 +737,56 @@ public class TestHiveSyncTool {
assertEquals(1, hiveClient.getPartitionsWrittenToSince(Option.of(commitTime2)).size());
}
/**
 * Verifies that dropping a Hive partition is reflected correctly: after an initial sync,
 * a partition is added and then dropped via DDL, and the table ends up with the expected
 * partition count. Runs once per sync mode (HMS / HiveQL / JDBC).
 */
@ParameterizedTest
@MethodSource("syncMode")
public void testDropPartitionKeySync(String syncMode) throws Exception {
hiveSyncConfig.syncMode = syncMode;
HiveTestUtil.hiveSyncConfig.batchSyncNum = 3;
String instantTime = "100";
// Create a COW table with a single partition at the given instant.
HiveTestUtil.createCOWTable(instantTime, 1, true);
HoodieHiveClient hiveClient =
new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
assertFalse(hiveClient.doesTableExist(hiveSyncConfig.tableName),
"Table " + hiveSyncConfig.tableName + " should not exist initially");
// Lets do the sync
HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
tool.syncHoodieTable();
// We need to renew the hiveClient after tool.syncHoodieTable(), because it closes the hive
// session, which leads to a connection retry (visible as an exception in the log).
hiveClient =
new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
assertTrue(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
"Table " + HiveTestUtil.hiveSyncConfig.tableName + " should exist after sync completes");
// The Hive schema must contain all data columns plus the single partition field.
assertEquals(hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).size(),
hiveClient.getDataSchema().getColumns().size() + 1,
"Hive Schema should match the table schema + partition field");
assertEquals(1, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
"Table partitions should match the number of partitions we wrote");
assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
"The last commit that was synced should be updated in the TBLPROPERTIES");
// Adding of new partitions
List<String> newPartition = Arrays.asList("2050/01/01");
// An empty list must be a no-op: the partition count stays at 1.
hiveClient.addPartitionsToTable(hiveSyncConfig.tableName, Arrays.asList());
assertEquals(1, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
"No new partition should be added");
hiveClient.addPartitionsToTable(hiveSyncConfig.tableName, newPartition);
assertEquals(2, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
"New partition should be added");
// Re-sync so the tool picks up the added partition before we drop it.
tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
tool.syncHoodieTable();
// Drop 1 partition.
ddlExecutor.runSQL("ALTER TABLE `" + hiveSyncConfig.tableName
+ "` DROP PARTITION (`datestr`='2050-01-01')");
// Renew the client again and confirm only the original partition remains.
hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
List<Partition> hivePartitions = hiveClient.scanTablePartitions(hiveSyncConfig.tableName);
assertEquals(1, hivePartitions.size(),
"Table should have 1 partition because of the drop 1 partition");
}
@ParameterizedTest
@MethodSource("syncMode")
public void testNonPartitionedSync(String syncMode) throws Exception {