
[HUDI-4178] Addressing performance regressions in Spark DataSourceV2 Integration (#5737)

There are multiple issues with our current DataSource V2 integration: because we advertise Hudi tables as V2, Spark expects them to implement certain APIs that are not currently implemented, and we instead rely on a custom resolution rule (in HoodieSpark3Analysis) to manually fall back to the V1 APIs. This commit fixes the issue by reverting the DSv2 APIs and making Spark use V1, except for the schema evolution logic.
Alexey Kudinkin authored on 2022-06-07 16:30:46 -07:00, committed by GitHub
parent 1349b596a1
commit 35afdb4316
28 changed files with 374 additions and 256 deletions

View File

@@ -175,11 +175,6 @@ public class HoodieWriteConfig extends HoodieConfig {
      .withDocumentation("Schema string representing the latest schema of the table. Hudi passes this to "
          + "implementations of evolution of schema");
- public static final ConfigProperty<Boolean> SCHEMA_EVOLUTION_ENABLE = ConfigProperty
-     .key("hoodie.schema.on.read.enable")
-     .defaultValue(false)
-     .withDocumentation("enable full schema evolution for hoodie");
  public static final ConfigProperty<Boolean> ENABLE_INTERNAL_SCHEMA_CACHE = ConfigProperty
      .key("hoodie.schema.cache.enable")
      .defaultValue(false)
@@ -929,11 +924,11 @@ public class HoodieWriteConfig extends HoodieConfig {
  }
  public boolean getSchemaEvolutionEnable() {
-   return getBoolean(SCHEMA_EVOLUTION_ENABLE);
+   return getBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE);
  }
  public void setSchemaEvolutionEnable(boolean enable) {
-   setValue(SCHEMA_EVOLUTION_ENABLE, String.valueOf(enable));
+   setValue(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE, String.valueOf(enable));
  }
  /**
@@ -2175,7 +2170,7 @@ public class HoodieWriteConfig extends HoodieConfig {
    }
    public Builder withSchemaEvolutionEnable(boolean enable) {
-     writeConfig.setValue(SCHEMA_EVOLUTION_ENABLE, String.valueOf(enable));
+     writeConfig.setValue(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE, String.valueOf(enable));
      return this;
    }

View File

@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.util
/**
* Utility allowing for seamless conversion b/w Java/Scala functional primitives
*/
object JFunction {
def toScala[T, R](f: java.util.function.Function[T, R]): T => R =
(t: T) => f.apply(t)
def toJava[T](f: T => Unit): java.util.function.Consumer[T] =
new java.util.function.Consumer[T] {
override def accept(t: T): Unit = f.apply(t)
}
}
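For illustration, a minimal usage sketch of this utility (hypothetical values; it mirrors how the test harness below bridges a Java Consumer into Spark's Scala API):

import org.apache.hudi.util.JFunction

// Java Function -> Scala function (the Java lambda is supplied via SAM conversion)
val javaFn: java.util.function.Function[Int, String] = (i: Int) => s"value-$i"
val scalaFn: Int => String = JFunction.toScala(javaFn)
assert(scalaFn(1) == "value-1")

// Scala side-effecting function -> Java Consumer
val consumer: java.util.function.Consumer[String] = JFunction.toJava((s: String) => println(s))
consumer.accept("hello")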

View File

@@ -28,14 +28,13 @@ import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, SubqueryAlias}
+ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
+ import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.{FilePartition, LogicalRelation, PartitionedFile, SparkParsePartitionUtil}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.{HoodieCatalystExpressionUtils, Row, SparkSession}
- import org.apache.spark.sql.{Row, SparkSession}
- import org.apache.spark.sql.catalyst.rules.Rule
- import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import java.util.Locale
@@ -141,8 +140,8 @@ trait SparkAdapter extends Serializable {
                          maxSplitBytes: Long): Seq[FilePartition]
  def isHoodieTable(table: LogicalPlan, spark: SparkSession): Boolean = {
-   tripAlias(table) match {
+   unfoldSubqueryAliases(table) match {
-     case LogicalRelation(_, _, Some(tbl), _) => isHoodieTable(tbl)
+     case LogicalRelation(_, _, Some(table), _) => isHoodieTable(table)
      case relation: UnresolvedRelation =>
        isHoodieTable(toTableIdentifier(relation), spark)
      case _ => false
@@ -162,20 +161,15 @@ trait SparkAdapter extends Serializable {
    isHoodieTable(table)
  }
- def tripAlias(plan: LogicalPlan): LogicalPlan = {
+ protected def unfoldSubqueryAliases(plan: LogicalPlan): LogicalPlan = {
    plan match {
      case SubqueryAlias(_, relation: LogicalPlan) =>
-       tripAlias(relation)
+       unfoldSubqueryAliases(relation)
      case other =>
        other
    }
  }
- /**
-  * Create customresolutionRule to deal with alter command for hudi.
-  */
- def createResolveHudiAlterTableCommand(sparkSession: SparkSession): Rule[LogicalPlan]
  /**
   * Create instance of [[ParquetFileFormat]]
   */

View File

@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+ import org.apache.hudi.HoodieConversionUtils;
import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
@@ -73,12 +74,14 @@ import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.timeline.service.TimelineService;
+ import org.apache.hudi.util.JFunction;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
+ import org.apache.spark.sql.SparkSessionExtensions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@@ -98,6 +101,7 @@ import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+ import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -145,6 +149,10 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
    FileSystem.closeAll();
  }

+ protected Option<Consumer<SparkSessionExtensions>> getSparkSessionExtensionsInjector() {
+   return Option.empty();
+ }

  @BeforeEach
  public void setTestMethodName(TestInfo testInfo) {
    if (testInfo.getTestMethod().isPresent()) {
@@ -186,16 +194,32 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
   * @param appName The specified application name.
   */
  protected void initSparkContexts(String appName) {
+   Option<Consumer<SparkSessionExtensions>> sparkSessionExtensionsInjector =
+       getSparkSessionExtensionsInjector();
+
+   if (sparkSessionExtensionsInjector.isPresent()) {
+     // In case we need to inject extensions into Spark Session, we have
+     // to stop any session that might still be active and since Spark will try
+     // to re-use it
+     HoodieConversionUtils.toJavaOption(SparkSession.getActiveSession())
+         .ifPresent(SparkSession::stop);
+   }
+
    // Initialize a local spark env
    jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(appName + "#" + testMethodName));
    jsc.setLogLevel("ERROR");
-   hadoopConf = jsc.hadoopConfiguration();
-   // SQLContext stuff
-   sqlContext = new SQLContext(jsc);
+   hadoopConf = jsc.hadoopConfiguration();
    context = new HoodieSparkEngineContext(jsc);
-   hadoopConf = context.getHadoopConf().get();
-   sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();
+   sparkSession = SparkSession.builder()
+       .withExtensions(JFunction.toScala(sparkSessionExtensions -> {
+         sparkSessionExtensionsInjector.ifPresent(injector -> injector.accept(sparkSessionExtensions));
+         return null;
+       }))
+       .config(jsc.getConf())
+       .getOrCreate();
+   sqlContext = new SQLContext(sparkSession);
  }
  /**
/** /**

View File

@@ -31,6 +31,11 @@ import java.util.Properties;
    description = "The following set of configurations are common across Hudi.")
public class HoodieCommonConfig extends HoodieConfig {

+ public static final ConfigProperty<Boolean> SCHEMA_EVOLUTION_ENABLE = ConfigProperty
+     .key("hoodie.schema.on.read.enable")
+     .defaultValue(false)
+     .withDocumentation("Enables support for Schema Evolution feature");

  public static final ConfigProperty<ExternalSpillableMap.DiskMapType> SPILLABLE_DISK_MAP_TYPE = ConfigProperty
      .key("hoodie.common.spillable.diskmap.type")
      .defaultValue(ExternalSpillableMap.DiskMapType.BITCASK)

View File

@@ -19,7 +19,7 @@ package org.apache.hudi
import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
- import org.apache.hudi.common.config.{ConfigProperty, HoodieConfig}
+ import org.apache.hudi.common.config.{ConfigProperty, HoodieCommonConfig, HoodieConfig}
import org.apache.hudi.common.fs.ConsistencyGuardConfig
import org.apache.hudi.common.model.{HoodieTableType, WriteOperationType}
import org.apache.hudi.common.table.HoodieTableConfig
@@ -142,6 +142,9 @@ object DataSourceReadOptions {
    .key("hoodie.datasource.read.incr.fallback.fulltablescan.enable")
    .defaultValue("false")
    .withDocumentation("When doing an incremental query whether we should fall back to full table scans if file does not exist.")

+ val SCHEMA_EVOLUTION_ENABLED: ConfigProperty[Boolean] = HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE

  /** @deprecated Use {@link QUERY_TYPE} and its methods instead */
  @Deprecated
  val QUERY_TYPE_OPT_KEY = QUERY_TYPE.key()
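As a hedged illustration of the new read option (assuming an active SparkSession named spark; the table path is a placeholder, not part of this change):

import org.apache.hudi.DataSourceReadOptions

// Schema-on-read (schema evolution) now has to be requested explicitly,
// since SCHEMA_EVOLUTION_ENABLED ("hoodie.schema.on.read.enable") defaults to false.
val df = spark.read.format("hudi")
  .option(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, "true")
  .load("/tmp/hudi_table") // placeholder path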

View File

@@ -25,11 +25,10 @@ import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
import org.apache.hudi.common.table.timeline.HoodieInstant
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
- import org.apache.hudi.config.HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE
import org.apache.hudi.exception.HoodieException
- import org.apache.hudi.internal.schema.InternalSchema
import org.apache.log4j.LogManager
import org.apache.spark.sql.execution.streaming.{Sink, Source}
+ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isUsingHiveCatalog
import org.apache.spark.sql.hudi.streaming.HoodieStreamSource
import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.OutputMode
@@ -100,9 +99,18 @@ class DefaultSource extends RelationProvider
    val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent
    val tableType = metaClient.getTableType
    val queryType = parameters(QUERY_TYPE.key)
-   val userSchema = if (schema == null) Option.empty[StructType] else Some(schema)
+   // NOTE: In cases when Hive Metastore is used as catalog and the table is partitioned, schema in the HMS might contain
+   // Hive-specific partitioning columns created specifically for HMS to handle partitioning appropriately. In that
+   // case we opt in to not be providing catalog's schema, and instead force Hudi relations to fetch the schema
+   // from the table itself
+   val userSchema = if (isUsingHiveCatalog(sqlContext.sparkSession)) {
+     None
+   } else {
+     Option(schema)
+   }
    log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType, queryType is: $queryType")
    if (metaClient.getCommitsTimeline.filterCompletedInstants.countInstants() == 0) {
      new EmptyRelation(sqlContext, metaClient)
    } else {

View File

@@ -34,7 +34,7 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.common.util.ValidationUtils.checkState
- import org.apache.hudi.internal.schema.InternalSchema
+ import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
import org.apache.hudi.io.storage.HoodieHFileReader
import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
@@ -74,7 +74,7 @@ case class HoodieTableState(tablePath: String,
abstract class HoodieBaseRelation(val sqlContext: SQLContext,
                                  val metaClient: HoodieTableMetaClient,
                                  val optParams: Map[String, String],
-                                 userSchema: Option[StructType])
+                                 schemaSpec: Option[StructType])
  extends BaseRelation
    with FileRelation
    with PrunedFilteredScan
@@ -128,24 +128,28 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
   */
  protected lazy val (tableAvroSchema: Schema, internalSchema: InternalSchema) = {
    val schemaResolver = new TableSchemaResolver(metaClient)
-   val avroSchema = Try(schemaResolver.getTableAvroSchema) match {
-     case Success(schema) => schema
-     case Failure(e) =>
-       logWarning("Failed to fetch schema from the table", e)
-       // If there is no commit in the table, we can't get the schema
-       // t/h [[TableSchemaResolver]], fallback to the provided [[userSchema]] instead.
-       userSchema match {
-         case Some(s) => convertToAvroSchema(s)
-         case _ => throw new IllegalArgumentException("User-provided schema is required in case the table is empty")
-       }
-   }
-   // try to find internalSchema
-   val internalSchemaFromMeta = try {
-     schemaResolver.getTableInternalSchemaFromCommitMetadata.orElse(InternalSchema.getEmptyInternalSchema)
-   } catch {
-     case _: Exception => InternalSchema.getEmptyInternalSchema
-   }
-   (avroSchema, internalSchemaFromMeta)
+   val avroSchema: Schema = schemaSpec.map(convertToAvroSchema).getOrElse {
+     Try(schemaResolver.getTableAvroSchema) match {
+       case Success(schema) => schema
+       case Failure(e) =>
+         logError("Failed to fetch schema from the table", e)
+         throw new HoodieSchemaException("Failed to fetch schema from the table")
+     }
+   }
+   val internalSchema: InternalSchema = if (!isSchemaEvolutionEnabled) {
+     InternalSchema.getEmptyInternalSchema
+   } else {
+     Try(schemaResolver.getTableInternalSchemaFromCommitMetadata) match {
+       case Success(internalSchemaOpt) =>
+         toScalaOption(internalSchemaOpt).getOrElse(InternalSchema.getEmptyInternalSchema)
+       case Failure(e) =>
+         logWarning("Failed to fetch internal-schema from the table", e)
+         InternalSchema.getEmptyInternalSchema
+     }
+   }
+   (avroSchema, internalSchema)
  }

  protected lazy val tableStructSchema: StructType = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
@@ -503,6 +507,15 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
  private def prunePartitionColumns(dataStructSchema: StructType): StructType =
    StructType(dataStructSchema.filterNot(f => partitionColumns.contains(f.name)))

+ private def isSchemaEvolutionEnabled = {
+   // NOTE: Schema evolution could be configured both t/h optional parameters vehicle as well as
+   // t/h Spark Session configuration (for ex, for Spark SQL)
+   optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
+     DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean ||
+     sparkSession.conf.get(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
+       DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
+ }
}

object HoodieBaseRelation extends SparkAdapterSupport {
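To illustrate the second configuration vehicle mentioned in the note above, a sketch of enabling schema evolution session-wide so that Spark SQL reads pick it up (the table name is a placeholder):

// Session-level configuration, read by isSchemaEvolutionEnabled via sparkSession.conf
spark.conf.set("hoodie.schema.on.read.enable", "true")
spark.sql("SELECT * FROM hudi_tbl") // placeholder table; InternalSchema is resolved only when enabled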

View File

@@ -26,7 +26,7 @@ import org.apache.hudi.HoodieConversionUtils.toProperties
import org.apache.hudi.HoodieWriterUtils._
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
- import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig, TypedProperties}
+ import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieConfig, HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model._
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
@@ -338,7 +338,7 @@ object HoodieSparkSqlWriter {
  def addSchemaEvolutionParameters(parameters: Map[String, String], internalSchemaOpt: Option[InternalSchema]): Map[String, String] = {
    val schemaEvolutionEnable = if (internalSchemaOpt.isDefined) "true" else "false"
    parameters ++ Map(HoodieWriteConfig.INTERNAL_SCHEMA_STRING.key() -> SerDeHelper.toJson(internalSchemaOpt.getOrElse(null)),
-     HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE.key() -> schemaEvolutionEnable)
+     HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key() -> schemaEvolutionEnable)
  }

  /**

View File

@@ -18,11 +18,10 @@
package org.apache.hudi

import java.util.Properties
import org.apache.hudi.DataSourceOptionsHelper.allAlternatives
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE
- import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieConfig, TypedProperties}
+ import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieCommonConfig, HoodieConfig, TypedProperties}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
@@ -163,9 +162,9 @@ object HoodieWriterUtils {
    // Check schema evolution for bootstrap table.
    // now we do not support bootstrap table.
    if (params.get(OPERATION.key).contains(BOOTSTRAP_OPERATION_OPT_VAL)
-     && params.getOrElse(HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE.key(), "false").toBoolean) {
+     && params.getOrElse(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), "false").toBoolean) {
      throw new HoodieException(String
-       .format("now schema evolution cannot support bootstrap table, pls set %s to false", HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE.key()))
+       .format("now schema evolution cannot support bootstrap table, pls set %s to false", HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key()))
    }
  }

View File

@@ -253,8 +253,11 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
      .filterKeys(_.startsWith("hoodie."))
  }

- def isEnableHive(sparkSession: SparkSession): Boolean =
-   "hive" == sparkSession.sessionState.conf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION)
+ /**
+  * Checks whether Spark is using Hive as Session's Catalog
+  */
+ def isUsingHiveCatalog(sparkSession: SparkSession): Boolean =
+   sparkSession.sessionState.conf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive"

  /**
   * Convert different query instant time format to the commit time format.
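For context, a small sketch of what drives this check (standard Spark behaviour; the builder settings are illustrative): enabling Hive support is what sets the catalog implementation that isUsingHiveCatalog inspects.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]")
  .enableHiveSupport() // sets spark.sql.catalogImplementation = "hive"
  .getOrCreate()

// StaticSQLConf.CATALOG_IMPLEMENTATION corresponds to this key
assert(spark.conf.get("spark.sql.catalogImplementation") == "hive")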

View File

@@ -33,7 +33,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.hive.HiveExternalCatalog
- import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isEnableHive, withSparkConf}
+ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isUsingHiveCatalog, withSparkConf}
import org.apache.spark.sql.hudi.command.{SqlKeyGenerator, ValidateDuplicateKeyPayload}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
@@ -57,7 +57,7 @@ trait ProvidesHoodieConfig extends Logging {
    require(hoodieCatalogTable.primaryKeys.nonEmpty,
      s"There are no primary key in table ${hoodieCatalogTable.table.identifier}, cannot execute update operator")
-   val enableHive = isEnableHive(sparkSession)
+   val enableHive = isUsingHiveCatalog(sparkSession)
    val hoodieProps = getHoodieProps(catalogProperties, tableConfig, sparkSession.sqlContext.conf)
@@ -174,7 +174,7 @@ trait ProvidesHoodieConfig extends Logging {
    logInfo(s"Insert statement use write operation type: $operation, payloadClass: $payloadClassName")
-   val enableHive = isEnableHive(sparkSession)
+   val enableHive = isUsingHiveCatalog(sparkSession)
    withSparkConf(sparkSession, catalogProperties) {
      Map(
@@ -213,7 +213,7 @@ trait ProvidesHoodieConfig extends Logging {
    hoodieCatalogTable: HoodieCatalogTable,
    partitionsToDrop: String): Map[String, String] = {
    val partitionFields = hoodieCatalogTable.partitionFields.mkString(",")
-   val enableHive = isEnableHive(sparkSession)
+   val enableHive = isUsingHiveCatalog(sparkSession)
    val catalogProperties = hoodieCatalogTable.catalogProperties
    val tableConfig = hoodieCatalogTable.tableConfig
@@ -259,7 +259,7 @@ trait ProvidesHoodieConfig extends Logging {
    val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)
    val options = hoodieCatalogTable.catalogProperties
-   val enableHive = isEnableHive(sparkSession)
+   val enableHive = isUsingHiveCatalog(sparkSession)
    withSparkConf(sparkSession, options) {
      Map(

View File

@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.needFilterProps
import org.apache.spark.sql.hive.HiveClientUtils
import org.apache.spark.sql.hive.HiveExternalCatalog._
- import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isEnableHive
+ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isUsingHiveCatalog
import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlCommonUtils}
import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD
import org.apache.spark.sql.types.StructType
@@ -144,7 +144,7 @@ object CreateHoodieTableCommand {
    )
    // Create table in the catalog
-   val enableHive = isEnableHive(sparkSession)
+   val enableHive = isUsingHiveCatalog(sparkSession)
    if (enableHive) {
      createHiveDataSourceTable(sparkSession, newTable, ignoreIfExists)
    } else {

View File

@@ -33,17 +33,13 @@ class HoodieSparkSessionExtension extends (SparkSessionExtensions => Unit)
      new HoodieCommonSqlParser(session, parser)
    }

-   HoodieAnalysis.customResolutionRules().foreach { rule =>
-     extensions.injectResolutionRule { session =>
-       rule(session)
-     }
-   }
-   extensions.injectResolutionRule { session =>
-     sparkAdapter.createResolveHudiAlterTableCommand(session)
-   }
-   HoodieAnalysis.customPostHocResolutionRules().foreach { rule =>
-     extensions.injectPostHocResolutionRule { session =>
-       rule(session)
-     }
+   HoodieAnalysis.customResolutionRules.foreach { ruleBuilder =>
+     extensions.injectResolutionRule { session =>
+       ruleBuilder(session)
+     }
+   }
+   HoodieAnalysis.customPostHocResolutionRules.foreach { rule =>
+     extensions.injectPostHocResolutionRule { session =>
+       rule(session)
+     }
    }
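For reference, a hedged sketch of how these injected rules reach a session in the first place (standard Spark extension wiring; the builder settings are illustrative):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hudi.HoodieSparkSessionExtension

// Programmatic registration...
val spark = SparkSession.builder()
  .master("local[1]")
  .withExtensions(new HoodieSparkSessionExtension)
  .getOrCreate()

// ...or equivalently via configuration:
// --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension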

View File

@@ -39,45 +39,69 @@ import org.apache.spark.sql.{AnalysisException, SparkSession}
import java.util
import scala.collection.JavaConverters._
+ import scala.collection.mutable.ListBuffer

object HoodieAnalysis {
- def customResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] =
-   Seq(
-     session => HoodieResolveReferences(session),
-     session => HoodieAnalysis(session)
-   ) ++ extraResolutionRules()
-
- def customPostHocResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] =
-   Seq(
-     session => HoodiePostAnalysisRule(session)
-   ) ++ extraPostHocResolutionRules()
-
- def extraResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] = {
-   if (HoodieSparkUtils.gteqSpark3_2) {
-     val spark3AnalysisClass = "org.apache.spark.sql.hudi.analysis.HoodieSpark3Analysis"
-     val spark3Analysis: SparkSession => Rule[LogicalPlan] =
-       session => ReflectionUtils.loadClass(spark3AnalysisClass, session).asInstanceOf[Rule[LogicalPlan]]
-     val spark3ResolveReferences = "org.apache.spark.sql.hudi.analysis.HoodieSpark3ResolveReferences"
-     val spark3References: SparkSession => Rule[LogicalPlan] =
-       session => ReflectionUtils.loadClass(spark3ResolveReferences, session).asInstanceOf[Rule[LogicalPlan]]
-     Seq(spark3Analysis, spark3References)
-   } else {
-     Seq.empty
-   }
- }
-
- def extraPostHocResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] =
-   if (HoodieSparkUtils.gteqSpark3_2) {
-     val spark3PostHocResolutionClass = "org.apache.spark.sql.hudi.analysis.HoodieSpark3PostAnalysisRule"
-     val spark3PostHocResolution: SparkSession => Rule[LogicalPlan] =
-       session => ReflectionUtils.loadClass(spark3PostHocResolutionClass, session).asInstanceOf[Rule[LogicalPlan]]
-     Seq(spark3PostHocResolution)
-   } else {
-     Seq.empty
-   }
+ type RuleBuilder = SparkSession => Rule[LogicalPlan]
+
+ def customResolutionRules: Seq[RuleBuilder] = {
+   val rules: ListBuffer[RuleBuilder] = ListBuffer(
+     // Default rules
+     session => HoodieResolveReferences(session),
+     session => HoodieAnalysis(session)
+   )
+
+   if (HoodieSparkUtils.gteqSpark3_2) {
+     val dataSourceV2ToV1FallbackClass = "org.apache.spark.sql.hudi.analysis.HoodieDataSourceV2ToV1Fallback"
+     val dataSourceV2ToV1Fallback: RuleBuilder =
+       session => ReflectionUtils.loadClass(dataSourceV2ToV1FallbackClass, session).asInstanceOf[Rule[LogicalPlan]]
+
+     val spark3AnalysisClass = "org.apache.spark.sql.hudi.analysis.HoodieSpark3Analysis"
+     val spark3Analysis: RuleBuilder =
+       session => ReflectionUtils.loadClass(spark3AnalysisClass, session).asInstanceOf[Rule[LogicalPlan]]
+
+     val spark3ResolveReferencesClass = "org.apache.spark.sql.hudi.analysis.HoodieSpark3ResolveReferences"
+     val spark3ResolveReferences: RuleBuilder =
+       session => ReflectionUtils.loadClass(spark3ResolveReferencesClass, session).asInstanceOf[Rule[LogicalPlan]]
+
+     val spark32ResolveAlterTableCommandsClass = "org.apache.spark.sql.hudi.ResolveHudiAlterTableCommandSpark32"
+     val spark32ResolveAlterTableCommands: RuleBuilder =
+       session => ReflectionUtils.loadClass(spark32ResolveAlterTableCommandsClass, session).asInstanceOf[Rule[LogicalPlan]]
+
+     // NOTE: PLEASE READ CAREFULLY
+     //
+     // It's critical for this rules to follow in this order, so that DataSource V2 to V1 fallback
+     // is performed prior to other rules being evaluated
+     rules ++= Seq(dataSourceV2ToV1Fallback, spark3Analysis, spark3ResolveReferences, spark32ResolveAlterTableCommands)
+   } else if (HoodieSparkUtils.gteqSpark3_1) {
+     val spark31ResolveAlterTableCommandsClass = "org.apache.spark.sql.hudi.ResolveHudiAlterTableCommand312"
+     val spark31ResolveAlterTableCommands: RuleBuilder =
+       session => ReflectionUtils.loadClass(spark31ResolveAlterTableCommandsClass, session).asInstanceOf[Rule[LogicalPlan]]
+     rules ++= Seq(spark31ResolveAlterTableCommands)
+   }
+
+   rules
+ }
+
+ def customPostHocResolutionRules: Seq[RuleBuilder] = {
+   val rules: ListBuffer[RuleBuilder] = ListBuffer(
+     // Default rules
+     session => HoodiePostAnalysisRule(session)
+   )
+
+   if (HoodieSparkUtils.gteqSpark3_2) {
+     val spark3PostHocResolutionClass = "org.apache.spark.sql.hudi.analysis.HoodieSpark3PostAnalysisRule"
+     val spark3PostHocResolution: RuleBuilder =
+       session => ReflectionUtils.loadClass(spark3PostHocResolutionClass, session).asInstanceOf[Rule[LogicalPlan]]
+     rules += spark3PostHocResolution
+   }
+
+   rules
+ }
}
/** /**

View File

@@ -453,7 +453,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
    val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)
    // Enable the hive sync by default if spark have enable the hive metastore.
-   val enableHive = isEnableHive(sparkSession)
+   val enableHive = isUsingHiveCatalog(sparkSession)
    withSparkConf(sparkSession, hoodieCatalogTable.catalogProperties) {
      Map(
        "path" -> path,

View File

@@ -18,20 +18,24 @@
package org.apache.hudi.functional

import org.apache.hadoop.fs.FileSystem
+ import org.apache.hudi.HoodieConversionUtils.toJavaOption
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.timeline.HoodieInstant
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings}
+ import org.apache.hudi.common.util
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.{HoodieException, HoodieUpsertException}
import org.apache.hudi.keygen._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
import org.apache.hudi.testutils.HoodieClientTestBase
+ import org.apache.hudi.util.JFunction
import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{col, concat, lit, udf}
+ import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
import org.apache.spark.sql.types._
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
@@ -42,6 +46,7 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
import java.sql.{Date, Timestamp}
+ import java.util.function.Consumer
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
@@ -67,6 +72,12 @@ class TestCOWDataSource extends HoodieClientTestBase {
  val verificationCol: String = "driver"
  val updatedVerificationVal: String = "driver_update"

+ override def getSparkSessionExtensionsInjector: util.Option[Consumer[SparkSessionExtensions]] =
+   toJavaOption(
+     Some(
+       JFunction.toJava((receiver: SparkSessionExtensions) => new HoodieSparkSessionExtension().apply(receiver)))
+   )

  @BeforeEach override def setUp() {
    initPath()
    initSparkContexts()

View File

@@ -25,6 +25,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.util.Utils
+ import org.joda.time.DateTimeZone
import org.scalactic.source
import org.scalatest.{BeforeAndAfterAll, FunSuite, Tag}
@@ -40,7 +41,10 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
    dir
  }

- TimeZone.setDefault(DateTimeUtils.getTimeZone("CTT"))
+ // NOTE: We have to fix the timezone to make sure all date-/timestamp-bound utilities output
+ // is consistent with the fixtures
+ DateTimeZone.setDefault(DateTimeZone.UTC)
+ TimeZone.setDefault(DateTimeUtils.getTimeZone("UTC"))

  protected lazy val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("hoodie sql test")
@@ -50,7 +54,7 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
    .config("hoodie.upsert.shuffle.parallelism", "4")
    .config("hoodie.delete.shuffle.parallelism", "4")
    .config("spark.sql.warehouse.dir", sparkWareHouse.getCanonicalPath)
-   .config("spark.sql.session.timeZone", "CTT")
+   .config("spark.sql.session.timeZone", "UTC")
    .config(sparkConf())
    .getOrCreate()

View File

@@ -159,12 +159,6 @@ class Spark2Adapter extends SparkAdapter {
    throw new IllegalStateException(s"Should not call getRelationTimeTravel for spark2")
  }

- override def createResolveHudiAlterTableCommand(sparkSession: SparkSession): Rule[LogicalPlan] = {
-   new Rule[LogicalPlan] {
-     override def apply(plan: LogicalPlan): LogicalPlan = plan
-   }
- }

  override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = {
    Some(new Spark24HoodieParquetFileFormat(appendPartitionValues))
  }

View File

@@ -112,8 +112,8 @@ abstract class BaseSpark3Adapter extends SparkAdapter {
  }

  override def isHoodieTable(table: LogicalPlan, spark: SparkSession): Boolean = {
-   tripAlias(table) match {
+   unfoldSubqueryAliases(table) match {
-     case LogicalRelation(_, _, Some(tbl), _) => isHoodieTable(tbl)
+     case LogicalRelation(_, _, Some(table), _) => isHoodieTable(table)
      case relation: UnresolvedRelation =>
        isHoodieTable(toTableIdentifier(relation), spark)
      case DataSourceV2Relation(table: Table, _, _, _, _) => isHoodieTable(table.properties())

View File

@@ -41,19 +41,6 @@ class Spark3_1Adapter extends BaseSpark3Adapter {
  override def createAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType): HoodieAvroDeserializer =
    new HoodieSpark3_1AvroDeserializer(rootAvroType, rootCatalystType)

- override def createResolveHudiAlterTableCommand(sparkSession: SparkSession): Rule[LogicalPlan] = {
-   if (SPARK_VERSION.startsWith("3.1")) {
-     val loadClassName = "org.apache.spark.sql.hudi.ResolveHudiAlterTableCommand312"
-     val clazz = Class.forName(loadClassName, true, Thread.currentThread().getContextClassLoader)
-     val ctor = clazz.getConstructors.head
-     ctor.newInstance(sparkSession).asInstanceOf[Rule[LogicalPlan]]
-   } else {
-     new Rule[LogicalPlan] {
-       override def apply(plan: LogicalPlan): LogicalPlan = plan
-     }
-   }
- }

  override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = {
    Some(new Spark31HoodieParquetFileFormat(appendPartitionValues))
  }

View File

@@ -16,6 +16,8 @@
 */
package org.apache.spark.sql.hudi

+ import org.apache.hudi.common.config.HoodieCommonConfig

import java.util.Locale
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.internal.schema.action.TableChange.ColumnChangeID
@@ -114,8 +116,9 @@ case class ResolveHudiAlterTableCommand312(sparkSession: SparkSession) extends R
    }
  }

- private def schemaEvolutionEnabled(): Boolean = sparkSession
-   .sessionState.conf.getConfString(HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE.key(), "false").toBoolean
+ private def schemaEvolutionEnabled(): Boolean =
+   sparkSession.sessionState.conf.getConfString(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key,
+     HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.defaultValue.toString).toBoolean

  private def isHoodieTable(table: CatalogTable): Boolean = table.provider.map(_.toLowerCase(Locale.ROOT)).orNull == "hudi"

View File

@@ -17,19 +17,19 @@
package org.apache.hudi

- import org.apache.hudi.exception.HoodieException
- import org.apache.spark.sql.SparkSession
- import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
- import org.apache.spark.sql.connector.expressions.Transform
- import org.apache.spark.sql.hudi.catalog.HoodieInternalV2Table
import org.apache.spark.sql.sources.DataSourceRegister
- import org.apache.spark.sql.types.StructType
- import org.apache.spark.sql.util.CaseInsensitiveStringMap

- class Spark3DefaultSource extends DefaultSource with DataSourceRegister with TableProvider {
+ /**
+  * NOTE: PLEASE READ CAREFULLY
+  * All of Spark DataSourceV2 APIs are deliberately disabled to make sure
+  * there are no regressions in performance
+  * Please check out HUDI-4178 for more details
+  */
+ class Spark3DefaultSource extends DefaultSource with DataSourceRegister /* with TableProvider */ {

  override def shortName(): String = "hudi"

+ /*
  def inferSchema: StructType = new StructType()

  override def inferSchema(options: CaseInsensitiveStringMap): StructType = inferSchema
@@ -43,4 +43,5 @@ class Spark3DefaultSource extends DefaultSource with DataSourceRegister with Tab
    HoodieInternalV2Table(SparkSession.active, path)
  }
+ */
}

View File

@@ -67,19 +67,6 @@ class Spark3_2Adapter extends BaseSpark3Adapter {
    )
  }

- override def createResolveHudiAlterTableCommand(sparkSession: SparkSession): Rule[LogicalPlan] = {
-   if (SPARK_VERSION.startsWith("3.2")) {
-     val loadClassName = "org.apache.spark.sql.hudi.ResolveHudiAlterTableCommandSpark32"
-     val clazz = Class.forName(loadClassName, true, Thread.currentThread().getContextClassLoader)
-     val ctor = clazz.getConstructors.head
-     ctor.newInstance(sparkSession).asInstanceOf[Rule[LogicalPlan]]
-   } else {
-     new Rule[LogicalPlan] {
-       override def apply(plan: LogicalPlan): LogicalPlan = plan
-     }
-   }
- }

  override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = {
    Some(new Spark32HoodieParquetFileFormat(appendPartitionValues))
  }

View File

@@ -17,12 +17,12 @@
package org.apache.spark.sql.hudi

+ import org.apache.hudi.common.config.HoodieCommonConfig
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.internal.schema.action.TableChange.ColumnChangeID
- import org.apache.spark.sql.catalyst.analysis.ResolvedTable
- import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.SparkSession
- import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, DropColumns, LogicalPlan, RenameColumn, ReplaceColumns, SetTableProperties, UnsetTableProperties}
+ import org.apache.spark.sql.catalyst.analysis.ResolvedTable
+ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.hudi.catalog.HoodieInternalV2Table
import org.apache.spark.sql.hudi.command.{AlterTableCommand => HudiAlterTableCommand}
@@ -33,33 +33,38 @@ import org.apache.spark.sql.hudi.command.{AlterTableCommand => HudiAlterTableCom
 */
class ResolveHudiAlterTableCommandSpark32(sparkSession: SparkSession) extends Rule[LogicalPlan] {

- def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
-   case set @ SetTableProperties(asTable(table), _) if schemaEvolutionEnabled && set.resolved =>
-     HudiAlterTableCommand(table, set.changes, ColumnChangeID.PROPERTY_CHANGE)
-   case unSet @ UnsetTableProperties(asTable(table), _, _) if schemaEvolutionEnabled && unSet.resolved =>
-     HudiAlterTableCommand(table, unSet.changes, ColumnChangeID.PROPERTY_CHANGE)
-   case drop @ DropColumns(asTable(table), _) if schemaEvolutionEnabled && drop.resolved =>
-     HudiAlterTableCommand(table, drop.changes, ColumnChangeID.DELETE)
-   case add @ AddColumns(asTable(table), _) if schemaEvolutionEnabled && add.resolved =>
-     HudiAlterTableCommand(table, add.changes, ColumnChangeID.ADD)
-   case renameColumn @ RenameColumn(asTable(table), _, _) if schemaEvolutionEnabled && renameColumn.resolved =>
-     HudiAlterTableCommand(table, renameColumn.changes, ColumnChangeID.UPDATE)
-   case alter @ AlterColumn(asTable(table), _, _, _, _, _) if schemaEvolutionEnabled && alter.resolved =>
-     HudiAlterTableCommand(table, alter.changes, ColumnChangeID.UPDATE)
-   case replace @ ReplaceColumns(asTable(table), _) if schemaEvolutionEnabled && replace.resolved =>
-     HudiAlterTableCommand(table, replace.changes, ColumnChangeID.REPLACE)
- }
+ def apply(plan: LogicalPlan): LogicalPlan = {
+   if (schemaEvolutionEnabled) {
+     plan.resolveOperatorsUp {
+       case set@SetTableProperties(ResolvedHoodieV2TablePlan(t), _) if set.resolved =>
+         HudiAlterTableCommand(t.v1Table, set.changes, ColumnChangeID.PROPERTY_CHANGE)
+       case unSet@UnsetTableProperties(ResolvedHoodieV2TablePlan(t), _, _) if unSet.resolved =>
+         HudiAlterTableCommand(t.v1Table, unSet.changes, ColumnChangeID.PROPERTY_CHANGE)
+       case drop@DropColumns(ResolvedHoodieV2TablePlan(t), _) if drop.resolved =>
+         HudiAlterTableCommand(t.v1Table, drop.changes, ColumnChangeID.DELETE)
+       case add@AddColumns(ResolvedHoodieV2TablePlan(t), _) if add.resolved =>
+         HudiAlterTableCommand(t.v1Table, add.changes, ColumnChangeID.ADD)
+       case renameColumn@RenameColumn(ResolvedHoodieV2TablePlan(t), _, _) if renameColumn.resolved =>
+         HudiAlterTableCommand(t.v1Table, renameColumn.changes, ColumnChangeID.UPDATE)
+       case alter@AlterColumn(ResolvedHoodieV2TablePlan(t), _, _, _, _, _) if alter.resolved =>
+         HudiAlterTableCommand(t.v1Table, alter.changes, ColumnChangeID.UPDATE)
+       case replace@ReplaceColumns(ResolvedHoodieV2TablePlan(t), _) if replace.resolved =>
+         HudiAlterTableCommand(t.v1Table, replace.changes, ColumnChangeID.REPLACE)
+     }
+   } else {
+     plan
+   }
+ }

- private def schemaEvolutionEnabled(): Boolean = sparkSession
-   .sessionState.conf.getConfString(HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE.key(), "false").toBoolean
+ private def schemaEvolutionEnabled: Boolean =
+   sparkSession.sessionState.conf.getConfString(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key,
+     HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.defaultValue.toString).toBoolean

- object asTable {
-   def unapply(a: LogicalPlan): Option[CatalogTable] = {
-     a match {
-       case ResolvedTable(_, _, table: HoodieInternalV2Table, _) =>
-         table.catalogTable
-       case _ =>
-         None
+ object ResolvedHoodieV2TablePlan {
+   def unapply(plan: LogicalPlan): Option[HoodieInternalV2Table] = {
+     plan match {
+       case ResolvedTable(_, _, v2Table: HoodieInternalV2Table, _) => Some(v2Table)
+       case _ => None
      }
    }
  }

View File

@@ -17,72 +17,77 @@
package org.apache.spark.sql.hudi.analysis package org.apache.spark.sql.hudi.analysis
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.{DefaultSource, SparkAdapterSupport} import org.apache.hudi.{DefaultSource, SparkAdapterSupport}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{ResolvedTable, UnresolvedPartitionSpec} import org.apache.spark.sql.catalyst.analysis.{ResolvedTable, UnresolvedPartitionSpec}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute} import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
import org.apache.spark.sql.connector.catalog.{Table, V1Table}
import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.PreWriteCheck.failAnalysis import org.apache.spark.sql.execution.datasources.PreWriteCheck.failAnalysis
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, V2SessionCatalog} import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, V2SessionCatalog}
import org.apache.spark.sql.hudi.{HoodieSqlCommonUtils, ProvidesHoodieConfig}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{castIfNeeded, getTableLocation, removeMetaFields, tableExistsInPath} import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{castIfNeeded, getTableLocation, removeMetaFields, tableExistsInPath}
import org.apache.spark.sql.hudi.catalog.{HoodieCatalog, HoodieInternalV2Table} import org.apache.spark.sql.hudi.catalog.{HoodieCatalog, HoodieInternalV2Table}
import org.apache.spark.sql.hudi.command.{AlterHoodieTableDropPartitionCommand, ShowHoodieTablePartitionsCommand, TruncateHoodieTableCommand} import org.apache.spark.sql.hudi.command.{AlterHoodieTableDropPartitionCommand, ShowHoodieTablePartitionsCommand, TruncateHoodieTableCommand}
import org.apache.spark.sql.hudi.{HoodieSqlCommonUtils, ProvidesHoodieConfig}
import org.apache.spark.sql.types.StructType import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{AnalysisException, SQLContext, SparkSession} import org.apache.spark.sql.{AnalysisException, SQLContext, SparkSession}
import scala.collection.JavaConverters.mapAsJavaMapConverter import scala.collection.JavaConverters.mapAsJavaMapConverter
/** /**
* Rule for convert the logical plan to command. * NOTE: PLEASE READ CAREFULLY
* @param sparkSession *
* Since Hudi relations don't currently implement DS V2 Read API, we have to fallback to V1 here.
* Such fallback will have considerable performance impact, therefore it's only performed in cases
* where V2 API have to be used. Currently only such use-case is using of Schema Evolution feature
*
* Check out HUDI-4178 for more details
*/ */
class HoodieDataSourceV2ToV1Fallback(sparkSession: SparkSession) extends Rule[LogicalPlan]
  with ProvidesHoodieConfig {

  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown {
    case v2r @ DataSourceV2Relation(v2Table: HoodieInternalV2Table, _, _, _, _) =>
      val output = v2r.output
      val catalogTable = v2Table.catalogTable.map(_ => v2Table.v1Table)
      val relation = new DefaultSource().createRelation(new SQLContext(sparkSession),
        buildHoodieConfig(v2Table.hoodieCatalogTable), v2Table.hoodieCatalogTable.tableSchema)

      LogicalRelation(relation, output, catalogTable, isStreaming = false)
  }
}
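For orientation, rules like the fallback above only take effect once they are registered with Spark's analyzer. Below is a minimal sketch of such wiring through Spark's SparkSessionExtensions API; the extension class name is illustrative (it is not Hudi's actual extension class), and the set of injected rules is deliberately reduced to the ones visible in this file.

import org.apache.spark.sql.SparkSessionExtensions

// Illustrative-only extension: registers the rules from this file with Spark's analyzer.
class ExampleHoodieAnalysisExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    // Resolution-time rule: swap the Hudi DSv2 relation for a V1 LogicalRelation
    extensions.injectResolutionRule(spark => new HoodieDataSourceV2ToV1Fallback(spark))
    // Resolution-time rule: align INSERT INTO queries with the target table schema
    extensions.injectResolutionRule(spark => new HoodieSpark3Analysis(spark))
    // Post-analysis rule: replace Spark commands with Hudi command implementations
    extensions.injectPostHocResolutionRule(spark => HoodieSpark3PostAnalysisRule(spark))
  }
}

Such an extension class would be activated through the spark.sql.extensions configuration when the session is built.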
class HoodieSpark3Analysis(sparkSession: SparkSession) extends Rule[LogicalPlan] {

  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown {
    case s @ InsertIntoStatement(r @ DataSourceV2Relation(v2Table: HoodieInternalV2Table, _, _, _, _), partitionSpec, _, _, _, _)
      if s.query.resolved && needsSchemaAdjustment(s.query, v2Table.hoodieCatalogTable.table, partitionSpec, r.schema) =>
      val projection = resolveQueryColumnsByOrdinal(s.query, r.output)
      if (projection != s.query) {
        s.copy(query = projection)
      } else {
        s
      }
  }
  /**
   * Checks whether the query schema needs to be adjusted to the relation schema; for example,
   * "insert into xx select 1, 2" provides values positionally, so they need to be mapped to the
   * table's column names
   */
  private def needsSchemaAdjustment(query: LogicalPlan,
                                    table: CatalogTable,
                                    partitionSpec: Map[String, Option[String]],
                                    schema: StructType): Boolean = {
    val output = query.output
    val queryOutputWithoutMetaFields = removeMetaFields(output)
    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, table)

    val partitionFields = hoodieCatalogTable.partitionFields
    val partitionSchema = hoodieCatalogTable.partitionSchema
    val staticPartitionValues = partitionSpec.filter(p => p._2.isDefined).mapValues(_.get)

    assert(staticPartitionValues.isEmpty ||

@@ -91,8 +96,8 @@ case class HoodieSpark3Analysis(sparkSession: SparkSession) extends Rule[Logical

      s"is: ${staticPartitionValues.mkString("," + "")}")
    assert(staticPartitionValues.size + queryOutputWithoutMetaFields.size
      == hoodieCatalogTable.tableSchemaWithoutMetaFields.size,
      s"Required select columns count: ${hoodieCatalogTable.tableSchemaWithoutMetaFields.size}, " +
        s"Current select columns(including static partition column) count: " +
        s"${staticPartitionValues.size + queryOutputWithoutMetaFields.size}columns: " +
        s"(${(queryOutputWithoutMetaFields.map(_.name) ++ staticPartitionValues.keys).mkString(",")})")
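To make the positional mapping described in the comment above concrete, the sketch below is a simplified, hypothetical take on ordinal-based resolution (it is not the actual resolveQueryColumnsByOrdinal implementation): the query output is paired with the target relation's attributes by position, and mismatched names are aliased to the table's column names.

  // Hypothetical simplification of ordinal-based resolution: pair the query output
  // with the relation's attributes by position and alias mismatched names.
  private def resolveByOrdinalSketch(query: LogicalPlan, targetOutput: Seq[Attribute]): LogicalPlan = {
    val aligned = query.output.zip(targetOutput).map {
      case (queryAttr, targetAttr) if queryAttr.name != targetAttr.name =>
        Alias(queryAttr, targetAttr.name)()
      case (queryAttr, _) => queryAttr
    }
    if (aligned == query.output) query else Project(aligned, query)
  }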
@@ -126,7 +131,6 @@ case class HoodieSpark3Analysis(sparkSession: SparkSession) extends Rule[Logical

/**
 * Rule that resolves Hudi's extended syntax or rewrites some logical plans.
 */
case class HoodieSpark3ResolveReferences(sparkSession: SparkSession) extends Rule[LogicalPlan]
  with SparkAdapterSupport with ProvidesHoodieConfig {

@@ -173,28 +177,26 @@ case class HoodieSpark3ResolveReferences(sparkSession: SparkSession) extends Rul

}

/**
 * Rule replacing resolved Spark commands (which do not work for Hudi tables out of the box) with
 * the corresponding Hudi implementations
 */
case class HoodieSpark3PostAnalysisRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    plan match {
      case ShowPartitions(ResolvedTable(_, id, HoodieV1OrV2Table(_), _), specOpt, _) =>
        ShowHoodieTablePartitionsCommand(
          id.asTableIdentifier, specOpt.map(s => s.asInstanceOf[UnresolvedPartitionSpec].spec))

      // Rewrite TruncateTableCommand to TruncateHoodieTableCommand
      case TruncateTable(ResolvedTable(_, id, HoodieV1OrV2Table(_), _)) =>
        TruncateHoodieTableCommand(id.asTableIdentifier, None)

      case TruncatePartition(ResolvedTable(_, id, HoodieV1OrV2Table(_), _), partitionSpec: UnresolvedPartitionSpec) =>
        TruncateHoodieTableCommand(id.asTableIdentifier, Some(partitionSpec.spec))

      case DropPartitions(ResolvedTable(_, id, HoodieV1OrV2Table(_), _), specs, ifExists, purge) =>
        AlterHoodieTableDropPartitionCommand(
          id.asTableIdentifier,
          specs.seq.map(f => f.asInstanceOf[UnresolvedPartitionSpec]).map(s => s.spec),
          ifExists,
          purge,

@@ -205,3 +207,12 @@ case class HoodieSpark3PostAnalysisRule(sparkSession: SparkSession) extends Rule

    }
  }
}
private[sql] object HoodieV1OrV2Table extends SparkAdapterSupport {
  def unapply(table: Table): Option[CatalogTable] = table match {
    case V1Table(catalogTable) if sparkAdapter.isHoodieTable(catalogTable) => Some(catalogTable)
    case v2: HoodieInternalV2Table => v2.catalogTable
    case _ => None
  }
}
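As a usage-level illustration (the table and partition names below are hypothetical), these are the SQL commands HoodieSpark3PostAnalysisRule intercepts; with the extractor above they are rewritten to Hudi commands regardless of whether the catalog handed Spark the V1 wrapper or the V2 table. Assumes an active SparkSession named spark.

// Hypothetical table/partition names, shown only to illustrate which commands get rerouted
spark.sql("SHOW PARTITIONS hudi_tbl")                                 // -> ShowHoodieTablePartitionsCommand
spark.sql("TRUNCATE TABLE hudi_tbl")                                  // -> TruncateHoodieTableCommand
spark.sql("TRUNCATE TABLE hudi_tbl PARTITION (dt = '2022-06-01')")    // -> TruncateHoodieTableCommand (single partition)
spark.sql("ALTER TABLE hudi_tbl DROP PARTITION (dt = '2022-06-01')")  // -> AlterHoodieTableDropPartitionCommand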

@@ -22,7 +22,7 @@ import org.apache.hadoop.fs.Path

import org.apache.hudi.exception.HoodieException
import org.apache.hudi.sql.InsertMode
import org.apache.hudi.sync.common.util.ConfigUtils
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, SparkAdapterSupport}
import org.apache.spark.sql.HoodieSpark3SqlUtils.convertTransforms
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException, UnresolvedAttribute}

@@ -33,6 +33,7 @@ import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, ColumnChan

import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.hudi.analysis.HoodieV1OrV2Table
import org.apache.spark.sql.hudi.command._
import org.apache.spark.sql.hudi.{HoodieSqlCommonUtils, ProvidesHoodieConfig}
import org.apache.spark.sql.types.{StructField, StructType}
@@ -105,12 +106,30 @@ class HoodieCatalog extends DelegatingCatalogExtension

          case _ =>
            catalogTable0
        }

        val v2Table = HoodieInternalV2Table(
          spark = spark,
          path = catalogTable.location.toString,
          catalogTable = Some(catalogTable),
          tableIdentifier = Some(ident.toString))

        val schemaEvolutionEnabled: Boolean = spark.sessionState.conf.getConfString(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
          DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean

        // NOTE: PLEASE READ CAREFULLY
        //
        // Since Hudi relations don't currently implement the DS V2 Read API, we fall back to V1 by default here.
        // Such a fallback has a considerable performance impact, therefore it's only performed in cases
        // where the V2 API has to be used. Currently the only such use-case is the Schema Evolution feature.
        //
        // Check out HUDI-4178 for more details
        if (schemaEvolutionEnabled) {
          v2Table
        } else {
          v2Table.v1TableWrapper
        }

      case t => t
    }
  }
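A usage sketch of the gating above: with the flag at its default, loadTable hands Spark the V1 wrapper and reads stay on the faster V1 path; enabling it keeps the V2 table so the schema-on-read evolution path is used. The session setup below is illustrative and not complete (catalog registration shown as an assumption of a typical Hudi Spark 3 configuration).

import org.apache.hudi.DataSourceReadOptions
import org.apache.spark.sql.SparkSession

// Illustrative session setup; enabling the flag opts reads into the DSv2 (schema evolution) path
val spark = SparkSession.builder()
  .appName("hudi-schema-evolution-example")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
  .config(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, "true")
  .getOrCreate()

// loadTable(...) will now return the HoodieInternalV2Table rather than its V1 wrapper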
@@ -132,7 +151,7 @@ class HoodieCatalog extends DelegatingCatalogExtension

  override def dropTable(ident: Identifier): Boolean = {
    val table = loadTable(ident)
    table match {
      case HoodieV1OrV2Table(_) =>
        DropHoodieTableCommand(ident.asTableIdentifier, ifExists = true, isView = false, purge = false).run(spark)
        true
      case _ => super.dropTable(ident)

@@ -142,7 +161,7 @@ class HoodieCatalog extends DelegatingCatalogExtension

  override def purgeTable(ident: Identifier): Boolean = {
    val table = loadTable(ident)
    table match {
      case HoodieV1OrV2Table(_) =>
        DropHoodieTableCommand(ident.asTableIdentifier, ifExists = true, isView = false, purge = true).run(spark)
        true
      case _ => super.purgeTable(ident)
@@ -153,24 +172,17 @@ class HoodieCatalog extends DelegatingCatalogExtension

  @throws[TableAlreadyExistsException]
  override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
    loadTable(oldIdent) match {
      case HoodieV1OrV2Table(_) =>
        AlterHoodieTableRenameCommand(oldIdent.asTableIdentifier, newIdent.asTableIdentifier, false).run(spark)
      case _ => super.renameTable(oldIdent, newIdent)
    }
  }

  override def alterTable(ident: Identifier, changes: TableChange*): Table = {
    loadTable(ident) match {
      case HoodieV1OrV2Table(table) => {
        val tableIdent = TableIdentifier(ident.name(), ident.namespace().lastOption)
        changes.groupBy(c => c.getClass).foreach {
          case (t, newColumns) if t == classOf[AddColumn] =>
            AlterHoodieTableAddColumnsCommand(
              tableIdent,

@@ -180,6 +192,7 @@ class HoodieCatalog extends DelegatingCatalogExtension

                col.dataType(),
                col.isNullable)
              }).run(spark)

          case (t, columnChanges) if classOf[ColumnChange].isAssignableFrom(t) =>
            columnChanges.foreach {
              case dataType: UpdateColumnType =>

@@ -190,11 +203,11 @@ class HoodieCatalog extends DelegatingCatalogExtension

              case dataType: UpdateColumnComment =>
                val newComment = dataType.newComment()
                val colName = UnresolvedAttribute(dataType.fieldNames()).name
                val fieldOpt = table.schema.findNestedField(dataType.fieldNames(), includeCollections = true,
                  spark.sessionState.conf.resolver).map(_._2)
                val field = fieldOpt.getOrElse {
                  throw new AnalysisException(
                    s"Couldn't find column $colName in:\n${table.schema.treeString}")
                }
                AlterHoodieTableChangeColumnCommand(tableIdent, colName, field.withComment(newComment)).run(spark)
            }

@@ -204,6 +217,9 @@ class HoodieCatalog extends DelegatingCatalogExtension

        loadTable(ident)
      }
      case _ => super.alterTable(ident, changes: _*)
    }
  }

  private def deduceTableLocationURIAndTableType(
      ident: Identifier, properties: util.Map[String, String]): (URI, CatalogTableType) = {

@@ -21,7 +21,7 @@ import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable}
import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability, V1Table, V2TableWithV1Fallback}
import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform}
import org.apache.spark.sql.connector.write._
import org.apache.spark.sql.hudi.ProvidesHoodieConfig
@@ -74,6 +74,8 @@ case class HoodieInternalV2Table(spark: SparkSession,

  override def v1Table: CatalogTable = hoodieCatalogTable.table

  def v1TableWrapper: V1Table = V1Table(v1Table)

  override def partitioning(): Array[Transform] = {
    hoodieCatalogTable.partitionFields.map { col =>
      new IdentityTransform(new FieldReference(Seq(col)))