1
0

[HUDI-2840] Fixed DeltaStreamer to properly respect configuration passed through properties file (#4090)

* Rebased `DFSPropertiesConfiguration` to access Hadoop config in lieu of FS to avoid confusion

* Fixed `readConfig` to take Hadoop's `Configuration` instead of FS;
Fixing usages

* Added test for local FS access

* Rebase to use `FSUtils.getFs`

* Combine properties provided as a file along w/ overrides provided from the CLI

* Added helper utilities to `HoodieClusteringConfig`;
Make sure corresponding config methods fall back to defaults;

* Fixed DeltaStreamer usage to respect properly combined configuration;
Abstracted `HoodieClusteringConfig.from` convenience utility to init Clustering config from `Properties`

* Tidying up

* `lint`

* Reverting changes to `HoodieWriteConfig`

* Tidying up

* Fixed incorrect merge of the props

* Converted `HoodieConfig` to wrap around `Properties` into `TypedProperties`

* Fixed compilation

* Fixed compilation
This commit is contained in:
Alexey Kudinkin
2021-11-25 14:48:22 -08:00
committed by GitHub
parent e0125a7911
commit 6f5d8d04cd
19 changed files with 168 additions and 103 deletions

View File

@@ -112,7 +112,7 @@ public class HDFSParquetImporter implements Serializable {
public int dataImport(JavaSparkContext jsc, int retry) {
this.fs = FSUtils.getFs(cfg.targetPath, jsc.hadoopConfiguration());
this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs)
: UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getProps(true);
: UtilHelpers.readConfig(fs.getConf(), new Path(cfg.propsFilePath), cfg.configs).getProps(true);
LOG.info("Starting data import with configs : " + props.toString());
int ret = -1;
try {

View File

@@ -18,16 +18,13 @@
package org.apache.hudi.utilities;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
@@ -61,9 +58,8 @@ public class HoodieCleaner {
/*
* Filesystem used.
*/
FileSystem fs = FSUtils.getFs(cfg.basePath, jssc.hadoopConfiguration());
this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs)
: UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getProps(true);
: UtilHelpers.readConfig(jssc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs).getProps(true);
LOG.info("Creating Cleaner with configs : " + props.toString());
}

View File

@@ -73,10 +73,7 @@ public class HoodieClusteringJob {
}
private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
final FileSystem fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration());
return UtilHelpers
.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs)
return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs)
.getProps(true);
}

View File

@@ -62,10 +62,7 @@ public class HoodieCompactor {
}
private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
final FileSystem fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration());
return UtilHelpers
.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs)
return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs)
.getProps(true);
}

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.utilities;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
@@ -157,11 +158,8 @@ public class UtilHelpers {
}
}
/**
*
*/
public static DFSPropertiesConfiguration readConfig(FileSystem fs, Path cfgPath, List<String> overriddenProps) {
DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration(fs, cfgPath);
public static DFSPropertiesConfiguration readConfig(Configuration hadoopConfig, Path cfgPath, List<String> overriddenProps) {
DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration(hadoopConfig, cfgPath);
try {
if (!overriddenProps.isEmpty()) {
LOG.info("Adding overridden properties to file properties.");

View File

@@ -172,6 +172,8 @@ public class DeltaSync implements Serializable {
/**
* Bag of properties with source, hoodie client, key generator etc.
*
* NOTE: These properties are already consolidated w/ CLI provided config-overrides
*/
private final TypedProperties props;
@@ -698,22 +700,33 @@ public class DeltaSync implements Serializable {
private HoodieWriteConfig getHoodieClientConfig(Schema schema) {
final boolean combineBeforeUpsert = true;
final boolean autoCommit = false;
HoodieWriteConfig.Builder builder =
HoodieWriteConfig.newBuilder().withPath(cfg.targetBasePath).combineInput(cfg.filterDupes, combineBeforeUpsert)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName)
// Inline compaction is disabled for continuous mode. otherwise enabled for MOR
.withInlineCompaction(cfg.isInlineCompactionEnabled()).build())
.withClusteringConfig(HoodieClusteringConfig.newBuilder()
.withInlineClustering(cfg.isInlineClusteringEnabled())
.withAsyncClustering(cfg.isAsyncClusteringEnabled()).build())
.withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(cfg.sourceOrderingField)
.build())
.forTable(cfg.targetTableName)
.withAutoCommit(autoCommit).withProps(props);
if (null != schema) {
builder = builder.withSchema(schema.toString());
// NOTE: Provided that we're injecting combined properties
// (from {@code props}, including CLI overrides), there's no
// need to explicitly set up some configuration aspects that
// are based on these (for ex Clustering configuration)
HoodieWriteConfig.Builder builder =
HoodieWriteConfig.newBuilder()
.withPath(cfg.targetBasePath)
.combineInput(cfg.filterDupes, combineBeforeUpsert)
.withCompactionConfig(
HoodieCompactionConfig.newBuilder()
.withPayloadClass(cfg.payloadClassName)
.withInlineCompaction(cfg.isInlineCompactionEnabled())
.build()
)
.withPayloadConfig(
HoodiePayloadConfig.newBuilder()
.withPayloadOrderingField(cfg.sourceOrderingField)
.build())
.forTable(cfg.targetTableName)
.withAutoCommit(autoCommit)
.withProps(props);
if (schema != null) {
builder.withSchema(schema.toString());
}
HoodieWriteConfig config = builder.build();
// set default value for {@link HoodieWriteCommitKafkaCallbackConfig} if needed.
@@ -721,13 +734,15 @@ public class DeltaSync implements Serializable {
HoodieWriteCommitKafkaCallbackConfig.setCallbackKafkaConfigIfNeeded(config);
}
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.from(props);
// Validate what deltastreamer assumes of write-config to be really safe
ValidationUtils.checkArgument(config.inlineCompactionEnabled() == cfg.isInlineCompactionEnabled(),
String.format("%s should be set to %s", INLINE_COMPACT.key(), cfg.isInlineCompactionEnabled()));
ValidationUtils.checkArgument(config.inlineClusteringEnabled() == cfg.isInlineClusteringEnabled(),
String.format("%s should be set to %s", INLINE_CLUSTERING.key(), cfg.isInlineClusteringEnabled()));
ValidationUtils.checkArgument(config.isAsyncClusteringEnabled() == cfg.isAsyncClusteringEnabled(),
String.format("%s should be set to %s", ASYNC_CLUSTERING_ENABLE.key(), cfg.isAsyncClusteringEnabled()));
ValidationUtils.checkArgument(config.inlineClusteringEnabled() == clusteringConfig.isInlineClusteringEnabled(),
String.format("%s should be set to %s", INLINE_CLUSTERING.key(), clusteringConfig.isInlineClusteringEnabled()));
ValidationUtils.checkArgument(config.isAsyncClusteringEnabled() == clusteringConfig.isAsyncClusteringEnabled(),
String.format("%s should be set to %s", ASYNC_CLUSTERING_ENABLE.key(), clusteringConfig.isAsyncClusteringEnabled()));
ValidationUtils.checkArgument(!config.shouldAutoCommit(),
String.format("%s should be set to %s", AUTO_COMMIT_ENABLE.key(), autoCommit));
ValidationUtils.checkArgument(config.shouldCombineBeforeInsert() == cfg.filterDupes,

View File

@@ -99,6 +99,9 @@ public class HoodieDeltaStreamer implements Serializable {
protected final transient Config cfg;
/**
* NOTE: These properties are already consolidated w/ CLI provided config-overrides.
*/
private final TypedProperties properties;
protected transient Option<DeltaSyncService> deltaSyncService;
@@ -122,20 +125,8 @@ public class HoodieDeltaStreamer implements Serializable {
}
public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf,
Option<TypedProperties> props) throws IOException {
// Resolving the properties first in a consistent way
HoodieConfig hoodieConfig = new HoodieConfig();
if (props.isPresent()) {
hoodieConfig.setAll(props.get());
} else if (cfg.propsFilePath.equals(Config.DEFAULT_DFS_SOURCE_PROPERTIES)) {
hoodieConfig.setAll(UtilHelpers.getConfig(cfg.configs).getProps());
} else {
hoodieConfig.setAll(UtilHelpers.readConfig(
FSUtils.getFs(cfg.propsFilePath, jssc.hadoopConfiguration()),
new Path(cfg.propsFilePath), cfg.configs).getProps());
}
hoodieConfig.setDefaultValue(DataSourceWriteOptions.RECONCILE_SCHEMA());
this.properties = (TypedProperties) hoodieConfig.getProps(true);
Option<TypedProperties> propsOverride) throws IOException {
this.properties = combineProperties(cfg, propsOverride, jssc.hadoopConfiguration());
if (cfg.initialCheckpointProvider != null && cfg.checkpoint == null) {
InitialCheckPointProvider checkPointProvider =
@@ -143,6 +134,7 @@ public class HoodieDeltaStreamer implements Serializable {
checkPointProvider.init(conf);
cfg.checkpoint = checkPointProvider.getCheckpoint();
}
this.cfg = cfg;
this.bootstrapExecutor = Option.ofNullable(
cfg.runBootstrap ? new BootstrapExecutor(cfg, jssc, fs, conf, this.properties) : null);
@@ -150,6 +142,25 @@ public class HoodieDeltaStreamer implements Serializable {
cfg.runBootstrap ? null : new DeltaSyncService(cfg, jssc, fs, conf, Option.ofNullable(this.properties)));
}
/**
 * Resolves the single set of properties the delta streamer works with.
 *
 * <p>Exactly one source is used, chosen in this order: the explicit
 * {@code propsOverride} if present; otherwise the CLI-provided
 * {@code cfg.configs} alone when {@code cfg.propsFilePath} equals
 * {@code Config.DEFAULT_DFS_SOURCE_PROPERTIES}; otherwise the properties file
 * at {@code cfg.propsFilePath} read together with {@code cfg.configs}.
 *
 * <p>NOTE: when {@code propsOverride} is present, neither the properties file
 * nor {@code cfg.configs} is consulted by this method.
 *
 * @param cfg           delta streamer config supplying {@code propsFilePath} and {@code configs}
 * @param propsOverride optional, already-built properties that take precedence over everything else
 * @param hadoopConf    Hadoop configuration handed to {@code UtilHelpers.readConfig} for reading the props file
 * @return the resolved properties, as returned by {@code HoodieConfig.getProps(true)}
 */
private static TypedProperties combineProperties(Config cfg, Option<TypedProperties> propsOverride, Configuration hadoopConf) {
HoodieConfig hoodieConfig = new HoodieConfig();
// Resolving the properties in a consistent way:
// 1. Properties override always takes precedence
// 2. Otherwise, if no props file was specified (path is still the default), use the CLI overrides alone
// 3. Otherwise, parse the specified props file (merging in CLI overrides)
if (propsOverride.isPresent()) {
hoodieConfig.setAll(propsOverride.get());
} else if (cfg.propsFilePath.equals(Config.DEFAULT_DFS_SOURCE_PROPERTIES)) {
hoodieConfig.setAll(UtilHelpers.getConfig(cfg.configs).getProps());
} else {
hoodieConfig.setAll(UtilHelpers.readConfig(hadoopConf, new Path(cfg.propsFilePath), cfg.configs).getProps());
}
// NOTE(review): presumably only fills in RECONCILE_SCHEMA when it is not already set -- confirm against HoodieConfig
hoodieConfig.setDefaultValue(DataSourceWriteOptions.RECONCILE_SCHEMA());
// NOTE(review): the meaning of the `true` flag is not visible here -- confirm against HoodieConfig#getProps
return hoodieConfig.getProps(true);
}
public void shutdownGracefully() {
deltaSyncService.ifPresent(ds -> ds.shutdown(false));
}
@@ -364,20 +375,11 @@ public class HoodieDeltaStreamer implements Serializable {
}
/**
 * Tells whether inline compaction should be enabled for this run.
 *
 * @return {@code true} only when continuous mode is off, compaction is not
 *         force-disabled, and {@code tableType} names the MERGE_ON_READ table type
 */
public boolean isInlineCompactionEnabled() {
// Inline compaction is disabled for continuous mode, otherwise enabled for MOR
// (tableType is only parsed when the first two conditions hold, due to short-circuiting)
return !continuousMode && !forceDisableCompaction
&& HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(tableType));
}
/**
 * Reads the async-clustering flag from {@code this.configs} only.
 *
 * @return the boolean parsed from the value stored under
 *         {@code HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE.key()}, or {@code false} when the key is absent
 */
public boolean isAsyncClusteringEnabled() {
// The properties are rebuilt from this.configs via UtilHelpers.getConfig on every call
return Boolean.parseBoolean(String.valueOf(UtilHelpers.getConfig(this.configs).getProps()
.getOrDefault(HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE.key(), false)));
}
/**
 * Reads the inline-clustering flag from {@code this.configs} only.
 *
 * @return the boolean parsed from the value stored under
 *         {@code HoodieClusteringConfig.INLINE_CLUSTERING.key()}, or {@code false} when the key is absent
 */
public boolean isInlineClusteringEnabled() {
// The properties are rebuilt from this.configs via UtilHelpers.getConfig on every call
return Boolean.parseBoolean(String.valueOf(UtilHelpers.getConfig(this.configs).getProps()
.getOrDefault(HoodieClusteringConfig.INLINE_CLUSTERING.key(), false)));
}
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -626,6 +628,8 @@ public class HoodieDeltaStreamer implements Serializable {
LOG.info("Setting Spark Pool name for delta-sync to " + DELTASYNC_POOL_NAME);
jssc.setLocalProperty("spark.scheduler.pool", DELTASYNC_POOL_NAME);
}
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.from(props);
try {
while (!isShutdownRequested()) {
try {
@@ -637,7 +641,7 @@ public class HoodieDeltaStreamer implements Serializable {
HoodieTimeline.COMPACTION_ACTION, scheduledCompactionInstantAndRDD.get().getLeft().get()));
asyncCompactService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingCompactions);
}
if (cfg.isAsyncClusteringEnabled()) {
if (clusteringConfig.isAsyncClusteringEnabled()) {
Option<String> clusteringInstant = deltaSync.getClusteringInstantOpt();
if (clusteringInstant.isPresent()) {
LOG.info("Scheduled async clustering for instant: " + clusteringInstant.get());
@@ -727,7 +731,7 @@ public class HoodieDeltaStreamer implements Serializable {
}
}
// start async clustering if required
if (cfg.isAsyncClusteringEnabled()) {
if (HoodieClusteringConfig.from(props).isAsyncClusteringEnabled()) {
if (asyncClusteringService.isPresent()) {
asyncClusteringService.get().updateWriteClient(writeClient);
} else {

View File

@@ -77,7 +77,7 @@ public class HoodieMultiTableDeltaStreamer {
FileSystem fs = FSUtils.getFs(commonPropsFile, jssc.hadoopConfiguration());
configFolder = configFolder.charAt(configFolder.length() - 1) == '/' ? configFolder.substring(0, configFolder.length() - 1) : configFolder;
checkIfPropsFileAndConfigFolderExist(commonPropsFile, configFolder, fs);
TypedProperties commonProperties = UtilHelpers.readConfig(fs, new Path(commonPropsFile), new ArrayList<>()).getProps();
TypedProperties commonProperties = UtilHelpers.readConfig(fs.getConf(), new Path(commonPropsFile), new ArrayList<String>()).getProps();
//get the tables to be ingested and their corresponding config files from this properties instance
populateTableExecutionContextList(commonProperties, configFolder, fs, config);
}
@@ -116,7 +116,7 @@ public class HoodieMultiTableDeltaStreamer {
String configProp = Constants.INGESTION_PREFIX + database + Constants.DELIMITER + currentTable + Constants.INGESTION_CONFIG_SUFFIX;
String configFilePath = properties.getString(configProp, Helpers.getDefaultConfigFilePath(configFolder, database, currentTable));
checkIfTableConfigFileExists(configFolder, fs, configFilePath);
TypedProperties tableProperties = UtilHelpers.readConfig(fs, new Path(configFilePath), new ArrayList<>()).getProps();
TypedProperties tableProperties = UtilHelpers.readConfig(fs.getConf(), new Path(configFilePath), new ArrayList<String>()).getProps();
properties.forEach((k, v) -> {
if (tableProperties.get(k) == null) {
tableProperties.setProperty(k.toString(), v.toString());

View File

@@ -378,7 +378,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
@Test
public void testProps() {
TypedProperties props =
new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE)).getProps();
new DFSPropertiesConfiguration(dfs.getConf(), new Path(dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE)).getProps();
assertEquals(2, props.getInteger("hoodie.upsert.shuffle.parallelism"));
assertEquals("_row_key", props.getString("hoodie.datasource.write.recordkey.field"));
assertEquals("org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer$TestGenerator",
@@ -498,7 +498,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
String checkpointProviderClass = "org.apache.hudi.utilities.checkpointing.KafkaConnectHdfsProvider";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeDropAllConfig(tableBasePath, WriteOperationType.UPSERT);
TypedProperties props =
new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE)).getProps();
new DFSPropertiesConfiguration(dfs.getConf(), new Path(dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE)).getProps();
props.put("hoodie.deltastreamer.checkpoint.provider.path", bootstrapPath);
cfg.initialCheckpointProvider = checkpointProviderClass;
// create regular kafka connect hdfs dirs