[HUDI-3686] Fix inline and async table service check in HoodieWriteConfig (#5307)
This commit is contained in:
@@ -2012,7 +2012,9 @@ public class HoodieWriteConfig extends HoodieConfig {
|
|||||||
* @return True if any table services are configured to run inline, false otherwise.
|
* @return True if any table services are configured to run inline, false otherwise.
|
||||||
*/
|
*/
|
||||||
public Boolean areAnyTableServicesExecutedInline() {
|
public Boolean areAnyTableServicesExecutedInline() {
|
||||||
return inlineClusteringEnabled() || inlineCompactionEnabled() || isAutoClean() || isAutoArchive();
|
return areTableServicesEnabled()
|
||||||
|
&& (inlineClusteringEnabled() || inlineCompactionEnabled()
|
||||||
|
|| (isAutoClean() && !isAsyncClean()) || (isAutoArchive() && !isAsyncArchive()));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -2021,9 +2023,10 @@ public class HoodieWriteConfig extends HoodieConfig {
|
|||||||
* @return True if any table services are configured to run async, false otherwise.
|
* @return True if any table services are configured to run async, false otherwise.
|
||||||
*/
|
*/
|
||||||
public Boolean areAnyTableServicesAsync() {
|
public Boolean areAnyTableServicesAsync() {
|
||||||
return isAsyncClusteringEnabled()
|
return areTableServicesEnabled()
|
||||||
|
&& (isAsyncClusteringEnabled()
|
||||||
|| (getTableType() == HoodieTableType.MERGE_ON_READ && !inlineCompactionEnabled())
|
|| (getTableType() == HoodieTableType.MERGE_ON_READ && !inlineCompactionEnabled())
|
||||||
|| isAsyncClean() || isAsyncArchive();
|
|| (isAutoClean() && isAsyncClean()) || (isAutoArchive() && isAsyncArchive()));
|
||||||
}
|
}
|
||||||
|
|
||||||
public Boolean areAnyTableServicesScheduledInline() {
|
public Boolean areAnyTableServicesScheduledInline() {
|
||||||
|
|||||||
@@ -29,7 +29,6 @@ import org.apache.hudi.common.model.HoodieTableType;
|
|||||||
import org.apache.hudi.common.model.WriteConcurrencyMode;
|
import org.apache.hudi.common.model.WriteConcurrencyMode;
|
||||||
import org.apache.hudi.common.table.HoodieTableConfig;
|
import org.apache.hudi.common.table.HoodieTableConfig;
|
||||||
import org.apache.hudi.common.table.marker.MarkerType;
|
import org.apache.hudi.common.table.marker.MarkerType;
|
||||||
import org.apache.hudi.common.util.Option;
|
|
||||||
import org.apache.hudi.config.HoodieWriteConfig.Builder;
|
import org.apache.hudi.config.HoodieWriteConfig.Builder;
|
||||||
import org.apache.hudi.index.HoodieIndex;
|
import org.apache.hudi.index.HoodieIndex;
|
||||||
|
|
||||||
@@ -48,6 +47,7 @@ import java.util.Properties;
|
|||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
import static org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE;
|
import static org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE;
|
||||||
|
import static org.apache.hudi.config.HoodieCompactionConfig.ASYNC_ARCHIVE;
|
||||||
import static org.apache.hudi.config.HoodieCompactionConfig.ASYNC_CLEAN;
|
import static org.apache.hudi.config.HoodieCompactionConfig.ASYNC_CLEAN;
|
||||||
import static org.apache.hudi.config.HoodieCompactionConfig.AUTO_CLEAN;
|
import static org.apache.hudi.config.HoodieCompactionConfig.AUTO_CLEAN;
|
||||||
import static org.apache.hudi.config.HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY;
|
import static org.apache.hudi.config.HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY;
|
||||||
@@ -138,7 +138,7 @@ public class TestHoodieWriteConfig {
|
|||||||
put(ASYNC_CLEAN.key(), "false");
|
put(ASYNC_CLEAN.key(), "false");
|
||||||
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
|
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
|
||||||
}
|
}
|
||||||
}), true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
|
}), true, true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
|
||||||
HoodieFailedWritesCleaningPolicy.LAZY, inProcessLockProviderClassName);
|
HoodieFailedWritesCleaningPolicy.LAZY, inProcessLockProviderClassName);
|
||||||
|
|
||||||
// 2. Async clean
|
// 2. Async clean
|
||||||
@@ -151,7 +151,7 @@ public class TestHoodieWriteConfig {
|
|||||||
put(ASYNC_CLEAN.key(), "true");
|
put(ASYNC_CLEAN.key(), "true");
|
||||||
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
|
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
|
||||||
}
|
}
|
||||||
}), true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
|
}), true, true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
|
||||||
HoodieFailedWritesCleaningPolicy.LAZY, inProcessLockProviderClassName);
|
HoodieFailedWritesCleaningPolicy.LAZY, inProcessLockProviderClassName);
|
||||||
|
|
||||||
// 3. Async compaction configured
|
// 3. Async compaction configured
|
||||||
@@ -165,7 +165,7 @@ public class TestHoodieWriteConfig {
|
|||||||
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
|
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
|
||||||
}
|
}
|
||||||
}), true,
|
}), true,
|
||||||
tableType == HoodieTableType.MERGE_ON_READ,
|
tableType == HoodieTableType.MERGE_ON_READ, true,
|
||||||
tableType == HoodieTableType.MERGE_ON_READ
|
tableType == HoodieTableType.MERGE_ON_READ
|
||||||
? WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL
|
? WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL
|
||||||
: WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
|
: WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
|
||||||
@@ -186,10 +186,25 @@ public class TestHoodieWriteConfig {
|
|||||||
put(ASYNC_CLEAN.key(), "false");
|
put(ASYNC_CLEAN.key(), "false");
|
||||||
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
|
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
|
||||||
}
|
}
|
||||||
}), Option.of(true), Option.of(false), Option.of(true),
|
}), true, false, true,
|
||||||
WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
|
WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
|
||||||
HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
|
HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
|
||||||
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue());
|
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue());
|
||||||
|
|
||||||
|
// 5. All async services
|
||||||
|
verifyConcurrencyControlRelatedConfigs(createWriteConfig(new HashMap<String, String>() {
|
||||||
|
{
|
||||||
|
put(HoodieTableConfig.TYPE.key(), tableType.name());
|
||||||
|
put(ASYNC_CLUSTERING_ENABLE.key(), "true");
|
||||||
|
put(INLINE_COMPACT.key(), "false");
|
||||||
|
put(AUTO_CLEAN.key(), "true");
|
||||||
|
put(ASYNC_CLEAN.key(), "true");
|
||||||
|
put(ASYNC_ARCHIVE.key(), "true");
|
||||||
|
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
|
||||||
|
}
|
||||||
|
}), true, true, false,
|
||||||
|
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
|
||||||
|
HoodieFailedWritesCleaningPolicy.LAZY, inProcessLockProviderClassName);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@@ -205,7 +220,7 @@ public class TestHoodieWriteConfig {
|
|||||||
.build();
|
.build();
|
||||||
|
|
||||||
verifyConcurrencyControlRelatedConfigs(writeConfig,
|
verifyConcurrencyControlRelatedConfigs(writeConfig,
|
||||||
true, true,
|
true, true, true,
|
||||||
WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
|
WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
|
||||||
HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
|
HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
|
||||||
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue());
|
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue());
|
||||||
@@ -219,7 +234,7 @@ public class TestHoodieWriteConfig {
|
|||||||
.build();
|
.build();
|
||||||
|
|
||||||
verifyConcurrencyControlRelatedConfigs(writeConfig,
|
verifyConcurrencyControlRelatedConfigs(writeConfig,
|
||||||
true, true,
|
true, true, true,
|
||||||
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL, HoodieFailedWritesCleaningPolicy.LAZY,
|
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL, HoodieFailedWritesCleaningPolicy.LAZY,
|
||||||
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue());
|
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue());
|
||||||
}
|
}
|
||||||
@@ -240,7 +255,7 @@ public class TestHoodieWriteConfig {
|
|||||||
.build();
|
.build();
|
||||||
|
|
||||||
verifyConcurrencyControlRelatedConfigs(writeConfig,
|
verifyConcurrencyControlRelatedConfigs(writeConfig,
|
||||||
true, tableType == HoodieTableType.MERGE_ON_READ,
|
true, tableType == HoodieTableType.MERGE_ON_READ, true,
|
||||||
WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
|
WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
|
||||||
HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
|
HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
|
||||||
FileSystemBasedLockProviderTestClass.class.getName());
|
FileSystemBasedLockProviderTestClass.class.getName());
|
||||||
@@ -257,7 +272,7 @@ public class TestHoodieWriteConfig {
|
|||||||
ZookeeperBasedLockProvider.class.getName());
|
ZookeeperBasedLockProvider.class.getName());
|
||||||
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
|
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
|
||||||
}
|
}
|
||||||
}), true, true,
|
}), true, true, true,
|
||||||
WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
|
WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
|
||||||
HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
|
HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
|
||||||
ZookeeperBasedLockProvider.class.getName());
|
ZookeeperBasedLockProvider.class.getName());
|
||||||
@@ -271,13 +286,13 @@ public class TestHoodieWriteConfig {
|
|||||||
});
|
});
|
||||||
if (writeConfig.areAnyTableServicesAsync()) {
|
if (writeConfig.areAnyTableServicesAsync()) {
|
||||||
verifyConcurrencyControlRelatedConfigs(writeConfig,
|
verifyConcurrencyControlRelatedConfigs(writeConfig,
|
||||||
true, true,
|
true, true, true,
|
||||||
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
|
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
|
||||||
HoodieFailedWritesCleaningPolicy.LAZY,
|
HoodieFailedWritesCleaningPolicy.LAZY,
|
||||||
InProcessLockProvider.class.getName());
|
InProcessLockProvider.class.getName());
|
||||||
} else {
|
} else {
|
||||||
verifyConcurrencyControlRelatedConfigs(writeConfig,
|
verifyConcurrencyControlRelatedConfigs(writeConfig,
|
||||||
true, false,
|
true, false, true,
|
||||||
WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
|
WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
|
||||||
HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
|
HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
|
||||||
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue());
|
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue());
|
||||||
@@ -294,7 +309,7 @@ public class TestHoodieWriteConfig {
|
|||||||
put(TABLE_SERVICES_ENABLED.key(), "false");
|
put(TABLE_SERVICES_ENABLED.key(), "false");
|
||||||
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
|
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
|
||||||
}
|
}
|
||||||
}), false, tableType == HoodieTableType.MERGE_ON_READ,
|
}), false, false, false,
|
||||||
WriteConcurrencyMode.fromValue(WRITE_CONCURRENCY_MODE.defaultValue()),
|
WriteConcurrencyMode.fromValue(WRITE_CONCURRENCY_MODE.defaultValue()),
|
||||||
HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
|
HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
|
||||||
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue());
|
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue());
|
||||||
@@ -311,7 +326,7 @@ public class TestHoodieWriteConfig {
|
|||||||
FileSystemBasedLockProviderTestClass.class.getName());
|
FileSystemBasedLockProviderTestClass.class.getName());
|
||||||
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
|
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
|
||||||
}
|
}
|
||||||
}), false, tableType == HoodieTableType.MERGE_ON_READ,
|
}), false, false, false,
|
||||||
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
|
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
|
||||||
HoodieFailedWritesCleaningPolicy.LAZY,
|
HoodieFailedWritesCleaningPolicy.LAZY,
|
||||||
FileSystemBasedLockProviderTestClass.class.getName());
|
FileSystemBasedLockProviderTestClass.class.getName());
|
||||||
@@ -332,7 +347,7 @@ public class TestHoodieWriteConfig {
|
|||||||
put(ASYNC_CLEAN.key(), "false");
|
put(ASYNC_CLEAN.key(), "false");
|
||||||
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
|
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
|
||||||
}
|
}
|
||||||
}), true, true,
|
}), true, true, true,
|
||||||
WriteConcurrencyMode.fromValue(WRITE_CONCURRENCY_MODE.defaultValue()),
|
WriteConcurrencyMode.fromValue(WRITE_CONCURRENCY_MODE.defaultValue()),
|
||||||
HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
|
HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
|
||||||
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue());
|
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue());
|
||||||
@@ -351,7 +366,8 @@ public class TestHoodieWriteConfig {
|
|||||||
FileSystemBasedLockProviderTestClass.class.getName());
|
FileSystemBasedLockProviderTestClass.class.getName());
|
||||||
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
|
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
|
||||||
}
|
}
|
||||||
}), true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
|
}), true, true, true,
|
||||||
|
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
|
||||||
HoodieFailedWritesCleaningPolicy.LAZY, FileSystemBasedLockProviderTestClass.class.getName());
|
HoodieFailedWritesCleaningPolicy.LAZY, FileSystemBasedLockProviderTestClass.class.getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -415,32 +431,14 @@ public class TestHoodieWriteConfig {
|
|||||||
private void verifyConcurrencyControlRelatedConfigs(
|
private void verifyConcurrencyControlRelatedConfigs(
|
||||||
HoodieWriteConfig writeConfig, boolean expectedTableServicesEnabled,
|
HoodieWriteConfig writeConfig, boolean expectedTableServicesEnabled,
|
||||||
boolean expectedAnyTableServicesAsync,
|
boolean expectedAnyTableServicesAsync,
|
||||||
|
boolean expectedAnyTableServicesExecutedInline,
|
||||||
WriteConcurrencyMode expectedConcurrencyMode,
|
WriteConcurrencyMode expectedConcurrencyMode,
|
||||||
HoodieFailedWritesCleaningPolicy expectedCleanPolicy,
|
HoodieFailedWritesCleaningPolicy expectedCleanPolicy,
|
||||||
String expectedLockProviderName) {
|
String expectedLockProviderName) {
|
||||||
verifyConcurrencyControlRelatedConfigs(writeConfig, Option.of(expectedTableServicesEnabled),
|
assertEquals(expectedTableServicesEnabled, writeConfig.areTableServicesEnabled());
|
||||||
Option.of(expectedAnyTableServicesAsync), Option.empty(), expectedConcurrencyMode,
|
assertEquals(expectedAnyTableServicesAsync, writeConfig.areAnyTableServicesAsync());
|
||||||
expectedCleanPolicy, expectedLockProviderName);
|
assertEquals(
|
||||||
}
|
expectedAnyTableServicesExecutedInline, writeConfig.areAnyTableServicesExecutedInline());
|
||||||
|
|
||||||
private void verifyConcurrencyControlRelatedConfigs(
|
|
||||||
HoodieWriteConfig writeConfig, Option<Boolean> expectedTableServicesEnabled,
|
|
||||||
Option<Boolean> expectedAnyTableServicesAsync,
|
|
||||||
Option<Boolean> expectedAnyTableServicesExecutedInline,
|
|
||||||
WriteConcurrencyMode expectedConcurrencyMode,
|
|
||||||
HoodieFailedWritesCleaningPolicy expectedCleanPolicy,
|
|
||||||
String expectedLockProviderName) {
|
|
||||||
if (expectedTableServicesEnabled.isPresent()) {
|
|
||||||
assertEquals(expectedTableServicesEnabled.get(), writeConfig.areTableServicesEnabled());
|
|
||||||
}
|
|
||||||
if (expectedAnyTableServicesAsync.isPresent()) {
|
|
||||||
assertEquals(expectedAnyTableServicesAsync.get(), writeConfig.areAnyTableServicesAsync());
|
|
||||||
}
|
|
||||||
if (expectedAnyTableServicesExecutedInline.isPresent()) {
|
|
||||||
assertEquals(expectedAnyTableServicesExecutedInline.get(),
|
|
||||||
writeConfig.areAnyTableServicesExecutedInline());
|
|
||||||
}
|
|
||||||
|
|
||||||
assertEquals(expectedConcurrencyMode, writeConfig.getWriteConcurrencyMode());
|
assertEquals(expectedConcurrencyMode, writeConfig.getWriteConcurrencyMode());
|
||||||
assertEquals(expectedCleanPolicy, writeConfig.getFailedWritesCleanPolicy());
|
assertEquals(expectedCleanPolicy, writeConfig.getFailedWritesCleanPolicy());
|
||||||
assertEquals(expectedLockProviderName, writeConfig.getLockProviderClass());
|
assertEquals(expectedLockProviderName, writeConfig.getLockProviderClass());
|
||||||
|
|||||||
Reference in New Issue
Block a user