1
0

[HUDI-2362] Add external config file support (#3416)

Co-authored-by: Wenning Ding <wenningd@amazon.com>
This commit is contained in:
wenningd
2021-11-18 01:59:26 -08:00
committed by GitHub
parent 8772cec4bd
commit 24def0b30d
25 changed files with 426 additions and 102 deletions

View File

@@ -0,0 +1,26 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Default system properties included when running Hudi jobs.
# This is useful for setting default environmental settings.
# Example:
# hoodie.datasource.hive_sync.jdbcurl jdbc:hive2://localhost:10000
# hoodie.datasource.hive_sync.use_jdbc true
# hoodie.datasource.hive_sync.support_timestamp false
# hoodie.index.type BLOOM
# hoodie.metadata.enable false

View File

@@ -360,7 +360,7 @@ public class SparkMain {
String payloadClassName, String enableHiveSync, String propsFilePath, List<String> configs) throws IOException {
TypedProperties properties = propsFilePath == null ? UtilHelpers.buildProperties(configs)
: UtilHelpers.readConfig(FSUtils.getFs(propsFilePath, jsc.hadoopConfiguration()), new Path(propsFilePath), configs).getConfig();
: UtilHelpers.readConfig(FSUtils.getFs(propsFilePath, jsc.hadoopConfiguration()), new Path(propsFilePath), configs).getProps(true);
properties.setProperty(HoodieBootstrapConfig.BASE_PATH.key(), sourcePath);

View File

@@ -18,14 +18,20 @@
package org.apache.hudi.common.config;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashSet;
import java.util.Set;
@@ -43,72 +49,110 @@ public class DFSPropertiesConfiguration {
private static final Logger LOG = LogManager.getLogger(DFSPropertiesConfiguration.class);
public static final String DEFAULT_PROPERTIES_FILE = "hudi-defaults.conf";
public static final String CONF_FILE_DIR_ENV_NAME = "HUDI_CONF_DIR";
public static final String DEFAULT_CONF_FILE_DIR = "file:/etc/hudi/conf";
// props read from hudi-defaults.conf
private static TypedProperties GLOBAL_PROPS = loadGlobalProps();
private final FileSystem fs;
private final Path rootFile;
private Path currentFilePath;
private final TypedProperties props;
// props read from user defined configuration file or input stream
private final HoodieConfig hoodieConfig;
// Keep track of files visited, to detect loops
private final Set<String> visitedFiles;
private final Set<String> visitedFilePaths;
public DFSPropertiesConfiguration(FileSystem fs, Path rootFile, TypedProperties defaults) {
public DFSPropertiesConfiguration(FileSystem fs, Path filePath) {
this.fs = fs;
this.rootFile = rootFile;
this.props = defaults;
this.visitedFiles = new HashSet<>();
visitFile(rootFile);
}
public DFSPropertiesConfiguration(FileSystem fs, Path rootFile) {
this(fs, rootFile, new TypedProperties());
this.currentFilePath = filePath;
this.hoodieConfig = new HoodieConfig();
this.visitedFilePaths = new HashSet<>();
addPropsFromFile(filePath);
}
public DFSPropertiesConfiguration() {
this.fs = null;
this.rootFile = null;
this.props = new TypedProperties();
this.visitedFiles = new HashSet<>();
this.currentFilePath = null;
this.hoodieConfig = new HoodieConfig();
this.visitedFilePaths = new HashSet<>();
}
private String[] splitProperty(String line) {
int ind = line.indexOf('=');
String k = line.substring(0, ind).trim();
String v = line.substring(ind + 1).trim();
return new String[] {k, v};
}
private void visitFile(Path file) {
try {
if (visitedFiles.contains(file.getName())) {
throw new IllegalStateException("Loop detected; file " + file + " already referenced");
/**
* Load global props from hudi-defaults.conf which is under CONF_FILE_DIR_ENV_NAME.
* @return Typed Properties
*/
public static TypedProperties loadGlobalProps() {
DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration();
Option<Path> defaultConfPath = getConfPathFromEnv();
if (defaultConfPath.isPresent()) {
conf.addPropsFromFile(defaultConfPath.get());
} else {
try {
conf.addPropsFromFile(new Path(DEFAULT_CONF_FILE_DIR));
} catch (Exception ignored) {
LOG.debug("Didn't find config file under default conf file dir: " + DEFAULT_CONF_FILE_DIR);
}
visitedFiles.add(file.getName());
BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(file)));
addProperties(reader);
}
return conf.getProps();
}
public static void refreshGlobalProps() {
GLOBAL_PROPS = loadGlobalProps();
}
public static void clearGlobalProps() {
GLOBAL_PROPS = new TypedProperties();
}
/**
* Add properties from external configuration files.
*
* @param filePath File path for configuration file
*/
public void addPropsFromFile(Path filePath) {
if (visitedFilePaths.contains(filePath.toString())) {
throw new IllegalStateException("Loop detected; file " + filePath + " already referenced");
}
FileSystem fileSystem;
try {
fileSystem = fs != null ? fs : filePath.getFileSystem(new Configuration());
} catch (IOException e) {
throw new IllegalArgumentException("Cannot get the file system from file path", e);
}
try (BufferedReader reader = new BufferedReader(new InputStreamReader(fileSystem.open(filePath)))) {
visitedFilePaths.add(filePath.toString());
currentFilePath = filePath;
addPropsFromStream(reader);
} catch (IOException ioe) {
LOG.error("Error reading in properies from dfs", ioe);
LOG.error("Error reading in properties from dfs");
throw new IllegalArgumentException("Cannot read properties from dfs", ioe);
}
}
/**
* Add properties from input stream.
*
* Add properties from buffered reader.
*
* @param reader Buffered Reader
* @throws IOException
*/
public void addProperties(BufferedReader reader) throws IOException {
public void addPropsFromStream(BufferedReader reader) throws IOException {
try {
reader.lines().forEach(line -> {
if (line.startsWith("#") || line.equals("") || !line.contains("=")) {
if (!isValidLine(line)) {
return;
}
String[] split = splitProperty(line);
if (line.startsWith("include=") || line.startsWith("include =")) {
visitFile(new Path(rootFile.getParent(), split[1]));
Path includeFilePath = new Path(currentFilePath.getParent(), split[1]);
addPropsFromFile(includeFilePath);
} else {
props.setProperty(split[0], split[1]);
hoodieConfig.setValue(split[0], split[1]);
}
});
@@ -117,7 +161,46 @@ public class DFSPropertiesConfiguration {
}
}
public TypedProperties getConfig() {
return props;
public static TypedProperties getGlobalProps() {
final TypedProperties globalProps = new TypedProperties();
globalProps.putAll(GLOBAL_PROPS);
return globalProps;
}
public TypedProperties getProps() {
return new TypedProperties(hoodieConfig.getProps());
}
public TypedProperties getProps(boolean includeGlobalProps) {
return new TypedProperties(hoodieConfig.getProps(includeGlobalProps));
}
private static Option<Path> getConfPathFromEnv() {
String confDir = System.getenv(CONF_FILE_DIR_ENV_NAME);
if (confDir == null) {
LOG.warn("Cannot find " + CONF_FILE_DIR_ENV_NAME + ", please set it as the dir of " + DEFAULT_PROPERTIES_FILE);
return Option.empty();
}
if (StringUtils.isNullOrEmpty(URI.create(confDir).getScheme())) {
confDir = "file://" + confDir;
}
return Option.of(new Path(confDir + File.separator + DEFAULT_PROPERTIES_FILE));
}
private String[] splitProperty(String line) {
line = line.replaceAll("\\s+"," ");
String delimiter = line.contains("=") ? "=" : " ";
int ind = line.indexOf(delimiter);
String k = line.substring(0, ind).trim();
String v = line.substring(ind + 1).trim();
return new String[] {k, v};
}
private boolean isValidLine(String line) {
ValidationUtils.checkArgument(line != null, "passed line is null");
if (line.startsWith("#") || line.equals("")) {
return false;
}
return line.contains("=") || line.matches(".*\\s.*");
}
}

View File

@@ -58,6 +58,14 @@ public class HoodieConfig implements Serializable {
props.setProperty(cfg.key(), val);
}
public <T> void setValue(String key, String val) {
props.setProperty(key, val);
}
public void setAll(Properties properties) {
props.putAll(properties);
}
public <T> void setDefaultValue(ConfigProperty<T> configProperty) {
if (!contains(configProperty)) {
Option<T> inferValue = Option.empty();
@@ -167,7 +175,17 @@ public class HoodieConfig implements Serializable {
}
public Properties getProps() {
return props;
return getProps(false);
}
public Properties getProps(boolean includeGlobalProps) {
if (includeGlobalProps) {
Properties mergedProps = DFSPropertiesConfiguration.getGlobalProps();
mergedProps.putAll(props);
return mergedProps;
} else {
return props;
}
}
public void setDefaultOnCondition(boolean condition, HoodieConfig config) {

View File

@@ -25,10 +25,14 @@ import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.Rule;
import org.junit.contrib.java.lang.system.EnvironmentVariables;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
@@ -47,6 +51,10 @@ public class TestDFSPropertiesConfiguration {
private static MiniDFSCluster dfsCluster;
private static DistributedFileSystem dfs;
@Rule
public static final EnvironmentVariables ENVIRONMENT_VARIABLES
= new EnvironmentVariables();
@BeforeAll
public static void initClass() throws Exception {
hdfsTestService = new HdfsTestService();
@@ -73,12 +81,17 @@ public class TestDFSPropertiesConfiguration {
}
@AfterAll
public static void cleanupClass() throws Exception {
public static void cleanupClass() {
if (hdfsTestService != null) {
hdfsTestService.stop();
}
}
@AfterEach
public void cleanupGlobalConfig() {
DFSPropertiesConfiguration.clearGlobalProps();
}
private static void writePropertiesFile(Path path, String[] lines) throws IOException {
PrintStream out = new PrintStream(dfs.create(path, true));
for (String line : lines) {
@@ -91,7 +104,7 @@ public class TestDFSPropertiesConfiguration {
@Test
public void testParsing() {
DFSPropertiesConfiguration cfg = new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/t1.props"));
TypedProperties props = cfg.getConfig();
TypedProperties props = cfg.getProps();
assertEquals(5, props.size());
assertThrows(IllegalArgumentException.class, () -> {
props.getString("invalid.key");
@@ -119,7 +132,7 @@ public class TestDFSPropertiesConfiguration {
@Test
public void testIncludes() {
DFSPropertiesConfiguration cfg = new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/t3.props"));
TypedProperties props = cfg.getConfig();
TypedProperties props = cfg.getProps();
assertEquals(123, props.getInteger("int.prop"));
assertEquals(243.4, props.getDouble("double.prop"), 0.001);
@@ -127,7 +140,30 @@ public class TestDFSPropertiesConfiguration {
assertEquals("t3.value", props.getString("string.prop"));
assertEquals(1354354354, props.getLong("long.prop"));
assertThrows(IllegalStateException.class, () -> {
new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/t4.props"));
cfg.addPropsFromFile(new Path(dfsBasePath + "/t4.props"));
}, "Should error out on a self-included file.");
}
@Test
public void testNoGlobalConfFileConfigured() {
ENVIRONMENT_VARIABLES.clear(DFSPropertiesConfiguration.CONF_FILE_DIR_ENV_NAME);
// Should not throw any exception when no external configuration file configured
DFSPropertiesConfiguration.refreshGlobalProps();
assertEquals(0, DFSPropertiesConfiguration.getGlobalProps().size());
}
@Test
public void testLoadGlobalConfFile() {
// set HUDI_CONF_DIR
String testPropsFilePath = new File("src/test/resources/external-config").getAbsolutePath();
ENVIRONMENT_VARIABLES.set(DFSPropertiesConfiguration.CONF_FILE_DIR_ENV_NAME, testPropsFilePath);
DFSPropertiesConfiguration.refreshGlobalProps();
assertEquals(5, DFSPropertiesConfiguration.getGlobalProps().size());
assertEquals("jdbc:hive2://localhost:10000", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.datasource.hive_sync.jdbcurl"));
assertEquals("true", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.datasource.hive_sync.use_jdbc"));
assertEquals("false", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.datasource.hive_sync.support_timestamp"));
assertEquals("BLOOM", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.index.type"));
assertEquals("true", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.metadata.enable"));
}
}

View File

@@ -0,0 +1,26 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Default system properties included when running Hudi jobs.
# This is useful for setting default environmental settings.
# Example:
hoodie.datasource.hive_sync.jdbcurl jdbc:hive2://localhost:10000
hoodie.datasource.hive_sync.use_jdbc true
hoodie.datasource.hive_sync.support_timestamp false
hoodie.index.type BLOOM
hoodie.metadata.enable true

View File

@@ -18,6 +18,8 @@
package org.apache.hudi.streamer;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
@@ -38,8 +40,6 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import java.util.Properties;
/**
* A utility which can incrementally consume data from Kafka and apply it to the target table.
* It has the similar functionality with SQL data source except that the source is bind to Kafka
@@ -65,7 +65,8 @@ public class HoodieFlinkStreamer {
env.setStateBackend(new FsStateBackend(cfg.flinkCheckPointPath));
}
Properties kafkaProps = StreamerUtil.appendKafkaProps(cfg);
TypedProperties kafkaProps = DFSPropertiesConfiguration.getGlobalProps();
kafkaProps.putAll(StreamerUtil.appendKafkaProps(cfg));
// Read from kafka source
RowType rowType =

View File

@@ -103,7 +103,7 @@ public class StreamerUtil {
}
return readConfig(
FSUtils.getFs(cfg.propsFilePath, getHadoopConf()),
new Path(cfg.propsFilePath), cfg.configs).getConfig();
new Path(cfg.propsFilePath), cfg.configs).getProps();
}
public static Schema getSourceSchema(FlinkStreamerConfig cfg) {
@@ -128,18 +128,11 @@ public class StreamerUtil {
* Read config from properties file (`--props` option) and cmd line (`--hoodie-conf` option).
*/
public static DFSPropertiesConfiguration readConfig(FileSystem fs, Path cfgPath, List<String> overriddenProps) {
DFSPropertiesConfiguration conf;
try {
conf = new DFSPropertiesConfiguration(cfgPath.getFileSystem(fs.getConf()), cfgPath);
} catch (Exception e) {
conf = new DFSPropertiesConfiguration();
LOG.warn("Unexpected error read props file at :" + cfgPath, e);
}
DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration(fs, cfgPath);
try {
if (!overriddenProps.isEmpty()) {
LOG.info("Adding overridden properties to file properties.");
conf.addProperties(new BufferedReader(new StringReader(String.join("\n", overriddenProps))));
conf.addPropsFromStream(new BufferedReader(new StringReader(String.join("\n", overriddenProps))));
}
} catch (IOException ioe) {
throw new HoodieIOException("Unexpected error adding config overrides", ioe);

View File

@@ -103,7 +103,7 @@ public class HoodieTestSuiteJob {
cfg.propsFilePath = FSUtils.addSchemeIfLocalPath(cfg.propsFilePath).toString();
this.sparkSession = SparkSession.builder().config(jsc.getConf()).enableHiveSupport().getOrCreate();
this.fs = FSUtils.getFs(cfg.inputBasePath, jsc.hadoopConfiguration());
this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getProps();
log.info("Creating workload generator with configs : {}", props.toString());
this.hiveConf = getDefaultHiveConf(jsc.hadoopConfiguration());
this.keyGenerator = (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);

View File

@@ -22,7 +22,7 @@ import java.util.Properties
import org.apache.hudi.DataSourceOptionsHelper.allAlternatives
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE
import org.apache.hudi.common.config.{HoodieConfig, TypedProperties}
import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieConfig, TypedProperties}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.exception.HoodieException
import org.apache.spark.sql.SparkSession
@@ -47,6 +47,7 @@ object HoodieWriterUtils {
* @return
*/
def parametersWithWriteDefaults(parameters: Map[String, String]): Map[String, String] = {
val globalProps = DFSPropertiesConfiguration.getGlobalProps.asScala
Map(OPERATION.key -> OPERATION.defaultValue,
TABLE_TYPE.key -> TABLE_TYPE.defaultValue,
PRECOMBINE_FIELD.key -> PRECOMBINE_FIELD.defaultValue,
@@ -81,7 +82,7 @@ object HoodieWriterUtils {
ENABLE_ROW_WRITER.key -> ENABLE_ROW_WRITER.defaultValue,
RECONCILE_SCHEMA.key -> RECONCILE_SCHEMA.defaultValue.toString,
DROP_PARTITION_COLUMNS.key -> DROP_PARTITION_COLUMNS.defaultValue
) ++ DataSourceOptionsHelper.translateConfigurations(parameters)
) ++ globalProps ++ DataSourceOptionsHelper.translateConfigurations(parameters)
}
def toProperties(params: Map[String, String]): TypedProperties = {
@@ -170,4 +171,23 @@ object HoodieWriterUtils {
}
}
}
val sparkDatasourceConfigsToTableConfigsMap = Map(
TABLE_NAME -> HoodieTableConfig.NAME,
TABLE_TYPE -> HoodieTableConfig.TYPE,
PRECOMBINE_FIELD -> HoodieTableConfig.PRECOMBINE_FIELD,
PARTITIONPATH_FIELD -> HoodieTableConfig.PARTITION_FIELDS,
RECORDKEY_FIELD -> HoodieTableConfig.RECORDKEY_FIELDS,
PAYLOAD_CLASS_NAME -> HoodieTableConfig.PAYLOAD_CLASS_NAME
)
def mappingSparkDatasourceConfigsToTableConfigs(options: Map[String, String]): Map[String, String] = {
val includingTableConfigs = scala.collection.mutable.Map() ++ options
sparkDatasourceConfigsToTableConfigsMap.foreach(kv => {
if (options.containsKey(kv._1.key)) {
includingTableConfigs(kv._2.key) = options(kv._1.key)
includingTableConfigs.remove(kv._1.key)
}
})
includingTableConfigs.toMap
}
}

View File

@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.SparkAdapterSupport
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.DFSPropertiesConfiguration
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecord
@@ -273,7 +274,7 @@ object HoodieSqlUtils extends SparkAdapterSupport {
*/
def withSparkConf(spark: SparkSession, options: Map[String, String])
(baseConfig: Map[String, String] = Map.empty): Map[String, String] = {
baseConfig ++ // Table options has the highest priority
baseConfig ++ DFSPropertiesConfiguration.getGlobalProps.asScala ++ // Table options has the highest priority
(spark.sessionState.conf.getAllConfs ++ HoodieOptionConfig.mappingSqlOptionToHoodieParam(options))
.filterKeys(_.startsWith("hoodie."))
}

View File

@@ -43,7 +43,7 @@ class AlterHoodieTableRenameCommand(
.setConf(hadoopConf).build()
// Init table with new name.
HoodieTableMetaClient.withPropertyBuilder()
.fromProperties(metaClient.getTableConfig.getProps)
.fromProperties(metaClient.getTableConfig.getProps(true))
.setTableName(newName.table)
.initTable(hadoopConf, path)
// Call AlterTableRenameCommand#run to rename table in meta.

View File

@@ -19,9 +19,9 @@ package org.apache.spark.sql.hudi.command
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
import org.apache.hudi.{DataSourceWriteOptions, HoodieWriterUtils, SparkAdapterSupport}
import org.apache.hudi.HoodieWriterUtils._
import org.apache.hudi.common.config.DFSPropertiesConfiguration
import org.apache.hudi.common.model.HoodieFileFormat
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.hadoop.HoodieParquetInputFormat
@@ -47,7 +47,7 @@ import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.{SPARK_VERSION, SparkConf}
import java.util.{Locale, Properties}
import scala.collection.JavaConversions.mapAsJavaMap
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.control.NonFatal
@@ -100,7 +100,10 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
// if CTAS, we treat the table we just created as nonexistent
val isTableExists = if (ctas) false else tableExistsInPath(path, conf)
var existingTableConfig = Map.empty[String, String]
val sqlOptions = HoodieOptionConfig.withDefaultSqlOptions(tblProperties)
val globalProps = DFSPropertiesConfiguration.getGlobalProps.asScala.toMap
val globalSqlProps = HoodieOptionConfig.mappingTableConfigToSqlOption(
HoodieWriterUtils.mappingSparkDatasourceConfigsToTableConfigs(globalProps))
val sqlOptions = HoodieOptionConfig.withDefaultSqlOptions(globalSqlProps ++ tblProperties)
val catalogTableProps = HoodieOptionConfig.mappingSqlOptionToTableConfig(tblProperties)
// get final schema and parameters
@@ -341,7 +344,7 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
}
} else {
extraConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) = "true"
extraConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key) = HoodieTableConfig.URL_ENCODE_PARTITIONING.defaultValue()
extraConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key) = HoodieTableConfig.URL_ENCODE_PARTITIONING.defaultValue
}
if (originTableConfig.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)) {
@@ -374,7 +377,7 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.URL_ENCODE_PARTITIONING.key)
checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key)
// Save all the table config to the hoodie.properties.
val parameters = originTableConfig ++ tableOptions
val parameters = HoodieWriterUtils.mappingSparkDatasourceConfigsToTableConfigs(originTableConfig ++ tableOptions)
val properties = new Properties()
properties.putAll(parameters.asJava)
HoodieTableMetaClient.withPropertyBuilder()

View File

@@ -17,7 +17,7 @@
package org.apache.spark.sql.hudi.command
import org.apache.hudi.DataSourceWriteOptions.{OPERATION, _}
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME

View File

@@ -45,7 +45,7 @@ class TruncateHoodieTableCommand(
// Create MetaClient
val metaClient = HoodieTableMetaClient.builder().setBasePath(path)
.setConf(hadoopConf).build()
Some(metaClient.getTableConfig.getProps)
Some(metaClient.getTableConfig.getProps(true))
} else {
None
}

View File

@@ -0,0 +1,23 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Default system properties included when running Hudi jobs.
# This is useful for setting default environmental settings.
# Example:
hoodie.datasource.write.table.type MERGE_ON_READ
hoodie.datasource.write.hive_style_partitioning false

View File

@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.config.DFSPropertiesConfiguration
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import java.io.File
import java.nio.file.{Files, Paths}
import org.scalatest.BeforeAndAfter
class TestSqlConf extends TestHoodieSqlBase with BeforeAndAfter {
def setEnv(key: String, value: String): String = {
val field = System.getenv().getClass.getDeclaredField("m")
field.setAccessible(true)
val map = field.get(System.getenv()).asInstanceOf[java.util.Map[java.lang.String, java.lang.String]]
map.put(key, value)
}
test("Test Hudi Conf") {
withTempDir { tmp =>
val tableName = generateTableName
val tablePath = tmp.getCanonicalPath
val partitionVal = "2021"
// Create table
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long,
| year string
|) using hudi
| partitioned by (year)
| location '$tablePath'
| options (
| primaryKey ='id',
| preCombineField = 'ts'
| )
""".stripMargin)
// First merge with a extra input field 'flag' (insert a new record)
spark.sql(
s"""
| merge into $tableName
| using (
| select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '1' as flag, $partitionVal as year
| ) s0
| on s0.id = $tableName.id
| when matched and flag = '1' then update set
| id = s0.id, name = s0.name, price = s0.price, ts = s0.ts, year = s0.year
| when not matched and flag = '1' then insert *
""".stripMargin)
checkAnswer(s"select id, name, price, ts, year from $tableName")(
Seq(1, "a1", 10.0, 1000, partitionVal)
)
// By default, Spark DML would set table type to COW and use Hive style partitioning, here we
// set table type to MOR and disable Hive style partitioning in the hudi conf file, and check
// if Hudi DML can load these configs correctly
assertResult(true)(Files.exists(Paths.get(s"$tablePath/$partitionVal")))
assertResult(HoodieTableType.MERGE_ON_READ)(new HoodieTableConfig(
new Path(tablePath).getFileSystem(new Configuration),
s"$tablePath/" + HoodieTableMetaClient.METAFOLDER_NAME,
HoodieTableConfig.PAYLOAD_CLASS_NAME.defaultValue).getTableType)
// delete the record
spark.sql(s"delete from $tableName where year = $partitionVal")
val cnt = spark.sql(s"select * from $tableName where year = $partitionVal").count()
assertResult(0)(cnt)
}
}
before {
val testPropsFilePath = new File("src/test/resources/external-config").getAbsolutePath
setEnv(DFSPropertiesConfiguration.CONF_FILE_DIR_ENV_NAME, testPropsFilePath)
DFSPropertiesConfiguration.refreshGlobalProps()
}
after {
DFSPropertiesConfiguration.clearGlobalProps()
}
}

View File

@@ -112,7 +112,7 @@ public class HDFSParquetImporter implements Serializable {
public int dataImport(JavaSparkContext jsc, int retry) {
this.fs = FSUtils.getFs(cfg.targetPath, jsc.hadoopConfiguration());
this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs)
: UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
: UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getProps(true);
LOG.info("Starting data import with configs : " + props.toString());
int ret = -1;
try {

View File

@@ -63,7 +63,7 @@ public class HoodieCleaner {
*/
FileSystem fs = FSUtils.getFs(cfg.basePath, jssc.hadoopConfiguration());
this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs)
: UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
: UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getProps(true);
LOG.info("Creating Cleaner with configs : " + props.toString());
}

View File

@@ -69,7 +69,7 @@ public class HoodieClusteringJob {
return UtilHelpers
.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs)
.getConfig();
.getProps(true);
}
public static class Config implements Serializable {

View File

@@ -59,7 +59,7 @@ public class HoodieCompactor {
return UtilHelpers
.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs)
.getConfig();
.getProps(true);
}
public static class Config implements Serializable {

View File

@@ -161,18 +161,11 @@ public class UtilHelpers {
*
*/
public static DFSPropertiesConfiguration readConfig(FileSystem fs, Path cfgPath, List<String> overriddenProps) {
DFSPropertiesConfiguration conf;
try {
conf = new DFSPropertiesConfiguration(cfgPath.getFileSystem(fs.getConf()), cfgPath);
} catch (Exception e) {
conf = new DFSPropertiesConfiguration();
LOG.warn("Unexpected error read props file at :" + cfgPath, e);
}
DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration(fs, cfgPath);
try {
if (!overriddenProps.isEmpty()) {
LOG.info("Adding overridden properties to file properties.");
conf.addProperties(new BufferedReader(new StringReader(String.join("\n", overriddenProps))));
conf.addPropsFromStream(new BufferedReader(new StringReader(String.join("\n", overriddenProps))));
}
} catch (IOException ioe) {
throw new HoodieIOException("Unexpected error adding config overrides", ioe);
@@ -186,7 +179,7 @@ public class UtilHelpers {
try {
if (!overriddenProps.isEmpty()) {
LOG.info("Adding overridden properties to file properties.");
conf.addProperties(new BufferedReader(new StringReader(String.join("\n", overriddenProps))));
conf.addPropsFromStream(new BufferedReader(new StringReader(String.join("\n", overriddenProps))));
}
} catch (IOException ioe) {
throw new HoodieIOException("Unexpected error adding config overrides", ioe);
@@ -196,7 +189,7 @@ public class UtilHelpers {
}
public static TypedProperties buildProperties(List<String> props) {
TypedProperties properties = new TypedProperties();
TypedProperties properties = DFSPropertiesConfiguration.getGlobalProps();
props.forEach(x -> {
String[] kv = x.split("=");
ValidationUtils.checkArgument(kv.length == 2);

View File

@@ -29,6 +29,7 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.OperationConverter;
import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
@@ -121,15 +122,18 @@ public class HoodieDeltaStreamer implements Serializable {
public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf,
Option<TypedProperties> props) throws IOException {
// Resolving the properties first in a consistent way
HoodieConfig hoodieConfig = new HoodieConfig();
if (props.isPresent()) {
this.properties = setDefaults(props.get());
hoodieConfig.setAll(props.get());
} else if (cfg.propsFilePath.equals(Config.DEFAULT_DFS_SOURCE_PROPERTIES)) {
this.properties = setDefaults(UtilHelpers.getConfig(cfg.configs).getConfig());
hoodieConfig.setAll(UtilHelpers.getConfig(cfg.configs).getProps());
} else {
this.properties = setDefaults(UtilHelpers.readConfig(
hoodieConfig.setAll(UtilHelpers.readConfig(
FSUtils.getFs(cfg.propsFilePath, jssc.hadoopConfiguration()),
new Path(cfg.propsFilePath), cfg.configs).getConfig());
new Path(cfg.propsFilePath), cfg.configs).getProps());
}
hoodieConfig.setDefaultValue(DataSourceWriteOptions.RECONCILE_SCHEMA());
this.properties = (TypedProperties) hoodieConfig.getProps(true);
if (cfg.initialCheckpointProvider != null && cfg.checkpoint == null) {
InitialCheckPointProvider checkPointProvider =
@@ -148,13 +152,6 @@ public class HoodieDeltaStreamer implements Serializable {
deltaSyncService.ifPresent(ds -> ds.shutdown(false));
}
private TypedProperties setDefaults(TypedProperties props) {
if (!props.containsKey(DataSourceWriteOptions.RECONCILE_SCHEMA().key())) {
props.setProperty(DataSourceWriteOptions.RECONCILE_SCHEMA().key(), DataSourceWriteOptions.RECONCILE_SCHEMA().defaultValue().toString());
}
return props;
}
/**
* Main method to start syncing.
*
@@ -370,12 +367,12 @@ public class HoodieDeltaStreamer implements Serializable {
}
public boolean isAsyncClusteringEnabled() {
return Boolean.parseBoolean(String.valueOf(UtilHelpers.getConfig(this.configs).getConfig()
return Boolean.parseBoolean(String.valueOf(UtilHelpers.getConfig(this.configs).getProps()
.getOrDefault(HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE.key(), false)));
}
public boolean isInlineClusteringEnabled() {
return Boolean.parseBoolean(String.valueOf(UtilHelpers.getConfig(this.configs).getConfig()
return Boolean.parseBoolean(String.valueOf(UtilHelpers.getConfig(this.configs).getProps()
.getOrDefault(HoodieClusteringConfig.INLINE_CLUSTERING.key(), false)));
}

View File

@@ -77,7 +77,7 @@ public class HoodieMultiTableDeltaStreamer {
FileSystem fs = FSUtils.getFs(commonPropsFile, jssc.hadoopConfiguration());
configFolder = configFolder.charAt(configFolder.length() - 1) == '/' ? configFolder.substring(0, configFolder.length() - 1) : configFolder;
checkIfPropsFileAndConfigFolderExist(commonPropsFile, configFolder, fs);
TypedProperties commonProperties = UtilHelpers.readConfig(fs, new Path(commonPropsFile), new ArrayList<>()).getConfig();
TypedProperties commonProperties = UtilHelpers.readConfig(fs, new Path(commonPropsFile), new ArrayList<>()).getProps();
//get the tables to be ingested and their corresponding config files from this properties instance
populateTableExecutionContextList(commonProperties, configFolder, fs, config);
}
@@ -116,7 +116,7 @@ public class HoodieMultiTableDeltaStreamer {
String configProp = Constants.INGESTION_PREFIX + database + Constants.DELIMITER + currentTable + Constants.INGESTION_CONFIG_SUFFIX;
String configFilePath = properties.getString(configProp, Helpers.getDefaultConfigFilePath(configFolder, database, currentTable));
checkIfTableConfigFileExists(configFolder, fs, configFilePath);
TypedProperties tableProperties = UtilHelpers.readConfig(fs, new Path(configFilePath), new ArrayList<>()).getConfig();
TypedProperties tableProperties = UtilHelpers.readConfig(fs, new Path(configFilePath), new ArrayList<>()).getProps();
properties.forEach((k, v) -> {
if (tableProperties.get(k) == null) {
tableProperties.setProperty(k.toString(), v.toString());

View File

@@ -365,7 +365,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
@Test
public void testProps() {
TypedProperties props =
new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE)).getConfig();
new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE)).getProps();
assertEquals(2, props.getInteger("hoodie.upsert.shuffle.parallelism"));
assertEquals("_row_key", props.getString("hoodie.datasource.write.recordkey.field"));
assertEquals("org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer$TestGenerator",
@@ -485,7 +485,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
String checkpointProviderClass = "org.apache.hudi.utilities.checkpointing.KafkaConnectHdfsProvider";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeDropAllConfig(tableBasePath, WriteOperationType.UPSERT);
TypedProperties props =
new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE)).getConfig();
new DFSPropertiesConfiguration(dfs, new Path(dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE)).getProps();
props.put("hoodie.deltastreamer.checkpoint.provider.path", bootstrapPath);
cfg.initialCheckpointProvider = checkpointProviderClass;
// create regular kafka connect hdfs dirs