From d1d48ed4947b81e10fba83b7a462839a55ff9115 Mon Sep 17 00:00:00 2001 From: Manoj Govindassamy Date: Sat, 18 Dec 2021 06:43:17 -0800 Subject: [PATCH] [HUDI-3029] Transaction manager: avoid deadlock when doing begin and end transactions (#4363) * [HUDI-3029] Transaction manager: avoid deadlock when doing begin and end transactions - Transaction manager has begin and end transactions as synchronized methods. Based on the lock provider implementation, this can lead to a deadlock situation when the underlying lock() calls are blocking or with a long timeout. - Fixing transaction manager begin and end transactions to not get to deadlock and to not assume anything on the lock provider implementation. --- .../client/AbstractHoodieWriteClient.java | 9 +- .../transaction/TransactionManager.java | 61 ++++-- .../lock/InProcessLockProvider.java | 4 +- .../client/transaction/lock/LockManager.java | 22 -- .../action/clean/CleanActionExecutor.java | 2 +- .../commit/BaseCommitActionExecutor.java | 6 +- .../restore/BaseRestoreActionExecutor.java | 2 +- .../rollback/BaseRollbackActionExecutor.java | 2 +- .../transaction/TestTransactionManager.java | 198 ++++++++++++++++++ .../hudi/client/HoodieFlinkWriteClient.java | 4 +- .../hudi/client/SparkRDDWriteClient.java | 8 +- 11 files changed, 256 insertions(+), 62 deletions(-) create mode 100644 hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 3fe28c64b..293c1d94d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -201,7 +201,7 @@ public abstract class AbstractHoodieWriteClient 
scheduleTableService(String instantTime, Option> extraMetadata, TableServiceType tableServiceType) { // A lock is required to guard against race conditions between an on-going writer and scheduling a table service. + final Option inflightInstant = Option.of(new HoodieInstant(HoodieInstant.State.REQUESTED, + tableServiceType.getAction(), instantTime)); try { - this.txnManager.beginTransaction(Option.of(new HoodieInstant(HoodieInstant.State.REQUESTED, - tableServiceType.getAction(), instantTime)), Option.empty()); + this.txnManager.beginTransaction(inflightInstant, Option.empty()); LOG.info("Scheduling table service " + tableServiceType); return scheduleTableServiceInternal(instantTime, extraMetadata, tableServiceType); } finally { - this.txnManager.endTransaction(); + this.txnManager.endTransaction(inflightInstant); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java index a6753aaa3..3a3552e74 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java @@ -35,49 +35,64 @@ import java.io.Serializable; public class TransactionManager implements Serializable { private static final Logger LOG = LogManager.getLogger(TransactionManager.class); - private final LockManager lockManager; - private Option currentTxnOwnerInstant; - private Option lastCompletedTxnOwnerInstant; - private boolean supportsOptimisticConcurrency; + private final boolean isOptimisticConcurrencyControlEnabled; + private Option currentTxnOwnerInstant = Option.empty(); + private Option lastCompletedTxnOwnerInstant = Option.empty(); public TransactionManager(HoodieWriteConfig config, FileSystem fs) { this.lockManager = new LockManager(config, fs); - 
this.supportsOptimisticConcurrency = config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl(); + this.isOptimisticConcurrencyControlEnabled = config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl(); } - public synchronized void beginTransaction() { - if (supportsOptimisticConcurrency) { + public void beginTransaction() { + if (isOptimisticConcurrencyControlEnabled) { LOG.info("Transaction starting without a transaction owner"); lockManager.lock(); - LOG.info("Transaction started"); + LOG.info("Transaction started without a transaction owner"); } } - public synchronized void beginTransaction(Option currentTxnOwnerInstant, Option lastCompletedTxnOwnerInstant) { - if (supportsOptimisticConcurrency) { - this.lastCompletedTxnOwnerInstant = lastCompletedTxnOwnerInstant; - lockManager.setLatestCompletedWriteInstant(lastCompletedTxnOwnerInstant); - LOG.info("Latest completed transaction instant " + lastCompletedTxnOwnerInstant); - this.currentTxnOwnerInstant = currentTxnOwnerInstant; - LOG.info("Transaction starting with transaction owner " + currentTxnOwnerInstant); + public void beginTransaction(Option newTxnOwnerInstant, + Option lastCompletedTxnOwnerInstant) { + if (isOptimisticConcurrencyControlEnabled) { + LOG.info("Transaction starting for " + newTxnOwnerInstant + + " with latest completed transaction instant " + lastCompletedTxnOwnerInstant); lockManager.lock(); - LOG.info("Transaction started"); + reset(currentTxnOwnerInstant, newTxnOwnerInstant, lastCompletedTxnOwnerInstant); + LOG.info("Transaction started for " + newTxnOwnerInstant + + " with latest completed transaction instant " + lastCompletedTxnOwnerInstant); } } - public synchronized void endTransaction() { - if (supportsOptimisticConcurrency) { - LOG.info("Transaction ending with transaction owner " + currentTxnOwnerInstant); + public void endTransaction() { + if (isOptimisticConcurrencyControlEnabled) { + LOG.info("Transaction ending without a transaction owner"); 
lockManager.unlock(); - LOG.info("Transaction ended"); - this.lastCompletedTxnOwnerInstant = Option.empty(); - lockManager.resetLatestCompletedWriteInstant(); + LOG.info("Transaction ended without a transaction owner"); + } + } + + public void endTransaction(Option currentTxnOwnerInstant) { + if (isOptimisticConcurrencyControlEnabled) { + LOG.info("Transaction ending with transaction owner " + currentTxnOwnerInstant); + reset(currentTxnOwnerInstant, Option.empty(), Option.empty()); + lockManager.unlock(); + LOG.info("Transaction ended with transaction owner " + currentTxnOwnerInstant); + } + } + + private synchronized void reset(Option callerInstant, + Option newTxnOwnerInstant, + Option lastCompletedTxnOwnerInstant) { + if (!this.currentTxnOwnerInstant.isPresent() || this.currentTxnOwnerInstant == callerInstant) { + this.currentTxnOwnerInstant = newTxnOwnerInstant; + this.lastCompletedTxnOwnerInstant = lastCompletedTxnOwnerInstant; } } public void close() { - if (supportsOptimisticConcurrency) { + if (isOptimisticConcurrencyControlEnabled) { lockManager.close(); LOG.info("Transaction manager closed"); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java index b151879d6..cab9d95df 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java @@ -111,7 +111,7 @@ public class InProcessLockProvider implements LockProvider> latestCompletedWriteInstant; public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) { - this.latestCompletedWriteInstant = new AtomicReference<>(Option.empty()); this.writeConfig = writeConfig; this.hadoopConf = new SerializableConfiguration(fs.getConf()); 
this.lockConfiguration = new LockConfiguration(writeConfig.getProps()); @@ -100,22 +94,6 @@ public class LockManager implements Serializable, AutoCloseable { return lockProvider; } - public void setLatestCompletedWriteInstant(Option instant) { - this.latestCompletedWriteInstant.set(instant); - } - - public void compareAndSetLatestCompletedWriteInstant(Option expected, Option newValue) { - this.latestCompletedWriteInstant.compareAndSet(expected, newValue); - } - - public AtomicReference> getLatestCompletedWriteInstant() { - return latestCompletedWriteInstant; - } - - public void resetLatestCompletedWriteInstant() { - this.latestCompletedWriteInstant.set(Option.empty()); - } - @Override public void close() { closeQuietly(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java index 173010e86..f1f2e53ec 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java @@ -217,7 +217,7 @@ public class CleanActionExecutor extends throw new HoodieIOException("Failed to clean up after commit", e); } finally { if (!skipLocking) { - this.txnManager.endTransaction(); + this.txnManager.endTransaction(Option.empty()); } } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java index ce6ed5db3..684db0317 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java @@ -147,14 +147,16 @@ public 
abstract class BaseCommitActionExecutor> extraMetadata, HoodieWriteMetadata result) { - this.txnManager.beginTransaction(Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, instantTime)), + final Option inflightInstant = Option.of(new HoodieInstant(State.INFLIGHT, + HoodieTimeline.COMMIT_ACTION, instantTime)); + this.txnManager.beginTransaction(inflightInstant, lastCompletedTxn.isPresent() ? Option.of(lastCompletedTxn.get().getLeft()) : Option.empty()); try { TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(), result.getCommitMetadata(), config, this.txnManager.getLastCompletedTransactionOwner()); commit(extraMetadata, result); } finally { - this.txnManager.endTransaction(); + this.txnManager.endTransaction(inflightInstant); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java index ac8f9940d..937134014 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java @@ -112,7 +112,7 @@ public abstract class BaseRestoreActionExecutor { + transactionManager.beginTransaction(); + }); + + transactionManager.endTransaction(); + assertThrows(HoodieLockException.class, () -> { + transactionManager.endTransaction(); + }); + } + + @Test + public void testMultiWriterTransactions() { + final int threadCount = 3; + final long awaitMaxTimeoutMs = 2000L; + final CountDownLatch latch = new CountDownLatch(threadCount); + final AtomicBoolean writer1Completed = new AtomicBoolean(false); + final AtomicBoolean writer2Completed = new AtomicBoolean(false); + + // Let writer1 get the lock first, then wait for others + // to join the sync up point. 
+ Thread writer1 = new Thread(() -> { + assertDoesNotThrow(() -> { + transactionManager.beginTransaction(); + }); + latch.countDown(); + try { + latch.await(awaitMaxTimeoutMs, TimeUnit.MILLISECONDS); + // Following sleep is to make sure writer2 attempts + // to try lock and to get blocked on the lock which + // this thread is currently holding. + Thread.sleep(50); + } catch (InterruptedException e) { + // + } + assertDoesNotThrow(() -> { + transactionManager.endTransaction(); + }); + writer1Completed.set(true); + }); + writer1.start(); + + // Writer2 will block on trying to acquire the lock + // and will eventually get the lock before the timeout. + Thread writer2 = new Thread(() -> { + latch.countDown(); + try { + latch.await(awaitMaxTimeoutMs, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + // + } + assertDoesNotThrow(() -> { + transactionManager.beginTransaction(); + }); + assertDoesNotThrow(() -> { + transactionManager.endTransaction(); + }); + writer2Completed.set(true); + }); + writer2.start(); + + // Let writer1 and writer2 wait at the sync up + // point to make sure they run in parallel and + // one get blocked by the other. + latch.countDown(); + try { + writer1.join(); + writer2.join(); + } catch (InterruptedException e) { + // + } + + // Make sure both writers actually completed good + Assertions.assertTrue(writer1Completed.get()); + Assertions.assertTrue(writer2Completed.get()); + } + + @Test + public void testTransactionsWithInstantTime() { + // 1. 
Begin and end by the same transaction owner + Option lastCompletedInstant = getInstant("0000001"); + Option newTxnOwnerInstant = getInstant("0000002"); + transactionManager.beginTransaction(newTxnOwnerInstant, lastCompletedInstant); + Assertions.assertTrue(transactionManager.getCurrentTransactionOwner() == newTxnOwnerInstant); + Assertions.assertTrue(transactionManager.getLastCompletedTransactionOwner() == lastCompletedInstant); + transactionManager.endTransaction(newTxnOwnerInstant); + Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent()); + Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent()); + + // 2. Begin transaction with a new txn owner, but end transaction with no/wrong owner + lastCompletedInstant = getInstant("0000002"); + newTxnOwnerInstant = getInstant("0000003"); + transactionManager.beginTransaction(newTxnOwnerInstant, lastCompletedInstant); + transactionManager.endTransaction(); + // Owner reset would not happen as the end txn was invoked with an incorrect current txn owner + Assertions.assertTrue(transactionManager.getCurrentTransactionOwner() == newTxnOwnerInstant); + Assertions.assertTrue(transactionManager.getLastCompletedTransactionOwner() == lastCompletedInstant); + + // 3. But, we should be able to begin a new transaction for a new owner + lastCompletedInstant = getInstant("0000003"); + newTxnOwnerInstant = getInstant("0000004"); + transactionManager.beginTransaction(newTxnOwnerInstant, lastCompletedInstant); + Assertions.assertTrue(transactionManager.getCurrentTransactionOwner() == newTxnOwnerInstant); + Assertions.assertTrue(transactionManager.getLastCompletedTransactionOwner() == lastCompletedInstant); + transactionManager.endTransaction(newTxnOwnerInstant); + Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent()); + Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent()); + + // 4. 
Transactions with no owners should also go through + transactionManager.beginTransaction(); + Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent()); + Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent()); + transactionManager.endTransaction(); + Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent()); + Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent()); + } + + private Option getInstant(String timestamp) { + return Option.of(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMMIT_ACTION, timestamp)); + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 374dd1226..2ed2536c2 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -359,8 +359,8 @@ public class HoodieFlinkWriteClient extends String compactionCommitTime) { this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction"); List writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()); + final HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime); try { - HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime); this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty()); finalizeWrite(table, compactionCommitTime, writeStats); // commit to data table after committing to metadata table. 
@@ -371,7 +371,7 @@ public class HoodieFlinkWriteClient extends LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata); CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata); } finally { - this.txnManager.endTransaction(); + this.txnManager.endTransaction(Option.of(compactionInstant)); } if (compactionTimer != null) { long durationInMs = metrics.getDurationInMs(compactionTimer.stop()); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index dcd241618..9b2aad3eb 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -300,8 +300,8 @@ public class SparkRDDWriteClient extends String compactionCommitTime) { this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction"); List writeStats = writeStatuses.map(WriteStatus::getStat).collect(); + final HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime); try { - HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime); this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty()); finalizeWrite(table, compactionCommitTime, writeStats); // commit to data table after committing to metadata table. @@ -309,7 +309,7 @@ public class SparkRDDWriteClient extends LOG.info("Committing Compaction " + compactionCommitTime + ". 
Finished with result " + metadata); CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata); } finally { - this.txnManager.endTransaction(); + this.txnManager.endTransaction(Option.of(compactionInstant)); } WriteMarkersFactory.get(config.getMarkersType(), table, compactionCommitTime) .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); @@ -378,8 +378,8 @@ public class SparkRDDWriteClient extends throw new HoodieClusteringException("Clustering failed to write to files:" + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(s -> s.getFileId()).collect(Collectors.joining(","))); } + final HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime); try { - HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime); this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty()); finalizeWrite(table, clusteringCommitTime, writeStats); writeTableMetadataForTableServices(table, metadata,clusteringInstant); @@ -395,7 +395,7 @@ public class SparkRDDWriteClient extends } catch (Exception e) { throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, e); } finally { - this.txnManager.endTransaction(); + this.txnManager.endTransaction(Option.of(clusteringInstant)); } WriteMarkersFactory.get(config.getMarkersType(), table, clusteringCommitTime) .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());