[MINOR] Minor changes around Spark 3.3 support (#6231)
Co-authored-by: Shawn Chang <yxchang@amazon.com>
This commit is contained in:
@@ -162,7 +162,7 @@ trait SparkAdapter extends Serializable {
|
|||||||
* Extract condition in [[DeleteFromTable]]
|
* Extract condition in [[DeleteFromTable]]
|
||||||
* SPARK-38626 condition is no longer Option in Spark 3.3
|
* SPARK-38626 condition is no longer Option in Spark 3.3
|
||||||
*/
|
*/
|
||||||
def extractCondition(deleteFromTable: Command): Expression
|
def extractDeleteCondition(deleteFromTable: Command): Expression
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get parseQuery from ExtendedSqlParser, only for Spark 3.3+
|
* Get parseQuery from ExtendedSqlParser, only for Spark 3.3+
|
||||||
|
|||||||
@@ -435,7 +435,7 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
|
|||||||
// Resolve Delete Table
|
// Resolve Delete Table
|
||||||
case dft @ DeleteFromTable(table, condition)
|
case dft @ DeleteFromTable(table, condition)
|
||||||
if sparkAdapter.isHoodieTable(table, sparkSession) && table.resolved =>
|
if sparkAdapter.isHoodieTable(table, sparkSession) && table.resolved =>
|
||||||
val resolveExpression = resolveExpressionFrom(table, None)_
|
val resolveExpression = resolveExpressionFrom(table, None)(_)
|
||||||
sparkAdapter.resolveDeleteFromTable(dft, resolveExpression)
|
sparkAdapter.resolveDeleteFromTable(dft, resolveExpression)
|
||||||
|
|
||||||
// Append the meta field to the insert query to walk through the validate for the
|
// Append the meta field to the insert query to walk through the validate for the
|
||||||
|
|||||||
@@ -37,8 +37,7 @@ case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends Hoodie
|
|||||||
|
|
||||||
// Remove meta fields from the data frame
|
// Remove meta fields from the data frame
|
||||||
var df = removeMetaFields(Dataset.ofRows(sparkSession, table))
|
var df = removeMetaFields(Dataset.ofRows(sparkSession, table))
|
||||||
// SPARK-38626 DeleteFromTable.condition is changed from Option[Expression] to Expression in Spark 3.3
|
val condition = sparkAdapter.extractDeleteCondition(deleteTable)
|
||||||
val condition = sparkAdapter.extractCondition(deleteTable)
|
|
||||||
if (condition != null) df = df.filter(Column(condition))
|
if (condition != null) df = df.filter(Column(condition))
|
||||||
|
|
||||||
val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableId)
|
val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableId)
|
||||||
|
|||||||
@@ -138,7 +138,7 @@ class Spark2Adapter extends SparkAdapter {
|
|||||||
DeleteFromTable(deleteFromTableCommand.table, resolvedCondition)
|
DeleteFromTable(deleteFromTableCommand.table, resolvedCondition)
|
||||||
}
|
}
|
||||||
|
|
||||||
override def extractCondition(deleteFromTable: Command): Expression = {
|
override def extractDeleteCondition(deleteFromTable: Command): Expression = {
|
||||||
deleteFromTable.asInstanceOf[DeleteFromTable].condition.getOrElse(null)
|
deleteFromTable.asInstanceOf[DeleteFromTable].condition.getOrElse(null)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -73,7 +73,7 @@ class Spark3_1Adapter extends BaseSpark3Adapter {
|
|||||||
DeleteFromTable(deleteFromTableCommand.table, resolvedCondition)
|
DeleteFromTable(deleteFromTableCommand.table, resolvedCondition)
|
||||||
}
|
}
|
||||||
|
|
||||||
override def extractCondition(deleteFromTable: Command): Expression = {
|
override def extractDeleteCondition(deleteFromTable: Command): Expression = {
|
||||||
deleteFromTable.asInstanceOf[DeleteFromTable].condition.getOrElse(null)
|
deleteFromTable.asInstanceOf[DeleteFromTable].condition.getOrElse(null)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -70,7 +70,7 @@ class Spark3_2Adapter extends BaseSpark3Adapter {
|
|||||||
DeleteFromTable(deleteFromTableCommand.table, resolvedCondition)
|
DeleteFromTable(deleteFromTableCommand.table, resolvedCondition)
|
||||||
}
|
}
|
||||||
|
|
||||||
override def extractCondition(deleteFromTable: Command): Expression = {
|
override def extractDeleteCondition(deleteFromTable: Command): Expression = {
|
||||||
deleteFromTable.asInstanceOf[DeleteFromTable].condition.getOrElse(null)
|
deleteFromTable.asInstanceOf[DeleteFromTable].condition.getOrElse(null)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -69,7 +69,7 @@ class Spark3_3Adapter extends BaseSpark3Adapter {
|
|||||||
DeleteFromTable(deleteFromTableCommand.table, resolveExpression(deleteFromTableCommand.condition))
|
DeleteFromTable(deleteFromTableCommand.table, resolveExpression(deleteFromTableCommand.condition))
|
||||||
}
|
}
|
||||||
|
|
||||||
override def extractCondition(deleteFromTable: Command): Expression = {
|
override def extractDeleteCondition(deleteFromTable: Command): Expression = {
|
||||||
deleteFromTable.asInstanceOf[DeleteFromTable].condition
|
deleteFromTable.asInstanceOf[DeleteFromTable].condition
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user