From e00a9042e9a4e97f6f8ccaed4ac51a9aa8c2da76 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Mon, 24 Jan 2022 05:43:28 -0500 Subject: [PATCH] [HUDI-3072] Fixing conflict resolution in transaction management code path for auto commit code path (#4588) * Fixing conflict resolution in transaction management code path for auto commit code path * Addressing comments * Fixing test failures --- .../hudi/client/utils/TransactionUtils.java | 24 ++- .../commit/BaseCommitActionExecutor.java | 9 +- .../commit/BaseFlinkCommitActionExecutor.java | 9 +- .../commit/BaseJavaCommitActionExecutor.java | 8 +- .../SparkBootstrapCommitActionExecutor.java | 5 + .../commit/BaseSparkCommitActionExecutor.java | 10 +- .../client/TestHoodieClientMultiWriter.java | 161 ++++++++++++++++-- .../functional/TestHoodieBackedMetadata.java | 3 +- 8 files changed, 205 insertions(+), 24 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java index ed2ea4577..9d7683128 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java @@ -60,9 +60,31 @@ public class TransactionUtils { final Option thisCommitMetadata, final HoodieWriteConfig config, Option lastCompletedTxnOwnerInstant) throws HoodieWriteConflictException { + return resolveWriteConflictIfAny(table, currentTxnOwnerInstant, thisCommitMetadata, config, lastCompletedTxnOwnerInstant, false); + } + + /** + * Resolve any write conflicts when committing data. + * + * @param table + * @param currentTxnOwnerInstant + * @param thisCommitMetadata + * @param config + * @param lastCompletedTxnOwnerInstant + * @return + * @throws HoodieWriteConflictException + */ + public static Option resolveWriteConflictIfAny( + final HoodieTable table, + final Option currentTxnOwnerInstant, + final Option thisCommitMetadata, + final HoodieWriteConfig config, + Option lastCompletedTxnOwnerInstant, + boolean reloadActiveTimeline) throws HoodieWriteConflictException { if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) { ConflictResolutionStrategy resolutionStrategy = config.getWriteConflictResolutionStrategy(); - Stream instantStream = resolutionStrategy.getCandidateInstants(table.getActiveTimeline(), currentTxnOwnerInstant.get(), lastCompletedTxnOwnerInstant); + Stream instantStream = resolutionStrategy.getCandidateInstants(reloadActiveTimeline + ? table.getMetaClient().reloadActiveTimeline() : table.getActiveTimeline(), currentTxnOwnerInstant.get(), lastCompletedTxnOwnerInstant); final ConcurrentOperation thisOperation = new ConcurrentOperation(currentTxnOwnerInstant.get(), thisCommitMetadata.orElse(new HoodieCommitMetadata())); instantStream.forEach(instant -> { try { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java index 7449f3f80..432aaf126 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java @@ -31,7 +31,6 @@ import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant.State; -import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; @@ -152,18 +151,22 @@ public abstract class BaseCommitActionExecutor> extraMetadata, HoodieWriteMetadata result) { final Option inflightInstant = Option.of(new HoodieInstant(State.INFLIGHT, - HoodieTimeline.COMMIT_ACTION, instantTime)); + getCommitActionType(), instantTime)); this.txnManager.beginTransaction(inflightInstant, lastCompletedTxn.isPresent() ? Option.of(lastCompletedTxn.get().getLeft()) : Option.empty()); try { + setCommitMetadata(result); + // reload active timeline so as to get all updates after current transaction have started. hence setting last arg to true. TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(), - result.getCommitMetadata(), config, this.txnManager.getLastCompletedTransactionOwner()); + result.getCommitMetadata(), config, this.txnManager.getLastCompletedTransactionOwner(), true); commit(extraMetadata, result); } finally { this.txnManager.endTransaction(inflightInstant); } } + protected abstract void setCommitMetadata(HoodieWriteMetadata result); + protected abstract void commit(Option> extraMetadata, HoodieWriteMetadata result); /** diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java index 5dfa511a8..51138cd29 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java @@ -134,6 +134,12 @@ public abstract class BaseFlinkCommitActionExecutor> result) { + result.setCommitMetadata(Option.of(CommitUtils.buildMetadata(result.getWriteStatuses().stream().map(WriteStatus::getStat).collect(Collectors.toList()), + result.getPartitionToReplaceFileIds(), + extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType()))); + } + protected void commit(Option> extraMetadata, HoodieWriteMetadata> result, List writeStats) { String actionType = getCommitActionType(); LOG.info("Committing " + instantTime + ", action Type " + actionType); @@ -144,8 +150,7 @@ public abstract class BaseFlinkCommitActionExecutor> result) { + result.setCommitMetadata(Option.of(CommitUtils.buildMetadata(result.getWriteStatuses().stream().map(WriteStatus::getStat).collect(Collectors.toList()), + result.getPartitionToReplaceFileIds(), extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType()))); + } + protected void commit(Option> extraMetadata, HoodieWriteMetadata> result, List writeStats) { String actionType = getCommitActionType(); LOG.info("Committing " + instantTime + ", action Type " + actionType); @@ -206,8 +211,7 @@ public abstract class BaseJavaCommitActionExecutor return null; } + @Override + protected void setCommitMetadata(HoodieWriteMetadata> result) { + result.setCommitMetadata(Option.of(new HoodieCommitMetadata())); + } + @Override protected void commit(Option> extraMetadata, HoodieWriteMetadata> result) { // Perform bootstrap index write and then commit. Make sure both record-key and bootstrap-index diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java index c551310ba..e6c9b5f98 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java @@ -273,6 +273,13 @@ public abstract class BaseSparkCommitActionExecutor> result) { + result.setCommitMetadata(Option.of(CommitUtils.buildMetadata(result.getWriteStatuses().map(WriteStatus::getStat).collect(), + result.getPartitionToReplaceFileIds(), + extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType()))); + } + @Override protected void commit(Option> extraMetadata, HoodieWriteMetadata> result) { context.setJobStatus(this.getClass().getSimpleName(), "Commit write status collect"); @@ -288,8 +295,7 @@ public abstract class BaseSparkCommitActionExecutor { - createCommitWithInserts(cfg, client1, "003", newCommitTime, numRecords); + createCommitWithInserts(cfg, client1, "003", newCommitTime, numRecords, true); validInstants.add("007"); }); }); @@ -395,7 +399,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase { } Properties properties = new Properties(); properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); - properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000"); + properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000"); HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY) .withAutoClean(false).build()) @@ -411,7 +415,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase { .build(); // Create the first commit - createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 200); + createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 200, true); // Start another inflight commit String newCommitTime = "003"; int numRecords = 100; @@ -441,6 +445,134 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase { } } + @Test + public void testHoodieClientMultiWriterAutoCommitForConflict() throws Exception { + Properties properties = new Properties(); + properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); + properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000"); + properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "100"); + HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder() + .withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY) + .withAutoClean(false).build()) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + // Timeline-server-based markers are not used for multi-writer tests + .withMarkersType(MarkerType.DIRECT.name()) + .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class) + .build()).withAutoCommit(true).withProperties(properties); + HoodieWriteConfig cfg = writeConfigBuilder.build(); + HoodieWriteConfig cfg2 = writeConfigBuilder.build(); + + // Create the first commit + createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 5000, false); + // Start another inflight commit + String newCommitTime1 = "003"; + String newCommitTime2 = "004"; + SparkRDDWriteClient client1 = getHoodieWriteClient(cfg); + SparkRDDWriteClient client2 = getHoodieWriteClient(cfg2); + + List updates1 = dataGen.generateUpdates(newCommitTime1, 5000); + List updates2 = dataGen.generateUpdates(newCommitTime2, 5000); + + JavaRDD writeRecords1 = jsc.parallelize(updates1, 4); + JavaRDD writeRecords2 = jsc.parallelize(updates2, 4); + + runConcurrentAndAssert(writeRecords1, writeRecords2, client1, client2, SparkRDDWriteClient::upsert, true); + } + + private void runConcurrentAndAssert(JavaRDD writeRecords1, JavaRDD writeRecords2, + SparkRDDWriteClient client1, SparkRDDWriteClient client2, + Function3, SparkRDDWriteClient, JavaRDD, String> writeFn, + boolean assertForConflict) throws ExecutionException, InterruptedException { + + CountDownLatch runCountDownLatch = new CountDownLatch(2); + final ExecutorService executors = Executors.newFixedThreadPool(2); + String newCommitTime1 = "003"; + String newCommitTime2 = "004"; + + AtomicBoolean client1Succeeded = new AtomicBoolean(true); + AtomicBoolean client2Succeeded = new AtomicBoolean(true); + + Future future1 = executors.submit(() -> { + try { + ingestBatch(writeFn, client1, newCommitTime1, writeRecords1, runCountDownLatch); + } catch (IOException e) { + LOG.error("IOException thrown " + e.getMessage()); + } catch (InterruptedException e) { + LOG.error("Interrupted Exception thrown " + e.getMessage()); + } catch (Exception e) { + client1Succeeded.set(false); + } + } + ); + + Future future2 = executors.submit(() -> { + try { + ingestBatch(writeFn, client2, newCommitTime2, writeRecords2, runCountDownLatch); + } catch (IOException e) { + LOG.error("IOException thrown " + e.getMessage()); + } catch (InterruptedException e) { + LOG.error("Interrupted Exception thrown " + e.getMessage()); + } catch (Exception e) { + client2Succeeded.set(false); + } + } + ); + + future1.get(); + future2.get(); + if (assertForConflict) { + assertFalse(client1Succeeded.get() && client2Succeeded.get()); + assertTrue(client1Succeeded.get() || client2Succeeded.get()); + } else { + assertTrue(client2Succeeded.get() && client1Succeeded.get()); + } + } + + @Test + public void testHoodieClientMultiWriterAutoCommitNonConflict() throws Exception { + Properties properties = new Properties(); + properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); + properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000"); + properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "100"); + HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder() + .withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY) + .withAutoClean(false).build()) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()) + // Timeline-server-based markers are not used for multi-writer tests + .withMarkersType(MarkerType.DIRECT.name()) + .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class) + .build()).withAutoCommit(true).withProperties(properties); + HoodieWriteConfig cfg = writeConfigBuilder.build(); + HoodieWriteConfig cfg2 = writeConfigBuilder.build(); + + // Create the first commit + createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 200, false); + // Start another inflight commit + String newCommitTime1 = "003"; + String newCommitTime2 = "004"; + SparkRDDWriteClient client1 = getHoodieWriteClient(cfg); + SparkRDDWriteClient client2 = getHoodieWriteClient(cfg2); + + List updates1 = dataGen.generateInserts(newCommitTime1, 200); + List updates2 = dataGen.generateInserts(newCommitTime2, 200); + + JavaRDD writeRecords1 = jsc.parallelize(updates1, 1); + JavaRDD writeRecords2 = jsc.parallelize(updates2, 1); + + runConcurrentAndAssert(writeRecords1, writeRecords2, client1, client2, SparkRDDWriteClient::bulkInsert, false); + } + + private void ingestBatch(Function3, SparkRDDWriteClient, JavaRDD, String> writeFn, + SparkRDDWriteClient writeClient, String commitTime, JavaRDD records, + CountDownLatch countDownLatch) throws IOException, InterruptedException { + writeClient.startCommitWithTime(commitTime); + countDownLatch.countDown(); + countDownLatch.await(); + JavaRDD statusJavaRDD = writeFn.apply(writeClient, records, commitTime); + statusJavaRDD.collect(); + } + private void createCommitWithInsertsForPartition(HoodieWriteConfig cfg, SparkRDDWriteClient client, String prevCommitTime, String newCommitTime, int numRecords, String partition) throws Exception { @@ -450,11 +582,14 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase { } private void createCommitWithInserts(HoodieWriteConfig cfg, SparkRDDWriteClient client, - String prevCommitTime, String newCommitTime, int numRecords) throws Exception { + String prevCommitTime, String newCommitTime, int numRecords, + boolean doCommit) throws Exception { // Finish first base commmit JavaRDD result = insertFirstBatch(cfg, client, newCommitTime, prevCommitTime, numRecords, SparkRDDWriteClient::bulkInsert, false, false, numRecords); - assertTrue(client.commit(newCommitTime, result), "Commit should succeed"); + if (doCommit) { + assertTrue(client.commit(newCommitTime, result), "Commit should succeed"); + } } private void createCommitWithUpserts(HoodieWriteConfig cfg, SparkRDDWriteClient client, String prevCommit, diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index e3db3914a..cd56601d0 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -892,7 +892,8 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { Properties properties = new Properties(); properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); - properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000"); + properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"1000"); + properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,"20"); HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false) .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())