From 80011df995f6a610a30799eb3b11cdd8a2703630 Mon Sep 17 00:00:00 2001 From: ForwardXu Date: Thu, 31 Mar 2022 15:35:39 +0800 Subject: [PATCH] [HUDI-3135] Make delete partitions lazy to be executed by the cleaner (#4489) As of now, delete partitions ensures all file groups are deleted, but the partition itself is not deleted. So, getting all partitions might still return the deleted partitions, even though no data will be served from them since all their file groups are deleted. This patch fixes that by letting the cleaner take care of deleting a partition once all file groups pertaining to that partition are deleted. - Fixed CleanPlanActionExecutor to return meta info about the list of partitions to be deleted. If there are no valid file groups for a partition, the clean planner will include that partition for deletion. - Fixed the HoodieCleanerPlan Avro schema to include the list of partitions to be deleted - Fixed CleanActionExecutor to delete partitions, if any, as per the clean plan - Added the same info to HoodieCleanMetadata - When applying clean metadata, the metadata table will check for partitions to be deleted and update the "all_partitions" record for the deleted partitions. Co-authored-by: sivabalan --- .../HoodieDeletePartitionException.java | 35 +++++++++ .../action/clean/CleanActionExecutor.java | 14 +++- .../action/clean/CleanPlanActionExecutor.java | 14 +++- .../hudi/table/action/clean/CleanPlanner.java | 35 ++++++--- .../utils/TestMetadataConversionUtils.java | 2 +- ...rkDeletePartitionCommitActionExecutor.java | 57 ++++++++++---- .../functional/TestHoodieBackedMetadata.java | 76 ++++++++++++++++--- ...arkCopyOnWriteTableArchiveWithReplace.java | 4 +- .../testutils/HoodieClientTestHarness.java | 2 +- .../avro/HoodieCleanPartitionMetadata.avsc | 3 +- .../src/main/avro/HoodieCleanerPlan.avsc | 8 ++ .../apache/hudi/common/HoodieCleanStat.java | 23 +++++- .../clean/CleanPlanV1MigrationHandler.java | 3 +- .../clean/CleanPlanV2MigrationHandler.java | 3 +- .../apache/hudi/common/util/CleanerUtils.java | 4 +- .../hudi/metadata/HoodieMetadataPayload.java | 26 ++++--- .../metadata/HoodieTableMetadataUtil.java | 27 ++++--- .../common/testutils/HoodieTestTable.java | 4 +- .../apache/hudi/HoodieSparkSqlWriter.scala | 3 + .../hudi/TestAlterTableDropPartition.scala | 39 +++++++++- 20 files changed, 306 insertions(+), 76 deletions(-) create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieDeletePartitionException.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieDeletePartitionException.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieDeletePartitionException.java new file mode 100644 index 000000000..34eb734b3 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieDeletePartitionException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.exception; + +/** + *

+ * Exception thrown for any higher-level errors encountered while deleting partitions. + *

+ */ +public class HoodieDeletePartitionException extends HoodieException { + + public HoodieDeletePartitionException(String msg, Throwable e) { + super(msg, e); + } + + public HoodieDeletePartitionException(String msg) { + super(msg); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java index 4ae8009c9..31d26f133 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java @@ -76,7 +76,8 @@ public class CleanActionExecutor extends Path deletePath = new Path(deletePathStr); LOG.debug("Working on delete path :" + deletePath); try { - boolean deleteResult = fs.delete(deletePath, false); + boolean isDirectory = fs.isDirectory(deletePath); + boolean deleteResult = fs.delete(deletePath, isDirectory); if (deleteResult) { LOG.debug("Cleaned file at path :" + deletePath); } @@ -137,6 +138,8 @@ public class CleanActionExecutor extends .flatMap(x -> x.getValue().stream().map(y -> new ImmutablePair<>(x.getKey(), new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile())))); + List<String> partitionsToBeDeleted = cleanerPlan.getPartitionsToBeDeleted() != null ? cleanerPlan.getPartitionsToBeDeleted() : new ArrayList<>(); + Stream<Pair<String, PartitionCleanStat>> partitionCleanStats = context.mapPartitionsToPairAndReduceByKey(filesToBeDeletedPerPartition, iterator -> deleteFilesFunc(iterator, table), PartitionCleanStat::merge, cleanerParallelism); @@ -144,6 +147,14 @@ public class CleanActionExecutor extends Map<String, PartitionCleanStat> partitionCleanStatsMap = partitionCleanStats .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + partitionsToBeDeleted.forEach(entry -> { + try { + deleteFileAndGetResult(table.getMetaClient().getFs(), table.getMetaClient().getBasePath() + "/" + entry); + } catch (IOException e) { + LOG.warn("Partition deletion failed " + entry); + } + }); + // Return PartitionCleanStat for each partition passed. 
return cleanerPlan.getFilePathsToBeDeletedPerPartition().keySet().stream().map(partitionPath -> { PartitionCleanStat partitionCleanStat = partitionCleanStatsMap.containsKey(partitionPath) @@ -162,6 +173,7 @@ public class CleanActionExecutor extends .withDeleteBootstrapBasePathPatterns(partitionCleanStat.getDeleteBootstrapBasePathPatterns()) .withSuccessfulDeleteBootstrapBaseFiles(partitionCleanStat.getSuccessfulDeleteBootstrapBaseFiles()) .withFailedDeleteBootstrapBaseFiles(partitionCleanStat.getFailedDeleteBootstrapBaseFiles()) + .isPartitionDeleted(partitionsToBeDeleted.contains(partitionPath)) .build(); }).collect(Collectors.toList()); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java index 86f65cae5..fb2df582b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java @@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieActionInstant; import org.apache.hudi.avro.model.HoodieCleanFileInfo; import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.CleanFileInfo; import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -108,15 +109,22 @@ public class CleanPlanActionExecutor ext context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned"); - Map<String, List<HoodieCleanFileInfo>> cleanOps = context + Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context .map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism) .stream() - .collect(Collectors.toMap(Pair::getKey, y -> CleanerUtils.convertToHoodieCleanFileInfoList(y.getValue()))); + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + + Map<String, List<HoodieCleanFileInfo>> cleanOps = cleanOpsWithPartitionMeta.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, + e -> CleanerUtils.convertToHoodieCleanFileInfoList(e.getValue().getValue()))); + + List<String> partitionsToDelete = cleanOpsWithPartitionMeta.entrySet().stream().filter(entry -> entry.getValue().getKey()).map(Map.Entry::getKey) + .collect(Collectors.toList()); return new HoodieCleanerPlan(earliestInstant .map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null), config.getCleanerPolicy().name(), CollectionUtils.createImmutableMap(), - CleanPlanner.LATEST_CLEAN_PLAN_VERSION, cleanOps); + CleanPlanner.LATEST_CLEAN_PLAN_VERSION, cleanOps, partitionsToDelete); } catch (IOException e) { throw new HoodieIOException("Failed to schedule clean operation", e); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java index 7e56d3456..3eb0c97e5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java @@ -214,7 +214,7 @@ public class CleanPlanner implements Ser * policy is useful, if you are simply interested in 
querying the table, and you don't want too many versions for a * single file (i.e. run it with versionsRetained = 1) */ - private List<CleanFileInfo> getFilesToCleanKeepingLatestVersions(String partitionPath) { + private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestVersions(String partitionPath) { LOG.info("Cleaning " + partitionPath + ", retaining latest " + config.getCleanerFileVersionsRetained() + " file versions. "); List<CleanFileInfo> deletePaths = new ArrayList<>(); @@ -226,7 +226,7 @@ // In this scenario, we will assume that once replaced a file group automatically becomes eligible for cleaning completely // In other words, the file versions only apply to the active file groups. deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, Option.empty())); - + boolean toDeletePartition = false; List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList()); for (HoodieFileGroup fileGroup : fileGroups) { int keepVersions = config.getCleanerFileVersionsRetained(); @@ -254,10 +254,14 @@ deletePaths.addAll(getCleanFileInfoForSlice(nextSlice)); } } - return deletePaths; + // if there are no valid file groups for the partition, mark it to be deleted + if (fileGroups.isEmpty()) { + toDeletePartition = true; + } + return Pair.of(toDeletePartition, deletePaths); } - private List<CleanFileInfo> getFilesToCleanKeepingLatestCommits(String partitionPath) { + private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(String partitionPath) { return getFilesToCleanKeepingLatestCommits(partitionPath, config.getCleanerCommitsRetained(), HoodieCleaningPolicy.KEEP_LATEST_COMMITS); } @@ -275,7 +279,7 @@ *

* This policy is the default. */ - private List<CleanFileInfo> getFilesToCleanKeepingLatestCommits(String partitionPath, int commitsRetained, HoodieCleaningPolicy policy) { + private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(String partitionPath, int commitsRetained, HoodieCleaningPolicy policy) { LOG.info("Cleaning " + partitionPath + ", retaining latest " + commitsRetained + " commits. "); List<CleanFileInfo> deletePaths = new ArrayList<>(); @@ -285,6 +289,7 @@ .collect(Collectors.toList()); // determine if we have enough commits, to start cleaning. + boolean toDeletePartition = false; if (commitTimeline.countInstants() > commitsRetained) { Option<HoodieInstant> earliestCommitToRetainOption = getEarliestCommitToRetain(); HoodieInstant earliestCommitToRetain = earliestCommitToRetainOption.get(); @@ -350,8 +355,12 @@ } } } + // if there are no valid file groups for the partition, mark it to be deleted + if (fileGroups.isEmpty()) { + toDeletePartition = true; + } } - return deletePaths; + return Pair.of(toDeletePartition, deletePaths); } /** @@ -362,10 +371,10 @@ * @param partitionPath partition path to check * @return list of files to clean */ - private List<CleanFileInfo> getFilesToCleanKeepingLatestHours(String partitionPath) { + private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestHours(String partitionPath) { return getFilesToCleanKeepingLatestCommits(partitionPath, 0, HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS); } - + private List<CleanFileInfo> getReplacedFilesEligibleToClean(List<String> savepointedFiles, String partitionPath, Option<HoodieInstant> earliestCommitToRetain) { final Stream<HoodieFileGroup> replacedGroups; if (earliestCommitToRetain.isPresent()) { @@ -416,9 +425,9 @@ /** * Returns files to be cleaned for the given partitionPath based on cleaning policy. 
*/ - public List<CleanFileInfo> getDeletePaths(String partitionPath) { + public Pair<Boolean, List<CleanFileInfo>> getDeletePaths(String partitionPath) { HoodieCleaningPolicy policy = config.getCleanerPolicy(); - List<CleanFileInfo> deletePaths; + Pair<Boolean, List<CleanFileInfo>> deletePaths; if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) { deletePaths = getFilesToCleanKeepingLatestCommits(partitionPath); } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) { deletePaths = getFilesToCleanKeepingLatestVersions(partitionPath); @@ -428,8 +437,10 @@ } else { throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name()); } - LOG.info(deletePaths.size() + " patterns used to delete in partition path:" + partitionPath); - + LOG.info(deletePaths.getValue().size() + " patterns used to delete in partition path:" + partitionPath); + if (deletePaths.getKey()) { + LOG.info("Partition " + partitionPath + " to be deleted"); + } return deletePaths; } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java index 415c12a64..986150690 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java @@ -259,7 +259,7 @@ public class TestMetadataConversionUtils extends HoodieCommonTestHarness { private void createCleanMetadata(String instantTime) throws IOException { HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "", new HashMap<>(), - CleanPlanV2MigrationHandler.VERSION, new HashMap<>()); + CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>()); HoodieCleanStat cleanStats = new HoodieCleanStat( HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS, HoodieTestUtils.DEFAULT_PARTITION_PATHS[new Random().nextInt(HoodieTestUtils.DEFAULT_PARTITION_PATHS.length)], diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java index b31eb7b96..149aef03e 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java @@ -18,24 +18,32 @@ package org.apache.hudi.table.action.commit; +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.data.HoodieJavaPairRDD; +import org.apache.hudi.exception.HoodieDeletePartitionException; import org.apache.hudi.table.HoodieTable; import 
org.apache.hudi.table.WorkloadProfile; import org.apache.hudi.table.WorkloadStat; import org.apache.hudi.table.action.HoodieWriteMetadata; -import java.time.Duration; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION; public class SparkDeletePartitionCommitActionExecutor<T extends HoodieRecordPayload<T>> extends SparkInsertOverwriteCommitActionExecutor<T> { @@ -50,16 +58,35 @@ public class SparkDeletePartitionCommitActionExecutor<T extends HoodieRecordPayload<T>> public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() { - HoodieTimer timer = new HoodieTimer().startTimer(); - context.setJobStatus(this.getClass().getSimpleName(), "Gather all file ids from all deleting partitions."); - Map<String, List<String>> partitionToReplaceFileIds = HoodieJavaPairRDD.getJavaPairRDD(context.parallelize(partitions).distinct() - .mapToPair(partitionPath -> Pair.of(partitionPath, getAllExistingFileIds(partitionPath)))).collectAsMap(); - HoodieWriteMetadata<HoodieData<WriteStatus>> result = new HoodieWriteMetadata<>(); - result.setPartitionToReplaceFileIds(partitionToReplaceFileIds); - result.setIndexUpdateDuration(Duration.ofMillis(timer.endTimer())); - result.setWriteStatuses(context.emptyHoodieData()); - this.saveWorkloadProfileMetadataToInflight(new WorkloadProfile(Pair.of(new HashMap<>(), new WorkloadStat())), instantTime); - this.commitOnAutoCommit(result); - return result; + try { + HoodieTimer timer = new HoodieTimer().startTimer(); + context.setJobStatus(this.getClass().getSimpleName(), "Gather all file ids from all deleting partitions."); + Map<String, List<String>> partitionToReplaceFileIds = + HoodieJavaPairRDD.getJavaPairRDD(context.parallelize(partitions).distinct() + .mapToPair(partitionPath -> Pair.of(partitionPath, getAllExistingFileIds(partitionPath)))).collectAsMap(); + HoodieWriteMetadata<HoodieData<WriteStatus>> result = new HoodieWriteMetadata<>(); + result.setPartitionToReplaceFileIds(partitionToReplaceFileIds); + result.setIndexUpdateDuration(Duration.ofMillis(timer.endTimer())); + result.setWriteStatuses(context.emptyHoodieData()); + + // create the requested replace commit, if it does not exist yet + HoodieInstant dropPartitionsInstant = new HoodieInstant(REQUESTED, REPLACE_COMMIT_ACTION, instantTime); + if (!table.getMetaClient().getFs().exists(new Path(table.getMetaClient().getMetaPath(), + dropPartitionsInstant.getFileName()))) { + HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder() + .setOperationType(WriteOperationType.DELETE_PARTITION.name()) + .setExtraMetadata(extraMetadata.orElse(Collections.emptyMap())) + .build(); + table.getMetaClient().getActiveTimeline().saveToPendingReplaceCommit(dropPartitionsInstant, + TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata)); + } + + this.saveWorkloadProfileMetadataToInflight(new WorkloadProfile(Pair.of(new HashMap<>(), new WorkloadStat())), + instantTime); + this.commitOnAutoCommit(result); + return result; + } catch (Exception e) { + throw new HoodieDeletePartitionException("Failed to drop partitions for commit time " + instantTime, e); + } } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 3497a68b9..299fabed7 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -18,7 +18,17 @@ package org.apache.hudi.client.functional; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.util.Time; import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieMetadataRecord; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; @@ -32,6 +42,7 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.metrics.Registry; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieFileFormat; @@ -89,16 +100,6 @@ import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper; import org.apache.hudi.table.upgrade.UpgradeDowngrade; import org.apache.hudi.testutils.MetadataMergeWriteStatus; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.io.hfile.CacheConfig; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.util.Time; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.parquet.avro.AvroSchemaConverter; @@ -1727,6 +1728,61 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { } } + @Test + public void testDeletePartitions() throws Exception { + init(HoodieTableType.COPY_ON_WRITE); + + int maxCommits = 1; + HoodieWriteConfig cfg = getConfigBuilder(TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(maxCommits).build()) + .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1) + .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()) + .build(); + + try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) { + String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); + client.startCommitWithTime(newCommitTime); + List records = dataGen.generateInserts(newCommitTime, 10); + List upsertRecords = new ArrayList<>(); + for (HoodieRecord entry : records) { + if (entry.getPartitionPath().equals(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH) + || entry.getPartitionPath().equals(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)) { + upsertRecords.add(entry); + } + } + List writeStatuses = client.upsert(jsc.parallelize(upsertRecords, 1), newCommitTime).collect(); + assertNoWriteErrors(writeStatuses); + 
validateMetadata(client); + + // delete partitions + newCommitTime = HoodieActiveTimeline.createNewInstantTime(5000); + client.startCommitWithTime(newCommitTime); + client.deletePartitions(singletonList(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH), newCommitTime); + validateMetadata(client); + + // add 1 more commit + newCommitTime = HoodieActiveTimeline.createNewInstantTime(5000); + client.startCommitWithTime(newCommitTime); + records = dataGen.generateInserts(newCommitTime, 10); + upsertRecords = new ArrayList<>(); + for (HoodieRecord entry : records) { + if (entry.getPartitionPath().equals(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)) { + upsertRecords.add(entry); + } + } + writeStatuses = client.upsert(jsc.parallelize(upsertRecords, 1), newCommitTime).collect(); + assertNoWriteErrors(writeStatuses); + + // trigger clean, which will actually trigger deletion of the partition + newCommitTime = HoodieActiveTimeline.createNewInstantTime(5000); + HoodieCleanMetadata cleanMetadata = client.clean(newCommitTime); + validateMetadata(client); + assertEquals(1, metadata(client).getAllPartitionPaths().size()); + } + } + /** * Test various error scenarios. */ diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java index acd7e835e..b4d6aefa7 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java @@ -79,7 +79,7 @@ public class TestHoodieSparkCopyOnWriteTableArchiveWithReplace extends SparkClie client.startCommitWithTime(instantTime4, HoodieActiveTimeline.REPLACE_COMMIT_ACTION); client.deletePartitions(Arrays.asList(DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH), instantTime4); - // 2nd write batch; 4 commits for the 3rd partition; the 3rd commit to trigger archiving the replace commit + // 2nd write batch; 4 commits for the 4th partition; the 4th commit to trigger archiving the replace commit for (int i = 5; i < 9; i++) { String instantTime = HoodieActiveTimeline.createNewInstantTime(i * 1000); client.startCommitWithTime(instantTime); @@ -97,7 +97,7 @@ public class TestHoodieSparkCopyOnWriteTableArchiveWithReplace extends SparkClie // verify records final HoodieTimeline timeline2 = metaClient.getCommitTimeline().filterCompletedInstants(); assertEquals(5, countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline2, Option.empty()), - "should only have the 4 records from the 3rd partition."); + "should only have the 5 records from the 3rd partition."); } } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java index 71e4b4b4e..ab3d504fa 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java @@ -687,7 +687,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im public HoodieInstant createCleanMetadata(String instantTime, boolean inflightOnly, boolean isEmpty) throws 
IOException { HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "", new HashMap<>(), - CleanPlanV2MigrationHandler.VERSION, new HashMap<>()); + CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>()); if (inflightOnly) { HoodieTestTable.of(metaClient).addInflightClean(instantTime, cleanerPlan); } else { diff --git a/hudi-common/src/main/avro/HoodieCleanPartitionMetadata.avsc b/hudi-common/src/main/avro/HoodieCleanPartitionMetadata.avsc index 877b72591..3cb096d48 100644 --- a/hudi-common/src/main/avro/HoodieCleanPartitionMetadata.avsc +++ b/hudi-common/src/main/avro/HoodieCleanPartitionMetadata.avsc @@ -24,6 +24,7 @@ {"name": "policy", "type": "string"}, {"name": "deletePathPatterns", "type": {"type": "array", "items": "string"}}, {"name": "successDeleteFiles", "type": {"type": "array", "items": "string"}}, - {"name": "failedDeleteFiles", "type": {"type": "array", "items": "string"}} + {"name": "failedDeleteFiles", "type": {"type": "array", "items": "string"}}, + {"name": "isPartitionDeleted", "type":["null", "boolean"], "default": null } ] } diff --git a/hudi-common/src/main/avro/HoodieCleanerPlan.avsc b/hudi-common/src/main/avro/HoodieCleanerPlan.avsc index c4481c2cd..e4c8638c8 100644 --- a/hudi-common/src/main/avro/HoodieCleanerPlan.avsc +++ b/hudi-common/src/main/avro/HoodieCleanerPlan.avsc @@ -92,6 +92,14 @@ } }}], "default" : null + }, + { + "name": "partitionsToBeDeleted", + "doc": "partitions to be deleted", + "type":["null", + { "type":"array", "items":"string"} + ], + "default": null } ] } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/HoodieCleanStat.java b/hudi-common/src/main/java/org/apache/hudi/common/HoodieCleanStat.java index e9de502f7..fa5d80419 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/HoodieCleanStat.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/HoodieCleanStat.java @@ -47,19 +47,22 @@ public class HoodieCleanStat implements Serializable { private final List failedDeleteBootstrapBaseFiles; // Earliest commit that was retained in this clean private final String earliestCommitToRetain; + // set to true if partition is deleted + private final boolean isPartitionDeleted; public HoodieCleanStat(HoodieCleaningPolicy policy, String partitionPath, List deletePathPatterns, List successDeleteFiles, List failedDeleteFiles, String earliestCommitToRetain) { this(policy, partitionPath, deletePathPatterns, successDeleteFiles, failedDeleteFiles, earliestCommitToRetain, CollectionUtils.createImmutableList(), CollectionUtils.createImmutableList(), - CollectionUtils.createImmutableList()); + CollectionUtils.createImmutableList(), false); } public HoodieCleanStat(HoodieCleaningPolicy policy, String partitionPath, List deletePathPatterns, List successDeleteFiles, List failedDeleteFiles, String earliestCommitToRetain, List deleteBootstrapBasePathPatterns, List successDeleteBootstrapBaseFiles, - List failedDeleteBootstrapBaseFiles) { + List failedDeleteBootstrapBaseFiles, + boolean isPartitionDeleted) { this.policy = policy; this.partitionPath = partitionPath; this.deletePathPatterns = deletePathPatterns; @@ -69,6 +72,7 @@ public class HoodieCleanStat implements Serializable { this.deleteBootstrapBasePathPatterns = deleteBootstrapBasePathPatterns; this.successDeleteBootstrapBaseFiles = successDeleteBootstrapBaseFiles; this.failedDeleteBootstrapBaseFiles = failedDeleteBootstrapBaseFiles; + this.isPartitionDeleted = isPartitionDeleted; } public HoodieCleaningPolicy getPolicy() { @@ -107,6 
+111,10 @@ public class HoodieCleanStat implements Serializable { return earliestCommitToRetain; } + public boolean isPartitionDeleted() { + return isPartitionDeleted; + } + public static HoodieCleanStat.Builder newBuilder() { return new Builder(); } @@ -125,6 +133,7 @@ public class HoodieCleanStat implements Serializable { private List deleteBootstrapBasePathPatterns; private List successDeleteBootstrapBaseFiles; private List failedDeleteBootstrapBaseFiles; + private boolean isPartitionDeleted; public Builder withPolicy(HoodieCleaningPolicy policy) { this.policy = policy; @@ -172,10 +181,15 @@ public class HoodieCleanStat implements Serializable { return this; } + public Builder isPartitionDeleted(boolean isPartitionDeleted) { + this.isPartitionDeleted = isPartitionDeleted; + return this; + } + public HoodieCleanStat build() { return new HoodieCleanStat(policy, partitionPath, deletePathPatterns, successDeleteFiles, failedDeleteFiles, earliestCommitToRetain, deleteBootstrapBasePathPatterns, successDeleteBootstrapBaseFiles, - failedDeleteBootstrapBaseFiles); + failedDeleteBootstrapBaseFiles, isPartitionDeleted); } } @@ -190,7 +204,8 @@ public class HoodieCleanStat implements Serializable { + ", earliestCommitToRetain='" + earliestCommitToRetain + ", deleteBootstrapBasePathPatterns=" + deleteBootstrapBasePathPatterns + ", successDeleteBootstrapBaseFiles=" + successDeleteBootstrapBaseFiles - + ", failedDeleteBootstrapBaseFiles=" + failedDeleteBootstrapBaseFiles + '\'' + + ", failedDeleteBootstrapBaseFiles=" + failedDeleteBootstrapBaseFiles + + ", isPartitionDeleted=" + isPartitionDeleted + '\'' + '}'; } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV1MigrationHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV1MigrationHandler.java index 0010aa21f..66fdfeb62 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV1MigrationHandler.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV1MigrationHandler.java @@ -18,6 +18,7 @@ package org.apache.hudi.common.table.timeline.versioning.clean; +import java.util.ArrayList; import java.util.HashMap; import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -61,6 +62,6 @@ public class CleanPlanV1MigrationHandler extends AbstractMigratorBase()); + new HashMap<>(), new ArrayList<>()); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV2MigrationHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV2MigrationHandler.java index e141e9a15..fd82109bd 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV2MigrationHandler.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV2MigrationHandler.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hadoop.fs.Path; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -53,7 +54,7 @@ public class CleanPlanV2MigrationHandler extends AbstractMigratorBase(), VERSION, - filePathsPerPartition); + filePathsPerPartition, new ArrayList<>()); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java index a3a130566..df4e9ac40 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java @@ -64,13 +64,13 @@ public class CleanerUtils { for (HoodieCleanStat stat : cleanStats) { HoodieCleanPartitionMetadata metadata = new HoodieCleanPartitionMetadata(stat.getPartitionPath(), stat.getPolicy().name(), - stat.getDeletePathPatterns(), stat.getSuccessDeleteFiles(), stat.getFailedDeleteFiles()); + stat.getDeletePathPatterns(), stat.getSuccessDeleteFiles(), stat.getFailedDeleteFiles(), stat.isPartitionDeleted()); partitionMetadataMap.put(stat.getPartitionPath(), metadata); if ((null != stat.getDeleteBootstrapBasePathPatterns()) && (!stat.getDeleteBootstrapBasePathPatterns().isEmpty())) { HoodieCleanPartitionMetadata bootstrapMetadata = new HoodieCleanPartitionMetadata(stat.getPartitionPath(), stat.getPolicy().name(), stat.getDeleteBootstrapBasePathPatterns(), stat.getSuccessDeleteBootstrapBaseFiles(), - stat.getFailedDeleteBootstrapBaseFiles()); + stat.getFailedDeleteBootstrapBaseFiles(), stat.isPartitionDeleted()); partitionBootstrapMetadataMap.put(stat.getPartitionPath(), bootstrapMetadata); } totalDeleted += stat.getSuccessDeleteFiles().size(); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java index 2c4623b2b..4b9a185d2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java @@ -18,6 +18,13 @@ package org.apache.hudi.metadata; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.model.HoodieMetadataBloomFilter; import org.apache.hudi.avro.model.HoodieMetadataColumnStats; import org.apache.hudi.avro.model.HoodieMetadataFileInfo; @@ -35,14 +42,6 @@ import org.apache.hudi.common.util.hash.PartitionIndexID; import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.io.storage.HoodieHFileReader; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; @@ -222,8 +221,17 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadataPayload> public static HoodieRecord<HoodieMetadataPayload> createPartitionListRecord(List<String> partitions) { + return createPartitionListRecord(partitions, false); + } + + /** + * Create and return a {@code HoodieMetadataPayload} to save the list of partitions. 
+ * + * @param partitions The list of partitions + * @param isDeleted true if the given partitions are to be marked as deleted + */ + public static HoodieRecord<HoodieMetadataPayload> createPartitionListRecord(List<String> partitions, boolean isDeleted) { Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>(); - partitions.forEach(partition -> fileInfo.put(getPartition(partition), new HoodieMetadataFileInfo(0L, false))); + partitions.forEach(partition -> fileInfo.put(getPartition(partition), new HoodieMetadataFileInfo(0L, isDeleted))); HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.getPartitionPath()); HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST, diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 9e1db9eba..5eafac20e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -18,6 +18,11 @@ package org.apache.hudi.metadata; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieMetadataColumnStats; import org.apache.hudi.avro.model.HoodieRestoreMetadata; @@ -53,17 +58,10 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import javax.annotation.Nonnull; - import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -162,10 +160,10 @@ public class HoodieTableMetadataUtil { String instantTime) { List<HoodieRecord> records = new ArrayList<>(commitMetadata.getPartitionToWriteStats().size()); - // Add record bearing partitions list - ArrayList<String> partitionsList = new ArrayList<>(commitMetadata.getPartitionToWriteStats().keySet()); + // Add record bearing added partitions list + ArrayList<String> partitionsAdded = new ArrayList<>(commitMetadata.getPartitionToWriteStats().keySet()); - records.add(HoodieMetadataPayload.createPartitionListRecord(partitionsList)); + records.add(HoodieMetadataPayload.createPartitionListRecord(partitionsAdded)); // Update files listing records for each individual partition List<HoodieRecord<HoodieMetadataPayload>> updatedPartitionFilesRecords = @@ -318,6 +316,7 @@ public class HoodieTableMetadataUtil { String instantTime) { List<HoodieRecord> records = new LinkedList<>(); int[] fileDeleteCount = {0}; + List<String> deletedPartitions = new ArrayList<>(); cleanMetadata.getPartitionMetadata().forEach((partitionName, partitionMetadata) -> { final String partition = getPartition(partitionName); // Files deleted from a partition @@ -327,8 +326,16 @@ records.add(record); fileDeleteCount[0] += deletedFiles.size(); + boolean isPartitionDeleted = partitionMetadata.getIsPartitionDeleted(); + if (isPartitionDeleted) { + deletedPartitions.add(partitionName); + } }); + if (!deletedPartitions.isEmpty()) { + // if any partitions were fully deleted, record them in the partitions list as deleted + 
records.add(HoodieMetadataPayload.createPartitionListRecord(deletedPartitions, true)); + } LOG.info("Updating at " + instantTime + " from Clean. #partitions_updated=" + records.size() + ", #files_deleted=" + fileDeleteCount[0]); return records; diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java index dfc78440d..fa6998bfa 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java @@ -291,7 +291,7 @@ public class HoodieTestTable { public HoodieTestTable addClean(String instantTime) throws IOException { HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant(EMPTY_STRING, EMPTY_STRING, EMPTY_STRING), EMPTY_STRING, new HashMap<>(), - CleanPlanV2MigrationHandler.VERSION, new HashMap<>()); + CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>()); HoodieCleanStat cleanStats = new HoodieCleanStat( HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS, HoodieTestUtils.DEFAULT_PARTITION_PATHS[RANDOM.nextInt(HoodieTestUtils.DEFAULT_PARTITION_PATHS.length)], @@ -305,7 +305,7 @@ public class HoodieTestTable { public Pair getHoodieCleanMetadata(String commitTime, HoodieTestTableState testTableState) { HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant(commitTime, CLEAN_ACTION, EMPTY_STRING), EMPTY_STRING, new HashMap<>(), - CleanPlanV2MigrationHandler.VERSION, new HashMap<>()); + CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>()); List<HoodieCleanStat> cleanStats = new ArrayList<>(); for (Map.Entry<String, List<String>> entry : testTableState.getPartitionToFileIdMapForCleaner(commitTime).entrySet()) { cleanStats.add(new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS, diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index f9da19236..a7c5056e2 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -45,8 +45,11 @@ import org.apache.hudi.sync.common.HoodieSyncConfig import org.apache.hudi.sync.common.util.SyncUtilHelpers import org.apache.hudi.table.BulkInsertPartitioner import org.apache.log4j.LogManager +import org.apache.spark.SPARK_VERSION import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.rdd.RDD +import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} +import org.apache.spark.sql.types.StructType import org.apache.spark.sql._ import org.apache.spark.sql.internal.StaticSQLConf import org.apache.spark.sql.types.StructType diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala index d545e3dad..4eec95aba 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala @@ -47,6 +47,9 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase { checkExceptionContain(s"alter table $tableName drop partition 
(dt='2021-10-01')")( s"$tableName is a non-partitioned table that is not allowed to drop partition") + + // show partitions + checkAnswer(s"show partitions $tableName")(Seq.empty: _*) } test("Purge drop non-partitioned table") { @@ -71,6 +74,9 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase { checkExceptionContain(s"alter table $tableName drop partition (dt='2021-10-01') purge")( s"$tableName is a non-partitioned table that is not allowed to drop partition") + + // show partitions + checkAnswer(s"show partitions $tableName")(Seq.empty: _*) } Seq(false, true).foreach { urlencode => @@ -113,6 +119,13 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase { } checkAnswer(s"select dt from $tableName")(Seq(s"2021/10/02")) assertResult(true)(existsPath(s"${tmp.getCanonicalPath}/$tableName/$partitionPath")) + + // show partitions + if (urlencode) { + checkAnswer(s"show partitions $tableName")(Seq(PartitionPathEncodeUtils.escapePathName("2021/10/02"))) + } else { + checkAnswer(s"show partitions $tableName")(Seq("2021/10/02")) + } } } } @@ -157,6 +170,13 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase { } checkAnswer(s"select dt from $tableName")(Seq(s"2021/10/02")) assertResult(false)(existsPath(s"${tmp.getCanonicalPath}/$tableName/$partitionPath")) + + // show partitions + if (urlencode) { + checkAnswer(s"show partitions $tableName")(Seq(PartitionPathEncodeUtils.escapePathName("2021/10/02"))) + } else { + checkAnswer(s"show partitions $tableName")(Seq("2021/10/02")) + } } } } @@ -189,7 +209,10 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase { // drop 2021-10-01 partition spark.sql(s"alter table $tableName drop partition (dt='2021-10-01')") - checkAnswer(s"select id, name, ts, dt from $tableName") (Seq(2, "l4", "v1", "2021-10-02")) + checkAnswer(s"select id, name, ts, dt from $tableName")(Seq(2, "l4", "v1", "2021-10-02")) + + // show partitions + checkAnswer(s"show partitions $tableName")(Seq("dt=2021-10-02")) } Seq(false, true).foreach { hiveStyle => @@ -232,6 +255,13 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase { checkAnswer(s"select id, name, ts, year, month, day from $tableName")( Seq(2, "l4", "v1", "2021", "10", "02") ) + + // show partitions + if (hiveStyle) { + checkAnswer(s"show partitions $tableName")(Seq("year=2021/month=10/day=02")) + } else { + checkAnswer(s"show partitions $tableName")(Seq("2021/10/02")) + } } } } @@ -274,6 +304,13 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase { ) assertResult(false)(existsPath( s"${tmp.getCanonicalPath}/$tableName/year=2021/month=10/day=01")) + + // show partitions + if (hiveStyle) { + checkAnswer(s"show partitions $tableName")(Seq("year=2021/month=10/day=02")) + } else { + checkAnswer(s"show partitions $tableName")(Seq("2021/10/02")) + } } } }