1
0

[HUDI-3743] Support DELETE_PARTITION for metadata table (#5169)

In order to drop any metadata partition (index), we can reuse the DELETE_PARTITION operation in metadata table. Subsequent to this, we can support drop index (with table config update) for async metadata indexer.

- Add a new API in HoodieTableMetadataWriter
- Current only supported for Spark metadata writer
This commit is contained in:
Sagar Sumit
2022-04-01 06:59:17 +05:30
committed by GitHub
parent 28dafa774e
commit a048e940fd
10 changed files with 170 additions and 30 deletions

View File

@@ -94,4 +94,12 @@ public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable {
* @param instantTime instant time of the commit.
*/
void update(HoodieRollbackMetadata rollbackMetadata, String instantTime);
/**
* Deletes the given metadata partitions. This path reuses DELETE_PARTITION operation.
*
* @param instantTime - instant time when replacecommit corresponding to the drop will be recorded in the metadata timeline
* @param partitions - list of {@link MetadataPartitionType} to drop
*/
void deletePartitions(String instantTime, List<MetadataPartitionType> partitions);
}

View File

@@ -72,7 +72,7 @@ public class CleanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends
this.skipLocking = skipLocking;
}
static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throws IOException {
private static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throws IOException {
Path deletePath = new Path(deletePathStr);
LOG.debug("Working on delete path :" + deletePath);
try {
@@ -88,7 +88,7 @@ public class CleanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends
}
}
static Stream<Pair<String, PartitionCleanStat>> deleteFilesFunc(Iterator<Pair<String, CleanFileInfo>> cleanFileInfo, HoodieTable table) {
private static Stream<Pair<String, PartitionCleanStat>> deleteFilesFunc(Iterator<Pair<String, CleanFileInfo>> cleanFileInfo, HoodieTable table) {
Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
FileSystem fs = table.getMetaClient().getFs();
@@ -138,8 +138,6 @@ public class CleanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends
.flatMap(x -> x.getValue().stream().map(y -> new ImmutablePair<>(x.getKey(),
new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile()))));
List<String> partitionsToBeDeleted = cleanerPlan.getPartitionsToBeDeleted() != null ? cleanerPlan.getPartitionsToBeDeleted() : new ArrayList<>();
Stream<ImmutablePair<String, PartitionCleanStat>> partitionCleanStats =
context.mapPartitionsToPairAndReduceByKey(filesToBeDeletedPerPartition,
iterator -> deleteFilesFunc(iterator, table), PartitionCleanStat::merge, cleanerParallelism);
@@ -147,6 +145,7 @@ public class CleanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends
Map<String, PartitionCleanStat> partitionCleanStatsMap = partitionCleanStats
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
List<String> partitionsToBeDeleted = cleanerPlan.getPartitionsToBeDeleted() != null ? cleanerPlan.getPartitionsToBeDeleted() : new ArrayList<>();
partitionsToBeDeleted.forEach(entry -> {
try {
deleteFileAndGetResult(table.getMetaClient().getFs(), table.getMetaClient().getBasePath() + "/" + entry);

View File

@@ -21,7 +21,6 @@ package org.apache.hudi.table.action.clean;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.CleanFileInfo;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
@@ -45,7 +44,9 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -202,11 +203,18 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
/**
* Scan and list all partitions for cleaning.
* @return all partitions paths for the dataset.
* @throws IOException
*/
private List<String> getPartitionPathsForFullCleaning() {
// Go to brute force mode of scanning all partitions
return FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), config.getBasePath());
try {
// Because the partition of BaseTableMetadata has been deleted,
// all partition information can only be obtained from FileSystemBackedTableMetadata.
FileSystemBackedTableMetadata fsBackedTableMetadata = new FileSystemBackedTableMetadata(context,
context.getHadoopConf(), config.getBasePath(), config.shouldAssumeDatePartitioning());
return fsBackedTableMetadata.getAllPartitionPaths();
} catch (IOException e) {
return Collections.emptyList();
}
}
/**
@@ -278,6 +286,9 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
* retain 10 commits, and commit batch time is 30 mins, then you have 5 hrs of lookback)
* <p>
* This policy is the default.
*
* @return A {@link Pair} whose left is boolean indicating whether partition itself needs to be deleted,
* and right is a list of {@link CleanFileInfo} about the files in the partition that needs to be deleted.
*/
private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(String partitionPath, int commitsRetained, HoodieCleaningPolicy policy) {
LOG.info("Cleaning " + partitionPath + ", retaining latest " + commitsRetained + " commits. ");
@@ -466,7 +477,7 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
/**
* Determine if file slice needed to be preserved for pending compaction.
*
*
* @param fileSlice File Slice
* @return true if file slice needs to be preserved, false otherwise.
*/

View File

@@ -31,6 +31,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.conf.Configuration;
@@ -161,4 +162,9 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
// Update total size of the metadata and count of base/log files
metrics.ifPresent(m -> m.updateSizeMetrics(metadataMetaClient, metadata));
}
@Override
public void deletePartitions(String instantTime, List<MetadataPartitionType> partitions) {
throw new HoodieNotSupportedException("Dropping metadata index not supported for Flink metadata table yet.");
}
}

View File

@@ -25,8 +25,11 @@ import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -43,6 +46,7 @@ import org.apache.spark.api.java.JavaRDD;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetadataWriter {
@@ -177,4 +181,16 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
// Update total size of the metadata and count of base/log files
metrics.ifPresent(m -> m.updateSizeMetrics(metadataMetaClient, metadata));
}
@Override
public void deletePartitions(String instantTime, List<MetadataPartitionType> partitions) {
List<String> partitionsToDrop = partitions.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toList());
LOG.info("Deleting Metadata Table partitions: " + partitionsToDrop);
try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig, true)) {
String actionType = CommitUtils.getCommitActionType(WriteOperationType.DELETE_PARTITION, HoodieTableType.MERGE_ON_READ);
writeClient.startCommitWithTime(instantTime, actionType);
writeClient.deletePartitions(partitionsToDrop, instantTime);
}
}
}

View File

@@ -446,6 +446,73 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
testTableOperationsImpl(engineContext, writeConfig);
}
@ParameterizedTest
@EnumSource(HoodieTableType.class)
public void testMetadataTableDeletePartition(HoodieTableType tableType) throws IOException {
initPath();
int maxCommits = 1;
HoodieWriteConfig cfg = getConfigBuilder(TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(maxCommits).build())
.withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
.build();
init(tableType);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, cfg)) {
// Write 1 (Bulk insert)
String newCommitTime = "0000001";
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
client.startCommitWithTime(newCommitTime);
List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
// Write 2 (upserts)
newCommitTime = "0000002";
client.startCommitWithTime(newCommitTime);
validateMetadata(client);
records = dataGen.generateInserts(newCommitTime, 10);
writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
// metadata writer to delete column_stats partition
HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(client);
assertNotNull(metadataWriter, "MetadataWriter should have been initialized");
metadataWriter.deletePartitions("0000003", Arrays.asList(MetadataPartitionType.COLUMN_STATS));
HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
List<String> metadataTablePartitions = FSUtils.getAllPartitionPaths(engineContext, metadataMetaClient.getBasePath(), false, false);
// partition should be physically deleted
assertEquals(metadataWriter.getEnabledPartitionTypes().size(), metadataTablePartitions.size());
assertFalse(metadataTablePartitions.contains(MetadataPartitionType.COLUMN_STATS.getPartitionPath()));
Option<HoodieInstant> completedReplaceInstant = metadataMetaClient.reloadActiveTimeline().getCompletedReplaceTimeline().lastInstant();
assertTrue(completedReplaceInstant.isPresent());
assertEquals("0000003", completedReplaceInstant.get().getTimestamp());
final Map<String, MetadataPartitionType> metadataEnabledPartitionTypes = new HashMap<>();
metadataWriter.getEnabledPartitionTypes().forEach(e -> metadataEnabledPartitionTypes.put(e.getPartitionPath(), e));
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metadataMetaClient, metadataMetaClient.getActiveTimeline());
metadataTablePartitions.forEach(partition -> {
List<FileSlice> latestSlices = fsView.getLatestFileSlices(partition).collect(Collectors.toList());
if (MetadataPartitionType.COLUMN_STATS.getPartitionPath().equals(partition)) {
// there should not be any file slice in column_stats partition
assertTrue(latestSlices.isEmpty());
} else {
assertFalse(latestSlices.isEmpty());
assertTrue(latestSlices.stream().map(FileSlice::getBaseFile).count()
<= metadataEnabledPartitionTypes.get(partition).getFileGroupCount(), "Should have a single latest base file per file group");
assertTrue(latestSlices.size()
<= metadataEnabledPartitionTypes.get(partition).getFileGroupCount(), "Should have a single latest file slice per file group");
}
});
}
}
/**
* Tests that virtual key configs are honored in base files after compaction in metadata table.
*
@@ -1760,7 +1827,6 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
newCommitTime = HoodieActiveTimeline.createNewInstantTime(5000);
client.startCommitWithTime(newCommitTime);
client.deletePartitions(singletonList(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH), newCommitTime);
validateMetadata(client);
// add 1 more commit
newCommitTime = HoodieActiveTimeline.createNewInstantTime(5000);
@@ -1775,7 +1841,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
writeStatuses = client.upsert(jsc.parallelize(upsertRecords, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
// trigger clean which will actually triggger deletion of the partition
// trigger clean which will actually trigger deletion of the partition
newCommitTime = HoodieActiveTimeline.createNewInstantTime(5000);
HoodieCleanMetadata cleanMetadata = client.clean(newCommitTime);
validateMetadata(client);