[HUDI-3404] Automatically adjust write configs based on metadata table and write concurrency mode (#4975)
This commit is contained in:
@@ -35,6 +35,7 @@ import org.apache.hudi.common.fs.FileSystemRetryConfig;
|
|||||||
import org.apache.hudi.common.model.HoodieCleaningPolicy;
|
import org.apache.hudi.common.model.HoodieCleaningPolicy;
|
||||||
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
|
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
|
||||||
import org.apache.hudi.common.model.HoodieFileFormat;
|
import org.apache.hudi.common.model.HoodieFileFormat;
|
||||||
|
import org.apache.hudi.common.model.HoodieTableType;
|
||||||
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
|
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
|
||||||
import org.apache.hudi.common.model.WriteConcurrencyMode;
|
import org.apache.hudi.common.model.WriteConcurrencyMode;
|
||||||
import org.apache.hudi.common.table.HoodieTableConfig;
|
import org.apache.hudi.common.table.HoodieTableConfig;
|
||||||
@@ -66,6 +67,8 @@ import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
|
|||||||
import org.apache.hudi.table.storage.HoodieStorageLayout;
|
import org.apache.hudi.table.storage.HoodieStorageLayout;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||||
|
import org.apache.log4j.LogManager;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
import org.apache.orc.CompressionKind;
|
import org.apache.orc.CompressionKind;
|
||||||
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
|
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
|
||||||
|
|
||||||
@@ -93,6 +96,7 @@ import java.util.stream.Collectors;
|
|||||||
+ "higher level frameworks (e.g Spark datasources, Flink sink) and utilities (e.g DeltaStreamer).")
|
+ "higher level frameworks (e.g Spark datasources, Flink sink) and utilities (e.g DeltaStreamer).")
|
||||||
public class HoodieWriteConfig extends HoodieConfig {
|
public class HoodieWriteConfig extends HoodieConfig {
|
||||||
|
|
||||||
|
private static final Logger LOG = LogManager.getLogger(HoodieWriteConfig.class);
|
||||||
private static final long serialVersionUID = 0L;
|
private static final long serialVersionUID = 0L;
|
||||||
|
|
||||||
// This is a constant as is should never be changed via config (will invalidate previous commits)
|
// This is a constant as is should never be changed via config (will invalidate previous commits)
|
||||||
@@ -903,6 +907,11 @@ public class HoodieWriteConfig extends HoodieConfig {
|
|||||||
return getString(TBL_NAME);
|
return getString(TBL_NAME);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public HoodieTableType getTableType() {
|
||||||
|
return HoodieTableType.valueOf(getStringOrDefault(
|
||||||
|
HoodieTableConfig.TYPE, HoodieTableConfig.TYPE.defaultValue().name()).toUpperCase());
|
||||||
|
}
|
||||||
|
|
||||||
public String getPreCombineField() {
|
public String getPreCombineField() {
|
||||||
return getString(PRECOMBINE_FIELD_NAME);
|
return getString(PRECOMBINE_FIELD_NAME);
|
||||||
}
|
}
|
||||||
@@ -1930,7 +1939,9 @@ public class HoodieWriteConfig extends HoodieConfig {
|
|||||||
* @return True if any table services are configured to run async, false otherwise.
|
* @return True if any table services are configured to run async, false otherwise.
|
||||||
*/
|
*/
|
||||||
public Boolean areAnyTableServicesAsync() {
|
public Boolean areAnyTableServicesAsync() {
|
||||||
return isAsyncClusteringEnabled() || !inlineCompactionEnabled() || isAsyncClean() || isAsyncArchive();
|
return isAsyncClusteringEnabled()
|
||||||
|
|| (getTableType() == HoodieTableType.MERGE_ON_READ && !inlineCompactionEnabled())
|
||||||
|
|| isAsyncClean() || isAsyncArchive();
|
||||||
}
|
}
|
||||||
|
|
||||||
public Boolean areAnyTableServicesScheduledInline() {
|
public Boolean areAnyTableServicesScheduledInline() {
|
||||||
@@ -2390,19 +2401,56 @@ public class HoodieWriteConfig extends HoodieConfig {
|
|||||||
HoodieLayoutConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
|
HoodieLayoutConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
|
||||||
writeConfig.setDefaultValue(TIMELINE_LAYOUT_VERSION_NUM, String.valueOf(TimelineLayoutVersion.CURR_VERSION));
|
writeConfig.setDefaultValue(TIMELINE_LAYOUT_VERSION_NUM, String.valueOf(TimelineLayoutVersion.CURR_VERSION));
|
||||||
|
|
||||||
// Async table services can update the metadata table and a lock provider is
|
autoAdjustConfigsForConcurrencyMode();
|
||||||
// needed to guard against any concurrent table write operations. If user has
|
}
|
||||||
// not configured any lock provider, let's use the InProcess lock provider.
|
|
||||||
|
private void autoAdjustConfigsForConcurrencyMode() {
|
||||||
|
boolean isMetadataTableEnabled = writeConfig.getBoolean(HoodieMetadataConfig.ENABLE);
|
||||||
final TypedProperties writeConfigProperties = writeConfig.getProps();
|
final TypedProperties writeConfigProperties = writeConfig.getProps();
|
||||||
final boolean isLockProviderPropertySet = writeConfigProperties.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME)
|
final boolean isLockProviderPropertySet = writeConfigProperties.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME)
|
||||||
|| writeConfigProperties.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_PROP);
|
|| writeConfigProperties.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_PROP);
|
||||||
|
|
||||||
if (!isLockConfigSet) {
|
if (!isLockConfigSet) {
|
||||||
HoodieLockConfig.Builder lockConfigBuilder = HoodieLockConfig.newBuilder().fromProperties(writeConfig.getProps());
|
HoodieLockConfig.Builder lockConfigBuilder = HoodieLockConfig.newBuilder().fromProperties(writeConfig.getProps());
|
||||||
if (!isLockProviderPropertySet && writeConfig.areAnyTableServicesAsync()) {
|
|
||||||
lockConfigBuilder.withLockProvider(InProcessLockProvider.class);
|
|
||||||
}
|
|
||||||
writeConfig.setDefault(lockConfigBuilder.build());
|
writeConfig.setDefault(lockConfigBuilder.build());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (isMetadataTableEnabled) {
|
||||||
|
// When metadata table is enabled, optimistic concurrency control must be used for
|
||||||
|
// single writer with async table services.
|
||||||
|
// Async table services can update the metadata table and a lock provider is
|
||||||
|
// needed to guard against any concurrent table write operations. If user has
|
||||||
|
// not configured any lock provider, let's use the InProcess lock provider.
|
||||||
|
boolean areTableServicesEnabled = writeConfig.areTableServicesEnabled();
|
||||||
|
boolean areAsyncTableServicesEnabled = writeConfig.areAnyTableServicesAsync();
|
||||||
|
|
||||||
|
if (!isLockProviderPropertySet && areTableServicesEnabled && areAsyncTableServicesEnabled) {
|
||||||
|
// This is targeted at Single writer with async table services
|
||||||
|
// If user does not set the lock provider, likely that the concurrency mode is not set either
|
||||||
|
// Override the configs for metadata table
|
||||||
|
writeConfig.setValue(WRITE_CONCURRENCY_MODE.key(),
|
||||||
|
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value());
|
||||||
|
writeConfig.setValue(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
|
||||||
|
InProcessLockProvider.class.getName());
|
||||||
|
LOG.info(String.format("Automatically set %s=%s and %s=%s since user has not set the "
|
||||||
|
+ "lock provider for single writer with async table services",
|
||||||
|
WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value(),
|
||||||
|
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), InProcessLockProvider.class.getName()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// We check if "hoodie.cleaner.policy.failed.writes"
|
||||||
|
// is properly set to LAZY for optimistic concurrency control
|
||||||
|
String writeConcurrencyMode = writeConfig.getString(WRITE_CONCURRENCY_MODE);
|
||||||
|
if (WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value()
|
||||||
|
.equalsIgnoreCase(writeConcurrencyMode)) {
|
||||||
|
// In this case, we assume that the user takes care of setting the lock provider used
|
||||||
|
writeConfig.setValue(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY.key(),
|
||||||
|
HoodieFailedWritesCleaningPolicy.LAZY.name());
|
||||||
|
LOG.info(String.format("Automatically set %s=%s since optimistic concurrency control is used",
|
||||||
|
HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY.key(),
|
||||||
|
HoodieFailedWritesCleaningPolicy.LAZY.name()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void validate() {
|
private void validate() {
|
||||||
@@ -2411,9 +2459,9 @@ public class HoodieWriteConfig extends HoodieConfig {
|
|||||||
new TimelineLayoutVersion(Integer.parseInt(layoutVersion));
|
new TimelineLayoutVersion(Integer.parseInt(layoutVersion));
|
||||||
Objects.requireNonNull(writeConfig.getString(BASE_PATH));
|
Objects.requireNonNull(writeConfig.getString(BASE_PATH));
|
||||||
if (writeConfig.getString(WRITE_CONCURRENCY_MODE)
|
if (writeConfig.getString(WRITE_CONCURRENCY_MODE)
|
||||||
.equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name())) {
|
.equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value())) {
|
||||||
ValidationUtils.checkArgument(writeConfig.getString(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY)
|
ValidationUtils.checkArgument(!writeConfig.getString(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY)
|
||||||
!= HoodieFailedWritesCleaningPolicy.EAGER.name(), "To enable optimistic concurrency control, set hoodie.cleaner.policy.failed.writes=LAZY");
|
.equals(HoodieFailedWritesCleaningPolicy.EAGER.name()), "To enable optimistic concurrency control, set hoodie.cleaner.policy.failed.writes=LAZY");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -21,20 +21,26 @@ package org.apache.hudi.config;
|
|||||||
import org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass;
|
import org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass;
|
||||||
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
|
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
|
||||||
import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider;
|
import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider;
|
||||||
|
import org.apache.hudi.common.config.HoodieMetadataConfig;
|
||||||
import org.apache.hudi.common.config.TypedProperties;
|
import org.apache.hudi.common.config.TypedProperties;
|
||||||
import org.apache.hudi.common.engine.EngineType;
|
import org.apache.hudi.common.engine.EngineType;
|
||||||
|
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
|
||||||
|
import org.apache.hudi.common.model.HoodieTableType;
|
||||||
|
import org.apache.hudi.common.model.WriteConcurrencyMode;
|
||||||
|
import org.apache.hudi.common.table.HoodieTableConfig;
|
||||||
import org.apache.hudi.common.table.marker.MarkerType;
|
import org.apache.hudi.common.table.marker.MarkerType;
|
||||||
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.config.HoodieWriteConfig.Builder;
|
import org.apache.hudi.config.HoodieWriteConfig.Builder;
|
||||||
import org.apache.hudi.index.HoodieIndex;
|
import org.apache.hudi.index.HoodieIndex;
|
||||||
|
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.jupiter.params.ParameterizedTest;
|
import org.junit.jupiter.params.ParameterizedTest;
|
||||||
|
import org.junit.jupiter.params.provider.EnumSource;
|
||||||
import org.junit.jupiter.params.provider.ValueSource;
|
import org.junit.jupiter.params.provider.ValueSource;
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@@ -44,9 +50,11 @@ import java.util.function.Function;
|
|||||||
import static org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE;
|
import static org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE;
|
||||||
import static org.apache.hudi.config.HoodieCompactionConfig.ASYNC_CLEAN;
|
import static org.apache.hudi.config.HoodieCompactionConfig.ASYNC_CLEAN;
|
||||||
import static org.apache.hudi.config.HoodieCompactionConfig.AUTO_CLEAN;
|
import static org.apache.hudi.config.HoodieCompactionConfig.AUTO_CLEAN;
|
||||||
|
import static org.apache.hudi.config.HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY;
|
||||||
import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT;
|
import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT;
|
||||||
|
import static org.apache.hudi.config.HoodieWriteConfig.TABLE_SERVICES_ENABLED;
|
||||||
|
import static org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
public class TestHoodieWriteConfig {
|
public class TestHoodieWriteConfig {
|
||||||
@@ -114,87 +122,193 @@ public class TestHoodieWriteConfig {
|
|||||||
EngineType.JAVA, MarkerType.DIRECT));
|
EngineType.JAVA, MarkerType.DIRECT));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@ParameterizedTest
|
||||||
public void testDefaultLockProviderWhenAsyncServicesEnabled() {
|
@EnumSource(HoodieTableType.class)
|
||||||
|
public void testAutoConcurrencyConfigAdjustmentWithTableServices(HoodieTableType tableType) {
|
||||||
final String inProcessLockProviderClassName = InProcessLockProvider.class.getCanonicalName();
|
final String inProcessLockProviderClassName = InProcessLockProvider.class.getCanonicalName();
|
||||||
|
// With metadata table enabled by default, any async table service enabled should
|
||||||
// Any async clustering enabled should use InProcess lock provider
|
// use InProcess lock provider as default when no other lock provider is set.
|
||||||
// as default when no other lock provider is set.
|
|
||||||
|
|
||||||
// 1. Async clustering
|
// 1. Async clustering
|
||||||
HoodieWriteConfig writeConfig = createWriteConfig(new HashMap<String, String>() {
|
verifyConcurrencyControlRelatedConfigs(createWriteConfig(new HashMap<String, String>() {
|
||||||
{
|
{
|
||||||
|
put(HoodieTableConfig.TYPE.key(), tableType.name());
|
||||||
put(ASYNC_CLUSTERING_ENABLE.key(), "true");
|
put(ASYNC_CLUSTERING_ENABLE.key(), "true");
|
||||||
put(INLINE_COMPACT.key(), "true");
|
put(INLINE_COMPACT.key(), "true");
|
||||||
put(AUTO_CLEAN.key(), "true");
|
put(AUTO_CLEAN.key(), "true");
|
||||||
put(ASYNC_CLEAN.key(), "false");
|
put(ASYNC_CLEAN.key(), "false");
|
||||||
}
|
}
|
||||||
});
|
}), true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
|
||||||
assertTrue(writeConfig.areAnyTableServicesAsync());
|
HoodieFailedWritesCleaningPolicy.LAZY, inProcessLockProviderClassName);
|
||||||
assertEquals(inProcessLockProviderClassName, writeConfig.getLockProviderClass());
|
|
||||||
|
|
||||||
// 2. Async clean
|
// 2. Async clean
|
||||||
writeConfig = createWriteConfig(new HashMap<String, String>() {
|
verifyConcurrencyControlRelatedConfigs(createWriteConfig(new HashMap<String, String>() {
|
||||||
{
|
{
|
||||||
|
put(HoodieTableConfig.TYPE.key(), tableType.name());
|
||||||
put(ASYNC_CLUSTERING_ENABLE.key(), "false");
|
put(ASYNC_CLUSTERING_ENABLE.key(), "false");
|
||||||
put(INLINE_COMPACT.key(), "true");
|
put(INLINE_COMPACT.key(), "true");
|
||||||
put(AUTO_CLEAN.key(), "true");
|
put(AUTO_CLEAN.key(), "true");
|
||||||
put(ASYNC_CLEAN.key(), "true");
|
put(ASYNC_CLEAN.key(), "true");
|
||||||
}
|
}
|
||||||
});
|
}), true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
|
||||||
assertTrue(writeConfig.areAnyTableServicesAsync());
|
HoodieFailedWritesCleaningPolicy.LAZY, inProcessLockProviderClassName);
|
||||||
assertEquals(inProcessLockProviderClassName, writeConfig.getLockProviderClass());
|
|
||||||
|
|
||||||
// 3. Async compaction
|
// 3. Async compaction configured
|
||||||
writeConfig = createWriteConfig(new HashMap<String, String>() {
|
verifyConcurrencyControlRelatedConfigs(createWriteConfig(new HashMap<String, String>() {
|
||||||
{
|
{
|
||||||
|
put(HoodieTableConfig.TYPE.key(), tableType.name());
|
||||||
put(ASYNC_CLUSTERING_ENABLE.key(), "false");
|
put(ASYNC_CLUSTERING_ENABLE.key(), "false");
|
||||||
put(INLINE_COMPACT.key(), "false");
|
put(INLINE_COMPACT.key(), "false");
|
||||||
put(AUTO_CLEAN.key(), "true");
|
put(AUTO_CLEAN.key(), "true");
|
||||||
put(ASYNC_CLEAN.key(), "false");
|
put(ASYNC_CLEAN.key(), "false");
|
||||||
}
|
}
|
||||||
});
|
}), true,
|
||||||
assertTrue(writeConfig.areAnyTableServicesAsync());
|
tableType == HoodieTableType.MERGE_ON_READ,
|
||||||
assertEquals(inProcessLockProviderClassName, writeConfig.getLockProviderClass());
|
tableType == HoodieTableType.MERGE_ON_READ
|
||||||
|
? WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL
|
||||||
|
: WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
|
||||||
|
tableType == HoodieTableType.MERGE_ON_READ
|
||||||
|
? HoodieFailedWritesCleaningPolicy.LAZY
|
||||||
|
: HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
|
||||||
|
tableType == HoodieTableType.MERGE_ON_READ
|
||||||
|
? inProcessLockProviderClassName
|
||||||
|
: HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue());
|
||||||
|
|
||||||
// 4. All inline services
|
// 4. All inline services
|
||||||
writeConfig = createWriteConfig(new HashMap<String, String>() {
|
verifyConcurrencyControlRelatedConfigs(createWriteConfig(new HashMap<String, String>() {
|
||||||
{
|
{
|
||||||
|
put(HoodieTableConfig.TYPE.key(), tableType.name());
|
||||||
put(ASYNC_CLUSTERING_ENABLE.key(), "false");
|
put(ASYNC_CLUSTERING_ENABLE.key(), "false");
|
||||||
put(INLINE_COMPACT.key(), "true");
|
put(INLINE_COMPACT.key(), "true");
|
||||||
put(AUTO_CLEAN.key(), "true");
|
put(AUTO_CLEAN.key(), "true");
|
||||||
put(ASYNC_CLEAN.key(), "false");
|
put(ASYNC_CLEAN.key(), "false");
|
||||||
}
|
}
|
||||||
});
|
}), Option.of(true), Option.of(false), Option.of(true),
|
||||||
assertFalse(writeConfig.areAnyTableServicesAsync());
|
WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
|
||||||
assertTrue(writeConfig.areAnyTableServicesExecutedInline());
|
HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
|
||||||
assertEquals(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue(), writeConfig.getLockProviderClass());
|
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue());
|
||||||
|
}
|
||||||
|
|
||||||
// 5. User override for the lock provider should always take the precedence
|
@ParameterizedTest
|
||||||
writeConfig = HoodieWriteConfig.newBuilder()
|
@EnumSource(HoodieTableType.class)
|
||||||
|
public void testAutoConcurrencyConfigAdjustmentWithUserConfigs(HoodieTableType tableType) {
|
||||||
|
// 1. User override for the lock provider should always take the precedence
|
||||||
|
TypedProperties properties = new TypedProperties();
|
||||||
|
properties.setProperty(HoodieTableConfig.TYPE.key(), tableType.name());
|
||||||
|
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
|
||||||
.withPath("/tmp")
|
.withPath("/tmp")
|
||||||
.withLockConfig(HoodieLockConfig.newBuilder()
|
.withLockConfig(HoodieLockConfig.newBuilder()
|
||||||
.withLockProvider(FileSystemBasedLockProviderTestClass.class)
|
.withLockProvider(FileSystemBasedLockProviderTestClass.class)
|
||||||
.build())
|
.build())
|
||||||
.build();
|
|
||||||
assertEquals(FileSystemBasedLockProviderTestClass.class.getName(), writeConfig.getLockProviderClass());
|
|
||||||
|
|
||||||
// 6. User can set the lock provider via properties
|
|
||||||
TypedProperties properties = new TypedProperties();
|
|
||||||
properties.setProperty(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), ZookeeperBasedLockProvider.class.getName());
|
|
||||||
writeConfig = HoodieWriteConfig.newBuilder()
|
|
||||||
.withPath("/tmp")
|
|
||||||
.withProperties(properties)
|
.withProperties(properties)
|
||||||
.build();
|
.build();
|
||||||
assertEquals(ZookeeperBasedLockProvider.class.getName(), writeConfig.getLockProviderClass());
|
verifyConcurrencyControlRelatedConfigs(writeConfig,
|
||||||
|
true, tableType == HoodieTableType.MERGE_ON_READ,
|
||||||
|
WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
|
||||||
|
HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
|
||||||
|
FileSystemBasedLockProviderTestClass.class.getName());
|
||||||
|
|
||||||
// Default config should have default lock provider
|
// 2. User can set the lock provider via properties
|
||||||
writeConfig = createWriteConfig(Collections.emptyMap());
|
verifyConcurrencyControlRelatedConfigs(createWriteConfig(new HashMap<String, String>() {
|
||||||
if (!writeConfig.areAnyTableServicesAsync()) {
|
{
|
||||||
assertEquals(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue(), writeConfig.getLockProviderClass());
|
put(HoodieTableConfig.TYPE.key(), tableType.name());
|
||||||
} else {
|
put(ASYNC_CLUSTERING_ENABLE.key(), "false");
|
||||||
assertEquals(inProcessLockProviderClassName, writeConfig.getLockProviderClass());
|
put(INLINE_COMPACT.key(), "true");
|
||||||
|
put(AUTO_CLEAN.key(), "true");
|
||||||
|
put(ASYNC_CLEAN.key(), "true");
|
||||||
|
put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
|
||||||
|
ZookeeperBasedLockProvider.class.getName());
|
||||||
}
|
}
|
||||||
|
}), true, true,
|
||||||
|
WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
|
||||||
|
HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
|
||||||
|
ZookeeperBasedLockProvider.class.getName());
|
||||||
|
|
||||||
|
// 3. Default config should have default lock provider
|
||||||
|
writeConfig = createWriteConfig(new HashMap<String, String>() {
|
||||||
|
{
|
||||||
|
put(HoodieTableConfig.TYPE.key(), tableType.name());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
if (writeConfig.areAnyTableServicesAsync()) {
|
||||||
|
verifyConcurrencyControlRelatedConfigs(writeConfig,
|
||||||
|
true, true,
|
||||||
|
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
|
||||||
|
HoodieFailedWritesCleaningPolicy.LAZY,
|
||||||
|
InProcessLockProvider.class.getName());
|
||||||
|
} else {
|
||||||
|
verifyConcurrencyControlRelatedConfigs(writeConfig,
|
||||||
|
true, false,
|
||||||
|
WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
|
||||||
|
HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
|
||||||
|
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@EnumSource(HoodieTableType.class)
|
||||||
|
public void testAutoConcurrencyConfigAdjustmentWithNoTableService(HoodieTableType tableType) {
|
||||||
|
// 1. No table service, concurrency control configs should not be overwritten
|
||||||
|
verifyConcurrencyControlRelatedConfigs(createWriteConfig(new HashMap<String, String>() {
|
||||||
|
{
|
||||||
|
put(HoodieTableConfig.TYPE.key(), tableType.name());
|
||||||
|
put(TABLE_SERVICES_ENABLED.key(), "false");
|
||||||
|
}
|
||||||
|
}), false, tableType == HoodieTableType.MERGE_ON_READ,
|
||||||
|
WriteConcurrencyMode.fromValue(WRITE_CONCURRENCY_MODE.defaultValue()),
|
||||||
|
HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
|
||||||
|
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue());
|
||||||
|
|
||||||
|
// 2. No table service, with optimistic concurrency control,
|
||||||
|
// failed write clean policy should be updated accordingly
|
||||||
|
verifyConcurrencyControlRelatedConfigs(createWriteConfig(new HashMap<String, String>() {
|
||||||
|
{
|
||||||
|
put(HoodieTableConfig.TYPE.key(), tableType.name());
|
||||||
|
put(TABLE_SERVICES_ENABLED.key(), "false");
|
||||||
|
put(WRITE_CONCURRENCY_MODE.key(),
|
||||||
|
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value());
|
||||||
|
put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
|
||||||
|
FileSystemBasedLockProviderTestClass.class.getName());
|
||||||
|
}
|
||||||
|
}), false, tableType == HoodieTableType.MERGE_ON_READ,
|
||||||
|
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
|
||||||
|
HoodieFailedWritesCleaningPolicy.LAZY,
|
||||||
|
FileSystemBasedLockProviderTestClass.class.getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@EnumSource(HoodieTableType.class)
|
||||||
|
public void testAutoConcurrencyConfigAdjustmentWithMetadataTableDisabled(HoodieTableType tableType) {
|
||||||
|
// 1. Metadata table disabled, with async table services, concurrency control configs
|
||||||
|
// should not be changed
|
||||||
|
verifyConcurrencyControlRelatedConfigs(createWriteConfig(new HashMap<String, String>() {
|
||||||
|
{
|
||||||
|
put(HoodieTableConfig.TYPE.key(), tableType.name());
|
||||||
|
put(HoodieMetadataConfig.ENABLE.key(), "false");
|
||||||
|
put(ASYNC_CLUSTERING_ENABLE.key(), "true");
|
||||||
|
put(INLINE_COMPACT.key(), "true");
|
||||||
|
put(AUTO_CLEAN.key(), "true");
|
||||||
|
put(ASYNC_CLEAN.key(), "false");
|
||||||
|
}
|
||||||
|
}), true, true,
|
||||||
|
WriteConcurrencyMode.fromValue(WRITE_CONCURRENCY_MODE.defaultValue()),
|
||||||
|
HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
|
||||||
|
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue());
|
||||||
|
|
||||||
|
// 2. Metadata table disabled, with optimistic concurrency control,
|
||||||
|
// failed write clean policy should be updated accordingly
|
||||||
|
verifyConcurrencyControlRelatedConfigs(createWriteConfig(new HashMap<String, String>() {
|
||||||
|
{
|
||||||
|
put(ASYNC_CLUSTERING_ENABLE.key(), "true");
|
||||||
|
put(INLINE_COMPACT.key(), "true");
|
||||||
|
put(AUTO_CLEAN.key(), "true");
|
||||||
|
put(ASYNC_CLEAN.key(), "false");
|
||||||
|
put(WRITE_CONCURRENCY_MODE.key(),
|
||||||
|
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value());
|
||||||
|
put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
|
||||||
|
FileSystemBasedLockProviderTestClass.class.getName());
|
||||||
|
}
|
||||||
|
}), true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
|
||||||
|
HoodieFailedWritesCleaningPolicy.LAZY, FileSystemBasedLockProviderTestClass.class.getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
private HoodieWriteConfig createWriteConfig(Map<String, String> configs) {
|
private HoodieWriteConfig createWriteConfig(Map<String, String> configs) {
|
||||||
@@ -253,4 +367,38 @@ public class TestHoodieWriteConfig {
|
|||||||
mapping.put(k3, v3);
|
mapping.put(k3, v3);
|
||||||
return mapping;
|
return mapping;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void verifyConcurrencyControlRelatedConfigs(
|
||||||
|
HoodieWriteConfig writeConfig, boolean expectedTableServicesEnabled,
|
||||||
|
boolean expectedAnyTableServicesAsync,
|
||||||
|
WriteConcurrencyMode expectedConcurrencyMode,
|
||||||
|
HoodieFailedWritesCleaningPolicy expectedCleanPolicy,
|
||||||
|
String expectedLockProviderName) {
|
||||||
|
verifyConcurrencyControlRelatedConfigs(writeConfig, Option.of(expectedTableServicesEnabled),
|
||||||
|
Option.of(expectedAnyTableServicesAsync), Option.empty(), expectedConcurrencyMode,
|
||||||
|
expectedCleanPolicy, expectedLockProviderName);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyConcurrencyControlRelatedConfigs(
|
||||||
|
HoodieWriteConfig writeConfig, Option<Boolean> expectedTableServicesEnabled,
|
||||||
|
Option<Boolean> expectedAnyTableServicesAsync,
|
||||||
|
Option<Boolean> expectedAnyTableServicesExecutedInline,
|
||||||
|
WriteConcurrencyMode expectedConcurrencyMode,
|
||||||
|
HoodieFailedWritesCleaningPolicy expectedCleanPolicy,
|
||||||
|
String expectedLockProviderName) {
|
||||||
|
if (expectedTableServicesEnabled.isPresent()) {
|
||||||
|
assertEquals(expectedTableServicesEnabled.get(), writeConfig.areTableServicesEnabled());
|
||||||
|
}
|
||||||
|
if (expectedAnyTableServicesAsync.isPresent()) {
|
||||||
|
assertEquals(expectedAnyTableServicesAsync.get(), writeConfig.areAnyTableServicesAsync());
|
||||||
|
}
|
||||||
|
if (expectedAnyTableServicesExecutedInline.isPresent()) {
|
||||||
|
assertEquals(expectedAnyTableServicesExecutedInline.get(),
|
||||||
|
writeConfig.areAnyTableServicesExecutedInline());
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(expectedConcurrencyMode, writeConfig.getWriteConcurrencyMode());
|
||||||
|
assertEquals(expectedCleanPolicy, writeConfig.getFailedWritesCleanPolicy());
|
||||||
|
assertEquals(expectedLockProviderName, writeConfig.getLockProviderClass());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user