From 251d4eb3b64704b9dd51bf6f6ecb5bf47089b745 Mon Sep 17 00:00:00 2001 From: Manoj Govindassamy Date: Sun, 9 Jan 2022 19:10:24 -0800 Subject: [PATCH] [HUDI-3030] InProcessLockProvider as default when any async services enabled with no lock provider override (#4406) * [HUDI-3030] InProcessLockProvider as default when any async services enabled with no lock provider override - Making InProcessLockProvider the default lock provider when any async services are enabled and when no lock provider is explicitly set. - This is the workaround for metadata table updates racing with async table service operations * [HUDI-3030] InProcessLockProvider as default when any async services enabled with no lock provider override - Renaming isAnyTableServicesInline/Async() to areAnyTableServicesInline/Async() * [HUDI-3030] InProcessLockProvider as default when any async services enabled with no lock provider override - Additionally checking for write config properties when verifying the lock provider override. Updated the unit test for this case. 
--- .../client/AbstractHoodieWriteClient.java | 2 +- .../client/transaction/lock/LockManager.java | 15 ++- .../apache/hudi/config/HoodieWriteConfig.java | 34 +++++- .../hudi/config/TestHoodieWriteConfig.java | 102 ++++++++++++++++++ .../hudi/common/config/HoodieConfig.java | 6 +- 5 files changed, 149 insertions(+), 10 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 76b10fddd..66c76ffbc 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -464,7 +464,7 @@ public abstract class AbstractHoodieWriteClient table, HoodieCommitMetadata metadata, Option> extraMetadata) { - if (config.inlineTableServices()) { + if (config.areAnyTableServicesInline()) { if (config.isMetadataTableEnabled()) { table.getHoodieView().sync(); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java index 976205f35..773685980 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.config.LockConfiguration; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.lock.LockProvider; import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.config.HoodieLockConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieLockException; import org.apache.log4j.LogManager; @@ -42,12 +43,18 @@ public 
class LockManager implements Serializable, AutoCloseable { private final HoodieWriteConfig writeConfig; private final LockConfiguration lockConfiguration; private final SerializableConfiguration hadoopConf; + private final int maxRetries; + private final long maxWaitTimeInMs; private volatile LockProvider lockProvider; public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) { this.writeConfig = writeConfig; this.hadoopConf = new SerializableConfiguration(fs.getConf()); this.lockConfiguration = new LockConfiguration(writeConfig.getProps()); + maxRetries = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, + Integer.parseInt(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue())); + maxWaitTimeInMs = lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, + Long.parseLong(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.defaultValue())); } public void lock() { @@ -55,19 +62,17 @@ public class LockManager implements Serializable, AutoCloseable { LockProvider lockProvider = getLockProvider(); int retryCount = 0; boolean acquired = false; - int retries = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY); - long waitTimeInMs = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY); - while (retryCount <= retries) { + while (retryCount <= maxRetries) { try { acquired = lockProvider.tryLock(writeConfig.getLockAcquireWaitTimeoutInMs(), TimeUnit.MILLISECONDS); if (acquired) { break; } LOG.info("Retrying to acquire lock..."); - Thread.sleep(waitTimeInMs); + Thread.sleep(maxWaitTimeInMs); retryCount++; } catch (HoodieLockException | InterruptedException e) { - if (retryCount >= retries) { + if (retryCount >= maxRetries) { throw new HoodieLockException("Unable to acquire lock, lock object ", e); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index b04e3801e..7b33d338f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -21,12 +21,14 @@ package org.apache.hudi.config; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.bootstrap.BootstrapMode; import org.apache.hudi.client.transaction.ConflictResolutionStrategy; +import org.apache.hudi.client.transaction.lock.InProcessLockProvider; import org.apache.hudi.common.config.ConfigClassProperty; import org.apache.hudi.common.config.ConfigGroups; import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.model.HoodieCleaningPolicy; @@ -1834,10 +1836,24 @@ public class HoodieWriteConfig extends HoodieConfig { return WriteConcurrencyMode.fromValue(getString(WRITE_CONCURRENCY_MODE)); } - public Boolean inlineTableServices() { + /** + * Are any table services configured to run inline? + * + * @return True if any table services are configured to run inline, false otherwise. + */ + public Boolean areAnyTableServicesInline() { return inlineClusteringEnabled() || inlineCompactionEnabled() || isAutoClean(); } + /** + * Are any table services configured to run async? + * + * @return True if any table services are configured to run async, false otherwise. 
+ */ + public Boolean areAnyTableServicesAsync() { + return isAsyncClusteringEnabled() || !inlineCompactionEnabled() || isAsyncClean(); + } + public String getPreCommitValidators() { return getString(HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES); } @@ -2267,13 +2283,25 @@ public class HoodieWriteConfig extends HoodieConfig { HoodiePayloadConfig.newBuilder().fromProperties(writeConfig.getProps()).build()); writeConfig.setDefaultOnCondition(!isMetadataConfigSet, HoodieMetadataConfig.newBuilder().withEngineType(engineType).fromProperties(writeConfig.getProps()).build()); - writeConfig.setDefaultOnCondition(!isLockConfigSet, - HoodieLockConfig.newBuilder().fromProperties(writeConfig.getProps()).build()); writeConfig.setDefaultOnCondition(!isPreCommitValidationConfigSet, HoodiePreCommitValidatorConfig.newBuilder().fromProperties(writeConfig.getProps()).build()); writeConfig.setDefaultOnCondition(!isLayoutConfigSet, HoodieLayoutConfig.newBuilder().fromProperties(writeConfig.getProps()).build()); writeConfig.setDefaultValue(TIMELINE_LAYOUT_VERSION_NUM, String.valueOf(TimelineLayoutVersion.CURR_VERSION)); + + // Async table services can update the metadata table and a lock provider is + // needed to guard against any concurrent table write operations. If user has + // not configured any lock provider, let's use the InProcess lock provider. 
+ final TypedProperties writeConfigProperties = writeConfig.getProps(); + final boolean isLockProviderPropertySet = writeConfigProperties.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME) + || writeConfigProperties.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_PROP); + if (!isLockConfigSet) { + HoodieLockConfig.Builder lockConfigBuilder = HoodieLockConfig.newBuilder().fromProperties(writeConfig.getProps()); + if (!isLockProviderPropertySet && writeConfig.areAnyTableServicesAsync()) { + lockConfigBuilder.withLockProvider(InProcessLockProvider.class); + } + writeConfig.setDefault(lockConfigBuilder.build()); + } } private void validate() { diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java index 59b7a9274..c86a34a60 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java @@ -18,6 +18,10 @@ package org.apache.hudi.config; +import org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass; +import org.apache.hudi.client.transaction.lock.InProcessLockProvider; +import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.config.HoodieWriteConfig.Builder; @@ -30,13 +34,19 @@ import org.junit.jupiter.params.provider.ValueSource; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.function.Function; +import static 
org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE; +import static org.apache.hudi.config.HoodieCompactionConfig.ASYNC_CLEAN; +import static org.apache.hudi.config.HoodieCompactionConfig.AUTO_CLEAN; +import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; public class TestHoodieWriteConfig { @@ -104,6 +114,98 @@ public class TestHoodieWriteConfig { EngineType.JAVA, MarkerType.DIRECT)); } + @Test + public void testDefaultLockProviderWhenAsyncServicesEnabled() { + final String inProcessLockProviderClassName = InProcessLockProvider.class.getCanonicalName(); + + // Any async clustering enabled should use InProcess lock provider + // as default when no other lock provider is set. + + // 1. Async clustering + HoodieWriteConfig writeConfig = createWriteConfig(new HashMap() { + { + put(ASYNC_CLUSTERING_ENABLE.key(), "true"); + put(INLINE_COMPACT.key(), "true"); + put(AUTO_CLEAN.key(), "true"); + put(ASYNC_CLEAN.key(), "false"); + } + }); + assertTrue(writeConfig.areAnyTableServicesAsync()); + assertEquals(inProcessLockProviderClassName, writeConfig.getLockProviderClass()); + + // 2. Async clean + writeConfig = createWriteConfig(new HashMap() { + { + put(ASYNC_CLUSTERING_ENABLE.key(), "false"); + put(INLINE_COMPACT.key(), "true"); + put(AUTO_CLEAN.key(), "true"); + put(ASYNC_CLEAN.key(), "true"); + } + }); + assertTrue(writeConfig.areAnyTableServicesAsync()); + assertEquals(inProcessLockProviderClassName, writeConfig.getLockProviderClass()); + + // 3. 
Async compaction + writeConfig = createWriteConfig(new HashMap() { + { + put(ASYNC_CLUSTERING_ENABLE.key(), "false"); + put(INLINE_COMPACT.key(), "false"); + put(AUTO_CLEAN.key(), "true"); + put(ASYNC_CLEAN.key(), "false"); + } + }); + assertTrue(writeConfig.areAnyTableServicesAsync()); + assertEquals(inProcessLockProviderClassName, writeConfig.getLockProviderClass()); + + // 4. All inline services + writeConfig = createWriteConfig(new HashMap() { + { + put(ASYNC_CLUSTERING_ENABLE.key(), "false"); + put(INLINE_COMPACT.key(), "true"); + put(AUTO_CLEAN.key(), "true"); + put(ASYNC_CLEAN.key(), "false"); + } + }); + assertFalse(writeConfig.areAnyTableServicesAsync()); + assertTrue(writeConfig.areAnyTableServicesInline()); + assertEquals(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue(), writeConfig.getLockProviderClass()); + + // 5. User override for the lock provider should always take the precedence + writeConfig = HoodieWriteConfig.newBuilder() + .withPath("/tmp") + .withLockConfig(HoodieLockConfig.newBuilder() + .withLockProvider(FileSystemBasedLockProviderTestClass.class) + .build()) + .build(); + assertEquals(FileSystemBasedLockProviderTestClass.class.getName(), writeConfig.getLockProviderClass()); + + // 6. 
User can set the lock provider via properties + TypedProperties properties = new TypedProperties(); + properties.setProperty(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), ZookeeperBasedLockProvider.class.getName()); + writeConfig = HoodieWriteConfig.newBuilder() + .withPath("/tmp") + .withProperties(properties) + .build(); + assertEquals(ZookeeperBasedLockProvider.class.getName(), writeConfig.getLockProviderClass()); + + // Default config should have default lock provider + writeConfig = createWriteConfig(Collections.emptyMap()); + if (!writeConfig.areAnyTableServicesAsync()) { + assertEquals(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue(), writeConfig.getLockProviderClass()); + } else { + assertEquals(inProcessLockProviderClassName, writeConfig.getLockProviderClass()); + } + } + + private HoodieWriteConfig createWriteConfig(Map configs) { + final Properties properties = new Properties(); + configs.forEach(properties::setProperty); + return HoodieWriteConfig.newBuilder() + .withPath("/tmp") + .withProperties(properties) + .build(); + } + private ByteArrayOutputStream saveParamsIntoOutputStream(Map params) throws IOException { Properties properties = new Properties(); properties.putAll(params); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java index c05116532..c4308f79d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java @@ -199,10 +199,14 @@ public class HoodieConfig implements Serializable { public void setDefaultOnCondition(boolean condition, HoodieConfig config) { if (condition) { - props.putAll(config.getProps()); + setDefault(config); } } + public void setDefault(HoodieConfig config) { + props.putAll(config.getProps()); + } + public String getStringOrThrow(ConfigProperty configProperty, String errorMessage) throws 
HoodieException { Option rawValue = getRawValue(configProperty); if (rawValue.isPresent()) {