[HUDI-3030] InProcessLockPovider as default when any async servcies enabled with no lock provider override (#4406)
* [HUDI-3030] InProcessLockPovider as default when any async servcies enabled with no lock provider override - Making InProcessLockProvider as the default lock provider when any async services are enabled and when no lock provider is explicitly set. - This is the workaround for metadata table updates racing with async table serice operations * [HUDI-3030] InProcessLockPovider as default when any async servcies enabled with no lock provider override - Renaming isAnyTableServicesInline/Async() to areAnyTableServicesInline/Async() * [HUDI-3030] InProcessLockPovider as default when any async servcies enabled with no lock provider override - Additionally checking for write config properties when verifying the lock provider override. Updated the unit test for this case.
This commit is contained in:
committed by
GitHub
parent
56f93f4ebd
commit
251d4eb3b6
@@ -464,7 +464,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
|
|||||||
}
|
}
|
||||||
|
|
||||||
protected void runTableServicesInline(HoodieTable<T, I, K, O> table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
|
protected void runTableServicesInline(HoodieTable<T, I, K, O> table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
|
||||||
if (config.inlineTableServices()) {
|
if (config.areAnyTableServicesInline()) {
|
||||||
if (config.isMetadataTableEnabled()) {
|
if (config.isMetadataTableEnabled()) {
|
||||||
table.getHoodieView().sync();
|
table.getHoodieView().sync();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ import org.apache.hudi.common.config.LockConfiguration;
|
|||||||
import org.apache.hudi.common.config.SerializableConfiguration;
|
import org.apache.hudi.common.config.SerializableConfiguration;
|
||||||
import org.apache.hudi.common.lock.LockProvider;
|
import org.apache.hudi.common.lock.LockProvider;
|
||||||
import org.apache.hudi.common.util.ReflectionUtils;
|
import org.apache.hudi.common.util.ReflectionUtils;
|
||||||
|
import org.apache.hudi.config.HoodieLockConfig;
|
||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
import org.apache.hudi.exception.HoodieLockException;
|
import org.apache.hudi.exception.HoodieLockException;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
@@ -42,12 +43,18 @@ public class LockManager implements Serializable, AutoCloseable {
|
|||||||
private final HoodieWriteConfig writeConfig;
|
private final HoodieWriteConfig writeConfig;
|
||||||
private final LockConfiguration lockConfiguration;
|
private final LockConfiguration lockConfiguration;
|
||||||
private final SerializableConfiguration hadoopConf;
|
private final SerializableConfiguration hadoopConf;
|
||||||
|
private final int maxRetries;
|
||||||
|
private final long maxWaitTimeInMs;
|
||||||
private volatile LockProvider lockProvider;
|
private volatile LockProvider lockProvider;
|
||||||
|
|
||||||
public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) {
|
public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) {
|
||||||
this.writeConfig = writeConfig;
|
this.writeConfig = writeConfig;
|
||||||
this.hadoopConf = new SerializableConfiguration(fs.getConf());
|
this.hadoopConf = new SerializableConfiguration(fs.getConf());
|
||||||
this.lockConfiguration = new LockConfiguration(writeConfig.getProps());
|
this.lockConfiguration = new LockConfiguration(writeConfig.getProps());
|
||||||
|
maxRetries = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,
|
||||||
|
Integer.parseInt(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue()));
|
||||||
|
maxWaitTimeInMs = lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
|
||||||
|
Long.parseLong(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.defaultValue()));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void lock() {
|
public void lock() {
|
||||||
@@ -55,19 +62,17 @@ public class LockManager implements Serializable, AutoCloseable {
|
|||||||
LockProvider lockProvider = getLockProvider();
|
LockProvider lockProvider = getLockProvider();
|
||||||
int retryCount = 0;
|
int retryCount = 0;
|
||||||
boolean acquired = false;
|
boolean acquired = false;
|
||||||
int retries = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY);
|
while (retryCount <= maxRetries) {
|
||||||
long waitTimeInMs = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY);
|
|
||||||
while (retryCount <= retries) {
|
|
||||||
try {
|
try {
|
||||||
acquired = lockProvider.tryLock(writeConfig.getLockAcquireWaitTimeoutInMs(), TimeUnit.MILLISECONDS);
|
acquired = lockProvider.tryLock(writeConfig.getLockAcquireWaitTimeoutInMs(), TimeUnit.MILLISECONDS);
|
||||||
if (acquired) {
|
if (acquired) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
LOG.info("Retrying to acquire lock...");
|
LOG.info("Retrying to acquire lock...");
|
||||||
Thread.sleep(waitTimeInMs);
|
Thread.sleep(maxWaitTimeInMs);
|
||||||
retryCount++;
|
retryCount++;
|
||||||
} catch (HoodieLockException | InterruptedException e) {
|
} catch (HoodieLockException | InterruptedException e) {
|
||||||
if (retryCount >= retries) {
|
if (retryCount >= maxRetries) {
|
||||||
throw new HoodieLockException("Unable to acquire lock, lock object ", e);
|
throw new HoodieLockException("Unable to acquire lock, lock object ", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -21,12 +21,14 @@ package org.apache.hudi.config;
|
|||||||
import org.apache.hudi.client.WriteStatus;
|
import org.apache.hudi.client.WriteStatus;
|
||||||
import org.apache.hudi.client.bootstrap.BootstrapMode;
|
import org.apache.hudi.client.bootstrap.BootstrapMode;
|
||||||
import org.apache.hudi.client.transaction.ConflictResolutionStrategy;
|
import org.apache.hudi.client.transaction.ConflictResolutionStrategy;
|
||||||
|
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
|
||||||
import org.apache.hudi.common.config.ConfigClassProperty;
|
import org.apache.hudi.common.config.ConfigClassProperty;
|
||||||
import org.apache.hudi.common.config.ConfigGroups;
|
import org.apache.hudi.common.config.ConfigGroups;
|
||||||
import org.apache.hudi.common.config.ConfigProperty;
|
import org.apache.hudi.common.config.ConfigProperty;
|
||||||
import org.apache.hudi.common.config.HoodieCommonConfig;
|
import org.apache.hudi.common.config.HoodieCommonConfig;
|
||||||
import org.apache.hudi.common.config.HoodieConfig;
|
import org.apache.hudi.common.config.HoodieConfig;
|
||||||
import org.apache.hudi.common.config.HoodieMetadataConfig;
|
import org.apache.hudi.common.config.HoodieMetadataConfig;
|
||||||
|
import org.apache.hudi.common.config.TypedProperties;
|
||||||
import org.apache.hudi.common.engine.EngineType;
|
import org.apache.hudi.common.engine.EngineType;
|
||||||
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
|
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
|
||||||
import org.apache.hudi.common.model.HoodieCleaningPolicy;
|
import org.apache.hudi.common.model.HoodieCleaningPolicy;
|
||||||
@@ -1834,10 +1836,24 @@ public class HoodieWriteConfig extends HoodieConfig {
|
|||||||
return WriteConcurrencyMode.fromValue(getString(WRITE_CONCURRENCY_MODE));
|
return WriteConcurrencyMode.fromValue(getString(WRITE_CONCURRENCY_MODE));
|
||||||
}
|
}
|
||||||
|
|
||||||
public Boolean inlineTableServices() {
|
/**
|
||||||
|
* Are any table services configured to run inline?
|
||||||
|
*
|
||||||
|
* @return True if any table services are configured to run inline, false otherwise.
|
||||||
|
*/
|
||||||
|
public Boolean areAnyTableServicesInline() {
|
||||||
return inlineClusteringEnabled() || inlineCompactionEnabled() || isAutoClean();
|
return inlineClusteringEnabled() || inlineCompactionEnabled() || isAutoClean();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Are any table services configured to run async?
|
||||||
|
*
|
||||||
|
* @return True if any table services are configured to run async, false otherwise.
|
||||||
|
*/
|
||||||
|
public Boolean areAnyTableServicesAsync() {
|
||||||
|
return isAsyncClusteringEnabled() || !inlineCompactionEnabled() || isAsyncClean();
|
||||||
|
}
|
||||||
|
|
||||||
public String getPreCommitValidators() {
|
public String getPreCommitValidators() {
|
||||||
return getString(HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES);
|
return getString(HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES);
|
||||||
}
|
}
|
||||||
@@ -2267,13 +2283,25 @@ public class HoodieWriteConfig extends HoodieConfig {
|
|||||||
HoodiePayloadConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
|
HoodiePayloadConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
|
||||||
writeConfig.setDefaultOnCondition(!isMetadataConfigSet,
|
writeConfig.setDefaultOnCondition(!isMetadataConfigSet,
|
||||||
HoodieMetadataConfig.newBuilder().withEngineType(engineType).fromProperties(writeConfig.getProps()).build());
|
HoodieMetadataConfig.newBuilder().withEngineType(engineType).fromProperties(writeConfig.getProps()).build());
|
||||||
writeConfig.setDefaultOnCondition(!isLockConfigSet,
|
|
||||||
HoodieLockConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
|
|
||||||
writeConfig.setDefaultOnCondition(!isPreCommitValidationConfigSet,
|
writeConfig.setDefaultOnCondition(!isPreCommitValidationConfigSet,
|
||||||
HoodiePreCommitValidatorConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
|
HoodiePreCommitValidatorConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
|
||||||
writeConfig.setDefaultOnCondition(!isLayoutConfigSet,
|
writeConfig.setDefaultOnCondition(!isLayoutConfigSet,
|
||||||
HoodieLayoutConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
|
HoodieLayoutConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
|
||||||
writeConfig.setDefaultValue(TIMELINE_LAYOUT_VERSION_NUM, String.valueOf(TimelineLayoutVersion.CURR_VERSION));
|
writeConfig.setDefaultValue(TIMELINE_LAYOUT_VERSION_NUM, String.valueOf(TimelineLayoutVersion.CURR_VERSION));
|
||||||
|
|
||||||
|
// Async table services can update the metadata table and a lock provider is
|
||||||
|
// needed to guard against any concurrent table write operations. If user has
|
||||||
|
// not configured any lock provider, let's use the InProcess lock provider.
|
||||||
|
final TypedProperties writeConfigProperties = writeConfig.getProps();
|
||||||
|
final boolean isLockProviderPropertySet = writeConfigProperties.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME)
|
||||||
|
|| writeConfigProperties.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_PROP);
|
||||||
|
if (!isLockConfigSet) {
|
||||||
|
HoodieLockConfig.Builder lockConfigBuilder = HoodieLockConfig.newBuilder().fromProperties(writeConfig.getProps());
|
||||||
|
if (!isLockProviderPropertySet && writeConfig.areAnyTableServicesAsync()) {
|
||||||
|
lockConfigBuilder.withLockProvider(InProcessLockProvider.class);
|
||||||
|
}
|
||||||
|
writeConfig.setDefault(lockConfigBuilder.build());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void validate() {
|
private void validate() {
|
||||||
|
|||||||
@@ -18,6 +18,10 @@
|
|||||||
|
|
||||||
package org.apache.hudi.config;
|
package org.apache.hudi.config;
|
||||||
|
|
||||||
|
import org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass;
|
||||||
|
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
|
||||||
|
import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider;
|
||||||
|
import org.apache.hudi.common.config.TypedProperties;
|
||||||
import org.apache.hudi.common.engine.EngineType;
|
import org.apache.hudi.common.engine.EngineType;
|
||||||
import org.apache.hudi.common.table.marker.MarkerType;
|
import org.apache.hudi.common.table.marker.MarkerType;
|
||||||
import org.apache.hudi.config.HoodieWriteConfig.Builder;
|
import org.apache.hudi.config.HoodieWriteConfig.Builder;
|
||||||
@@ -30,13 +34,19 @@ import org.junit.jupiter.params.provider.ValueSource;
|
|||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
|
import static org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE;
|
||||||
|
import static org.apache.hudi.config.HoodieCompactionConfig.ASYNC_CLEAN;
|
||||||
|
import static org.apache.hudi.config.HoodieCompactionConfig.AUTO_CLEAN;
|
||||||
|
import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
public class TestHoodieWriteConfig {
|
public class TestHoodieWriteConfig {
|
||||||
@@ -104,6 +114,98 @@ public class TestHoodieWriteConfig {
|
|||||||
EngineType.JAVA, MarkerType.DIRECT));
|
EngineType.JAVA, MarkerType.DIRECT));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDefaultLockProviderWhenAsyncServicesEnabled() {
|
||||||
|
final String inProcessLockProviderClassName = InProcessLockProvider.class.getCanonicalName();
|
||||||
|
|
||||||
|
// Any async clustering enabled should use InProcess lock provider
|
||||||
|
// as default when no other lock provider is set.
|
||||||
|
|
||||||
|
// 1. Async clustering
|
||||||
|
HoodieWriteConfig writeConfig = createWriteConfig(new HashMap<String, String>() {
|
||||||
|
{
|
||||||
|
put(ASYNC_CLUSTERING_ENABLE.key(), "true");
|
||||||
|
put(INLINE_COMPACT.key(), "true");
|
||||||
|
put(AUTO_CLEAN.key(), "true");
|
||||||
|
put(ASYNC_CLEAN.key(), "false");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assertTrue(writeConfig.areAnyTableServicesAsync());
|
||||||
|
assertEquals(inProcessLockProviderClassName, writeConfig.getLockProviderClass());
|
||||||
|
|
||||||
|
// 2. Async clean
|
||||||
|
writeConfig = createWriteConfig(new HashMap<String, String>() {
|
||||||
|
{
|
||||||
|
put(ASYNC_CLUSTERING_ENABLE.key(), "false");
|
||||||
|
put(INLINE_COMPACT.key(), "true");
|
||||||
|
put(AUTO_CLEAN.key(), "true");
|
||||||
|
put(ASYNC_CLEAN.key(), "true");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assertTrue(writeConfig.areAnyTableServicesAsync());
|
||||||
|
assertEquals(inProcessLockProviderClassName, writeConfig.getLockProviderClass());
|
||||||
|
|
||||||
|
// 3. Async compaction
|
||||||
|
writeConfig = createWriteConfig(new HashMap<String, String>() {
|
||||||
|
{
|
||||||
|
put(ASYNC_CLUSTERING_ENABLE.key(), "false");
|
||||||
|
put(INLINE_COMPACT.key(), "false");
|
||||||
|
put(AUTO_CLEAN.key(), "true");
|
||||||
|
put(ASYNC_CLEAN.key(), "false");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assertTrue(writeConfig.areAnyTableServicesAsync());
|
||||||
|
assertEquals(inProcessLockProviderClassName, writeConfig.getLockProviderClass());
|
||||||
|
|
||||||
|
// 4. All inline services
|
||||||
|
writeConfig = createWriteConfig(new HashMap<String, String>() {
|
||||||
|
{
|
||||||
|
put(ASYNC_CLUSTERING_ENABLE.key(), "false");
|
||||||
|
put(INLINE_COMPACT.key(), "true");
|
||||||
|
put(AUTO_CLEAN.key(), "true");
|
||||||
|
put(ASYNC_CLEAN.key(), "false");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assertFalse(writeConfig.areAnyTableServicesAsync());
|
||||||
|
assertTrue(writeConfig.areAnyTableServicesInline());
|
||||||
|
assertEquals(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue(), writeConfig.getLockProviderClass());
|
||||||
|
|
||||||
|
// 5. User override for the lock provider should always take the precedence
|
||||||
|
writeConfig = HoodieWriteConfig.newBuilder()
|
||||||
|
.withPath("/tmp")
|
||||||
|
.withLockConfig(HoodieLockConfig.newBuilder()
|
||||||
|
.withLockProvider(FileSystemBasedLockProviderTestClass.class)
|
||||||
|
.build())
|
||||||
|
.build();
|
||||||
|
assertEquals(FileSystemBasedLockProviderTestClass.class.getName(), writeConfig.getLockProviderClass());
|
||||||
|
|
||||||
|
// 6. User can set the lock provider via properties
|
||||||
|
TypedProperties properties = new TypedProperties();
|
||||||
|
properties.setProperty(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), ZookeeperBasedLockProvider.class.getName());
|
||||||
|
writeConfig = HoodieWriteConfig.newBuilder()
|
||||||
|
.withPath("/tmp")
|
||||||
|
.withProperties(properties)
|
||||||
|
.build();
|
||||||
|
assertEquals(ZookeeperBasedLockProvider.class.getName(), writeConfig.getLockProviderClass());
|
||||||
|
|
||||||
|
// Default config should have default lock provider
|
||||||
|
writeConfig = createWriteConfig(Collections.emptyMap());
|
||||||
|
if (!writeConfig.areAnyTableServicesAsync()) {
|
||||||
|
assertEquals(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue(), writeConfig.getLockProviderClass());
|
||||||
|
} else {
|
||||||
|
assertEquals(inProcessLockProviderClassName, writeConfig.getLockProviderClass());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private HoodieWriteConfig createWriteConfig(Map<String, String> configs) {
|
||||||
|
final Properties properties = new Properties();
|
||||||
|
configs.forEach(properties::setProperty);
|
||||||
|
return HoodieWriteConfig.newBuilder()
|
||||||
|
.withPath("/tmp")
|
||||||
|
.withProperties(properties)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
private ByteArrayOutputStream saveParamsIntoOutputStream(Map<String, String> params) throws IOException {
|
private ByteArrayOutputStream saveParamsIntoOutputStream(Map<String, String> params) throws IOException {
|
||||||
Properties properties = new Properties();
|
Properties properties = new Properties();
|
||||||
properties.putAll(params);
|
properties.putAll(params);
|
||||||
|
|||||||
@@ -199,10 +199,14 @@ public class HoodieConfig implements Serializable {
|
|||||||
|
|
||||||
public void setDefaultOnCondition(boolean condition, HoodieConfig config) {
|
public void setDefaultOnCondition(boolean condition, HoodieConfig config) {
|
||||||
if (condition) {
|
if (condition) {
|
||||||
props.putAll(config.getProps());
|
setDefault(config);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setDefault(HoodieConfig config) {
|
||||||
|
props.putAll(config.getProps());
|
||||||
|
}
|
||||||
|
|
||||||
public <T> String getStringOrThrow(ConfigProperty<T> configProperty, String errorMessage) throws HoodieException {
|
public <T> String getStringOrThrow(ConfigProperty<T> configProperty, String errorMessage) throws HoodieException {
|
||||||
Option<Object> rawValue = getRawValue(configProperty);
|
Option<Object> rawValue = getRawValue(configProperty);
|
||||||
if (rawValue.isPresent()) {
|
if (rawValue.isPresent()) {
|
||||||
|
|||||||
Reference in New Issue
Block a user