diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CommitsCompareProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CommitsCompareProcedure.scala new file mode 100644 index 000000000..62330cb54 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CommitsCompareProcedure.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.HoodieTimeline
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util.function.Supplier
+import scala.collection.JavaConverters._
+
+/**
+ * Procedure that compares the completed commit timelines of two Hudi tables:
+ * the catalog table named by `table` and the table located at base `path`.
+ * Returns a single row describing which side is ahead and by which commits.
+ */
+class CommitsCompareProcedure() extends BaseProcedure with ProcedureBuilder {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(1, "path", DataTypes.StringType, None)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("compare_detail", DataTypes.StringType, nullable = true, Metadata.empty)
+  ))
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val table = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
+    val path = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
+
+    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, new TableIdentifier(table))
+    val basePath = hoodieCatalogTable.tableLocation
+    val source = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+    val target = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(path).build
+    val sourceTimeline = source.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
+    val targetTimeline = target.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
+    // "0" sorts before every real instant timestamp, so an empty timeline compares as oldest.
+    val targetLatestCommit =
+      if (targetTimeline.getInstants.iterator.hasNext) targetTimeline.lastInstant.get.getTimestamp else "0"
+    val sourceLatestCommit =
+      if (sourceTimeline.getInstants.iterator.hasNext) sourceTimeline.lastInstant.get.getTimestamp else "0"
+
+    if (sourceLatestCommit != null && HoodieTimeline.compareTimestamps(targetLatestCommit, HoodieTimeline.GREATER_THAN, sourceLatestCommit)) { // source is behind the target
+      // Materialize to a List: an Iterator would be exhausted by the .size call below and
+      // then render as an empty iterator in the message instead of the commit timestamps.
+      val commitsToCatchup = targetTimeline.findInstantsAfter(sourceLatestCommit, Integer.MAX_VALUE)
+        .getInstants.iterator().asScala.map(instant => instant.getTimestamp).toList
+      Seq(Row("Source " + source.getTableConfig.getTableName + " is behind by " + commitsToCatchup.size + " commits. Commits to catch up - " + commitsToCatchup))
+    } else {
+      val commitsToCatchup = sourceTimeline.findInstantsAfter(targetLatestCommit, Integer.MAX_VALUE)
+        .getInstants.iterator().asScala.map(instant => instant.getTimestamp).toList
+      Seq(Row("Source " + source.getTableConfig.getTableName + " is ahead by " + commitsToCatchup.size + " commits. Commits to catch up - " + commitsToCatchup))
+    }
+  }
+
+  override def build: Procedure = new CommitsCompareProcedure()
+}
+
+object CommitsCompareProcedure {
+  val NAME = "commits_compare"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new CommitsCompareProcedure()
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index e3f05389a..d066ae5bc 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -43,6 +43,12 @@ object HoodieProcedures {
     mapBuilder.put(ShowClusteringProcedure.NAME, ShowClusteringProcedure.builder)
     mapBuilder.put(ShowCommitsProcedure.NAME, ShowCommitsProcedure.builder)
     mapBuilder.put(ShowCommitsMetadataProcedure.NAME,
ShowCommitsMetadataProcedure.builder) + mapBuilder.put(ShowArchivedCommitsProcedure.NAME, ShowArchivedCommitsProcedure.builder) + mapBuilder.put(ShowArchivedCommitsMetadataProcedure.NAME, ShowArchivedCommitsMetadataProcedure.builder) + mapBuilder.put(ShowCommitFilesProcedure.NAME, ShowCommitFilesProcedure.builder) + mapBuilder.put(ShowCommitPartitionsProcedure.NAME, ShowCommitPartitionsProcedure.builder) + mapBuilder.put(ShowCommitWriteStatsProcedure.NAME, ShowCommitWriteStatsProcedure.builder) + mapBuilder.put(CommitsCompareProcedure.NAME, CommitsCompareProcedure.builder) mapBuilder.put(ShowSavepointsProcedure.NAME, ShowSavepointsProcedure.builder) mapBuilder.put(DeleteMarkerProcedure.NAME, DeleteMarkerProcedure.builder) mapBuilder.put(ShowRollbacksProcedure.NAME, ShowRollbacksProcedure.builder) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowArchivedCommitsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowArchivedCommitsProcedure.scala new file mode 100644 index 000000000..12bc85f04 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowArchivedCommitsProcedure.scala @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.common.model.HoodieCommitMetadata
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieDefaultTimeline, HoodieInstant}
+import org.apache.hudi.common.util.StringUtils
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.time.ZonedDateTime
+import java.util
+import java.util.function.Supplier
+import java.util.{Collections, Date}
+import scala.collection.JavaConverters._
+
+/**
+ * Procedure listing completed commits from a table's archived timeline within a
+ * time range. With `includeExtraMetadata` it emits one row per write-stat
+ * (partition/file granularity); otherwise one aggregated row per commit.
+ */
+class ShowArchivedCommitsProcedure(includeExtraMetadata: Boolean) extends BaseProcedure with ProcedureBuilder {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10),
+    ProcedureParameter.optional(2, "startTs", DataTypes.StringType, ""),
+    ProcedureParameter.optional(3, "endTs", DataTypes.StringType, "")
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("commit_time", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("total_bytes_written", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("total_files_added", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("total_files_updated", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("total_partitions_written", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("total_records_written", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("total_update_records_written", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("total_errors", DataTypes.LongType, nullable = true, Metadata.empty)
+  ))
+
+  private val METADATA_OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("commit_time", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("action", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("partition", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("file_id", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("previous_commit", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("num_writes", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("num_inserts", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("num_deletes", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("num_update_writes", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("total_errors", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("total_log_blocks", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("total_corrupt_logblocks", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("total_rollback_blocks", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("total_log_records", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("total_updated_records_compacted", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("total_bytes_written", DataTypes.LongType, nullable = true, Metadata.empty)
+  ))
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = if (includeExtraMetadata) METADATA_OUTPUT_TYPE else OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val table = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
+    val limit = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Int]
+    var startTs = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String]
+    var endTs = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[String]
+
+    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, new TableIdentifier(table))
+    val basePath = hoodieCatalogTable.tableLocation
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+
+    // start time for commits, default: now - 10 days
+    // end time for commits, default: now - 1 day
+    if (StringUtils.isNullOrEmpty(startTs)) startTs = getTimeDaysAgo(10)
+    if (StringUtils.isNullOrEmpty(endTs)) endTs = getTimeDaysAgo(1)
+
+    val archivedTimeline = metaClient.getArchivedTimeline
+    try {
+      archivedTimeline.loadInstantDetailsInMemory(startTs, endTs)
+      val timelineRange = archivedTimeline.findInstantsInRange(startTs, endTs)
+      if (includeExtraMetadata) {
+        getCommitsWithMetadata(timelineRange, limit)
+      } else {
+        getCommits(timelineRange, limit)
+      }
+    } finally {
+      // clear the instant details from memory after printing to reduce usage
+      archivedTimeline.clearInstantDetailsFromMemory(startTs, endTs)
+    }
+  }
+
+  override def build: Procedure = new ShowArchivedCommitsProcedure(includeExtraMetadata)
+
+  /** One row per write-stat of each commit, newest commit first, capped at `limit`. */
+  private def getCommitsWithMetadata(timeline: HoodieDefaultTimeline,
+                                     limit: Int): Seq[Row] = {
+    val (rows: util.ArrayList[Row], newCommits: util.ArrayList[HoodieInstant]) = getSortCommits(timeline)
+
+    for (i <- 0 until newCommits.size) {
+      val commit = newCommits.get(i)
+      val commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get, classOf[HoodieCommitMetadata])
+      // Explicit asScala (JavaConverters) instead of the deprecated JavaConversions implicits.
+      for (partitionWriteStat <- commitMetadata.getPartitionToWriteStats.entrySet.asScala) {
+        for (hoodieWriteStat <- partitionWriteStat.getValue.asScala) {
+          rows.add(Row(
+            commit.getTimestamp, commit.getAction, hoodieWriteStat.getPartitionPath,
+            hoodieWriteStat.getFileId, hoodieWriteStat.getPrevCommit, hoodieWriteStat.getNumWrites,
+            hoodieWriteStat.getNumInserts, hoodieWriteStat.getNumDeletes, hoodieWriteStat.getNumUpdateWrites,
+            hoodieWriteStat.getTotalWriteErrors, hoodieWriteStat.getTotalLogBlocks, hoodieWriteStat.getTotalCorruptLogBlock,
+            hoodieWriteStat.getTotalRollbackBlocks, hoodieWriteStat.getTotalLogRecords,
+            hoodieWriteStat.getTotalUpdatedRecordsCompacted, hoodieWriteStat.getTotalWriteBytes))
+        }
+      }
+    }
+
+    rows.asScala.take(limit).toList
+  }
+
+  /** Returns an empty row buffer plus the completed commits sorted newest-first. */
+  private def getSortCommits(timeline: HoodieDefaultTimeline): (util.ArrayList[Row], util.ArrayList[HoodieInstant]) = {
+    val rows = new util.ArrayList[Row]
+    // timeline can be read from multiple files. So sort is needed instead of reversing the collection
+    val commits: util.List[HoodieInstant] = timeline.getCommitsTimeline.filterCompletedInstants
+      .getInstants.iterator().asScala.toList.asJava
+    val newCommits = new util.ArrayList[HoodieInstant](commits)
+    Collections.sort(newCommits, HoodieInstant.COMPARATOR.reversed)
+    (rows, newCommits)
+  }
+
+  /** One aggregated row per commit, newest first, capped at `limit`. */
+  def getCommits(timeline: HoodieDefaultTimeline,
+                 limit: Int): Seq[Row] = {
+    val (rows: util.ArrayList[Row], newCommits: util.ArrayList[HoodieInstant]) = getSortCommits(timeline)
+
+    for (i <- 0 until newCommits.size) {
+      val commit = newCommits.get(i)
+      val commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get, classOf[HoodieCommitMetadata])
+      rows.add(Row(commit.getTimestamp, commitMetadata.fetchTotalBytesWritten, commitMetadata.fetchTotalFilesInsert,
+        commitMetadata.fetchTotalFilesUpdated, commitMetadata.fetchTotalPartitionsWritten,
+        commitMetadata.fetchTotalRecordsWritten, commitMetadata.fetchTotalUpdateRecordsWritten,
+        commitMetadata.fetchTotalWriteErrors))
+    }
+
+    rows.asScala.take(limit).toList
+  }
+
+  /** Timestamp string (Hudi instant format) for `numberOfDays` days before now. */
+  def getTimeDaysAgo(numberOfDays: Int): String = {
+    val date = Date.from(ZonedDateTime.now.minusDays(numberOfDays).toInstant)
+    HoodieActiveTimeline.formatDate(date)
+  }
+}
+
+object ShowArchivedCommitsProcedure {
+  val NAME = "show_archived_commits"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new ShowArchivedCommitsProcedure(false)
+  }
+}
+
+object ShowArchivedCommitsMetadataProcedure {
+  val NAME = "show_archived_commits_metadata"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new ShowArchivedCommitsProcedure(true)
+  }
+}
+
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitFilesProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitFilesProcedure.scala
new file mode 100644
index 000000000..9fea4a18f
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitFilesProcedure.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieReplaceCommitMetadata, HoodieWriteStat}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util
+import java.util.List
+import java.util.function.Supplier
+import scala.collection.JavaConverters._
+
+/**
+ * Procedure listing the per-file write statistics of a single commit
+ * (commit / replacecommit / deltacommit) identified by `instant_time`.
+ */
+class ShowCommitFilesProcedure() extends BaseProcedure with ProcedureBuilder {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10),
+    ProcedureParameter.required(2, "instant_time", DataTypes.StringType, None)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("action", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("partition_path", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("file_id", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("previous_commit", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("total_records_updated", DataTypes.LongType, nullable = true, Metadata.empty),
+    // fixed typo: was "total_tecords_written"
+    StructField("total_records_written", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("total_bytes_written", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("total_errors", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("file_size", DataTypes.LongType, nullable = true, Metadata.empty)
+  ))
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val table = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
+    val limit = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Int]
+    val instantTime = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String]
+
+    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, new TableIdentifier(table))
+    val basePath = hoodieCatalogTable.tableLocation
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+    val activeTimeline = metaClient.getActiveTimeline
+    val timeline = activeTimeline.getCommitsTimeline.filterCompletedInstants
+    val hoodieInstantOption = getCommitForInstant(timeline, instantTime)
+    val commitMetadataOptional = getHoodieCommitMetadata(timeline, hoodieInstantOption)
+
+    if (commitMetadataOptional.isEmpty) {
+      throw new HoodieException(s"Commit $instantTime not found in Commits $timeline.")
+    }
+
+    val meta = commitMetadataOptional.get
+    val rows = new util.ArrayList[Row]
+    // Explicit asScala (JavaConverters) instead of the deprecated JavaConversions implicits.
+    for (entry <- meta.getPartitionToWriteStats.entrySet.asScala) {
+      val action: String = hoodieInstantOption.get.getAction
+      val path: String = entry.getKey
+      val stats: List[HoodieWriteStat] = entry.getValue
+      for (stat <- stats.asScala) {
+        rows.add(Row(action, path, stat.getFileId, stat.getPrevCommit, stat.getNumUpdateWrites,
+          stat.getNumWrites, stat.getTotalWriteBytes, stat.getTotalWriteErrors, stat.getFileSizeInBytes))
+      }
+    }
+    rows.asScala.take(limit).toList
+  }
+
+  override def build: Procedure = new ShowCommitFilesProcedure()
+
+  /** Finds the completed instant matching `instantTime` among commit/replacecommit/deltacommit. */
+  private def getCommitForInstant(timeline: HoodieTimeline, instantTime: String): Option[HoodieInstant] = {
+    val instants: util.List[HoodieInstant] = util.Arrays.asList(
+      new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime),
+      new HoodieInstant(false, HoodieTimeline.REPLACE_COMMIT_ACTION, instantTime),
+      new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, instantTime))
+
+    instants.asScala.find((i: HoodieInstant) => timeline.containsInstant(i))
+  }
+
+  /** Deserializes the commit metadata; replacecommits use their dedicated metadata class. */
+  private def getHoodieCommitMetadata(timeline: HoodieTimeline, hoodieInstant: Option[HoodieInstant]): Option[HoodieCommitMetadata] = {
+    if (hoodieInstant.isDefined) {
+      if (hoodieInstant.get.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) {
+        Option(HoodieReplaceCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant.get).get,
+          classOf[HoodieReplaceCommitMetadata]))
+      } else {
+        Option(HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant.get).get,
+          classOf[HoodieCommitMetadata]))
+      }
+    } else {
+      Option.empty
+    }
+  }
+}
+
+object ShowCommitFilesProcedure {
+  val NAME = "show_commit_files"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new ShowCommitFilesProcedure()
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitPartitionsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitPartitionsProcedure.scala
new file mode 100644
index 000000000..d358f996f
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitPartitionsProcedure.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieReplaceCommitMetadata, HoodieWriteStat}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util
+import java.util.List
+import java.util.function.Supplier
+import scala.collection.JavaConverters._
+
+/**
+ * Procedure aggregating the write statistics of a single commit per partition:
+ * files added/updated, records inserted/updated, bytes written and errors.
+ */
+class ShowCommitPartitionsProcedure() extends BaseProcedure with ProcedureBuilder {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10),
+    ProcedureParameter.required(2, "instant_time", DataTypes.StringType, None)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("action", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("partition_path", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("total_files_added", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("total_files_updated", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("total_records_inserted", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("total_records_updated", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("total_bytes_written", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("total_errors", DataTypes.LongType, nullable = true, Metadata.empty)
+  ))
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val table = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
+    val limit = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Int]
+    val instantTime = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String]
+
+    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, new TableIdentifier(table))
+    val basePath = hoodieCatalogTable.tableLocation
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+    val activeTimeline = metaClient.getActiveTimeline
+    val timeline = activeTimeline.getCommitsTimeline.filterCompletedInstants
+    val hoodieInstantOption = getCommitForInstant(timeline, instantTime)
+    val commitMetadataOptional = getHoodieCommitMetadata(timeline, hoodieInstantOption)
+
+    if (commitMetadataOptional.isEmpty) {
+      throw new HoodieException(s"Commit $instantTime not found in Commits $timeline.")
+    }
+
+    val meta = commitMetadataOptional.get
+    val rows = new util.ArrayList[Row]
+    // Explicit asScala (JavaConverters) instead of the deprecated JavaConversions implicits.
+    for (entry <- meta.getPartitionToWriteStats.entrySet.asScala) {
+      val action: String = hoodieInstantOption.get.getAction
+      val path: String = entry.getKey
+      val stats: List[HoodieWriteStat] = entry.getValue
+      var totalFilesAdded: Long = 0
+      var totalFilesUpdated: Long = 0
+      var totalRecordsUpdated: Long = 0
+      var totalRecordsInserted: Long = 0
+      var totalBytesWritten: Long = 0
+      var totalWriteErrors: Long = 0
+      for (stat <- stats.asScala) {
+        // NULL_COMMIT as previous commit marks a brand-new file in this partition.
+        if (stat.getPrevCommit == HoodieWriteStat.NULL_COMMIT) {
+          totalFilesAdded += 1
+        } else {
+          totalFilesUpdated += 1
+          totalRecordsUpdated += stat.getNumUpdateWrites
+        }
+        totalRecordsInserted += stat.getNumInserts
+        totalBytesWritten += stat.getTotalWriteBytes
+        totalWriteErrors += stat.getTotalWriteErrors
+      }
+      rows.add(Row(action, path, totalFilesAdded, totalFilesUpdated, totalRecordsInserted, totalRecordsUpdated,
+        totalBytesWritten, totalWriteErrors))
+    }
+    rows.asScala.take(limit).toList
+  }
+
+  override def build: Procedure = new ShowCommitPartitionsProcedure()
+
+  /** Finds the completed instant matching `instantTime` among commit/replacecommit/deltacommit. */
+  private def getCommitForInstant(timeline: HoodieTimeline, instantTime: String): Option[HoodieInstant] = {
+    val instants: util.List[HoodieInstant] = util.Arrays.asList(
+      new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime),
+      new HoodieInstant(false, HoodieTimeline.REPLACE_COMMIT_ACTION, instantTime),
+      new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, instantTime))
+
+    instants.asScala.find((i: HoodieInstant) => timeline.containsInstant(i))
+  }
+
+  /** Deserializes the commit metadata; replacecommits use their dedicated metadata class. */
+  private def getHoodieCommitMetadata(timeline: HoodieTimeline, hoodieInstant: Option[HoodieInstant]): Option[HoodieCommitMetadata] = {
+    if (hoodieInstant.isDefined) {
+      if (hoodieInstant.get.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) {
+        Option(HoodieReplaceCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant.get).get,
+          classOf[HoodieReplaceCommitMetadata]))
+      } else {
+        Option(HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant.get).get,
+          classOf[HoodieCommitMetadata]))
+      }
+    } else {
+      Option.empty
+    }
+  }
+}
+
+object ShowCommitPartitionsProcedure {
+  val NAME = "show_commit_partitions"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new ShowCommitPartitionsProcedure()
+  }
+}
+
+
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitWriteStatsProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitWriteStatsProcedure.scala
new file mode 100644
index 000000000..594d18769
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitWriteStatsProcedure.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieReplaceCommitMetadata}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util
+import java.util.function.Supplier
+import scala.collection.JavaConverters._
+
+/**
+ * Procedure summarizing a single commit's write volume: total bytes written,
+ * total records written, and the derived average record size.
+ */
+class ShowCommitWriteStatsProcedure() extends BaseProcedure with ProcedureBuilder {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10),
+    ProcedureParameter.required(2, "instant_time", DataTypes.StringType, None)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("action", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("total_bytes_written", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("total_records_written", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("avg_record_size", DataTypes.LongType, nullable = true, Metadata.empty)
+  ))
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val table = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
+    val limit = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Int]
+    val instantTime = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String]
+
+    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, new TableIdentifier(table))
+    val basePath = hoodieCatalogTable.tableLocation
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+    val activeTimeline = metaClient.getActiveTimeline
+    val timeline = activeTimeline.getCommitsTimeline.filterCompletedInstants
+    val hoodieInstantOption = getCommitForInstant(timeline, instantTime)
+    val commitMetadataOptional = getHoodieCommitMetadata(timeline, hoodieInstantOption)
+
+    if (commitMetadataOptional.isEmpty) {
+      throw new HoodieException(s"Commit $instantTime not found in Commits $timeline.")
+    }
+
+    val meta = commitMetadataOptional.get
+
+    val action: String = hoodieInstantOption.get.getAction
+    val recordsWritten = meta.fetchTotalRecordsWritten
+    val bytesWritten = meta.fetchTotalBytesWritten
+    // Guard against a commit with zero records written: ceil(x / 0.0) is Infinity,
+    // whose toLong is Long.MaxValue — report 0 instead.
+    val avgRecSize = if (recordsWritten > 0) Math.ceil((1.0 * bytesWritten) / recordsWritten).toLong else 0L
+    val rows = new util.ArrayList[Row]
+    rows.add(Row(action, bytesWritten, recordsWritten, avgRecSize))
+
+    rows.asScala.take(limit).toList
+  }
+
+  override def build: Procedure = new ShowCommitWriteStatsProcedure()
+
+  /** Finds the completed instant matching `instantTime` among commit/replacecommit/deltacommit. */
+  private def getCommitForInstant(timeline: HoodieTimeline, instantTime: String): Option[HoodieInstant] = {
+    val instants: util.List[HoodieInstant] = util.Arrays.asList(
+      new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime),
+      new HoodieInstant(false, HoodieTimeline.REPLACE_COMMIT_ACTION, instantTime),
+      new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, instantTime))
+
+    instants.asScala.find((i: HoodieInstant) => timeline.containsInstant(i))
+  }
+
+  /** Deserializes the commit metadata; replacecommits use their dedicated metadata class. */
+  private def getHoodieCommitMetadata(timeline: HoodieTimeline, hoodieInstant: Option[HoodieInstant]): Option[HoodieCommitMetadata] = {
+    if (hoodieInstant.isDefined) {
+      if (hoodieInstant.get.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) {
+        Option(HoodieReplaceCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant.get).get,
+          classOf[HoodieReplaceCommitMetadata]))
+      } else {
+        Option(HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant.get).get,
+          classOf[HoodieCommitMetadata]))
+      }
+    } else {
+      Option.empty
+    }
+  }
+}
+
+object ShowCommitWriteStatsProcedure {
+  val NAME = "show_commit_write_stats"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new ShowCommitWriteStatsProcedure()
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCommitsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCommitsProcedure.scala
new file mode 100644
index 000000000..750b3943a
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCommitsProcedure.scala
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hudi.procedure

import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase

/**
 * End-to-end tests for the commit-related SQL procedures: each test creates a
 * fresh Hudi table under a temp dir, drives commits via `insert into`, then
 * calls the procedure under test and checks the row counts it returns.
 */
class TestCommitsProcedure extends HoodieSparkSqlTestBase {

  test("Test Call show_archived_commits Procedure") {
    withTempDir { tmp =>
      val tableName = generateTableName
      // create table with aggressive archival so inserts roll commits into the archive
      spark.sql(
        s"""
           |create table $tableName (
           |  id int,
           |  name string,
           |  price double,
           |  ts long
           |) using hudi
           | location '${tmp.getCanonicalPath}/$tableName'
           | tblproperties (
           |  primaryKey = 'id',
           |  preCombineField = 'ts',
           |  hoodie.keep.max.commits = 3,
           |  hoodie.keep.min.commits = 2,
           |  hoodie.cleaner.commits.retained = 1
           | )
       """.stripMargin)

      // insert data to table, will generate 3 active commits and 4 archived commits
      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
      spark.sql(s"insert into $tableName select 3, 'a3', 30, 2000")
      spark.sql(s"insert into $tableName select 4, 'a4', 40, 2500")
      spark.sql(s"insert into $tableName select 5, 'a5', 50, 3000")
      spark.sql(s"insert into $tableName select 6, 'a6', 60, 3500")
      spark.sql(s"insert into $tableName select 7, 'a7', 70, 4000")

      // Check required fields
      checkExceptionContain(s"""call show_archived_commits(limit => 10)""")(
        s"Argument: table is required")

      // collect active commits for table
      val commits = spark.sql(s"""call show_commits(table => '$tableName', limit => 10)""").collect()
      assertResult(3){commits.length}

      // collect archived commits for table
      val endTs = commits(0).get(0).toString
      val archivedCommits = spark.sql(s"""call show_archived_commits(table => '$tableName', endTs => '$endTs')""").collect()
      assertResult(4) {
        archivedCommits.length
      }
    }
  }

  test("Test Call show_archived_commits_metadata Procedure") {
    withTempDir { tmp =>
      val tableName = generateTableName
      // create table with aggressive archival so inserts roll commits into the archive
      spark.sql(
        s"""
           |create table $tableName (
           |  id int,
           |  name string,
           |  price double,
           |  ts long
           |) using hudi
           | location '${tmp.getCanonicalPath}/$tableName'
           | tblproperties (
           |  primaryKey = 'id',
           |  preCombineField = 'ts',
           |  hoodie.keep.max.commits = 3,
           |  hoodie.keep.min.commits = 2,
           |  hoodie.cleaner.commits.retained = 1
           | )
       """.stripMargin)

      // insert data to table, will generate 3 active commits and 4 archived commits
      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
      spark.sql(s"insert into $tableName select 3, 'a3', 30, 2000")
      spark.sql(s"insert into $tableName select 4, 'a4', 40, 2500")
      spark.sql(s"insert into $tableName select 5, 'a5', 50, 3000")
      spark.sql(s"insert into $tableName select 6, 'a6', 60, 3500")
      spark.sql(s"insert into $tableName select 7, 'a7', 70, 4000")

      // Check required fields
      checkExceptionContain(s"""call show_archived_commits_metadata(limit => 10)""")(
        s"Argument: table is required")

      // collect active commits for table
      val commits = spark.sql(s"""call show_commits(table => '$tableName', limit => 10)""").collect()
      assertResult(3){commits.length}

      // collect archived commits for table
      val endTs = commits(0).get(0).toString
      val archivedCommits = spark.sql(s"""call show_archived_commits_metadata(table => '$tableName', endTs => '$endTs')""").collect()
      assertResult(4) {
        archivedCommits.length
      }
    }
  }

  test("Test Call show_commit_files Procedure") {
    withTempDir { tmp =>
      val tableName = generateTableName
      // create table
      spark.sql(
        s"""
           |create table $tableName (
           |  id int,
           |  name string,
           |  price double,
           |  ts long
           |) using hudi
           | location '${tmp.getCanonicalPath}/$tableName'
           | tblproperties (
           |  primaryKey = 'id',
           |  preCombineField = 'ts'
           | )
       """.stripMargin)

      // insert data to table
      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")

      // Check required fields
      checkExceptionContain(s"""call show_commit_files(table => '$tableName')""")(
        s"Argument: instant_time is required")

      // collect commits for table
      val commits = spark.sql(s"""call show_commits(table => '$tableName', limit => 10)""").collect()
      assertResult(2){commits.length}

      // collect commit files for table
      val instant_time = commits(0).get(0).toString
      val commitFiles = spark.sql(s"""call show_commit_files(table => '$tableName', instant_time => '$instant_time')""").collect()
      assertResult(1){commitFiles.length}
    }
  }

  test("Test Call show_commit_partitions Procedure") {
    withTempDir { tmp =>
      val tableName = generateTableName
      // create table
      spark.sql(
        s"""
           |create table $tableName (
           |  id int,
           |  name string,
           |  price double,
           |  ts long
           |) using hudi
           | location '${tmp.getCanonicalPath}/$tableName'
           | tblproperties (
           |  primaryKey = 'id',
           |  preCombineField = 'ts'
           | )
       """.stripMargin)

      // insert data to table
      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")

      // Check required fields
      checkExceptionContain(s"""call show_commit_partitions(table => '$tableName')""")(
        s"Argument: instant_time is required")

      // collect commits for table
      val commits = spark.sql(s"""call show_commits(table => '$tableName', limit => 10)""").collect()
      assertResult(2){commits.length}

      // collect commit partitions files for table
      val instant_time = commits(0).get(0).toString
      val commitPartitions = spark.sql(s"""call show_commit_partitions(table => '$tableName', instant_time => '$instant_time')""").collect()
      assertResult(1){commitPartitions.length}
    }
  }

  test("Test Call show_commit_write_stats Procedure") {
    withTempDir { tmp =>
      val tableName = generateTableName
      // create table
      spark.sql(
        s"""
           |create table $tableName (
           |  id int,
           |  name string,
           |  price double,
           |  ts long
           |) using hudi
           | location '${tmp.getCanonicalPath}/$tableName'
           | tblproperties (
           |  primaryKey = 'id',
           |  preCombineField = 'ts'
           | )
       """.stripMargin)

      // insert data to table
      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")

      // Check required fields
      checkExceptionContain(s"""call show_commit_write_stats(table => '$tableName')""")(
        s"Argument: instant_time is required")

      // collect commits for table
      val commits = spark.sql(s"""call show_commits(table => '$tableName', limit => 10)""").collect()
      assertResult(2){commits.length}

      // collect commit write stats for table
      val instant_time = commits(0).get(0).toString
      val commitPartitions = spark.sql(s"""call show_commit_write_stats(table => '$tableName', instant_time => '$instant_time')""").collect()
      assertResult(1){commitPartitions.length}
    }
  }

  test("Test Call commits_compare Procedure") {
    withTempDir { tmp =>
      val tableName1 = generateTableName
      val tableName2 = generateTableName
      // create table1
      spark.sql(
        s"""
           |create table $tableName1 (
           |  id int,
           |  name string,
           |  price double,
           |  ts long
           |) using hudi
           | location '${tmp.getCanonicalPath}/$tableName1'
           | tblproperties (
           |  primaryKey = 'id',
           |  preCombineField = 'ts'
           | )
       """.stripMargin)
      // insert data to table1
      spark.sql(s"insert into $tableName1 select 1, 'a1', 10, 1000")
      spark.sql(s"insert into $tableName1 select 2, 'a2', 20, 1500")

      // create table2
      spark.sql(
        s"""
           |create table $tableName2 (
           |  id int,
           |  name string,
           |  price double,
           |  ts long
           |) using hudi
           | location '${tmp.getCanonicalPath}/$tableName2'
           | tblproperties (
           |  primaryKey = 'id',
           |  preCombineField = 'ts'
           | )
       """.stripMargin)
      // insert data to table2
      spark.sql(s"insert into $tableName2 select 3, 'a3', 30, 2000")
      spark.sql(s"insert into $tableName2 select 4, 'a4', 40, 2500")

      // Check required fields
      checkExceptionContain(s"""call commits_compare(table => '$tableName1')""")(
        s"Argument: path is required")

      // collect commits for table1 (val, not var: never reassigned)
      val commits1 = spark.sql(s"""call show_commits(table => '$tableName1', limit => 10)""").collect()
      assertResult(2){commits1.length}

      // collect commits for table2 (val, not var: never reassigned)
      val commits2 = spark.sql(s"""call show_commits(table => '$tableName2', limit => 10)""").collect()
      assertResult(2){commits2.length}

      // collect commits compare for table1 and table2
      val result = spark.sql(s"""call commits_compare(table => '$tableName1', path => '${tmp.getCanonicalPath}/$tableName2')""").collect()
      assertResult(1){result.length}
    }
  }
}