1
0

[HUDI-1209] Properties File must be optional when running deltastreamer (#2085)

This commit is contained in:
Shen Hong
2020-10-22 08:49:28 +08:00
committed by GitHub
parent e4931744eb
commit 49407169ac
2 changed files with 26 additions and 5 deletions

View File

@@ -175,6 +175,20 @@ public class UtilHelpers {
return conf; return conf;
} }
public static DFSPropertiesConfiguration getConfig(List<String> overriddenProps) {
DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration();
try {
if (!overriddenProps.isEmpty()) {
LOG.info("Adding overridden properties to file properties.");
conf.addProperties(new BufferedReader(new StringReader(String.join("\n", overriddenProps))));
}
} catch (IOException ioe) {
throw new HoodieIOException("Unexpected error adding config overrides", ioe);
}
return conf;
}
public static TypedProperties buildProperties(List<String> props) { public static TypedProperties buildProperties(List<String> props) {
TypedProperties properties = new TypedProperties(); TypedProperties properties = new TypedProperties();
props.forEach(x -> { props.forEach(x -> {

View File

@@ -114,9 +114,15 @@ public class HoodieDeltaStreamer implements Serializable {
public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf, public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf,
Option<TypedProperties> props) throws IOException { Option<TypedProperties> props) throws IOException {
// Resolving the properties first in a consistent way // Resolving the properties first in a consistent way
this.properties = props.isPresent() ? props.get() : UtilHelpers.readConfig( if (props.isPresent()) {
this.properties = props.get();
} else if (cfg.propsFilePath.equals(Config.DEFAULT_DFS_SOURCE_PROPERTIES)) {
this.properties = UtilHelpers.getConfig(cfg.configs).getConfig();
} else {
this.properties = UtilHelpers.readConfig(
FSUtils.getFs(cfg.propsFilePath, jssc.hadoopConfiguration()), FSUtils.getFs(cfg.propsFilePath, jssc.hadoopConfiguration()),
new Path(cfg.propsFilePath), cfg.configs).getConfig(); new Path(cfg.propsFilePath), cfg.configs).getConfig();
}
if (cfg.initialCheckpointProvider != null && cfg.checkpoint == null) { if (cfg.initialCheckpointProvider != null && cfg.checkpoint == null) {
InitialCheckPointProvider checkPointProvider = InitialCheckPointProvider checkPointProvider =
@@ -199,6 +205,8 @@ public class HoodieDeltaStreamer implements Serializable {
} }
public static class Config implements Serializable { public static class Config implements Serializable {
public static final String DEFAULT_DFS_SOURCE_PROPERTIES = "file://" + System.getProperty("user.dir")
+ "/src/test/resources/delta-streamer-config/dfs-source.properties";
@Parameter(names = {"--target-base-path"}, @Parameter(names = {"--target-base-path"},
description = "base path for the target hoodie table. " description = "base path for the target hoodie table. "
@@ -221,8 +229,7 @@ public class HoodieDeltaStreamer implements Serializable {
+ "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer" + "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer"
+ "to individual classes, for supported properties." + "to individual classes, for supported properties."
+ " Properties in this file can be overridden by \"--hoodie-conf\"") + " Properties in this file can be overridden by \"--hoodie-conf\"")
public String propsFilePath = public String propsFilePath = DEFAULT_DFS_SOURCE_PROPERTIES;
"file://" + System.getProperty("user.dir") + "/src/test/resources/delta-streamer-config/dfs-source.properties";
@Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file " @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
+ "(using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated", + "(using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated",