1
0

[HUDI-3500] Add call procedure for RepairsCommand (#6053)

This commit is contained in:
superche
2022-07-09 09:29:14 +08:00
committed by GitHub
parent b686c07407
commit 6566fc6625
11 changed files with 1402 additions and 0 deletions

View File

@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi
object DeDupeType extends Enumeration {
type dedupeType = Value
val INSERT_TYPE = Value("insert_type")
val UPDATE_TYPE = Value("update_type")
val UPSERT_TYPE = Value("upsert_type")
}

View File

@@ -0,0 +1,245 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{HoodieBaseFile, HoodieRecord}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.exception.HoodieException
import org.apache.log4j.Logger
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import java.util.stream.Collectors
import scala.collection.JavaConversions._
import scala.collection.mutable.{Buffer, HashMap, HashSet, ListBuffer}
/**
* Spark job to de-duplicate data present in a partition path
*/
class DedupeSparkJob(basePath: String,
duplicatedPartitionPath: String,
repairOutputPath: String,
sqlContext: SQLContext,
fs: FileSystem,
dedupeType: DeDupeType.Value) {
val sparkHelper = new SparkHelper(sqlContext, fs)
val LOG = Logger.getLogger(this.getClass)
/**
*
* @param tblName
* @return
*/
def getDupeKeyDF(tblName: String): DataFrame = {
val dupeSql =
s"""
select `${HoodieRecord.RECORD_KEY_METADATA_FIELD}` as dupe_key,
count(*) as dupe_cnt
from ${tblName}
group by `${HoodieRecord.RECORD_KEY_METADATA_FIELD}`
having dupe_cnt > 1
"""
sqlContext.sql(dupeSql)
}
/**
*
* Check a given partition for duplicates and suggest the deletions that need to be done in each file,
* in order to set things right.
*
* @return
*/
private def planDuplicateFix(): HashMap[String, HashSet[String]] = {
val tmpTableName = s"htbl_${System.currentTimeMillis()}"
val dedupeTblName = s"${tmpTableName}_dupeKeys"
val metadata = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(basePath).build()
val allFiles = fs.listStatus(new org.apache.hadoop.fs.Path(s"$basePath/$duplicatedPartitionPath"))
val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitTimeline.filterCompletedInstants(), allFiles)
val latestFiles: java.util.List[HoodieBaseFile] = fsView.getLatestBaseFiles().collect(Collectors.toList[HoodieBaseFile]())
val filteredStatuses = latestFiles.map(f => f.getPath)
LOG.info(s" List of files under partition: ${} => ${filteredStatuses.mkString(" ")}")
val df = sqlContext.parquetFile(filteredStatuses: _*)
df.registerTempTable(tmpTableName)
val dupeKeyDF = getDupeKeyDF(tmpTableName)
dupeKeyDF.registerTempTable(dedupeTblName)
// Obtain necessary satellite information for duplicate rows
val dupeDataSql =
s"""
SELECT `_hoodie_record_key`, `_hoodie_partition_path`, `_hoodie_file_name`, `_hoodie_commit_time`
FROM $tmpTableName h
JOIN $dedupeTblName d
ON h.`_hoodie_record_key` = d.dupe_key
"""
val dupeMap = sqlContext.sql(dupeDataSql).collectAsList().groupBy(r => r.getString(0))
getDedupePlan(dupeMap)
}
private def getDedupePlan(dupeMap: Map[String, Buffer[Row]]): HashMap[String, HashSet[String]] = {
val fileToDeleteKeyMap = new HashMap[String, HashSet[String]]()
dupeMap.foreach(rt => {
val (key, rows) = rt
dedupeType match {
case DeDupeType.UPDATE_TYPE =>
/*
This corresponds to the case where all duplicates have been updated at least once.
Once updated, duplicates are bound to have same commit time unless forcefully modified.
*/
rows.init.foreach(r => {
val f = r(2).asInstanceOf[String].split("_")(0)
if (!fileToDeleteKeyMap.contains(f)) {
fileToDeleteKeyMap(f) = HashSet[String]()
}
fileToDeleteKeyMap(f).add(key)
})
case DeDupeType.INSERT_TYPE =>
/*
This corresponds to the case where duplicates got created due to INSERT and have never been updated.
*/
var maxCommit = -1L
rows.foreach(r => {
val c = r(3).asInstanceOf[String].toLong
if (c > maxCommit)
maxCommit = c
})
rows.foreach(r => {
val c = r(3).asInstanceOf[String].toLong
if (c != maxCommit) {
val f = r(2).asInstanceOf[String].split("_")(0)
if (!fileToDeleteKeyMap.contains(f)) {
fileToDeleteKeyMap(f) = HashSet[String]()
}
fileToDeleteKeyMap(f).add(key)
}
})
case DeDupeType.UPSERT_TYPE =>
/*
This corresponds to the case where duplicates got created as a result of inserts as well as updates,
i.e few duplicate records have been updated, while others were never updated.
*/
var maxCommit = -1L
rows.foreach(r => {
val c = r(3).asInstanceOf[String].toLong
if (c > maxCommit)
maxCommit = c
})
val rowsWithMaxCommit = new ListBuffer[Row]()
rows.foreach(r => {
val c = r(3).asInstanceOf[String].toLong
if (c != maxCommit) {
val f = r(2).asInstanceOf[String].split("_")(0)
if (!fileToDeleteKeyMap.contains(f)) {
fileToDeleteKeyMap(f) = HashSet[String]()
}
fileToDeleteKeyMap(f).add(key)
} else {
rowsWithMaxCommit += r
}
})
rowsWithMaxCommit.toList.init.foreach(r => {
val f = r(2).asInstanceOf[String].split("_")(0)
if (!fileToDeleteKeyMap.contains(f)) {
fileToDeleteKeyMap(f) = HashSet[String]()
}
fileToDeleteKeyMap(f).add(key)
})
case _ => throw new IllegalArgumentException("Please provide valid type for deduping!")
}
})
LOG.debug(s"fileToDeleteKeyMap size: ${fileToDeleteKeyMap.size}, map: $fileToDeleteKeyMap")
fileToDeleteKeyMap
}
def fixDuplicates(dryRun: Boolean = true) = {
val metadata = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(basePath).build()
val allFiles = fs.listStatus(new Path(s"$basePath/$duplicatedPartitionPath"))
val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitTimeline.filterCompletedInstants(), allFiles)
val latestFiles: java.util.List[HoodieBaseFile] = fsView.getLatestBaseFiles().collect(Collectors.toList[HoodieBaseFile]())
val fileNameToPathMap = latestFiles.map(f => (f.getFileId, new Path(f.getPath))).toMap
val dupeFixPlan = planDuplicateFix()
// 1. Copy all latest files into the temp fix path
fileNameToPathMap.foreach { case (fileName, filePath) =>
val badSuffix = if (dupeFixPlan.contains(fileName)) ".bad" else ""
val dstPath = new Path(s"$repairOutputPath/${filePath.getName}$badSuffix")
LOG.info(s"Copying from $filePath to $dstPath")
FileUtil.copy(fs, filePath, fs, dstPath, false, true, fs.getConf)
}
// 2. Remove duplicates from the bad files
dupeFixPlan.foreach { case (fileName, keysToSkip) =>
val instantTime = FSUtils.getCommitTime(fileNameToPathMap(fileName).getName)
val badFilePath = new Path(s"$repairOutputPath/${fileNameToPathMap(fileName).getName}.bad")
val newFilePath = new Path(s"$repairOutputPath/${fileNameToPathMap(fileName).getName}")
LOG.info(" Skipping and writing new file for : " + fileName)
SparkHelpers.skipKeysAndWriteNewFile(instantTime, fs, badFilePath, newFilePath, dupeFixPlan(fileName))
fs.delete(badFilePath, true)
}
// 3. Check that there are no duplicates anymore.
val df = sqlContext.read.parquet(s"$repairOutputPath/*.parquet")
df.registerTempTable("fixedTbl")
val dupeKeyDF = getDupeKeyDF("fixedTbl")
val dupeCnt = dupeKeyDF.count()
if (dupeCnt != 0) {
dupeKeyDF.show()
throw new HoodieException("Still found some duplicates!!.. Inspect output")
}
// 4. Additionally ensure no record keys are left behind.
val sourceDF = sparkHelper.getDistinctKeyDF(fileNameToPathMap.map(t => t._2.toString).toList)
val fixedDF = sparkHelper.getDistinctKeyDF(fileNameToPathMap.map(t => s"$repairOutputPath/${t._2.getName}").toList)
val missedRecordKeysDF = sourceDF.except(fixedDF)
val missedCnt = missedRecordKeysDF.count()
if (missedCnt != 0) {
missedRecordKeysDF.show()
throw new HoodieException("Some records in source are not found in fixed files. Inspect output!!")
}
println("No duplicates found & counts are in check!!!! ")
// 5. Prepare to copy the fixed files back.
fileNameToPathMap.foreach { case (_, filePath) =>
val srcPath = new Path(s"$repairOutputPath/${filePath.getName}")
val dstPath = new Path(s"$basePath/$duplicatedPartitionPath/${filePath.getName}")
if (dryRun) {
LOG.info(s"[JUST KIDDING!!!] Copying from $srcPath to $dstPath")
} else {
// for real
LOG.info(s"[FOR REAL!!!] Copying from $srcPath to $dstPath")
FileUtil.copy(fs, srcPath, fs, dstPath, false, true, fs.getConf)
}
}
}
}

View File

@@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi
import org.apache.avro.Schema
import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.avro.HoodieAvroWriteSupport
import org.apache.hudi.client.SparkTaskContextSupplier
import org.apache.hudi.common.bloom.{BloomFilter, BloomFilterFactory}
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
import org.apache.hudi.common.util.BaseFileUtils
import org.apache.hudi.config.{HoodieIndexConfig, HoodieStorageConfig}
import org.apache.hudi.io.storage.{HoodieAvroParquetWriter, HoodieParquetConfig}
import org.apache.parquet.avro.AvroSchemaConverter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.sql.{DataFrame, SQLContext}
import scala.collection.JavaConversions._
import scala.collection.mutable._
object SparkHelpers {
@throws[Exception]
def skipKeysAndWriteNewFile(instantTime: String, fs: FileSystem, sourceFile: Path, destinationFile: Path, keysToSkip: Set[String]) {
val sourceRecords = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).readAvroRecords(fs.getConf, sourceFile)
val schema: Schema = sourceRecords.get(0).getSchema
val filter: BloomFilter = BloomFilterFactory.createBloomFilter(HoodieIndexConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE.defaultValue.toInt, HoodieIndexConfig.BLOOM_FILTER_FPP_VALUE.defaultValue.toDouble,
HoodieIndexConfig.BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.defaultValue.toInt, HoodieIndexConfig.BLOOM_FILTER_TYPE.defaultValue);
val writeSupport: HoodieAvroWriteSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter(fs.getConf).convert(schema), schema, org.apache.hudi.common.util.Option.of(filter))
val parquetConfig: HoodieParquetConfig[HoodieAvroWriteSupport] = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.PARQUET_BLOCK_SIZE.defaultValue.toInt, HoodieStorageConfig.PARQUET_PAGE_SIZE.defaultValue.toInt, HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.defaultValue.toInt, fs.getConf, HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue.toDouble)
// Add current classLoad for config, if not will throw classNotFound of 'HoodieWrapperFileSystem'.
parquetConfig.getHadoopConf().setClassLoader(Thread.currentThread.getContextClassLoader)
val writer = new HoodieAvroParquetWriter[IndexedRecord](destinationFile, parquetConfig, instantTime, new SparkTaskContextSupplier(), true)
for (rec <- sourceRecords) {
val key: String = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
if (!keysToSkip.contains(key)) {
writer.writeAvro(key, rec)
}
}
writer.close
}
}
/**
* Bunch of Spark Shell/Scala stuff useful for debugging
*/
class SparkHelper(sqlContext: SQLContext, fs: FileSystem) {
/**
* Print keys from a file
*
* @param file
*/
def printKeysFromFile(file: String) = {
getRowKeyDF(file).collect().foreach(println(_))
}
/**
*
* @param file
* @return
*/
def getRowKeyDF(file: String): DataFrame = {
sqlContext.read.parquet(file).select(s"`${HoodieRecord.RECORD_KEY_METADATA_FIELD}`")
}
/**
* Does the rowKey actually exist in the file.
*
* @param rowKey
* @param file
* @return
*/
def isFileContainsKey(rowKey: String, file: String): Boolean = {
println(s"Checking $file for key $rowKey")
val ff = getRowKeyDF(file).filter(s"`${HoodieRecord.RECORD_KEY_METADATA_FIELD}` = '$rowKey'")
if (ff.count() > 0) true else false
}
/**
* Number of keys in a given file
*
* @param file
* @param sqlContext
*/
def getKeyCount(file: String, sqlContext: org.apache.spark.sql.SQLContext) = {
val keyCount = getRowKeyDF(file).collect().length
println(keyCount)
keyCount
}
/**
*
* Checks that all the keys in the file, have been added to the bloom filter
* in the footer
*
* @param conf
* @param sqlContext
* @param file
* @return
*/
def fileKeysAgainstBF(conf: Configuration, sqlContext: SQLContext, file: String): Boolean = {
val bf = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).readBloomFilterFromMetadata(conf, new Path(file))
val foundCount = sqlContext.parquetFile(file)
.select(s"`${HoodieRecord.RECORD_KEY_METADATA_FIELD}`")
.collect().count(r => !bf.mightContain(r.getString(0)))
val totalCount = getKeyCount(file, sqlContext)
println(s"totalCount: $totalCount, foundCount: $foundCount")
totalCount == foundCount
}
def getDistinctKeyDF(paths: List[String]): DataFrame = {
sqlContext.read.parquet(paths: _*).select(s"`${HoodieRecord.RECORD_KEY_METADATA_FIELD}`").distinct()
}
}

View File

@@ -75,6 +75,11 @@ object HoodieProcedures {
mapBuilder.put(ValidateMetadataFilesProcedure.NAME, ValidateMetadataFilesProcedure.builder)
mapBuilder.put(ShowFsPathDetailProcedure.NAME, ShowFsPathDetailProcedure.builder)
mapBuilder.put(CopyToTableProcedure.NAME, CopyToTableProcedure.builder)
mapBuilder.put(RepairAddpartitionmetaProcedure.NAME, RepairAddpartitionmetaProcedure.builder)
mapBuilder.put(RepairCorruptedCleanFilesProcedure.NAME, RepairCorruptedCleanFilesProcedure.builder)
mapBuilder.put(RepairDeduplicateProcedure.NAME, RepairDeduplicateProcedure.builder)
mapBuilder.put(RepairMigratePartitionMetaProcedure.NAME, RepairMigratePartitionMetaProcedure.builder)
mapBuilder.put(RepairOverwriteHoodiePropsProcedure.NAME, RepairOverwriteHoodiePropsProcedure.builder)
mapBuilder.build
}
}

View File

@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command.procedures
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodiePartitionMetadata
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
import java.util
import java.util.function.Supplier
import scala.collection.JavaConversions._
class RepairAddpartitionmetaProcedure extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
ProcedureParameter.required(0, "table", DataTypes.StringType, None),
ProcedureParameter.optional(1, "dry_run", DataTypes.BooleanType, true)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
StructField("partition_path", DataTypes.StringType, nullable = true, Metadata.empty),
StructField("metadata_is_present", DataTypes.StringType, nullable = true, Metadata.empty),
StructField("action", DataTypes.StringType, nullable = true, Metadata.empty))
)
def parameters: Array[ProcedureParameter] = PARAMETERS
def outputType: StructType = OUTPUT_TYPE
override def call(args: ProcedureArgs): Seq[Row] = {
super.checkArgs(PARAMETERS, args)
val tableName = getArgValueOrDefault(args, PARAMETERS(0))
val dryRun = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Boolean]
val tablePath = getBasePath(tableName)
val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(tablePath).build
val latestCommit: String = metaClient.getActiveTimeline.getCommitTimeline.lastInstant.get.getTimestamp
val partitionPaths: util.List[String] = FSUtils.getAllPartitionFoldersThreeLevelsDown(metaClient.getFs, tablePath);
val basePath: Path = new Path(tablePath)
val rows = new util.ArrayList[Row](partitionPaths.size)
for (partition <- partitionPaths) {
val partitionPath: Path = FSUtils.getPartitionPath(basePath, partition)
var isPresent = "Yes"
var action = "None"
if (!HoodiePartitionMetadata.hasPartitionMetadata(metaClient.getFs, partitionPath)) {
isPresent = "No"
if (!dryRun) {
val partitionMetadata: HoodiePartitionMetadata = new HoodiePartitionMetadata(metaClient.getFs, latestCommit, basePath, partitionPath, metaClient.getTableConfig.getPartitionMetafileFormat)
partitionMetadata.trySave(0)
action = "Repaired"
}
}
rows.add(Row(partition, isPresent, action))
}
rows.stream().toArray().map(r => r.asInstanceOf[Row]).toList
}
override def build: Procedure = new RepairAddpartitionmetaProcedure()
}
object RepairAddpartitionmetaProcedure {
val NAME = "repair_add_partition_meta"
def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
override def get() = new RepairAddpartitionmetaProcedure()
}
}

View File

@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command.procedures
import org.apache.avro.AvroRuntimeException
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
import org.apache.hudi.common.util.CleanerUtils
import org.apache.hudi.exception.HoodieIOException
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
import java.io.IOException
import java.util.function.Supplier
import scala.collection.JavaConverters.asScalaIteratorConverter
class RepairCorruptedCleanFilesProcedure extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
ProcedureParameter.required(0, "table", DataTypes.StringType, None)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
StructField("result", DataTypes.BooleanType, nullable = true, Metadata.empty))
)
def parameters: Array[ProcedureParameter] = PARAMETERS
def outputType: StructType = OUTPUT_TYPE
override def call(args: ProcedureArgs): Seq[Row] = {
super.checkArgs(PARAMETERS, args)
val tableName = getArgValueOrDefault(args, PARAMETERS(0))
val tablePath = getBasePath(tableName)
val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(tablePath).build
val cleanerTimeline = metaClient.getActiveTimeline.getCleanerTimeline
logInfo("Inspecting pending clean metadata in timeline for corrupted files")
var result = true
cleanerTimeline.filterInflightsAndRequested.getInstants.iterator().asScala.foreach((instant: HoodieInstant) => {
try {
CleanerUtils.getCleanerPlan(metaClient, instant)
} catch {
case e: AvroRuntimeException =>
logWarning("Corruption found. Trying to remove corrupted clean instant file: " + instant)
HoodieActiveTimeline.deleteInstantFile(metaClient.getFs, metaClient.getMetaPath, instant)
case ioe: IOException =>
if (ioe.getMessage.contains("Not an Avro data file")) {
logWarning("Corruption found. Trying to remove corrupted clean instant file: " + instant)
HoodieActiveTimeline.deleteInstantFile(metaClient.getFs, metaClient.getMetaPath, instant)
} else {
result = false
throw new HoodieIOException(ioe.getMessage, ioe)
}
}
})
Seq(Row(result))
}
override def build: Procedure = new RepairCorruptedCleanFilesProcedure()
}
object RepairCorruptedCleanFilesProcedure {
val NAME = "repair_corrupted_clean_files"
def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
override def get() = new RepairCorruptedCleanFilesProcedure()
}
}

View File

@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command.procedures
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.exception.HoodieException
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
import java.util.function.Supplier
import org.apache.spark.sql.hudi.{DeDupeType, DedupeSparkJob}
import scala.util.{Failure, Success, Try}
class RepairDeduplicateProcedure extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
ProcedureParameter.required(0, "table", DataTypes.StringType, None),
ProcedureParameter.required(1, "duplicated_partition_path", DataTypes.StringType, None),
ProcedureParameter.required(2, "repaired_output_path", DataTypes.StringType, None),
ProcedureParameter.optional(3, "dry_run", DataTypes.BooleanType, true),
ProcedureParameter.optional(4, "dedupe_type", DataTypes.StringType, "insert_type")
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
StructField("result", DataTypes.StringType, nullable = true, Metadata.empty))
)
def parameters: Array[ProcedureParameter] = PARAMETERS
def outputType: StructType = OUTPUT_TYPE
override def call(args: ProcedureArgs): Seq[Row] = {
super.checkArgs(PARAMETERS, args)
val tableName = getArgValueOrDefault(args, PARAMETERS(0))
val duplicatedPartitionPath = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
val repairedOutputPath = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String]
val dryRun = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[Boolean]
val dedupeType = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[String]
if (!DeDupeType.values.contains(DeDupeType.withName(dedupeType))) {
throw new IllegalArgumentException("Please provide valid dedupe type!")
}
val basePath = getBasePath(tableName)
Try {
val job = new DedupeSparkJob(basePath, duplicatedPartitionPath, repairedOutputPath, spark.sqlContext,
FSUtils.getFs(basePath, jsc.hadoopConfiguration), DeDupeType.withName(dedupeType))
job.fixDuplicates(dryRun)
} match {
case Success(_) =>
if (dryRun){
Seq(Row(s"Deduplicated files placed in: $repairedOutputPath."))
} else {
Seq(Row(s"Deduplicated files placed in: $duplicatedPartitionPath."))
}
case Failure(e) =>
throw new HoodieException(s"Deduplication failed!", e)
}
}
override def build: Procedure = new RepairDeduplicateProcedure()
}
object RepairDeduplicateProcedure {
val NAME = "repair_deduplicate"
def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
override def get() = new RepairDeduplicateProcedure()
}
}

View File

@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command.procedures
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.engine.HoodieLocalEngineContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodiePartitionMetadata
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.util.Option
import org.apache.hudi.exception.HoodieIOException
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
import java.io.IOException
import java.util
import java.util.Properties
import java.util.function.{Consumer, Supplier}
import scala.collection.JavaConversions._
class RepairMigratePartitionMetaProcedure extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
ProcedureParameter.required(0, "table", DataTypes.StringType, None),
ProcedureParameter.optional(1, "dry_run", DataTypes.BooleanType, true)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
StructField("partition_path", DataTypes.StringType, nullable = true, Metadata.empty),
StructField("text_metafile_present", DataTypes.StringType, nullable = true, Metadata.empty),
StructField("base_metafile_present", DataTypes.StringType, nullable = true, Metadata.empty),
StructField("action", DataTypes.StringType, nullable = true, Metadata.empty))
)
def parameters: Array[ProcedureParameter] = PARAMETERS
def outputType: StructType = OUTPUT_TYPE
override def call(args: ProcedureArgs): Seq[Row] = {
super.checkArgs(PARAMETERS, args)
val tableName = getArgValueOrDefault(args, PARAMETERS(0))
val dryRun = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Boolean]
val tablePath = getBasePath(tableName)
val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(tablePath).build
val engineContext: HoodieLocalEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf)
val partitionPaths: util.List[String] = FSUtils.getAllPartitionPaths(engineContext, tablePath, false, false)
val basePath: Path = new Path(tablePath)
val rows = new util.ArrayList[Row](partitionPaths.size)
for (partitionPath <- partitionPaths) {
val partition: Path = FSUtils.getPartitionPath(tablePath, partitionPath)
val textFormatFile: Option[Path] = HoodiePartitionMetadata.textFormatMetaPathIfExists(metaClient.getFs, partition)
val baseFormatFile: Option[Path] = HoodiePartitionMetadata.baseFormatMetaPathIfExists(metaClient.getFs, partition)
val latestCommit: String = metaClient.getActiveTimeline.getCommitTimeline.lastInstant.get.getTimestamp
var action = if (textFormatFile.isPresent) "MIGRATE" else "NONE"
if (!dryRun) {
if (!baseFormatFile.isPresent) {
val partitionMetadata: HoodiePartitionMetadata = new HoodiePartitionMetadata(metaClient.getFs, latestCommit,
basePath, partition, Option.of(metaClient.getTableConfig.getBaseFileFormat))
partitionMetadata.trySave(0)
}
// delete it, in case we failed midway last time.
textFormatFile.ifPresent(
new Consumer[Path] {
override def accept(p: Path): Unit = {
try metaClient.getFs.delete(p, false)
catch {
case e: IOException =>
throw new HoodieIOException(e.getMessage, e)
}
}
})
action = "MIGRATED"
}
rows.add(Row(partitionPath, String.valueOf(textFormatFile.isPresent),
String.valueOf(baseFormatFile.isPresent), action))
}
val props: Properties = new Properties
props.setProperty(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key, "true")
HoodieTableConfig.update(metaClient.getFs, new Path(metaClient.getMetaPath), props)
rows.stream().toArray().map(r => r.asInstanceOf[Row]).toList
}
override def build: Procedure = new RepairMigratePartitionMetaProcedure()
}
object RepairMigratePartitionMetaProcedure {
val NAME = "repair_migrate_partition_meta"
def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
override def get() = new RepairMigratePartitionMetaProcedure()
}
}

View File

@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.command.procedures
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
import java.io.FileInputStream
import java.util
import java.util.Properties
import java.util.function.Supplier
import scala.collection.JavaConversions._
import scala.collection.JavaConverters.asScalaIteratorConverter
class RepairOverwriteHoodiePropsProcedure extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
ProcedureParameter.required(0, "table", DataTypes.StringType, None),
ProcedureParameter.required(1, "new_props_file_path", DataTypes.StringType, None)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
StructField("property", DataTypes.StringType, nullable = true, Metadata.empty),
StructField("old_value", DataTypes.StringType, nullable = true, Metadata.empty),
StructField("new_value", DataTypes.StringType, nullable = true, Metadata.empty))
)
def parameters: Array[ProcedureParameter] = PARAMETERS
def outputType: StructType = OUTPUT_TYPE
override def call(args: ProcedureArgs): Seq[Row] = {
super.checkArgs(PARAMETERS, args)
val tableName = getArgValueOrDefault(args, PARAMETERS(0))
val overwriteFilePath = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
val tablePath = getBasePath(tableName)
val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(tablePath).build
var newProps = new Properties
newProps.load(new FileInputStream(overwriteFilePath))
val oldProps = metaClient.getTableConfig.propsMap
val metaPathDir = new Path(tablePath, METAFOLDER_NAME)
HoodieTableConfig.create(metaClient.getFs, metaPathDir, newProps)
// reload new props as checksum would have been added
newProps = HoodieTableMetaClient.reload(metaClient).getTableConfig.getProps
val allPropKeys = new util.TreeSet[String]
allPropKeys.addAll(newProps.keySet.stream.iterator().asScala.map(key => key.toString).toList)
allPropKeys.addAll(oldProps.keySet)
val rows = new util.ArrayList[Row](allPropKeys.size)
for (propKey <- allPropKeys) {
rows.add(Row(propKey, oldProps.getOrDefault(propKey, "null"),
newProps.getOrDefault(propKey, "null").toString))
}
rows.stream().toArray().map(r => r.asInstanceOf[Row]).toList
}
override def build: Procedure = new RepairOverwriteHoodiePropsProcedure()
}
object RepairOverwriteHoodiePropsProcedure {
val NAME = "repair_overwrite_hoodie_props"
def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
override def get() = new RepairOverwriteHoodiePropsProcedure()
}
}

View File

@@ -0,0 +1,21 @@
###
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###
hoodie.table.name=test_table
hoodie.table.type=COPY_ON_WRITE
hoodie.archivelog.folder=archive
hoodie.timeline.layout.version=1

View File

@@ -0,0 +1,507 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.procedure
import org.apache.avro.Schema
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieFileFormat
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, SchemaTestUtil}
import org.apache.hudi.testutils.HoodieSparkWriteableTestTable
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
import java.io.IOException
import java.net.URL
import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters.asScalaIteratorConverter
class TestRepairsProcedure extends HoodieSparkSqlTestBase {
test("Test Call repair_add_partition_meta Procedure") {
withTempDir { tmp =>
val tableName = generateTableName
val tablePath = s"${tmp.getCanonicalPath}/$tableName"
// create table
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| location '$tablePath'
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)
// create commit instant
Files.createFile(Paths.get(tablePath, ".hoodie", "100.commit"))
val metaClient = HoodieTableMetaClient.builder
.setConf(new JavaSparkContext(spark.sparkContext).hadoopConfiguration())
.setBasePath(tablePath)
.build
// create partition path
val partition1 = Paths.get(tablePath, "2016/03/15").toString
val partition2 = Paths.get(tablePath, "2015/03/16").toString
val partition3 = Paths.get(tablePath, "2015/03/17").toString
assertResult(metaClient.getFs.mkdirs(new Path(partition1))) {true}
assertResult(metaClient.getFs.mkdirs(new Path(partition2))) {true}
assertResult(metaClient.getFs.mkdirs(new Path(partition3))) {true}
// default is dry run
val dryResult = spark.sql(s"""call repair_add_partition_meta(table => '$tableName')""").collect()
assertResult(3) {
dryResult.length
}
// real run
val realRunResult = spark.sql(s"""call repair_add_partition_meta(table => '$tableName', dry_run => false)""").collect()
assertResult(3) {
realRunResult.length
}
}
}
test("Test Call repair_overwrite_hoodie_props Procedure") {
withTempDir { tmp =>
val tableName = generateTableName
val tablePath = s"${tmp.getCanonicalPath}/$tableName"
// create table
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| location '$tablePath'
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)
// create commit instant
val newProps: URL = this.getClass.getClassLoader.getResource("table-config.properties")
// overwrite hoodie props
val Result = spark.sql(s"""call repair_overwrite_hoodie_props(table => '$tableName', new_props_file_path => '${newProps.getPath}')""").collect()
assertResult(15) {
Result.length
}
}
}
test("Test Call repair_corrupted_clean_files Procedure") {
withTempDir { tmp =>
val tableName = generateTableName
val tablePath = s"${tmp.getCanonicalPath}/$tableName"
// create table
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| location '$tablePath'
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)
var metaClient = HoodieTableMetaClient.builder
.setConf(new JavaSparkContext(spark.sparkContext).hadoopConfiguration())
.setBasePath(tablePath)
.build
// Create four requested files
for (i <- 100 until 104) {
val timestamp = String.valueOf(i)
// Write corrupted requested Clean File
createEmptyCleanRequestedFile(tablePath, timestamp, metaClient.getHadoopConf)
}
// reload meta client
metaClient = HoodieTableMetaClient.reload(metaClient)
// first, there are four instants
assertResult(4) {
metaClient.getActiveTimeline.filterInflightsAndRequested.getInstants.count
}
checkAnswer(s"""call repair_corrupted_clean_files(table => '$tableName')""")(Seq(true))
// reload meta client
metaClient = HoodieTableMetaClient.reload(metaClient)
// after clearing, there should be 0 instant
assertResult(0) {
metaClient.getActiveTimeline.filterInflightsAndRequested.getInstants.count
}
}
}
private var duplicatedPartitionPath: String = null
private var duplicatedPartitionPathWithUpdates: String = null
private var duplicatedPartitionPathWithUpserts: String = null
private var repairedOutputPath: String = null
private var fileFormat: HoodieFileFormat = null
test("Test Call repair_deduplicate Procedure with insert") {
withTempDir { tmp =>
val tableName = generateTableName
val bashPath = tmp.getCanonicalPath
val tablePath = s"$bashPath/$tableName"
// create table
spark.sql(
s"""
|create table $tableName (
| name string,
| favorite_number int,
| favorite_color string
|) using hudi
| location '$tablePath'
| tblproperties (
| primaryKey = 'name',
| type = 'cow'
| )
""".stripMargin)
var metaClient = HoodieTableMetaClient.builder
.setConf(new JavaSparkContext(spark.sparkContext).hadoopConfiguration())
.setBasePath(tablePath)
.build
generateRecords(tablePath, bashPath, metaClient)
// reload meta client
metaClient = HoodieTableMetaClient.reload(metaClient)
// get fs and check number of latest files
val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants,
metaClient.getFs.listStatus(new Path(duplicatedPartitionPath)))
val filteredStatuses = fsView.getLatestBaseFiles.iterator().asScala.map(value => value.getPath).toList
// there should be 3 files
assertResult(3) {
filteredStatuses.size
}
// before deduplicate, all files contain 210 records
var files = filteredStatuses.toArray
var recordCount = getRecordCount(files)
assertResult(210){recordCount}
val partitionPath = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH
val result = spark.sql(
s"""call repair_deduplicate(table => '$tableName',
| duplicated_partition_path => '$partitionPath',
| repaired_output_path => '$repairedOutputPath')""".stripMargin).collect()
assertResult(1) {
result.length
}
// after deduplicate, there are 200 records
val fileStatus = metaClient.getFs.listStatus(new Path(repairedOutputPath))
files = fileStatus.map((status: FileStatus) => status.getPath.toString)
recordCount = getRecordCount(files)
assertResult(200){recordCount}
}
}
test("Test Call repair_deduplicate Procedure with update") {
withTempDir { tmp =>
val tableName = generateTableName
val bashPath = tmp.getCanonicalPath
val tablePath = s"$bashPath/$tableName"
// create table
spark.sql(
s"""
|create table $tableName (
| name string,
| favorite_number int,
| favorite_color string
|) using hudi
| location '$tablePath'
| tblproperties (
| primaryKey = 'name',
| type = 'cow'
| )
""".stripMargin)
var metaClient = HoodieTableMetaClient.builder
.setConf(new JavaSparkContext(spark.sparkContext).hadoopConfiguration())
.setBasePath(tablePath)
.build
generateRecords(tablePath, bashPath, metaClient)
// reload meta client
metaClient = HoodieTableMetaClient.reload(metaClient)
// get fs and check number of latest files
val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants,
metaClient.getFs.listStatus(new Path(duplicatedPartitionPathWithUpdates)))
val filteredStatuses = fsView.getLatestBaseFiles.iterator().asScala.map(value => value.getPath).toList
// there should be 2 files
assertResult(2) {
filteredStatuses.size
}
// before deduplicate, all files contain 110 records
var files = filteredStatuses.toArray
var recordCount = getRecordCount(files)
assertResult(110){recordCount}
val partitionPath = HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH
val result = spark.sql(
s"""call repair_deduplicate(table => '$tableName',
| duplicated_partition_path => '$partitionPath',
| repaired_output_path => '$repairedOutputPath',
| dedupe_type => 'update_type')""".stripMargin).collect()
assertResult(1) {
result.length
}
// after deduplicate, there are 100 records
val fileStatus = metaClient.getFs.listStatus(new Path(repairedOutputPath))
files = fileStatus.map((status: FileStatus) => status.getPath.toString)
recordCount = getRecordCount(files)
assertResult(100){recordCount}
}
}
test("Test Call repair_deduplicate Procedure with upsert") {
withTempDir { tmp =>
val tableName = generateTableName
val bashPath = tmp.getCanonicalPath
val tablePath = s"$bashPath/$tableName"
// create table
spark.sql(
s"""
|create table $tableName (
| name string,
| favorite_number int,
| favorite_color string
|) using hudi
| location '$tablePath'
| tblproperties (
| primaryKey = 'name',
| type = 'cow'
| )
""".stripMargin)
var metaClient = HoodieTableMetaClient.builder
.setConf(new JavaSparkContext(spark.sparkContext).hadoopConfiguration())
.setBasePath(tablePath)
.build
generateRecords(tablePath, bashPath, metaClient)
// reload meta client
metaClient = HoodieTableMetaClient.reload(metaClient)
// get fs and check number of latest files
val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants,
metaClient.getFs.listStatus(new Path(duplicatedPartitionPathWithUpserts)))
val filteredStatuses = fsView.getLatestBaseFiles.iterator().asScala.map(value => value.getPath).toList
// there should be 3 files
assertResult(3) {
filteredStatuses.size
}
// before deduplicate, all files contain 120 records
var files = filteredStatuses.toArray
var recordCount = getRecordCount(files)
assertResult(120){recordCount}
val partitionPath = HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH
val result = spark.sql(
s"""call repair_deduplicate(table => '$tableName',
| duplicated_partition_path => '$partitionPath',
| repaired_output_path => '$repairedOutputPath',
| dedupe_type => 'upsert_type')""".stripMargin).collect()
assertResult(1) {
result.length
}
// after deduplicate, there are 100 records
val fileStatus = metaClient.getFs.listStatus(new Path(repairedOutputPath))
files = fileStatus.map((status: FileStatus) => status.getPath.toString)
recordCount = getRecordCount(files)
assertResult(100){recordCount}
}
}
test("Test Call repair_deduplicate Procedure with real") {
withTempDir { tmp =>
val tableName = generateTableName
val bashPath = tmp.getCanonicalPath
val tablePath = s"$bashPath/$tableName"
// create table
spark.sql(
s"""
|create table $tableName (
| name string,
| favorite_number int,
| favorite_color string
|) using hudi
| location '$tablePath'
| tblproperties (
| primaryKey = 'name',
| type = 'cow'
| )
""".stripMargin)
var metaClient = HoodieTableMetaClient.builder
.setConf(new JavaSparkContext(spark.sparkContext).hadoopConfiguration())
.setBasePath(tablePath)
.build
generateRecords(tablePath, bashPath, metaClient)
// reload meta client
metaClient = HoodieTableMetaClient.reload(metaClient)
// get fs and check number of latest files
val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants,
metaClient.getFs.listStatus(new Path(duplicatedPartitionPath)))
val filteredStatuses = fsView.getLatestBaseFiles.iterator().asScala.map(value => value.getPath).toList
// there should be 3 files
assertResult(3) {
filteredStatuses.size
}
// before deduplicate, all files contain 210 records
var files = filteredStatuses.toArray
var recordCount = getRecordCount(files)
assertResult(210){recordCount}
val partitionPath = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH
val result = spark.sql(
s"""call repair_deduplicate(table => '$tableName',
| duplicated_partition_path => '$partitionPath',
| repaired_output_path => '$repairedOutputPath',
| dry_run => false)""".stripMargin).collect()
assertResult(1) {
result.length
}
// after deduplicate, there are 200 records
val fileStatus = metaClient.getFs.listStatus(new Path(duplicatedPartitionPath))
files = fileStatus.map((status: FileStatus) => status.getPath.toString).filter(p => p.endsWith(".parquet"))
recordCount = getRecordCount(files)
assertResult(200){recordCount}
}
}
test("Test Call repair_migrate_partition_meta Procedure") {
withTempDir { tmp =>
val tableName = generateTableName
// create table
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| partitioned by (ts)
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)
// insert data to table
spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
// default is dry run
var result = spark.sql(s"""call repair_migrate_partition_meta(table => '$tableName')""").collect()
assertResult(2) {
result.length
}
// real run
result = spark.sql(s"""call repair_migrate_partition_meta(table => '$tableName', dry_run => false)""").collect()
assertResult(2) {
result.length
}
}
}
private def generateRecords(tablePath: String, bashpath: String, metaClient: HoodieTableMetaClient): Unit ={
duplicatedPartitionPath = Paths.get(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).toString
duplicatedPartitionPathWithUpdates = Paths.get(tablePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).toString
duplicatedPartitionPathWithUpserts = Paths.get(tablePath, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH).toString
repairedOutputPath = Paths.get(bashpath, "tmp").toString
// generate 200 records
val schema: Schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema)
val testTable: HoodieSparkWriteableTestTable = HoodieSparkWriteableTestTable.of(metaClient, schema)
val hoodieRecords1 = SchemaTestUtil.generateHoodieTestRecords(0, 100, schema)
val hoodieRecords2 = SchemaTestUtil.generateHoodieTestRecords(100, 100, schema)
testTable.addCommit("20160401010101")
.withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "1", hoodieRecords1)
testTable.withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "2", hoodieRecords2)
testTable.getFileIdWithLogFile(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
testTable.withInserts(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "4", hoodieRecords1)
testTable.withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "6", hoodieRecords1)
// read records and get 10 to generate duplicates
val dupRecords = hoodieRecords1.subList(0, 10)
testTable.withInserts(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "5", dupRecords)
testTable.addCommit("20160401010202")
.withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "3", dupRecords)
testTable.withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "7", dupRecords)
testTable.withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "8", dupRecords)
fileFormat = metaClient.getTableConfig.getBaseFileFormat
}
private def getRecordCount(files: Array[String]): Long = {
var recordCount: Long = 0
for (file <- files){
if (HoodieFileFormat.PARQUET == fileFormat){
recordCount += spark.sqlContext.read.parquet(file).count()
} else if (HoodieFileFormat.ORC == fileFormat) {
recordCount += spark.sqlContext.read.orc(file).count()
} else {
throw new UnsupportedOperationException(fileFormat.name + " format not supported yet.")
}
}
recordCount
}
@throws[IOException]
def createEmptyCleanRequestedFile(basePath: String, instantTime: String, configuration: Configuration): Unit = {
val commitFilePath = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeRequestedCleanerFileName(instantTime))
val fs = FSUtils.getFs(basePath, configuration)
val os = fs.create(commitFilePath, true)
os.close()
}
}