/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// Grammar for the Hudi-specific SQL statements (compaction commands).
// Anything that is not a compaction statement falls through to #passThrough,
// and the caller delegates it to the regular Spark parser.
grammar HoodieSqlCommon;

singleStatement
    : statement EOF
    ;

statement
    : compactionStatement                                                                        #compactionCommand
    | .*?                                                                                        #passThrough
    ;

compactionStatement
    : operation = (RUN | SCHEDULE) COMPACTION ON tableIdentifier (AT instantTimestamp = NUMBER)? #compactionOnTable
    | operation = (RUN | SCHEDULE) COMPACTION ON path = STRING (AT instantTimestamp = NUMBER)?   #compactionOnPath
    | SHOW COMPACTION ON tableIdentifier (LIMIT limit = NUMBER)?                                 #showCompactionOnTable
    | SHOW COMPACTION ON path = STRING (LIMIT limit = NUMBER)?                                   #showCompactionOnPath
    ;

tableIdentifier
    : (db = IDENTIFIER '.')? table = IDENTIFIER
    ;

ALL: 'ALL';
AT: 'AT';
COMPACTION: 'COMPACTION';
RUN: 'RUN';
SCHEDULE: 'SCHEDULE';
ON: 'ON';
SHOW: 'SHOW';
LIMIT: 'LIMIT';

NUMBER
    : DIGIT+
    ;

IDENTIFIER
    : (LETTER | DIGIT | '_')+
    ;

STRING
    : '\'' ( ~('\''|'\\') | ('\\' .) )* '\''
    | '"' ( ~('"'|'\\') | ('\\' .) )* '"'
    ;

// Only upper-case letters: the input stream is wrapped in an
// upper-casing CharStream before it reaches the lexer.
fragment DIGIT
    : [0-9]
    ;

fragment LETTER
    : [A-Z]
    ;

SIMPLE_COMMENT
    : '--' ~[\r\n]* '\r'? '\n'? -> channel(HIDDEN)
    ;

BRACKETED_COMMENT
    : '/*' .*? '*/' -> channel(HIDDEN)
    ;

WS  : [ \r\n\t]+ -> channel(HIDDEN)
    ;

// Catch-all for anything we can't recognize.
// We use this to be able to ignore and recover all the text
// when splitting statements with DelimiterLexer
UNRECOGNIZED
    : .
    ;
package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.CompactionOperation

/**
 * Logical plan for `RUN|SCHEDULE COMPACTION ON <table> [AT <instant>]`.
 *
 * @param table            the relation to compact (resolved later by the analyzer)
 * @param operation        RUN or SCHEDULE
 * @param instantTimestamp optional explicit compaction instant
 */
case class CompactionTable(table: LogicalPlan, operation: CompactionOperation, instantTimestamp: Option[Long])
  extends Command {
  override def children: Seq[LogicalPlan] = Seq(table)
}

/**
 * Logical plan for `RUN|SCHEDULE COMPACTION ON '<path>' [AT <instant>]`.
 */
case class CompactionPath(path: String, operation: CompactionOperation, instantTimestamp: Option[Long])
  extends Command

/**
 * Logical plan for `SHOW COMPACTION ON <table> [LIMIT n]`; shows at most `limit` instants.
 */
case class CompactionShowOnTable(table: LogicalPlan, limit: Int = 20)
  extends Command {
  override def children: Seq[LogicalPlan] = Seq(table)
}

/**
 * Logical plan for `SHOW COMPACTION ON '<path>' [LIMIT n]`; shows at most `limit` instants.
 */
case class CompactionShowOnPath(path: String, limit: Int = 20) extends Command

/** The two compaction operations exposed through SQL. */
object CompactionOperation extends Enumeration {
  type CompactionOperation = Value
  val SCHEDULE, RUN = Value
}
/** * The Hoodie SparkSessionExtension for extending the syntax and add the rules. @@ -27,11 +28,9 @@ import org.apache.spark.sql.hudi.analysis.HoodieAnalysis class HoodieSparkSessionExtension extends (SparkSessionExtensions => Unit) with SparkAdapterSupport{ override def apply(extensions: SparkSessionExtensions): Unit = { - // For spark2, we add a extended sql parser - if (sparkAdapter.createExtendedSparkParser.isDefined) { - extensions.injectParser { (session, parser) => - sparkAdapter.createExtendedSparkParser.get(session, parser) - } + + extensions.injectParser { (session, parser) => + new HoodieCommonSqlParser(session, parser) } HoodieAnalysis.customResolutionRules().foreach { rule => diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala index 4dc198c81..99ae226c8 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala @@ -57,6 +57,13 @@ object HoodieSqlUtils extends SparkAdapterSupport { } } + def getTableIdentify(table: LogicalPlan): TableIdentifier = { + table match { + case SubqueryAlias(name, _) => sparkAdapter.toTableIdentify(name) + case _ => throw new IllegalArgumentException(s"Illegal table: $table") + } + } + private def tripAlias(plan: LogicalPlan): LogicalPlan = { plan match { case SubqueryAlias(_, relation: LogicalPlan) => diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala index 4c222e745..8ab8e8b0c 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala +++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala @@ -28,13 +28,13 @@ import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, Literal, NamedExpression} import org.apache.spark.sql.catalyst.plans.Inner -import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction, DeleteFromTable, InsertAction, LogicalPlan, MergeIntoTable, Project, UpdateAction, UpdateTable} +import org.apache.spark.sql.catalyst.plans.logical.{Assignment, CompactionPath, CompactionShowOnPath, CompactionShowOnTable, CompactionTable, DeleteAction, DeleteFromTable, InsertAction, LogicalPlan, MergeIntoTable, Project, UpdateAction, UpdateTable} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableChangeColumnCommand, AlterTableRenameCommand, CreateDataSourceTableCommand, TruncateTableCommand} import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation} import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlUtils} import org.apache.spark.sql.hudi.HoodieSqlUtils._ -import org.apache.spark.sql.hudi.command.{AlterHoodieTableAddColumnsCommand, AlterHoodieTableChangeColumnCommand, AlterHoodieTableRenameCommand, CreateHoodieTableAsSelectCommand, CreateHoodieTableCommand, DeleteHoodieTableCommand, InsertIntoHoodieTableCommand, MergeIntoHoodieTableCommand, TruncateHoodieTableCommand, UpdateHoodieTableCommand} +import org.apache.spark.sql.hudi.command.{AlterHoodieTableAddColumnsCommand, AlterHoodieTableChangeColumnCommand, AlterHoodieTableRenameCommand, CompactionHoodiePathCommand, CompactionHoodieTableCommand, CompactionShowHoodiePathCommand, CompactionShowHoodieTableCommand, CreateHoodieTableAsSelectCommand, CreateHoodieTableCommand, DeleteHoodieTableCommand, InsertIntoHoodieTableCommand, 
MergeIntoHoodieTableCommand, TruncateHoodieTableCommand, UpdateHoodieTableCommand} import org.apache.spark.sql.types.StringType object HoodieAnalysis { @@ -88,6 +88,24 @@ case class HoodieAnalysis(sparkSession: SparkSession) extends Rule[LogicalPlan] if query.resolved && isHoodieTable(table) => CreateHoodieTableAsSelectCommand(table, mode, query) + // Convert to CompactionHoodieTableCommand + case CompactionTable(table, operation, options) + if table.resolved && isHoodieTable(table, sparkSession) => + val tableId = getTableIdentify(table) + val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableId) + CompactionHoodieTableCommand(catalogTable, operation, options) + // Convert to CompactionHoodiePathCommand + case CompactionPath(path, operation, options) => + CompactionHoodiePathCommand(path, operation, options) + // Convert to CompactionShowOnTable + case CompactionShowOnTable(table, limit) + if isHoodieTable(table, sparkSession) => + val tableId = getTableIdentify(table) + val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableId) + CompactionShowHoodieTableCommand(catalogTable, limit) + // Convert to CompactionShowHoodiePathCommand + case CompactionShowOnPath(path, limit) => + CompactionShowHoodiePathCommand(path, limit) case _=> plan } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala new file mode 100644 index 000000000..c8c772ab1 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
package org.apache.spark.sql.hudi.command

import org.apache.hudi.{DataSourceUtils, DataSourceWriteOptions, HoodieWriterUtils}
import org.apache.hudi.client.WriteStatus
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.{HoodieTimer, Option => HOption}
import org.apache.hudi.exception.HoodieException
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.{CompactionOperation, RUN, SCHEDULE}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.hudi.HoodieSqlUtils
import org.apache.spark.sql.types.StringType

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._

/**
 * Command for `RUN|SCHEDULE COMPACTION ON '<path>'`.
 *
 * SCHEDULE creates a new compaction instant on the timeline (output: the instant
 * time, or null if scheduling failed). RUN executes all pending compaction
 * instants (scheduling one first if none exist), or a single user-supplied
 * instant when `instantTimestamp` is given (output: empty).
 *
 * Only valid on MERGE_ON_READ tables.
 */
case class CompactionHoodiePathCommand(path: String,
    operation: CompactionOperation, instantTimestamp: Option[Long] = None)
  extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val metaClient = HoodieTableMetaClient.builder().setBasePath(path)
      .setConf(sparkSession.sessionState.newHadoopConf()).build()

    assert(metaClient.getTableType == HoodieTableType.MERGE_ON_READ,
      "Must compaction on a Merge On Read table.")
    val schemaUtil = new TableSchemaResolver(metaClient)
    val schemaStr = schemaUtil.getTableAvroSchemaWithoutMetadataFields.toString

    val parameters = HoodieWriterUtils.parametersWithWriteDefaults(
      HoodieSqlUtils.withSparkConf(sparkSession, Map.empty)(
        Map(
          DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key() -> HoodieTableType.MERGE_ON_READ.name()
        )
      )
    )
    val jsc = new JavaSparkContext(sparkSession.sparkContext)
    val client = DataSourceUtils.createHoodieClient(jsc, schemaStr, path,
      metaClient.getTableConfig.getTableName, parameters)

    operation match {
      case SCHEDULE =>
        val instantTime = instantTimestamp.map(_.toString).getOrElse(HoodieActiveTimeline.createNewInstantTime)
        if (client.scheduleCompactionAtInstant(instantTime, HOption.empty[java.util.Map[String, String]])) {
          Seq(Row(instantTime))
        } else {
          Seq(Row(null))
        }
      case RUN =>
        // Pending compaction instants, oldest first.
        val timeLine = metaClient.getActiveTimeline
        val pendingCompactionInstants = timeLine.getWriteTimeline.getInstants.iterator().asScala
          .filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION)
          .map(_.getTimestamp)
          .toSeq.sortBy(f => f)
        val willCompactionInstants = if (instantTimestamp.isEmpty) {
          if (pendingCompactionInstants.nonEmpty) {
            pendingCompactionInstants
          } else {
            // If there are no pending compactions, schedule one first.
            // SCHEDULE outputs one row with the new instant time (null if it failed).
            // BUGFIX: the original used `.take(1).get(0)` which relied on the
            // deprecated JavaConversions implicit (Seq -> java.util.List) and threw
            // IndexOutOfBoundsException on an empty result; use headOption instead.
            CompactionHoodiePathCommand(path, CompactionOperation.SCHEDULE)
              .run(sparkSession)
              .headOption
              .map(_.getString(0))
              .filter(_ != null)
              .toSeq
          }
        } else {
          // Validate that the requested instant is actually pending.
          if (pendingCompactionInstants.contains(instantTimestamp.get.toString)) {
            Seq(instantTimestamp.get.toString)
          } else {
            throw new IllegalArgumentException(s"Compaction instant: ${instantTimestamp.get} is not found in $path," +
              s" Available pending compaction instants are: ${pendingCompactionInstants.mkString(",")} ")
          }
        }
        if (willCompactionInstants.isEmpty) {
          logInfo(s"No need to compaction on $path")
          Seq.empty[Row]
        } else {
          logInfo(s"Run compaction at instants: [${willCompactionInstants.mkString(",")}] on $path")
          val timer = new HoodieTimer
          timer.startTimer()
          willCompactionInstants.foreach { compactionInstant =>
            val writeResponse = client.compact(compactionInstant)
            handlerResponse(writeResponse)
            client.commitCompaction(compactionInstant, writeResponse, HOption.empty())
          }
          logInfo(s"Finish Run compaction at instants: [${willCompactionInstants.mkString(",")}]," +
            s" spend: ${timer.endTimer()}ms")
          Seq.empty[Row]
        }
      case _ => throw new UnsupportedOperationException(s"Unsupported compaction operation: $operation")
    }
  }

  /** Surface the first write error (global or per-record) as an exception. */
  private def handlerResponse(writeResponse: JavaRDD[WriteStatus]): Unit = {
    val error = writeResponse.rdd.filter(f => f.hasErrors).take(1).headOption
    if (error.isDefined) {
      if (error.get.hasGlobalError) {
        throw error.get.getGlobalError
      } else if (!error.get.getErrors.isEmpty) {
        val key = error.get.getErrors.asScala.head._1
        val exception = error.get.getErrors.asScala.head._2
        throw new HoodieException(s"Error in write record: $key", exception)
      }
    }
  }

  // SCHEDULE outputs the created instant; RUN outputs nothing.
  override val output: Seq[Attribute] = {
    operation match {
      case RUN => Seq.empty
      case SCHEDULE => Seq(AttributeReference("instant", StringType, nullable = false)())
    }
  }
}
package org.apache.spark.sql.hudi.command

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.{CompactionOperation, RUN, SCHEDULE}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation
import org.apache.spark.sql.types.StringType

/**
 * Command for `RUN|SCHEDULE COMPACTION ON <table>`.
 * Resolves the table's base path and delegates to [[CompactionHoodiePathCommand]].
 */
case class CompactionHoodieTableCommand(table: CatalogTable,
    operation: CompactionOperation, instantTimestamp: Option[Long])
  extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    // BUGFIX: the original `.getOrElse(s"missing location for ...")` returned the
    // error MESSAGE itself as the base path when the table had no location.
    // Fail fast instead of compacting a nonsense path.
    val basePath = getTableLocation(table, sparkSession).getOrElse(
      throw new IllegalArgumentException(s"Missing location for table: ${table.identifier}"))
    CompactionHoodiePathCommand(basePath, operation, instantTimestamp).run(sparkSession)
  }

  // Mirrors CompactionHoodiePathCommand: SCHEDULE outputs the instant, RUN nothing.
  override val output: Seq[Attribute] = {
    operation match {
      case RUN => Seq.empty
      case SCHEDULE => Seq(AttributeReference("instant", StringType, nullable = false)())
    }
  }
}
package org.apache.spark.sql.hudi.command

import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.common.util.CompactionUtils
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.types.{IntegerType, StringType}

import scala.collection.JavaConverters.asScalaIteratorConverter

/**
 * Command for `SHOW COMPACTION ON '<path>'`.
 * Lists the most recent `limit` compaction instants on the active timeline,
 * newest first, with the number of operations in each compaction plan.
 * Only valid on MERGE_ON_READ tables.
 */
case class CompactionShowHoodiePathCommand(path: String, limit: Int)
  extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val metaClient = HoodieTableMetaClient.builder().setBasePath(path)
      .setConf(sparkSession.sessionState.newHadoopConf()).build()

    assert(metaClient.getTableType == HoodieTableType.MERGE_ON_READ,
      "Cannot show compaction on a Non Merge On Read table.")

    // Newest-first compaction instants, capped at `limit`.
    val recentInstants = metaClient.getActiveTimeline.getInstants.iterator().asScala
      .filter(instant => instant.getAction == HoodieTimeline.COMPACTION_ACTION)
      .toSeq
      .sortBy(_.getTimestamp)
      .reverse
      .take(limit)

    recentInstants.map { instant =>
      val plan = CompactionUtils.getCompactionPlan(metaClient, instant.getTimestamp)
      Row(instant.getTimestamp, instant.getAction, plan.getOperations.size())
    }
  }

  override val output: Seq[Attribute] = {
    Seq(
      AttributeReference("timestamp", StringType, nullable = false)(),
      AttributeReference("action", StringType, nullable = false)(),
      AttributeReference("size", IntegerType, nullable = false)()
    )
  }
}
package org.apache.spark.sql.hudi.command

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation
import org.apache.spark.sql.types.{IntegerType, StringType}

/**
 * Command for `SHOW COMPACTION ON <table>`.
 * Resolves the table's base path and delegates to [[CompactionShowHoodiePathCommand]].
 */
case class CompactionShowHoodieTableCommand(table: CatalogTable, limit: Int)
  extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    // BUGFIX: the original `.getOrElse(s"missing location for ...")` returned the
    // error MESSAGE itself as the base path when the table had no location.
    // Fail fast instead of reading a nonsense path.
    val basePath = getTableLocation(table, sparkSession).getOrElse(
      throw new IllegalArgumentException(s"Missing location for table: ${table.identifier}"))
    CompactionShowHoodiePathCommand(basePath, limit).run(sparkSession)
  }

  // Same schema as CompactionShowHoodiePathCommand.
  override val output: Seq[Attribute] = {
    Seq(
      AttributeReference("timestamp", StringType, nullable = false)(),
      AttributeReference("action", StringType, nullable = false)(),
      AttributeReference("size", IntegerType, nullable = false)()
    )
  }
}
package org.apache.spark.sql.parser

import org.antlr.v4.runtime.{CharStream, CharStreams, CodePointCharStream, CommonTokenStream, IntStream}
import org.antlr.v4.runtime.atn.PredictionMode
import org.antlr.v4.runtime.misc.{Interval, ParseCancellationException}
import org.apache.hudi.SparkAdapterSupport
import org.apache.hudi.spark.sql.parser.{HoodieSqlCommonLexer, HoodieSqlCommonParser}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.{ParseErrorListener, ParseException, ParserInterface}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.Origin
import org.apache.spark.sql.types.{DataType, StructType}

/**
 * Parser for the Hudi SQL extensions. Hudi-specific statements (compaction
 * commands) are parsed by the generated HoodieSqlCommon parser; everything
 * else falls through to the Spark-version-specific extended parser (Spark 2)
 * or the plain session delegate.
 */
class HoodieCommonSqlParser(session: SparkSession, delegate: ParserInterface)
  extends ParserInterface with Logging with SparkAdapterSupport {

  private lazy val builder = new HoodieSqlCommonAstBuilder(session, delegate)

  // On Spark 2 the adapter supplies an extended parser; otherwise use the delegate.
  private lazy val sparkExtendedParser = sparkAdapter.createExtendedSparkParser
    .map(_(session, delegate)).getOrElse(delegate)

  override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
    builder.visit(parser.singleStatement()) match {
      case plan: LogicalPlan => plan
      // Not a Hudi statement (#passThrough): hand the raw text to Spark.
      case _ => sparkExtendedParser.parsePlan(sqlText)
    }
  }

  override def parseExpression(sqlText: String): Expression =
    delegate.parseExpression(sqlText)

  override def parseTableIdentifier(sqlText: String): TableIdentifier =
    delegate.parseTableIdentifier(sqlText)

  override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier =
    delegate.parseFunctionIdentifier(sqlText)

  override def parseTableSchema(sqlText: String): StructType =
    delegate.parseTableSchema(sqlText)

  override def parseDataType(sqlText: String): DataType =
    delegate.parseDataType(sqlText)

  def parseRawDataType(sqlText: String): DataType =
    throw new UnsupportedOperationException(s"Unsupported parseRawDataType method")

  def parseMultipartIdentifier(sqlText: String): Seq[String] =
    throw new UnsupportedOperationException(s"Unsupported parseMultipartIdentifier method")

  /**
   * Standard ANTLR two-stage parse: try the fast SLL prediction mode first,
   * and on failure rewind the stream and retry with full LL.
   */
  protected def parse[T](command: String)(toResult: HoodieSqlCommonParser => T): T = {
    logDebug(s"Parsing command: $command")

    val lexer = new HoodieSqlCommonLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
    lexer.removeErrorListeners()
    lexer.addErrorListener(ParseErrorListener)

    val tokenStream = new CommonTokenStream(lexer)
    val parser = new HoodieSqlCommonParser(tokenStream)
    parser.removeErrorListeners()
    parser.addErrorListener(ParseErrorListener)

    try {
      try {
        parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
        toResult(parser)
      } catch {
        case _: ParseCancellationException =>
          // SLL could not decide; rewind and reparse with the complete LL mode.
          tokenStream.seek(0)
          parser.reset()
          parser.getInterpreter.setPredictionMode(PredictionMode.LL)
          toResult(parser)
      }
    } catch {
      case e: ParseException if e.command.isDefined =>
        throw e
      case e: ParseException =>
        throw e.withCommand(command)
      case e: AnalysisException =>
        val position = Origin(e.line, e.startPosition)
        throw new ParseException(Option(command), e.message, position, position)
    }
  }
}

/**
 * Fork of `org.apache.spark.sql.catalyst.parser.UpperCaseCharStream`:
 * presents the wrapped stream to the lexer in upper case so keywords and
 * identifiers are matched case-insensitively.
 */
class UpperCaseCharStream(wrapped: CodePointCharStream) extends CharStream {
  override def consume(): Unit = wrapped.consume
  override def getSourceName(): String = wrapped.getSourceName
  override def index(): Int = wrapped.index
  override def mark(): Int = wrapped.mark
  override def release(marker: Int): Unit = wrapped.release(marker)
  override def seek(where: Int): Unit = wrapped.seek(where)
  override def size(): Int = wrapped.size

  override def getText(interval: Interval): String = {
    // ANTLR 4.7's CodePointCharStream implementations have bugs when
    // getText() is called with an empty stream, or intervals where
    // start > end. See https://github.com/antlr/antlr4/commit/ac9f7530
    // for one fix that is not yet in a released ANTLR artifact.
    if (size() > 0 && (interval.b - interval.a >= 0)) {
      wrapped.getText(interval)
    } else {
      ""
    }
  }

  // scalastyle:off
  override def LA(i: Int): Int = {
  // scalastyle:on
    val la = wrapped.LA(i)
    if (la == 0 || la == IntStream.EOF) la
    else Character.toUpperCase(la)
  }
}
+ */ + +package org.apache.spark.sql.parser + +import org.apache.hudi.SparkAdapterSupport +import org.apache.hudi.spark.sql.parser.{HoodieSqlCommonBaseVisitor, HoodieSqlCommonParser} +import org.apache.hudi.spark.sql.parser.HoodieSqlCommonParser.{CompactionOnPathContext, CompactionOnTableContext, ShowCompactionOnPathContext, ShowCompactionOnTableContext, SingleStatementContext, TableIdentifierContext} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.parser.ParserUtils.withOrigin +import org.apache.spark.sql.catalyst.parser.{ParserInterface, ParserUtils} +import org.apache.spark.sql.catalyst.plans.logical.{CompactionOperation, CompactionPath, CompactionShowOnPath, CompactionShowOnTable, CompactionTable, LogicalPlan} + +class HoodieSqlCommonAstBuilder(session: SparkSession, delegate: ParserInterface) + extends HoodieSqlCommonBaseVisitor[AnyRef] with Logging with SparkAdapterSupport { + + import ParserUtils._ + + override def visitSingleStatement(ctx: SingleStatementContext): LogicalPlan = withOrigin(ctx) { + ctx.statement().accept(this).asInstanceOf[LogicalPlan] + } + + override def visitCompactionOnTable(ctx: CompactionOnTableContext): LogicalPlan = withOrigin(ctx) { + val table = ctx.tableIdentifier().accept(this).asInstanceOf[LogicalPlan] + val operation = CompactionOperation.withName(ctx.operation.getText.toUpperCase) + val timestamp = if (ctx.instantTimestamp != null) Some(ctx.instantTimestamp.getText.toLong) else None + CompactionTable(table, operation, timestamp) + } + + override def visitCompactionOnPath (ctx: CompactionOnPathContext): LogicalPlan = withOrigin(ctx) { + val path = string(ctx.path) + val operation = CompactionOperation.withName(ctx.operation.getText.toUpperCase) + val timestamp = if (ctx.instantTimestamp != null) Some(ctx.instantTimestamp.getText.toLong) 
else None + CompactionPath(path, operation, timestamp) + } + + override def visitShowCompactionOnTable (ctx: ShowCompactionOnTableContext): LogicalPlan = withOrigin(ctx) { + val table = ctx.tableIdentifier().accept(this).asInstanceOf[LogicalPlan] + if (ctx.limit != null) { + CompactionShowOnTable(table, ctx.limit.getText.toInt) + } else { + CompactionShowOnTable(table) + } + } + + override def visitShowCompactionOnPath(ctx: ShowCompactionOnPathContext): LogicalPlan = withOrigin(ctx) { + val path = string(ctx.path) + if (ctx.limit != null) { + CompactionShowOnPath(path, ctx.limit.getText.toInt) + } else { + CompactionShowOnPath(path) + } + } + + override def visitTableIdentifier(ctx: TableIdentifierContext): LogicalPlan = withOrigin(ctx) { + UnresolvedRelation(TableIdentifier(ctx.table.getText, Option(ctx.db).map(_.getText))) + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala new file mode 100644 index 000000000..e40a48421 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi + +class TestCompactionTable extends TestHoodieSqlBase { + + test("Test compaction table") { + withTempDir {tmp => + val tableName = generateTableName + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '${tmp.getCanonicalPath}' + | options ( + | primaryKey ='id', + | type = 'mor', + | preCombineField = 'ts' + | ) + """.stripMargin) + spark.sql("set hoodie.parquet.max.file.size = 10000") + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") + spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)") + spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)") + spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000)") + spark.sql(s"update $tableName set price = 11 where id = 1") + + spark.sql(s"schedule compaction on $tableName") + spark.sql(s"update $tableName set price = 12 where id = 2") + spark.sql(s"schedule compaction on $tableName") + val compactionRows = spark.sql(s"show compaction on $tableName limit 10").collect() + val timestamps = compactionRows.map(_.getString(0)) + assertResult(2)(timestamps.length) + + spark.sql(s"run compaction on $tableName at ${timestamps(1)}") + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 11.0, 1000), + Seq(2, "a2", 12.0, 1000), + Seq(3, "a3", 10.0, 1000), + Seq(4, "a4", 10.0, 1000) + ) + assertResult(1)(spark.sql(s"show compaction on $tableName").collect().length) + spark.sql(s"run compaction on $tableName at ${timestamps(0)}") + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 11.0, 1000), + Seq(2, "a2", 12.0, 1000), + Seq(3, "a3", 10.0, 1000), + Seq(4, "a4", 10.0, 1000) + ) + assertResult(0)(spark.sql(s"show compaction on $tableName").collect().length) + } + } + + test("Test compaction path") 
{ + withTempDir { tmp => + val tableName = generateTableName + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '${tmp.getCanonicalPath}' + | options ( + | primaryKey ='id', + | type = 'mor', + | preCombineField = 'ts' + | ) + """.stripMargin) + spark.sql("set hoodie.parquet.max.file.size = 10000") + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") + spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)") + spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)") + spark.sql(s"update $tableName set price = 11 where id = 1") + + spark.sql(s"run compaction on '${tmp.getCanonicalPath}'") + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 11.0, 1000), + Seq(2, "a2", 10.0, 1000), + Seq(3, "a3", 10.0, 1000) + ) + assertResult(0)(spark.sql(s"show compaction on '${tmp.getCanonicalPath}'").collect().length) + // schedule compaction first + spark.sql(s"update $tableName set price = 12 where id = 1") + spark.sql(s"schedule compaction on '${tmp.getCanonicalPath}'") + + // schedule compaction second + spark.sql(s"update $tableName set price = 12 where id = 2") + spark.sql(s"schedule compaction on '${tmp.getCanonicalPath}'") + + // show compaction + assertResult(2)(spark.sql(s"show compaction on '${tmp.getCanonicalPath}'").collect().length) + // run compaction for all the scheduled compaction + spark.sql(s"run compaction on '${tmp.getCanonicalPath}'") + + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 12.0, 1000), + Seq(2, "a2", 12.0, 1000), + Seq(3, "a3", 10.0, 1000) + ) + assertResult(0)(spark.sql(s"show compaction on '${tmp.getCanonicalPath}'").collect().length) + + checkException(s"run compaction on '${tmp.getCanonicalPath}' at 12345")( + s"Compaction instant: 12345 is not found in ${tmp.getCanonicalPath}, Available pending compaction instants are: " + ) + } + } +} diff --git 
a/hudi-spark-datasource/hudi-spark2/pom.xml b/hudi-spark-datasource/hudi-spark2/pom.xml index 2e03043af..0e51fc315 100644 --- a/hudi-spark-datasource/hudi-spark2/pom.xml +++ b/hudi-spark-datasource/hudi-spark2/pom.xml @@ -147,7 +147,7 @@ org.antlr antlr4-maven-plugin - 4.7 + ${antlr.version} diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala index e55fdf2a5..9a3e8e302 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, Logic import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} import org.apache.spark.sql.execution.datasources.{Spark2ParsePartitionUtil, SparkParsePartitionUtil} import org.apache.spark.sql.hudi.SparkAdapter -import org.apache.spark.sql.hudi.parser.HoodieSqlParser +import org.apache.spark.sql.hudi.parser.HoodieSpark2ExtendedSqlParser import org.apache.spark.sql.internal.SQLConf /** @@ -73,7 +73,7 @@ class Spark2Adapter extends SparkAdapter { override def createExtendedSparkParser: Option[(SparkSession, ParserInterface) => ParserInterface] = { Some( - (spark: SparkSession, delegate: ParserInterface) => new HoodieSqlParser(spark, delegate) + (spark: SparkSession, delegate: ParserInterface) => new HoodieSpark2ExtendedSqlParser(spark, delegate) ) } diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/hudi/parser/HoodieSqlAstBuilder.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/hudi/parser/HoodieSpark2ExtendedSqlAstBuilder.scala similarity index 98% rename from hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/hudi/parser/HoodieSqlAstBuilder.scala 
rename to hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/hudi/parser/HoodieSpark2ExtendedSqlAstBuilder.scala index 24a6a7ccc..4e385945c 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/hudi/parser/HoodieSqlAstBuilder.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/hudi/parser/HoodieSpark2ExtendedSqlAstBuilder.scala @@ -34,7 +34,7 @@ import scala.collection.JavaConverters._ * Here we only do the parser for the extended sql syntax. e.g MergeInto. For * other sql syntax we use the delegate sql parser which is the SparkSqlParser. */ -class HoodieSqlAstBuilder(conf: SQLConf, delegate: ParserInterface) extends HoodieSqlBaseBaseVisitor[AnyRef] with Logging { +class HoodieSpark2ExtendedSqlAstBuilder(conf: SQLConf, delegate: ParserInterface) extends HoodieSqlBaseBaseVisitor[AnyRef] with Logging { import ParserUtils._ diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/hudi/parser/HoodieSqlParser.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/hudi/parser/HoodieSpark2ExtendedSqlParser.scala similarity index 97% rename from hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/hudi/parser/HoodieSqlParser.scala rename to hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/hudi/parser/HoodieSpark2ExtendedSqlParser.scala index 054faa87e..ce32ae091 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/hudi/parser/HoodieSqlParser.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/hudi/parser/HoodieSpark2ExtendedSqlParser.scala @@ -32,11 +32,11 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.types._ import org.apache.spark.sql.{AnalysisException, SparkSession} -class HoodieSqlParser(session: SparkSession, delegate: ParserInterface) +class HoodieSpark2ExtendedSqlParser(session: 
SparkSession, delegate: ParserInterface) extends ParserInterface with Logging { private lazy val conf = session.sqlContext.conf - private lazy val builder = new HoodieSqlAstBuilder(conf, delegate) + private lazy val builder = new HoodieSpark2ExtendedSqlAstBuilder(conf, delegate) override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser => builder.visit(parser.singleStatement()) match { diff --git a/pom.xml b/pom.xml index 567266934..9f523aa91 100644 --- a/pom.xml +++ b/pom.xml @@ -152,6 +152,7 @@ org.apache.hudi. true 2.7.1 + 4.7