diff --git a/conf/hudi-defaults.conf.template b/conf/hudi-defaults.conf.template new file mode 100644 index 000000000..175dbaf23 --- /dev/null +++ b/conf/hudi-defaults.conf.template @@ -0,0 +1,26 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Default system properties included when running Hudi jobs. +# This is useful for setting default environmental settings. + +# Example: +# hoodie.datasource.hive_sync.jdbcurl jdbc:hive2://localhost:10000 +# hoodie.datasource.hive_sync.use_jdbc true +# hoodie.datasource.hive_sync.support_timestamp false +# hoodie.index.type BLOOM +# hoodie.metadata.enable false diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java index 82688fecc..ef6416c0c 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java @@ -360,7 +360,7 @@ public class SparkMain { String payloadClassName, String enableHiveSync, String propsFilePath, List configs) throws IOException { TypedProperties properties = propsFilePath == null ? UtilHelpers.buildProperties(configs) - : UtilHelpers.readConfig(FSUtils.getFs(propsFilePath, jsc.hadoopConfiguration()), new Path(propsFilePath), configs).getConfig(); + : UtilHelpers.readConfig(FSUtils.getFs(propsFilePath, jsc.hadoopConfiguration()), new Path(propsFilePath), configs).getProps(true); properties.setProperty(HoodieBootstrapConfig.BASE_PATH.key(), sourcePath); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/DFSPropertiesConfiguration.java b/hudi-common/src/main/java/org/apache/hudi/common/config/DFSPropertiesConfiguration.java index f97e4add4..88335db13 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/DFSPropertiesConfiguration.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/DFSPropertiesConfiguration.java @@ -18,14 +18,20 @@ package org.apache.hudi.common.config; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.ValidationUtils; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.BufferedReader; +import java.io.File; import java.io.IOException; import java.io.InputStreamReader; +import java.net.URI; import java.util.HashSet; import java.util.Set; @@ -43,72 +49,110 @@ public class DFSPropertiesConfiguration { private static final Logger LOG = LogManager.getLogger(DFSPropertiesConfiguration.class); + public static final String DEFAULT_PROPERTIES_FILE = "hudi-defaults.conf"; + + public static final String CONF_FILE_DIR_ENV_NAME = "HUDI_CONF_DIR"; + + public static final String DEFAULT_CONF_FILE_DIR = "file:/etc/hudi/conf"; + + // props read from hudi-defaults.conf + private static TypedProperties GLOBAL_PROPS = loadGlobalProps(); + private final FileSystem fs; - private final Path rootFile; + private Path currentFilePath; - private final TypedProperties props; + // props read from user defined configuration file or input stream + private final HoodieConfig hoodieConfig; // Keep track of files visited, to detect loops - private final Set visitedFiles; + private final Set visitedFilePaths; - public DFSPropertiesConfiguration(FileSystem fs, Path rootFile, TypedProperties defaults) { + public DFSPropertiesConfiguration(FileSystem fs, Path filePath) { this.fs = fs; - this.rootFile = rootFile; - this.props = defaults; - this.visitedFiles = new HashSet<>(); - visitFile(rootFile); - } - - public DFSPropertiesConfiguration(FileSystem fs, Path rootFile) { - this(fs, rootFile, new TypedProperties()); + this.currentFilePath = filePath; + this.hoodieConfig = new HoodieConfig(); + this.visitedFilePaths = new HashSet<>(); + addPropsFromFile(filePath); } public DFSPropertiesConfiguration() { this.fs = null; - this.rootFile = null; - this.props = new TypedProperties(); - this.visitedFiles = new HashSet<>(); + this.currentFilePath = null; + this.hoodieConfig = new HoodieConfig(); + this.visitedFilePaths = new HashSet<>(); } - private String[] splitProperty(String line) { - int ind = line.indexOf('='); - String k = line.substring(0, ind).trim(); - String v = line.substring(ind + 1).trim(); - return new String[] {k, v}; - } - - private void visitFile(Path file) { - try { - if (visitedFiles.contains(file.getName())) { - throw new IllegalStateException("Loop detected; file " + file + " already referenced"); + /** + * Load global props from hudi-defaults.conf which is under CONF_FILE_DIR_ENV_NAME. + * @return Typed Properties + */ + public static TypedProperties loadGlobalProps() { + DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration(); + Option defaultConfPath = getConfPathFromEnv(); + if (defaultConfPath.isPresent()) { + conf.addPropsFromFile(defaultConfPath.get()); + } else { + try { + conf.addPropsFromFile(new Path(DEFAULT_CONF_FILE_DIR)); + } catch (Exception ignored) { + LOG.debug("Didn't find config file under default conf file dir: " + DEFAULT_CONF_FILE_DIR); } - visitedFiles.add(file.getName()); - BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(file))); - addProperties(reader); + } + return conf.getProps(); + } + + public static void refreshGlobalProps() { + GLOBAL_PROPS = loadGlobalProps(); + } + + public static void clearGlobalProps() { + GLOBAL_PROPS = new TypedProperties(); + } + + /** + * Add properties from external configuration files. + * + * @param filePath File path for configuration file + */ + public void addPropsFromFile(Path filePath) { + if (visitedFilePaths.contains(filePath.toString())) { + throw new IllegalStateException("Loop detected; file " + filePath + " already referenced"); + } + FileSystem fileSystem; + try { + fileSystem = fs != null ? fs : filePath.getFileSystem(new Configuration()); + } catch (IOException e) { + throw new IllegalArgumentException("Cannot get the file system from file path", e); + } + try (BufferedReader reader = new BufferedReader(new InputStreamReader(fileSystem.open(filePath)))) { + visitedFilePaths.add(filePath.toString()); + currentFilePath = filePath; + addPropsFromStream(reader); } catch (IOException ioe) { - LOG.error("Error reading in properies from dfs", ioe); + LOG.error("Error reading in properties from dfs"); throw new IllegalArgumentException("Cannot read properties from dfs", ioe); } } /** - * Add properties from input stream. - * + * Add properties from buffered reader. + * * @param reader Buffered Reader * @throws IOException */ - public void addProperties(BufferedReader reader) throws IOException { + public void addPropsFromStream(BufferedReader reader) throws IOException { try { reader.lines().forEach(line -> { - if (line.startsWith("#") || line.equals("") || !line.contains("=")) { + if (!isValidLine(line)) { return; } String[] split = splitProperty(line); if (line.startsWith("include=") || line.startsWith("include =")) { - visitFile(new Path(rootFile.getParent(), split[1])); + Path includeFilePath = new Path(currentFilePath.getParent(), split[1]); + addPropsFromFile(includeFilePath); } else { - props.setProperty(split[0], split[1]); + hoodieConfig.setValue(split[0], split[1]); } }); @@ -117,7 +161,46 @@ public class DFSPropertiesConfiguration { } } - public TypedProperties getConfig() { - return props; + public static TypedProperties getGlobalProps() { + final TypedProperties globalProps = new TypedProperties(); + globalProps.putAll(GLOBAL_PROPS); + return globalProps; + } + + public TypedProperties getProps() { + return new TypedProperties(hoodieConfig.getProps()); + } + + public TypedProperties getProps(boolean includeGlobalProps) { + return new TypedProperties(hoodieConfig.getProps(includeGlobalProps)); + } + + private static Option getConfPathFromEnv() { + String confDir = System.getenv(CONF_FILE_DIR_ENV_NAME); + if (confDir == null) { + LOG.warn("Cannot find " + CONF_FILE_DIR_ENV_NAME + ", please set it as the dir of " + DEFAULT_PROPERTIES_FILE); + return Option.empty(); + } + if (StringUtils.isNullOrEmpty(URI.create(confDir).getScheme())) { + confDir = "file://" + confDir; + } + return Option.of(new Path(confDir + File.separator + DEFAULT_PROPERTIES_FILE)); + } + + private String[] splitProperty(String line) { + line = line.replaceAll("\\s+"," "); + String delimiter = line.contains("=") ? "=" : " "; + int ind = line.indexOf(delimiter); + String k = line.substring(0, ind).trim(); + String v = line.substring(ind + 1).trim(); + return new String[] {k, v}; + } + + private boolean isValidLine(String line) { + ValidationUtils.checkArgument(line != null, "passed line is null"); + if (line.startsWith("#") || line.equals("")) { + return false; + } + return line.contains("=") || line.matches(".*\\s.*"); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java index ed2b90eea..6ae1ba6e3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java @@ -58,6 +58,14 @@ public class HoodieConfig implements Serializable { props.setProperty(cfg.key(), val); } + public void setValue(String key, String val) { + props.setProperty(key, val); + } + + public void setAll(Properties properties) { + props.putAll(properties); + } + public void setDefaultValue(ConfigProperty configProperty) { if (!contains(configProperty)) { Option inferValue = Option.empty(); @@ -167,7 +175,17 @@ public class HoodieConfig implements Serializable { } public Properties getProps() { - return props; + return getProps(false); + } + + public Properties getProps(boolean includeGlobalProps) { + if (includeGlobalProps) { + Properties mergedProps = DFSPropertiesConfiguration.getGlobalProps(); + mergedProps.putAll(props); + return mergedProps; + } else { + return props; + } } public void setDefaultOnCondition(boolean condition, HoodieConfig config) { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestDFSPropertiesConfiguration.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestDFSPropertiesConfiguration.java index ddce3216b..b7ea8bff5 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestDFSPropertiesConfiguration.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestDFSPropertiesConfiguration.java @@ -25,10 +25,14 @@ import org.apache.hudi.common.testutils.minicluster.HdfsTestService; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.Rule; +import org.junit.contrib.java.lang.system.EnvironmentVariables; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import java.io.File; import java.io.IOException; import java.io.PrintStream; @@ -47,6 +51,10 @@ public class TestDFSPropertiesConfiguration { private static MiniDFSCluster dfsCluster; private static DistributedFileSystem dfs; + @Rule + public static final EnvironmentVariables ENVIRONMENT_VARIABLES + = new EnvironmentVariables(); + @BeforeAll public static void initClass() throws Exception { hdfsTestService = new HdfsTestService(); @@ -73,12 +81,17 @@ public class TestDFSPropertiesConfiguration { } @AfterAll - public static void cleanupClass() throws Exception { + public static void cleanupClass() { if (hdfsTestService != null) { hdfsTestService.stop(); } } + @AfterEach + public void cleanupGlobalConfig() { + DFSPropertiesConfiguration.clearGlobalProps(); + } + private static void writePropertiesFile(Path path, String[] lines) throws IOException { PrintStream out = new PrintStream(dfs.create(path, true)); for (String line : lines) { @@ -91,7 +104,7 @@ public class TestDFSPropertiesConfiguration { @Test public void testParsing() { DFSPropertiesConfiguration cfg = new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/t1.props")); - TypedProperties props = cfg.getConfig(); + TypedProperties props = cfg.getProps(); assertEquals(5, props.size()); assertThrows(IllegalArgumentException.class, () -> { props.getString("invalid.key"); @@ -119,7 +132,7 @@ public class TestDFSPropertiesConfiguration { @Test public void testIncludes() { DFSPropertiesConfiguration cfg = new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/t3.props")); - TypedProperties props = cfg.getConfig(); + TypedProperties props = cfg.getProps(); assertEquals(123, props.getInteger("int.prop")); assertEquals(243.4, props.getDouble("double.prop"), 0.001); @@ -127,7 +140,30 @@ public class TestDFSPropertiesConfiguration { assertEquals("t3.value", props.getString("string.prop")); assertEquals(1354354354, props.getLong("long.prop")); assertThrows(IllegalStateException.class, () -> { - new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/t4.props")); + cfg.addPropsFromFile(new Path(dfsBasePath + "/t4.props")); }, "Should error out on a self-included file."); } + + @Test + public void testNoGlobalConfFileConfigured() { + ENVIRONMENT_VARIABLES.clear(DFSPropertiesConfiguration.CONF_FILE_DIR_ENV_NAME); + // Should not throw any exception when no external configuration file configured + DFSPropertiesConfiguration.refreshGlobalProps(); + assertEquals(0, DFSPropertiesConfiguration.getGlobalProps().size()); + } + + @Test + public void testLoadGlobalConfFile() { + // set HUDI_CONF_DIR + String testPropsFilePath = new File("src/test/resources/external-config").getAbsolutePath(); + ENVIRONMENT_VARIABLES.set(DFSPropertiesConfiguration.CONF_FILE_DIR_ENV_NAME, testPropsFilePath); + + DFSPropertiesConfiguration.refreshGlobalProps(); + assertEquals(5, DFSPropertiesConfiguration.getGlobalProps().size()); + assertEquals("jdbc:hive2://localhost:10000", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.datasource.hive_sync.jdbcurl")); + assertEquals("true", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.datasource.hive_sync.use_jdbc")); + assertEquals("false", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.datasource.hive_sync.support_timestamp")); + assertEquals("BLOOM", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.index.type")); + assertEquals("true", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.metadata.enable")); + } } diff --git a/hudi-common/src/test/resources/external-config/hudi-defaults.conf b/hudi-common/src/test/resources/external-config/hudi-defaults.conf new file mode 100644 index 000000000..1133adb4d --- /dev/null +++ b/hudi-common/src/test/resources/external-config/hudi-defaults.conf @@ -0,0 +1,26 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Default system properties included when running Hudi jobs. +# This is useful for setting default environmental settings. + +# Example: +hoodie.datasource.hive_sync.jdbcurl jdbc:hive2://localhost:10000 +hoodie.datasource.hive_sync.use_jdbc true +hoodie.datasource.hive_sync.support_timestamp false +hoodie.index.type BLOOM +hoodie.metadata.enable true diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java index 3e567f31f..851931f0d 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java @@ -18,6 +18,8 @@ package org.apache.hudi.streamer; +import org.apache.hudi.common.config.DFSPropertiesConfiguration; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.Option; import org.apache.hudi.configuration.FlinkOptions; @@ -38,8 +40,6 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.types.logical.RowType; -import java.util.Properties; - /** * A utility which can incrementally consume data from Kafka and apply it to the target table. * It has the similar functionality with SQL data source except that the source is bind to Kafka @@ -65,7 +65,8 @@ public class HoodieFlinkStreamer { env.setStateBackend(new FsStateBackend(cfg.flinkCheckPointPath)); } - Properties kafkaProps = StreamerUtil.appendKafkaProps(cfg); + TypedProperties kafkaProps = DFSPropertiesConfiguration.getGlobalProps(); + kafkaProps.putAll(StreamerUtil.appendKafkaProps(cfg)); // Read from kafka source RowType rowType = diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index ddbd24e35..4b9a51610 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -103,7 +103,7 @@ public class StreamerUtil { } return readConfig( FSUtils.getFs(cfg.propsFilePath, getHadoopConf()), - new Path(cfg.propsFilePath), cfg.configs).getConfig(); + new Path(cfg.propsFilePath), cfg.configs).getProps(); } public static Schema getSourceSchema(FlinkStreamerConfig cfg) { @@ -128,18 +128,11 @@ public class StreamerUtil { * Read config from properties file (`--props` option) and cmd line (`--hoodie-conf` option). */ public static DFSPropertiesConfiguration readConfig(FileSystem fs, Path cfgPath, List overriddenProps) { - DFSPropertiesConfiguration conf; - try { - conf = new DFSPropertiesConfiguration(cfgPath.getFileSystem(fs.getConf()), cfgPath); - } catch (Exception e) { - conf = new DFSPropertiesConfiguration(); - LOG.warn("Unexpected error read props file at :" + cfgPath, e); - } - + DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration(fs, cfgPath); try { if (!overriddenProps.isEmpty()) { LOG.info("Adding overridden properties to file properties."); - conf.addProperties(new BufferedReader(new StringReader(String.join("\n", overriddenProps)))); + conf.addPropsFromStream(new BufferedReader(new StringReader(String.join("\n", overriddenProps)))); } } catch (IOException ioe) { throw new HoodieIOException("Unexpected error adding config overrides", ioe); diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java index d8ed649d9..8e0e3eb5d 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java @@ -103,7 +103,7 @@ public class HoodieTestSuiteJob { cfg.propsFilePath = FSUtils.addSchemeIfLocalPath(cfg.propsFilePath).toString(); this.sparkSession = SparkSession.builder().config(jsc.getConf()).enableHiveSupport().getOrCreate(); this.fs = FSUtils.getFs(cfg.inputBasePath, jsc.hadoopConfiguration()); - this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig(); + this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getProps(); log.info("Creating workload generator with configs : {}", props.toString()); this.hiveConf = getDefaultHiveConf(jsc.hadoopConfiguration()); this.keyGenerator = (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props); diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala index c1223d979..c89c19dde 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala @@ -22,7 +22,7 @@ import java.util.Properties import org.apache.hudi.DataSourceOptionsHelper.allAlternatives import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE -import org.apache.hudi.common.config.{HoodieConfig, TypedProperties} +import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieConfig, TypedProperties} import org.apache.hudi.common.table.HoodieTableConfig import org.apache.hudi.exception.HoodieException import org.apache.spark.sql.SparkSession @@ -47,6 +47,7 @@ object HoodieWriterUtils { * @return */ def parametersWithWriteDefaults(parameters: Map[String, String]): Map[String, String] = { + val globalProps = DFSPropertiesConfiguration.getGlobalProps.asScala Map(OPERATION.key -> OPERATION.defaultValue, TABLE_TYPE.key -> TABLE_TYPE.defaultValue, PRECOMBINE_FIELD.key -> PRECOMBINE_FIELD.defaultValue, @@ -81,7 +82,7 @@ object HoodieWriterUtils { ENABLE_ROW_WRITER.key -> ENABLE_ROW_WRITER.defaultValue, RECONCILE_SCHEMA.key -> RECONCILE_SCHEMA.defaultValue.toString, DROP_PARTITION_COLUMNS.key -> DROP_PARTITION_COLUMNS.defaultValue - ) ++ DataSourceOptionsHelper.translateConfigurations(parameters) + ) ++ globalProps ++ DataSourceOptionsHelper.translateConfigurations(parameters) } def toProperties(params: Map[String, String]): TypedProperties = { @@ -170,4 +171,23 @@ object HoodieWriterUtils { } } } + + val sparkDatasourceConfigsToTableConfigsMap = Map( + TABLE_NAME -> HoodieTableConfig.NAME, + TABLE_TYPE -> HoodieTableConfig.TYPE, + PRECOMBINE_FIELD -> HoodieTableConfig.PRECOMBINE_FIELD, + PARTITIONPATH_FIELD -> HoodieTableConfig.PARTITION_FIELDS, + RECORDKEY_FIELD -> HoodieTableConfig.RECORDKEY_FIELDS, + PAYLOAD_CLASS_NAME -> HoodieTableConfig.PAYLOAD_CLASS_NAME + ) + def mappingSparkDatasourceConfigsToTableConfigs(options: Map[String, String]): Map[String, String] = { + val includingTableConfigs = scala.collection.mutable.Map() ++ options + sparkDatasourceConfigsToTableConfigsMap.foreach(kv => { + if (options.containsKey(kv._1.key)) { + includingTableConfigs(kv._2.key) = options(kv._1.key) + includingTableConfigs.remove(kv._1.key) + } + }) + includingTableConfigs.toMap + } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala index 8e4903350..50c9e539c 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala @@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hudi.SparkAdapterSupport import org.apache.hudi.client.common.HoodieSparkEngineContext +import org.apache.hudi.common.config.DFSPropertiesConfiguration import org.apache.hudi.common.config.HoodieMetadataConfig import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model.HoodieRecord @@ -273,7 +274,7 @@ object HoodieSqlUtils extends SparkAdapterSupport { */ def withSparkConf(spark: SparkSession, options: Map[String, String]) (baseConfig: Map[String, String] = Map.empty): Map[String, String] = { - baseConfig ++ // Table options has the highest priority + baseConfig ++ DFSPropertiesConfiguration.getGlobalProps.asScala ++ // Table options has the highest priority (spark.sessionState.conf.getAllConfs ++ HoodieOptionConfig.mappingSqlOptionToHoodieParam(options)) .filterKeys(_.startsWith("hoodie.")) } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableRenameCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableRenameCommand.scala index 2df9ec869..ec71a9d3c 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableRenameCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableRenameCommand.scala @@ -43,7 +43,7 @@ class AlterHoodieTableRenameCommand( .setConf(hadoopConf).build() // Init table with new name. HoodieTableMetaClient.withPropertyBuilder() - .fromProperties(metaClient.getTableConfig.getProps) + .fromProperties(metaClient.getTableConfig.getProps(true)) .setTableName(newName.table) .initTable(hadoopConf, path) // Call AlterTableRenameCommand#run to rename table in meta. diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala index 94c6eab75..e449a91a1 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala @@ -19,9 +19,9 @@ package org.apache.spark.sql.hudi.command import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path - -import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport} +import org.apache.hudi.{DataSourceWriteOptions, HoodieWriterUtils, SparkAdapterSupport} import org.apache.hudi.HoodieWriterUtils._ +import org.apache.hudi.common.config.DFSPropertiesConfiguration import org.apache.hudi.common.model.HoodieFileFormat import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} import org.apache.hudi.hadoop.HoodieParquetInputFormat @@ -47,7 +47,7 @@ import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.{SPARK_VERSION, SparkConf} import java.util.{Locale, Properties} - +import scala.collection.JavaConversions.mapAsJavaMap import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.control.NonFatal @@ -100,7 +100,10 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean // if CTAS, we treat the table we just created as nonexistent val isTableExists = if (ctas) false else tableExistsInPath(path, conf) var existingTableConfig = Map.empty[String, String] - val sqlOptions = HoodieOptionConfig.withDefaultSqlOptions(tblProperties) + val globalProps = DFSPropertiesConfiguration.getGlobalProps.asScala.toMap + val globalSqlProps = HoodieOptionConfig.mappingTableConfigToSqlOption( + HoodieWriterUtils.mappingSparkDatasourceConfigsToTableConfigs(globalProps)) + val sqlOptions = HoodieOptionConfig.withDefaultSqlOptions(globalSqlProps ++ tblProperties) val catalogTableProps = HoodieOptionConfig.mappingSqlOptionToTableConfig(tblProperties) // get final schema and parameters @@ -341,7 +344,7 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean } } else { extraConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) = "true" - extraConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key) = HoodieTableConfig.URL_ENCODE_PARTITIONING.defaultValue() + extraConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key) = HoodieTableConfig.URL_ENCODE_PARTITIONING.defaultValue } if (originTableConfig.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)) { @@ -374,7 +377,7 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.URL_ENCODE_PARTITIONING.key) checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) // Save all the table config to the hoodie.properties. - val parameters = originTableConfig ++ tableOptions + val parameters = HoodieWriterUtils.mappingSparkDatasourceConfigsToTableConfigs(originTableConfig ++ tableOptions) val properties = new Properties() properties.putAll(parameters.asJava) HoodieTableMetaClient.withPropertyBuilder() diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala index e8acebd78..98cc4dd72 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.hudi.command -import org.apache.hudi.DataSourceWriteOptions.{OPERATION, _} +import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala index 339f4b52c..a57a3d1bd 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala @@ -45,7 +45,7 @@ class TruncateHoodieTableCommand( // Create MetaClient val metaClient = HoodieTableMetaClient.builder().setBasePath(path) .setConf(hadoopConf).build() - Some(metaClient.getTableConfig.getProps) + Some(metaClient.getTableConfig.getProps(true)) } else { None } diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/external-config/hudi-defaults.conf b/hudi-spark-datasource/hudi-spark/src/test/resources/external-config/hudi-defaults.conf new file mode 100644 index 000000000..c883b5bbe --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/external-config/hudi-defaults.conf @@ -0,0 +1,23 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Default system properties included when running Hudi jobs. +# This is useful for setting default environmental settings. + +# Example: +hoodie.datasource.write.table.type MERGE_ON_READ +hoodie.datasource.write.hive_style_partitioning false diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSqlConf.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSqlConf.scala new file mode 100644 index 000000000..1a8ac0e64 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSqlConf.scala @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hudi.common.config.DFSPropertiesConfiguration +import org.apache.hudi.common.model.HoodieTableType +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} + +import java.io.File +import java.nio.file.{Files, Paths} + +import org.scalatest.BeforeAndAfter + +class TestSqlConf extends TestHoodieSqlBase with BeforeAndAfter { + + def setEnv(key: String, value: String): String = { + val field = System.getenv().getClass.getDeclaredField("m") + field.setAccessible(true) + val map = field.get(System.getenv()).asInstanceOf[java.util.Map[java.lang.String, java.lang.String]] + map.put(key, value) + } + + test("Test Hudi Conf") { + withTempDir { tmp => + val tableName = generateTableName + val tablePath = tmp.getCanonicalPath + val partitionVal = "2021" + // Create table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long, + | year string + |) using hudi + | partitioned by (year) + | location '$tablePath' + | options ( + | primaryKey ='id', + | preCombineField = 'ts' + | ) + """.stripMargin) + + // First merge with a extra input field 'flag' (insert a new record) + spark.sql( + s""" + | merge into $tableName + | using ( + | select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '1' as flag, $partitionVal as year + | ) s0 + | on s0.id = $tableName.id + | when matched and flag = '1' then update set + | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts, year = s0.year + | when not matched and flag = '1' then insert * + """.stripMargin) + checkAnswer(s"select id, name, price, ts, year from $tableName")( + Seq(1, "a1", 10.0, 1000, partitionVal) + ) + + // By default, Spark DML would set table type to COW and use Hive style partitioning, here we + // set table type to MOR and disable Hive style partitioning in the hudi conf file, and check + // if Hudi DML can load these configs correctly + assertResult(true)(Files.exists(Paths.get(s"$tablePath/$partitionVal"))) + assertResult(HoodieTableType.MERGE_ON_READ)(new HoodieTableConfig( + new Path(tablePath).getFileSystem(new Configuration), + s"$tablePath/" + HoodieTableMetaClient.METAFOLDER_NAME, + HoodieTableConfig.PAYLOAD_CLASS_NAME.defaultValue).getTableType) + + // delete the record + spark.sql(s"delete from $tableName where year = $partitionVal") + val cnt = spark.sql(s"select * from $tableName where year = $partitionVal").count() + assertResult(0)(cnt) + } + } + + before { + val testPropsFilePath = new File("src/test/resources/external-config").getAbsolutePath + setEnv(DFSPropertiesConfiguration.CONF_FILE_DIR_ENV_NAME, testPropsFilePath) + DFSPropertiesConfiguration.refreshGlobalProps() + } + + after { + DFSPropertiesConfiguration.clearGlobalProps() + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java index 5f9b199fe..21393a01b 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java @@ -112,7 +112,7 @@ public class HDFSParquetImporter implements Serializable { public int dataImport(JavaSparkContext jsc, int retry) { this.fs = FSUtils.getFs(cfg.targetPath, jsc.hadoopConfiguration()); this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs) - : UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig(); + : UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getProps(true); LOG.info("Starting data import with configs : " + props.toString()); int ret = -1; try { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java index 24e2828a5..28d484c56 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java @@ -63,7 +63,7 @@ public class HoodieCleaner { */ FileSystem fs = FSUtils.getFs(cfg.basePath, jssc.hadoopConfiguration()); this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs) - : UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig(); + : UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getProps(true); LOG.info("Creating Cleaner with configs : " + props.toString()); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java index 2bd9ab43a..a96a4b75e 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java @@ -69,7 +69,7 @@ public class HoodieClusteringJob { return UtilHelpers .readConfig(fs, new Path(cfg.propsFilePath), cfg.configs) - .getConfig(); + .getProps(true); } public static class Config implements Serializable { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java index 1996fb8b5..74a4ea59f 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java @@ -59,7 +59,7 @@ public class HoodieCompactor { return UtilHelpers .readConfig(fs, new Path(cfg.propsFilePath), cfg.configs) - .getConfig(); + .getProps(true); } public static class Config implements Serializable { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java index 3771a7d34..1d74aced5 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java @@ -161,18 +161,11 @@ public class UtilHelpers { * */ public static DFSPropertiesConfiguration readConfig(FileSystem fs, Path cfgPath, List overriddenProps) { - DFSPropertiesConfiguration conf; - try { - conf = new DFSPropertiesConfiguration(cfgPath.getFileSystem(fs.getConf()), cfgPath); - } catch (Exception e) { - conf = new DFSPropertiesConfiguration(); - LOG.warn("Unexpected error read props file at :" + cfgPath, e); - } - + DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration(fs, cfgPath); try { if (!overriddenProps.isEmpty()) { LOG.info("Adding overridden properties to file properties."); - conf.addProperties(new BufferedReader(new StringReader(String.join("\n", overriddenProps)))); + conf.addPropsFromStream(new BufferedReader(new StringReader(String.join("\n", overriddenProps)))); } } catch (IOException ioe) { throw new HoodieIOException("Unexpected error adding config overrides", ioe); @@ -186,7 +179,7 @@ public class UtilHelpers { try { if (!overriddenProps.isEmpty()) { LOG.info("Adding overridden properties to file properties."); - conf.addProperties(new BufferedReader(new StringReader(String.join("\n", overriddenProps)))); + conf.addPropsFromStream(new BufferedReader(new StringReader(String.join("\n", overriddenProps)))); } } catch (IOException ioe) { throw new HoodieIOException("Unexpected error adding config overrides", ioe); @@ -196,7 +189,7 @@ public class UtilHelpers { } public static TypedProperties buildProperties(List props) { - TypedProperties properties = new TypedProperties(); + TypedProperties properties = DFSPropertiesConfiguration.getGlobalProps(); props.forEach(x -> { String[] kv = x.split("="); ValidationUtils.checkArgument(kv.length == 2); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index 1649759f7..408a71484 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -29,6 +29,7 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.client.utils.OperationConverter; import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex; +import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieTableType; @@ -121,15 +122,18 @@ public class HoodieDeltaStreamer implements Serializable { public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf, Option props) throws IOException { // Resolving the properties first in a consistent way + HoodieConfig hoodieConfig = new HoodieConfig(); if (props.isPresent()) { - this.properties = setDefaults(props.get()); + hoodieConfig.setAll(props.get()); } else if (cfg.propsFilePath.equals(Config.DEFAULT_DFS_SOURCE_PROPERTIES)) { - this.properties = setDefaults(UtilHelpers.getConfig(cfg.configs).getConfig()); + hoodieConfig.setAll(UtilHelpers.getConfig(cfg.configs).getProps()); } else { - this.properties = setDefaults(UtilHelpers.readConfig( + hoodieConfig.setAll(UtilHelpers.readConfig( FSUtils.getFs(cfg.propsFilePath, jssc.hadoopConfiguration()), - new Path(cfg.propsFilePath), cfg.configs).getConfig()); + new Path(cfg.propsFilePath), cfg.configs).getProps()); } + hoodieConfig.setDefaultValue(DataSourceWriteOptions.RECONCILE_SCHEMA()); + this.properties = (TypedProperties) hoodieConfig.getProps(true); if (cfg.initialCheckpointProvider != null && cfg.checkpoint == null) { InitialCheckPointProvider checkPointProvider = @@ -148,13 +152,6 @@ public class HoodieDeltaStreamer implements Serializable { deltaSyncService.ifPresent(ds -> ds.shutdown(false)); } - private TypedProperties setDefaults(TypedProperties props) { - if (!props.containsKey(DataSourceWriteOptions.RECONCILE_SCHEMA().key())) { - props.setProperty(DataSourceWriteOptions.RECONCILE_SCHEMA().key(), DataSourceWriteOptions.RECONCILE_SCHEMA().defaultValue().toString()); - } - return props; - } - /** * Main method to start syncing. * @@ -370,12 +367,12 @@ public class HoodieDeltaStreamer implements Serializable { } public boolean isAsyncClusteringEnabled() { - return Boolean.parseBoolean(String.valueOf(UtilHelpers.getConfig(this.configs).getConfig() + return Boolean.parseBoolean(String.valueOf(UtilHelpers.getConfig(this.configs).getProps() .getOrDefault(HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE.key(), false))); } public boolean isInlineClusteringEnabled() { - return Boolean.parseBoolean(String.valueOf(UtilHelpers.getConfig(this.configs).getConfig() + return Boolean.parseBoolean(String.valueOf(UtilHelpers.getConfig(this.configs).getProps() .getOrDefault(HoodieClusteringConfig.INLINE_CLUSTERING.key(), false))); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java index 7e49d9b88..2b788dc6d 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java @@ -77,7 +77,7 @@ public class HoodieMultiTableDeltaStreamer { FileSystem fs = FSUtils.getFs(commonPropsFile, jssc.hadoopConfiguration()); configFolder = configFolder.charAt(configFolder.length() - 1) == '/' ? configFolder.substring(0, configFolder.length() - 1) : configFolder; checkIfPropsFileAndConfigFolderExist(commonPropsFile, configFolder, fs); - TypedProperties commonProperties = UtilHelpers.readConfig(fs, new Path(commonPropsFile), new ArrayList<>()).getConfig(); + TypedProperties commonProperties = UtilHelpers.readConfig(fs, new Path(commonPropsFile), new ArrayList<>()).getProps(); //get the tables to be ingested and their corresponding config files from this properties instance populateTableExecutionContextList(commonProperties, configFolder, fs, config); } @@ -116,7 +116,7 @@ public class HoodieMultiTableDeltaStreamer { String configProp = Constants.INGESTION_PREFIX + database + Constants.DELIMITER + currentTable + Constants.INGESTION_CONFIG_SUFFIX; String configFilePath = properties.getString(configProp, Helpers.getDefaultConfigFilePath(configFolder, database, currentTable)); checkIfTableConfigFileExists(configFolder, fs, configFilePath); - TypedProperties tableProperties = UtilHelpers.readConfig(fs, new Path(configFilePath), new ArrayList<>()).getConfig(); + TypedProperties tableProperties = UtilHelpers.readConfig(fs, new Path(configFilePath), new ArrayList<>()).getProps(); properties.forEach((k, v) -> { if (tableProperties.get(k) == null) { tableProperties.setProperty(k.toString(), v.toString()); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index 014a0c140..9cf040cea 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -365,7 +365,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { @Test public void testProps() { TypedProperties props = - new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE)).getConfig(); + new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE)).getProps(); assertEquals(2, props.getInteger("hoodie.upsert.shuffle.parallelism")); assertEquals("_row_key", props.getString("hoodie.datasource.write.recordkey.field")); assertEquals("org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer$TestGenerator", @@ -485,7 +485,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { String checkpointProviderClass = "org.apache.hudi.utilities.checkpointing.KafkaConnectHdfsProvider"; HoodieDeltaStreamer.Config cfg = TestHelpers.makeDropAllConfig(tableBasePath, WriteOperationType.UPSERT); TypedProperties props = - new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE)).getConfig(); + new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE)).getProps(); props.put("hoodie.deltastreamer.checkpoint.provider.path", bootstrapPath); cfg.initialCheckpointProvider = checkpointProviderClass; // create regular kafka connect hdfs dirs