1
0

[HUDI-2182] Support Compaction Command For Spark Sql (#3277)

This commit is contained in:
pengzhiwei
2021-08-06 15:12:10 +08:00
committed by GitHub
parent 20feb1a897
commit 3f8ca1a355
20 changed files with 811 additions and 21 deletions

View File

@@ -158,6 +158,23 @@
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.antlr</groupId>
<artifactId>antlr4-maven-plugin</artifactId>
<version>${antlr.version}</version>
<executions>
<execution>
<goals>
<goal>antlr4</goal>
</goals>
</execution>
</executions>
<configuration>
<visitor>true</visitor>
<listener>true</listener>
<sourceDirectory>../hudi-spark/src/main/antlr4/</sourceDirectory>
</configuration>
</plugin>
</plugins>
</build>

View File

@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
grammar HoodieSqlCommon;

// Entry rule: a single SQL statement followed by end-of-input.
singleStatement
: statement EOF
;

// Either a compaction command this grammar understands, or anything else.
// The `.*?` passThrough alternative matches lazily so unrecognized SQL is
// handed back to the delegate Spark parser (see HoodieCommonSqlParser).
statement
: compactionStatement #compactionCommand
| .*? #passThrough
;

// RUN/SCHEDULE COMPACTION ON <table|'path'> [AT <instant>]
// SHOW COMPACTION ON <table|'path'> [LIMIT <n>]
// The optional AT instant is a NUMBER token (digits only, e.g. a Hudi instant
// timestamp); LIMIT caps the number of rows returned by SHOW.
compactionStatement
: operation = (RUN | SCHEDULE) COMPACTION ON tableIdentifier (AT instantTimestamp = NUMBER)? #compactionOnTable
| operation = (RUN | SCHEDULE) COMPACTION ON path = STRING (AT instantTimestamp = NUMBER)? #compactionOnPath
| SHOW COMPACTION ON tableIdentifier (LIMIT limit = NUMBER)? #showCompactionOnTable
| SHOW COMPACTION ON path = STRING (LIMIT limit = NUMBER)? #showCompactionOnPath
;

// Optional database qualifier: [db.]table
tableIdentifier
: (db=IDENTIFIER '.')? table=IDENTIFIER
;

// Keywords. NOTE(review): ALL is declared but not referenced by any parser
// rule above — confirm whether it is reserved for future syntax.
ALL: 'ALL';
AT: 'AT';
COMPACTION: 'COMPACTION';
RUN: 'RUN';
SCHEDULE: 'SCHEDULE';
ON: 'ON';
SHOW: 'SHOW';
LIMIT: 'LIMIT';

// NUMBER is declared before IDENTIFIER, so an all-digit word always lexes as
// NUMBER (ANTLR picks the first rule on equal-length matches).
NUMBER
: DIGIT+
;
IDENTIFIER
: (LETTER | DIGIT | '_')+
;

// Single- or double-quoted string with backslash escapes.
STRING
: '\'' ( ~('\''|'\\') | ('\\' .) )* '\''
| '"' ( ~('"'|'\\') | ('\\' .) )* '"'
;
fragment DIGIT
: [0-9]
;
// Only uppercase letters: the char stream is upper-cased before lexing
// (UpperCaseCharStream), so lowercase input still matches — TODO confirm the
// parser is always fed through that stream.
fragment LETTER
: [A-Z]
;

// Comments and whitespace are hidden, not skipped, so getText on intervals
// can still recover the original text.
SIMPLE_COMMENT
: '--' ~[\r\n]* '\r'? '\n'? -> channel(HIDDEN)
;
BRACKETED_COMMENT
: '/*' .*? '*/' -> channel(HIDDEN)
;
WS : [ \r\n\t]+ -> channel(HIDDEN)
;
// Catch-all for anything we can't recognize.
// We use this to be able to ignore and recover all the text
// when splitting statements with DelimiterLexer
UNRECOGNIZED
: .
;

View File

@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.CompactionOperation

/**
 * Logical plan for `RUN|SCHEDULE COMPACTION ON <table> [AT <instant>]`.
 * `table` is the (possibly unresolved) relation; it is exposed as a child so
 * the analyzer resolves it before the command is converted to a runnable one.
 */
case class CompactionTable(table: LogicalPlan, operation: CompactionOperation, instantTimestamp: Option[Long])
extends Command {
override def children: Seq[LogicalPlan] = Seq(table)
}

/** Logical plan for `RUN|SCHEDULE COMPACTION ON '<path>' [AT <instant>]`. */
case class CompactionPath(path: String, operation: CompactionOperation, instantTimestamp: Option[Long])
extends Command

/**
 * Logical plan for `SHOW COMPACTION ON <table> [LIMIT n]`.
 * Default limit is 20 when no LIMIT clause is given.
 */
case class CompactionShowOnTable(table: LogicalPlan, limit: Int = 20)
extends Command {
override def children: Seq[LogicalPlan] = Seq(table)
}

/** Logical plan for `SHOW COMPACTION ON '<path>' [LIMIT n]` (default limit 20). */
case class CompactionShowOnPath(path: String, limit: Int = 20) extends Command

/** The two compaction verbs supported by the SQL syntax. */
object CompactionOperation extends Enumeration {
type CompactionOperation = Value
val SCHEDULE, RUN = Value
}

View File

@@ -20,6 +20,7 @@ package org.apache.spark.sql.hudi
import org.apache.hudi.SparkAdapterSupport
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.hudi.analysis.HoodieAnalysis
import org.apache.spark.sql.parser.HoodieCommonSqlParser
/**
* The Hoodie SparkSessionExtension for extending the syntax and add the rules.
@@ -27,11 +28,9 @@ import org.apache.spark.sql.hudi.analysis.HoodieAnalysis
class HoodieSparkSessionExtension extends (SparkSessionExtensions => Unit)
with SparkAdapterSupport{
override def apply(extensions: SparkSessionExtensions): Unit = {
// For spark2, we add a extended sql parser
if (sparkAdapter.createExtendedSparkParser.isDefined) {
extensions.injectParser { (session, parser) =>
sparkAdapter.createExtendedSparkParser.get(session, parser)
}
extensions.injectParser { (session, parser) =>
new HoodieCommonSqlParser(session, parser)
}
HoodieAnalysis.customResolutionRules().foreach { rule =>

View File

@@ -57,6 +57,13 @@ object HoodieSqlUtils extends SparkAdapterSupport {
}
}
def getTableIdentify(table: LogicalPlan): TableIdentifier = {
table match {
case SubqueryAlias(name, _) => sparkAdapter.toTableIdentify(name)
case _ => throw new IllegalArgumentException(s"Illegal table: $table")
}
}
private def tripAlias(plan: LogicalPlan): LogicalPlan = {
plan match {
case SubqueryAlias(_, relation: LogicalPlan) =>

View File

@@ -28,13 +28,13 @@ import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction, DeleteFromTable, InsertAction, LogicalPlan, MergeIntoTable, Project, UpdateAction, UpdateTable}
import org.apache.spark.sql.catalyst.plans.logical.{Assignment, CompactionPath, CompactionShowOnPath, CompactionShowOnTable, CompactionTable, DeleteAction, DeleteFromTable, InsertAction, LogicalPlan, MergeIntoTable, Project, UpdateAction, UpdateTable}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableChangeColumnCommand, AlterTableRenameCommand, CreateDataSourceTableCommand, TruncateTableCommand}
import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlUtils}
import org.apache.spark.sql.hudi.HoodieSqlUtils._
import org.apache.spark.sql.hudi.command.{AlterHoodieTableAddColumnsCommand, AlterHoodieTableChangeColumnCommand, AlterHoodieTableRenameCommand, CreateHoodieTableAsSelectCommand, CreateHoodieTableCommand, DeleteHoodieTableCommand, InsertIntoHoodieTableCommand, MergeIntoHoodieTableCommand, TruncateHoodieTableCommand, UpdateHoodieTableCommand}
import org.apache.spark.sql.hudi.command.{AlterHoodieTableAddColumnsCommand, AlterHoodieTableChangeColumnCommand, AlterHoodieTableRenameCommand, CompactionHoodiePathCommand, CompactionHoodieTableCommand, CompactionShowHoodiePathCommand, CompactionShowHoodieTableCommand, CreateHoodieTableAsSelectCommand, CreateHoodieTableCommand, DeleteHoodieTableCommand, InsertIntoHoodieTableCommand, MergeIntoHoodieTableCommand, TruncateHoodieTableCommand, UpdateHoodieTableCommand}
import org.apache.spark.sql.types.StringType
object HoodieAnalysis {
@@ -88,6 +88,24 @@ case class HoodieAnalysis(sparkSession: SparkSession) extends Rule[LogicalPlan]
if query.resolved && isHoodieTable(table) =>
CreateHoodieTableAsSelectCommand(table, mode, query)
// Convert to CompactionHoodieTableCommand
case CompactionTable(table, operation, options)
if table.resolved && isHoodieTable(table, sparkSession) =>
val tableId = getTableIdentify(table)
val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableId)
CompactionHoodieTableCommand(catalogTable, operation, options)
// Convert to CompactionHoodiePathCommand
case CompactionPath(path, operation, options) =>
CompactionHoodiePathCommand(path, operation, options)
// Convert to CompactionShowOnTable
case CompactionShowOnTable(table, limit)
if isHoodieTable(table, sparkSession) =>
val tableId = getTableIdentify(table)
val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableId)
CompactionShowHoodieTableCommand(catalogTable, limit)
// Convert to CompactionShowHoodiePathCommand
case CompactionShowOnPath(path, limit) =>
CompactionShowHoodiePathCommand(path, limit)
case _=> plan
}
}

View File

@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command
import org.apache.hudi.{DataSourceUtils, DataSourceWriteOptions, HoodieWriterUtils}
import org.apache.hudi.client.WriteStatus
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.{HoodieTimer, Option => HOption}
import org.apache.hudi.exception.HoodieException
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.{CompactionOperation, RUN, SCHEDULE}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.hudi.HoodieSqlUtils
import org.apache.spark.sql.types.StringType
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
/**
 * Runnable command backing `RUN|SCHEDULE COMPACTION ON '<path>'`.
 *
 * SCHEDULE creates a compaction plan at a new (or caller-supplied) instant and
 * returns that instant as a single row. RUN executes pending compactions:
 * either all pending instants, or only the supplied one; if none are pending
 * and no instant was given, it schedules one first and then runs it.
 */
case class CompactionHoodiePathCommand(path: String,
operation: CompactionOperation, instantTimestamp: Option[Long] = None)
extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val metaClient = HoodieTableMetaClient.builder().setBasePath(path)
.setConf(sparkSession.sessionState.newHadoopConf()).build()
// Compaction is only meaningful for MERGE_ON_READ tables.
assert(metaClient.getTableType == HoodieTableType.MERGE_ON_READ,
s"Must compaction on a Merge On Read table.")
// Write client needs the table's Avro schema (without Hudi meta fields).
val schemaUtil = new TableSchemaResolver(metaClient)
val schemaStr = schemaUtil.getTableAvroSchemaWithoutMetadataFields.toString
val parameters = HoodieWriterUtils.parametersWithWriteDefaults(
HoodieSqlUtils.withSparkConf(sparkSession, Map.empty)(
Map(
DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key() -> HoodieTableType.MERGE_ON_READ.name()
)
)
)
val jsc = new JavaSparkContext(sparkSession.sparkContext)
val client = DataSourceUtils.createHoodieClient(jsc, schemaStr, path,
metaClient.getTableConfig.getTableName, parameters)
operation match {
case SCHEDULE =>
// Use the caller's instant if given, else mint a fresh one. A null row
// signals that scheduling produced no plan (nothing to compact).
val instantTime = instantTimestamp.map(_.toString).getOrElse(HoodieActiveTimeline.createNewInstantTime)
if (client.scheduleCompactionAtInstant(instantTime, HOption.empty[java.util.Map[String, String]])) {
Seq(Row(instantTime))
} else {
Seq(Row(null))
}
case RUN =>
// Do compaction
// Pending compaction instants on the write timeline, oldest first.
val timeLine = metaClient.getActiveTimeline
val pendingCompactionInstants = timeLine.getWriteTimeline.getInstants.iterator().asScala
.filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION)
.map(_.getTimestamp)
.toSeq.sortBy(f => f)
val willCompactionInstants = if (instantTimestamp.isEmpty) {
if (pendingCompactionInstants.nonEmpty) {
pendingCompactionInstants
} else { // If there are no pending compaction, schedule to generate one.
// CompactionHoodiePathCommand will return instanceTime for SCHEDULE.
// NOTE(review): `.get(0)` on a Seq compiles only via the implicit
// JavaConversions Seq -> java.util.List conversion — fragile; confirm.
Seq(CompactionHoodiePathCommand(path, CompactionOperation.SCHEDULE)
.run(sparkSession).take(1).get(0).getString(0)).filter(_ != null)
}
} else {
// Check if the compaction timestamp has exists in the pending compaction
if (pendingCompactionInstants.contains(instantTimestamp.get.toString)) {
Seq(instantTimestamp.get.toString)
} else {
throw new IllegalArgumentException(s"Compaction instant: ${instantTimestamp.get} is not found in $path," +
s" Available pending compaction instants are: ${pendingCompactionInstants.mkString(",")} ")
}
}
if (willCompactionInstants.isEmpty) {
logInfo(s"No need to compaction on $path")
Seq.empty[Row]
} else {
logInfo(s"Run compaction at instants: [${willCompactionInstants.mkString(",")}] on $path")
val timer = new HoodieTimer
timer.startTimer()
// Compact and commit each instant; any write error aborts via
// handlerResponse before the compaction is committed.
willCompactionInstants.foreach {compactionInstant =>
val writeResponse = client.compact(compactionInstant)
handlerResponse(writeResponse)
client.commitCompaction(compactionInstant, writeResponse, HOption.empty())
}
logInfo(s"Finish Run compaction at instants: [${willCompactionInstants.mkString(",")}]," +
s" spend: ${timer.endTimer()}ms")
Seq.empty[Row]
}
case _=> throw new UnsupportedOperationException(s"Unsupported compaction operation: $operation")
}
}

// Surfaces the first write error (global or per-record) as an exception so a
// failed compaction is never silently committed.
private def handlerResponse(writeResponse: JavaRDD[WriteStatus]): Unit = {
// Handle error
val error = writeResponse.rdd.filter(f => f.hasErrors).take(1).headOption
if (error.isDefined) {
if (error.get.hasGlobalError) {
throw error.get.getGlobalError
} else if (!error.get.getErrors.isEmpty) {
val key = error.get.getErrors.asScala.head._1
val exception = error.get.getErrors.asScala.head._2
throw new HoodieException(s"Error in write record: $key", exception)
}
}
}

// SCHEDULE yields one "instant" column; RUN yields no rows.
override val output: Seq[Attribute] = {
operation match {
case RUN => Seq.empty
case SCHEDULE => Seq(AttributeReference("instant", StringType, nullable = false)())
}
}
}

View File

@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.{CompactionOperation, RUN, SCHEDULE}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation
import org.apache.spark.sql.types.StringType
/**
 * Runnable command backing `RUN|SCHEDULE COMPACTION ON <table>`: resolves the
 * catalog table's base path and delegates to [[CompactionHoodiePathCommand]].
 *
 * @param table            catalog table to compact; must have a resolvable location
 * @param operation        RUN or SCHEDULE (see CompactionOperation)
 * @param instantTimestamp optional compaction instant to schedule at / run
 */
case class CompactionHoodieTableCommand(table: CatalogTable,
    operation: CompactionOperation, instantTimestamp: Option[Long])
  extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    // BUG FIX: the original wrote getOrElse(s"missing location for ...") —
    // when the location was absent, the *error message string* itself became
    // the base path. Fail fast with an exception instead.
    val basePath = getTableLocation(table, sparkSession).getOrElse(
      throw new IllegalArgumentException(s"Missing location for ${table.identifier}"))
    CompactionHoodiePathCommand(basePath, operation, instantTimestamp).run(sparkSession)
  }

  // SCHEDULE returns the created instant as one "instant" column; RUN returns
  // no rows. Mirrors CompactionHoodiePathCommand.output.
  override val output: Seq[Attribute] = {
    operation match {
      case RUN => Seq.empty
      case SCHEDULE => Seq(AttributeReference("instant", StringType, nullable = false)())
    }
  }
}

View File

@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.common.util.CompactionUtils
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.types.{IntegerType, StringType}
import scala.collection.JavaConverters.asScalaIteratorConverter
/**
 * Runnable command backing `SHOW COMPACTION ON '<path>' [LIMIT n]`.
 * Lists the most recent compaction instants on the table's active timeline,
 * newest first, capped at `limit`, with the operation count of each plan.
 */
case class CompactionShowHoodiePathCommand(path: String, limit: Int)
  extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    // `path` is already a String — the original called path.toString redundantly.
    val metaClient = HoodieTableMetaClient.builder().setBasePath(path)
      .setConf(sparkSession.sessionState.newHadoopConf()).build()
    // Only MERGE_ON_READ tables have compactions to show.
    assert(metaClient.getTableType == HoodieTableType.MERGE_ON_READ,
      s"Cannot show compaction on a Non Merge On Read table.")
    val timeLine = metaClient.getActiveTimeline
    // Newest compaction instants first, capped at `limit`.
    val compactionInstants = timeLine.getInstants.iterator().asScala
      .filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION)
      .toSeq
      .sortBy(f => f.getTimestamp)
      .reverse
      .take(limit)
    // Pair each instant with its compaction plan to report the plan size.
    val compactionPlans = compactionInstants.map(instant =>
      (instant, CompactionUtils.getCompactionPlan(metaClient, instant.getTimestamp)))
    compactionPlans.map { case (instant, plan) =>
      Row(instant.getTimestamp, instant.getAction, plan.getOperations.size())
    }
  }

  // Output schema: instant timestamp, timeline action, number of operations.
  override val output: Seq[Attribute] = {
    Seq(
      AttributeReference("timestamp", StringType, nullable = false)(),
      AttributeReference("action", StringType, nullable = false)(),
      AttributeReference("size", IntegerType, nullable = false)()
    )
  }
}

View File

@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation
import org.apache.spark.sql.types.{IntegerType, StringType}
/**
 * Runnable command backing `SHOW COMPACTION ON <table> [LIMIT n]`: resolves
 * the catalog table's base path and delegates to
 * [[CompactionShowHoodiePathCommand]].
 *
 * @param table catalog table whose compactions are listed
 * @param limit maximum number of compaction instants to return
 */
case class CompactionShowHoodieTableCommand(table: CatalogTable, limit: Int)
  extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    // BUG FIX: the original wrote getOrElse(s"missing location for ...") —
    // when the location was absent, the *error message string* itself became
    // the base path. Fail fast with an exception instead.
    val basePath = getTableLocation(table, sparkSession).getOrElse(
      throw new IllegalArgumentException(s"Missing location for ${table.identifier}"))
    CompactionShowHoodiePathCommand(basePath, limit).run(sparkSession)
  }

  // Output schema: instant timestamp, timeline action, number of operations.
  // Mirrors CompactionShowHoodiePathCommand.output.
  override val output: Seq[Attribute] = {
    Seq(
      AttributeReference("timestamp", StringType, nullable = false)(),
      AttributeReference("action", StringType, nullable = false)(),
      AttributeReference("size", IntegerType, nullable = false)()
    )
  }
}

View File

@@ -34,10 +34,7 @@ case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends Runnab
private val table = deleteTable.table
private val tableId = table match {
case SubqueryAlias(name, _) => sparkAdapter.toTableIdentify(name)
case _ => throw new IllegalArgumentException(s"Illegal table: $table")
}
private val tableId = getTableIdentify(table)
override def run(sparkSession: SparkSession): Seq[Row] = {
logInfo(s"start execute delete command for $tableId")

View File

@@ -38,10 +38,7 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends RunnableCo
with SparkAdapterSupport {
private val table = updateTable.table
private val tableId = table match {
case SubqueryAlias(name, _) => sparkAdapter.toTableIdentify(name)
case _ => throw new IllegalArgumentException(s"Illegal table: $table")
}
private val tableId = getTableIdentify(table)
override def run(sparkSession: SparkSession): Seq[Row] = {
logInfo(s"start execute update command for $tableId")

View File

@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.parser
import org.antlr.v4.runtime.{CharStream, CharStreams, CodePointCharStream, CommonTokenStream, IntStream}
import org.antlr.v4.runtime.atn.PredictionMode
import org.antlr.v4.runtime.misc.{Interval, ParseCancellationException}
import org.apache.hudi.SparkAdapterSupport
import org.apache.hudi.spark.sql.parser.{HoodieSqlCommonLexer, HoodieSqlCommonParser}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.{ParseErrorListener, ParseException, ParserInterface}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.Origin
import org.apache.spark.sql.types.{DataType, StructType}
/**
 * Session parser injected by HoodieSparkSessionExtension. Tries the Hoodie
 * grammar first; any statement the grammar passes through (visitor returns a
 * non-LogicalPlan / null) is re-parsed by the Spark extended parser when one
 * exists, else by the delegate parser. All other ParserInterface methods are
 * forwarded to the delegate unchanged.
 */
class HoodieCommonSqlParser(session: SparkSession, delegate: ParserInterface)
extends ParserInterface with Logging with SparkAdapterSupport {

private lazy val builder = new HoodieSqlCommonAstBuilder(session, delegate)
// Spark-version-specific extended parser if the adapter provides one
// (e.g. for Spark 2); otherwise fall back to the plain delegate.
private lazy val sparkExtendedParser = sparkAdapter.createExtendedSparkParser
.map(_(session, delegate)).getOrElse(delegate)

override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
builder.visit(parser.singleStatement()) match {
case plan: LogicalPlan => plan
// Not a Hoodie command: hand the raw SQL to the underlying parser.
case _=> sparkExtendedParser.parsePlan(sqlText)
}
}

override def parseExpression(sqlText: String): Expression = delegate.parseExpression(sqlText)

override def parseTableIdentifier(sqlText: String): TableIdentifier =
delegate.parseTableIdentifier(sqlText)

override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier =
delegate.parseFunctionIdentifier(sqlText)

override def parseTableSchema(sqlText: String): StructType = delegate.parseTableSchema(sqlText)

override def parseDataType(sqlText: String): DataType = delegate.parseDataType(sqlText)

// Present for cross-version ParserInterface compatibility; not supported here.
def parseRawDataType(sqlText : String) : DataType = {
throw new UnsupportedOperationException(s"Unsupported parseRawDataType method")
}

// Present for cross-version ParserInterface compatibility; not supported here.
def parseMultipartIdentifier(sqlText: String): Seq[String] = {
throw new UnsupportedOperationException(s"Unsupported parseMultipartIdentifier method")
}

/**
 * Standard ANTLR two-phase parse (mirrors Spark's own parser driver):
 * attempt fast SLL prediction first and, on ParseCancellationException,
 * rewind the token stream and retry with full LL prediction. Errors are
 * reported through Spark's ParseErrorListener and rethrown as ParseException
 * carrying the original command text.
 */
protected def parse[T](command: String)(toResult: HoodieSqlCommonParser => T): T = {
logDebug(s"Parsing command: $command")
val lexer = new HoodieSqlCommonLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
lexer.removeErrorListeners()
lexer.addErrorListener(ParseErrorListener)
val tokenStream = new CommonTokenStream(lexer)
val parser = new HoodieSqlCommonParser(tokenStream)
parser.removeErrorListeners()
parser.addErrorListener(ParseErrorListener)
try {
try {
// first, try parsing with potentially faster SLL mode
parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
toResult(parser)
}
catch {
case e: ParseCancellationException =>
// if we fail, parse with LL mode
tokenStream.seek(0) // rewind input stream
parser.reset()
// Try Again.
parser.getInterpreter.setPredictionMode(PredictionMode.LL)
toResult(parser)
}
}
catch {
case e: ParseException if e.command.isDefined =>
throw e
case e: ParseException =>
throw e.withCommand(command)
case e: AnalysisException =>
val position = Origin(e.line, e.startPosition)
throw new ParseException(Option(command), e.message, position, position)
}
}
}
/**
* Fork from `org.apache.spark.sql.catalyst.parser.UpperCaseCharStream`.
*/
/**
 * Fork from `org.apache.spark.sql.catalyst.parser.UpperCaseCharStream`.
 *
 * Wraps a [[CodePointCharStream]] and upper-cases every character returned
 * from lookahead, so the lexer only needs uppercase keyword rules, while
 * `getText` still returns the original, case-preserved input.
 */
class UpperCaseCharStream(wrapped: CodePointCharStream) extends CharStream {
  override def consume(): Unit = wrapped.consume
  override def getSourceName(): String = wrapped.getSourceName
  override def index(): Int = wrapped.index
  override def mark(): Int = wrapped.mark
  override def release(marker: Int): Unit = wrapped.release(marker)
  override def seek(where: Int): Unit = wrapped.seek(where)
  override def size(): Int = wrapped.size

  override def getText(interval: Interval): String = {
    // ANTLR 4.7's CodePointCharStream implementations have bugs when
    // getText() is called with an empty stream, or intervals where
    // the start > end. See
    // https://github.com/antlr/antlr4/commit/ac9f7530 for one fix
    // that is not yet in a released ANTLR artifact.
    val emptyOrInverted = size() <= 0 || interval.b < interval.a
    if (emptyOrInverted) "" else wrapped.getText(interval)
  }

  // scalastyle:off
  override def LA(i: Int): Int = {
  // scalastyle:on
    // Pass sentinel values (0 and EOF) through untouched; upper-case the rest.
    wrapped.LA(i) match {
      case 0 => 0
      case IntStream.EOF => IntStream.EOF
      case codePoint => Character.toUpperCase(codePoint)
    }
  }
}

View File

@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.parser
import org.apache.hudi.SparkAdapterSupport
import org.apache.hudi.spark.sql.parser.{HoodieSqlCommonBaseVisitor, HoodieSqlCommonParser}
import org.apache.hudi.spark.sql.parser.HoodieSqlCommonParser.{CompactionOnPathContext, CompactionOnTableContext, ShowCompactionOnPathContext, ShowCompactionOnTableContext, SingleStatementContext, TableIdentifierContext}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.parser.ParserUtils.withOrigin
import org.apache.spark.sql.catalyst.parser.{ParserInterface, ParserUtils}
import org.apache.spark.sql.catalyst.plans.logical.{CompactionOperation, CompactionPath, CompactionShowOnPath, CompactionShowOnTable, CompactionTable, LogicalPlan}
/**
 * ANTLR visitor that converts HoodieSqlCommon parse trees into Catalyst
 * logical plans for the compaction commands. Statements matched by the
 * grammar's passThrough alternative produce no plan here and are re-parsed
 * by the caller (HoodieCommonSqlParser).
 */
class HoodieSqlCommonAstBuilder(session: SparkSession, delegate: ParserInterface)
  extends HoodieSqlCommonBaseVisitor[AnyRef] with Logging with SparkAdapterSupport {

  import ParserUtils._

  override def visitSingleStatement(ctx: SingleStatementContext): LogicalPlan = withOrigin(ctx) {
    ctx.statement().accept(this).asInstanceOf[LogicalPlan]
  }

  override def visitCompactionOnTable(ctx: CompactionOnTableContext): LogicalPlan = withOrigin(ctx) {
    val relation = ctx.tableIdentifier().accept(this).asInstanceOf[LogicalPlan]
    // `operation` is RUN or SCHEDULE; the optional AT clause gives an instant.
    val op = CompactionOperation.withName(ctx.operation.getText.toUpperCase)
    val instant = Option(ctx.instantTimestamp).map(_.getText.toLong)
    CompactionTable(relation, op, instant)
  }

  override def visitCompactionOnPath (ctx: CompactionOnPathContext): LogicalPlan = withOrigin(ctx) {
    val op = CompactionOperation.withName(ctx.operation.getText.toUpperCase)
    val instant = Option(ctx.instantTimestamp).map(_.getText.toLong)
    CompactionPath(string(ctx.path), op, instant)
  }

  override def visitShowCompactionOnTable (ctx: ShowCompactionOnTableContext): LogicalPlan = withOrigin(ctx) {
    val relation = ctx.tableIdentifier().accept(this).asInstanceOf[LogicalPlan]
    // Without a LIMIT clause, fall back to CompactionShowOnTable's default.
    Option(ctx.limit) match {
      case Some(limitToken) => CompactionShowOnTable(relation, limitToken.getText.toInt)
      case None => CompactionShowOnTable(relation)
    }
  }

  override def visitShowCompactionOnPath(ctx: ShowCompactionOnPathContext): LogicalPlan = withOrigin(ctx) {
    val target = string(ctx.path)
    // Without a LIMIT clause, fall back to CompactionShowOnPath's default.
    Option(ctx.limit) match {
      case Some(limitToken) => CompactionShowOnPath(target, limitToken.getText.toInt)
      case None => CompactionShowOnPath(target)
    }
  }

  override def visitTableIdentifier(ctx: TableIdentifierContext): LogicalPlan = withOrigin(ctx) {
    // Build an unresolved relation; the analyzer resolves it later.
    val database = Option(ctx.db).map(_.getText)
    UnresolvedRelation(TableIdentifier(ctx.table.getText, database))
  }
}

View File

@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi
// End-to-end tests for the compaction SQL commands on a MOR table:
// `schedule compaction on ...`, `run compaction on ... [at <ts>]`,
// and `show compaction on ... [limit <n>]`, addressed both by table name
// and by base path.
class TestCompactionTable extends TestHoodieSqlBase {
test("Test compaction table") {
withTempDir {tmp =>
val tableName = generateTableName
// Create a merge-on-read table with a preCombine field, backed by a temp dir.
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| location '${tmp.getCanonicalPath}'
| options (
|  primaryKey ='id',
|  type = 'mor',
|  preCombineField = 'ts'
| )
""".stripMargin)
// Keep parquet files small so updates land in log files awaiting compaction.
spark.sql("set hoodie.parquet.max.file.size = 10000")
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000)")
// Schedule two separate compaction plans, one after each update.
spark.sql(s"update $tableName set price = 11 where id = 1")
spark.sql(s"schedule compaction on $tableName")
spark.sql(s"update $tableName set price = 12 where id = 2")
spark.sql(s"schedule compaction on $tableName")
// Both pending plans should be listed; column 0 is the instant timestamp.
val compactionRows = spark.sql(s"show compaction on $tableName limit 10").collect()
val timestamps = compactionRows.map(_.getString(0))
assertResult(2)(timestamps.length)
// Run the later instant first; the earlier one must remain pending.
spark.sql(s"run compaction on $tableName at ${timestamps(1)}")
checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(1, "a1", 11.0, 1000),
Seq(2, "a2", 12.0, 1000),
Seq(3, "a3", 10.0, 1000),
Seq(4, "a4", 10.0, 1000)
)
assertResult(1)(spark.sql(s"show compaction on $tableName").collect().length)
// Now complete the remaining (earlier) instant; query results are unchanged
// and no pending compaction plans are left.
spark.sql(s"run compaction on $tableName at ${timestamps(0)}")
checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(1, "a1", 11.0, 1000),
Seq(2, "a2", 12.0, 1000),
Seq(3, "a3", 10.0, 1000),
Seq(4, "a4", 10.0, 1000)
)
assertResult(0)(spark.sql(s"show compaction on $tableName").collect().length)
}
}
test("Test compaction path") {
withTempDir { tmp =>
val tableName = generateTableName
// Same MOR table setup as above; this test addresses compaction by path.
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| location '${tmp.getCanonicalPath}'
| options (
|  primaryKey ='id',
|  type = 'mor',
|  preCombineField = 'ts'
| )
""".stripMargin)
spark.sql("set hoodie.parquet.max.file.size = 10000")
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
spark.sql(s"update $tableName set price = 11 where id = 1")
// `run compaction` with no prior schedule: the id=1 update is compacted
// (price becomes 11) and no pending plans remain afterwards.
spark.sql(s"run compaction on '${tmp.getCanonicalPath}'")
checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(1, "a1", 11.0, 1000),
Seq(2, "a2", 10.0, 1000),
Seq(3, "a3", 10.0, 1000)
)
assertResult(0)(spark.sql(s"show compaction on '${tmp.getCanonicalPath}'").collect().length)
// schedule compaction first
spark.sql(s"update $tableName set price = 12 where id = 1")
spark.sql(s"schedule compaction on '${tmp.getCanonicalPath}'")
// schedule compaction second
spark.sql(s"update $tableName set price = 12 where id = 2")
spark.sql(s"schedule compaction on '${tmp.getCanonicalPath}'")
// show compaction
assertResult(2)(spark.sql(s"show compaction on '${tmp.getCanonicalPath}'").collect().length)
// run compaction for all the scheduled compaction
spark.sql(s"run compaction on '${tmp.getCanonicalPath}'")
checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(1, "a1", 12.0, 1000),
Seq(2, "a2", 12.0, 1000),
Seq(3, "a3", 10.0, 1000)
)
assertResult(0)(spark.sql(s"show compaction on '${tmp.getCanonicalPath}'").collect().length)
// Running at an instant that was never scheduled must fail with a clear error.
checkException(s"run compaction on '${tmp.getCanonicalPath}' at 12345")(
s"Compaction instant: 12345 is not found in ${tmp.getCanonicalPath}, Available pending compaction instants are: "
)
}
}
}

View File

@@ -147,7 +147,7 @@
<plugin>
<groupId>org.antlr</groupId>
<artifactId>antlr4-maven-plugin</artifactId>
<version>4.7</version>
<version>${antlr.version}</version>
<executions>
<execution>
<goals>

View File

@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, Logic
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.execution.datasources.{Spark2ParsePartitionUtil, SparkParsePartitionUtil}
import org.apache.spark.sql.hudi.SparkAdapter
import org.apache.spark.sql.hudi.parser.HoodieSqlParser
import org.apache.spark.sql.hudi.parser.HoodieSpark2ExtendedSqlParser
import org.apache.spark.sql.internal.SQLConf
/**
@@ -73,7 +73,7 @@ class Spark2Adapter extends SparkAdapter {
override def createExtendedSparkParser: Option[(SparkSession, ParserInterface) => ParserInterface] = {
Some(
(spark: SparkSession, delegate: ParserInterface) => new HoodieSqlParser(spark, delegate)
(spark: SparkSession, delegate: ParserInterface) => new HoodieSpark2ExtendedSqlParser(spark, delegate)
)
}

View File

@@ -34,7 +34,7 @@ import scala.collection.JavaConverters._
* Here we only do the parser for the extended sql syntax. e.g MergeInto. For
* other sql syntax we use the delegate sql parser which is the SparkSqlParser.
*/
class HoodieSqlAstBuilder(conf: SQLConf, delegate: ParserInterface) extends HoodieSqlBaseBaseVisitor[AnyRef] with Logging {
class HoodieSpark2ExtendedSqlAstBuilder(conf: SQLConf, delegate: ParserInterface) extends HoodieSqlBaseBaseVisitor[AnyRef] with Logging {
import ParserUtils._

View File

@@ -32,11 +32,11 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{AnalysisException, SparkSession}
class HoodieSqlParser(session: SparkSession, delegate: ParserInterface)
class HoodieSpark2ExtendedSqlParser(session: SparkSession, delegate: ParserInterface)
extends ParserInterface with Logging {
private lazy val conf = session.sqlContext.conf
private lazy val builder = new HoodieSqlAstBuilder(conf, delegate)
private lazy val builder = new HoodieSpark2ExtendedSqlAstBuilder(conf, delegate)
override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
builder.visit(parser.singleStatement()) match {

View File

@@ -152,6 +152,7 @@
<presto.bundle.bootstrap.shade.prefix>org.apache.hudi.</presto.bundle.bootstrap.shade.prefix>
<shadeSources>true</shadeSources>
<zk-curator.version>2.7.1</zk-curator.version>
<antlr.version>4.7</antlr.version>
</properties>
<scm>