1
0

[HUDI-3506] Add call procedure for CommitsCommand (#5974)

* [HUDI-3506] Add call procedure for CommitsCommand

Co-authored-by: superche <superche@tencent.com>
This commit is contained in:
superche
2022-06-28 09:43:36 +08:00
committed by GitHub
parent 8846849a03
commit b14ed47f21
7 changed files with 941 additions and 0 deletions

View File

@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command.procedures
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
import java.util.function.Supplier
import scala.collection.JavaConverters._
/**
 * Procedure that compares the completed commit timelines of two Hudi tables
 * and reports which one is ahead and by how many commits.
 *
 * Parameters:
 *   table - name of the source Hudi table, resolved through the Spark catalog (required)
 *   path  - base path of the target Hudi table (required)
 *
 * Returns a single row with a human-readable comparison summary.
 */
class CommitsCompareProcedure() extends BaseProcedure with ProcedureBuilder {
  private val PARAMETERS = Array[ProcedureParameter](
    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
    ProcedureParameter.required(1, "path", DataTypes.StringType, None)
  )

  private val OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("compare_detail", DataTypes.StringType, nullable = true, Metadata.empty)
  ))

  def parameters: Array[ProcedureParameter] = PARAMETERS

  def outputType: StructType = OUTPUT_TYPE

  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(PARAMETERS, args)

    val table = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
    val path = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]

    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, new TableIdentifier(table))
    val basePath = hoodieCatalogTable.tableLocation
    val source = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
    val target = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(path).build
    val sourceTimeline = source.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
    val targetTimeline = target.getActiveTimeline.getCommitsTimeline.filterCompletedInstants

    // "0" sorts before any real commit timestamp, so an empty timeline is
    // treated as older than everything.
    val targetLatestCommit =
      if (targetTimeline.getInstants.iterator.hasNext) targetTimeline.lastInstant.get.getTimestamp else "0"
    val sourceLatestCommit =
      if (sourceTimeline.getInstants.iterator.hasNext) sourceTimeline.lastInstant.get.getTimestamp else "0"

    // Note: sourceLatestCommit is always non-null (it is the result of the
    // if/else above), so no null check is needed before comparing.
    if (HoodieTimeline.compareTimestamps(targetLatestCommit, HoodieTimeline.GREATER_THAN, sourceLatestCommit)) {
      // source is behind the target.
      // Materialize the commits into a List: calling `.size` on the Iterator
      // returned by `.asScala.map(...)` exhausts it, so the original code
      // rendered an empty iterator in the message instead of the commit list.
      val commitsToCatchup = targetTimeline.findInstantsAfter(sourceLatestCommit, Integer.MAX_VALUE)
        .getInstants.iterator().asScala.map(instant => instant.getTimestamp).toList
      Seq(Row("Source " + source.getTableConfig.getTableName + " is behind by " + commitsToCatchup.size +
        " commits. Commits to catch up - " + commitsToCatchup))
    } else {
      val commitsToCatchup = sourceTimeline.findInstantsAfter(targetLatestCommit, Integer.MAX_VALUE)
        .getInstants.iterator().asScala.map(instant => instant.getTimestamp).toList
      Seq(Row("Source " + source.getTableConfig.getTableName + " is ahead by " + commitsToCatchup.size +
        " commits. Commits to catch up - " + commitsToCatchup))
    }
  }

  override def build: Procedure = new CommitsCompareProcedure()
}
/** Companion providing the registry name and builder factory for [[CommitsCompareProcedure]]. */
object CommitsCompareProcedure {
  // Name under which the procedure is registered and invoked via CALL.
  val NAME = "commits_compare"

  /** Supplies a fresh builder instance for the procedure registry. */
  def builder: Supplier[ProcedureBuilder] = {
    new Supplier[ProcedureBuilder] {
      override def get(): ProcedureBuilder = new CommitsCompareProcedure()
    }
  }
}

View File

@@ -43,6 +43,12 @@ object HoodieProcedures {
mapBuilder.put(ShowClusteringProcedure.NAME, ShowClusteringProcedure.builder)
mapBuilder.put(ShowCommitsProcedure.NAME, ShowCommitsProcedure.builder)
mapBuilder.put(ShowCommitsMetadataProcedure.NAME, ShowCommitsMetadataProcedure.builder)
mapBuilder.put(ShowArchivedCommitsProcedure.NAME, ShowArchivedCommitsProcedure.builder)
mapBuilder.put(ShowArchivedCommitsMetadataProcedure.NAME, ShowArchivedCommitsMetadataProcedure.builder)
mapBuilder.put(ShowCommitFilesProcedure.NAME, ShowCommitFilesProcedure.builder)
mapBuilder.put(ShowCommitPartitionsProcedure.NAME, ShowCommitPartitionsProcedure.builder)
mapBuilder.put(ShowCommitWriteStatsProcedure.NAME, ShowCommitWriteStatsProcedure.builder)
mapBuilder.put(CommitsCompareProcedure.NAME, CommitsCompareProcedure.builder)
mapBuilder.put(ShowSavepointsProcedure.NAME, ShowSavepointsProcedure.builder)
mapBuilder.put(DeleteMarkerProcedure.NAME, DeleteMarkerProcedure.builder)
mapBuilder.put(ShowRollbacksProcedure.NAME, ShowRollbacksProcedure.builder)

View File

@@ -0,0 +1,183 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command.procedures
import org.apache.hudi.common.model.HoodieCommitMetadata
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieDefaultTimeline, HoodieInstant}
import org.apache.hudi.common.util.StringUtils
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
import java.time.ZonedDateTime
import java.util
import java.util.function.Supplier
import java.util.{Collections, Date}
import scala.collection.JavaConverters._
/**
 * Procedure that lists commits from a Hudi table's archived timeline within a
 * time window.
 *
 * Parameters:
 *   table   - name of the Hudi table (required)
 *   limit   - maximum number of rows to return (default 10)
 *   startTs - window start; defaults to now - 10 days when empty
 *   endTs   - window end; defaults to now - 1 day when empty
 *
 * When `includeExtraMetadata` is true, one row is emitted per write-stat of
 * each commit (METADATA_OUTPUT_TYPE); otherwise one summary row per commit
 * (OUTPUT_TYPE).
 */
class ShowArchivedCommitsProcedure(includeExtraMetadata: Boolean) extends BaseProcedure with ProcedureBuilder {
  private val PARAMETERS = Array[ProcedureParameter](
    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
    ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10),
    ProcedureParameter.optional(2, "startTs", DataTypes.StringType, ""),
    ProcedureParameter.optional(3, "endTs", DataTypes.StringType, "")
  )

  private val OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("commit_time", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("total_bytes_written", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_files_added", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_files_updated", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_partitions_written", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_records_written", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_update_records_written", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_errors", DataTypes.LongType, nullable = true, Metadata.empty)
  ))

  private val METADATA_OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("commit_time", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("action", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("partition", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("file_id", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("previous_commit", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("num_writes", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("num_inserts", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("num_deletes", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("num_update_writes", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_errors", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_log_blocks", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_corrupt_logblocks", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_rollback_blocks", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_log_records", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_updated_records_compacted", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_bytes_written", DataTypes.LongType, nullable = true, Metadata.empty)
  ))

  def parameters: Array[ProcedureParameter] = PARAMETERS

  def outputType: StructType = if (includeExtraMetadata) METADATA_OUTPUT_TYPE else OUTPUT_TYPE

  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(PARAMETERS, args)

    val table = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
    val limit = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Int]
    var startTs = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String]
    var endTs = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[String]

    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, new TableIdentifier(table))
    val basePath = hoodieCatalogTable.tableLocation
    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build

    // start time for commits, default: now - 10 days
    if (StringUtils.isNullOrEmpty(startTs)) startTs = getTimeDaysAgo(10)
    // end time for commits, default: now - 1 day
    if (StringUtils.isNullOrEmpty(endTs)) endTs = getTimeDaysAgo(1)

    val archivedTimeline = metaClient.getArchivedTimeline
    try {
      archivedTimeline.loadInstantDetailsInMemory(startTs, endTs)
      val timelineRange = archivedTimeline.findInstantsInRange(startTs, endTs)
      if (includeExtraMetadata) {
        getCommitsWithMetadata(timelineRange, limit)
      } else {
        getCommits(timelineRange, limit)
      }
    } finally {
      // clear the instant details from memory after printing to reduce usage
      archivedTimeline.clearInstantDetailsFromMemory(startTs, endTs)
    }
  }

  override def build: Procedure = new ShowArchivedCommitsProcedure(includeExtraMetadata)

  /**
   * One row per (commit, partition, write-stat) with detailed write metrics.
   * Uses explicit JavaConverters instead of the deprecated
   * scala.collection.JavaConversions implicits.
   */
  private def getCommitsWithMetadata(timeline: HoodieDefaultTimeline,
                                     limit: Int): Seq[Row] = {
    val rows = new util.ArrayList[Row]
    for (commit <- getSortedCommits(timeline).asScala) {
      val commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get, classOf[HoodieCommitMetadata])
      for (partitionWriteStat <- commitMetadata.getPartitionToWriteStats.entrySet.asScala) {
        for (hoodieWriteStat <- partitionWriteStat.getValue.asScala) {
          rows.add(Row(
            commit.getTimestamp, commit.getAction, hoodieWriteStat.getPartitionPath,
            hoodieWriteStat.getFileId, hoodieWriteStat.getPrevCommit, hoodieWriteStat.getNumWrites,
            hoodieWriteStat.getNumInserts, hoodieWriteStat.getNumDeletes, hoodieWriteStat.getNumUpdateWrites,
            hoodieWriteStat.getTotalWriteErrors, hoodieWriteStat.getTotalLogBlocks, hoodieWriteStat.getTotalCorruptLogBlock,
            hoodieWriteStat.getTotalRollbackBlocks, hoodieWriteStat.getTotalLogRecords,
            hoodieWriteStat.getTotalUpdatedRecordsCompacted, hoodieWriteStat.getTotalWriteBytes))
        }
      }
    }
    rows.asScala.take(limit).toList
  }

  /**
   * Completed commits of the timeline, sorted newest-first.
   * The timeline can be read from multiple files, so an explicit sort is
   * needed instead of simply reversing the collection.
   */
  private def getSortedCommits(timeline: HoodieDefaultTimeline): util.ArrayList[HoodieInstant] = {
    val sorted = new util.ArrayList[HoodieInstant]
    timeline.getCommitsTimeline.filterCompletedInstants
      .getInstants.iterator().asScala.foreach(instant => sorted.add(instant))
    Collections.sort(sorted, HoodieInstant.COMPARATOR.reversed)
    sorted
  }

  /** One summary row per commit with aggregate write metrics. */
  def getCommits(timeline: HoodieDefaultTimeline,
                 limit: Int): Seq[Row] = {
    val rows = new util.ArrayList[Row]
    for (commit <- getSortedCommits(timeline).asScala) {
      val commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get, classOf[HoodieCommitMetadata])
      rows.add(Row(commit.getTimestamp, commitMetadata.fetchTotalBytesWritten, commitMetadata.fetchTotalFilesInsert,
        commitMetadata.fetchTotalFilesUpdated, commitMetadata.fetchTotalPartitionsWritten,
        commitMetadata.fetchTotalRecordsWritten, commitMetadata.fetchTotalUpdateRecordsWritten,
        commitMetadata.fetchTotalWriteErrors))
    }
    rows.asScala.take(limit).toList
  }

  /** Timestamp string `numberOfDays` days before now, in Hudi's commit-time format. */
  def getTimeDaysAgo(numberOfDays: Int): String = {
    val date = Date.from(ZonedDateTime.now.minusDays(numberOfDays).toInstant)
    HoodieActiveTimeline.formatDate(date)
  }
}
/** Companion registering the summary (no extra metadata) variant of the procedure. */
object ShowArchivedCommitsProcedure {
  // Name under which the procedure is registered and invoked via CALL.
  val NAME = "show_archived_commits"

  /** Supplies a fresh builder producing the summary variant. */
  def builder: Supplier[ProcedureBuilder] = {
    new Supplier[ProcedureBuilder] {
      override def get(): ProcedureBuilder = new ShowArchivedCommitsProcedure(false)
    }
  }
}
/** Companion registering the per-write-stat (extra metadata) variant of the procedure. */
object ShowArchivedCommitsMetadataProcedure {
  // Name under which the procedure is registered and invoked via CALL.
  val NAME = "show_archived_commits_metadata"

  /** Supplies a fresh builder producing the extra-metadata variant. */
  def builder: Supplier[ProcedureBuilder] = {
    new Supplier[ProcedureBuilder] {
      override def get(): ProcedureBuilder = new ShowArchivedCommitsProcedure(true)
    }
  }
}

View File

@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command.procedures
import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieReplaceCommitMetadata, HoodieWriteStat}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.exception.HoodieException
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
import java.util
import java.util.List
import java.util.function.Supplier
import scala.collection.JavaConversions._
/**
 * Procedure listing the files written by a given commit of a Hudi table,
 * one row per write-stat (file) in the commit metadata.
 *
 * Parameters:
 *   table        - name of the Hudi table (required)
 *   limit        - maximum number of rows to return (default 10)
 *   instant_time - timestamp of the commit to inspect (required)
 *
 * Throws HoodieException when the instant is not found among completed commits.
 */
class ShowCommitFilesProcedure() extends BaseProcedure with ProcedureBuilder {
  // Explicit converters instead of the deprecated JavaConversions implicits.
  import scala.collection.JavaConverters._

  private val PARAMETERS = Array[ProcedureParameter](
    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
    ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10),
    ProcedureParameter.required(2, "instant_time", DataTypes.StringType, None)
  )

  private val OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("action", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("partition_path", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("file_id", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("previous_commit", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("total_records_updated", DataTypes.LongType, nullable = true, Metadata.empty),
    // Fixed typo in the user-visible column name (was "total_tecords_written").
    StructField("total_records_written", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_bytes_written", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_errors", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("file_size", DataTypes.LongType, nullable = true, Metadata.empty)
  ))

  def parameters: Array[ProcedureParameter] = PARAMETERS

  def outputType: StructType = OUTPUT_TYPE

  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(PARAMETERS, args)

    val table = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
    val limit = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Int]
    val instantTime = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String]

    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, new TableIdentifier(table))
    val basePath = hoodieCatalogTable.tableLocation
    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
    val activeTimeline = metaClient.getActiveTimeline
    val timeline = activeTimeline.getCommitsTimeline.filterCompletedInstants

    val hoodieInstantOption = getCommitForInstant(timeline, instantTime)
    val commitMetadataOptional = getHoodieCommitMetadata(timeline, hoodieInstantOption)
    if (commitMetadataOptional.isEmpty) {
      throw new HoodieException(s"Commit $instantTime not found in Commits $timeline.")
    }

    val meta = commitMetadataOptional.get
    val rows = new util.ArrayList[Row]
    for (entry <- meta.getPartitionToWriteStats.entrySet.asScala) {
      val action: String = hoodieInstantOption.get.getAction
      val path: String = entry.getKey
      for (stat <- entry.getValue.asScala) {
        rows.add(Row(action, path, stat.getFileId, stat.getPrevCommit, stat.getNumUpdateWrites,
          stat.getNumWrites, stat.getTotalWriteBytes, stat.getTotalWriteErrors, stat.getFileSizeInBytes))
      }
    }
    rows.asScala.take(limit).toList
  }

  override def build: Procedure = new ShowCommitFilesProcedure()

  /** Probes commit/replacecommit/deltacommit actions for the given instant time. */
  private def getCommitForInstant(timeline: HoodieTimeline, instantTime: String): Option[HoodieInstant] = {
    val candidates = Seq(
      new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime),
      new HoodieInstant(false, HoodieTimeline.REPLACE_COMMIT_ACTION, instantTime),
      new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, instantTime))
    candidates.find(timeline.containsInstant)
  }

  /** Deserializes the commit metadata, using the replace-commit codec when applicable. */
  private def getHoodieCommitMetadata(timeline: HoodieTimeline, hoodieInstant: Option[HoodieInstant]): Option[HoodieCommitMetadata] = {
    if (hoodieInstant.isDefined) {
      if (hoodieInstant.get.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) {
        Option(HoodieReplaceCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant.get).get,
          classOf[HoodieReplaceCommitMetadata]))
      } else {
        Option(HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant.get).get,
          classOf[HoodieCommitMetadata]))
      }
    } else {
      Option.empty
    }
  }
}
/** Companion providing the registry name and builder factory for [[ShowCommitFilesProcedure]]. */
object ShowCommitFilesProcedure {
  // Name under which the procedure is registered and invoked via CALL.
  val NAME = "show_commit_files"

  /** Supplies a fresh builder instance for the procedure registry. */
  def builder: Supplier[ProcedureBuilder] = {
    new Supplier[ProcedureBuilder] {
      override def get(): ProcedureBuilder = new ShowCommitFilesProcedure()
    }
  }
}

View File

@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command.procedures
import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieReplaceCommitMetadata, HoodieWriteStat}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.exception.HoodieException
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
import java.util
import java.util.List
import java.util.function.Supplier
import scala.collection.JavaConversions._
/**
 * Procedure summarizing the write activity of a given commit per partition:
 * files added/updated, records inserted/updated, bytes written, and errors.
 *
 * Parameters:
 *   table        - name of the Hudi table (required)
 *   limit        - maximum number of rows to return (default 10)
 *   instant_time - timestamp of the commit to inspect (required)
 *
 * Throws HoodieException when the instant is not found among completed commits.
 */
class ShowCommitPartitionsProcedure() extends BaseProcedure with ProcedureBuilder {
  // Explicit converters instead of the deprecated JavaConversions implicits.
  import scala.collection.JavaConverters._

  private val PARAMETERS = Array[ProcedureParameter](
    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
    ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10),
    ProcedureParameter.required(2, "instant_time", DataTypes.StringType, None)
  )

  private val OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("action", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("partition_path", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("total_files_added", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_files_updated", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_records_inserted", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_records_updated", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_bytes_written", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_errors", DataTypes.LongType, nullable = true, Metadata.empty)
  ))

  def parameters: Array[ProcedureParameter] = PARAMETERS

  def outputType: StructType = OUTPUT_TYPE

  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(PARAMETERS, args)

    val table = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
    val limit = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Int]
    val instantTime = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String]

    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, new TableIdentifier(table))
    val basePath = hoodieCatalogTable.tableLocation
    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
    val activeTimeline = metaClient.getActiveTimeline
    val timeline = activeTimeline.getCommitsTimeline.filterCompletedInstants

    val hoodieInstantOption = getCommitForInstant(timeline, instantTime)
    val commitMetadataOptional = getHoodieCommitMetadata(timeline, hoodieInstantOption)
    if (commitMetadataOptional.isEmpty) {
      throw new HoodieException(s"Commit $instantTime not found in Commits $timeline.")
    }

    val meta = commitMetadataOptional.get
    val rows = new util.ArrayList[Row]
    for (entry <- meta.getPartitionToWriteStats.entrySet.asScala) {
      val action: String = hoodieInstantOption.get.getAction
      val path: String = entry.getKey
      var totalFilesAdded: Long = 0
      var totalFilesUpdated: Long = 0
      var totalRecordsUpdated: Long = 0
      var totalRecordsInserted: Long = 0
      var totalBytesWritten: Long = 0
      var totalWriteErrors: Long = 0
      for (stat <- entry.getValue.asScala) {
        // A "null" previous commit marks a brand-new file; anything else is an update.
        if (stat.getPrevCommit == HoodieWriteStat.NULL_COMMIT) {
          totalFilesAdded += 1
        } else {
          totalFilesUpdated += 1
          totalRecordsUpdated += stat.getNumUpdateWrites
        }
        totalRecordsInserted += stat.getNumInserts
        totalBytesWritten += stat.getTotalWriteBytes
        totalWriteErrors += stat.getTotalWriteErrors
      }
      rows.add(Row(action, path, totalFilesAdded, totalFilesUpdated, totalRecordsInserted, totalRecordsUpdated,
        totalBytesWritten, totalWriteErrors))
    }
    rows.asScala.take(limit).toList
  }

  override def build: Procedure = new ShowCommitPartitionsProcedure()

  /** Probes commit/replacecommit/deltacommit actions for the given instant time. */
  private def getCommitForInstant(timeline: HoodieTimeline, instantTime: String): Option[HoodieInstant] = {
    val candidates = Seq(
      new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime),
      new HoodieInstant(false, HoodieTimeline.REPLACE_COMMIT_ACTION, instantTime),
      new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, instantTime))
    candidates.find(timeline.containsInstant)
  }

  /** Deserializes the commit metadata, using the replace-commit codec when applicable. */
  private def getHoodieCommitMetadata(timeline: HoodieTimeline, hoodieInstant: Option[HoodieInstant]): Option[HoodieCommitMetadata] = {
    if (hoodieInstant.isDefined) {
      if (hoodieInstant.get.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) {
        Option(HoodieReplaceCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant.get).get,
          classOf[HoodieReplaceCommitMetadata]))
      } else {
        Option(HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant.get).get,
          classOf[HoodieCommitMetadata]))
      }
    } else {
      Option.empty
    }
  }
}
/** Companion providing the registry name and builder factory for [[ShowCommitPartitionsProcedure]]. */
object ShowCommitPartitionsProcedure {
  // Name under which the procedure is registered and invoked via CALL.
  val NAME = "show_commit_partitions"

  /** Supplies a fresh builder instance for the procedure registry. */
  def builder: Supplier[ProcedureBuilder] = {
    new Supplier[ProcedureBuilder] {
      override def get(): ProcedureBuilder = new ShowCommitPartitionsProcedure()
    }
  }
}

View File

@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command.procedures
import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieReplaceCommitMetadata}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.exception.HoodieException
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
import java.util
import java.util.function.Supplier
import scala.collection.JavaConversions._
/**
 * Procedure reporting aggregate write statistics for a given commit:
 * total bytes written, total records written, and average record size.
 *
 * Parameters:
 *   table        - name of the Hudi table (required)
 *   limit        - maximum number of rows to return (default 10)
 *   instant_time - timestamp of the commit to inspect (required)
 *
 * Throws HoodieException when the instant is not found among completed commits.
 */
class ShowCommitWriteStatsProcedure() extends BaseProcedure with ProcedureBuilder {
  private val PARAMETERS = Array[ProcedureParameter](
    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
    ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10),
    ProcedureParameter.required(2, "instant_time", DataTypes.StringType, None)
  )

  private val OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("action", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("total_bytes_written", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_records_written", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("avg_record_size", DataTypes.LongType, nullable = true, Metadata.empty)
  ))

  def parameters: Array[ProcedureParameter] = PARAMETERS

  def outputType: StructType = OUTPUT_TYPE

  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(PARAMETERS, args)

    val table = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
    val limit = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Int]
    val instantTime = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String]

    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, new TableIdentifier(table))
    val basePath = hoodieCatalogTable.tableLocation
    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
    val activeTimeline = metaClient.getActiveTimeline
    val timeline = activeTimeline.getCommitsTimeline.filterCompletedInstants

    val hoodieInstantOption = getCommitForInstant(timeline, instantTime)
    val commitMetadataOptional = getHoodieCommitMetadata(timeline, hoodieInstantOption)
    if (commitMetadataOptional.isEmpty) {
      throw new HoodieException(s"Commit $instantTime not found in Commits $timeline.")
    }

    val meta = commitMetadataOptional.get
    val action: String = hoodieInstantOption.get.getAction
    val recordsWritten = meta.fetchTotalRecordsWritten
    val bytesWritten = meta.fetchTotalBytesWritten
    // Guard against division by zero: a commit that wrote no records would
    // otherwise produce Infinity, which truncates to Long.MaxValue.
    val avgRecSize =
      if (recordsWritten > 0) Math.ceil((1.0 * bytesWritten) / recordsWritten).toLong else 0L

    // A single summary row; `take(limit)` preserves the original stream-limit
    // behavior (an explicit limit of 0 yields no rows).
    Seq(Row(action, bytesWritten, recordsWritten, avgRecSize)).take(limit)
  }

  override def build: Procedure = new ShowCommitWriteStatsProcedure()

  /** Probes commit/replacecommit/deltacommit actions for the given instant time. */
  private def getCommitForInstant(timeline: HoodieTimeline, instantTime: String): Option[HoodieInstant] = {
    val candidates = Seq(
      new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime),
      new HoodieInstant(false, HoodieTimeline.REPLACE_COMMIT_ACTION, instantTime),
      new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, instantTime))
    candidates.find(timeline.containsInstant)
  }

  /** Deserializes the commit metadata, using the replace-commit codec when applicable. */
  private def getHoodieCommitMetadata(timeline: HoodieTimeline, hoodieInstant: Option[HoodieInstant]): Option[HoodieCommitMetadata] = {
    if (hoodieInstant.isDefined) {
      if (hoodieInstant.get.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) {
        Option(HoodieReplaceCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant.get).get,
          classOf[HoodieReplaceCommitMetadata]))
      } else {
        Option(HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant.get).get,
          classOf[HoodieCommitMetadata]))
      }
    } else {
      Option.empty
    }
  }
}
/** Companion providing the registry name and builder factory for [[ShowCommitWriteStatsProcedure]]. */
object ShowCommitWriteStatsProcedure {
  // Name under which the procedure is registered and invoked via CALL.
  val NAME = "show_commit_write_stats"

  /** Supplies a fresh builder instance for the procedure registry. */
  def builder: Supplier[ProcedureBuilder] = {
    new Supplier[ProcedureBuilder] {
      override def get(): ProcedureBuilder = new ShowCommitWriteStatsProcedure()
    }
  }
}

View File

@@ -0,0 +1,293 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.procedure
import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
/**
 * Tests for the commits-related SQL call procedures:
 * `show_archived_commits`, `show_archived_commits_metadata`, `show_commit_files`,
 * `show_commit_partitions`, `show_commit_write_stats` and `commits_compare`.
 *
 * Each test creates a throw-away Hudi table in a temp dir, seeds it with inserts,
 * verifies that required arguments are enforced, then checks the procedure output.
 */
class TestCommitsProcedure extends HoodieSparkSqlTestBase {

  test("Test Call show_archived_commits Procedure") {
    withTempDir { tmp =>
      val tableName = generateTableName
      // create table with aggressive archival settings so commits get archived quickly
      spark.sql(
        s"""
           |create table $tableName (
           | id int,
           | name string,
           | price double,
           | ts long
           |) using hudi
           | location '${tmp.getCanonicalPath}/$tableName'
           | tblproperties (
           | primaryKey = 'id',
           | preCombineField = 'ts',
           | hoodie.keep.max.commits = 3,
           | hoodie.keep.min.commits = 2,
           | hoodie.cleaner.commits.retained = 1
           | )
       """.stripMargin)
      // insert data to table, will generate 3 active commits and 4 archived commits
      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
      spark.sql(s"insert into $tableName select 3, 'a3', 30, 2000")
      spark.sql(s"insert into $tableName select 4, 'a4', 40, 2500")
      spark.sql(s"insert into $tableName select 5, 'a5', 50, 3000")
      spark.sql(s"insert into $tableName select 6, 'a6', 60, 3500")
      spark.sql(s"insert into $tableName select 7, 'a7', 70, 4000")
      // Check required fields
      checkExceptionContain(s"""call show_archived_commits(limit => 10)""")(
        s"Argument: table is required")
      // collect active commits for table
      val commits = spark.sql(s"""call show_commits(table => '$tableName', limit => 10)""").collect()
      assertResult(3) { commits.length }
      // collect archived commits for table, bounded by the newest active commit time
      val endTs = commits(0).get(0).toString
      val archivedCommits = spark.sql(s"""call show_archived_commits(table => '$tableName', endTs => '$endTs')""").collect()
      assertResult(4) { archivedCommits.length }
    }
  }

  test("Test Call show_archived_commits_metadata Procedure") {
    withTempDir { tmp =>
      val tableName = generateTableName
      // create table with aggressive archival settings so commits get archived quickly
      spark.sql(
        s"""
           |create table $tableName (
           | id int,
           | name string,
           | price double,
           | ts long
           |) using hudi
           | location '${tmp.getCanonicalPath}/$tableName'
           | tblproperties (
           | primaryKey = 'id',
           | preCombineField = 'ts',
           | hoodie.keep.max.commits = 3,
           | hoodie.keep.min.commits = 2,
           | hoodie.cleaner.commits.retained = 1
           | )
       """.stripMargin)
      // insert data to table, will generate 3 active commits and 4 archived commits
      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
      spark.sql(s"insert into $tableName select 3, 'a3', 30, 2000")
      spark.sql(s"insert into $tableName select 4, 'a4', 40, 2500")
      spark.sql(s"insert into $tableName select 5, 'a5', 50, 3000")
      spark.sql(s"insert into $tableName select 6, 'a6', 60, 3500")
      spark.sql(s"insert into $tableName select 7, 'a7', 70, 4000")
      // Check required fields
      checkExceptionContain(s"""call show_archived_commits_metadata(limit => 10)""")(
        s"Argument: table is required")
      // collect active commits for table
      val commits = spark.sql(s"""call show_commits(table => '$tableName', limit => 10)""").collect()
      assertResult(3) { commits.length }
      // collect archived commits for table, bounded by the newest active commit time
      val endTs = commits(0).get(0).toString
      val archivedCommits = spark.sql(s"""call show_archived_commits_metadata(table => '$tableName', endTs => '$endTs')""").collect()
      assertResult(4) { archivedCommits.length }
    }
  }

  test("Test Call show_commit_files Procedure") {
    withTempDir { tmp =>
      val tableName = generateTableName
      // create table
      spark.sql(
        s"""
           |create table $tableName (
           | id int,
           | name string,
           | price double,
           | ts long
           |) using hudi
           | location '${tmp.getCanonicalPath}/$tableName'
           | tblproperties (
           | primaryKey = 'id',
           | preCombineField = 'ts'
           | )
       """.stripMargin)
      // insert data to table
      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
      // Check required fields
      checkExceptionContain(s"""call show_commit_files(table => '$tableName')""")(
        s"Argument: instant_time is required")
      // collect commits for table
      val commits = spark.sql(s"""call show_commits(table => '$tableName', limit => 10)""").collect()
      assertResult(2) { commits.length }
      // collect commit files for the latest commit
      val instant_time = commits(0).get(0).toString
      val commitFiles = spark.sql(s"""call show_commit_files(table => '$tableName', instant_time => '$instant_time')""").collect()
      assertResult(1) { commitFiles.length }
    }
  }

  test("Test Call show_commit_partitions Procedure") {
    withTempDir { tmp =>
      val tableName = generateTableName
      // create table
      spark.sql(
        s"""
           |create table $tableName (
           | id int,
           | name string,
           | price double,
           | ts long
           |) using hudi
           | location '${tmp.getCanonicalPath}/$tableName'
           | tblproperties (
           | primaryKey = 'id',
           | preCombineField = 'ts'
           | )
       """.stripMargin)
      // insert data to table
      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
      // Check required fields
      checkExceptionContain(s"""call show_commit_partitions(table => '$tableName')""")(
        s"Argument: instant_time is required")
      // collect commits for table
      val commits = spark.sql(s"""call show_commits(table => '$tableName', limit => 10)""").collect()
      assertResult(2) { commits.length }
      // collect commit partitions for the latest commit
      val instant_time = commits(0).get(0).toString
      val commitPartitions = spark.sql(s"""call show_commit_partitions(table => '$tableName', instant_time => '$instant_time')""").collect()
      assertResult(1) { commitPartitions.length }
    }
  }

  test("Test Call show_commit_write_stats Procedure") {
    withTempDir { tmp =>
      val tableName = generateTableName
      // create table
      spark.sql(
        s"""
           |create table $tableName (
           | id int,
           | name string,
           | price double,
           | ts long
           |) using hudi
           | location '${tmp.getCanonicalPath}/$tableName'
           | tblproperties (
           | primaryKey = 'id',
           | preCombineField = 'ts'
           | )
       """.stripMargin)
      // insert data to table
      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
      // Check required fields
      checkExceptionContain(s"""call show_commit_write_stats(table => '$tableName')""")(
        s"Argument: instant_time is required")
      // collect commits for table
      val commits = spark.sql(s"""call show_commits(table => '$tableName', limit => 10)""").collect()
      assertResult(2) { commits.length }
      // collect commit write stats for the latest commit
      val instant_time = commits(0).get(0).toString
      val commitPartitions = spark.sql(s"""call show_commit_write_stats(table => '$tableName', instant_time => '$instant_time')""").collect()
      assertResult(1) { commitPartitions.length }
    }
  }

  test("Test Call commits_compare Procedure") {
    withTempDir { tmp =>
      val tableName1 = generateTableName
      val tableName2 = generateTableName
      // create table1
      spark.sql(
        s"""
           |create table $tableName1 (
           | id int,
           | name string,
           | price double,
           | ts long
           |) using hudi
           | location '${tmp.getCanonicalPath}/$tableName1'
           | tblproperties (
           | primaryKey = 'id',
           | preCombineField = 'ts'
           | )
       """.stripMargin)
      // insert data to table1
      spark.sql(s"insert into $tableName1 select 1, 'a1', 10, 1000")
      spark.sql(s"insert into $tableName1 select 2, 'a2', 20, 1500")
      // create table2
      spark.sql(
        s"""
           |create table $tableName2 (
           | id int,
           | name string,
           | price double,
           | ts long
           |) using hudi
           | location '${tmp.getCanonicalPath}/$tableName2'
           | tblproperties (
           | primaryKey = 'id',
           | preCombineField = 'ts'
           | )
       """.stripMargin)
      // insert data to table2
      spark.sql(s"insert into $tableName2 select 3, 'a3', 30, 2000")
      spark.sql(s"insert into $tableName2 select 4, 'a4', 40, 2500")
      // Check required fields
      checkExceptionContain(s"""call commits_compare(table => '$tableName1')""")(
        s"Argument: path is required")
      // collect commits for table1 (val: never reassigned)
      val commits1 = spark.sql(s"""call show_commits(table => '$tableName1', limit => 10)""").collect()
      assertResult(2) { commits1.length }
      // collect commits for table2 (val: never reassigned)
      val commits2 = spark.sql(s"""call show_commits(table => '$tableName2', limit => 10)""").collect()
      assertResult(2) { commits2.length }
      // compare table1's timeline against table2's base path; yields a single summary row
      val result = spark.sql(s"""call commits_compare(table => '$tableName1', path => '${tmp.getCanonicalPath}/$tableName2')""").collect()
      assertResult(1) { result.length }
    }
  }
}