diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/DFSPropertiesConfiguration.java b/hudi-common/src/main/java/org/apache/hudi/common/config/DFSPropertiesConfiguration.java index f3614a64b..08cbd568d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/DFSPropertiesConfiguration.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/DFSPropertiesConfiguration.java @@ -196,6 +196,12 @@ public class DFSPropertiesConfiguration { return globalProps; } + // test only + public static TypedProperties addToGlobalProps(String key, String value) { + GLOBAL_PROPS.put(key, value); + return GLOBAL_PROPS; + } + public TypedProperties getProps() { return new TypedProperties(hoodieConfig.getProps()); } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index feeb57212..844e171b0 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -19,7 +19,7 @@ package org.apache.hudi import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL} import org.apache.hudi.HoodieConversionUtils.toScalaOption -import org.apache.hudi.common.config.{ConfigProperty, HoodieCommonConfig, HoodieConfig, TypedProperties} +import org.apache.hudi.common.config.{ConfigProperty, DFSPropertiesConfiguration, HoodieCommonConfig, HoodieConfig, TypedProperties} import org.apache.hudi.common.fs.ConsistencyGuardConfig import org.apache.hudi.common.model.{HoodieTableType, WriteOperationType} import org.apache.hudi.common.table.HoodieTableConfig @@ -768,13 +768,14 @@ object DataSourceOptionsHelper { def parametersWithReadDefaults(parameters: Map[String, String]): Map[String, String] = { // First check if the ConfigUtils.IS_QUERY_AS_RO_TABLE has set by HiveSyncTool, // or else use query type from QUERY_TYPE. - val queryType = parameters.get(ConfigUtils.IS_QUERY_AS_RO_TABLE) + val paramsWithGlobalProps = DFSPropertiesConfiguration.getGlobalProps.asScala.toMap ++ parameters + val queryType = paramsWithGlobalProps.get(ConfigUtils.IS_QUERY_AS_RO_TABLE) .map(is => if (is.toBoolean) QUERY_TYPE_READ_OPTIMIZED_OPT_VAL else QUERY_TYPE_SNAPSHOT_OPT_VAL) - .getOrElse(parameters.getOrElse(QUERY_TYPE.key, QUERY_TYPE.defaultValue())) + .getOrElse(paramsWithGlobalProps.getOrElse(QUERY_TYPE.key, QUERY_TYPE.defaultValue())) Map( QUERY_TYPE.key -> queryType - ) ++ translateConfigurations(parameters) + ) ++ translateConfigurations(paramsWithGlobalProps) } def inferKeyGenClazz(props: TypedProperties): String = { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSqlConf.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSqlConf.scala index ac3c49efd..90d073494 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSqlConf.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSqlConf.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.hudi import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path +import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.common.config.DFSPropertiesConfiguration import org.apache.hudi.common.model.HoodieTableType import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} @@ -60,20 +61,21 @@ class TestSqlConf extends HoodieSparkSqlTestBase with BeforeAndAfter { | ) """.stripMargin) - // First merge with a extra input field 'flag' (insert a new record) - spark.sql( - s""" - | merge into $tableName - | using ( - | select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '1' as flag, $partitionVal as year - | ) s0 - | on s0.id = $tableName.id - | when matched and flag = '1' then update set - | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts, year = s0.year - | when not matched and flag = '1' then insert * - """.stripMargin) + // First insert a new record + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, $partitionVal)") + + val metaClient = HoodieTableMetaClient.builder() + .setBasePath(tablePath) + .setConf(spark.sessionState.newHadoopConf()) + .build() + val firstCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp + + // Then insert another new record + spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, $partitionVal)") + checkAnswer(s"select id, name, price, ts, year from $tableName")( - Seq(1, "a1", 10.0, 1000, partitionVal) + Seq(1, "a1", 10.0, 1000, partitionVal), + Seq(2, "a2", 10.0, 1000, partitionVal) ) // By default, Spark DML would set table type to COW and use Hive style partitioning, here we @@ -85,6 +87,15 @@ class TestSqlConf extends HoodieSparkSqlTestBase with BeforeAndAfter { s"$tablePath/" + HoodieTableMetaClient.METAFOLDER_NAME, HoodieTableConfig.PAYLOAD_CLASS_NAME.defaultValue).getTableType) + // Manually pass incremental configs to global configs to make sure Hudi query is able to load the + // global configs + DFSPropertiesConfiguration.addToGlobalProps(QUERY_TYPE.key, QUERY_TYPE_INCREMENTAL_OPT_VAL) + DFSPropertiesConfiguration.addToGlobalProps(BEGIN_INSTANTTIME.key, firstCommit) + spark.catalog.refreshTable(tableName) + checkAnswer(s"select id, name, price, ts, year from $tableName")( + Seq(2, "a2", 10.0, 1000, partitionVal) + ) + // delete the record spark.sql(s"delete from $tableName where year = $partitionVal") val cnt = spark.sql(s"select * from $tableName where year = $partitionVal").count()