1
0

[HUDI-4165] Support Create/Drop/Show/Refresh Index Syntax for Spark SQL (#5761)

* Support Create/Drop/Show/Refresh Index Syntax for Spark SQL
This commit is contained in:
huberylee
2022-06-17 18:33:58 +08:00
committed by GitHub
parent 7689e62cd9
commit fec49dc12b
8 changed files with 709 additions and 2 deletions

View File

@@ -48,6 +48,13 @@
statement
// Pre-existing Hudi statements
: compactionStatement #compactionCommand
| CALL multipartIdentifier '(' (callArgument (',' callArgument)*)? ')' #call
// CREATE INDEX [IF NOT EXISTS] name ON [TABLE] tbl [USING type] (cols) [OPTIONS (...)]
| CREATE INDEX (IF NOT EXISTS)? identifier ON TABLE?
tableIdentifier (USING indexType=identifier)?
LEFT_PAREN columns=multipartIdentifierPropertyList RIGHT_PAREN
(OPTIONS indexOptions=propertyList)? #createIndex
| DROP INDEX (IF EXISTS)? identifier ON TABLE? tableIdentifier #dropIndex
| SHOW INDEXES (FROM | IN) TABLE? tableIdentifier #showIndexes
| REFRESH INDEX identifier ON TABLE? tableIdentifier #refreshIndex
// Anything unrecognized falls through to the delegate (Spark) parser
| .*? #passThrough
;
@@ -99,6 +106,14 @@
| MINUS? BIGDECIMAL_LITERAL #bigDecimalLiteral
;
// Comma-separated list of column identifiers, each with optional per-column OPTIONS.
multipartIdentifierPropertyList
: multipartIdentifierProperty (COMMA multipartIdentifierProperty)*
;
multipartIdentifierProperty
: multipartIdentifier (OPTIONS options=propertyList)?
;
// Dot-separated identifier, e.g. db.table or a nested column path.
multipartIdentifier
: parts+=identifier ('.' parts+=identifier)*
;
@@ -114,9 +129,53 @@
;
// Keywords that may also be used as plain identifiers.
// NOTE: the diff rendering left the pre-change rule body line
// (": CALL | COMPACTION | RUN | SCHEDULE | ON | SHOW | LIMIT") duplicated
// above the new alternatives, which made the rule syntactically invalid;
// only the expanded alternative list is kept here.
nonReserved
    : CALL
    | COMPACTION
    | CREATE
    | DROP
    | EXISTS
    | FROM
    | IN
    | INDEX
    | INDEXES
    | IF
    | LIMIT
    | NOT
    | ON
    | OPTIONS
    | REFRESH
    | RUN
    | SCHEDULE
    | SHOW
    | TABLE
    | USING
    ;
// Parenthesized, comma-separated list of properties: (k1=v1, k2, 'k.3'='x').
propertyList
: LEFT_PAREN property (COMMA property)* RIGHT_PAREN
;
// Value (and the '=' sign) is optional; a bare key is allowed at parse time.
property
: key=propertyKey (EQ? value=propertyValue)?
;
// Key is either a dotted identifier chain or a quoted string literal.
propertyKey
: identifier (DOT identifier)*
| STRING
;
// Value may be an integer, decimal, boolean or quoted string.
propertyValue
: INTEGER_VALUE
| DECIMAL_VALUE
| booleanValue
| STRING
;
// Punctuation tokens used by the index/property rules.
LEFT_PAREN: '(';
RIGHT_PAREN: ')';
COMMA: ',';
DOT: '.';
ALL: 'ALL';
AT: 'AT';
CALL: 'CALL';
@@ -132,6 +191,21 @@
FALSE: 'FALSE';
INTERVAL: 'INTERVAL';
TO: 'TO';
// Keyword tokens introduced for the index DDL statements.
CREATE: 'CREATE';
INDEX: 'INDEX';
INDEXES: 'INDEXES';
IF: 'IF';
NOT: 'NOT';
EXISTS: 'EXISTS';
TABLE: 'TABLE';
USING: 'USING';
OPTIONS: 'OPTIONS';
DROP: 'DROP';
FROM: 'FROM';
IN: 'IN';
REFRESH: 'REFRESH';
EQ: '=' | '==';
PLUS: '+';
MINUS: '-';

View File

@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.types.StringType
/**
 * The logical plan of the CREATE INDEX command.
 *
 * @param table          relation the index is created on
 * @param indexName      name of the new index
 * @param indexType      index implementation identifier; empty when USING was omitted
 * @param ignoreIfExists true when IF NOT EXISTS was specified
 * @param columns        indexed column references paired with their per-column options
 * @param properties     index-level options from the OPTIONS clause
 * @param output         output attributes (empty: CREATE INDEX returns no rows)
 */
case class CreateIndex(
table: LogicalPlan,
indexName: String,
indexType: String,
ignoreIfExists: Boolean,
columns: Seq[(Attribute, Map[String, String])],
properties: Map[String, String],
override val output: Seq[Attribute] = CreateIndex.getOutputAttrs) extends Command {
override def children: Seq[LogicalPlan] = Seq(table)
// Resolved only once both the table and every indexed column reference resolve.
override lazy val resolved: Boolean = table.resolved && columns.forall(_._1.resolved)
def withNewChildrenInternal(newChild: IndexedSeq[LogicalPlan]): CreateIndex = {
copy(table = newChild.head)
}
}
object CreateIndex {
// CREATE INDEX produces no output rows.
def getOutputAttrs: Seq[Attribute] = Seq.empty
}
/**
 * The logical plan of the DROP INDEX command.
 *
 * @param table             relation the index belongs to
 * @param indexName         name of the index to drop
 * @param ignoreIfNotExists true when IF EXISTS was specified
 * @param output            output attributes (empty: DROP INDEX returns no rows)
 */
case class DropIndex(
table: LogicalPlan,
indexName: String,
ignoreIfNotExists: Boolean,
override val output: Seq[Attribute] = DropIndex.getOutputAttrs) extends Command {
override def children: Seq[LogicalPlan] = Seq(table)
def withNewChildrenInternal(newChild: IndexedSeq[LogicalPlan]): DropIndex = {
copy(table = newChild.head)
}
}
object DropIndex {
// DROP INDEX produces no output rows.
def getOutputAttrs: Seq[Attribute] = Seq.empty
}
/**
 * The logical plan of the SHOW INDEXES command.
 *
 * @param table  relation whose indexes are listed
 * @param output fixed five-column schema describing each index
 */
case class ShowIndexes(
table: LogicalPlan,
override val output: Seq[Attribute] = ShowIndexes.getOutputAttrs) extends Command {
override def children: Seq[LogicalPlan] = Seq(table)
def withNewChildrenInternal(newChild: IndexedSeq[LogicalPlan]): ShowIndexes = {
copy(table = newChild.head)
}
}
object ShowIndexes {
// Output schema: name, indexed column(s), type, per-column options, index options.
def getOutputAttrs: Seq[Attribute] = Seq(
AttributeReference("index_name", StringType, nullable = false)(),
AttributeReference("col_name", StringType, nullable = false)(),
AttributeReference("index_type", StringType, nullable = false)(),
AttributeReference("col_options", StringType, nullable = true)(),
AttributeReference("options", StringType, nullable = true)()
)
}
/**
 * The logical plan of the REFRESH INDEX command.
 *
 * @param table     relation the index belongs to
 * @param indexName name of the index to refresh
 * @param output    output attributes (empty: REFRESH INDEX returns no rows)
 */
case class RefreshIndex(
table: LogicalPlan,
indexName: String,
override val output: Seq[Attribute] = RefreshIndex.getOutputAttrs) extends Command {
override def children: Seq[LogicalPlan] = Seq(table)
def withNewChildrenInternal(newChild: IndexedSeq[LogicalPlan]): RefreshIndex = {
copy(table = newChild.head)
}
}
object RefreshIndex {
// REFRESH INDEX produces no output rows.
def getOutputAttrs: Seq[Attribute] = Seq.empty
}

View File

@@ -171,6 +171,28 @@ case class HoodieAnalysis(sparkSession: SparkSession) extends Rule[LogicalPlan]
} else {
c
}
// Convert to CreateIndexCommand
case CreateIndex(table, indexName, indexType, ignoreIfExists, columns, properties, output)
if table.resolved && sparkAdapter.isHoodieTable(table, sparkSession) =>
CreateIndexCommand(
getTableIdentifier(table), indexName, indexType, ignoreIfExists, columns, properties, output)
// Convert to DropIndexCommand
case DropIndex(table, indexName, ignoreIfNotExists, output)
if table.resolved && sparkAdapter.isHoodieTable(table, sparkSession) =>
DropIndexCommand(getTableIdentifier(table), indexName, ignoreIfNotExists, output)
// Convert to ShowIndexesCommand
case ShowIndexes(table, output)
if table.resolved && sparkAdapter.isHoodieTable(table, sparkSession) =>
ShowIndexesCommand(getTableIdentifier(table), output)
// Convert to RefreshIndexCommand
case RefreshIndex(table, indexName, output)
if table.resolved && sparkAdapter.isHoodieTable(table, sparkSession) =>
RefreshIndexCommand(getTableIdentifier(table), indexName, output)
case _ => plan
}
}

View File

@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.spark.sql.hudi.command
import org.apache.hudi.common.index.HoodieIndex
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.{Row, SparkSession}
/**
 * Runnable command backing the CREATE INDEX statement on a Hudi table.
 *
 * @param tableId        identifier of the table the index is created on
 * @param indexName      name of the new index
 * @param indexType      index implementation identifier (empty when USING was omitted)
 * @param ignoreIfExists true when IF NOT EXISTS was specified
 * @param columns        indexed column references paired with per-column options
 * @param properties     index-level options
 * @param output         command output attributes
 */
case class CreateIndexCommand(
    tableId: TableIdentifier,
    indexName: String,
    indexType: String,
    ignoreIfExists: Boolean,
    columns: Seq[(Attribute, Map[String, String])],
    properties: Map[String, String],
    override val output: Seq[Attribute]) extends IndexBaseCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    // NOTE(review): per-index-type implementation is still pending; no rows yet.
    Nil
  }
}
/**
 * Runnable command backing the DROP INDEX statement on a Hudi table.
 *
 * @param tableId           identifier of the table the index belongs to
 * @param indexName         name of the index to drop
 * @param ignoreIfNotExists true when IF EXISTS was specified
 * @param output            command output attributes
 */
case class DropIndexCommand(
    tableId: TableIdentifier,
    indexName: String,
    ignoreIfNotExists: Boolean,
    override val output: Seq[Attribute]) extends IndexBaseCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    // NOTE(review): per-index-type implementation is still pending; no rows yet.
    Nil
  }
}
/**
 * Runnable command backing the SHOW INDEXES statement on a Hudi table.
 *
 * @param tableId identifier of the table whose indexes are listed
 * @param output  command output attributes (index metadata columns)
 */
case class ShowIndexesCommand(
    tableId: TableIdentifier,
    override val output: Seq[Attribute]) extends IndexBaseCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    // NOTE(review): per-index-type implementation is still pending; no rows yet.
    Nil
  }
}
/**
 * Runnable command backing the REFRESH INDEX statement on a Hudi table.
 *
 * @param tableId   identifier of the table the index belongs to
 * @param indexName name of the index to refresh
 * @param output    command output attributes
 */
case class RefreshIndexCommand(
    tableId: TableIdentifier,
    indexName: String,
    override val output: Seq[Attribute]) extends IndexBaseCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    // NOTE(review): per-index-type implementation is still pending; no rows yet.
    Nil
  }
}
/**
 * Common base class for Hudi index DDL commands.
 */
abstract class IndexBaseCommand extends HoodieLeafRunnableCommand with Logging {

  /**
   * Checks whether a hoodie index already exists. Index names are unique
   * within a hoodie table, so a matching name alone counts as existing;
   * additionally, when both an index type and a column list are supplied,
   * an index with the same type (case-insensitive) and the same column
   * set also counts as existing.
   *
   * @param secondaryIndexes currently defined hoodie indexes, if any
   * @param indexName        index name to look up
   * @param indexType        optional index type to match
   * @param colNames         optional indexed column names to match
   * @return true if a matching index exists
   */
  def indexExists(
      secondaryIndexes: Option[Array[HoodieIndex]],
      indexName: String,
      indexType: Option[String] = None,
      colNames: Option[Array[String]] = None): Boolean = {
    secondaryIndexes.exists { indexes =>
      val sameName = indexes.exists(_.getIndexName.equals(indexName))
      // Type/column comparison only applies when both are provided.
      val sameTypeAndColumns = (indexType, colNames) match {
        case (Some(tpe), Some(cols)) =>
          indexes.exists { index =>
            index.getIndexType.name().equalsIgnoreCase(tpe) &&
              index.getColNames.sameElements(cols)
          }
        case _ => false
      }
      sameName || sameTypeAndColumns
    }
  }
}

View File

@@ -25,11 +25,12 @@ import org.apache.hudi.spark.sql.parser.HoodieSqlCommonParser._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface, ParserUtils}
import org.apache.spark.sql.catalyst.plans.logical._
import java.util.Locale
import scala.collection.JavaConverters._
class HoodieSqlCommonAstBuilder(session: SparkSession, delegate: ParserInterface)
@@ -147,4 +148,144 @@ class HoodieSqlCommonAstBuilder(session: SparkSession, delegate: ParserInterface
private def typedVisit[T](ctx: ParseTree): T = {
ctx.accept(this).asInstanceOf[T]
}
/**
 * Creates a [[CreateIndex]] logical plan.
 * For example:
 * {{{
 *   CREATE INDEX index_name ON [TABLE] table_name [USING index_type] (column_index_property_list)
 *     [OPTIONS indexPropertyList]
 *   column_index_property_list: column_name [OPTIONS(indexPropertyList)] [ , . . . ]
 *   indexPropertyList: index_property_name [= index_property_value] [ , . . . ]
 * }}}
 */
override def visitCreateIndex(ctx: CreateIndexContext): LogicalPlan = withOrigin(ctx) {
  val indexName = ctx.identifier(0).getText
  // The USING clause is optional; when omitted the index type stays empty.
  val indexType = if (ctx.identifier.size() > 1) ctx.identifier(1).getText else ""
  val columnNodes = ctx.columns.multipartIdentifierProperty.asScala
  val columnNames = columnNodes.map(_.multipartIdentifier).map(typedVisit[Seq[String]]).toSeq
  // Per-column OPTIONS are optional; absent ones become empty maps.
  val columnOptions = columnNodes
    .map(node => Option(node.options).map(visitPropertyKeyValues).getOrElse(Map.empty[String, String]))
    .toSeq
  val indexOptions =
    Option(ctx.indexOptions).map(visitPropertyKeyValues).getOrElse(Map.empty[String, String])
  CreateIndex(
    visitTableIdentifier(ctx.tableIdentifier()),
    indexName,
    indexType,
    // EXISTS token is only present for the IF NOT EXISTS form.
    ctx.EXISTS != null,
    columnNames.map(UnresolvedAttribute(_)).zip(columnOptions),
    indexOptions)
}
/**
 * Creates a [[DropIndex]] logical plan.
 * For example:
 * {{{
 *   DROP INDEX [IF EXISTS] index_name ON [TABLE] table_name
 * }}}
 */
override def visitDropIndex(ctx: DropIndexContext): LogicalPlan = withOrigin(ctx) {
  DropIndex(
    visitTableIdentifier(ctx.tableIdentifier()),
    ctx.identifier.getText,
    // EXISTS token is only present when IF EXISTS was written.
    ctx.EXISTS != null)
}
/**
 * Creates a [[ShowIndexes]] logical plan.
 * For example:
 * {{{
 *   SHOW INDEXES (FROM | IN) [TABLE] table_name
 * }}}
 */
override def visitShowIndexes(ctx: ShowIndexesContext): LogicalPlan = withOrigin(ctx) {
  val table = visitTableIdentifier(ctx.tableIdentifier())
  ShowIndexes(table)
}
/**
 * Creates a [[RefreshIndex]] logical plan.
 * For example:
 * {{{
 *   REFRESH INDEX index_name ON [TABLE] table_name
 * }}}
 */
override def visitRefreshIndex(ctx: RefreshIndexContext): LogicalPlan = withOrigin(ctx) {
  val table = visitTableIdentifier(ctx.tableIdentifier())
  RefreshIndex(table, ctx.identifier.getText)
}
/**
 * Converts a property list into a key -> value map, rejecting duplicate keys.
 * This should be called through [[visitPropertyKeyValues]] or [[visitPropertyKeys]].
 */
override def visitPropertyList(
    ctx: PropertyListContext): Map[String, String] = withOrigin(ctx) {
  val entries = ctx.property.asScala.map { p =>
    visitPropertyKey(p.key) -> visitPropertyValue(p.value)
  }
  // Duplicate property names are a parse error.
  checkDuplicateKeys(entries.toSeq, ctx)
  entries.toMap
}
/**
 * Parses a key-value map from a [[PropertyListContext]], requiring every key
 * to carry a value.
 */
def visitPropertyKeyValues(ctx: PropertyListContext): Map[String, String] = {
  val props = visitPropertyList(ctx)
  // visitPropertyValue yields null for keys written without a value.
  val keysMissingValues = props.collect { case (key, null) => key }
  if (keysMissingValues.nonEmpty) {
    operationNotAllowed(
      s"Values must be specified for key(s): ${keysMissingValues.mkString("[", ",", "]")}", ctx)
  }
  props
}
/**
 * Parses a list of keys from a [[PropertyListContext]], requiring that no key
 * carries a value.
 */
def visitPropertyKeys(ctx: PropertyListContext): Seq[String] = {
  val props = visitPropertyList(ctx)
  // Any non-null value means a value was (incorrectly) supplied for that key.
  val keysWithValues = props.filter { case (_, v) => v != null }.keys
  if (keysWithValues.nonEmpty) {
    operationNotAllowed(
      s"Values should not be specified for key(s): ${keysWithValues.mkString("[", ",", "]")}", ctx)
  }
  props.keys.toSeq
}
/**
 * A property key can either be a String literal or a collection of dot
 * separated identifiers; returns the unescaped string in the former case
 * and the verbatim text in the latter.
 */
override def visitPropertyKey(key: PropertyKeyContext): String = {
  if (key.STRING == null) key.getText else string(key.STRING)
}
/**
 * A property value can be a String, Integer, Boolean or Decimal literal;
 * extracts its textual form. A missing value (bare key) maps to null so
 * callers can distinguish "no value" from an empty one.
 */
override def visitPropertyValue(value: PropertyValueContext): String = {
  value match {
    case null => null
    case v if v.STRING != null => string(v.STRING)
    // Booleans are normalized to lower case.
    case v if v.booleanValue != null => v.getText.toLowerCase(Locale.ROOT)
    case v => v.getText
  }
}
}

View File

@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.spark.sql.hudi.command.index
import org.apache.spark.sql.catalyst.analysis.Analyzer
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
import org.apache.spark.sql.hudi.command.{CreateIndexCommand, DropIndexCommand, ShowIndexesCommand}
class TestIndexSyntax extends HoodieSparkSqlTestBase {
// Parses and analyzes index DDL against both COW and MOR Hudi tables,
// asserting the resolved commands carry the expected table/index attributes.
test("Test Create/Drop/Show/Refresh Index") {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
val tableName = generateTableName
val basePath = s"${tmp.getCanonicalPath}/$tableName"
// Create a partitioned Hudi table to run the index statements against.
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| options (
| primaryKey ='id',
| type = '$tableType',
| preCombineField = 'ts'
| )
| partitioned by(ts)
| location '$basePath'
""".stripMargin)
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
val sqlParser: ParserInterface = spark.sessionState.sqlParser
val analyzer: Analyzer = spark.sessionState.analyzer
// SHOW INDEXES resolves to ShowIndexesCommand with the qualified table id.
var logicalPlan = sqlParser.parsePlan(s"show indexes from default.$tableName")
var resolvedLogicalPlan = analyzer.execute(logicalPlan)
assertResult(s"`default`.`$tableName`")(resolvedLogicalPlan.asInstanceOf[ShowIndexesCommand].tableId.toString())
// CREATE INDEX with USING and index-level OPTIONS.
logicalPlan = sqlParser.parsePlan(s"create index idx_name on $tableName using lucene (name) options(block_size=1024)")
resolvedLogicalPlan = analyzer.execute(logicalPlan)
assertResult(s"`default`.`$tableName`")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].tableId.toString())
assertResult("idx_name")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexName)
assertResult("lucene")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexType)
assertResult(false)(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].ignoreIfExists)
assertResult(Map("block_size" -> "1024"))(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].properties)
// CREATE INDEX with IF NOT EXISTS and per-column OPTIONS.
logicalPlan = sqlParser.parsePlan(s"create index if not exists idx_price on $tableName using lucene (price options(order='desc')) options(block_size=512)")
resolvedLogicalPlan = analyzer.execute(logicalPlan)
assertResult(s"`default`.`$tableName`")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].tableId.toString())
assertResult("idx_price")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexName)
assertResult("lucene")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexType)
assertResult(Map("order" -> "desc"))(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].columns.head._2)
assertResult(Map("block_size" -> "512"))(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].properties)
// DROP INDEX with IF EXISTS sets ignoreIfNotExists.
logicalPlan = sqlParser.parsePlan(s"drop index if exists idx_name on $tableName")
resolvedLogicalPlan = analyzer.execute(logicalPlan)
assertResult(s"`default`.`$tableName`")(resolvedLogicalPlan.asInstanceOf[DropIndexCommand].tableId.toString())
assertResult("idx_name")(resolvedLogicalPlan.asInstanceOf[DropIndexCommand].indexName)
assertResult(true)(resolvedLogicalPlan.asInstanceOf[DropIndexCommand].ignoreIfNotExists)
}
}
}
}