diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 5eb8e270a..028fdac1f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -87,9 +87,10 @@
 import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
+import java.util.Set;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -124,6 +125,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K, O> extends BaseHoodieClient
   protected Option<Pair<HoodieInstant, Map<String, String>>> lastCompletedTxnAndMetadata = Option.empty();
+  protected Set<String> pendingInflightAndRequestedInstants;
 
   /**
    * Create a write client, with new hudi index.
@@ -440,6 +442,8 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K, O> extends BaseHoodieClient
     setOperationType(writeOperationType);
     this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);
+    this.pendingInflightAndRequestedInstants = TransactionUtils.getInflightAndRequestedInstants(metaClient);
+    this.pendingInflightAndRequestedInstants.remove(instantTime);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
@@ ... @@ public class TransactionUtils {
-  public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny(
-      final HoodieTable table,
-      final Option<HoodieInstant> currentTxnOwnerInstant,
-      final Option<HoodieCommitMetadata> thisCommitMetadata,
-      final HoodieWriteConfig config,
-      Option<HoodieInstant> lastCompletedTxnOwnerInstant) throws HoodieWriteConflictException {
-    return resolveWriteConflictIfAny(table, currentTxnOwnerInstant, thisCommitMetadata, config, lastCompletedTxnOwnerInstant, false);
-  }
-
-  /**
-   * Resolve any write conflicts when committing data.
+   * @param pendingInstants
    *
-   * @param table
-   * @param currentTxnOwnerInstant
-   * @param thisCommitMetadata
-   * @param config
-   * @param lastCompletedTxnOwnerInstant
    * @return
    * @throws HoodieWriteConflictException
    */
@@ -80,11 +65,16 @@ public class TransactionUtils {
       final Option<HoodieCommitMetadata> thisCommitMetadata,
       final HoodieWriteConfig config,
       Option<HoodieInstant> lastCompletedTxnOwnerInstant,
-      boolean reloadActiveTimeline) throws HoodieWriteConflictException {
+      boolean reloadActiveTimeline,
+      Set<String> pendingInstants) throws HoodieWriteConflictException {
     if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+      // deal with pendingInstants: conflict-check any of them that completed during this write
+      Stream<HoodieInstant> completedInstantsDuringCurrentWriteOperation = getCompletedInstantsDuringCurrentWriteOperation(table.getMetaClient(), pendingInstants);
+
       ConflictResolutionStrategy resolutionStrategy = config.getWriteConflictResolutionStrategy();
-      Stream<HoodieInstant> instantStream = resolutionStrategy.getCandidateInstants(reloadActiveTimeline
-          ? table.getMetaClient().reloadActiveTimeline() : table.getActiveTimeline(), currentTxnOwnerInstant.get(), lastCompletedTxnOwnerInstant);
+      Stream<HoodieInstant> instantStream = Stream.concat(resolutionStrategy.getCandidateInstants(reloadActiveTimeline
+          ? table.getMetaClient().reloadActiveTimeline() : table.getActiveTimeline(), currentTxnOwnerInstant.get(), lastCompletedTxnOwnerInstant),
+          completedInstantsDuringCurrentWriteOperation);
       final ConcurrentOperation thisOperation = new ConcurrentOperation(currentTxnOwnerInstant.get(), thisCommitMetadata.orElse(new HoodieCommitMetadata()));
       instantStream.forEach(instant -> {
         try {
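[Review note] The hunk above closes an OCC hole: the resolution strategy picks its candidates by instant timestamp, so an operation that *started* before this writer (and thus carries an older timestamp) but *finished* while this writer was running can slip past the check. The patch snapshots the pending instants at write start (the two reconstructed lines in preWrite above) and, at commit time, unions the strategy's candidates with exactly those pending instants that have since completed. A minimal, self-contained sketch of that set logic, with plain strings standing in for HoodieInstant timestamps (illustration only, not part of the patch):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.TreeSet;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    class CandidateUnionSketch {
      public static void main(String[] args) {
        // the strategy's candidates: instants with timestamps after the last completed transaction
        Stream<String> candidatesFromStrategy = Stream.of("c5", "c6");
        // instants that were inflight/requested when this writer started
        Set<String> pendingAtWriteStart = new HashSet<>(Arrays.asList("c1", "c11", "c12", "c4"));
        // completed instants on the freshly reloaded timeline at commit time
        List<String> completedNow = Arrays.asList("c1", "c11", "c12", "c4", "c5", "c6");
        // pending instants that completed while this writer was running
        Stream<String> completedDuringWrite = completedNow.stream().filter(pendingAtWriteStart::contains);
        // the union of both streams is what gets conflict-checked
        Set<String> toCheck = Stream.concat(candidatesFromStrategy, completedDuringWrite)
            .collect(Collectors.toCollection(TreeSet::new));
        System.out.println(toCheck); // [c1, c11, c12, c4, c5, c6]
      }
    }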
@@ -137,4 +127,35 @@ public class TransactionUtils {
       throw new HoodieIOException("Unable to read metadata for instant " + hoodieInstantOption.get(), io);
     }
   }
-}
\ No newline at end of file
+
+  /**
+   * Get inflight and requested instants.
+   *
+   * @param metaClient
+   * @return
+   */
+  public static Set<String> getInflightAndRequestedInstants(HoodieTableMetaClient metaClient) {
+    // collect inflight and requested instants for deltaCommit/commit/compaction/clustering
+    Set<String> timelineActions = CollectionUtils
+        .createImmutableSet(HoodieTimeline.REPLACE_COMMIT_ACTION, HoodieTimeline.COMPACTION_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.COMMIT_ACTION);
+    return metaClient
+        .getActiveTimeline()
+        .getTimelineOfActions(timelineActions)
+        .filterInflightsAndRequested()
+        .getInstants()
+        .map(HoodieInstant::getTimestamp)
+        .collect(Collectors.toSet());
+  }
+
+  public static Stream<HoodieInstant> getCompletedInstantsDuringCurrentWriteOperation(HoodieTableMetaClient metaClient, Set<String> pendingInstants) {
+    // deal with pendingInstants:
+    // some of them may have finished during the current write operation,
+    // and those completed operations must be conflict-checked as well
+    return metaClient
+        .reloadActiveTimeline()
+        .getCommitsTimeline()
+        .filterCompletedInstants()
+        .getInstants()
+        .filter(f -> pendingInstants.contains(f.getTimestamp()));
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index 3a3e04a6b..fb07d3592 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -80,6 +80,7 @@ public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload, I, K, O, R>
   protected final Option<Pair<HoodieInstant, Map<String, String>>> lastCompletedTxn;
+  protected Set<String> pendingInflightAndRequestedInstants;
 
   public BaseCommitActionExecutor(HoodieEngineContext context, HoodieWriteConfig config,
                                   HoodieTable table, String instantTime, WriteOperationType operationType,
@@ -91,6 +92,8 @@ public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload, I, K, O, R>
     this.txnManager = new TransactionManager(config, table.getMetaClient().getFs());
     this.lastCompletedTxn = TransactionUtils.getLastCompletedTxnInstantAndMetadata(table.getMetaClient());
+    this.pendingInflightAndRequestedInstants = TransactionUtils.getInflightAndRequestedInstants(table.getMetaClient());
+    this.pendingInflightAndRequestedInstants.remove(instantTime);
@@ ... @@ protected void autoCommit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<O> result)
     try {
       setCommitMetadata(result);
       TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(),
-          result.getCommitMetadata(), config, this.txnManager.getLastCompletedTransactionOwner(), true);
+          result.getCommitMetadata(), config, this.txnManager.getLastCompletedTransactionOwner(), true, pendingInflightAndRequestedInstants);
       commit(extraMetadata, result);
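[Review note] Both write paths now snapshot the pending set up front: BaseHoodieWriteClient in preWrite, and the executor in its constructor, each removing its own instant time so a writer never conflict-checks against itself. (The constructor and autoCommit hunks above are partly reconstructed from the surviving context; the two added assignments and the extra `true, pendingInflightAndRequestedInstants` arguments are the substance of the change.) The snapshot-then-recheck protocol as a runnable toy; TimelineStub is a hypothetical stand-in, not a Hudi class:

    import java.util.HashSet;
    import java.util.Set;

    class OccSnapshotSketch {
      /** Toy stand-in for the active timeline. */
      static class TimelineStub {
        Set<String> pending = new HashSet<>();
        Set<String> completed = new HashSet<>();
      }

      // like TransactionUtils.getInflightAndRequestedInstants + remove(instantTime)
      static Set<String> snapshotPending(TimelineStub t, String ownInstant) {
        Set<String> pending = new HashSet<>(t.pending);
        pending.remove(ownInstant); // never conflict-check against our own instant
        return pending;
      }

      // like getCompletedInstantsDuringCurrentWriteOperation: re-read the timeline and
      // keep only the instants that were pending when the write began
      static Set<String> completedDuringWrite(TimelineStub t, Set<String> pendingAtStart) {
        Set<String> out = new HashSet<>(t.completed);
        out.retainAll(pendingAtStart);
        return out;
      }

      public static void main(String[] args) {
        TimelineStub t = new TimelineStub();
        t.pending.add("c1");                           // a pending compaction
        Set<String> snap = snapshotPending(t, "c3");   // writer c3 starts
        t.pending.remove("c1");                        // c1 completes while c3 is writing
        t.completed.add("c1");
        System.out.println(completedDuringWrite(t, snap)); // [c1]
      }
    }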
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java
@@ ... @@ public class TestSimpleConcurrentFileWritesConflictResolutionStrategy
+  @Test
+  public void testConcurrentWritesWithPendingInstants() throws Exception {
+    createCommit(HoodieActiveTimeline.createNewInstantTime());
+    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+    // consider commits before this are all successful
+    // step1: create a pending cluster operation, named C1
+    String newInstantTimeC1 = HoodieActiveTimeline.createNewInstantTime();
+    createPendingReplace(newInstantTimeC1, WriteOperationType.CLUSTER);
+    // step2: create a pending compaction C11 and a pending commit C12
+    String newCompactionInstantTimeC11 = HoodieActiveTimeline.createNewInstantTime();
+    createPendingCompaction(newCompactionInstantTimeC11);
+    String newCommitInstantTimeC12 = HoodieActiveTimeline.createNewInstantTime();
+    createInflightCommit(newCommitInstantTimeC12);
+    Option<HoodieInstant> lastSuccessfulInstant = timeline.getCommitsTimeline().filterCompletedInstants().lastInstant();
+    // step3: writer 1 starts; it conflicts with C1, C11 and C12; name its instant C3
+    String currentWriterInstant = HoodieActiveTimeline.createNewInstantTime();
+    createInflightCommit(currentWriterInstant);
+    // step4: create a requested commit C4, which conflicts with C3
+    String commitC4 = HoodieActiveTimeline.createNewInstantTime();
+    createRequestedCommit(commitC4);
+    // get the pending instants seen during writer 1's operation
+    metaClient.reloadActiveTimeline();
+    Set<String> pendingInstant = TransactionUtils.getInflightAndRequestedInstants(metaClient);
+    pendingInstant.remove(currentWriterInstant);
+    // step5: finish the pending cluster/compaction/commit operations
+    createCompleteReplace(newInstantTimeC1, WriteOperationType.CLUSTER);
+    createCompleteCompaction(newCompactionInstantTimeC11);
+    createCompleteCommit(newCommitInstantTimeC12);
+    createCompleteCommit(commitC4);
+
+    // step6: run the check
+    Option<HoodieInstant> currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant));
+    SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new SimpleConcurrentFileWritesConflictResolutionStrategy();
+    // make sure C3 conflicts with C1, C11, C12 and C4
+    HoodieCommitMetadata currentMetadata = createCommitMetadata(currentWriterInstant, "file-2");
+    timeline.reload();
+    List<HoodieInstant> completedInstantsDuringCurrentWriteOperation = TransactionUtils
+        .getCompletedInstantsDuringCurrentWriteOperation(metaClient, pendingInstant).collect(Collectors.toList());
+    // C1, C11, C12 and C4 should all be included
+    Assertions.assertEquals(4, completedInstantsDuringCurrentWriteOperation.size());
+
+    ConcurrentOperation thisCommitOperation = new ConcurrentOperation(currentInstant.get(), currentMetadata);
+    // check that C3 conflicts with each of C1, C11, C12 and C4
+    for (HoodieInstant instant : completedInstantsDuringCurrentWriteOperation) {
+      ConcurrentOperation thatCommitOperation = new ConcurrentOperation(instant, metaClient);
+      Assertions.assertTrue(strategy.hasConflict(thisCommitOperation, thatCommitOperation));
+      try {
+        strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation);
+        Assertions.fail("Should have thrown a HoodieWriteConflictException");
+      } catch (HoodieWriteConflictException e) {
+        // expected
+      }
+    }
+  }
+
+  private void createPendingReplace(String instantTime, WriteOperationType writeOperationType) throws Exception {
+    String fileId1 = "file-1";
+    String fileId2 = "file-2";
+    // create a replace instant to mark fileId2 as deleted
+    HoodieRequestedReplaceMetadata requestedReplaceMetadata = new HoodieRequestedReplaceMetadata();
+    requestedReplaceMetadata.setOperationType(writeOperationType.name());
+    HoodieClusteringPlan clusteringPlan = new HoodieClusteringPlan();
+    HoodieClusteringGroup clusteringGroup = new HoodieClusteringGroup();
+    HoodieSliceInfo sliceInfo = new HoodieSliceInfo();
+    sliceInfo.setFileId(fileId2);
+    sliceInfo.setPartitionPath(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
+    clusteringGroup.setSlices(Arrays.asList(sliceInfo));
+    clusteringPlan.setInputGroups(Arrays.asList(clusteringGroup));
+    requestedReplaceMetadata.setClusteringPlan(clusteringPlan);
+    requestedReplaceMetadata.setVersion(TimelineLayoutVersion.CURR_VERSION);
+    HoodieTestTable.of(metaClient)
+        .addPendingReplace(instantTime, Option.of(requestedReplaceMetadata), Option.empty())
+        .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
+  }
+
+  private void createCompleteReplace(String instantTime, WriteOperationType writeOperationType) throws Exception {
+    String fileId1 = "file-1";
+    String fileId2 = "file-2";
+
+    // create a replace instant to mark fileId2 as deleted
+    HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata();
+    Map<String, List<String>> partitionFileIds = new HashMap<>();
+    partitionFileIds.put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, Arrays.asList(fileId2));
+    replaceMetadata.setPartitionToReplaceFileIds(partitionFileIds);
+    HoodieWriteStat writeStat = new HoodieWriteStat();
+    writeStat.setFileId(fileId2);
+    replaceMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat);
+    replaceMetadata.setOperationType(writeOperationType);
+    FileCreateUtils.createReplaceCommit(metaClient.getBasePath(), instantTime, replaceMetadata);
+  }
+
+  private void createPendingCompaction(String instantTime) throws Exception {
+    String fileId2 = "file-2";
+    HoodieCompactionPlan compactionPlan = new HoodieCompactionPlan();
+    compactionPlan.setVersion(TimelineLayoutVersion.CURR_VERSION);
+    HoodieCompactionOperation operation = new HoodieCompactionOperation();
+    operation.setFileId(fileId2);
+    operation.setPartitionPath(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
+    operation.setDataFilePath("/file-2");
+    operation.setDeltaFilePaths(Arrays.asList("/file-2"));
+    compactionPlan.setOperations(Arrays.asList(operation));
+    HoodieTestTable.of(metaClient)
+        .addRequestedCompaction(instantTime, compactionPlan);
+    FileCreateUtils.createPendingInflightCompaction(metaClient.getBasePath(), instantTime);
+  }
+
+  private void createCompleteCompaction(String instantTime) throws Exception {
+    String fileId1 = "file-1";
+    String fileId2 = "file-2";
+
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+    commitMetadata.addMetadata("test", "test");
+    commitMetadata.setOperationType(WriteOperationType.COMPACT);
+    commitMetadata.setCompacted(true);
+    HoodieWriteStat writeStat = new HoodieWriteStat();
+    writeStat.setFileId(fileId2);
+    commitMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat);
+    HoodieTestTable.of(metaClient)
+        .addCommit(instantTime, Option.of(commitMetadata))
+        .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
+  }
+
+  private void createRequestedCommit(String instantTime) throws Exception {
+    // note: addInflightCommit writes both the requested and the inflight meta files,
+    // so this instant shows up as pending on the timeline
+    HoodieTestTable.of(metaClient)
+        .addInflightCommit(instantTime);
+  }
+
+  private void createCompleteCommit(String instantTime) throws Exception {
+    String fileId1 = "file-1";
+    String fileId2 = "file-2";
+
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+    commitMetadata.addMetadata("test", "test");
+    HoodieWriteStat writeStat = new HoodieWriteStat();
+    writeStat.setFileId(fileId2);
+    commitMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat);
+    commitMetadata.setOperationType(WriteOperationType.INSERT);
+    HoodieTestTable.of(metaClient)
+        .addCommit(instantTime, Option.of(commitMetadata))
+        .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
+  }
 }
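[Review note] Steps 1-2 of the test above are reconstructed from the instant names referenced in steps 3-6. The scenario on a single timeline (time flows left to right):

    C1  (cluster, pending)    ---------------------------+
    C11 (compaction, pending) ---------------------------+-- all four complete while C3 is in flight
    C12 (commit, pending)     ---------------------------+
                C3 (writer, inflight) ................   |
                      C4 (commit, pending) --------------+

C3's commit-time check must treat C1, C11, C12 and C4 as conflict candidates even though C1, C11 and C12 carry timestamps older than C3 and would be filtered out by a purely timestamp-based candidate scan.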
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 6627eeecb..f6d063223 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -471,7 +471,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends BaseHoodieWriteClient
     // Important to create this after the lock to ensure the latest commits show up in the timeline without need for reload
     HoodieTable table = createTable(config, hadoopConf);
     TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(),
-        Option.of(metadata), config, txnManager.getLastCompletedTransactionOwner());
+        Option.of(metadata), config, txnManager.getLastCompletedTransactionOwner(), false, this.pendingInflightAndRequestedInstants);
   }
 
   @Override
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
index 8f5e5ae96..a403f925a 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
@@ -274,6 +274,10 @@ public class FileCreateUtils {
     createAuxiliaryMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_COMPACTION_EXTENSION);
   }
 
+  public static void createPendingInflightCompaction(String basePath, String instantTime) throws IOException {
+    createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_COMPACTION_EXTENSION);
+  }
+
   public static void createPartitionMetaFile(String basePath, String partitionPath) throws IOException {
     Path parentPath = Paths.get(basePath, partitionPath);
     Files.createDirectories(parentPath);
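[Review note] The two FileCreateUtils helpers differ only in where the inflight compaction marker lands (assuming the usual meta layout, with auxiliary files under .hoodie/.aux):

    createInflightCompaction(basePath, ts)        -> <basePath>/.hoodie/.aux/<ts>.compaction.inflight
    createPendingInflightCompaction(basePath, ts) -> <basePath>/.hoodie/<ts>.compaction.inflight

Only the second file is part of the active timeline folder, so only it is visible to getInflightAndRequestedInstants, which is what the new test needs when staging the pending compaction C11.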
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 5f9aab84d..dfc78440d 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -257,6 +257,13 @@ public class HoodieTestTable {
     return this;
   }
 
+  public HoodieTestTable addPendingReplace(String instantTime, Option<HoodieRequestedReplaceMetadata> requestedReplaceMetadata, Option<HoodieCommitMetadata> inflightReplaceMetadata) throws Exception {
+    createRequestedReplaceCommit(basePath, instantTime, requestedReplaceMetadata);
+    createInflightReplaceCommit(basePath, instantTime, inflightReplaceMetadata);
+    currentInstantTime = instantTime;
+    return this;
+  }
+
   public HoodieTestTable addRequestedReplace(String instantTime, Option<HoodieRequestedReplaceMetadata> requestedReplaceMetadata) throws Exception {
     createRequestedReplaceCommit(basePath, instantTime, requestedReplaceMetadata);
     currentInstantTime = instantTime;
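[Review note] Intended usage of the new addPendingReplace, mirroring createPendingReplace in the test above (requestedReplaceMetadata is built as shown there; the inflight metadata is left empty):

    HoodieTestTable.of(metaClient)
        .addPendingReplace(instantTime, Option.of(requestedReplaceMetadata), Option.empty())
        .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "file-1", "file-2");

Writing both the requested and the inflight meta files in one call is what makes the instant count as pending for getInflightAndRequestedInstants, unlike addRequestedReplace, which writes only the requested file.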