[HUDI-3355] Issue with out of order commits in the timeline when ingestion writers using SparkAllowUpdateStrategy (#4962)
This commit is contained in:
@@ -23,6 +23,7 @@ import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import org.apache.hudi.avro.model.HoodieClusteringGroup;
|
||||
@@ -31,6 +32,7 @@ import org.apache.hudi.avro.model.HoodieCompactionOperation;
|
||||
import org.apache.hudi.avro.model.HoodieCompactionPlan;
|
||||
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
|
||||
import org.apache.hudi.avro.model.HoodieSliceInfo;
|
||||
import org.apache.hudi.client.utils.TransactionUtils;
|
||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
|
||||
import org.apache.hudi.common.model.HoodieWriteStat;
|
||||
@@ -40,6 +42,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
|
||||
import org.apache.hudi.common.testutils.FileCreateUtils;
|
||||
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
|
||||
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
|
||||
import org.apache.hudi.common.testutils.HoodieTestTable;
|
||||
@@ -318,16 +321,20 @@ public class TestSimpleConcurrentFileWritesConflictResolutionStrategy extends Ho
|
||||
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
|
||||
}
|
||||
|
||||
private HoodieCommitMetadata createCommitMetadata(String instantTime) {
|
||||
private HoodieCommitMetadata createCommitMetadata(String instantTime, String writeFileName) {
|
||||
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
|
||||
commitMetadata.addMetadata("test", "test");
|
||||
HoodieWriteStat writeStat = new HoodieWriteStat();
|
||||
writeStat.setFileId("file-1");
|
||||
writeStat.setFileId(writeFileName);
|
||||
commitMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat);
|
||||
commitMetadata.setOperationType(WriteOperationType.INSERT);
|
||||
return commitMetadata;
|
||||
}
|
||||
|
||||
private HoodieCommitMetadata createCommitMetadata(String instantTime) {
|
||||
return createCommitMetadata(instantTime, "file-1");
|
||||
}
|
||||
|
||||
private void createInflightCommit(String instantTime) throws Exception {
|
||||
String fileId1 = "file-" + instantTime + "-1";
|
||||
String fileId2 = "file-" + instantTime + "-2";
|
||||
@@ -417,4 +424,147 @@ public class TestSimpleConcurrentFileWritesConflictResolutionStrategy extends Ho
|
||||
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
|
||||
}
|
||||
|
||||
// try to simulate HUDI-3355
|
||||
@Test
|
||||
public void testConcurrentWritesWithPendingInstants() throws Exception {
|
||||
// step1: create a pending replace/commit/compact instant: C1,C11,C12
|
||||
String newInstantTimeC1 = HoodieActiveTimeline.createNewInstantTime();
|
||||
createPendingReplace(newInstantTimeC1, WriteOperationType.CLUSTER);
|
||||
|
||||
String newCompactionInstantTimeC11 = HoodieActiveTimeline.createNewInstantTime();
|
||||
createPendingCompaction(newCompactionInstantTimeC11);
|
||||
|
||||
String newCommitInstantTimeC12 = HoodieActiveTimeline.createNewInstantTime();
|
||||
createInflightCommit(newCommitInstantTimeC12);
|
||||
// step2: create a complete commit which has no conflict with C1,C11,C12, named it as C2
|
||||
createCommit(HoodieActiveTimeline.createNewInstantTime());
|
||||
HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
|
||||
// consider commits before this are all successful
|
||||
Option<HoodieInstant> lastSuccessfulInstant = timeline.getCommitsTimeline().filterCompletedInstants().lastInstant();
|
||||
// step3: write 1 starts, which has conflict with C1,C11,C12, named it as C3
|
||||
String currentWriterInstant = HoodieActiveTimeline.createNewInstantTime();
|
||||
createInflightCommit(currentWriterInstant);
|
||||
// step4: create a requested commit, which has conflict with C3, named it as C4
|
||||
String commitC4 = HoodieActiveTimeline.createNewInstantTime();
|
||||
createRequestedCommit(commitC4);
|
||||
// get PendingCommit during write 1 operation
|
||||
metaClient.reloadActiveTimeline();
|
||||
Set<String> pendingInstant = TransactionUtils.getInflightAndRequestedInstants(metaClient);
|
||||
pendingInstant.remove(currentWriterInstant);
|
||||
// step5: finished pending cluster/compaction/commit operation
|
||||
createCompleteReplace(newInstantTimeC1, WriteOperationType.CLUSTER);
|
||||
createCompleteCompaction(newCompactionInstantTimeC11);
|
||||
createCompleteCommit(newCommitInstantTimeC12);
|
||||
createCompleteCommit(commitC4);
|
||||
|
||||
// step6: do check
|
||||
Option<HoodieInstant> currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant));
|
||||
SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new SimpleConcurrentFileWritesConflictResolutionStrategy();
|
||||
// make sure c3 has conflict with C1,C11,C12,C4;
|
||||
HoodieCommitMetadata currentMetadata = createCommitMetadata(currentWriterInstant, "file-2");
|
||||
timeline.reload();
|
||||
List<HoodieInstant> completedInstantsDuringCurrentWriteOperation = TransactionUtils
|
||||
.getCompletedInstantsDuringCurrentWriteOperation(metaClient, pendingInstant).collect(Collectors.toList());
|
||||
// C1,C11,C12,C4 should be included
|
||||
Assertions.assertTrue(completedInstantsDuringCurrentWriteOperation.size() == 4);
|
||||
|
||||
ConcurrentOperation thisCommitOperation = new ConcurrentOperation(currentInstant.get(), currentMetadata);
|
||||
// check C3 has conflict with C1,C11,C12,C4
|
||||
for (HoodieInstant instant : completedInstantsDuringCurrentWriteOperation) {
|
||||
ConcurrentOperation thatCommitOperation = new ConcurrentOperation(instant, metaClient);
|
||||
Assertions.assertTrue(strategy.hasConflict(thisCommitOperation, thatCommitOperation));
|
||||
try {
|
||||
strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation);
|
||||
} catch (HoodieWriteConflictException e) {
|
||||
// expected
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void createPendingReplace(String instantTime, WriteOperationType writeOperationType) throws Exception {
|
||||
String fileId1 = "file-1";
|
||||
String fileId2 = "file-2";
|
||||
// create replace instant to mark fileId2 as deleted
|
||||
HoodieRequestedReplaceMetadata requestedReplaceMetadata = new HoodieRequestedReplaceMetadata();
|
||||
requestedReplaceMetadata.setOperationType(WriteOperationType.CLUSTER.name());
|
||||
HoodieClusteringPlan clusteringPlan = new HoodieClusteringPlan();
|
||||
HoodieClusteringGroup clusteringGroup = new HoodieClusteringGroup();
|
||||
HoodieSliceInfo sliceInfo = new HoodieSliceInfo();
|
||||
sliceInfo.setFileId(fileId2);
|
||||
sliceInfo.setPartitionPath(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
|
||||
clusteringGroup.setSlices(Arrays.asList(sliceInfo));
|
||||
clusteringPlan.setInputGroups(Arrays.asList(clusteringGroup));
|
||||
requestedReplaceMetadata.setClusteringPlan(clusteringPlan);
|
||||
requestedReplaceMetadata.setVersion(TimelineLayoutVersion.CURR_VERSION);
|
||||
HoodieTestTable.of(metaClient)
|
||||
.addPendingReplace(instantTime, Option.of(requestedReplaceMetadata), Option.empty())
|
||||
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
|
||||
}
|
||||
|
||||
private void createCompleteReplace(String instantTime, WriteOperationType writeOperationType) throws Exception {
|
||||
String fileId1 = "file-1";
|
||||
String fileId2 = "file-2";
|
||||
|
||||
// create replace instant to mark fileId2 as deleted
|
||||
HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata();
|
||||
Map<String, List<String>> partitionFileIds = new HashMap<>();
|
||||
partitionFileIds.put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, Arrays.asList(fileId2));
|
||||
replaceMetadata.setPartitionToReplaceFileIds(partitionFileIds);
|
||||
HoodieWriteStat writeStat = new HoodieWriteStat();
|
||||
writeStat.setFileId("file-2");
|
||||
replaceMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat);
|
||||
replaceMetadata.setOperationType(writeOperationType);
|
||||
FileCreateUtils.createReplaceCommit(metaClient.getBasePath(), instantTime, replaceMetadata);
|
||||
}
|
||||
|
||||
private void createPendingCompaction(String instantTime) throws Exception {
|
||||
String fileId1 = "file-2";
|
||||
HoodieCompactionPlan compactionPlan = new HoodieCompactionPlan();
|
||||
compactionPlan.setVersion(TimelineLayoutVersion.CURR_VERSION);
|
||||
HoodieCompactionOperation operation = new HoodieCompactionOperation();
|
||||
operation.setFileId(fileId1);
|
||||
operation.setPartitionPath(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
|
||||
operation.setDataFilePath("/file-2");
|
||||
operation.setDeltaFilePaths(Arrays.asList("/file-2"));
|
||||
compactionPlan.setOperations(Arrays.asList(operation));
|
||||
HoodieTestTable.of(metaClient)
|
||||
.addRequestedCompaction(instantTime, compactionPlan);
|
||||
FileCreateUtils.createPendingInflightCompaction(metaClient.getBasePath(), instantTime);
|
||||
}
|
||||
|
||||
private void createCompleteCompaction(String instantTime) throws Exception {
|
||||
String fileId1 = "file-1";
|
||||
String fileId2 = "file-2";
|
||||
|
||||
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
|
||||
commitMetadata.addMetadata("test", "test");
|
||||
commitMetadata.setOperationType(WriteOperationType.COMPACT);
|
||||
commitMetadata.setCompacted(true);
|
||||
HoodieWriteStat writeStat = new HoodieWriteStat();
|
||||
writeStat.setFileId("file-2");
|
||||
commitMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat);
|
||||
HoodieTestTable.of(metaClient)
|
||||
.addCommit(instantTime, Option.of(commitMetadata))
|
||||
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
|
||||
}
|
||||
|
||||
private void createRequestedCommit(String instantTime) throws Exception {
|
||||
HoodieTestTable.of(metaClient)
|
||||
.addInflightCommit(instantTime);
|
||||
}
|
||||
|
||||
private void createCompleteCommit(String instantTime) throws Exception {
|
||||
String fileId1 = "file-1";
|
||||
String fileId2 = "file-2";
|
||||
|
||||
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
|
||||
commitMetadata.addMetadata("test", "test");
|
||||
HoodieWriteStat writeStat = new HoodieWriteStat();
|
||||
writeStat.setFileId("file-2");
|
||||
commitMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat);
|
||||
commitMetadata.setOperationType(WriteOperationType.INSERT);
|
||||
HoodieTestTable.of(metaClient)
|
||||
.addCommit(instantTime, Option.of(commitMetadata))
|
||||
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user