diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java
new file mode 100644
index 000000000..b151879d6
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.jetbrains.annotations.NotNull;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * InProcess level lock. This {@link LockProvider} implementation is to
+ * guard table from concurrent operations happening in the local JVM process.
+ *
+ * Note: This Lock provider implementation doesn't allow lock reentrancy.
+ * Attempting to reacquire the lock from the same thread will throw
+ * HoodieLockException. Threads other than the current lock owner, will
+ * block on lock() and return false on tryLock().
+ */
+public class InProcessLockProvider implements LockProvider {
+
+ private static final Logger LOG = LogManager.getLogger(ZookeeperBasedLockProvider.class);
+ private static final ReentrantReadWriteLock LOCK = new ReentrantReadWriteLock();
+ private final long maxWaitTimeMillis;
+
+ public InProcessLockProvider(final LockConfiguration lockConfiguration, final Configuration conf) {
+ TypedProperties typedProperties = lockConfiguration.getConfig();
+ maxWaitTimeMillis = typedProperties.getLong(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,
+ LockConfiguration.DEFAULT_ACQUIRE_LOCK_WAIT_TIMEOUT_MS);
+ }
+
+ @Override
+ public void lock() {
+ LOG.info(getLogMessage(LockState.ACQUIRING));
+ if (LOCK.isWriteLockedByCurrentThread()) {
+ throw new HoodieLockException(getLogMessage(LockState.ALREADY_ACQUIRED));
+ }
+ LOCK.writeLock().lock();
+ LOG.info(getLogMessage(LockState.ACQUIRED));
+ }
+
+ @Override
+ public boolean tryLock() {
+ return tryLock(maxWaitTimeMillis, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public boolean tryLock(long time, @NotNull TimeUnit unit) {
+ LOG.info(getLogMessage(LockState.ACQUIRING));
+ if (LOCK.isWriteLockedByCurrentThread()) {
+ throw new HoodieLockException(getLogMessage(LockState.ALREADY_ACQUIRED));
+ }
+
+ boolean isLockAcquired;
+ try {
+ isLockAcquired = LOCK.writeLock().tryLock(time, unit);
+ } catch (InterruptedException e) {
+ throw new HoodieLockException(getLogMessage(LockState.FAILED_TO_ACQUIRE));
+ }
+
+ LOG.info(getLogMessage(isLockAcquired ? LockState.ACQUIRED : LockState.FAILED_TO_ACQUIRE));
+ return isLockAcquired;
+ }
+
+ @Override
+ public void unlock() {
+ LOG.info(getLogMessage(LockState.RELEASING));
+ try {
+ LOCK.writeLock().unlock();
+ } catch (Exception e) {
+ throw new HoodieLockException(getLogMessage(LockState.FAILED_TO_RELEASE), e);
+ }
+ LOG.info(getLogMessage(LockState.RELEASED));
+ }
+
+ @Override
+ public ReentrantReadWriteLock getLock() {
+ return LOCK;
+ }
+
+ @Override
+ public void close() {
+ if (LOCK.isWriteLockedByCurrentThread()) {
+ LOCK.writeLock().unlock();
+ }
+ }
+
+ private String getLogMessage(LockState state) {
+ return StringUtils.join(String.valueOf(Thread.currentThread().getId()),
+ state.name(), " local process lock.");
+ }
+}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestInProcessLockProvider.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestInProcessLockProvider.java
new file mode 100644
index 000000000..9e7472c13
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestInProcessLockProvider.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class TestInProcessLockProvider {
+
+ private static final Logger LOG = LogManager.getLogger(TestInProcessLockProvider.class);
+ private final Configuration hadoopConfiguration = new Configuration();
+ private final LockConfiguration lockConfiguration = new LockConfiguration(new TypedProperties());
+
+ @Test
+ public void testLockAcquisition() {
+ InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider.lock();
+ });
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider.unlock();
+ });
+ }
+
+ @Test
+ public void testLockReAcquisitionBySameThread() {
+ InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider.lock();
+ });
+ assertThrows(HoodieLockException.class, () -> {
+ inProcessLockProvider.lock();
+ });
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider.unlock();
+ });
+ }
+
+ @Test
+ public void testLockReAcquisitionByDifferentThread() {
+ InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+ final AtomicBoolean writer2Completed = new AtomicBoolean(false);
+
+ // Main test thread
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider.lock();
+ });
+
+ // Another writer thread in parallel, should block
+ // and later acquire the lock once it is released
+ Thread writer2 = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider.lock();
+ });
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider.unlock();
+ });
+ writer2Completed.set(true);
+ }
+ });
+ writer2.start();
+
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider.unlock();
+ });
+
+ try {
+ writer2.join();
+ } catch (InterruptedException e) {
+ //
+ }
+ Assertions.assertTrue(writer2Completed.get());
+ }
+
+ @Test
+ public void testTryLockAcquisition() {
+ InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+ Assertions.assertTrue(inProcessLockProvider.tryLock());
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider.unlock();
+ });
+ }
+
+ @Test
+ public void testTryLockAcquisitionWithTimeout() {
+ InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+ Assertions.assertTrue(inProcessLockProvider.tryLock(1, TimeUnit.MILLISECONDS));
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider.unlock();
+ });
+ }
+
+ @Test
+ public void testTryLockReAcquisitionBySameThread() {
+ InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+ Assertions.assertTrue(inProcessLockProvider.tryLock());
+ assertThrows(HoodieLockException.class, () -> {
+ inProcessLockProvider.tryLock(1, TimeUnit.MILLISECONDS);
+ });
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider.unlock();
+ });
+ }
+
+ @Test
+ public void testTryLockReAcquisitionByDifferentThread() {
+ InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+ final AtomicBoolean writer2Completed = new AtomicBoolean(false);
+
+ // Main test thread
+ Assertions.assertTrue(inProcessLockProvider.tryLock());
+
+ // Another writer thread
+ Thread writer2 = new Thread(() -> {
+ Assertions.assertFalse(inProcessLockProvider.tryLock(100L, TimeUnit.MILLISECONDS));
+ writer2Completed.set(true);
+ });
+ writer2.start();
+ try {
+ writer2.join();
+ } catch (InterruptedException e) {
+ //
+ }
+
+ Assertions.assertTrue(writer2Completed.get());
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider.unlock();
+ });
+ }
+
+ @Test
+ public void testTryLockAcquisitionBeforeTimeOutFromTwoThreads() {
+ final InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+ final int threadCount = 3;
+ final long awaitMaxTimeoutMs = 2000L;
+ final CountDownLatch latch = new CountDownLatch(threadCount);
+ final AtomicBoolean writer1Completed = new AtomicBoolean(false);
+ final AtomicBoolean writer2Completed = new AtomicBoolean(false);
+
+ // Let writer1 get the lock first, then wait for others
+ // to join the sync up point.
+ Thread writer1 = new Thread(() -> {
+ Assertions.assertTrue(inProcessLockProvider.tryLock());
+ latch.countDown();
+ try {
+ latch.await(awaitMaxTimeoutMs, TimeUnit.MILLISECONDS);
+ // Following sleep is to make sure writer2 attempts
+ // to try lock and to get bocked on the lock which
+ // this thread is currently holding.
+ Thread.sleep(50);
+ } catch (InterruptedException e) {
+ //
+ }
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider.unlock();
+ });
+ writer1Completed.set(true);
+ });
+ writer1.start();
+
+ // Writer2 will block on trying to acquire the lock
+ // and will eventually get the lock before the timeout.
+ Thread writer2 = new Thread(() -> {
+ latch.countDown();
+ Assertions.assertTrue(inProcessLockProvider.tryLock(awaitMaxTimeoutMs, TimeUnit.MILLISECONDS));
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider.unlock();
+ });
+ writer2Completed.set(true);
+ });
+ writer2.start();
+
+ // Let writer1 and writer2 wait at the sync up
+ // point to make sure they run in parallel and
+ // one get blocked by the other.
+ latch.countDown();
+ try {
+ writer1.join();
+ writer2.join();
+ } catch (InterruptedException e) {
+ //
+ }
+
+ // Make sure both writers actually completed good
+ Assertions.assertTrue(writer1Completed.get());
+ Assertions.assertTrue(writer2Completed.get());
+ }
+
+ @Test
+ public void testLockReleaseByClose() {
+ InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider.lock();
+ });
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider.close();
+ });
+ }
+
+ @Test
+ public void testRedundantUnlock() {
+ InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider.lock();
+ });
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider.unlock();
+ });
+ assertThrows(HoodieLockException.class, () -> {
+ inProcessLockProvider.unlock();
+ });
+ }
+
+ @Test
+ public void testUnlockWithoutLock() {
+ InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+ assertThrows(HoodieLockException.class, () -> {
+ inProcessLockProvider.unlock();
+ });
+ }
+}