[HUDI-3512] Add call procedure for StatsCommand (#5955)
Co-authored-by: zhanshaoxiong <shaoxiong0001@gmail.com>
@@ -52,6 +52,8 @@ object HoodieProcedures {
    mapBuilder.put(ShowLatestFileSystemViewProcedure.NAME, ShowLatestFileSystemViewProcedure.builder)
    mapBuilder.put(ShowHoodieLogFileMetadataProcedure.NAME, ShowHoodieLogFileMetadataProcedure.builder)
    mapBuilder.put(ShowHoodieLogFileRecordsProcedure.NAME, ShowHoodieLogFileRecordsProcedure.builder)
    mapBuilder.put(StatsWriteAmplificationProcedure.NAME, StatsWriteAmplificationProcedure.builder)
    mapBuilder.put(StatsFileSizeProcedure.NAME, StatsFileSizeProcedure.builder)
    mapBuilder.build
  }
}
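Once the two builders are registered in this name-to-builder map, the procedures become callable through Spark SQL's CALL syntax. A minimal sketch of an invocation, assuming a SparkSession `spark` with the Hudi session extensions enabled and an existing Hudi table named `hudi_table` (both are assumptions, not part of this patch):

// Sketch only: `spark` and `hudi_table` are assumed, not defined in this commit.
spark.sql("call stats_wa(table => 'hudi_table')").show(false)
spark.sql("call stats_filesizes(table => 'hudi_table', partition_path => '', limit => 10)").show(false)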
@@ -0,0 +1,108 @@ StatsFileSizeProcedure.scala (new file)
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hudi.command.procedures

import com.codahale.metrics.{Histogram, Snapshot, UniformReservoir}
import com.google.common.collect.{Lists, Maps}
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.Row
import org.apache.spark.sql.hudi.command.procedures.StatsFileSizeProcedure.MAX_FILES
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}

import java.util.function.Supplier
import scala.collection.JavaConverters.{asScalaBufferConverter, mapAsScalaMapConverter}

class StatsFileSizeProcedure extends BaseProcedure with ProcedureBuilder {

  override def parameters: Array[ProcedureParameter] = Array[ProcedureParameter](
    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
    ProcedureParameter.optional(1, "partition_path", DataTypes.StringType, ""),
    ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, 10)
  )

  override def outputType: StructType = StructType(Array[StructField](
    StructField("commit_time", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("min", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("10th", DataTypes.DoubleType, nullable = true, Metadata.empty),
    StructField("50th", DataTypes.DoubleType, nullable = true, Metadata.empty),
    StructField("avg", DataTypes.DoubleType, nullable = true, Metadata.empty),
    StructField("95th", DataTypes.DoubleType, nullable = true, Metadata.empty),
    StructField("max", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("num_files", DataTypes.IntegerType, nullable = true, Metadata.empty),
    StructField("stddev", DataTypes.DoubleType, nullable = true, Metadata.empty)
  ))

  override def call(args: ProcedureArgs): Seq[Row] = {
    checkArgs(parameters, args)
    val table = getArgValueOrDefault(args, parameters(0))
    val globRegex = getArgValueOrDefault(args, parameters(1)).get.asInstanceOf[String]
    val limit: Int = getArgValueOrDefault(args, parameters(2)).get.asInstanceOf[Int]
    val basePath = getBasePath(table)
    val fs = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build.getFs
    val globPath = String.format("%s/%s/*", basePath, globRegex)
    val statuses = FSUtils.getGlobStatusExcludingMetaFolder(fs, new Path(globPath))

    // One histogram per commit, plus a global histogram across all commits.
    val globalHistogram = new Histogram(new UniformReservoir(MAX_FILES))
    val commitHistogramMap: java.util.Map[String, Histogram] = Maps.newHashMap()
    statuses.asScala.foreach(
      status => {
        val instantTime = FSUtils.getCommitTime(status.getPath.getName)
        val len = status.getLen
        commitHistogramMap.putIfAbsent(instantTime, new Histogram(new UniformReservoir(MAX_FILES)))
        commitHistogramMap.get(instantTime).update(len)
        globalHistogram.update(len)
      }
    )
    val rows: java.util.List[Row] = Lists.newArrayList()
    commitHistogramMap.asScala.foreach {
      case (instantTime, histogram) =>
        val snapshot = histogram.getSnapshot
        rows.add(printFileSizeHistogram(instantTime, snapshot))
    }
    val snapshot = globalHistogram.getSnapshot
    rows.add(printFileSizeHistogram("ALL", snapshot))
    rows.stream().limit(limit).toArray().map(r => r.asInstanceOf[Row]).toList
  }

  def printFileSizeHistogram(instantTime: String, snapshot: Snapshot): Row = {
    Row(
      instantTime,
      snapshot.getMin,
      snapshot.getValue(0.1),
      snapshot.getMedian,
      snapshot.getMean,
      snapshot.get95thPercentile,
      snapshot.getMax,
      snapshot.size,
      snapshot.getStdDev
    )
  }

  override def build: Procedure = new StatsFileSizeProcedure
}

object StatsFileSizeProcedure {
  val MAX_FILES = 1000000
  val NAME = "stats_filesizes"

  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
    override def get(): ProcedureBuilder = new StatsFileSizeProcedure()
  }
}
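The procedure above builds its percentiles on Dropwizard Metrics' UniformReservoir, which keeps a bounded random sample (capped at MAX_FILES entries here) so snapshot statistics stay cheap no matter how many data files the glob matches. A self-contained sketch of that technique, with made-up file sizes standing in for the real FileStatus.getLen values:

import com.codahale.metrics.{Histogram, UniformReservoir}

// Illustrative sizes only; the procedure feeds in real file lengths.
val histogram = new Histogram(new UniformReservoir(1000000))
Seq(128L, 4096L, 1048576L, 134217728L).foreach(size => histogram.update(size))
val snapshot = histogram.getSnapshot
println(s"min=${snapshot.getMin} median=${snapshot.getMedian} " +
  s"p95=${snapshot.get95thPercentile} max=${snapshot.getMax}")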
@@ -0,0 +1,85 @@ StatsWriteAmplificationProcedure.scala (new file)
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hudi.command.procedures

import com.google.common.collect.Lists
import org.apache.hudi.common.model.HoodieCommitMetadata
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}

import java.text.DecimalFormat
import java.util.function.Supplier
import scala.collection.JavaConverters.asScalaIteratorConverter

class StatsWriteAmplificationProcedure extends BaseProcedure with ProcedureBuilder {
  override def parameters: Array[ProcedureParameter] = Array[ProcedureParameter](
    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
    ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10)
  )

  override def outputType: StructType = StructType(Array[StructField](
    StructField("commit_time", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("total_upserted", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_written", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("write_amplification_factor", DataTypes.StringType, nullable = true, Metadata.empty)
  ))

  override def call(args: ProcedureArgs): Seq[Row] = {
    checkArgs(parameters, args)
    val table = getArgValueOrDefault(args, parameters(0))
    val limit: Int = getArgValueOrDefault(args, parameters(1)).get.asInstanceOf[Int]
    val basePath = getBasePath(table)
    val client = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
    val activeTimeline = client.getActiveTimeline
    val timeline = activeTimeline.getCommitTimeline.filterCompletedInstants()

    val rows: java.util.List[Row] = Lists.newArrayList()
    val df = new DecimalFormat("#.00")
    var totalRecordsUpserted = 0L
    var totalRecordsWritten = 0L
    // One row per completed commit; write amplification is
    // total records written / total records upserted.
    timeline.getInstants.iterator.asScala.foreach(
      instantTime => {
        var waf = "0"
        val commit = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(instantTime).get(), classOf[HoodieCommitMetadata])
        if (commit.fetchTotalUpdateRecordsWritten() > 0) {
          waf = df.format(commit.fetchTotalRecordsWritten().toFloat / commit.fetchTotalUpdateRecordsWritten())
        }
        rows.add(Row(instantTime.getTimestamp, commit.fetchTotalUpdateRecordsWritten, commit.fetchTotalRecordsWritten, waf))
        totalRecordsUpserted = totalRecordsUpserted + commit.fetchTotalUpdateRecordsWritten()
        totalRecordsWritten = totalRecordsWritten + commit.fetchTotalRecordsWritten()
      }
    )
    // Cumulative summary row across the whole timeline.
    var waf = "0"
    if (totalRecordsUpserted > 0) {
      waf = df.format(totalRecordsWritten.toFloat / totalRecordsUpserted)
    }
    rows.add(Row("Total", totalRecordsUpserted, totalRecordsWritten, waf))
    rows.stream().limit(limit).toArray().map(r => r.asInstanceOf[Row]).toList
  }

  override def build: Procedure = new StatsWriteAmplificationProcedure
}

object StatsWriteAmplificationProcedure {
  val NAME = "stats_wa"

  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
    override def get(): ProcedureBuilder = new StatsWriteAmplificationProcedure()
  }
}
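The write-amplification factor this procedure reports is total records written divided by total records upserted, rendered with two decimals. A worked example of that arithmetic, using hypothetical counts for a copy-on-write table where upserting a single record rewrites a 1,000-record base file:

import java.text.DecimalFormat

// Hypothetical counts: 1 upserted record forced a rewrite of 1,000 records.
val totalUpserted = 1L
val totalWritten = 1000L
val df = new DecimalFormat("#.00")
val waf = if (totalUpserted > 0) df.format(totalWritten.toFloat / totalUpserted) else "0"
println(s"write_amplification_factor = $waf") // 1000.00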
@@ -0,0 +1,99 @@ TestStatsProcedure.scala (new file)
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.spark.sql.hudi.procedure

import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase

class TestStatsProcedure extends HoodieSparkSqlTestBase {
  test("Test Call stats_wa Procedure") {
    withTempDir { tmp =>
      val tableName = generateTableName
      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
      // create table
      spark.sql(
        s"""
           |create table $tableName (
           |  id int,
           |  name string,
           |  price double,
           |  ts long
           |) using hudi
           | partitioned by (ts)
           | location '$tablePath'
           | tblproperties (
           |  primaryKey = 'id',
           |  preCombineField = 'ts'
           | )
       """.stripMargin)
      // insert data into the table: two inserts plus one update, i.e. three commits
      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
      spark.sql(s"update $tableName set name = 'b1', price = 100 where id = 1")

      // Check required fields
      checkExceptionContain(s"""call stats_wa(limit => 10)""")(
        s"Argument: table is required")

      // collect result for table
      val result = spark.sql(
        s"""call stats_wa(table => '$tableName')""".stripMargin).collect()
      assertResult(4) {
        result.length
      }
    }
  }

  test("Test Call stats_filesizes Procedure") {
    withTempDir { tmp =>
      val tableName = generateTableName
      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
      // create table
      spark.sql(
        s"""
           |create table $tableName (
           |  id int,
           |  name string,
           |  price double,
           |  ts long
           |) using hudi
           | partitioned by (ts)
           | location '$tablePath'
           | tblproperties (
           |  primaryKey = 'id',
           |  preCombineField = 'ts'
           | )
       """.stripMargin)
      // insert data into the table: two commits, one per partition
      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")

      // Check required fields
      checkExceptionContain(s"""call stats_filesizes(limit => 10)""")(
        s"Argument: table is required")

      // collect result for table
      val result = spark.sql(
        s"""call stats_filesizes(table => '$tableName', partition_path => '/*')""".stripMargin).collect()
      assertResult(3) {
        result.length
      }
    }
  }
}
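For readers checking the assertions above, the expected row counts follow from one output row per completed commit plus one summary row (a breakdown, not actual procedure output):

// stats_wa:        2 inserts + 1 update = 3 completed commits, plus the "Total" row -> 4 rows
// stats_filesizes: 2 insert commits contribute one commit_time group each, plus the "ALL" row -> 3 rows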