From 7784249e55c8dc6c1e5fa1142f7a93d78ca66471 Mon Sep 17 00:00:00 2001 From: Manoj Govindassamy Date: Fri, 17 Dec 2021 17:18:46 -0800 Subject: [PATCH] [HUDI-2962] InProcess lock provider to guard single writer process with async table operations (#4259) - Adding Local JVM process based lock provider implementation - This local lock provider can be used by a single writer process with async table operations to guard the metadata tabl against concurrent updates. --- .../lock/InProcessLockProvider.java | 117 ++++++++ .../TestInProcessLockProvider.java | 253 ++++++++++++++++++ 2 files changed, 370 insertions(+) create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java create mode 100644 hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestInProcessLockProvider.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java new file mode 100644 index 000000000..b151879d6 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.client.transaction.lock; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hudi.common.config.LockConfiguration; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.lock.LockProvider; +import org.apache.hudi.common.lock.LockState; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.exception.HoodieLockException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.jetbrains.annotations.NotNull; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * InProcess level lock. This {@link LockProvider} implementation is to + * guard table from concurrent operations happening in the local JVM process. + *

+ * Note: This Lock provider implementation doesn't allow lock reentrancy. + * Attempting to reacquire the lock from the same thread will throw + * HoodieLockException. Threads other than the current lock owner, will + * block on lock() and return false on tryLock(). + */ +public class InProcessLockProvider implements LockProvider { + + private static final Logger LOG = LogManager.getLogger(ZookeeperBasedLockProvider.class); + private static final ReentrantReadWriteLock LOCK = new ReentrantReadWriteLock(); + private final long maxWaitTimeMillis; + + public InProcessLockProvider(final LockConfiguration lockConfiguration, final Configuration conf) { + TypedProperties typedProperties = lockConfiguration.getConfig(); + maxWaitTimeMillis = typedProperties.getLong(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, + LockConfiguration.DEFAULT_ACQUIRE_LOCK_WAIT_TIMEOUT_MS); + } + + @Override + public void lock() { + LOG.info(getLogMessage(LockState.ACQUIRING)); + if (LOCK.isWriteLockedByCurrentThread()) { + throw new HoodieLockException(getLogMessage(LockState.ALREADY_ACQUIRED)); + } + LOCK.writeLock().lock(); + LOG.info(getLogMessage(LockState.ACQUIRED)); + } + + @Override + public boolean tryLock() { + return tryLock(maxWaitTimeMillis, TimeUnit.MILLISECONDS); + } + + @Override + public boolean tryLock(long time, @NotNull TimeUnit unit) { + LOG.info(getLogMessage(LockState.ACQUIRING)); + if (LOCK.isWriteLockedByCurrentThread()) { + throw new HoodieLockException(getLogMessage(LockState.ALREADY_ACQUIRED)); + } + + boolean isLockAcquired; + try { + isLockAcquired = LOCK.writeLock().tryLock(time, unit); + } catch (InterruptedException e) { + throw new HoodieLockException(getLogMessage(LockState.FAILED_TO_ACQUIRE)); + } + + LOG.info(getLogMessage(isLockAcquired ? LockState.ACQUIRED : LockState.FAILED_TO_ACQUIRE)); + return isLockAcquired; + } + + @Override + public void unlock() { + LOG.info(getLogMessage(LockState.RELEASING)); + try { + LOCK.writeLock().unlock(); + } catch (Exception e) { + throw new HoodieLockException(getLogMessage(LockState.FAILED_TO_RELEASE), e); + } + LOG.info(getLogMessage(LockState.RELEASED)); + } + + @Override + public ReentrantReadWriteLock getLock() { + return LOCK; + } + + @Override + public void close() { + if (LOCK.isWriteLockedByCurrentThread()) { + LOCK.writeLock().unlock(); + } + } + + private String getLogMessage(LockState state) { + return StringUtils.join(String.valueOf(Thread.currentThread().getId()), + state.name(), " local process lock."); + } +} diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestInProcessLockProvider.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestInProcessLockProvider.java new file mode 100644 index 000000000..9e7472c13 --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestInProcessLockProvider.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.transaction; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hudi.client.transaction.lock.InProcessLockProvider; +import org.apache.hudi.common.config.LockConfiguration; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.exception.HoodieLockException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class TestInProcessLockProvider { + + private static final Logger LOG = LogManager.getLogger(TestInProcessLockProvider.class); + private final Configuration hadoopConfiguration = new Configuration(); + private final LockConfiguration lockConfiguration = new LockConfiguration(new TypedProperties()); + + @Test + public void testLockAcquisition() { + InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration); + assertDoesNotThrow(() -> { + inProcessLockProvider.lock(); + }); + assertDoesNotThrow(() -> { + inProcessLockProvider.unlock(); + }); + } + + @Test + public void testLockReAcquisitionBySameThread() { + InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration); + assertDoesNotThrow(() -> { + inProcessLockProvider.lock(); + }); + assertThrows(HoodieLockException.class, () -> { + inProcessLockProvider.lock(); + }); + assertDoesNotThrow(() -> { + inProcessLockProvider.unlock(); + }); + } + + @Test + public void testLockReAcquisitionByDifferentThread() { + InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration); + final AtomicBoolean writer2Completed = new AtomicBoolean(false); + + // Main test thread + assertDoesNotThrow(() -> { + inProcessLockProvider.lock(); + }); + + // Another writer thread in parallel, should block + // and later acquire the lock once it is released + Thread writer2 = new Thread(new Runnable() { + @Override + public void run() { + assertDoesNotThrow(() -> { + inProcessLockProvider.lock(); + }); + assertDoesNotThrow(() -> { + inProcessLockProvider.unlock(); + }); + writer2Completed.set(true); + } + }); + writer2.start(); + + assertDoesNotThrow(() -> { + inProcessLockProvider.unlock(); + }); + + try { + writer2.join(); + } catch (InterruptedException e) { + // + } + Assertions.assertTrue(writer2Completed.get()); + } + + @Test + public void testTryLockAcquisition() { + InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration); + Assertions.assertTrue(inProcessLockProvider.tryLock()); + assertDoesNotThrow(() -> { + inProcessLockProvider.unlock(); + }); + } + + @Test + public void testTryLockAcquisitionWithTimeout() { + InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration); + Assertions.assertTrue(inProcessLockProvider.tryLock(1, TimeUnit.MILLISECONDS)); + assertDoesNotThrow(() -> { + inProcessLockProvider.unlock(); + }); + } + + @Test + public void testTryLockReAcquisitionBySameThread() { + InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration); + Assertions.assertTrue(inProcessLockProvider.tryLock()); + assertThrows(HoodieLockException.class, () -> { + inProcessLockProvider.tryLock(1, TimeUnit.MILLISECONDS); + }); + assertDoesNotThrow(() -> { + inProcessLockProvider.unlock(); + }); + } + + @Test + public void testTryLockReAcquisitionByDifferentThread() { + InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration); + final AtomicBoolean writer2Completed = new AtomicBoolean(false); + + // Main test thread + Assertions.assertTrue(inProcessLockProvider.tryLock()); + + // Another writer thread + Thread writer2 = new Thread(() -> { + Assertions.assertFalse(inProcessLockProvider.tryLock(100L, TimeUnit.MILLISECONDS)); + writer2Completed.set(true); + }); + writer2.start(); + try { + writer2.join(); + } catch (InterruptedException e) { + // + } + + Assertions.assertTrue(writer2Completed.get()); + assertDoesNotThrow(() -> { + inProcessLockProvider.unlock(); + }); + } + + @Test + public void testTryLockAcquisitionBeforeTimeOutFromTwoThreads() { + final InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration); + final int threadCount = 3; + final long awaitMaxTimeoutMs = 2000L; + final CountDownLatch latch = new CountDownLatch(threadCount); + final AtomicBoolean writer1Completed = new AtomicBoolean(false); + final AtomicBoolean writer2Completed = new AtomicBoolean(false); + + // Let writer1 get the lock first, then wait for others + // to join the sync up point. + Thread writer1 = new Thread(() -> { + Assertions.assertTrue(inProcessLockProvider.tryLock()); + latch.countDown(); + try { + latch.await(awaitMaxTimeoutMs, TimeUnit.MILLISECONDS); + // Following sleep is to make sure writer2 attempts + // to try lock and to get bocked on the lock which + // this thread is currently holding. + Thread.sleep(50); + } catch (InterruptedException e) { + // + } + assertDoesNotThrow(() -> { + inProcessLockProvider.unlock(); + }); + writer1Completed.set(true); + }); + writer1.start(); + + // Writer2 will block on trying to acquire the lock + // and will eventually get the lock before the timeout. + Thread writer2 = new Thread(() -> { + latch.countDown(); + Assertions.assertTrue(inProcessLockProvider.tryLock(awaitMaxTimeoutMs, TimeUnit.MILLISECONDS)); + assertDoesNotThrow(() -> { + inProcessLockProvider.unlock(); + }); + writer2Completed.set(true); + }); + writer2.start(); + + // Let writer1 and writer2 wait at the sync up + // point to make sure they run in parallel and + // one get blocked by the other. + latch.countDown(); + try { + writer1.join(); + writer2.join(); + } catch (InterruptedException e) { + // + } + + // Make sure both writers actually completed good + Assertions.assertTrue(writer1Completed.get()); + Assertions.assertTrue(writer2Completed.get()); + } + + @Test + public void testLockReleaseByClose() { + InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration); + assertDoesNotThrow(() -> { + inProcessLockProvider.lock(); + }); + assertDoesNotThrow(() -> { + inProcessLockProvider.close(); + }); + } + + @Test + public void testRedundantUnlock() { + InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration); + assertDoesNotThrow(() -> { + inProcessLockProvider.lock(); + }); + assertDoesNotThrow(() -> { + inProcessLockProvider.unlock(); + }); + assertThrows(HoodieLockException.class, () -> { + inProcessLockProvider.unlock(); + }); + } + + @Test + public void testUnlockWithoutLock() { + InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration); + assertThrows(HoodieLockException.class, () -> { + inProcessLockProvider.unlock(); + }); + } +}