[HUDI-3509] Add call procedure for HoodieLogFileCommand (#5949)

Co-authored-by: zhanshaoxiong <jiimmyzhan@tencent.com>
jiz
2022-06-24 10:16:54 +08:00
committed by GitHub
parent eeb78f23e6
commit af9f09047d
4 changed files with 369 additions and 0 deletions

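Both procedures are invoked through Spark SQL's CALL syntax, as the new test suite at the end of this change exercises. A minimal usage sketch from a Spark session (the table name and log-file glob are illustrative placeholders, not values from this commit):

// Illustrative only: 'hudi_mor_tbl' and the path patterns are placeholders.
spark.sql(
  "call show_logfile_metadata(table => 'hudi_mor_tbl', " +
  "log_file_path_pattern => '/tmp/hudi_mor_tbl/*/*.log.*')").show(false)
spark.sql(
  "call show_logfile_records(table => 'hudi_mor_tbl', " +
  "log_file_path_pattern => '/tmp/hudi_mor_tbl/*/*.log.*', limit => 10)").show(false)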
HoodieProcedures.scala

@@ -50,6 +50,8 @@ object HoodieProcedures {
mapBuilder.put(ExportInstantsProcedure.NAME, ExportInstantsProcedure.builder)
mapBuilder.put(ShowAllFileSystemViewProcedure.NAME, ShowAllFileSystemViewProcedure.builder)
mapBuilder.put(ShowLatestFileSystemViewProcedure.NAME, ShowLatestFileSystemViewProcedure.builder)
mapBuilder.put(ShowHoodieLogFileMetadataProcedure.NAME, ShowHoodieLogFileMetadataProcedure.builder)
mapBuilder.put(ShowHoodieLogFileRecordsProcedure.NAME, ShowHoodieLogFileRecordsProcedure.builder)
mapBuilder.build
}
}

ShowHoodieLogFileMetadataProcedure.scala

@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command.procedures
import com.fasterxml.jackson.databind.ObjectMapper
import com.google.common.collect.{Lists, Maps}
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieLogFile
import org.apache.hudi.common.table.log.HoodieLogFormat
import org.apache.hudi.common.table.log.block.HoodieLogBlock.{HeaderMetadataType, HoodieLogBlockType}
import org.apache.hudi.common.table.log.block.{HoodieCorruptBlock, HoodieDataBlock}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.parquet.avro.AvroSchemaConverter
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
import java.util.Objects
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.Supplier
import scala.collection.JavaConverters.{asScalaBufferConverter, asScalaIteratorConverter, mapAsScalaMapConverter}
class ShowHoodieLogFileMetadataProcedure extends BaseProcedure with ProcedureBuilder {
override def parameters: Array[ProcedureParameter] = Array[ProcedureParameter](
ProcedureParameter.required(0, "table", DataTypes.StringType, None),
ProcedureParameter.required(1, "log_file_path_pattern", DataTypes.StringType, None),
ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, 10)
)
override def outputType: StructType = StructType(Array[StructField](
StructField("instant_time", DataTypes.StringType, nullable = true, Metadata.empty),
StructField("record_count", DataTypes.IntegerType, nullable = true, Metadata.empty),
StructField("block_type", DataTypes.StringType, nullable = true, Metadata.empty),
StructField("header_metadata", DataTypes.StringType, nullable = true, Metadata.empty),
StructField("footer_metadata", DataTypes.StringType, nullable = true, Metadata.empty)
))
override def call(args: ProcedureArgs): Seq[Row] = {
checkArgs(parameters, args)
val table = getArgValueOrDefault(args, parameters(0))
val logFilePathPattern: String = getArgValueOrDefault(args, parameters(1)).get.asInstanceOf[String]
val limit: Int = getArgValueOrDefault(args, parameters(2)).get.asInstanceOf[Int]
val basePath = getBasePath(table)
val fs = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build.getFs
val logFilePaths = FSUtils.getGlobStatusExcludingMetaFolder(fs, new Path(logFilePathPattern)).iterator().asScala
.map(_.getPath.toString).toList
val commitCountAndMetadata: java.util.Map[String, java.util.List[(HoodieLogBlockType, (java.util.Map[HeaderMetadataType, String], java.util.Map[HeaderMetadataType, String]), Int)]] = Maps.newHashMap()
var numCorruptBlocks = 0
var dummyInstantTimeCount = 0
logFilePaths.foreach {
logFilePath => {
val statuses = fs.listStatus(new Path(logFilePath))
val schema = new AvroSchemaConverter()
.convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(fs, new Path(logFilePath))))
val reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(statuses(0).getPath), schema)
// read the Avro blocks
while (reader.hasNext) {
val block = reader.next()
val recordCount = new AtomicInteger(0)
var instantTime: String = null
if (block.isInstanceOf[HoodieCorruptBlock]) {
try {
instantTime = block.getLogBlockHeader.get(HeaderMetadataType.INSTANT_TIME)
if (null == instantTime) {
throw new java.lang.Exception("Invalid instant time " + instantTime)
}
} catch {
case _: java.lang.Exception =>
numCorruptBlocks = numCorruptBlocks + 1
instantTime = "corrupt_block_" + numCorruptBlocks
}
} else {
instantTime = block.getLogBlockHeader.get(HeaderMetadataType.INSTANT_TIME)
if (null == instantTime) {
dummyInstantTimeCount = dummyInstantTimeCount + 1
instantTime = "dummy_instant_time_" + dummyInstantTimeCount
}
block match {
  case dataBlock: HoodieDataBlock =>
    val recordItr = dataBlock.getRecordIterator
    recordItr.asScala.foreach(_ => recordCount.incrementAndGet())
    recordItr.close()
  case _ => // non-data blocks (e.g. delete or command blocks) carry no records to count
}
}
if (commitCountAndMetadata.containsKey(instantTime)) {
val list = commitCountAndMetadata.get(instantTime)
list.add((block.getBlockType, (block.getLogBlockHeader, block.getLogBlockFooter), recordCount.get()))
} else {
val list = Lists.newArrayList((block.getBlockType, (block.getLogBlockHeader, block.getLogBlockFooter), recordCount.get()))
commitCountAndMetadata.put(instantTime, list)
}
}
reader.close()
}
}
val rows: java.util.List[Row] = Lists.newArrayList()
val objectMapper = new ObjectMapper()
commitCountAndMetadata.asScala.foreach {
case (instantTime, values) =>
values.asScala.foreach {
tuple3 =>
rows.add(Row(
instantTime,
tuple3._3,
tuple3._1.toString,
objectMapper.writeValueAsString(tuple3._2._1),
objectMapper.writeValueAsString(tuple3._2._2)
))
}
}
rows.stream().limit(limit).toArray().map(r => r.asInstanceOf[Row]).toList
}
override def build: Procedure = new ShowHoodieLogFileMetadataProcedure
}
object ShowHoodieLogFileMetadataProcedure {
val NAME = "show_logfile_metadata"
def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
override def get(): ProcedureBuilder = new ShowHoodieLogFileMetadataProcedure()
}
}

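show_logfile_metadata emits one row per log block, grouped by the instant time taken from the block header; corrupt blocks are reported under a synthetic "corrupt_block_N" instant and blocks without an instant time under "dummy_instant_time_N". The header and footer metadata maps are serialized to JSON strings. A hedged example of pointing it at a single partition's log files (table name and partition path are placeholders):

// Columns returned: instant_time, record_count, block_type, header_metadata, footer_metadata.
spark.sql(
  "call show_logfile_metadata(table => 'hudi_mor_tbl', " +
  "log_file_path_pattern => '/tmp/hudi_mor_tbl/ts=1000/*.log.*', limit => 10)"
).show(false)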
ShowHoodieLogFileRecordsProcedure.scala

@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command.procedures
import com.google.common.collect.Lists
import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.config.HoodieCommonConfig
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieLogFile
import org.apache.hudi.common.table.log.block.HoodieDataBlock
import org.apache.hudi.common.table.log.{HoodieLogFormat, HoodieMergedLogRecordScanner}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.ValidationUtils
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieMemoryConfig}
import org.apache.parquet.avro.AvroSchemaConverter
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
import java.util.Objects
import java.util.function.Supplier
import scala.collection.JavaConverters._
class ShowHoodieLogFileRecordsProcedure extends BaseProcedure with ProcedureBuilder {
override def parameters: Array[ProcedureParameter] = Array[ProcedureParameter](
ProcedureParameter.required(0, "table", DataTypes.StringType, None),
ProcedureParameter.required(1, "log_file_path_pattern", DataTypes.StringType, None),
ProcedureParameter.optional(2, "merge", DataTypes.BooleanType, false),
ProcedureParameter.optional(3, "limit", DataTypes.IntegerType, 10)
)
override def outputType: StructType = StructType(Array[StructField](
StructField("records", DataTypes.StringType, nullable = true, Metadata.empty)
))
override def call(args: ProcedureArgs): Seq[Row] = {
checkArgs(parameters, args)
val table = getArgValueOrDefault(args, parameters(0))
val logFilePathPattern: String = getArgValueOrDefault(args, parameters(1)).get.asInstanceOf[String]
val merge: Boolean = getArgValueOrDefault(args, parameters(2)).get.asInstanceOf[Boolean]
val limit: Int = getArgValueOrDefault(args, parameters(3)).get.asInstanceOf[Int]
val basePath = getBasePath(table)
val client = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
val fs = client.getFs
val logFilePaths = FSUtils.getGlobStatusExcludingMetaFolder(fs, new Path(logFilePathPattern)).iterator().asScala
.map(_.getPath.toString).toList
ValidationUtils.checkArgument(logFilePaths.nonEmpty, "There is no log file")
val converter = new AvroSchemaConverter()
val allRecords: java.util.List[IndexedRecord] = Lists.newArrayList()
if (merge) {
val schema = converter.convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(fs, new Path(logFilePaths.last))))
val scanner = HoodieMergedLogRecordScanner.newBuilder
.withFileSystem(fs)
.withBasePath(basePath)
.withLogFilePaths(logFilePaths.asJava)
.withReaderSchema(schema)
.withLatestInstantTime(client.getActiveTimeline.getCommitTimeline.lastInstant.get.getTimestamp)
.withReadBlocksLazily(java.lang.Boolean.parseBoolean(HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE.defaultValue))
.withReverseReader(java.lang.Boolean.parseBoolean(HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue))
.withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.defaultValue)
.withMaxMemorySizeInBytes(HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
.withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue)
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue)
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue)
.build
scanner.asScala.foreach(hoodieRecord => {
val record = hoodieRecord.getData.getInsertValue(schema).get()
if (allRecords.size() < limit) {
allRecords.add(record)
}
})
} else {
logFilePaths.toStream.takeWhile(_ => allRecords.size() < limit).foreach {
logFilePath => {
val schema = converter.convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(fs, new Path(logFilePath))))
val reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(logFilePath), schema)
while (reader.hasNext) {
val block = reader.next()
block match {
  case dataBlock: HoodieDataBlock =>
    val recordItr = dataBlock.getRecordIterator
    recordItr.asScala.foreach(record => {
      if (allRecords.size() < limit) {
        allRecords.add(record)
      }
    })
    recordItr.close()
  case _ => // skip non-data blocks (e.g. delete or command blocks)
}
}
reader.close()
}
}
}
val rows: java.util.List[Row] = Lists.newArrayList()
allRecords.asScala.foreach(record => {
rows.add(Row(record.toString))
})
rows.asScala
}
override def build: Procedure = new ShowHoodieLogFileRecordsProcedure
}
object ShowHoodieLogFileRecordsProcedure {
val NAME = "show_logfile_records"
def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
override def get(): ProcedureBuilder = new ShowHoodieLogFileRecordsProcedure()
}
}

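show_logfile_records has two modes: with merge => true it replays the log files through HoodieMergedLogRecordScanner and returns the merged (latest) value of each record, while the default merge => false dumps the raw Avro records block by block, collecting at most limit records; either way each record is returned as its string form in the single "records" column. A hedged example of the merged path (table name and glob are illustrative):

// merge => true returns merged records via HoodieMergedLogRecordScanner;
// omit it (default false) to dump raw records block by block.
spark.sql(
  "call show_logfile_records(table => 'hudi_mor_tbl', " +
  "log_file_path_pattern => '/tmp/hudi_mor_tbl/*/*.log.*', merge => true, limit => 10)"
).show(false)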
TestHoodieLogFileProcedure.scala

@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.spark.sql.hudi.procedure
import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
class TestHoodieLogFileProcedure extends HoodieSparkSqlTestBase {
test("Test Call show_logfile_metadata Procedure") {
withTempDir { tmp =>
val tableName = generateTableName
val tablePath = s"${tmp.getCanonicalPath}/$tableName"
// create table
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| partitioned by (ts)
| location '$tablePath'
| tblproperties (
| type = 'mor',
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)
// insert data into the table
spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
spark.sql(s"update $tableName set name = 'b1', price = 100 where id = 1")
// Check required fields
checkExceptionContain(s"""call show_logfile_metadata(limit => 10)""")(
s"Argument: table is required")
// collect result for table
val result = spark.sql(
s"""call show_logfile_metadata(table => '$tableName', log_file_path_pattern => '$tablePath/ts=1000/*.log.*')""".stripMargin).collect()
assertResult(1) {
result.length
}
}
}
test("Test Call show_logfile_records Procedure") {
withTempDir { tmp =>
val tableName = generateTableName
val tablePath = s"${tmp.getCanonicalPath}/$tableName"
// create table
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| partitioned by (ts)
| location '$tablePath'
| tblproperties (
| type = 'mor',
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)
// insert data into the table
spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
spark.sql(s"update $tableName set name = 'b1' where id = 1")
spark.sql(s"update $tableName set name = 'b2' where id = 2")
// Check required fields
checkExceptionContain(s"""call show_logfile_records(limit => 10)""")(
s"Argument: table is required")
// collect result for table
val result = spark.sql(
s"""call show_logfile_records(table => '$tableName', log_file_path_pattern => '$tablePath/*/*.log.*', limit => 1)""".stripMargin).collect()
assertResult(1) {
result.length
}
}
}
}