[HUDI-3772] Fixing auto adjustment of lock configs for deltastreamer (#5207)
This commit is contained in:
committed by
GitHub
parent
cc3737be50
commit
84064a9b08
@@ -480,6 +480,12 @@ public class HoodieWriteConfig extends HoodieConfig {
|
||||
.sinceVersion("0.11.0")
|
||||
.withDocumentation("Control to enable release all persist rdds when the spark job finish.");
|
||||
|
||||
public static final ConfigProperty<Boolean> AUTO_ADJUST_LOCK_CONFIGS = ConfigProperty
|
||||
.key("hoodie.auto.adjust.lock.configs")
|
||||
.defaultValue(false)
|
||||
.sinceVersion("0.11.0")
|
||||
.withDocumentation("Auto adjust lock configurations when metadata table is enabled and for async table services.");
|
||||
|
||||
private ConsistencyGuardConfig consistencyGuardConfig;
|
||||
private FileSystemRetryConfig fileSystemRetryConfig;
|
||||
|
||||
@@ -1968,6 +1974,9 @@ public class HoodieWriteConfig extends HoodieConfig {
|
||||
* Hoodie Client Lock Configs.
|
||||
* @return
|
||||
*/
|
||||
public boolean isAutoAdjustLockConfigs() {
|
||||
return getBooleanOrDefault(AUTO_ADJUST_LOCK_CONFIGS);
|
||||
}
|
||||
|
||||
public String getLockProviderClass() {
|
||||
return getString(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME);
|
||||
@@ -2443,6 +2452,11 @@ public class HoodieWriteConfig extends HoodieConfig {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withAutoAdjustLockConfigs(boolean autoAdjustLockConfigs) {
|
||||
writeConfig.setValue(AUTO_ADJUST_LOCK_CONFIGS, String.valueOf(autoAdjustLockConfigs));
|
||||
return this;
|
||||
}
|
||||
|
||||
protected void setDefaults() {
|
||||
writeConfig.setDefaultValue(MARKERS_TYPE, getDefaultMarkersType(engineType));
|
||||
// Check for mandatory properties
|
||||
@@ -2480,41 +2494,42 @@ public class HoodieWriteConfig extends HoodieConfig {
|
||||
HoodieLayoutConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
|
||||
writeConfig.setDefaultValue(TIMELINE_LAYOUT_VERSION_NUM, String.valueOf(TimelineLayoutVersion.CURR_VERSION));
|
||||
|
||||
autoAdjustConfigsForConcurrencyMode();
|
||||
}
|
||||
|
||||
private void autoAdjustConfigsForConcurrencyMode() {
|
||||
boolean isMetadataTableEnabled = writeConfig.getBoolean(HoodieMetadataConfig.ENABLE);
|
||||
// isLockProviderPropertySet must be fetched before setting defaults of HoodieLockConfig
|
||||
final TypedProperties writeConfigProperties = writeConfig.getProps();
|
||||
final boolean isLockProviderPropertySet = writeConfigProperties.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME)
|
||||
|| writeConfigProperties.containsKey(HoodieLockConfig.LOCK_PROVIDER_CLASS_PROP);
|
||||
|
||||
if (!isLockConfigSet) {
|
||||
HoodieLockConfig.Builder lockConfigBuilder = HoodieLockConfig.newBuilder().fromProperties(writeConfig.getProps());
|
||||
writeConfig.setDefault(lockConfigBuilder.build());
|
||||
}
|
||||
writeConfig.setDefaultOnCondition(!isLockConfigSet,
|
||||
HoodieLockConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
|
||||
|
||||
if (isMetadataTableEnabled) {
|
||||
// When metadata table is enabled, optimistic concurrency control must be used for
|
||||
// single writer with async table services.
|
||||
// Async table services can update the metadata table and a lock provider is
|
||||
// needed to guard against any concurrent table write operations. If user has
|
||||
// not configured any lock provider, let's use the InProcess lock provider.
|
||||
boolean areTableServicesEnabled = writeConfig.areTableServicesEnabled();
|
||||
boolean areAsyncTableServicesEnabled = writeConfig.areAnyTableServicesAsync();
|
||||
autoAdjustConfigsForConcurrencyMode(isLockProviderPropertySet);
|
||||
}
|
||||
|
||||
if (!isLockProviderPropertySet && areTableServicesEnabled && areAsyncTableServicesEnabled) {
|
||||
// This is targeted at Single writer with async table services
|
||||
// If user does not set the lock provider, likely that the concurrency mode is not set either
|
||||
// Override the configs for metadata table
|
||||
writeConfig.setValue(WRITE_CONCURRENCY_MODE.key(),
|
||||
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value());
|
||||
writeConfig.setValue(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
|
||||
InProcessLockProvider.class.getName());
|
||||
LOG.info(String.format("Automatically set %s=%s and %s=%s since user has not set the "
|
||||
+ "lock provider for single writer with async table services",
|
||||
WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value(),
|
||||
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), InProcessLockProvider.class.getName()));
|
||||
private void autoAdjustConfigsForConcurrencyMode(boolean isLockProviderPropertySet) {
|
||||
if (writeConfig.isAutoAdjustLockConfigs()) {
|
||||
// auto adjustment is required only for deltastreamer and spark streaming where async table services can be executed in the same JVM.
|
||||
boolean isMetadataTableEnabled = writeConfig.getBoolean(HoodieMetadataConfig.ENABLE);
|
||||
|
||||
if (isMetadataTableEnabled) {
|
||||
// When metadata table is enabled, optimistic concurrency control must be used for
|
||||
// single writer with async table services.
|
||||
// Async table services can update the metadata table and a lock provider is
|
||||
// needed to guard against any concurrent table write operations. If user has
|
||||
// not configured any lock provider, let's use the InProcess lock provider.
|
||||
boolean areTableServicesEnabled = writeConfig.areTableServicesEnabled();
|
||||
boolean areAsyncTableServicesEnabled = writeConfig.areAnyTableServicesAsync();
|
||||
if (!isLockProviderPropertySet && areTableServicesEnabled && areAsyncTableServicesEnabled) {
|
||||
// This is targeted at Single writer with async table services
|
||||
// If user does not set the lock provider, likely that the concurrency mode is not set either
|
||||
// Override the configs for metadata table
|
||||
writeConfig.setValue(WRITE_CONCURRENCY_MODE.key(),
|
||||
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value());
|
||||
writeConfig.setValue(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
|
||||
InProcessLockProvider.class.getName());
|
||||
LOG.info(String.format("Automatically set %s=%s and %s=%s since user has not set the "
|
||||
+ "lock provider for single writer with async table services",
|
||||
WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value(),
|
||||
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), InProcessLockProvider.class.getName()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -136,6 +136,7 @@ public class TestHoodieWriteConfig {
|
||||
put(INLINE_COMPACT.key(), "true");
|
||||
put(AUTO_CLEAN.key(), "true");
|
||||
put(ASYNC_CLEAN.key(), "false");
|
||||
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
|
||||
}
|
||||
}), true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
|
||||
HoodieFailedWritesCleaningPolicy.LAZY, inProcessLockProviderClassName);
|
||||
@@ -148,6 +149,7 @@ public class TestHoodieWriteConfig {
|
||||
put(INLINE_COMPACT.key(), "true");
|
||||
put(AUTO_CLEAN.key(), "true");
|
||||
put(ASYNC_CLEAN.key(), "true");
|
||||
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
|
||||
}
|
||||
}), true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
|
||||
HoodieFailedWritesCleaningPolicy.LAZY, inProcessLockProviderClassName);
|
||||
@@ -160,6 +162,7 @@ public class TestHoodieWriteConfig {
|
||||
put(INLINE_COMPACT.key(), "false");
|
||||
put(AUTO_CLEAN.key(), "true");
|
||||
put(ASYNC_CLEAN.key(), "false");
|
||||
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
|
||||
}
|
||||
}), true,
|
||||
tableType == HoodieTableType.MERGE_ON_READ,
|
||||
@@ -181,6 +184,7 @@ public class TestHoodieWriteConfig {
|
||||
put(INLINE_COMPACT.key(), "true");
|
||||
put(AUTO_CLEAN.key(), "true");
|
||||
put(ASYNC_CLEAN.key(), "false");
|
||||
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
|
||||
}
|
||||
}), Option.of(true), Option.of(false), Option.of(true),
|
||||
WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
|
||||
@@ -188,6 +192,38 @@ public class TestHoodieWriteConfig {
|
||||
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue());
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@EnumSource(HoodieTableType.class)
|
||||
public void testAutoAdjustLockConfigs(HoodieTableType tableType) {
|
||||
TypedProperties properties = new TypedProperties();
|
||||
properties.setProperty(HoodieTableConfig.TYPE.key(), tableType.name());
|
||||
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
|
||||
.withPath("/tmp")
|
||||
.withAutoAdjustLockConfigs(false)
|
||||
.withClusteringConfig(new HoodieClusteringConfig.Builder().withAsyncClustering(true).build())
|
||||
.withProperties(properties)
|
||||
.build();
|
||||
|
||||
verifyConcurrencyControlRelatedConfigs(writeConfig,
|
||||
true, true,
|
||||
WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
|
||||
HoodieFailedWritesCleaningPolicy.valueOf(FAILED_WRITES_CLEANER_POLICY.defaultValue()),
|
||||
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue());
|
||||
|
||||
writeConfig = HoodieWriteConfig.newBuilder()
|
||||
.withPath("/tmp")
|
||||
.withAutoAdjustLockConfigs(false)
|
||||
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
|
||||
.withClusteringConfig(new HoodieClusteringConfig.Builder().withAsyncClustering(true).build())
|
||||
.withProperties(properties)
|
||||
.build();
|
||||
|
||||
verifyConcurrencyControlRelatedConfigs(writeConfig,
|
||||
true, true,
|
||||
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL, HoodieFailedWritesCleaningPolicy.LAZY,
|
||||
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue());
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@EnumSource(HoodieTableType.class)
|
||||
public void testAutoConcurrencyConfigAdjustmentWithUserConfigs(HoodieTableType tableType) {
|
||||
@@ -199,8 +235,10 @@ public class TestHoodieWriteConfig {
|
||||
.withLockConfig(HoodieLockConfig.newBuilder()
|
||||
.withLockProvider(FileSystemBasedLockProviderTestClass.class)
|
||||
.build())
|
||||
.withAutoAdjustLockConfigs(true)
|
||||
.withProperties(properties)
|
||||
.build();
|
||||
|
||||
verifyConcurrencyControlRelatedConfigs(writeConfig,
|
||||
true, tableType == HoodieTableType.MERGE_ON_READ,
|
||||
WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
|
||||
@@ -217,6 +255,7 @@ public class TestHoodieWriteConfig {
|
||||
put(ASYNC_CLEAN.key(), "true");
|
||||
put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
|
||||
ZookeeperBasedLockProvider.class.getName());
|
||||
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
|
||||
}
|
||||
}), true, true,
|
||||
WriteConcurrencyMode.valueOf(WRITE_CONCURRENCY_MODE.defaultValue()),
|
||||
@@ -227,6 +266,7 @@ public class TestHoodieWriteConfig {
|
||||
writeConfig = createWriteConfig(new HashMap<String, String>() {
|
||||
{
|
||||
put(HoodieTableConfig.TYPE.key(), tableType.name());
|
||||
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
|
||||
}
|
||||
});
|
||||
if (writeConfig.areAnyTableServicesAsync()) {
|
||||
@@ -252,6 +292,7 @@ public class TestHoodieWriteConfig {
|
||||
{
|
||||
put(HoodieTableConfig.TYPE.key(), tableType.name());
|
||||
put(TABLE_SERVICES_ENABLED.key(), "false");
|
||||
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
|
||||
}
|
||||
}), false, tableType == HoodieTableType.MERGE_ON_READ,
|
||||
WriteConcurrencyMode.fromValue(WRITE_CONCURRENCY_MODE.defaultValue()),
|
||||
@@ -268,6 +309,7 @@ public class TestHoodieWriteConfig {
|
||||
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value());
|
||||
put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
|
||||
FileSystemBasedLockProviderTestClass.class.getName());
|
||||
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
|
||||
}
|
||||
}), false, tableType == HoodieTableType.MERGE_ON_READ,
|
||||
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
|
||||
@@ -288,6 +330,7 @@ public class TestHoodieWriteConfig {
|
||||
put(INLINE_COMPACT.key(), "true");
|
||||
put(AUTO_CLEAN.key(), "true");
|
||||
put(ASYNC_CLEAN.key(), "false");
|
||||
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
|
||||
}
|
||||
}), true, true,
|
||||
WriteConcurrencyMode.fromValue(WRITE_CONCURRENCY_MODE.defaultValue()),
|
||||
@@ -306,6 +349,7 @@ public class TestHoodieWriteConfig {
|
||||
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value());
|
||||
put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
|
||||
FileSystemBasedLockProviderTestClass.class.getName());
|
||||
put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
|
||||
}
|
||||
}), true, true, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL,
|
||||
HoodieFailedWritesCleaningPolicy.LAZY, FileSystemBasedLockProviderTestClass.class.getName());
|
||||
|
||||
@@ -81,7 +81,9 @@ class HoodieStreamingSink(sqlContext: SQLContext,
|
||||
// Override to use direct markers. In Structured streaming, timeline server is closed after
|
||||
// first micro-batch and subsequent micro-batches do not have timeline server running.
|
||||
// Thus, we can't use timeline-server-based markers.
|
||||
val updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.DIRECT.name())
|
||||
var updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.DIRECT.name())
|
||||
// we need auto adjustment enabled for streaming sink since async table services are feasible within the same JVM.
|
||||
updatedOptions = updatedOptions.updated(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key, "true")
|
||||
|
||||
retry(retryCnt, retryIntervalMs)(
|
||||
Try(
|
||||
|
||||
@@ -43,8 +43,8 @@ import org.apache.hudi.exception.HoodieIOException;
|
||||
import org.apache.hudi.index.HoodieIndex;
|
||||
import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider;
|
||||
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
|
||||
import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
|
||||
import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
|
||||
import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
|
||||
import org.apache.hudi.utilities.schema.ChainedSchemaPostProcessor;
|
||||
import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
|
||||
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
|
||||
@@ -272,6 +272,7 @@ public class UtilHelpers {
|
||||
sparkConf.set("spark.eventLog.overwrite", "true");
|
||||
sparkConf.set("spark.eventLog.enabled", "true");
|
||||
}
|
||||
sparkConf.set("spark.ui.port", "8090");
|
||||
sparkConf.setIfMissing("spark.driver.maxResultSize", "2g");
|
||||
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
|
||||
sparkConf.set("spark.hadoop.mapred.output.compress", "true");
|
||||
|
||||
@@ -35,6 +35,7 @@ import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.model.HoodieTableType;
|
||||
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
|
||||
import org.apache.hudi.common.model.WriteOperationType;
|
||||
import org.apache.hudi.common.table.HoodieTableConfig;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
|
||||
@@ -127,7 +128,6 @@ public class HoodieDeltaStreamer implements Serializable {
|
||||
public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf,
|
||||
Option<TypedProperties> propsOverride) throws IOException {
|
||||
this.properties = combineProperties(cfg, propsOverride, jssc.hadoopConfiguration());
|
||||
|
||||
if (cfg.initialCheckpointProvider != null && cfg.checkpoint == null) {
|
||||
InitialCheckPointProvider checkPointProvider =
|
||||
UtilHelpers.createInitialCheckpointProvider(cfg.initialCheckpointProvider, this.properties);
|
||||
@@ -156,7 +156,14 @@ public class HoodieDeltaStreamer implements Serializable {
|
||||
hoodieConfig.setAll(UtilHelpers.readConfig(hadoopConf, new Path(cfg.propsFilePath), cfg.configs).getProps());
|
||||
}
|
||||
|
||||
// set any configs that Deltastreamer has to override explicitly
|
||||
hoodieConfig.setDefaultValue(DataSourceWriteOptions.RECONCILE_SCHEMA());
|
||||
// we need auto adjustment enabled for deltastreamer since async table services are feasible within the same JVM.
|
||||
hoodieConfig.setValue(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
|
||||
if (cfg.tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
|
||||
// Explicitly set the table type
|
||||
hoodieConfig.setValue(HoodieTableConfig.TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
|
||||
}
|
||||
|
||||
return hoodieConfig.getProps(true);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user