[HUDI-3798] Fixing ending of a transaction by different owner and removing some extraneous methods in trxn manager (#5255)
This commit is contained in:
committed by
GitHub
parent
63a099c5b7
commit
2245a9515f
@@ -795,7 +795,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
|
|||||||
final String restoreInstantTime = HoodieActiveTimeline.createNewInstantTime();
|
final String restoreInstantTime = HoodieActiveTimeline.createNewInstantTime();
|
||||||
Timer.Context timerContext = metrics.getRollbackCtx();
|
Timer.Context timerContext = metrics.getRollbackCtx();
|
||||||
try {
|
try {
|
||||||
HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.empty(), initialMetadataTableIfNecessary);
|
HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.of(restoreInstantTime), initialMetadataTableIfNecessary);
|
||||||
Option<HoodieRestorePlan> restorePlanOption = table.scheduleRestore(context, restoreInstantTime, instantTime);
|
Option<HoodieRestorePlan> restorePlanOption = table.scheduleRestore(context, restoreInstantTime, instantTime);
|
||||||
if (restorePlanOption.isPresent()) {
|
if (restorePlanOption.isPresent()) {
|
||||||
HoodieRestoreMetadata restoreMetadata = table.restore(context, restoreInstantTime, instantTime);
|
HoodieRestoreMetadata restoreMetadata = table.restore(context, restoreInstantTime, instantTime);
|
||||||
@@ -1035,7 +1035,8 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
|
|||||||
public void dropIndex(List<MetadataPartitionType> partitionTypes) {
|
public void dropIndex(List<MetadataPartitionType> partitionTypes) {
|
||||||
HoodieTable table = createTable(config, hadoopConf);
|
HoodieTable table = createTable(config, hadoopConf);
|
||||||
String dropInstant = HoodieActiveTimeline.createNewInstantTime();
|
String dropInstant = HoodieActiveTimeline.createNewInstantTime();
|
||||||
this.txnManager.beginTransaction();
|
HoodieInstant ownerInstant = new HoodieInstant(true, HoodieTimeline.INDEXING_ACTION, dropInstant);
|
||||||
|
this.txnManager.beginTransaction(Option.of(ownerInstant), Option.empty());
|
||||||
try {
|
try {
|
||||||
context.setJobStatus(this.getClass().getSimpleName(), "Dropping partitions from metadata table");
|
context.setJobStatus(this.getClass().getSimpleName(), "Dropping partitions from metadata table");
|
||||||
table.getMetadataWriter(dropInstant).ifPresent(w -> {
|
table.getMetadataWriter(dropInstant).ifPresent(w -> {
|
||||||
@@ -1046,7 +1047,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
} finally {
|
} finally {
|
||||||
this.txnManager.endTransaction();
|
this.txnManager.endTransaction(Option.of(ownerInstant));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1451,13 +1452,16 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
|
|||||||
}
|
}
|
||||||
|
|
||||||
HoodieTable table;
|
HoodieTable table;
|
||||||
|
Option<HoodieInstant> ownerInstant = Option.empty();
|
||||||
this.txnManager.beginTransaction();
|
if (instantTime.isPresent()) {
|
||||||
|
ownerInstant = Option.of(new HoodieInstant(true, CommitUtils.getCommitActionType(operationType, metaClient.getTableType()), instantTime.get()));
|
||||||
|
}
|
||||||
|
this.txnManager.beginTransaction(ownerInstant, Option.empty());
|
||||||
try {
|
try {
|
||||||
tryUpgrade(metaClient, instantTime);
|
tryUpgrade(metaClient, instantTime);
|
||||||
table = doInitTable(metaClient, instantTime, initialMetadataTableIfNecessary);
|
table = doInitTable(metaClient, instantTime, initialMetadataTableIfNecessary);
|
||||||
} finally {
|
} finally {
|
||||||
this.txnManager.endTransaction();
|
this.txnManager.endTransaction(ownerInstant);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate table properties
|
// Validate table properties
|
||||||
|
|||||||
@@ -157,7 +157,8 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
|
|||||||
public boolean archiveIfRequired(HoodieEngineContext context, boolean acquireLock) throws IOException {
|
public boolean archiveIfRequired(HoodieEngineContext context, boolean acquireLock) throws IOException {
|
||||||
try {
|
try {
|
||||||
if (acquireLock) {
|
if (acquireLock) {
|
||||||
txnManager.beginTransaction();
|
// there is no owner or instant time per se for archival.
|
||||||
|
txnManager.beginTransaction(Option.empty(), Option.empty());
|
||||||
}
|
}
|
||||||
List<HoodieInstant> instantsToArchive = getInstantsToArchive().collect(Collectors.toList());
|
List<HoodieInstant> instantsToArchive = getInstantsToArchive().collect(Collectors.toList());
|
||||||
verifyLastMergeArchiveFilesIfNecessary(context);
|
verifyLastMergeArchiveFilesIfNecessary(context);
|
||||||
@@ -179,7 +180,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
|
|||||||
} finally {
|
} finally {
|
||||||
close();
|
close();
|
||||||
if (acquireLock) {
|
if (acquireLock) {
|
||||||
txnManager.endTransaction();
|
txnManager.endTransaction(Option.empty());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -45,14 +45,6 @@ public class TransactionManager implements Serializable {
|
|||||||
this.isOptimisticConcurrencyControlEnabled = config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl();
|
this.isOptimisticConcurrencyControlEnabled = config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void beginTransaction() {
|
|
||||||
if (isOptimisticConcurrencyControlEnabled) {
|
|
||||||
LOG.info("Transaction starting without a transaction owner");
|
|
||||||
lockManager.lock();
|
|
||||||
LOG.info("Transaction started without a transaction owner");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void beginTransaction(Option<HoodieInstant> newTxnOwnerInstant,
|
public void beginTransaction(Option<HoodieInstant> newTxnOwnerInstant,
|
||||||
Option<HoodieInstant> lastCompletedTxnOwnerInstant) {
|
Option<HoodieInstant> lastCompletedTxnOwnerInstant) {
|
||||||
if (isOptimisticConcurrencyControlEnabled) {
|
if (isOptimisticConcurrencyControlEnabled) {
|
||||||
@@ -65,30 +57,25 @@ public class TransactionManager implements Serializable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void endTransaction() {
|
|
||||||
if (isOptimisticConcurrencyControlEnabled) {
|
|
||||||
LOG.info("Transaction ending without a transaction owner");
|
|
||||||
lockManager.unlock();
|
|
||||||
LOG.info("Transaction ended without a transaction owner");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void endTransaction(Option<HoodieInstant> currentTxnOwnerInstant) {
|
public void endTransaction(Option<HoodieInstant> currentTxnOwnerInstant) {
|
||||||
if (isOptimisticConcurrencyControlEnabled) {
|
if (isOptimisticConcurrencyControlEnabled) {
|
||||||
LOG.info("Transaction ending with transaction owner " + currentTxnOwnerInstant);
|
LOG.info("Transaction ending with transaction owner " + currentTxnOwnerInstant);
|
||||||
reset(currentTxnOwnerInstant, Option.empty(), Option.empty());
|
if (reset(currentTxnOwnerInstant, Option.empty(), Option.empty())) {
|
||||||
lockManager.unlock();
|
lockManager.unlock();
|
||||||
LOG.info("Transaction ended with transaction owner " + currentTxnOwnerInstant);
|
LOG.info("Transaction ended with transaction owner " + currentTxnOwnerInstant);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void reset(Option<HoodieInstant> callerInstant,
|
private synchronized boolean reset(Option<HoodieInstant> callerInstant,
|
||||||
Option<HoodieInstant> newTxnOwnerInstant,
|
Option<HoodieInstant> newTxnOwnerInstant,
|
||||||
Option<HoodieInstant> lastCompletedTxnOwnerInstant) {
|
Option<HoodieInstant> lastCompletedTxnOwnerInstant) {
|
||||||
if (!this.currentTxnOwnerInstant.isPresent() || this.currentTxnOwnerInstant.get().equals(callerInstant.get())) {
|
if (!this.currentTxnOwnerInstant.isPresent() || this.currentTxnOwnerInstant.get().equals(callerInstant.get())) {
|
||||||
this.currentTxnOwnerInstant = newTxnOwnerInstant;
|
this.currentTxnOwnerInstant = newTxnOwnerInstant;
|
||||||
this.lastCompletedTxnOwnerInstant = lastCompletedTxnOwnerInstant;
|
this.lastCompletedTxnOwnerInstant = lastCompletedTxnOwnerInstant;
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void close() {
|
public void close() {
|
||||||
|
|||||||
@@ -232,14 +232,14 @@ public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O> exte
|
|||||||
HoodieIndexCommitMetadata indexCommitMetadata) throws IOException {
|
HoodieIndexCommitMetadata indexCommitMetadata) throws IOException {
|
||||||
try {
|
try {
|
||||||
// update the table config and timeline in a lock as there could be another indexer running
|
// update the table config and timeline in a lock as there could be another indexer running
|
||||||
txnManager.beginTransaction();
|
txnManager.beginTransaction(Option.of(indexInstant), Option.empty());
|
||||||
updateMetadataPartitionsTableConfig(table.getMetaClient(),
|
updateMetadataPartitionsTableConfig(table.getMetaClient(),
|
||||||
finalIndexPartitionInfos.stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toSet()));
|
finalIndexPartitionInfos.stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toSet()));
|
||||||
table.getActiveTimeline().saveAsComplete(
|
table.getActiveTimeline().saveAsComplete(
|
||||||
new HoodieInstant(true, INDEXING_ACTION, indexInstant.getTimestamp()),
|
new HoodieInstant(true, INDEXING_ACTION, indexInstant.getTimestamp()),
|
||||||
TimelineMetadataUtils.serializeIndexCommitMetadata(indexCommitMetadata));
|
TimelineMetadataUtils.serializeIndexCommitMetadata(indexCommitMetadata));
|
||||||
} finally {
|
} finally {
|
||||||
txnManager.endTransaction();
|
txnManager.endTransaction(Option.of(indexInstant));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -69,20 +69,28 @@ public class TestTransactionManager extends HoodieCommonTestHarness {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSingleWriterTransaction() {
|
public void testSingleWriterTransaction() {
|
||||||
transactionManager.beginTransaction();
|
Option<HoodieInstant> lastCompletedInstant = getInstant("0000001");
|
||||||
transactionManager.endTransaction();
|
Option<HoodieInstant> newTxnOwnerInstant = getInstant("0000002");
|
||||||
|
transactionManager.beginTransaction(newTxnOwnerInstant, lastCompletedInstant);
|
||||||
|
transactionManager.endTransaction(newTxnOwnerInstant);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSingleWriterNestedTransaction() {
|
public void testSingleWriterNestedTransaction() {
|
||||||
transactionManager.beginTransaction();
|
Option<HoodieInstant> lastCompletedInstant = getInstant("0000001");
|
||||||
|
Option<HoodieInstant> newTxnOwnerInstant = getInstant("0000002");
|
||||||
|
transactionManager.beginTransaction(newTxnOwnerInstant, lastCompletedInstant);
|
||||||
|
|
||||||
|
Option<HoodieInstant> lastCompletedInstant1 = getInstant("0000003");
|
||||||
|
Option<HoodieInstant> newTxnOwnerInstant1 = getInstant("0000004");
|
||||||
|
|
||||||
assertThrows(HoodieLockException.class, () -> {
|
assertThrows(HoodieLockException.class, () -> {
|
||||||
transactionManager.beginTransaction();
|
transactionManager.beginTransaction(newTxnOwnerInstant1, lastCompletedInstant1);
|
||||||
});
|
});
|
||||||
|
|
||||||
transactionManager.endTransaction();
|
transactionManager.endTransaction(newTxnOwnerInstant);
|
||||||
assertDoesNotThrow(() -> {
|
assertDoesNotThrow(() -> {
|
||||||
transactionManager.endTransaction();
|
transactionManager.endTransaction(newTxnOwnerInstant1);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -94,11 +102,16 @@ public class TestTransactionManager extends HoodieCommonTestHarness {
|
|||||||
final AtomicBoolean writer1Completed = new AtomicBoolean(false);
|
final AtomicBoolean writer1Completed = new AtomicBoolean(false);
|
||||||
final AtomicBoolean writer2Completed = new AtomicBoolean(false);
|
final AtomicBoolean writer2Completed = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
Option<HoodieInstant> lastCompletedInstant1 = getInstant("0000001");
|
||||||
|
Option<HoodieInstant> newTxnOwnerInstant1 = getInstant("0000002");
|
||||||
|
Option<HoodieInstant> lastCompletedInstant2 = getInstant("0000003");
|
||||||
|
Option<HoodieInstant> newTxnOwnerInstant2 = getInstant("0000004");
|
||||||
|
|
||||||
// Let writer1 get the lock first, then wait for others
|
// Let writer1 get the lock first, then wait for others
|
||||||
// to join the sync up point.
|
// to join the sync up point.
|
||||||
Thread writer1 = new Thread(() -> {
|
Thread writer1 = new Thread(() -> {
|
||||||
assertDoesNotThrow(() -> {
|
assertDoesNotThrow(() -> {
|
||||||
transactionManager.beginTransaction();
|
transactionManager.beginTransaction(newTxnOwnerInstant1, lastCompletedInstant1);
|
||||||
});
|
});
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
try {
|
try {
|
||||||
@@ -111,7 +124,7 @@ public class TestTransactionManager extends HoodieCommonTestHarness {
|
|||||||
//
|
//
|
||||||
}
|
}
|
||||||
assertDoesNotThrow(() -> {
|
assertDoesNotThrow(() -> {
|
||||||
transactionManager.endTransaction();
|
transactionManager.endTransaction(newTxnOwnerInstant1);
|
||||||
});
|
});
|
||||||
writer1Completed.set(true);
|
writer1Completed.set(true);
|
||||||
});
|
});
|
||||||
@@ -127,10 +140,10 @@ public class TestTransactionManager extends HoodieCommonTestHarness {
|
|||||||
//
|
//
|
||||||
}
|
}
|
||||||
assertDoesNotThrow(() -> {
|
assertDoesNotThrow(() -> {
|
||||||
transactionManager.beginTransaction();
|
transactionManager.beginTransaction(newTxnOwnerInstant2, lastCompletedInstant2);
|
||||||
});
|
});
|
||||||
assertDoesNotThrow(() -> {
|
assertDoesNotThrow(() -> {
|
||||||
transactionManager.endTransaction();
|
transactionManager.endTransaction(newTxnOwnerInstant2);
|
||||||
});
|
});
|
||||||
writer2Completed.set(true);
|
writer2Completed.set(true);
|
||||||
});
|
});
|
||||||
@@ -152,6 +165,32 @@ public class TestTransactionManager extends HoodieCommonTestHarness {
|
|||||||
Assertions.assertTrue(writer2Completed.get());
|
Assertions.assertTrue(writer2Completed.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEndTransactionByDiffOwner() throws InterruptedException {
|
||||||
|
// 1. Begin and end by the same transaction owner
|
||||||
|
Option<HoodieInstant> lastCompletedInstant = getInstant("0000001");
|
||||||
|
Option<HoodieInstant> newTxnOwnerInstant = getInstant("0000002");
|
||||||
|
transactionManager.beginTransaction(newTxnOwnerInstant, lastCompletedInstant);
|
||||||
|
|
||||||
|
CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||||
|
// Another writer thread
|
||||||
|
Thread writer2 = new Thread(() -> {
|
||||||
|
Option<HoodieInstant> newTxnOwnerInstant1 = getInstant("0000003");
|
||||||
|
transactionManager.endTransaction(newTxnOwnerInstant1);
|
||||||
|
countDownLatch.countDown();
|
||||||
|
});
|
||||||
|
|
||||||
|
writer2.start();
|
||||||
|
countDownLatch.await(30, TimeUnit.SECONDS);
|
||||||
|
// should not have reset the state within transaction manager since the owner is different.
|
||||||
|
Assertions.assertTrue(transactionManager.getCurrentTransactionOwner().isPresent());
|
||||||
|
Assertions.assertTrue(transactionManager.getLastCompletedTransactionOwner().isPresent());
|
||||||
|
|
||||||
|
transactionManager.endTransaction(newTxnOwnerInstant);
|
||||||
|
Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
|
||||||
|
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testTransactionsWithInstantTime() {
|
public void testTransactionsWithInstantTime() {
|
||||||
// 1. Begin and end by the same transaction owner
|
// 1. Begin and end by the same transaction owner
|
||||||
@@ -164,14 +203,15 @@ public class TestTransactionManager extends HoodieCommonTestHarness {
|
|||||||
Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
|
Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
|
||||||
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
|
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
|
||||||
|
|
||||||
// 2. Begin transaction with a new txn owner, but end transaction with no/wrong owner
|
// 2. Begin transaction with a new txn owner, but end transaction with wrong owner
|
||||||
lastCompletedInstant = getInstant("0000002");
|
lastCompletedInstant = getInstant("0000002");
|
||||||
newTxnOwnerInstant = getInstant("0000003");
|
newTxnOwnerInstant = getInstant("0000003");
|
||||||
transactionManager.beginTransaction(newTxnOwnerInstant, lastCompletedInstant);
|
transactionManager.beginTransaction(newTxnOwnerInstant, lastCompletedInstant);
|
||||||
transactionManager.endTransaction();
|
transactionManager.endTransaction(getInstant("0000004"));
|
||||||
// Owner reset would not happen as the end txn was invoked with an incorrect current txn owner
|
// Owner reset would not happen as the end txn was invoked with an incorrect current txn owner
|
||||||
Assertions.assertTrue(transactionManager.getCurrentTransactionOwner() == newTxnOwnerInstant);
|
Assertions.assertTrue(transactionManager.getCurrentTransactionOwner() == newTxnOwnerInstant);
|
||||||
Assertions.assertTrue(transactionManager.getLastCompletedTransactionOwner() == lastCompletedInstant);
|
Assertions.assertTrue(transactionManager.getLastCompletedTransactionOwner() == lastCompletedInstant);
|
||||||
|
transactionManager.endTransaction(newTxnOwnerInstant);
|
||||||
|
|
||||||
// 3. But, we should be able to begin a new transaction for a new owner
|
// 3. But, we should be able to begin a new transaction for a new owner
|
||||||
lastCompletedInstant = getInstant("0000003");
|
lastCompletedInstant = getInstant("0000003");
|
||||||
@@ -183,15 +223,7 @@ public class TestTransactionManager extends HoodieCommonTestHarness {
|
|||||||
Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
|
Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
|
||||||
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
|
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
|
||||||
|
|
||||||
// 4. Transactions with no owners should also go through
|
// 4. Transactions with new instants but with same timestamps should properly reset owners
|
||||||
transactionManager.beginTransaction();
|
|
||||||
Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
|
|
||||||
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
|
|
||||||
transactionManager.endTransaction();
|
|
||||||
Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
|
|
||||||
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
|
|
||||||
|
|
||||||
// 5. Transactions with new instants but with same timestamps should properly reset owners
|
|
||||||
transactionManager.beginTransaction(getInstant("0000005"), Option.empty());
|
transactionManager.beginTransaction(getInstant("0000005"), Option.empty());
|
||||||
Assertions.assertTrue(transactionManager.getCurrentTransactionOwner().isPresent());
|
Assertions.assertTrue(transactionManager.getCurrentTransactionOwner().isPresent());
|
||||||
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
|
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
|
||||||
|
|||||||
Reference in New Issue
Block a user