[HUDI-4065] Add FileBasedLockProvider (#6071)
This commit is contained in:
@@ -0,0 +1,152 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.client.transaction.lock;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hudi.common.config.LockConfiguration;
|
||||||
|
import org.apache.hudi.common.fs.FSUtils;
|
||||||
|
import org.apache.hudi.common.lock.LockProvider;
|
||||||
|
import org.apache.hudi.common.lock.LockState;
|
||||||
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
|
import org.apache.hudi.common.util.StringUtils;
|
||||||
|
import org.apache.hudi.common.util.ValidationUtils;
|
||||||
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
|
import org.apache.hudi.exception.HoodieIOException;
|
||||||
|
import org.apache.hudi.exception.HoodieLockException;
|
||||||
|
import org.apache.log4j.LogManager;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY;
|
||||||
|
import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A FileSystem based lock. This {@link LockProvider} implementation allows to lock table operations
|
||||||
|
* using DFS. Users might need to manually clean the Locker's path if writeClient crash and never run again.
|
||||||
|
* NOTE: This only works for DFS with atomic create/delete operation
|
||||||
|
*/
|
||||||
|
public class FileSystemBasedLockProvider implements LockProvider<String>, Serializable {
|
||||||
|
|
||||||
|
private static final Logger LOG = LogManager.getLogger(FileSystemBasedLockProvider.class);
|
||||||
|
|
||||||
|
private static final String LOCK_FILE_NAME = "lock";
|
||||||
|
|
||||||
|
private final int lockTimeoutMinutes;
|
||||||
|
private transient FileSystem fs;
|
||||||
|
private transient Path lockFile;
|
||||||
|
protected LockConfiguration lockConfiguration;
|
||||||
|
|
||||||
|
public FileSystemBasedLockProvider(final LockConfiguration lockConfiguration, final Configuration configuration) {
|
||||||
|
checkRequiredProps(lockConfiguration);
|
||||||
|
this.lockConfiguration = lockConfiguration;
|
||||||
|
String lockDirectory = lockConfiguration.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY, null);
|
||||||
|
if (StringUtils.isNullOrEmpty(lockDirectory)) {
|
||||||
|
lockDirectory = lockConfiguration.getConfig().getString(HoodieWriteConfig.BASE_PATH.key())
|
||||||
|
+ Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME;
|
||||||
|
}
|
||||||
|
this.lockTimeoutMinutes = lockConfiguration.getConfig().getInteger(FILESYSTEM_LOCK_EXPIRE_PROP_KEY);
|
||||||
|
this.lockFile = new Path(lockDirectory + Path.SEPARATOR + LOCK_FILE_NAME);
|
||||||
|
this.fs = FSUtils.getFs(this.lockFile.toString(), configuration);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
synchronized (LOCK_FILE_NAME) {
|
||||||
|
try {
|
||||||
|
fs.delete(this.lockFile, true);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new HoodieLockException(generateLogStatement(LockState.FAILED_TO_RELEASE), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean tryLock(long time, TimeUnit unit) {
|
||||||
|
try {
|
||||||
|
synchronized (LOCK_FILE_NAME) {
|
||||||
|
// Check whether lock is already expired, if so try to delete lock file
|
||||||
|
if (fs.exists(this.lockFile) && checkIfExpired()) {
|
||||||
|
fs.delete(this.lockFile, true);
|
||||||
|
}
|
||||||
|
acquireLock();
|
||||||
|
return fs.exists(this.lockFile);
|
||||||
|
}
|
||||||
|
} catch (IOException | HoodieIOException e) {
|
||||||
|
LOG.info(generateLogStatement(LockState.FAILED_TO_ACQUIRE), e);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void unlock() {
|
||||||
|
synchronized (LOCK_FILE_NAME) {
|
||||||
|
try {
|
||||||
|
if (fs.exists(this.lockFile)) {
|
||||||
|
fs.delete(this.lockFile, true);
|
||||||
|
}
|
||||||
|
} catch (IOException io) {
|
||||||
|
throw new HoodieIOException(generateLogStatement(LockState.FAILED_TO_RELEASE), io);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getLock() {
|
||||||
|
return this.lockFile.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean checkIfExpired() {
|
||||||
|
if (lockTimeoutMinutes == 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
long modificationTime = fs.getFileStatus(this.lockFile).getModificationTime();
|
||||||
|
if (System.currentTimeMillis() - modificationTime > lockTimeoutMinutes * 60 * 1000) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
} catch (IOException | HoodieIOException e) {
|
||||||
|
LOG.error(generateLogStatement(LockState.ALREADY_RELEASED) + " failed to get lockFile's modification time", e);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void acquireLock() {
|
||||||
|
try {
|
||||||
|
fs.create(this.lockFile, false).close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new HoodieIOException(generateLogStatement(LockState.FAILED_TO_ACQUIRE), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String generateLogStatement(LockState state) {
|
||||||
|
return StringUtils.join(state.name(), " lock at: ", getLock());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkRequiredProps(final LockConfiguration config) {
|
||||||
|
ValidationUtils.checkArgument(config.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY, null) != null
|
||||||
|
|| config.getConfig().getString(HoodieWriteConfig.BASE_PATH.key(), null) != null);
|
||||||
|
ValidationUtils.checkArgument(config.getConfig().getInteger(FILESYSTEM_LOCK_EXPIRE_PROP_KEY) >= 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -36,6 +36,7 @@ import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUI
|
|||||||
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS;
|
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS;
|
||||||
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS;
|
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS;
|
||||||
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS;
|
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS;
|
||||||
|
import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY;
|
||||||
import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
|
import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
|
||||||
import static org.apache.hudi.common.config.LockConfiguration.HIVE_DATABASE_NAME_PROP_KEY;
|
import static org.apache.hudi.common.config.LockConfiguration.HIVE_DATABASE_NAME_PROP_KEY;
|
||||||
import static org.apache.hudi.common.config.LockConfiguration.HIVE_METASTORE_URI_PROP_KEY;
|
import static org.apache.hudi.common.config.LockConfiguration.HIVE_METASTORE_URI_PROP_KEY;
|
||||||
@@ -106,7 +107,13 @@ public class HoodieLockConfig extends HoodieConfig {
|
|||||||
.key(FILESYSTEM_LOCK_PATH_PROP_KEY)
|
.key(FILESYSTEM_LOCK_PATH_PROP_KEY)
|
||||||
.noDefaultValue()
|
.noDefaultValue()
|
||||||
.sinceVersion("0.8.0")
|
.sinceVersion("0.8.0")
|
||||||
.withDocumentation("For DFS based lock providers, path to store the locks under.");
|
.withDocumentation("For DFS based lock providers, path to store the locks under. use Table's meta path as default");
|
||||||
|
|
||||||
|
public static final ConfigProperty<Integer> FILESYSTEM_LOCK_EXPIRE = ConfigProperty
|
||||||
|
.key(FILESYSTEM_LOCK_EXPIRE_PROP_KEY)
|
||||||
|
.defaultValue(0)
|
||||||
|
.sinceVersion("0.12.0")
|
||||||
|
.withDocumentation("For DFS based lock providers, expire time in minutes, must be a nonnegative number, default means no expire");
|
||||||
|
|
||||||
public static final ConfigProperty<String> HIVE_DATABASE_NAME = ConfigProperty
|
public static final ConfigProperty<String> HIVE_DATABASE_NAME = ConfigProperty
|
||||||
.key(HIVE_DATABASE_NAME_PROP_KEY)
|
.key(HIVE_DATABASE_NAME_PROP_KEY)
|
||||||
|
|||||||
@@ -0,0 +1,135 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.client;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
|
||||||
|
import org.apache.hudi.common.config.LockConfiguration;
|
||||||
|
import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
|
||||||
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
|
import org.apache.hudi.exception.HoodieLockException;
|
||||||
|
import org.junit.jupiter.api.AfterAll;
|
||||||
|
import org.junit.jupiter.api.AfterEach;
|
||||||
|
import org.junit.jupiter.api.Assertions;
|
||||||
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Properties;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY;
|
||||||
|
import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
|
||||||
|
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY;
|
||||||
|
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY;
|
||||||
|
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
|
||||||
|
|
||||||
|
public class TestFileBasedLockProvider {
|
||||||
|
private static HdfsTestService hdfsTestService;
|
||||||
|
private static MiniDFSCluster dfsCluster;
|
||||||
|
private static LockConfiguration lockConfiguration;
|
||||||
|
private static Configuration hadoopConf;
|
||||||
|
|
||||||
|
@BeforeAll
|
||||||
|
public static void setup() throws IOException {
|
||||||
|
hdfsTestService = new HdfsTestService();
|
||||||
|
dfsCluster = hdfsTestService.start(true);
|
||||||
|
hadoopConf = dfsCluster.getFileSystem().getConf();
|
||||||
|
|
||||||
|
Properties properties = new Properties();
|
||||||
|
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, "/tmp/");
|
||||||
|
properties.setProperty(FILESYSTEM_LOCK_EXPIRE_PROP_KEY, "1");
|
||||||
|
properties.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000");
|
||||||
|
properties.setProperty(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "1000");
|
||||||
|
properties.setProperty(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY, "3");
|
||||||
|
lockConfiguration = new LockConfiguration(properties);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterAll
|
||||||
|
public static void cleanUpAfterAll() throws IOException {
|
||||||
|
Path workDir = dfsCluster.getFileSystem().getWorkingDirectory();
|
||||||
|
FileSystem fs = workDir.getFileSystem(hdfsTestService.getHadoopConf());
|
||||||
|
fs.delete(new Path("/tmp"), true);
|
||||||
|
if (hdfsTestService != null) {
|
||||||
|
hdfsTestService.stop();
|
||||||
|
hdfsTestService = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterEach
|
||||||
|
public void cleanUpAfterEach() throws IOException {
|
||||||
|
Path workDir = dfsCluster.getFileSystem().getWorkingDirectory();
|
||||||
|
FileSystem fs = workDir.getFileSystem(hdfsTestService.getHadoopConf());
|
||||||
|
fs.delete(new Path("/tmp/lock"), true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAcquireLock() {
|
||||||
|
FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf);
|
||||||
|
Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
|
||||||
|
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
|
||||||
|
fileBasedLockProvider.unlock();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAcquireLockWithDefaultPath() {
|
||||||
|
lockConfiguration.getConfig().remove(FILESYSTEM_LOCK_PATH_PROP_KEY);
|
||||||
|
lockConfiguration.getConfig().setProperty(HoodieWriteConfig.BASE_PATH.key(), "/tmp/");
|
||||||
|
FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf);
|
||||||
|
Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
|
||||||
|
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
|
||||||
|
fileBasedLockProvider.unlock();
|
||||||
|
lockConfiguration.getConfig().setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, "/tmp/");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUnLock() {
|
||||||
|
FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf);
|
||||||
|
Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
|
||||||
|
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
|
||||||
|
fileBasedLockProvider.unlock();
|
||||||
|
Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
|
||||||
|
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReentrantLock() {
|
||||||
|
FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf);
|
||||||
|
Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
|
||||||
|
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
|
||||||
|
Assertions.assertFalse(fileBasedLockProvider.tryLock(lockConfiguration.getConfig()
|
||||||
|
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
|
||||||
|
fileBasedLockProvider.unlock();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUnlockWithoutLock() {
|
||||||
|
try {
|
||||||
|
FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf);
|
||||||
|
fileBasedLockProvider.unlock();
|
||||||
|
} catch (HoodieLockException e) {
|
||||||
|
Assertions.fail();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hudi.client;
|
package org.apache.hudi.client;
|
||||||
|
|
||||||
|
import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
|
||||||
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
|
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
|
||||||
import org.apache.hudi.common.config.LockConfiguration;
|
import org.apache.hudi.common.config.LockConfiguration;
|
||||||
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
|
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
|
||||||
@@ -45,9 +46,11 @@ import org.apache.hudi.testutils.HoodieClientTestBase;
|
|||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
import org.junit.jupiter.api.AfterEach;
|
import org.junit.jupiter.api.AfterEach;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.jupiter.params.ParameterizedTest;
|
import org.junit.jupiter.params.ParameterizedTest;
|
||||||
import org.junit.jupiter.params.provider.EnumSource;
|
import org.junit.jupiter.params.provider.EnumSource;
|
||||||
|
import org.junit.jupiter.params.provider.MethodSource;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
@@ -66,7 +69,11 @@ import java.util.concurrent.TimeUnit;
|
|||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY;
|
||||||
import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
|
import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
|
||||||
|
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY;
|
||||||
|
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY;
|
||||||
|
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
|
||||||
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
|
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
|
||||||
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
||||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
@@ -76,6 +83,21 @@ import static org.junit.jupiter.api.Assertions.fail;
|
|||||||
|
|
||||||
public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
|
public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
|
||||||
|
|
||||||
|
private Properties lockProperties = null;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
public void setup() throws IOException {
|
||||||
|
if (lockProperties == null) {
|
||||||
|
lockProperties = new Properties();
|
||||||
|
lockProperties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
|
||||||
|
lockProperties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
|
||||||
|
lockProperties.setProperty(FILESYSTEM_LOCK_EXPIRE_PROP_KEY, "1");
|
||||||
|
lockProperties.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000");
|
||||||
|
lockProperties.setProperty(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "1000");
|
||||||
|
lockProperties.setProperty(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY, "3");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void setUpMORTestTable() throws IOException {
|
public void setUpMORTestTable() throws IOException {
|
||||||
cleanupResources();
|
cleanupResources();
|
||||||
initPath();
|
initPath();
|
||||||
@@ -92,15 +114,27 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
|
|||||||
cleanupResources();
|
cleanupResources();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static final List<Class> LOCK_PROVIDER_CLASSES = Arrays.asList(
|
||||||
|
InProcessLockProvider.class,
|
||||||
|
FileSystemBasedLockProvider.class);
|
||||||
|
|
||||||
|
private static Iterable<Object[]> providerClassAndTableType() {
|
||||||
|
List<Object[]> opts = new ArrayList<>();
|
||||||
|
for (Object providerClass : LOCK_PROVIDER_CLASSES) {
|
||||||
|
opts.add(new Object[] {HoodieTableType.COPY_ON_WRITE, providerClass});
|
||||||
|
opts.add(new Object[] {HoodieTableType.MERGE_ON_READ, providerClass});
|
||||||
|
}
|
||||||
|
return opts;
|
||||||
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE", "MERGE_ON_READ"})
|
@MethodSource("providerClassAndTableType")
|
||||||
public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws Exception {
|
public void testHoodieClientBasicMultiWriter(HoodieTableType tableType, Class providerClass) throws Exception {
|
||||||
if (tableType == HoodieTableType.MERGE_ON_READ) {
|
if (tableType == HoodieTableType.MERGE_ON_READ) {
|
||||||
setUpMORTestTable();
|
setUpMORTestTable();
|
||||||
}
|
}
|
||||||
Properties properties = new Properties();
|
lockProperties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
|
||||||
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
|
|
||||||
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
|
|
||||||
HoodieWriteConfig writeConfig = getConfigBuilder()
|
HoodieWriteConfig writeConfig = getConfigBuilder()
|
||||||
.withCleanConfig(HoodieCleanConfig.newBuilder()
|
.withCleanConfig(HoodieCleanConfig.newBuilder()
|
||||||
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
|
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
|
||||||
@@ -110,8 +144,8 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
|
|||||||
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
|
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
|
||||||
// Timeline-server-based markers are not used for multi-writer tests
|
// Timeline-server-based markers are not used for multi-writer tests
|
||||||
.withMarkersType(MarkerType.DIRECT.name())
|
.withMarkersType(MarkerType.DIRECT.name())
|
||||||
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
|
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(providerClass)
|
||||||
.build()).withAutoCommit(false).withProperties(properties).build();
|
.build()).withAutoCommit(false).withProperties(lockProperties).build();
|
||||||
|
|
||||||
// Create the first commit
|
// Create the first commit
|
||||||
createCommitWithInserts(writeConfig, getHoodieWriteClient(writeConfig), "000", "001", 200, true);
|
createCommitWithInserts(writeConfig, getHoodieWriteClient(writeConfig), "000", "001", 200, true);
|
||||||
@@ -172,16 +206,6 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
|
|||||||
assertTrue(writer1Completed.get() && writer2Completed.get());
|
assertTrue(writer1Completed.get() && writer2Completed.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testMultiWriterWithAsyncTableServicesWithConflictCOW() throws Exception {
|
|
||||||
testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType.COPY_ON_WRITE);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testMultiWriterWithAsyncTableServicesWithConflictMOR() throws Exception {
|
|
||||||
testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType.MERGE_ON_READ);
|
|
||||||
}
|
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE", "MERGE_ON_READ"})
|
@EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE", "MERGE_ON_READ"})
|
||||||
public void testMultiWriterWithInsertsToDistinctPartitions(HoodieTableType tableType) throws Exception {
|
public void testMultiWriterWithInsertsToDistinctPartitions(HoodieTableType tableType) throws Exception {
|
||||||
@@ -189,11 +213,9 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
|
|||||||
setUpMORTestTable();
|
setUpMORTestTable();
|
||||||
}
|
}
|
||||||
|
|
||||||
Properties properties = new Properties();
|
lockProperties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
|
||||||
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
|
lockProperties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "3000");
|
||||||
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
|
lockProperties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "20");
|
||||||
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "3000");
|
|
||||||
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "20");
|
|
||||||
|
|
||||||
HoodieWriteConfig cfg = getConfigBuilder()
|
HoodieWriteConfig cfg = getConfigBuilder()
|
||||||
.withCleanConfig(HoodieCleanConfig.newBuilder()
|
.withCleanConfig(HoodieCleanConfig.newBuilder()
|
||||||
@@ -210,7 +232,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
|
|||||||
.withAutoCommit(false)
|
.withAutoCommit(false)
|
||||||
// Timeline-server-based markers are not used for multi-writer tests
|
// Timeline-server-based markers are not used for multi-writer tests
|
||||||
.withMarkersType(MarkerType.DIRECT.name())
|
.withMarkersType(MarkerType.DIRECT.name())
|
||||||
.withProperties(properties)
|
.withProperties(lockProperties)
|
||||||
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType(FileSystemViewStorageType.REMOTE_FIRST)
|
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType(FileSystemViewStorageType.REMOTE_FIRST)
|
||||||
.withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build())
|
.withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build())
|
||||||
.build();
|
.build();
|
||||||
@@ -260,15 +282,13 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType tableType) throws Exception {
|
@ParameterizedTest
|
||||||
|
@MethodSource("providerClassAndTableType")
|
||||||
|
public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType tableType, Class providerClass) throws Exception {
|
||||||
// create inserts X 1
|
// create inserts X 1
|
||||||
if (tableType == HoodieTableType.MERGE_ON_READ) {
|
if (tableType == HoodieTableType.MERGE_ON_READ) {
|
||||||
setUpMORTestTable();
|
setUpMORTestTable();
|
||||||
}
|
}
|
||||||
Properties properties = new Properties();
|
|
||||||
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
|
|
||||||
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath);
|
|
||||||
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
|
|
||||||
// Disabling embedded timeline server, it doesn't work with multiwriter
|
// Disabling embedded timeline server, it doesn't work with multiwriter
|
||||||
HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
|
HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
|
||||||
.withCleanConfig(HoodieCleanConfig.newBuilder()
|
.withCleanConfig(HoodieCleanConfig.newBuilder()
|
||||||
@@ -284,8 +304,8 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
|
|||||||
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType(
|
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType(
|
||||||
FileSystemViewStorageType.MEMORY).build())
|
FileSystemViewStorageType.MEMORY).build())
|
||||||
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
|
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
|
||||||
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
|
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(providerClass)
|
||||||
.build()).withAutoCommit(false).withProperties(properties);
|
.build()).withAutoCommit(false).withProperties(lockProperties);
|
||||||
Set<String> validInstants = new HashSet<>();
|
Set<String> validInstants = new HashSet<>();
|
||||||
// Create the first commit with inserts
|
// Create the first commit with inserts
|
||||||
HoodieWriteConfig cfg = writeConfigBuilder.build();
|
HoodieWriteConfig cfg = writeConfigBuilder.build();
|
||||||
@@ -458,10 +478,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testHoodieClientMultiWriterAutoCommitForConflict() throws Exception {
|
public void testHoodieClientMultiWriterAutoCommitForConflict() throws Exception {
|
||||||
Properties properties = new Properties();
|
lockProperties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "100");
|
||||||
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
|
|
||||||
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
|
|
||||||
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "100");
|
|
||||||
HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
|
HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
|
||||||
.withCleanConfig(HoodieCleanConfig.newBuilder()
|
.withCleanConfig(HoodieCleanConfig.newBuilder()
|
||||||
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
|
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
|
||||||
@@ -470,7 +487,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
|
|||||||
// Timeline-server-based markers are not used for multi-writer tests
|
// Timeline-server-based markers are not used for multi-writer tests
|
||||||
.withMarkersType(MarkerType.DIRECT.name())
|
.withMarkersType(MarkerType.DIRECT.name())
|
||||||
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
|
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
|
||||||
.build()).withAutoCommit(true).withProperties(properties);
|
.build()).withAutoCommit(true).withProperties(lockProperties);
|
||||||
HoodieWriteConfig cfg = writeConfigBuilder.build();
|
HoodieWriteConfig cfg = writeConfigBuilder.build();
|
||||||
HoodieWriteConfig cfg2 = writeConfigBuilder.build();
|
HoodieWriteConfig cfg2 = writeConfigBuilder.build();
|
||||||
|
|
||||||
|
|||||||
@@ -48,6 +48,8 @@ public class LockConfiguration implements Serializable {
|
|||||||
|
|
||||||
public static final String FILESYSTEM_LOCK_PATH_PROP_KEY = FILESYSTEM_BASED_LOCK_PROPERTY_PREFIX + "path";
|
public static final String FILESYSTEM_LOCK_PATH_PROP_KEY = FILESYSTEM_BASED_LOCK_PROPERTY_PREFIX + "path";
|
||||||
|
|
||||||
|
public static final String FILESYSTEM_LOCK_EXPIRE_PROP_KEY = FILESYSTEM_BASED_LOCK_PROPERTY_PREFIX + "expire";
|
||||||
|
|
||||||
// configs for metastore based locks
|
// configs for metastore based locks
|
||||||
public static final String HIVE_METASTORE_LOCK_PROPERTY_PREFIX = LOCK_PREFIX + "hivemetastore.";
|
public static final String HIVE_METASTORE_LOCK_PROPERTY_PREFIX = LOCK_PREFIX + "hivemetastore.";
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user