[HUDI-3107] Fix HiveSyncTool drop partitions using JDBC, HiveQL, or HMS (#4453)

* constructDropPartitions when dropping partitions using JDBC
* code style
* code review

Co-authored-by: yuezhang <yuezhang@freewheel.tv>
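In brief: before this change, the JDBC path issued one ALTER TABLE `table` DROP PARTITION statement per partition (with no database qualifier and no IF EXISTS guard), while the HMS and HiveQL paths passed the raw storage path (e.g. 2021/12/28) to dropPartition instead of a key=value partition name. The diff below introduces HivePartitionUtil.getPartitionClauseForDrop for the metastore-based paths and batches the JDBC statements, config.batchSyncNum partition clauses per statement; a standalone sketch of the batching appears after the JDBCExecutor hunks.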
HMSDDLExecutor.java

@@ -24,6 +24,7 @@ import org.apache.hudi.common.fs.StorageSchemes;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HoodieHiveSyncException;
 import org.apache.hudi.hive.PartitionValueExtractor;
+import org.apache.hudi.hive.util.HivePartitionUtil;
 import org.apache.hudi.hive.util.HiveSchemaUtil;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -236,7 +237,8 @@ public class HMSDDLExecutor implements DDLExecutor {
     LOG.info("Drop partitions " + partitionsToDrop.size() + " on " + tableName);
     try {
       for (String dropPartition : partitionsToDrop) {
-        client.dropPartition(syncConfig.databaseName, tableName, dropPartition, false);
+        String partitionClause = HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, syncConfig);
+        client.dropPartition(syncConfig.databaseName, tableName, partitionClause, false);
         LOG.info("Drop partition " + dropPartition + " on " + tableName);
       }
     } catch (TException e) {
HiveQueryDDLExecutor.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hudi.hive.util.HivePartitionUtil;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -136,7 +137,8 @@ public class HiveQueryDDLExecutor extends QueryBasedDDLExecutor {
     LOG.info("Drop partitions " + partitionsToDrop.size() + " on " + tableName);
     try {
       for (String dropPartition : partitionsToDrop) {
-        metaStoreClient.dropPartition(config.databaseName, tableName, dropPartition, false);
+        String partitionClause = HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, config);
+        metaStoreClient.dropPartition(config.databaseName, tableName, partitionClause, false);
         LOG.info("Drop partition " + dropPartition + " on " + tableName);
       }
     } catch (Exception e) {
JDBCExecutor.java

@@ -18,6 +18,8 @@
 
 package org.apache.hudi.hive.ddl;
 
+import static org.apache.hudi.hive.util.HiveSchemaUtil.HIVE_ESCAPE_CHARACTER;
+
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HoodieHiveSyncException;
 
@@ -31,6 +33,7 @@ import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -144,9 +147,49 @@ public class JDBCExecutor extends QueryBasedDDLExecutor {
 
   @Override
   public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) {
-    partitionsToDrop.stream()
-        .map(partition -> String.format("ALTER TABLE `%s` DROP PARTITION (%s)", tableName, partition))
-        .forEach(this::runSQL);
+    if (partitionsToDrop.isEmpty()) {
+      LOG.info("No partitions to drop for " + tableName);
+      return;
+    }
+    LOG.info("Dropping " + partitionsToDrop.size() + " partitions from table " + tableName);
+    List<String> sqls = constructDropPartitions(tableName, partitionsToDrop);
+    sqls.forEach(this::runSQL);
+  }
+
+  private List<String> constructDropPartitions(String tableName, List<String> partitions) {
+    if (config.batchSyncNum <= 0) {
+      throw new HoodieHiveSyncException("batch-sync-num for sync hive table must be greater than 0, please check your parameter");
+    }
+    List<String> result = new ArrayList<>();
+    int batchSyncPartitionNum = config.batchSyncNum;
+    StringBuilder alterSQL = getAlterTableDropPrefix(tableName);
+
+    for (int i = 0; i < partitions.size(); i++) {
+      String partitionClause = getPartitionClause(partitions.get(i));
+      if (i % batchSyncPartitionNum == 0) {
+        alterSQL.append(" PARTITION (").append(partitionClause).append(")");
+      } else {
+        alterSQL.append(", PARTITION (").append(partitionClause).append(")");
+      }
+
+      if ((i + 1) % batchSyncPartitionNum == 0) {
+        result.add(alterSQL.toString());
+        alterSQL = getAlterTableDropPrefix(tableName);
+      }
+    }
+    // flush the remaining partial batch, if any
+    if (partitions.size() % batchSyncPartitionNum != 0) {
+      result.add(alterSQL.toString());
+    }
+    return result;
+  }
+
+  public StringBuilder getAlterTableDropPrefix(String tableName) {
+    StringBuilder alterSQL = new StringBuilder("ALTER TABLE ");
+    alterSQL.append(HIVE_ESCAPE_CHARACTER).append(config.databaseName)
+        .append(HIVE_ESCAPE_CHARACTER).append(".").append(HIVE_ESCAPE_CHARACTER)
+        .append(tableName).append(HIVE_ESCAPE_CHARACTER).append(" DROP IF EXISTS ");
+    return alterSQL;
   }
 
   @Override
@@ -159,4 +202,4 @@ public class JDBCExecutor extends QueryBasedDDLExecutor {
       LOG.error("Could not close connection ", e);
     }
   }
 }
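To make the batching concrete, here is a minimal standalone sketch of the statement construction above. It loosely mirrors JDBCExecutor#constructDropPartitions and getAlterTableDropPrefix from the diff, under these assumptions: the class name, main method, and sample database/table/clause values are hypothetical, and the already-built partition clauses are passed in directly instead of going through getPartitionClause.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DropPartitionsSketch {
  // mirrors HiveSchemaUtil.HIVE_ESCAPE_CHARACTER, the backtick used to quote Hive identifiers
  private static final String HIVE_ESCAPE_CHARACTER = "`";

  // Batches partition clauses into ALTER TABLE ... DROP IF EXISTS statements,
  // batchSyncNum clauses per statement.
  static List<String> constructDropPartitions(String db, String table, List<String> clauses, int batchSyncNum) {
    List<String> result = new ArrayList<>();
    StringBuilder alterSQL = getAlterTableDropPrefix(db, table);
    for (int i = 0; i < clauses.size(); i++) {
      // the first clause of each batch opens the list; later clauses are comma-separated
      alterSQL.append(i % batchSyncNum == 0 ? " PARTITION (" : ", PARTITION (")
          .append(clauses.get(i)).append(")");
      if ((i + 1) % batchSyncNum == 0) {
        result.add(alterSQL.toString());
        alterSQL = getAlterTableDropPrefix(db, table);
      }
    }
    if (clauses.size() % batchSyncNum != 0) { // flush the last partial batch
      result.add(alterSQL.toString());
    }
    return result;
  }

  static StringBuilder getAlterTableDropPrefix(String db, String table) {
    return new StringBuilder("ALTER TABLE ")
        .append(HIVE_ESCAPE_CHARACTER).append(db).append(HIVE_ESCAPE_CHARACTER).append(".")
        .append(HIVE_ESCAPE_CHARACTER).append(table).append(HIVE_ESCAPE_CHARACTER)
        .append(" DROP IF EXISTS");
  }

  public static void main(String[] args) {
    // with batchSyncNum = 2 and three partitions, two statements come out:
    // ALTER TABLE `default`.`hudi_tbl` DROP IF EXISTS PARTITION (`datestr`='2021-12-26'), PARTITION (`datestr`='2021-12-27')
    // ALTER TABLE `default`.`hudi_tbl` DROP IF EXISTS PARTITION (`datestr`='2021-12-28')
    constructDropPartitions("default", "hudi_tbl",
        Arrays.asList("`datestr`='2021-12-26'", "`datestr`='2021-12-27'", "`datestr`='2021-12-28'"), 2)
        .forEach(System.out::println);
  }
}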
QueryBasedDDLExecutor.java

@@ -46,7 +46,7 @@ import static org.apache.hudi.hive.util.HiveSchemaUtil.HIVE_ESCAPE_CHARACTER;
 public abstract class QueryBasedDDLExecutor implements DDLExecutor {
   private static final Logger LOG = LogManager.getLogger(QueryBasedDDLExecutor.class);
   private final HiveSyncConfig config;
-  private final PartitionValueExtractor partitionValueExtractor;
+  public final PartitionValueExtractor partitionValueExtractor;
   private final FileSystem fs;
 
   public QueryBasedDDLExecutor(HiveSyncConfig config, FileSystem fs) {
@@ -160,7 +160,7 @@ public abstract class QueryBasedDDLExecutor implements DDLExecutor {
     return alterSQL;
   }
 
-  private String getPartitionClause(String partition) {
+  public String getPartitionClause(String partition) {
     List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
     ValidationUtils.checkArgument(config.partitionFields.size() == partitionValues.size(),
         "Partition key parts " + config.partitionFields + " does not match with partition values " + partitionValues
HivePartitionUtil.java (new file)

@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.util;
+
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.hive.HiveSyncConfig;
+import org.apache.hudi.hive.PartitionValueExtractor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class HivePartitionUtil {
+
+  /**
+   * Build a partition clause String from a storage partition path, e.g. year=2021/month=06/day=25.
+   */
+  public static String getPartitionClauseForDrop(String partition, PartitionValueExtractor partitionValueExtractor, HiveSyncConfig config) {
+    List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
+    ValidationUtils.checkArgument(config.partitionFields.size() == partitionValues.size(),
+        "Partition key parts " + config.partitionFields + " does not match with partition values " + partitionValues
+            + ". Check partition strategy.");
+    List<String> partBuilder = new ArrayList<>();
+    for (int i = 0; i < config.partitionFields.size(); i++) {
+      String partitionValue = partitionValues.get(i);
+      // decode the partition value before syncing to Hive, to prevent it being escaped twice
+      if (config.decodePartition) {
+        // this reverses the encoding done in KeyGenUtils#getRecordPartitionPath
+        partitionValue = PartitionPathEncodeUtils.unescapePathName(partitionValue);
+      }
+      partBuilder.add(config.partitionFields.get(i) + "=" + partitionValue);
+    }
+    return String.join("/", partBuilder);
+  }
+}
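As a usage illustration (not part of the commit): with a three-level partition path and Hudi's MultiPartKeysValueExtractor, which splits the storage path on '/', the helper should produce the clause shown in the javadoc above. The class name, main method, and partition field names here are hypothetical.

import java.util.Arrays;

import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.hive.util.HivePartitionUtil;

public class PartitionClauseExample {
  public static void main(String[] args) {
    HiveSyncConfig config = new HiveSyncConfig();
    config.partitionFields = Arrays.asList("year", "month", "day"); // hypothetical partition columns
    config.decodePartition = false; // skip URL-decoding of partition values
    String clause = HivePartitionUtil.getPartitionClauseForDrop(
        "2021/06/25", new MultiPartKeysValueExtractor(), config);
    System.out.println(clause); // prints: year=2021/month=06/day=25
  }
}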
TestHiveSyncTool.java

@@ -20,6 +20,7 @@ package org.apache.hudi.hive;
 
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.testutils.NetworkTestUtils;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
@@ -787,6 +788,47 @@ public class TestHiveSyncTool {
         "Table should have 1 partition because of the drop 1 partition");
   }
 
+  @ParameterizedTest
+  @MethodSource("syncMode")
+  public void testDropPartition(String syncMode) throws Exception {
+    hiveSyncConfig.syncMode = syncMode;
+    HiveTestUtil.hiveSyncConfig.batchSyncNum = 3;
+    String instantTime = "100";
+    HiveTestUtil.createCOWTable(instantTime, 1, true);
+
+    HoodieHiveClient hiveClient =
+        new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
+    assertFalse(hiveClient.doesTableExist(hiveSyncConfig.tableName),
+        "Table " + hiveSyncConfig.tableName + " should not exist initially");
+    // let's do the sync
+    HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
+    tool.syncHoodieTable();
+    // we need to renew the hiveClient after tool.syncHoodieTable(), because it closes the hive
+    // session, which leads to a connection retry; an exception shows up in the log for this.
+    hiveClient =
+        new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    assertTrue(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
+        "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should exist after sync completes");
+    assertEquals(hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).size(),
+        hiveClient.getDataSchema().getColumns().size() + 1,
+        "Hive Schema should match the table schema + partition field");
+    assertEquals(1, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
+        "Table partitions should match the number of partitions we wrote");
+    assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
+        "The last commit that was synced should be updated in the TBLPROPERTIES");
+    // create a replace commit to delete the current partition
+    HiveTestUtil.createReplaceCommit("101", "2021/12/28", WriteOperationType.DELETE_PARTITION);
+
+    // sync the dropped partitions
+    tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
+    tool.syncHoodieTable();
+
+    hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
+    List<Partition> hivePartitions = hiveClient.scanTablePartitions(hiveSyncConfig.tableName);
+    assertEquals(0, hivePartitions.size(),
+        "Table should have 0 partitions because its only partition was dropped");
+  }
+
   @ParameterizedTest
   @MethodSource("syncMode")
   public void testNonPartitionedSync(String syncMode) throws Exception {
HiveTestUtil.java

@@ -28,8 +28,10 @@ import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieDeltaWriteStat;
 import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
@@ -176,6 +178,16 @@ public class HiveTestUtil {
     createCommitFile(commitMetadata, instantTime);
   }
 
+  public static void createReplaceCommit(String instantTime, String partitions, WriteOperationType type)
+      throws IOException {
+    HoodieReplaceCommitMetadata replaceCommitMetadata = new HoodieReplaceCommitMetadata();
+    replaceCommitMetadata.setOperationType(type);
+    Map<String, List<String>> partitionToReplaceFileIds = new HashMap<>();
+    partitionToReplaceFileIds.put(partitions, new ArrayList<>());
+    replaceCommitMetadata.setPartitionToReplaceFileIds(partitionToReplaceFileIds);
+    createReplaceCommitFile(replaceCommitMetadata, instantTime);
+  }
+
   public static void createCOWTableWithSchema(String instantTime, String schemaFileName)
       throws IOException, URISyntaxException {
     Path path = new Path(hiveSyncConfig.basePath);
@@ -442,6 +454,15 @@ public class HiveTestUtil {
     fsout.close();
   }
 
+  public static void createReplaceCommitFile(HoodieCommitMetadata commitMetadata, String instantTime) throws IOException {
+    byte[] bytes = commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
+    Path fullPath = new Path(hiveSyncConfig.basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+        + HoodieTimeline.makeReplaceFileName(instantTime));
+    FSDataOutputStream fsout = fileSystem.create(fullPath, true);
+    fsout.write(bytes);
+    fsout.close();
+  }
+
   public static void createCommitFileWithSchema(HoodieCommitMetadata commitMetadata, String instantTime, boolean isSimpleSchema) throws IOException {
     addSchemaToCommitMetadata(commitMetadata, isSimpleSchema, true);
     createCommitFile(commitMetadata, instantTime);