From a048e940fd6e3f62e443bca5831e99144900a33f Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Fri, 1 Apr 2022 06:59:17 +0530 Subject: [PATCH] [HUDI-3743] Support DELETE_PARTITION for metadata table (#5169) In order to drop any metadata partition (index), we can reuse the DELETE_PARTITION operation in metadata table. Subsequent to this, we can support drop index (with table config update) for async metadata indexer. - Add a new API in HoodieTableMetadataWriter - Current only supported for Spark metadata writer --- .../metadata/HoodieTableMetadataWriter.java | 8 +++ .../action/clean/CleanActionExecutor.java | 7 +- .../hudi/table/action/clean/CleanPlanner.java | 19 +++-- .../FlinkHoodieBackedTableMetadataWriter.java | 6 ++ .../SparkHoodieBackedTableMetadataWriter.java | 16 +++++ .../functional/TestHoodieBackedMetadata.java | 70 ++++++++++++++++++- .../hudi/metadata/HoodieMetadataPayload.java | 32 +++++++-- .../metadata/HoodieTableMetadataUtil.java | 35 +++++++--- .../apache/hudi/HoodieSparkSqlWriter.scala | 3 - .../hudi/TestAlterTableDropPartition.scala | 4 +- 10 files changed, 170 insertions(+), 30 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java index 777c785e2..83fe18672 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java @@ -94,4 +94,12 @@ public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable { * @param instantTime instant time of the commit. */ void update(HoodieRollbackMetadata rollbackMetadata, String instantTime); + + /** + * Deletes the given metadata partitions. This path reuses DELETE_PARTITION operation. 
+ * + * @param instantTime - instant time when replacecommit corresponding to the drop will be recorded in the metadata timeline + * @param partitions - list of {@link MetadataPartitionType} to drop + */ + void deletePartitions(String instantTime, List partitions); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java index 31d26f133..f2d6411ea 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java @@ -72,7 +72,7 @@ public class CleanActionExecutor extends this.skipLocking = skipLocking; } - static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throws IOException { + private static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throws IOException { Path deletePath = new Path(deletePathStr); LOG.debug("Working on delete path :" + deletePath); try { @@ -88,7 +88,7 @@ public class CleanActionExecutor extends } } - static Stream> deleteFilesFunc(Iterator> cleanFileInfo, HoodieTable table) { + private static Stream> deleteFilesFunc(Iterator> cleanFileInfo, HoodieTable table) { Map partitionCleanStatMap = new HashMap<>(); FileSystem fs = table.getMetaClient().getFs(); @@ -138,8 +138,6 @@ public class CleanActionExecutor extends .flatMap(x -> x.getValue().stream().map(y -> new ImmutablePair<>(x.getKey(), new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile())))); - List partitionsToBeDeleted = cleanerPlan.getPartitionsToBeDeleted() != null ? cleanerPlan.getPartitionsToBeDeleted() : new ArrayList<>(); - Stream> partitionCleanStats = context.mapPartitionsToPairAndReduceByKey(filesToBeDeletedPerPartition, iterator -> deleteFilesFunc(iterator, table), PartitionCleanStat::merge, cleanerParallelism); @@ -147,6 +145,7 @@ public class CleanActionExecutor extends Map partitionCleanStatsMap = partitionCleanStats .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + List partitionsToBeDeleted = cleanerPlan.getPartitionsToBeDeleted() != null ? 
cleanerPlan.getPartitionsToBeDeleted() : new ArrayList<>(); partitionsToBeDeleted.forEach(entry -> { try { deleteFileAndGetResult(table.getMetaClient().getFs(), table.getMetaClient().getBasePath() + "/" + entry); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java index 3eb0c97e5..79eef43b3 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java @@ -21,7 +21,6 @@ package org.apache.hudi.table.action.clean; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieSavepointMetadata; import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.CleanFileInfo; import org.apache.hudi.common.model.CompactionOperation; import org.apache.hudi.common.model.FileSlice; @@ -45,7 +44,9 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieSavepointException; +import org.apache.hudi.metadata.FileSystemBackedTableMetadata; import org.apache.hudi.table.HoodieTable; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -202,11 +203,18 @@ public class CleanPlanner implements Ser /** * Scan and list all partitions for cleaning. * @return all partitions paths for the dataset. - * @throws IOException */ private List getPartitionPathsForFullCleaning() { // Go to brute force mode of scanning all partitions - return FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), config.getBasePath()); + try { + // Because the partition of BaseTableMetadata has been deleted, + // all partition information can only be obtained from FileSystemBackedTableMetadata. + FileSystemBackedTableMetadata fsBackedTableMetadata = new FileSystemBackedTableMetadata(context, + context.getHadoopConf(), config.getBasePath(), config.shouldAssumeDatePartitioning()); + return fsBackedTableMetadata.getAllPartitionPaths(); + } catch (IOException e) { + return Collections.emptyList(); + } } /** @@ -278,6 +286,9 @@ public class CleanPlanner implements Ser * retain 10 commits, and commit batch time is 30 mins, then you have 5 hrs of lookback) *
* This policy is the default. + * + * @return A {@link Pair} whose left is boolean indicating whether partition itself needs to be deleted, + * and right is a list of {@link CleanFileInfo} about the files in the partition that needs to be deleted. */ private Pair> getFilesToCleanKeepingLatestCommits(String partitionPath, int commitsRetained, HoodieCleaningPolicy policy) { LOG.info("Cleaning " + partitionPath + ", retaining latest " + commitsRetained + " commits. "); @@ -466,7 +477,7 @@ public class CleanPlanner implements Ser /** * Determine if file slice needed to be preserved for pending compaction. - * + * * @param fileSlice File Slice * @return true if file slice needs to be preserved, false otherwise. */ diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java index 4d64e5834..76774e961 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java @@ -31,6 +31,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieMetadataException; +import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.avro.specific.SpecificRecordBase; import org.apache.hadoop.conf.Configuration; @@ -161,4 +162,9 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad // Update total size of the metadata and count of base/log files metrics.ifPresent(m -> m.updateSizeMetrics(metadataMetaClient, metadata)); } + + @Override + public void deletePartitions(String instantTime, List partitions) { + throw new HoodieNotSupportedException("Dropping metadata index not supported for Flink metadata table yet."); + } } \ No newline at end of file diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java index d0173f984..7d94b2d4f 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java @@ -25,8 +25,11 @@ import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.metrics.Registry; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieWriteConfig; @@ -43,6 +46,7 @@ import org.apache.spark.api.java.JavaRDD; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetadataWriter { @@ -177,4 +181,16 @@ public class 
SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad // Update total size of the metadata and count of base/log files metrics.ifPresent(m -> m.updateSizeMetrics(metadataMetaClient, metadata)); } + + @Override + public void deletePartitions(String instantTime, List partitions) { + List partitionsToDrop = partitions.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toList()); + LOG.info("Deleting Metadata Table partitions: " + partitionsToDrop); + + try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig, true)) { + String actionType = CommitUtils.getCommitActionType(WriteOperationType.DELETE_PARTITION, HoodieTableType.MERGE_ON_READ); + writeClient.startCommitWithTime(instantTime, actionType); + writeClient.deletePartitions(partitionsToDrop, instantTime); + } + } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 299fabed7..f60de7dfd 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -446,6 +446,73 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { testTableOperationsImpl(engineContext, writeConfig); } + @ParameterizedTest + @EnumSource(HoodieTableType.class) + public void testMetadataTableDeletePartition(HoodieTableType tableType) throws IOException { + initPath(); + int maxCommits = 1; + HoodieWriteConfig cfg = getConfigBuilder(TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(maxCommits).build()) + .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1) + .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()) + .build(); + init(tableType); + HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); + + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, cfg)) { + // Write 1 (Bulk insert) + String newCommitTime = "0000001"; + List records = dataGen.generateInserts(newCommitTime, 20); + client.startCommitWithTime(newCommitTime); + List writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect(); + assertNoWriteErrors(writeStatuses); + validateMetadata(client); + + // Write 2 (upserts) + newCommitTime = "0000002"; + client.startCommitWithTime(newCommitTime); + validateMetadata(client); + + records = dataGen.generateInserts(newCommitTime, 10); + writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); + assertNoWriteErrors(writeStatuses); + + // metadata writer to delete column_stats partition + HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(client); + assertNotNull(metadataWriter, "MetadataWriter should have been initialized"); + metadataWriter.deletePartitions("0000003", Arrays.asList(MetadataPartitionType.COLUMN_STATS)); + + HoodieTableMetaClient metadataMetaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build(); + List metadataTablePartitions = FSUtils.getAllPartitionPaths(engineContext, metadataMetaClient.getBasePath(), false, false); + // partition should be physically deleted + assertEquals(metadataWriter.getEnabledPartitionTypes().size(), metadataTablePartitions.size()); + assertFalse(metadataTablePartitions.contains(MetadataPartitionType.COLUMN_STATS.getPartitionPath())); + + Option completedReplaceInstant = metadataMetaClient.reloadActiveTimeline().getCompletedReplaceTimeline().lastInstant(); + assertTrue(completedReplaceInstant.isPresent()); + assertEquals("0000003", completedReplaceInstant.get().getTimestamp()); + + final Map metadataEnabledPartitionTypes = new HashMap<>(); + metadataWriter.getEnabledPartitionTypes().forEach(e -> metadataEnabledPartitionTypes.put(e.getPartitionPath(), e)); + HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metadataMetaClient, metadataMetaClient.getActiveTimeline()); + metadataTablePartitions.forEach(partition -> { + List latestSlices = fsView.getLatestFileSlices(partition).collect(Collectors.toList()); + if (MetadataPartitionType.COLUMN_STATS.getPartitionPath().equals(partition)) { + // there should not be any file slice in column_stats partition + assertTrue(latestSlices.isEmpty()); + } else { + assertFalse(latestSlices.isEmpty()); + assertTrue(latestSlices.stream().map(FileSlice::getBaseFile).count() + <= metadataEnabledPartitionTypes.get(partition).getFileGroupCount(), "Should have a single latest base file per file group"); + assertTrue(latestSlices.size() + <= metadataEnabledPartitionTypes.get(partition).getFileGroupCount(), "Should have a single latest file slice per file group"); + } + }); + } + } + /** * Tests that virtual key configs are honored in base files after compaction in metadata table. 
* @@ -1760,7 +1827,6 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { newCommitTime = HoodieActiveTimeline.createNewInstantTime(5000); client.startCommitWithTime(newCommitTime); client.deletePartitions(singletonList(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH), newCommitTime); - validateMetadata(client); // add 1 more commit newCommitTime = HoodieActiveTimeline.createNewInstantTime(5000); @@ -1775,7 +1841,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { writeStatuses = client.upsert(jsc.parallelize(upsertRecords, 1), newCommitTime).collect(); assertNoWriteErrors(writeStatuses); - // trigger clean which will actually triggger deletion of the partition + // trigger clean which will actually trigger deletion of the partition newCommitTime = HoodieActiveTimeline.createNewInstantTime(5000); HoodieCleanMetadata cleanMetadata = client.clean(newCommitTime); validateMetadata(client); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java index 4b9a185d2..0f4599724 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java @@ -18,13 +18,6 @@ package org.apache.hudi.metadata; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.model.HoodieMetadataBloomFilter; import org.apache.hudi.avro.model.HoodieMetadataColumnStats; import org.apache.hudi.avro.model.HoodieMetadataFileInfo; @@ -42,6 +35,14 @@ import org.apache.hudi.common.util.hash.PartitionIndexID; import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.io.storage.HoodieHFileReader; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; @@ -239,6 +240,23 @@ public class HoodieMetadataPayload implements HoodieRecordPayload(key, payload); } + /** + * Create and return a {@code HoodieMetadataPayload} to save list of partitions. + * + * @param partitionsAdded The list of added partitions + * @param partitionsDeleted The list of deleted partitions + */ + public static HoodieRecord createPartitionListRecord(List partitionsAdded, List partitionsDeleted) { + Map fileInfo = new HashMap<>(); + partitionsAdded.forEach(partition -> fileInfo.put(partition, new HoodieMetadataFileInfo(0L, false))); + partitionsDeleted.forEach(partition -> fileInfo.put(partition, new HoodieMetadataFileInfo(0L, true))); + + HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.getPartitionPath()); + HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST, + fileInfo); + return new HoodieAvroRecord<>(key, payload); + } + /** * Create and return a {@code HoodieMetadataPayload} to save list of files within a partition. 
* diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index bb4aaca8a..9e3eca3eb 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -18,11 +18,6 @@ package org.apache.hudi.metadata; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieMetadataColumnStats; import org.apache.hudi.avro.model.HoodieRestoreMetadata; @@ -38,7 +33,9 @@ import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieDeltaWriteStat; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; @@ -58,10 +55,17 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import javax.annotation.Nonnull; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -195,9 +199,12 @@ public class HoodieTableMetadataUtil { List records = new ArrayList<>(commitMetadata.getPartitionToWriteStats().size()); // Add record bearing added partitions list - ArrayList partitionsAdded = new ArrayList<>(commitMetadata.getPartitionToWriteStats().keySet()); + List partitionsAdded = new ArrayList<>(commitMetadata.getPartitionToWriteStats().keySet()); - records.add(HoodieMetadataPayload.createPartitionListRecord(partitionsAdded)); + // Add record bearing deleted partitions list + List partitionsDeleted = getPartitionsDeleted(commitMetadata); + + records.add(HoodieMetadataPayload.createPartitionListRecord(partitionsAdded, partitionsDeleted)); // Update files listing records for each individual partition List> updatedPartitionFilesRecords = @@ -247,6 +254,18 @@ public class HoodieTableMetadataUtil { return records; } + private static ArrayList getPartitionsDeleted(HoodieCommitMetadata commitMetadata) { + if (commitMetadata instanceof HoodieReplaceCommitMetadata + && WriteOperationType.DELETE_PARTITION.equals(commitMetadata.getOperationType())) { + Map> partitionToReplaceFileIds = + ((HoodieReplaceCommitMetadata) commitMetadata).getPartitionToReplaceFileIds(); + if (!partitionToReplaceFileIds.isEmpty()) { + return new ArrayList<>(partitionToReplaceFileIds.keySet()); + } + } + return new ArrayList<>(); + } + /** * Convert commit action metadata to bloom filter records. 
* @@ -371,7 +390,7 @@ public class HoodieTableMetadataUtil { records.add(HoodieMetadataPayload.createPartitionListRecord(deletedPartitions, true)); } LOG.info("Updating at " + instantTime + " from Clean. #partitions_updated=" + records.size() - + ", #files_deleted=" + fileDeleteCount[0]); + + ", #files_deleted=" + fileDeleteCount[0] + ", #partitions_deleted=" + deletedPartitions.size()); return records; } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 01ca8e928..033b2618a 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -45,11 +45,8 @@ import org.apache.hudi.sync.common.HoodieSyncConfig import org.apache.hudi.sync.common.util.SyncUtilHelpers import org.apache.hudi.table.BulkInsertPartitioner import org.apache.log4j.LogManager -import org.apache.spark.SPARK_VERSION import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.rdd.RDD -import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} -import org.apache.spark.sql.types.StructType import org.apache.spark.sql._ import org.apache.spark.sql.internal.StaticSQLConf import org.apache.spark.sql.types.StructType diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala index 4eec95aba..fdff6928a 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala @@ -222,7 +222,7 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase { val tablePath = s"${tmp.getCanonicalPath}/$tableName" import spark.implicits._ - val df = Seq((1, "z3", "v1", "2021", "10", "01"), (2, "l4", "v1", "2021", "10","02")) + val df = Seq((1, "z3", "v1", "2021", "10", "01"), (2, "l4", "v1", "2021", "10", "02")) .toDF("id", "name", "ts", "year", "month", "day") df.write.format("hudi") @@ -273,7 +273,7 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase { val tablePath = s"${tmp.getCanonicalPath}/$tableName" import spark.implicits._ - val df = Seq((1, "z3", "v1", "2021", "10", "01"), (2, "l4", "v1", "2021", "10","02")) + val df = Seq((1, "z3", "v1", "2021", "10", "01"), (2, "l4", "v1", "2021", "10", "02")) .toDF("id", "name", "ts", "year", "month", "day") df.write.format("hudi")