1
0

[HUDI-1740] Fix insert-overwrite API archival (#2784)

- fix problem of archiving replace commits
- Fix problem when getting empty replacecommit.requested
- Improved the logic of handling empty and non-empty requested/inflight commit files. Added unit tests to cover both empty and non-empty inflight files cases and cleaned up some unused test util methods

Co-authored-by: yorkzero831 <yorkzero8312@gmail.com>
Co-authored-by: zheren.yu <zheren.yu@paypay-corp.co.jp>
This commit is contained in:
Susu Dong
2021-05-22 05:52:13 +09:00
committed by GitHub
parent 99b14a78e3
commit 685f77b5dd
12 changed files with 169 additions and 107 deletions

View File

@@ -112,6 +112,14 @@
"HoodieRequestedReplaceMetadata"
],
"default": null
},
{
"name":"HoodieInflightReplaceMetadata",
"type":[
"null",
"HoodieCommitMetadata"
],
"default": null
}
]
}

View File

@@ -34,6 +34,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hadoop.fs.FileSystem;
@@ -158,12 +159,20 @@ public class FileCreateUtils {
createMetaFile(basePath, instantTime, HoodieTimeline.REPLACE_COMMIT_EXTENSION, metadata.toJsonString().getBytes(StandardCharsets.UTF_8));
}
public static void createRequestedReplaceCommit(String basePath, String instantTime, HoodieRequestedReplaceMetadata requestedReplaceMetadata) throws IOException {
createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_REPLACE_COMMIT_EXTENSION, serializeRequestedReplaceMetadata(requestedReplaceMetadata).get());
public static void createRequestedReplaceCommit(String basePath, String instantTime, Option<HoodieRequestedReplaceMetadata> requestedReplaceMetadata) throws IOException {
if (requestedReplaceMetadata.isPresent()) {
createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_REPLACE_COMMIT_EXTENSION, serializeRequestedReplaceMetadata(requestedReplaceMetadata.get()).get());
} else {
createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_REPLACE_COMMIT_EXTENSION);
}
}
public static void createInflightReplaceCommit(String basePath, String instantTime) throws IOException {
createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_REPLACE_COMMIT_EXTENSION);
public static void createInflightReplaceCommit(String basePath, String instantTime, Option<HoodieCommitMetadata> inflightReplaceMetadata) throws IOException {
if (inflightReplaceMetadata.isPresent()) {
createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_REPLACE_COMMIT_EXTENSION, inflightReplaceMetadata.get().toJsonString().getBytes(StandardCharsets.UTF_8));
} else {
createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_REPLACE_COMMIT_EXTENSION);
}
}
public static void createCleanFile(String basePath, String instantTime, HoodieCleanMetadata metadata) throws IOException {
@@ -199,10 +208,6 @@ public class FileCreateUtils {
createAuxiliaryMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_COMPACTION_EXTENSION);
}
public static void createInflightCompaction(String basePath, String instantTime) throws IOException {
createAuxiliaryMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_COMPACTION_EXTENSION);
}
public static void createPartitionMetaFile(String basePath, String partitionPath) throws IOException {
Path parentPath = Paths.get(basePath, partitionPath);
Files.createDirectories(parentPath);

View File

@@ -65,7 +65,6 @@ import static org.apache.hudi.common.testutils.FileCreateUtils.createCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.createDeltaCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCleanFile;
import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCompaction;
import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightDeltaCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightReplaceCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightRollbackFile;
@@ -128,13 +127,6 @@ public class HoodieTestTable {
return this;
}
public HoodieTestTable addRequestedDeltaCommit(String instantTime) throws Exception {
createRequestedDeltaCommit(basePath, instantTime);
currentInstantTime = instantTime;
metaClient = HoodieTableMetaClient.reload(metaClient);
return this;
}
public HoodieTestTable addInflightCommit(String instantTime) throws Exception {
createRequestedCommit(basePath, instantTime);
createInflightCommit(basePath, instantTime);
@@ -143,14 +135,6 @@ public class HoodieTestTable {
return this;
}
public HoodieTestTable addInflightDeltaCommit(String instantTime) throws Exception {
createRequestedDeltaCommit(basePath, instantTime);
createInflightDeltaCommit(basePath, instantTime);
currentInstantTime = instantTime;
metaClient = HoodieTableMetaClient.reload(metaClient);
return this;
}
public HoodieTestTable addCommit(String instantTime) throws Exception {
createRequestedCommit(basePath, instantTime);
createInflightCommit(basePath, instantTime);
@@ -178,16 +162,20 @@ public class HoodieTestTable {
return this;
}
public HoodieTestTable addReplaceCommit(String instantTime, HoodieRequestedReplaceMetadata requestedReplaceMetadata, HoodieReplaceCommitMetadata metadata) throws Exception {
public HoodieTestTable addReplaceCommit(
String instantTime,
Option<HoodieRequestedReplaceMetadata> requestedReplaceMetadata,
Option<HoodieCommitMetadata> inflightReplaceMetadata,
HoodieReplaceCommitMetadata completeReplaceMetadata) throws Exception {
createRequestedReplaceCommit(basePath, instantTime, requestedReplaceMetadata);
createInflightReplaceCommit(basePath, instantTime);
createReplaceCommit(basePath, instantTime, metadata);
createInflightReplaceCommit(basePath, instantTime, inflightReplaceMetadata);
createReplaceCommit(basePath, instantTime, completeReplaceMetadata);
currentInstantTime = instantTime;
metaClient = HoodieTableMetaClient.reload(metaClient);
return this;
}
public HoodieTestTable addRequestedReplace(String instantTime, HoodieRequestedReplaceMetadata requestedReplaceMetadata) throws Exception {
public HoodieTestTable addRequestedReplace(String instantTime, Option<HoodieRequestedReplaceMetadata> requestedReplaceMetadata) throws Exception {
createRequestedReplaceCommit(basePath, instantTime, requestedReplaceMetadata);
currentInstantTime = instantTime;
metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -247,14 +235,6 @@ public class HoodieTestTable {
return addRequestedCompaction(instantTime, plan);
}
public HoodieTestTable addCompaction(String instantTime) throws IOException {
createRequestedCompaction(basePath, instantTime);
createInflightCompaction(basePath, instantTime);
currentInstantTime = instantTime;
metaClient = HoodieTableMetaClient.reload(metaClient);
return this;
}
public HoodieTestTable forCommit(String instantTime) {
currentInstantTime = instantTime;
return this;
@@ -270,11 +250,6 @@ public class HoodieTestTable {
return this;
}
public HoodieTestTable forCompaction(String instantTime) {
currentInstantTime = instantTime;
return this;
}
public HoodieTestTable withPartitionMetaFiles(String... partitionPaths) throws IOException {
for (String partitionPath : partitionPaths) {
FileCreateUtils.createPartitionMetaFile(basePath, partitionPath);
@@ -282,10 +257,6 @@ public class HoodieTestTable {
return this;
}
public HoodieTestTable withMarkerFile(String partitionPath, IOType ioType) throws IOException {
return withMarkerFile(partitionPath, UUID.randomUUID().toString(), ioType);
}
public HoodieTestTable withMarkerFile(String partitionPath, String fileId, IOType ioType) throws IOException {
createMarkerFile(basePath, partitionPath, currentInstantTime, fileId, ioType);
return this;
@@ -357,10 +328,6 @@ public class HoodieTestTable {
return this;
}
public boolean inflightCommitsExist(String... instantTime) {
return Arrays.stream(instantTime).allMatch(this::inflightCommitExists);
}
public boolean inflightCommitExists(String instantTime) {
try {
return fs.exists(getInflightCommitFilePath(instantTime));
@@ -369,10 +336,6 @@ public class HoodieTestTable {
}
}
public boolean commitsExist(String... instantTime) {
return Arrays.stream(instantTime).allMatch(this::commitExists);
}
public boolean commitExists(String instantTime) {
try {
return fs.exists(getCommitFilePath(instantTime));
@@ -389,10 +352,6 @@ public class HoodieTestTable {
});
}
public boolean baseFilesExist(String partition, String instantTime, String... fileIds) {
return Arrays.stream(fileIds).allMatch(f -> baseFileExists(partition, instantTime, f));
}
public boolean baseFileExists(String partition, String instantTime, String fileId) {
try {
return fs.exists(new Path(Paths.get(basePath, partition, baseFileName(instantTime, fileId)).toString()));