From fe9406dd33c0fe860e924e46f7b2908bf3155265 Mon Sep 17 00:00:00 2001 From: Yann Byron Date: Sun, 2 Jan 2022 19:06:55 +0800 Subject: [PATCH] [HUDI-3131] fix ctas error in spark3.1.1 (#4476) --- .../command/CreateHoodieTableAsSelectCommand.scala | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala index f1a344fb8..2790ea97c 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala @@ -28,7 +28,6 @@ import org.apache.spark.sql.{Row, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, HoodieCatalogTable} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.execution.command.DataWritingCommand import org.apache.spark.sql.hudi.HoodieSqlUtils import scala.collection.JavaConverters._ @@ -39,13 +38,9 @@ import scala.collection.JavaConverters._ case class CreateHoodieTableAsSelectCommand( table: CatalogTable, mode: SaveMode, - query: LogicalPlan) extends DataWritingCommand { + query: LogicalPlan) extends HoodieLeafRunnableCommand { - def withNewChildInternal(newChild: LogicalPlan): CreateHoodieTableAsSelectCommand = { - this - } - - override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = { + override def run(sparkSession: SparkSession): Seq[Row] = { assert(table.tableType != CatalogTableType.VIEW) assert(table.provider.isDefined) @@ -118,8 +113,6 @@ case class CreateHoodieTableAsSelectCommand( fs.delete(path, true) } - override def outputColumnNames: Seq[String] = query.output.map(_.name) - private def reOrderPartitionColumn(query: LogicalPlan, partitionColumns: Seq[String]): LogicalPlan = { if (partitionColumns.isEmpty) {