[HUDI-2962] InProcess lock provider to guard single writer process with async table operations (#4259)
- Adding Local JVM process based lock provider implementation - This local lock provider can be used by a single writer process with async table operations to guard the metadata tabl against concurrent updates.
This commit is contained in:
committed by
GitHub
parent
6eba8345cb
commit
7784249e55
@@ -0,0 +1,117 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.client.transaction.lock;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hudi.common.config.LockConfiguration;
|
||||||
|
import org.apache.hudi.common.config.TypedProperties;
|
||||||
|
import org.apache.hudi.common.lock.LockProvider;
|
||||||
|
import org.apache.hudi.common.lock.LockState;
|
||||||
|
import org.apache.hudi.common.util.StringUtils;
|
||||||
|
import org.apache.hudi.exception.HoodieLockException;
|
||||||
|
import org.apache.log4j.LogManager;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
import org.jetbrains.annotations.NotNull;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* InProcess level lock. This {@link LockProvider} implementation is to
|
||||||
|
* guard table from concurrent operations happening in the local JVM process.
|
||||||
|
* <p>
|
||||||
|
* Note: This Lock provider implementation doesn't allow lock reentrancy.
|
||||||
|
* Attempting to reacquire the lock from the same thread will throw
|
||||||
|
* HoodieLockException. Threads other than the current lock owner, will
|
||||||
|
* block on lock() and return false on tryLock().
|
||||||
|
*/
|
||||||
|
public class InProcessLockProvider implements LockProvider<ReentrantReadWriteLock> {
|
||||||
|
|
||||||
|
private static final Logger LOG = LogManager.getLogger(ZookeeperBasedLockProvider.class);
|
||||||
|
private static final ReentrantReadWriteLock LOCK = new ReentrantReadWriteLock();
|
||||||
|
private final long maxWaitTimeMillis;
|
||||||
|
|
||||||
|
public InProcessLockProvider(final LockConfiguration lockConfiguration, final Configuration conf) {
|
||||||
|
TypedProperties typedProperties = lockConfiguration.getConfig();
|
||||||
|
maxWaitTimeMillis = typedProperties.getLong(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,
|
||||||
|
LockConfiguration.DEFAULT_ACQUIRE_LOCK_WAIT_TIMEOUT_MS);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void lock() {
|
||||||
|
LOG.info(getLogMessage(LockState.ACQUIRING));
|
||||||
|
if (LOCK.isWriteLockedByCurrentThread()) {
|
||||||
|
throw new HoodieLockException(getLogMessage(LockState.ALREADY_ACQUIRED));
|
||||||
|
}
|
||||||
|
LOCK.writeLock().lock();
|
||||||
|
LOG.info(getLogMessage(LockState.ACQUIRED));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean tryLock() {
|
||||||
|
return tryLock(maxWaitTimeMillis, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean tryLock(long time, @NotNull TimeUnit unit) {
|
||||||
|
LOG.info(getLogMessage(LockState.ACQUIRING));
|
||||||
|
if (LOCK.isWriteLockedByCurrentThread()) {
|
||||||
|
throw new HoodieLockException(getLogMessage(LockState.ALREADY_ACQUIRED));
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isLockAcquired;
|
||||||
|
try {
|
||||||
|
isLockAcquired = LOCK.writeLock().tryLock(time, unit);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new HoodieLockException(getLogMessage(LockState.FAILED_TO_ACQUIRE));
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info(getLogMessage(isLockAcquired ? LockState.ACQUIRED : LockState.FAILED_TO_ACQUIRE));
|
||||||
|
return isLockAcquired;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void unlock() {
|
||||||
|
LOG.info(getLogMessage(LockState.RELEASING));
|
||||||
|
try {
|
||||||
|
LOCK.writeLock().unlock();
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new HoodieLockException(getLogMessage(LockState.FAILED_TO_RELEASE), e);
|
||||||
|
}
|
||||||
|
LOG.info(getLogMessage(LockState.RELEASED));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ReentrantReadWriteLock getLock() {
|
||||||
|
return LOCK;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
if (LOCK.isWriteLockedByCurrentThread()) {
|
||||||
|
LOCK.writeLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getLogMessage(LockState state) {
|
||||||
|
return StringUtils.join(String.valueOf(Thread.currentThread().getId()),
|
||||||
|
state.name(), " local process lock.");
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,253 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.client.transaction;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
|
||||||
|
import org.apache.hudi.common.config.LockConfiguration;
|
||||||
|
import org.apache.hudi.common.config.TypedProperties;
|
||||||
|
import org.apache.hudi.exception.HoodieLockException;
|
||||||
|
import org.apache.log4j.LogManager;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
import org.junit.jupiter.api.Assertions;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
|
|
||||||
|
public class TestInProcessLockProvider {
|
||||||
|
|
||||||
|
private static final Logger LOG = LogManager.getLogger(TestInProcessLockProvider.class);
|
||||||
|
private final Configuration hadoopConfiguration = new Configuration();
|
||||||
|
private final LockConfiguration lockConfiguration = new LockConfiguration(new TypedProperties());
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLockAcquisition() {
|
||||||
|
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
|
||||||
|
assertDoesNotThrow(() -> {
|
||||||
|
inProcessLockProvider.lock();
|
||||||
|
});
|
||||||
|
assertDoesNotThrow(() -> {
|
||||||
|
inProcessLockProvider.unlock();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLockReAcquisitionBySameThread() {
|
||||||
|
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
|
||||||
|
assertDoesNotThrow(() -> {
|
||||||
|
inProcessLockProvider.lock();
|
||||||
|
});
|
||||||
|
assertThrows(HoodieLockException.class, () -> {
|
||||||
|
inProcessLockProvider.lock();
|
||||||
|
});
|
||||||
|
assertDoesNotThrow(() -> {
|
||||||
|
inProcessLockProvider.unlock();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLockReAcquisitionByDifferentThread() {
|
||||||
|
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
|
||||||
|
final AtomicBoolean writer2Completed = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
// Main test thread
|
||||||
|
assertDoesNotThrow(() -> {
|
||||||
|
inProcessLockProvider.lock();
|
||||||
|
});
|
||||||
|
|
||||||
|
// Another writer thread in parallel, should block
|
||||||
|
// and later acquire the lock once it is released
|
||||||
|
Thread writer2 = new Thread(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
assertDoesNotThrow(() -> {
|
||||||
|
inProcessLockProvider.lock();
|
||||||
|
});
|
||||||
|
assertDoesNotThrow(() -> {
|
||||||
|
inProcessLockProvider.unlock();
|
||||||
|
});
|
||||||
|
writer2Completed.set(true);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
writer2.start();
|
||||||
|
|
||||||
|
assertDoesNotThrow(() -> {
|
||||||
|
inProcessLockProvider.unlock();
|
||||||
|
});
|
||||||
|
|
||||||
|
try {
|
||||||
|
writer2.join();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
//
|
||||||
|
}
|
||||||
|
Assertions.assertTrue(writer2Completed.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTryLockAcquisition() {
|
||||||
|
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
|
||||||
|
Assertions.assertTrue(inProcessLockProvider.tryLock());
|
||||||
|
assertDoesNotThrow(() -> {
|
||||||
|
inProcessLockProvider.unlock();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTryLockAcquisitionWithTimeout() {
|
||||||
|
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
|
||||||
|
Assertions.assertTrue(inProcessLockProvider.tryLock(1, TimeUnit.MILLISECONDS));
|
||||||
|
assertDoesNotThrow(() -> {
|
||||||
|
inProcessLockProvider.unlock();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTryLockReAcquisitionBySameThread() {
|
||||||
|
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
|
||||||
|
Assertions.assertTrue(inProcessLockProvider.tryLock());
|
||||||
|
assertThrows(HoodieLockException.class, () -> {
|
||||||
|
inProcessLockProvider.tryLock(1, TimeUnit.MILLISECONDS);
|
||||||
|
});
|
||||||
|
assertDoesNotThrow(() -> {
|
||||||
|
inProcessLockProvider.unlock();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTryLockReAcquisitionByDifferentThread() {
|
||||||
|
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
|
||||||
|
final AtomicBoolean writer2Completed = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
// Main test thread
|
||||||
|
Assertions.assertTrue(inProcessLockProvider.tryLock());
|
||||||
|
|
||||||
|
// Another writer thread
|
||||||
|
Thread writer2 = new Thread(() -> {
|
||||||
|
Assertions.assertFalse(inProcessLockProvider.tryLock(100L, TimeUnit.MILLISECONDS));
|
||||||
|
writer2Completed.set(true);
|
||||||
|
});
|
||||||
|
writer2.start();
|
||||||
|
try {
|
||||||
|
writer2.join();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
//
|
||||||
|
}
|
||||||
|
|
||||||
|
Assertions.assertTrue(writer2Completed.get());
|
||||||
|
assertDoesNotThrow(() -> {
|
||||||
|
inProcessLockProvider.unlock();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTryLockAcquisitionBeforeTimeOutFromTwoThreads() {
|
||||||
|
final InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
|
||||||
|
final int threadCount = 3;
|
||||||
|
final long awaitMaxTimeoutMs = 2000L;
|
||||||
|
final CountDownLatch latch = new CountDownLatch(threadCount);
|
||||||
|
final AtomicBoolean writer1Completed = new AtomicBoolean(false);
|
||||||
|
final AtomicBoolean writer2Completed = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
// Let writer1 get the lock first, then wait for others
|
||||||
|
// to join the sync up point.
|
||||||
|
Thread writer1 = new Thread(() -> {
|
||||||
|
Assertions.assertTrue(inProcessLockProvider.tryLock());
|
||||||
|
latch.countDown();
|
||||||
|
try {
|
||||||
|
latch.await(awaitMaxTimeoutMs, TimeUnit.MILLISECONDS);
|
||||||
|
// Following sleep is to make sure writer2 attempts
|
||||||
|
// to try lock and to get bocked on the lock which
|
||||||
|
// this thread is currently holding.
|
||||||
|
Thread.sleep(50);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
//
|
||||||
|
}
|
||||||
|
assertDoesNotThrow(() -> {
|
||||||
|
inProcessLockProvider.unlock();
|
||||||
|
});
|
||||||
|
writer1Completed.set(true);
|
||||||
|
});
|
||||||
|
writer1.start();
|
||||||
|
|
||||||
|
// Writer2 will block on trying to acquire the lock
|
||||||
|
// and will eventually get the lock before the timeout.
|
||||||
|
Thread writer2 = new Thread(() -> {
|
||||||
|
latch.countDown();
|
||||||
|
Assertions.assertTrue(inProcessLockProvider.tryLock(awaitMaxTimeoutMs, TimeUnit.MILLISECONDS));
|
||||||
|
assertDoesNotThrow(() -> {
|
||||||
|
inProcessLockProvider.unlock();
|
||||||
|
});
|
||||||
|
writer2Completed.set(true);
|
||||||
|
});
|
||||||
|
writer2.start();
|
||||||
|
|
||||||
|
// Let writer1 and writer2 wait at the sync up
|
||||||
|
// point to make sure they run in parallel and
|
||||||
|
// one get blocked by the other.
|
||||||
|
latch.countDown();
|
||||||
|
try {
|
||||||
|
writer1.join();
|
||||||
|
writer2.join();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
//
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure both writers actually completed good
|
||||||
|
Assertions.assertTrue(writer1Completed.get());
|
||||||
|
Assertions.assertTrue(writer2Completed.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLockReleaseByClose() {
|
||||||
|
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
|
||||||
|
assertDoesNotThrow(() -> {
|
||||||
|
inProcessLockProvider.lock();
|
||||||
|
});
|
||||||
|
assertDoesNotThrow(() -> {
|
||||||
|
inProcessLockProvider.close();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRedundantUnlock() {
|
||||||
|
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
|
||||||
|
assertDoesNotThrow(() -> {
|
||||||
|
inProcessLockProvider.lock();
|
||||||
|
});
|
||||||
|
assertDoesNotThrow(() -> {
|
||||||
|
inProcessLockProvider.unlock();
|
||||||
|
});
|
||||||
|
assertThrows(HoodieLockException.class, () -> {
|
||||||
|
inProcessLockProvider.unlock();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUnlockWithoutLock() {
|
||||||
|
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
|
||||||
|
assertThrows(HoodieLockException.class, () -> {
|
||||||
|
inProcessLockProvider.unlock();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user