[HUDI-2923] Fixing metadata table reader when metadata compaction is inflight (#4206)
* [HUDI-2923] Fixing metadata table reader when metadata compaction is inflight * Fixing retry of pending compaction in metadata table and enhancing tests
This commit is contained in:
committed by
GitHub
parent
94f45e928c
commit
1d4fb827e7
@@ -519,6 +519,13 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Run any pending compactions.
|
||||
*/
|
||||
public void runAnyPendingCompactions() {
|
||||
runAnyPendingCompactions(createTable(config, hadoopConf, config.isMetadataTableEnabled()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a savepoint based on the latest commit action on the timeline.
|
||||
*
|
||||
|
||||
@@ -682,7 +682,10 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
|
||||
* deltacommit.
|
||||
*/
|
||||
protected void compactIfNecessary(AbstractHoodieWriteClient writeClient, String instantTime) {
|
||||
String latestDeltacommitTime = metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant()
|
||||
// finish off any pending compactions if any from previous attempt.
|
||||
writeClient.runAnyPendingCompactions();
|
||||
|
||||
String latestDeltacommitTime = metadataMetaClient.reloadActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant()
|
||||
.get().getTimestamp();
|
||||
List<HoodieInstant> pendingInstants = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
|
||||
.findInstantsBefore(latestDeltacommitTime).getInstants().collect(Collectors.toList());
|
||||
@@ -693,6 +696,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
// Trigger compaction with suffixes based on the same instant time. This ensures that any future
|
||||
// delta commits synced over will not have an instant time lesser than the last completed instant on the
|
||||
// metadata table.
|
||||
|
||||
@@ -154,7 +154,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
|
||||
* The record is tagged with respective file slice's location based on its record key.
|
||||
*/
|
||||
private List<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName, int numFileGroups) {
|
||||
List<FileSlice> fileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName);
|
||||
List<FileSlice> fileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName, false);
|
||||
ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups));
|
||||
|
||||
return records.stream().map(r -> {
|
||||
|
||||
@@ -169,7 +169,7 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
|
||||
* The record is tagged with respective file slice's location based on its record key.
|
||||
*/
|
||||
private JavaRDD<HoodieRecord> prepRecords(JavaRDD<HoodieRecord> recordsRDD, String partitionName, int numFileGroups) {
|
||||
List<FileSlice> fileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName);
|
||||
List<FileSlice> fileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName, false);
|
||||
ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups));
|
||||
|
||||
return recordsRDD.map(r -> {
|
||||
|
||||
@@ -54,6 +54,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageType;
|
||||
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
|
||||
import org.apache.hudi.common.table.view.TableFileSystemView;
|
||||
import org.apache.hudi.common.testutils.FileCreateUtils;
|
||||
import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
|
||||
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
|
||||
import org.apache.hudi.common.testutils.HoodieTestTable;
|
||||
import org.apache.hudi.common.util.HoodieTimer;
|
||||
@@ -413,6 +414,91 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Tests that virtual key configs are honored in base files after compaction in metadata table.
|
||||
*
|
||||
*/
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void testMetadataTableWithPendingCompaction(boolean simulateFailedCompaction) throws Exception {
|
||||
HoodieTableType tableType = COPY_ON_WRITE;
|
||||
init(tableType, false);
|
||||
writeConfig = getWriteConfigBuilder(true, true, false)
|
||||
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
|
||||
.enable(true)
|
||||
.enableFullScan(true)
|
||||
.enableMetrics(false)
|
||||
.withMaxNumDeltaCommitsBeforeCompaction(3)
|
||||
.build()).build();
|
||||
initWriteConfigAndMetatableWriter(writeConfig, true);
|
||||
|
||||
doWriteOperation(testTable, "0000001", INSERT);
|
||||
// create an inflight compaction in metadata table.
|
||||
// not easy to create an inflight in metadata table directly, hence letting compaction succeed and then deleting the completed instant.
|
||||
// this new write is expected to trigger metadata table compaction
|
||||
String commitInstant = "0000002";
|
||||
doWriteOperation(testTable, commitInstant, INSERT);
|
||||
|
||||
HoodieTableMetadata tableMetadata = metadata(writeConfig, context);
|
||||
String metadataCompactionInstant = commitInstant + "001";
|
||||
assertTrue(tableMetadata.getLatestCompactionTime().isPresent());
|
||||
assertEquals(tableMetadata.getLatestCompactionTime().get(), metadataCompactionInstant);
|
||||
|
||||
validateMetadata(testTable);
|
||||
// Fetch compaction Commit file and rename to some other file. completed compaction meta file should have some serialized info that table interprets
|
||||
// for future upserts. so, renaming the file here to some temp name and later renaming it back to same name.
|
||||
java.nio.file.Path parentPath = Paths.get(metadataTableBasePath, HoodieTableMetaClient.METAFOLDER_NAME);
|
||||
java.nio.file.Path metaFilePath = parentPath.resolve(metadataCompactionInstant + HoodieTimeline.COMMIT_EXTENSION);
|
||||
java.nio.file.Path tempFilePath = FileCreateUtils.renameFileToTemp(metaFilePath, metadataCompactionInstant);
|
||||
metaClient.reloadActiveTimeline();
|
||||
testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter);
|
||||
// this validation will exercise the code path where a compaction is inflight in metadata table, but still metadata based file listing should match non
|
||||
// metadata based file listing.
|
||||
validateMetadata(testTable);
|
||||
|
||||
if (simulateFailedCompaction) {
|
||||
// this should retry the compaction in metadata table.
|
||||
doWriteOperation(testTable, "0000003", INSERT);
|
||||
} else {
|
||||
// let the compaction succeed in metadata and validation should succeed.
|
||||
FileCreateUtils.renameTempToMetaFile(tempFilePath, metaFilePath);
|
||||
}
|
||||
|
||||
validateMetadata(testTable);
|
||||
|
||||
// add few more write and validate
|
||||
doWriteOperation(testTable, "0000004", INSERT);
|
||||
doWriteOperation(testTable, "0000005", UPSERT);
|
||||
validateMetadata(testTable);
|
||||
|
||||
if (simulateFailedCompaction) {
|
||||
//trigger another compaction failure.
|
||||
metadataCompactionInstant = "0000005001";
|
||||
tableMetadata = metadata(writeConfig, context);
|
||||
assertTrue(tableMetadata.getLatestCompactionTime().isPresent());
|
||||
assertEquals(tableMetadata.getLatestCompactionTime().get(), metadataCompactionInstant);
|
||||
|
||||
// Fetch compaction Commit file and rename to some other file. completed compaction meta file should have some serialized info that table interprets
|
||||
// for future upserts. so, renaming the file here to some temp name and later renaming it back to same name.
|
||||
parentPath = Paths.get(metadataTableBasePath, HoodieTableMetaClient.METAFOLDER_NAME);
|
||||
metaFilePath = parentPath.resolve(metadataCompactionInstant + HoodieTimeline.COMMIT_EXTENSION);
|
||||
tempFilePath = FileCreateUtils.renameFileToTemp(metaFilePath, metadataCompactionInstant);
|
||||
|
||||
validateMetadata(testTable);
|
||||
|
||||
// this should retry the failed compaction in metadata table.
|
||||
doWriteOperation(testTable, "0000006", INSERT);
|
||||
|
||||
validateMetadata(testTable);
|
||||
|
||||
// add few more write and validate
|
||||
doWriteOperation(testTable, "0000007", INSERT);
|
||||
doWriteOperation(testTable, "0000008", UPSERT);
|
||||
validateMetadata(testTable);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test rollback of various table operations sync to Metadata Table correctly.
|
||||
*/
|
||||
|
||||
@@ -245,7 +245,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
|
||||
|
||||
// Metadata is in sync till the latest completed instant on the dataset
|
||||
HoodieTimer timer = new HoodieTimer().startTimer();
|
||||
List<FileSlice> latestFileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName);
|
||||
List<FileSlice> latestFileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName, true);
|
||||
if (latestFileSlices.size() == 0) {
|
||||
// empty partition
|
||||
return Pair.of(null, null);
|
||||
|
||||
@@ -50,6 +50,7 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;
|
||||
|
||||
@@ -338,9 +339,10 @@ public class HoodieTableMetadataUtil {
|
||||
* The list of file slices returned is sorted in the correct order of file group name.
|
||||
* @param metaClient instance of {@link HoodieTableMetaClient}.
|
||||
* @param partition The name of the partition whose file groups are to be loaded.
|
||||
* @param isReader true if reader code path, false otherwise.
|
||||
* @return List of latest file slices for all file groups in a given partition.
|
||||
*/
|
||||
public static List<FileSlice> loadPartitionFileGroupsWithLatestFileSlices(HoodieTableMetaClient metaClient, String partition) {
|
||||
public static List<FileSlice> loadPartitionFileGroupsWithLatestFileSlices(HoodieTableMetaClient metaClient, String partition, boolean isReader) {
|
||||
LOG.info("Loading file groups for metadata table partition " + partition);
|
||||
|
||||
// If there are no commits on the metadata table then the table's default FileSystemView will not return any file
|
||||
@@ -352,7 +354,9 @@ public class HoodieTableMetadataUtil {
|
||||
}
|
||||
|
||||
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, timeline);
|
||||
return fsView.getLatestFileSlices(partition).sorted((s1, s2) -> s1.getFileId().compareTo(s2.getFileId()))
|
||||
Stream<FileSlice> fileSliceStream = isReader ? fsView.getLatestMergedFileSlicesBeforeOrOn(partition, timeline.filterCompletedInstants().lastInstant().get().getTimestamp()) :
|
||||
fsView.getLatestFileSlices(partition);
|
||||
return fileSliceStream.sorted((s1, s2) -> s1.getFileId().compareTo(s2.getFileId()))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -344,6 +344,16 @@ public class FileCreateUtils {
|
||||
removeMetaFile(basePath, instantTime, HoodieTimeline.ROLLBACK_EXTENSION);
|
||||
}
|
||||
|
||||
public static java.nio.file.Path renameFileToTemp(java.nio.file.Path sourcePath, String instantTime) throws IOException {
|
||||
java.nio.file.Path dummyFilePath = sourcePath.getParent().resolve(instantTime + ".temp");
|
||||
Files.move(sourcePath, dummyFilePath);
|
||||
return dummyFilePath;
|
||||
}
|
||||
|
||||
public static void renameTempToMetaFile(java.nio.file.Path tempFilePath, java.nio.file.Path destPath) throws IOException {
|
||||
Files.move(tempFilePath, destPath);
|
||||
}
|
||||
|
||||
public static long getTotalMarkerFileCount(String basePath, String partitionPath, String instantTime, IOType ioType) throws IOException {
|
||||
Path parentPath = Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath);
|
||||
if (Files.notExists(parentPath)) {
|
||||
|
||||
Reference in New Issue
Block a user