From 49407169ac1711c0babb77fb3014473c0b236edd Mon Sep 17 00:00:00 2001 From: Shen Hong Date: Thu, 22 Oct 2020 08:49:28 +0800 Subject: [PATCH] [HUDI-1209] Properties File must be optional when running deltastreamer (#2085) --- .../org/apache/hudi/utilities/UtilHelpers.java | 14 ++++++++++++++ .../deltastreamer/HoodieDeltaStreamer.java | 17 ++++++++++++----- 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java index e76790918..45171b388 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java @@ -175,6 +175,20 @@ public class UtilHelpers { return conf; } + public static DFSPropertiesConfiguration getConfig(List overriddenProps) { + DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration(); + try { + if (!overriddenProps.isEmpty()) { + LOG.info("Adding overridden properties to file properties."); + conf.addProperties(new BufferedReader(new StringReader(String.join("\n", overriddenProps)))); + } + } catch (IOException ioe) { + throw new HoodieIOException("Unexpected error adding config overrides", ioe); + } + + return conf; + } + public static TypedProperties buildProperties(List props) { TypedProperties properties = new TypedProperties(); props.forEach(x -> { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index 002ead385..a6de17d81 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -114,9 +114,15 @@ public class HoodieDeltaStreamer implements Serializable { public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf, Option props) throws IOException { // Resolving the properties first in a consistent way - this.properties = props.isPresent() ? props.get() : UtilHelpers.readConfig( - FSUtils.getFs(cfg.propsFilePath, jssc.hadoopConfiguration()), - new Path(cfg.propsFilePath), cfg.configs).getConfig(); + if (props.isPresent()) { + this.properties = props.get(); + } else if (cfg.propsFilePath.equals(Config.DEFAULT_DFS_SOURCE_PROPERTIES)) { + this.properties = UtilHelpers.getConfig(cfg.configs).getConfig(); + } else { + this.properties = UtilHelpers.readConfig( + FSUtils.getFs(cfg.propsFilePath, jssc.hadoopConfiguration()), + new Path(cfg.propsFilePath), cfg.configs).getConfig(); + } if (cfg.initialCheckpointProvider != null && cfg.checkpoint == null) { InitialCheckPointProvider checkPointProvider = @@ -199,6 +205,8 @@ public class HoodieDeltaStreamer implements Serializable { } public static class Config implements Serializable { + public static final String DEFAULT_DFS_SOURCE_PROPERTIES = "file://" + System.getProperty("user.dir") + + "/src/test/resources/delta-streamer-config/dfs-source.properties"; @Parameter(names = {"--target-base-path"}, description = "base path for the target hoodie table. " @@ -221,8 +229,7 @@ public class HoodieDeltaStreamer implements Serializable { + "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer" + "to individual classes, for supported properties." + " Properties in this file can be overridden by \"--hoodie-conf\"") - public String propsFilePath = - "file://" + System.getProperty("user.dir") + "/src/test/resources/delta-streamer-config/dfs-source.properties"; + public String propsFilePath = DEFAULT_DFS_SOURCE_PROPERTIES; @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file " + "(using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated",