[HUDI-3764] Allow loading external configs while querying Hudi tables with Spark (#4915)
Currently, when running Hudi queries with Spark, external configurations are not loaded. For example, if customers enable metadata listing in their global config file, their queries would still run without the metadata feature enabled. This PR fixes the issue by loading the global configs during the Hudi read phase. Co-authored-by: Wenning Ding <wenningd@amazon.com>
This commit is contained in:
@@ -196,6 +196,12 @@ public class DFSPropertiesConfiguration {
|
|||||||
return globalProps;
|
return globalProps;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// test only
|
||||||
|
public static TypedProperties addToGlobalProps(String key, String value) {
|
||||||
|
GLOBAL_PROPS.put(key, value);
|
||||||
|
return GLOBAL_PROPS;
|
||||||
|
}
|
||||||
|
|
||||||
public TypedProperties getProps() {
|
public TypedProperties getProps() {
|
||||||
return new TypedProperties(hoodieConfig.getProps());
|
return new TypedProperties(hoodieConfig.getProps());
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ package org.apache.hudi
|
|||||||
|
|
||||||
import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
|
import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
|
||||||
import org.apache.hudi.HoodieConversionUtils.toScalaOption
|
import org.apache.hudi.HoodieConversionUtils.toScalaOption
|
||||||
import org.apache.hudi.common.config.{ConfigProperty, HoodieCommonConfig, HoodieConfig, TypedProperties}
|
import org.apache.hudi.common.config.{ConfigProperty, DFSPropertiesConfiguration, HoodieCommonConfig, HoodieConfig, TypedProperties}
|
||||||
import org.apache.hudi.common.fs.ConsistencyGuardConfig
|
import org.apache.hudi.common.fs.ConsistencyGuardConfig
|
||||||
import org.apache.hudi.common.model.{HoodieTableType, WriteOperationType}
|
import org.apache.hudi.common.model.{HoodieTableType, WriteOperationType}
|
||||||
import org.apache.hudi.common.table.HoodieTableConfig
|
import org.apache.hudi.common.table.HoodieTableConfig
|
||||||
@@ -768,13 +768,14 @@ object DataSourceOptionsHelper {
|
|||||||
def parametersWithReadDefaults(parameters: Map[String, String]): Map[String, String] = {
|
def parametersWithReadDefaults(parameters: Map[String, String]): Map[String, String] = {
|
||||||
// First check if the ConfigUtils.IS_QUERY_AS_RO_TABLE has set by HiveSyncTool,
|
// First check if the ConfigUtils.IS_QUERY_AS_RO_TABLE has set by HiveSyncTool,
|
||||||
// or else use query type from QUERY_TYPE.
|
// or else use query type from QUERY_TYPE.
|
||||||
val queryType = parameters.get(ConfigUtils.IS_QUERY_AS_RO_TABLE)
|
val paramsWithGlobalProps = DFSPropertiesConfiguration.getGlobalProps.asScala.toMap ++ parameters
|
||||||
|
val queryType = paramsWithGlobalProps.get(ConfigUtils.IS_QUERY_AS_RO_TABLE)
|
||||||
.map(is => if (is.toBoolean) QUERY_TYPE_READ_OPTIMIZED_OPT_VAL else QUERY_TYPE_SNAPSHOT_OPT_VAL)
|
.map(is => if (is.toBoolean) QUERY_TYPE_READ_OPTIMIZED_OPT_VAL else QUERY_TYPE_SNAPSHOT_OPT_VAL)
|
||||||
.getOrElse(parameters.getOrElse(QUERY_TYPE.key, QUERY_TYPE.defaultValue()))
|
.getOrElse(paramsWithGlobalProps.getOrElse(QUERY_TYPE.key, QUERY_TYPE.defaultValue()))
|
||||||
|
|
||||||
Map(
|
Map(
|
||||||
QUERY_TYPE.key -> queryType
|
QUERY_TYPE.key -> queryType
|
||||||
) ++ translateConfigurations(parameters)
|
) ++ translateConfigurations(paramsWithGlobalProps)
|
||||||
}
|
}
|
||||||
|
|
||||||
def inferKeyGenClazz(props: TypedProperties): String = {
|
def inferKeyGenClazz(props: TypedProperties): String = {
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hudi
|
|||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration
|
import org.apache.hadoop.conf.Configuration
|
||||||
import org.apache.hadoop.fs.Path
|
import org.apache.hadoop.fs.Path
|
||||||
|
import org.apache.hudi.DataSourceReadOptions._
|
||||||
import org.apache.hudi.common.config.DFSPropertiesConfiguration
|
import org.apache.hudi.common.config.DFSPropertiesConfiguration
|
||||||
import org.apache.hudi.common.model.HoodieTableType
|
import org.apache.hudi.common.model.HoodieTableType
|
||||||
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
|
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
|
||||||
@@ -60,20 +61,21 @@ class TestSqlConf extends HoodieSparkSqlTestBase with BeforeAndAfter {
|
|||||||
| )
|
| )
|
||||||
""".stripMargin)
|
""".stripMargin)
|
||||||
|
|
||||||
// First merge with a extra input field 'flag' (insert a new record)
|
// First insert a new record
|
||||||
spark.sql(
|
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, $partitionVal)")
|
||||||
s"""
|
|
||||||
| merge into $tableName
|
val metaClient = HoodieTableMetaClient.builder()
|
||||||
| using (
|
.setBasePath(tablePath)
|
||||||
| select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '1' as flag, $partitionVal as year
|
.setConf(spark.sessionState.newHadoopConf())
|
||||||
| ) s0
|
.build()
|
||||||
| on s0.id = $tableName.id
|
val firstCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
|
||||||
| when matched and flag = '1' then update set
|
|
||||||
| id = s0.id, name = s0.name, price = s0.price, ts = s0.ts, year = s0.year
|
// Then insert another new record
|
||||||
| when not matched and flag = '1' then insert *
|
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, $partitionVal)")
|
||||||
""".stripMargin)
|
|
||||||
checkAnswer(s"select id, name, price, ts, year from $tableName")(
|
checkAnswer(s"select id, name, price, ts, year from $tableName")(
|
||||||
Seq(1, "a1", 10.0, 1000, partitionVal)
|
Seq(1, "a1", 10.0, 1000, partitionVal),
|
||||||
|
Seq(2, "a2", 10.0, 1000, partitionVal)
|
||||||
)
|
)
|
||||||
|
|
||||||
// By default, Spark DML would set table type to COW and use Hive style partitioning, here we
|
// By default, Spark DML would set table type to COW and use Hive style partitioning, here we
|
||||||
@@ -85,6 +87,15 @@ class TestSqlConf extends HoodieSparkSqlTestBase with BeforeAndAfter {
|
|||||||
s"$tablePath/" + HoodieTableMetaClient.METAFOLDER_NAME,
|
s"$tablePath/" + HoodieTableMetaClient.METAFOLDER_NAME,
|
||||||
HoodieTableConfig.PAYLOAD_CLASS_NAME.defaultValue).getTableType)
|
HoodieTableConfig.PAYLOAD_CLASS_NAME.defaultValue).getTableType)
|
||||||
|
|
||||||
|
// Manually pass incremental configs to global configs to make sure Hudi query is able to load the
|
||||||
|
// global configs
|
||||||
|
DFSPropertiesConfiguration.addToGlobalProps(QUERY_TYPE.key, QUERY_TYPE_INCREMENTAL_OPT_VAL)
|
||||||
|
DFSPropertiesConfiguration.addToGlobalProps(BEGIN_INSTANTTIME.key, firstCommit)
|
||||||
|
spark.catalog.refreshTable(tableName)
|
||||||
|
checkAnswer(s"select id, name, price, ts, year from $tableName")(
|
||||||
|
Seq(2, "a2", 10.0, 1000, partitionVal)
|
||||||
|
)
|
||||||
|
|
||||||
// delete the record
|
// delete the record
|
||||||
spark.sql(s"delete from $tableName where year = $partitionVal")
|
spark.sql(s"delete from $tableName where year = $partitionVal")
|
||||||
val cnt = spark.sql(s"select * from $tableName where year = $partitionVal").count()
|
val cnt = spark.sql(s"select * from $tableName where year = $partitionVal").count()
|
||||||
|
|||||||
Reference in New Issue
Block a user