[HUDI-3511] Add call procedure for MetadataCommand (#6018)

superche
2022-07-03 21:44:56 +08:00
committed by GitHub
parent c0e1587966
commit e0954040a9
9 changed files with 890 additions and 0 deletions
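
In plain terms, this change registers seven new metadata-table call procedures: metadata_create, metadata_delete, metadata_init, show_metadata_stats, list_metadata_partitions, list_metadata_files, and validate_metadata_files. A minimal usage sketch in Scala (the table name hudi_tbl is hypothetical; a Spark session with Hudi's SQL extensions enabled is assumed):

// create the metadata table, then list the partitions it tracks
spark.sql("call metadata_create(table => 'hudi_tbl')").show()
spark.sql("call list_metadata_partitions(table => 'hudi_tbl')").show()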


@@ -66,6 +66,13 @@ object HoodieProcedures {
mapBuilder.put(ShowBootstrapPartitionsProcedure.NAME, ShowBootstrapPartitionsProcedure.builder)
mapBuilder.put(UpgradeTableProcedure.NAME, UpgradeTableProcedure.builder)
mapBuilder.put(DowngradeTableProcedure.NAME, DowngradeTableProcedure.builder)
mapBuilder.put(ListMetadataFilesProcedure.NAME, ListMetadataFilesProcedure.builder)
mapBuilder.put(ListMetadataPartitionsProcedure.NAME, ListMetadataPartitionsProcedure.builder)
mapBuilder.put(MetadataCreateProcedure.NAME, MetadataCreateProcedure.builder)
mapBuilder.put(MetadataDeleteProcedure.NAME, MetadataDeleteProcedure.builder)
mapBuilder.put(MetadataInitProcedure.NAME, MetadataInitProcedure.builder)
mapBuilder.put(ShowMetadataStatsProcedure.NAME, ShowMetadataStatsProcedure.builder)
mapBuilder.put(ValidateMetadataFilesProcedure.NAME, ValidateMetadataFilesProcedure.builder)
mapBuilder.build
}
}


@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command.procedures
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.engine.HoodieLocalEngineContext
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.HoodieTimer
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.metadata.HoodieBackedTableMetadata
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
import java.util
import java.util.function.Supplier
class ListMetadataFilesProcedure() extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
ProcedureParameter.required(0, "table", DataTypes.StringType, None),
ProcedureParameter.optional(1, "partition", DataTypes.StringType, None)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
StructField("file_path", DataTypes.StringType, nullable = true, Metadata.empty)
))
def parameters: Array[ProcedureParameter] = PARAMETERS
def outputType: StructType = OUTPUT_TYPE
override def call(args: ProcedureArgs): Seq[Row] = {
super.checkArgs(PARAMETERS, args)
val table = getArgValueOrDefault(args, PARAMETERS(0))
val partition = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
val basePath = getBasePath(table)
val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
val config = HoodieMetadataConfig.newBuilder.enable(true).build
val metaReader = new HoodieBackedTableMetadata(new HoodieLocalEngineContext(metaClient.getHadoopConf),
config, basePath, "/tmp")
if (!metaReader.enabled) {
throw new HoodieException("Metadata Table not enabled/initialized.")
}
val timer = new HoodieTimer().startTimer
val statuses = metaReader.getAllFilesInPartition(new Path(basePath, partition))
logDebug("Took " + timer.endTimer + " ms")
val rows = new util.ArrayList[Row]
statuses.toStream.sortBy(p => p.getPath.getName).foreach((f: FileStatus) => {
rows.add(Row(f.getPath.getName))
})
rows.stream().toArray().map(r => r.asInstanceOf[Row]).toList
}
override def build: Procedure = new ListMetadataFilesProcedure()
}
object ListMetadataFilesProcedure {
val NAME = "list_metadata_files"
def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
override def get() = new ListMetadataFilesProcedure()
}
}
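
A hedged usage sketch for list_metadata_files (table and partition values are hypothetical). Note that although partition is declared optional, the handler dereferences it with .get, so callers should always pass it:

// list the files recorded in the metadata table for one partition
spark.sql("call list_metadata_files(table => 'hudi_tbl', partition => 'ts=1000')").show()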


@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command.procedures
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.util.HoodieTimer
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.metadata.HoodieBackedTableMetadata
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
import java.util
import java.util.Collections
import java.util.function.Supplier
import scala.collection.JavaConverters.asScalaIteratorConverter
class ListMetadataPartitionsProcedure() extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
ProcedureParameter.required(0, "table", DataTypes.StringType, None)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
StructField("partition", DataTypes.StringType, nullable = true, Metadata.empty)
))
def parameters: Array[ProcedureParameter] = PARAMETERS
def outputType: StructType = OUTPUT_TYPE
override def call(args: ProcedureArgs): Seq[Row] = {
super.checkArgs(PARAMETERS, args)
val table = getArgValueOrDefault(args, PARAMETERS(0))
val basePath = getBasePath(table)
val config = HoodieMetadataConfig.newBuilder.enable(true).build
val metadata = new HoodieBackedTableMetadata(new HoodieSparkEngineContext(jsc),
config, basePath, "/tmp")
if (!metadata.enabled) {
throw new HoodieException("Metadata Table not enabled/initialized.")
}
val timer = new HoodieTimer().startTimer
val partitions = metadata.getAllPartitionPaths
Collections.sort(partitions)
logDebug("Took " + timer.endTimer + " ms")
val rows = new util.ArrayList[Row]
partitions.stream.iterator().asScala.foreach((p: String) => {
rows.add(Row(p))
})
rows.stream().toArray().map(r => r.asInstanceOf[Row]).toList
}
override def build: Procedure = new ListMetadataPartitionsProcedure()
}
object ListMetadataPartitionsProcedure {
val NAME = "list_metadata_partitions"
def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
override def get() = new ListMetadataPartitionsProcedure()
}
}
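
Usage sketch (hypothetical table name); the call returns one row per partition path tracked in the metadata table, in sorted order:

spark.sql("call list_metadata_partitions(table => 'hudi_tbl')").show()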


@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command.procedures
import org.apache.hadoop.fs.Path
import org.apache.hudi.SparkAdapterSupport
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.HoodieTimer
import org.apache.hudi.metadata.{HoodieTableMetadata, SparkHoodieBackedTableMetadataWriter}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import java.io.FileNotFoundException
import java.util.function.Supplier
class MetadataCreateProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport {
private val PARAMETERS = Array[ProcedureParameter](
ProcedureParameter.required(0, "table", DataTypes.StringType, None)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
StructField("result", DataTypes.StringType, nullable = true, Metadata.empty)
))
def parameters: Array[ProcedureParameter] = PARAMETERS
def outputType: StructType = OUTPUT_TYPE
override def call(args: ProcedureArgs): Seq[Row] = {
super.checkArgs(PARAMETERS, args)
val tableName = getArgValueOrDefault(args, PARAMETERS(0))
val basePath = getBasePath(tableName)
val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
val metadataPath = new Path(HoodieTableMetadata.getMetadataTableBasePath(basePath))
try {
val statuses = metaClient.getFs.listStatus(metadataPath)
if (statuses.nonEmpty) {
throw new RuntimeException("Metadata directory (" + metadataPath.toString + ") not empty.")
}
} catch {
case _: FileNotFoundException =>
// Metadata directory does not exist yet
metaClient.getFs.mkdirs(metadataPath)
}
val timer = new HoodieTimer().startTimer
val writeConfig = getWriteConfig(basePath)
SparkHoodieBackedTableMetadataWriter.create(metaClient.getHadoopConf, writeConfig, new HoodieSparkEngineContext(jsc))
Seq(Row("Created Metadata Table in " + metadataPath + " (duration=" + timer.endTimer / 1000.0 + "secs)"))
}
override def build = new MetadataCreateProcedure()
}
object MetadataCreateProcedure {
val NAME = "metadata_create"
var metadataBaseDirectory: Option[String] = None
def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
override def get() = new MetadataCreateProcedure()
}
}
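
Usage sketch (hypothetical table name). Since the procedure fails when the metadata directory already exists and is non-empty, recreating the metadata table means deleting it first:

spark.sql("call metadata_delete(table => 'hudi_tbl')")
spark.sql("call metadata_create(table => 'hudi_tbl')").show()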


@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command.procedures
import org.apache.hadoop.fs.Path
import org.apache.hudi.SparkAdapterSupport
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.metadata.HoodieTableMetadata
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import java.io.FileNotFoundException
import java.util.function.Supplier
class MetadataDeleteProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport {
private val PARAMETERS = Array[ProcedureParameter](
ProcedureParameter.required(0, "table", DataTypes.StringType, None)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
StructField("result", DataTypes.StringType, nullable = true, Metadata.empty)
))
def parameters: Array[ProcedureParameter] = PARAMETERS
def outputType: StructType = OUTPUT_TYPE
override def call(args: ProcedureArgs): Seq[Row] = {
super.checkArgs(PARAMETERS, args)
val tableName = getArgValueOrDefault(args, PARAMETERS(0))
val basePath = getBasePath(tableName)
val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
val metadataPath = new Path(HoodieTableMetadata.getMetadataTableBasePath(basePath))
try {
val statuses = metaClient.getFs.listStatus(metadataPath)
if (statuses.nonEmpty) metaClient.getFs.delete(metadataPath, true)
} catch {
case _: FileNotFoundException =>
// Metadata directory does not exist
}
Seq(Row("Removed Metadata Table from " + metadataPath))
}
override def build = new MetadataDeleteProcedure()
}
object MetadataDeleteProcedure {
val NAME = "metadata_delete"
var metadataBaseDirectory: Option[String] = None
def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
override def get() = new MetadataDeleteProcedure()
}
}
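
Usage sketch (hypothetical table name); the delete is effectively idempotent, as a missing metadata directory is silently ignored:

spark.sql("call metadata_delete(table => 'hudi_tbl')").show()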


@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command.procedures
import org.apache.hadoop.fs.Path
import org.apache.hudi.SparkAdapterSupport
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.HoodieTimer
import org.apache.hudi.metadata.{HoodieTableMetadata, SparkHoodieBackedTableMetadataWriter}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import java.io.FileNotFoundException
import java.util.function.Supplier
class MetadataInitProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport with Logging {
private val PARAMETERS = Array[ProcedureParameter](
ProcedureParameter.required(0, "table", DataTypes.StringType, None),
ProcedureParameter.optional(1, "readOnly", DataTypes.BooleanType, false)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
StructField("result", DataTypes.StringType, nullable = true, Metadata.empty)
))
def parameters: Array[ProcedureParameter] = PARAMETERS
def outputType: StructType = OUTPUT_TYPE
override def call(args: ProcedureArgs): Seq[Row] = {
super.checkArgs(PARAMETERS, args)
val tableName = getArgValueOrDefault(args, PARAMETERS(0))
val readOnly = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Boolean]
val basePath = getBasePath(tableName)
val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
val metadataPath = new Path(HoodieTableMetadata.getMetadataTableBasePath(basePath))
try {
metaClient.getFs.listStatus(metadataPath)
} catch {
case _: FileNotFoundException =>
// Metadata directory does not exist yet
throw new RuntimeException("Metadata directory (" + metadataPath.toString + ") does not exist.")
}
val timer = new HoodieTimer().startTimer
if (!readOnly) {
val writeConfig = getWriteConfig(basePath)
SparkHoodieBackedTableMetadataWriter.create(metaClient.getHadoopConf, writeConfig, new HoodieSparkEngineContext(jsc))
}
val action = if (readOnly) "Opened" else "Initialized"
Seq(Row(action + " Metadata Table in " + metadataPath + " (duration=" + timer.endTimer / 1000.0 + " secs)"))
}
override def build = new MetadataInitProcedure()
}
object MetadataInitProcedure {
val NAME = "metadata_init"
var metadataBaseDirectory: Option[String] = None
def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
override def get() = new MetadataInitProcedure()
}
}
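
Usage sketch (hypothetical table name). With readOnly => true the procedure only checks that the metadata directory exists; without it, the metadata writer is (re)initialized:

spark.sql("call metadata_init(table => 'hudi_tbl', readOnly => true)").show()
spark.sql("call metadata_init(table => 'hudi_tbl')").show()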


@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command.procedures
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.engine.HoodieLocalEngineContext
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.metadata.HoodieBackedTableMetadata
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
import java.util
import java.util.function.Supplier
import scala.collection.JavaConversions._
class ShowMetadataStatsProcedure() extends BaseProcedure with ProcedureBuilder {
private val PARAMETERS = Array[ProcedureParameter](
ProcedureParameter.required(0, "table", DataTypes.StringType, None)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
StructField("stat_key", DataTypes.StringType, nullable = true, Metadata.empty),
StructField("stat_value", DataTypes.StringType, nullable = true, Metadata.empty)
))
def parameters: Array[ProcedureParameter] = PARAMETERS
def outputType: StructType = OUTPUT_TYPE
override def call(args: ProcedureArgs): Seq[Row] = {
super.checkArgs(PARAMETERS, args)
val table = getArgValueOrDefault(args, PARAMETERS(0))
val basePath = getBasePath(table)
val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
val config = HoodieMetadataConfig.newBuilder.enable(true).build
val metadata = new HoodieBackedTableMetadata(new HoodieLocalEngineContext(metaClient.getHadoopConf),
config, basePath, "/tmp")
val stats = metadata.stats
val rows = new util.ArrayList[Row]
for (entry <- stats.entrySet) {
rows.add(Row(entry.getKey, entry.getValue))
}
rows.stream().toArray().map(r => r.asInstanceOf[Row]).toList
}
override def build: Procedure = new ShowMetadataStatsProcedure()
}
object ShowMetadataStatsProcedure {
val NAME = "show_metadata_stats"
def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
override def get() = new ShowMetadataStatsProcedure()
}
}
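
Usage sketch (hypothetical table name); the accompanying test enables hoodie.metadata.metrics.enable on the table before calling this procedure:

spark.sql("call show_metadata_stats(table => 'hudi_tbl')").show()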


@@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command.procedures
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.engine.HoodieLocalEngineContext
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.HoodieTimer
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.metadata.HoodieBackedTableMetadata
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
import java.util
import java.util.Collections
import java.util.function.Supplier
import scala.collection.JavaConversions._
import scala.collection.JavaConverters.asScalaIteratorConverter
class ValidateMetadataFilesProcedure() extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
ProcedureParameter.required(0, "table", DataTypes.StringType, None),
ProcedureParameter.optional(1, "verbose", DataTypes.BooleanType, false)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
StructField("partition", DataTypes.StringType, nullable = true, Metadata.empty),
StructField("file_name", DataTypes.StringType, nullable = true, Metadata.empty),
StructField("is_present_in_fs", DataTypes.BooleanType, nullable = true, Metadata.empty),
StructField("is_resent_in_metadata", DataTypes.BooleanType, nullable = true, Metadata.empty),
StructField("fs_size", DataTypes.LongType, nullable = true, Metadata.empty),
StructField("metadata_size", DataTypes.LongType, nullable = true, Metadata.empty)
))
def parameters: Array[ProcedureParameter] = PARAMETERS
def outputType: StructType = OUTPUT_TYPE
override def call(args: ProcedureArgs): Seq[Row] = {
super.checkArgs(PARAMETERS, args)
val table = getArgValueOrDefault(args, PARAMETERS(0))
val verbose = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Boolean]
val basePath = getBasePath(table)
val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
val config = HoodieMetadataConfig.newBuilder.enable(true).build
val metadataReader = new HoodieBackedTableMetadata(new HoodieLocalEngineContext(metaClient.getHadoopConf),
config, basePath, "/tmp")
if (!metadataReader.enabled) {
throw new HoodieException("Metadata Table not enabled/initialized.")
}
val fsConfig = HoodieMetadataConfig.newBuilder.enable(false).build
val fsMetaReader = new HoodieBackedTableMetadata(new HoodieLocalEngineContext(metaClient.getHadoopConf),
fsConfig, basePath, "/tmp")
val timer = new HoodieTimer().startTimer
val metadataPartitions = metadataReader.getAllPartitionPaths
logDebug("Listing partitions Took " + timer.endTimer + " ms")
val fsPartitions = fsMetaReader.getAllPartitionPaths
Collections.sort(fsPartitions)
Collections.sort(metadataPartitions)
val allPartitions = new util.HashSet[String]
allPartitions.addAll(fsPartitions)
allPartitions.addAll(metadataPartitions)
if (!fsPartitions.equals(metadataPartitions)) {
logError("FS partition listing is not matching with metadata partition listing!")
logError("All FS partitions: " + util.Arrays.toString(fsPartitions.toArray))
logError("All Metadata partitions: " + util.Arrays.toString(metadataPartitions.toArray))
}
val rows = new util.ArrayList[Row]
for (partition <- allPartitions) {
val fileStatusMap = new util.HashMap[String, FileStatus]
val metadataFileStatusMap = new util.HashMap[String, FileStatus]
val metadataStatuses = metadataReader.getAllFilesInPartition(new Path(basePath, partition))
util.Arrays.stream(metadataStatuses).iterator().asScala.foreach((entry: FileStatus) => metadataFileStatusMap.put(entry.getPath.getName, entry))
val fsStatuses = fsMetaReader.getAllFilesInPartition(new Path(basePath, partition))
util.Arrays.stream(fsStatuses).iterator().asScala.foreach((entry: FileStatus) => fileStatusMap.put(entry.getPath.getName, entry))
val allFiles = new util.HashSet[String]
allFiles.addAll(fileStatusMap.keySet)
allFiles.addAll(metadataFileStatusMap.keySet)
for (file <- allFiles) {
val fsFileStatus = fileStatusMap.get(file)
val metaFileStatus = metadataFileStatusMap.get(file)
val doesFsFileExists = fsFileStatus != null
val doesMetadataFileExists = metaFileStatus != null
val fsFileLength = if (doesFsFileExists) fsFileStatus.getLen else 0
val metadataFileLength = if (doesMetadataFileExists) metaFileStatus.getLen else 0
if (verbose) { // if verbose, emit a row for every file
rows.add(Row(partition, file, doesFsFileExists, doesMetadataFileExists, fsFileLength, metadataFileLength))
} else if ((doesFsFileExists != doesMetadataFileExists) || (fsFileLength != metadataFileLength)) { // otherwise, emit only mismatched files
rows.add(Row(partition, file, doesFsFileExists, doesMetadataFileExists, fsFileLength, metadataFileLength))
}
}
if (metadataStatuses.length != fsStatuses.length) {
logError("FS and metadata file counts do not match for " + partition + ". FS files count " + fsStatuses.length + ", metadata files count " + metadataStatuses.length)
}
for (entry <- fileStatusMap.entrySet) {
if (!metadataFileStatusMap.containsKey(entry.getKey)) {
logError("FS file not found in metadata: " + entry.getKey)
} else if (entry.getValue.getLen != metadataFileStatusMap.get(entry.getKey).getLen) {
logError("FS file size mismatch for " + entry.getKey + ". FS size " + entry.getValue.getLen + ", metadata size " + metadataFileStatusMap.get(entry.getKey).getLen)
}
}
}
for (entry <- metadataFileStatusMap.entrySet) {
if (!fileStatusMap.containsKey(entry.getKey)) {
logError("Metadata file not found in FS: " + entry.getKey)
} else if (entry.getValue.getLen != fileStatusMap.get(entry.getKey).getLen) {
logError("Metadata file size mismatch for " + entry.getKey + ". Metadata size " + entry.getValue.getLen + ", FS size " + fileStatusMap.get(entry.getKey).getLen)
}
}
}
rows.stream().toArray().map(r => r.asInstanceOf[Row]).toList
}
override def build: Procedure = new ValidateMetadataFilesProcedure()
}
object ValidateMetadataFilesProcedure {
val NAME = "validate_metadata_files"
def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
override def get() = new ValidateMetadataFilesProcedure()
}
}
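
Usage sketch (hypothetical table name). Without verbose, only mismatched files are returned, so an empty result means the FS and metadata listings agree; verbose => true returns a row for every file:

spark.sql("call validate_metadata_files(table => 'hudi_tbl')").show()
spark.sql("call validate_metadata_files(table => 'hudi_tbl', verbose => true)").show()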


@@ -0,0 +1,262 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.procedure
import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
class TestMetadataProcedure extends HoodieSparkSqlTestBase {
test("Test Call metadata_delete Procedure") {
withTempDir { tmp =>
val tableName = generateTableName
// create table
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)
// insert data into the table
spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
// delete the metadata
val deleteResult = spark.sql(s"""call metadata_delete(table => '$tableName')""").collect()
assertResult(1) {
deleteResult.length
}
}
}
test("Test Call metadata_create Procedure") {
withTempDir { tmp =>
val tableName = generateTableName
// create table
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)
// insert data into the table
spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
// first, delete the metadata table
val deleteResult = spark.sql(s"""call metadata_delete(table => '$tableName')""").collect()
assertResult(1) {
deleteResult.length
}
// then recreate the metadata table
val createResult = spark.sql(s"""call metadata_create(table => '$tableName')""").collect()
assertResult(1) {
createResult.length
}
}
}
test("Test Call metadata_init Procedure") {
withTempDir { tmp =>
val tableName = generateTableName
// create table
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)
// insert data into the table
spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
// read-only mode: open the metadata table without initializing it
val readResult = spark.sql(s"""call metadata_init(table => '$tableName', readOnly => true)""").collect()
assertResult(1) {
readResult.length
}
// initialize metadata
val initResult = spark.sql(s"""call metadata_init(table => '$tableName')""").collect()
assertResult(1) {
initResult.length
}
}
}
test("Test Call show_metadata_stats Procedure") {
withTempDir { tmp =>
val tableName = generateTableName
// create table
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts',
| hoodie.metadata.metrics.enable = 'true'
| )
""".stripMargin)
// insert data into the table
spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
// collect metadata stats for table
val metadataStats = spark.sql(s"""call show_metadata_stats(table => '$tableName')""").collect()
assertResult(0) {
metadataStats.length
}
}
}
test("Test Call list_metadata_partitions Procedure") {
withTempDir { tmp =>
val tableName = generateTableName
// create table
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| partitioned by (ts)
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)
// insert data into the table
spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
// collect metadata partitions for table
val partitions = spark.sql(s"""call list_metadata_partitions(table => '$tableName')""").collect()
assertResult(2) {
partitions.length
}
}
}
test("Test Call list_metadata_files Procedure") {
withTempDir { tmp =>
val tableName = generateTableName
// create table
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| partitioned by (ts)
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)
// insert data into the table
spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
// collect metadata partitions for table
val partitions = spark.sql(s"""call list_metadata_partitions(table => '$tableName')""").collect()
assertResult(2) {
partitions.length
}
// collect metadata files for a partition of a table
val partition = partitions(0).get(0).toString
val filesResult = spark.sql(s"""call list_metadata_files(table => '$tableName', partition => '$partition')""").collect()
assertResult(1) {
filesResult.length
}
}
}
test("Test Call validate_metadata_files Procedure") {
withTempDir { tmp =>
val tableName = generateTableName
// create table
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| partitioned by (ts)
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)
// insert data into the table
spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
// validate metadata files; without verbose, only mismatches are returned
val validateFilesResult = spark.sql(s"""call validate_metadata_files(table => '$tableName')""").collect()
assertResult(0) {
validateFilesResult.length
}
// validate metadata files with verbose = true, which returns every file
val validateFilesVerboseResult = spark.sql(s"""call validate_metadata_files(table => '$tableName', verbose => true)""").collect()
assertResult(2) {
validateFilesVerboseResult.length
}
}
}
}