[HUDI-3131] fix ctas error in spark3.1.1 (#4476)
This commit is contained in:
@@ -28,7 +28,6 @@ import org.apache.spark.sql.{Row, SaveMode, SparkSession}
|
|||||||
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, HoodieCatalogTable}
|
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, HoodieCatalogTable}
|
||||||
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
|
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
|
||||||
import org.apache.spark.sql.execution.SparkPlan
|
import org.apache.spark.sql.execution.SparkPlan
|
||||||
import org.apache.spark.sql.execution.command.DataWritingCommand
|
|
||||||
import org.apache.spark.sql.hudi.HoodieSqlUtils
|
import org.apache.spark.sql.hudi.HoodieSqlUtils
|
||||||
|
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
@@ -39,13 +38,9 @@ import scala.collection.JavaConverters._
|
|||||||
case class CreateHoodieTableAsSelectCommand(
|
case class CreateHoodieTableAsSelectCommand(
|
||||||
table: CatalogTable,
|
table: CatalogTable,
|
||||||
mode: SaveMode,
|
mode: SaveMode,
|
||||||
query: LogicalPlan) extends DataWritingCommand {
|
query: LogicalPlan) extends HoodieLeafRunnableCommand {
|
||||||
|
|
||||||
def withNewChildInternal(newChild: LogicalPlan): CreateHoodieTableAsSelectCommand = {
|
override def run(sparkSession: SparkSession): Seq[Row] = {
|
||||||
this
|
|
||||||
}
|
|
||||||
|
|
||||||
override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
|
|
||||||
assert(table.tableType != CatalogTableType.VIEW)
|
assert(table.tableType != CatalogTableType.VIEW)
|
||||||
assert(table.provider.isDefined)
|
assert(table.provider.isDefined)
|
||||||
|
|
||||||
@@ -118,8 +113,6 @@ case class CreateHoodieTableAsSelectCommand(
|
|||||||
fs.delete(path, true)
|
fs.delete(path, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
override def outputColumnNames: Seq[String] = query.output.map(_.name)
|
|
||||||
|
|
||||||
private def reOrderPartitionColumn(query: LogicalPlan,
|
private def reOrderPartitionColumn(query: LogicalPlan,
|
||||||
partitionColumns: Seq[String]): LogicalPlan = {
|
partitionColumns: Seq[String]): LogicalPlan = {
|
||||||
if (partitionColumns.isEmpty) {
|
if (partitionColumns.isEmpty) {
|
||||||
|
|||||||
Reference in New Issue
Block a user