[HUDI-3394] Check isWriteLockedByCurrentThread before unlock for InProcessLockProvider (#4819)
Co-authored-by: yuezhang <yuezhang@freewheel.tv> Co-authored-by: Y Ethan Guo <ethan.guoyihua@gmail.com>
This commit is contained in:
@@ -19,13 +19,14 @@
|
||||
|
||||
package org.apache.hudi.client.transaction.lock;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hudi.common.config.LockConfiguration;
|
||||
import org.apache.hudi.common.config.TypedProperties;
|
||||
import org.apache.hudi.common.lock.LockProvider;
|
||||
import org.apache.hudi.common.lock.LockState;
|
||||
import org.apache.hudi.common.util.StringUtils;
|
||||
import org.apache.hudi.exception.HoodieLockException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
@@ -92,7 +93,11 @@ public class InProcessLockProvider implements LockProvider<ReentrantReadWriteLoc
|
||||
public void unlock() {
|
||||
LOG.info(getLogMessage(LockState.RELEASING));
|
||||
try {
|
||||
LOCK.writeLock().unlock();
|
||||
if (LOCK.isWriteLockedByCurrentThread()) {
|
||||
LOCK.writeLock().unlock();
|
||||
} else {
|
||||
LOG.warn("Cannot unlock because the current thread does not hold the lock.");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new HoodieLockException(getLogMessage(LockState.FAILED_TO_RELEASE), e);
|
||||
}
|
||||
|
||||
@@ -83,6 +83,10 @@ public class LockManager implements Serializable, AutoCloseable {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* We need to take care of the scenarios that current thread may not be the holder of this lock
|
||||
* and tries to call unlock()
|
||||
*/
|
||||
public void unlock() {
|
||||
if (writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
|
||||
getLockProvider().unlock();
|
||||
|
||||
@@ -238,7 +238,7 @@ public class TestInProcessLockProvider {
|
||||
assertDoesNotThrow(() -> {
|
||||
inProcessLockProvider.unlock();
|
||||
});
|
||||
assertThrows(HoodieLockException.class, () -> {
|
||||
assertDoesNotThrow(() -> {
|
||||
inProcessLockProvider.unlock();
|
||||
});
|
||||
}
|
||||
@@ -246,7 +246,7 @@ public class TestInProcessLockProvider {
|
||||
@Test
|
||||
public void testUnlockWithoutLock() {
|
||||
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
|
||||
assertThrows(HoodieLockException.class, () -> {
|
||||
assertDoesNotThrow(() -> {
|
||||
inProcessLockProvider.unlock();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -81,7 +81,7 @@ public class TestTransactionManager extends HoodieCommonTestHarness {
|
||||
});
|
||||
|
||||
transactionManager.endTransaction();
|
||||
assertThrows(HoodieLockException.class, () -> {
|
||||
assertDoesNotThrow(() -> {
|
||||
transactionManager.endTransaction();
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user