[HUDI-3180] Include files from completed commits while bootstrapping metadata table (#4519)
This commit is contained in:
committed by
GitHub
parent
bc95571caa
commit
7a8b94c82d
@@ -746,9 +746,16 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
|
|||||||
HoodieData<HoodieRecord> partitionRecords = engineContext.parallelize(Arrays.asList(allPartitionRecord), 1);
|
HoodieData<HoodieRecord> partitionRecords = engineContext.parallelize(Arrays.asList(allPartitionRecord), 1);
|
||||||
if (!partitionInfoList.isEmpty()) {
|
if (!partitionInfoList.isEmpty()) {
|
||||||
HoodieData<HoodieRecord> fileListRecords = engineContext.parallelize(partitionInfoList, partitionInfoList.size()).map(partitionInfo -> {
|
HoodieData<HoodieRecord> fileListRecords = engineContext.parallelize(partitionInfoList, partitionInfoList.size()).map(partitionInfo -> {
|
||||||
|
Map<String, Long> fileNameToSizeMap = partitionInfo.getFileNameToSizeMap();
|
||||||
|
// filter for files that are part of the completed commits
|
||||||
|
Map<String, Long> validFileNameToSizeMap = fileNameToSizeMap.entrySet().stream().filter(fileSizePair -> {
|
||||||
|
String commitTime = FSUtils.getCommitTime(fileSizePair.getKey());
|
||||||
|
return HoodieTimeline.compareTimestamps(commitTime, HoodieTimeline.LESSER_THAN_OR_EQUALS, createInstantTime);
|
||||||
|
}).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||||
|
|
||||||
// Record which saves files within a partition
|
// Record which saves files within a partition
|
||||||
return HoodieMetadataPayload.createPartitionFilesRecord(
|
return HoodieMetadataPayload.createPartitionFilesRecord(
|
||||||
partitionInfo.getRelativePath().isEmpty() ? NON_PARTITIONED_NAME : partitionInfo.getRelativePath(), Option.of(partitionInfo.getFileNameToSizeMap()), Option.empty());
|
partitionInfo.getRelativePath().isEmpty() ? NON_PARTITIONED_NAME : partitionInfo.getRelativePath(), Option.of(validFileNameToSizeMap), Option.empty());
|
||||||
});
|
});
|
||||||
partitionRecords = partitionRecords.union(fileListRecords);
|
partitionRecords = partitionRecords.union(fileListRecords);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig;
|
|||||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||||
import org.apache.hudi.common.model.HoodieTableType;
|
import org.apache.hudi.common.model.HoodieTableType;
|
||||||
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
|
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
|
||||||
|
import org.apache.hudi.common.testutils.FileCreateUtils;
|
||||||
import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
|
import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
|
||||||
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
|
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
|
||||||
import org.apache.hudi.common.testutils.HoodieTestTable;
|
import org.apache.hudi.common.testutils.HoodieTestTable;
|
||||||
@@ -36,7 +37,10 @@ import org.junit.jupiter.api.Test;
|
|||||||
import org.junit.jupiter.params.ParameterizedTest;
|
import org.junit.jupiter.params.ParameterizedTest;
|
||||||
import org.junit.jupiter.params.provider.EnumSource;
|
import org.junit.jupiter.params.provider.EnumSource;
|
||||||
|
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
import static java.util.Arrays.asList;
|
import static java.util.Arrays.asList;
|
||||||
import static java.util.Collections.emptyList;
|
import static java.util.Collections.emptyList;
|
||||||
@@ -76,6 +80,36 @@ public class TestHoodieMetadataBootstrap extends TestHoodieMetadataBase {
|
|||||||
bootstrapAndVerify();
|
bootstrapAndVerify();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Validate that bootstrap considers only files part of completed commit and ignore any extra files.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testMetadataBootstrapWithExtraFiles() throws Exception {
|
||||||
|
HoodieTableType tableType = COPY_ON_WRITE;
|
||||||
|
init(tableType, false);
|
||||||
|
doPreBootstrapWriteOperation(testTable, INSERT, "0000001");
|
||||||
|
doPreBootstrapWriteOperation(testTable, "0000002");
|
||||||
|
doPreBootstrapClean(testTable, "0000003", Arrays.asList("0000001"));
|
||||||
|
doPreBootstrapWriteOperation(testTable, "0000005");
|
||||||
|
// add few extra files to table. bootstrap should include those files.
|
||||||
|
String fileName = UUID.randomUUID().toString();
|
||||||
|
Path baseFilePath = FileCreateUtils.getBaseFilePath(basePath, "p1", "0000006", fileName);
|
||||||
|
FileCreateUtils.createBaseFile(basePath, "p1", "0000006", fileName, 100);
|
||||||
|
|
||||||
|
writeConfig = getWriteConfig(true, true);
|
||||||
|
initWriteConfigAndMetatableWriter(writeConfig, true);
|
||||||
|
syncTableMetadata(writeConfig);
|
||||||
|
|
||||||
|
// remove those files from table. and then validate.
|
||||||
|
Files.delete(baseFilePath);
|
||||||
|
|
||||||
|
// validate
|
||||||
|
validateMetadata(testTable);
|
||||||
|
// after bootstrap do two writes and validate its still functional.
|
||||||
|
doWriteInsertAndUpsert(testTable);
|
||||||
|
validateMetadata(testTable);
|
||||||
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@EnumSource(HoodieTableType.class)
|
@EnumSource(HoodieTableType.class)
|
||||||
public void testMetadataBootstrapInsertUpsertRollback(HoodieTableType tableType) throws Exception {
|
public void testMetadataBootstrapInsertUpsertRollback(HoodieTableType tableType) throws Exception {
|
||||||
|
|||||||
@@ -304,6 +304,11 @@ public class FileCreateUtils {
|
|||||||
Files.setLastModifiedTime(baseFilePath, FileTime.fromMillis(lastModificationTimeMilli));
|
Files.setLastModifiedTime(baseFilePath, FileTime.fromMillis(lastModificationTimeMilli));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static Path getBaseFilePath(String basePath, String partitionPath, String instantTime, String fileId) {
|
||||||
|
Path parentPath = Paths.get(basePath, partitionPath);
|
||||||
|
return parentPath.resolve(baseFileName(instantTime, fileId));
|
||||||
|
}
|
||||||
|
|
||||||
public static void createLogFile(String basePath, String partitionPath, String instantTime, String fileId, int version)
|
public static void createLogFile(String basePath, String partitionPath, String instantTime, String fileId, int version)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
createLogFile(basePath, partitionPath, instantTime, fileId, version, 0);
|
createLogFile(basePath, partitionPath, instantTime, fileId, version, 0);
|
||||||
|
|||||||
Reference in New Issue
Block a user