diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DeDupeType.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DeDupeType.scala
new file mode 100644
index 000000000..93cec470e
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DeDupeType.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+/**
+ * Strategies used by DedupeSparkJob to decide which copies of a duplicated record key are dropped.
+ * The string passed to `Value(...)` is the name users supply (e.g. the `dedupe_type` procedure argument).
+ */
+object DeDupeType extends Enumeration {
+
+  // Alias for this enumeration's value type.
+  type dedupeType = Value
+
+  // Duplicates were created by inserts and never updated: every copy whose commit time is
+  // not the maximum for that key is dropped.
+  val INSERT_TYPE = Value("insert_type")
+  // All duplicates have been updated at least once: every copy except the last collected row is dropped.
+  val UPDATE_TYPE = Value("update_type")
+  // Mix of both: copies below the maximum commit time are dropped, then all but one of the
+  // copies sharing the maximum commit time.
+  val UPSERT_TYPE = Value("upsert_type")
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala
new file mode 100644
index 000000000..b6f610e7d
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.{HoodieBaseFile, HoodieRecord}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.exception.HoodieException
+import org.apache.log4j.Logger
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+
+import java.util.stream.Collectors
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{Buffer, HashMap, HashSet, ListBuffer}
+
+/**
+ * Spark job to de-duplicate data present in a partition path.
+ *
+ * @param basePath                base path of the Hudi table
+ * @param duplicatedPartitionPath partition path, relative to `basePath`, that contains the duplicates
+ * @param repairOutputPath        directory that receives the copied and rewritten base files
+ * @param sqlContext              SQL context used to read the parquet files and run the duplicate queries
+ * @param fs                      file system used for listing, copying and deleting files
+ * @param dedupeType              strategy deciding which copies of a duplicated key are dropped
+ */
+class DedupeSparkJob(basePath: String,
+                     duplicatedPartitionPath: String,
+                     repairOutputPath: String,
+                     sqlContext: SQLContext,
+                     fs: FileSystem,
+                     dedupeType: DeDupeType.Value) {
+
+  val sparkHelper = new SparkHelper(sqlContext, fs)
+  val LOG = Logger.getLogger(this.getClass)
+
+  /**
+   * Finds the record keys that occur more than once in a registered temp table.
+   *
+   * @param tblName name of a temp table already registered with `sqlContext`
+   * @return DataFrame with one row per duplicated key: `dupe_key` and its occurrence count `dupe_cnt`
+   */
+  def getDupeKeyDF(tblName: String): DataFrame = {
+    val dupeSql =
+      s"""
+      select `${HoodieRecord.RECORD_KEY_METADATA_FIELD}` as dupe_key,
+      count(*) as dupe_cnt
+      from ${tblName}
+      group by `${HoodieRecord.RECORD_KEY_METADATA_FIELD}`
+      having dupe_cnt > 1
+      """
+    sqlContext.sql(dupeSql)
+  }
+
+  /**
+   * Checks the partition for duplicates and suggests the deletions that need to be done in each file,
+   * in order to set things right.
+   *
+   * @return map from the file key (see `getDedupePlan`) to the record keys that must be skipped
+   *         when that file is rewritten
+   */
+  private def planDuplicateFix(): HashMap[String, HashSet[String]] = {
+    val tmpTableName = s"htbl_${System.currentTimeMillis()}"
+    val dedupeTblName = s"${tmpTableName}_dupeKeys"
+
+    val metadata = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(basePath).build()
+
+    val allFiles = fs.listStatus(new Path(s"$basePath/$duplicatedPartitionPath"))
+    val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitTimeline.filterCompletedInstants(), allFiles)
+    val latestFiles: java.util.List[HoodieBaseFile] = fsView.getLatestBaseFiles().collect(Collectors.toList[HoodieBaseFile]())
+    val filteredStatuses = latestFiles.map(f => f.getPath)
+    // The original message interpolated an empty `${}` block, which printed "()" instead of the partition.
+    LOG.info(s" List of files under partition: $duplicatedPartitionPath => ${filteredStatuses.mkString(" ")}")
+
+    val df = sqlContext.parquetFile(filteredStatuses: _*)
+    df.registerTempTable(tmpTableName)
+    val dupeKeyDF = getDupeKeyDF(tmpTableName)
+    dupeKeyDF.registerTempTable(dedupeTblName)
+
+    // Obtain necessary satellite information for duplicate rows
+    val dupeDataSql =
+      s"""
+        SELECT `_hoodie_record_key`, `_hoodie_partition_path`, `_hoodie_file_name`, `_hoodie_commit_time`
+        FROM $tmpTableName h
+        JOIN $dedupeTblName d
+        ON h.`_hoodie_record_key` = d.dupe_key
+      """
+    val dupeMap = sqlContext.sql(dupeDataSql).collectAsList().groupBy(r => r.getString(0))
+    getDedupePlan(dupeMap)
+  }
+
+  /**
+   * Turns the duplicated rows into a per-file list of record keys to drop, according to `dedupeType`.
+   *
+   * @param dupeMap duplicated rows grouped by record key; each row is laid out as selected in
+   *                `planDuplicateFix`: (0) record key, (1) partition path, (2) file name, (3) commit time
+   * @return map keyed by the part of `_hoodie_file_name` before the first underscore, holding the
+   *         record keys to skip in that file
+   * @throws IllegalArgumentException if `dedupeType` is not one of the known strategies
+   */
+  private def getDedupePlan(dupeMap: Map[String, Buffer[Row]]): HashMap[String, HashSet[String]] = {
+    val fileToDeleteKeyMap = new HashMap[String, HashSet[String]]()
+
+    // Commit time is stored as a string column and compared numerically.
+    def commitTimeOf(row: Row): Long = row.getString(3).toLong
+
+    // Highest commit time among the rows; starts from -1 as the floor.
+    def latestCommitOf(rows: Buffer[Row]): Long =
+      rows.foldLeft(-1L)((latest, row) => math.max(latest, commitTimeOf(row)))
+
+    // Registers `key` for removal from the file that `row` was read from.
+    def markForDeletion(key: String, row: Row): Unit = {
+      val f = row.getString(2).split("_")(0)
+      fileToDeleteKeyMap.getOrElseUpdate(f, HashSet[String]()).add(key)
+    }
+
+    dupeMap.foreach { case (key, rows) =>
+      dedupeType match {
+        case DeDupeType.UPDATE_TYPE =>
+          /*
+          This corresponds to the case where all duplicates have been updated at least once.
+          Once updated, duplicates are bound to have same commit time unless forcefully modified.
+          Every row but the last one is dropped.
+          */
+          rows.init.foreach(markForDeletion(key, _))
+
+        case DeDupeType.INSERT_TYPE =>
+          /*
+          This corresponds to the case where duplicates got created due to INSERT and have never been updated.
+          Only rows carrying the latest commit time are kept.
+          */
+          val maxCommit = latestCommitOf(rows)
+          rows.filter(commitTimeOf(_) != maxCommit).foreach(markForDeletion(key, _))
+
+        case DeDupeType.UPSERT_TYPE =>
+          /*
+          This corresponds to the case where duplicates got created as a result of inserts as well as updates,
+          i.e. a few duplicate records have been updated, while others were never updated.
+          Older rows are dropped first, then all but one of the rows sharing the latest commit time.
+          */
+          val maxCommit = latestCommitOf(rows)
+          val (rowsWithMaxCommit, olderRows) = rows.partition(commitTimeOf(_) == maxCommit)
+          olderRows.foreach(markForDeletion(key, _))
+          rowsWithMaxCommit.init.foreach(markForDeletion(key, _))
+
+        case _ => throw new IllegalArgumentException("Please provide valid type for deduping!")
+      }
+    }
+    LOG.debug(s"fileToDeleteKeyMap size: ${fileToDeleteKeyMap.size}, map: $fileToDeleteKeyMap")
+    fileToDeleteKeyMap
+  }
+
+  /**
+   * Rewrites the latest base files of the partition without the duplicated records.
+   *
+   * The work happens in `repairOutputPath`: files are copied there, the affected ones are rewritten,
+   * and the result is verified (no duplicates left, no record key lost) before anything is copied back.
+   *
+   * @param dryRun when true (the default) the copy back into the table partition is only logged
+   * @throws HoodieException if duplicates remain or record keys went missing after the rewrite
+   */
+  def fixDuplicates(dryRun: Boolean = true): Unit = {
+    val metadata = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(basePath).build()
+
+    val allFiles = fs.listStatus(new Path(s"$basePath/$duplicatedPartitionPath"))
+    val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitTimeline.filterCompletedInstants(), allFiles)
+
+    val latestFiles: java.util.List[HoodieBaseFile] = fsView.getLatestBaseFiles().collect(Collectors.toList[HoodieBaseFile]())
+
+    val fileNameToPathMap = latestFiles.map(f => (f.getFileId, new Path(f.getPath))).toMap
+    val dupeFixPlan = planDuplicateFix()
+
+    // 1. Copy all latest files into the temp fix path; files that need fixing get a ".bad" suffix.
+    fileNameToPathMap.foreach { case (fileName, filePath) =>
+      val badSuffix = if (dupeFixPlan.contains(fileName)) ".bad" else ""
+      val dstPath = new Path(s"$repairOutputPath/${filePath.getName}$badSuffix")
+      LOG.info(s"Copying from $filePath to $dstPath")
+      FileUtil.copy(fs, filePath, fs, dstPath, false, true, fs.getConf)
+    }
+
+    // 2. Remove duplicates from the bad files
+    dupeFixPlan.foreach { case (fileName, keysToSkip) =>
+      val baseFileName = fileNameToPathMap(fileName).getName
+      val instantTime = FSUtils.getCommitTime(baseFileName)
+      val badFilePath = new Path(s"$repairOutputPath/${baseFileName}.bad")
+      val newFilePath = new Path(s"$repairOutputPath/${baseFileName}")
+      LOG.info(" Skipping and writing new file for : " + fileName)
+      SparkHelpers.skipKeysAndWriteNewFile(instantTime, fs, badFilePath, newFilePath, keysToSkip)
+      fs.delete(badFilePath, true)
+    }
+
+    // 3. Check that there are no duplicates anymore.
+    val df = sqlContext.read.parquet(s"$repairOutputPath/*.parquet")
+    df.registerTempTable("fixedTbl")
+    val dupeKeyDF = getDupeKeyDF("fixedTbl")
+    val dupeCnt = dupeKeyDF.count()
+    if (dupeCnt != 0) {
+      dupeKeyDF.show()
+      throw new HoodieException("Still found some duplicates!!.. Inspect output")
+    }
+
+    // 4. Additionally ensure no record keys are left behind.
+    val sourceDF = sparkHelper.getDistinctKeyDF(fileNameToPathMap.map(t => t._2.toString).toList)
+    val fixedDF = sparkHelper.getDistinctKeyDF(fileNameToPathMap.map(t => s"$repairOutputPath/${t._2.getName}").toList)
+    val missedRecordKeysDF = sourceDF.except(fixedDF)
+    val missedCnt = missedRecordKeysDF.count()
+    if (missedCnt != 0) {
+      missedRecordKeysDF.show()
+      throw new HoodieException("Some records in source are not found in fixed files. Inspect output!!")
+    }
+
+    println("No duplicates found & counts are in check!!!! ")
+    // 5. Prepare to copy the fixed files back.
+    fileNameToPathMap.foreach { case (_, filePath) =>
+      val srcPath = new Path(s"$repairOutputPath/${filePath.getName}")
+      val dstPath = new Path(s"$basePath/$duplicatedPartitionPath/${filePath.getName}")
+      if (dryRun) {
+        LOG.info(s"[JUST KIDDING!!!] Copying from $srcPath to $dstPath")
+      } else {
+        // for real
+        LOG.info(s"[FOR REAL!!!] Copying from $srcPath to $dstPath")
+        FileUtil.copy(fs, srcPath, fs, dstPath, false, true, fs.getConf)
+      }
+    }
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
new file mode 100644
index 000000000..1ed0e5e1a
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.IndexedRecord
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hudi.avro.HoodieAvroWriteSupport
+import org.apache.hudi.client.SparkTaskContextSupplier
+import org.apache.hudi.common.bloom.{BloomFilter, BloomFilterFactory}
+import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
+import org.apache.hudi.common.util.BaseFileUtils
+import org.apache.hudi.config.{HoodieIndexConfig, HoodieStorageConfig}
+import org.apache.hudi.io.storage.{HoodieAvroParquetWriter, HoodieParquetConfig}
+import org.apache.parquet.avro.AvroSchemaConverter
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable._
+
+object SparkHelpers {
+  /**
+   * Copies a parquet base file record by record, leaving out the records whose key is in `keysToSkip`.
+   *
+   * The writer schema is taken from the first source record, so a source file without records
+   * fails on `sourceRecords.get(0)`.
+   *
+   * @param instantTime     instant time handed to the parquet writer
+   * @param fs              file system whose configuration is used for reading and writing
+   * @param sourceFile      parquet file to read
+   * @param destinationFile parquet file to write
+   * @param keysToSkip      record keys that must not be copied
+   */
+  @throws[Exception]
+  def skipKeysAndWriteNewFile(instantTime: String, fs: FileSystem, sourceFile: Path, destinationFile: Path, keysToSkip: Set[String]): Unit = {
+    val sourceRecords = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).readAvroRecords(fs.getConf, sourceFile)
+    val schema: Schema = sourceRecords.get(0).getSchema
+    val filter: BloomFilter = BloomFilterFactory.createBloomFilter(HoodieIndexConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE.defaultValue.toInt, HoodieIndexConfig.BLOOM_FILTER_FPP_VALUE.defaultValue.toDouble,
+      HoodieIndexConfig.BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.defaultValue.toInt, HoodieIndexConfig.BLOOM_FILTER_TYPE.defaultValue)
+    val writeSupport: HoodieAvroWriteSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter(fs.getConf).convert(schema), schema, org.apache.hudi.common.util.Option.of(filter))
+    val parquetConfig: HoodieParquetConfig[HoodieAvroWriteSupport] = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.PARQUET_BLOCK_SIZE.defaultValue.toInt,
+      HoodieStorageConfig.PARQUET_PAGE_SIZE.defaultValue.toInt, HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.defaultValue.toInt, fs.getConf, HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue.toDouble)
+
+    // Use the current class loader for the config; otherwise 'HoodieWrapperFileSystem' cannot be found.
+    parquetConfig.getHadoopConf().setClassLoader(Thread.currentThread.getContextClassLoader)
+
+    val writer = new HoodieAvroParquetWriter[IndexedRecord](destinationFile, parquetConfig, instantTime, new SparkTaskContextSupplier(), true)
+    // Close the writer even when a write fails, so the file handle is not leaked.
+    try {
+      for (rec <- sourceRecords) {
+        val key: String = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
+        if (!keysToSkip.contains(key)) {
+          writer.writeAvro(key, rec)
+        }
+      }
+    } finally {
+      writer.close()
+    }
+  }
+}
+
+/**
+ * Bunch of Spark Shell/Scala stuff useful for debugging
+ */
+class SparkHelper(sqlContext: SQLContext, fs: FileSystem) {
+
+  /**
+   * Prints every record key found in a parquet file.
+   *
+   * @param file path of the parquet file
+   */
+  def printKeysFromFile(file: String) = {
+    getRowKeyDF(file).collect().foreach(println(_))
+  }
+
+  /**
+   * Reads the record key column of a parquet file.
+   *
+   * @param file path of the parquet file
+   * @return DataFrame holding only the record key column
+   */
+  def getRowKeyDF(file: String): DataFrame = {
+    sqlContext.read.parquet(file).select(s"`${HoodieRecord.RECORD_KEY_METADATA_FIELD}`")
+  }
+
+  /**
+   * Does the rowKey actually exist in the file.
+   *
+   * @param rowKey record key to look for
+   * @param file   path of the parquet file
+   * @return true if at least one record in the file has that key
+   */
+  def isFileContainsKey(rowKey: String, file: String): Boolean = {
+    println(s"Checking $file for key $rowKey")
+    val keys = getRowKeyDF(file)
+    // Compare through the Column API so keys containing quotes cannot break the filter expression.
+    keys.filter(keys(HoodieRecord.RECORD_KEY_METADATA_FIELD) === rowKey).count() > 0
+  }
+
+  /**
+   * Number of keys in a given file; the count is also printed.
+   *
+   * @param file       path of the parquet file
+   * @param sqlContext not used; the file is read through this helper's own context
+   * @return number of rows in the record key column
+   */
+  def getKeyCount(file: String, sqlContext: org.apache.spark.sql.SQLContext) = {
+    val keyCount = getRowKeyDF(file).collect().length
+    println(keyCount)
+    keyCount
+  }
+
+  /**
+   * Checks that all the keys in the file have been added to the bloom filter in the footer.
+   *
+   * @param conf       Hadoop configuration used to read the bloom filter
+   * @param sqlContext SQL context used to read the keys
+   * @param file       path of the parquet file
+   * @return true if the bloom filter reports every key of the file as possibly present
+   */
+  def fileKeysAgainstBF(conf: Configuration, sqlContext: SQLContext, file: String): Boolean = {
+    val bf = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).readBloomFilterFromMetadata(conf, new Path(file))
+    // Count the keys the filter knows about; the previous negated predicate counted the misses,
+    // so the comparison below only succeeded when no key was in the filter.
+    val foundCount = sqlContext.read.parquet(file)
+      .select(s"`${HoodieRecord.RECORD_KEY_METADATA_FIELD}`")
+      .collect().count(r => bf.mightContain(r.getString(0)))
+    val totalCount = getKeyCount(file, sqlContext)
+    println(s"totalCount: $totalCount, foundCount: $foundCount")
+    totalCount == foundCount
+  }
+
+  /**
+   * Reads the distinct record keys of a set of parquet files.
+   *
+   * @param paths parquet files to read
+   * @return DataFrame of distinct record keys
+   */
+  def getDistinctKeyDF(paths: List[String]): DataFrame = {
+    sqlContext.read.parquet(paths: _*).select(s"`${HoodieRecord.RECORD_KEY_METADATA_FIELD}`").distinct()
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index 5f2728597..1eb82d97c 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -75,6 +75,11 @@ object
HoodieProcedures { mapBuilder.put(ValidateMetadataFilesProcedure.NAME, ValidateMetadataFilesProcedure.builder) mapBuilder.put(ShowFsPathDetailProcedure.NAME, ShowFsPathDetailProcedure.builder) mapBuilder.put(CopyToTableProcedure.NAME, CopyToTableProcedure.builder) + mapBuilder.put(RepairAddpartitionmetaProcedure.NAME, RepairAddpartitionmetaProcedure.builder) + mapBuilder.put(RepairCorruptedCleanFilesProcedure.NAME, RepairCorruptedCleanFilesProcedure.builder) + mapBuilder.put(RepairDeduplicateProcedure.NAME, RepairDeduplicateProcedure.builder) + mapBuilder.put(RepairMigratePartitionMetaProcedure.NAME, RepairMigratePartitionMetaProcedure.builder) + mapBuilder.put(RepairOverwriteHoodiePropsProcedure.NAME, RepairOverwriteHoodiePropsProcedure.builder) mapBuilder.build } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala new file mode 100644 index 000000000..bb65174c4 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodiePartitionMetadata
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util
+import java.util.function.Supplier
+import scala.collection.JavaConversions._
+
+/**
+ * Procedure that reports, for every partition folder of a table, whether its partition metadata
+ * file exists, and writes the missing ones unless `dry_run` is set.
+ */
+class RepairAddpartitionmetaProcedure extends BaseProcedure with ProcedureBuilder with Logging {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "dry_run", DataTypes.BooleanType, true)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("partition_path", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("metadata_is_present", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("action", DataTypes.StringType, nullable = true, Metadata.empty))
+  )
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  /**
+   * Produces one row per partition: (partition path, "Yes"/"No" for metadata presence, action taken).
+   * The action is "Repaired" only when the metadata was missing and `dry_run` is false; otherwise "None".
+   */
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val dryRun = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Boolean]
+    val tablePath = getBasePath(tableName)
+
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(tablePath).build
+
+    // The latest commit on the timeline is stamped into every metadata file that gets written.
+    val latestCommit: String = metaClient.getActiveTimeline.getCommitTimeline.lastInstant.get.getTimestamp
+    val partitionPaths: util.List[String] = FSUtils.getAllPartitionFoldersThreeLevelsDown(metaClient.getFs, tablePath)
+    val basePath: Path = new Path(tablePath)
+
+    // Partitions are visited in listing order; each one yields exactly one output row.
+    partitionPaths.map { partition =>
+      val partitionPath: Path = FSUtils.getPartitionPath(basePath, partition)
+      if (HoodiePartitionMetadata.hasPartitionMetadata(metaClient.getFs, partitionPath)) {
+        Row(partition, "Yes", "None")
+      } else if (dryRun) {
+        Row(partition, "No", "None")
+      } else {
+        val partitionMetadata: HoodiePartitionMetadata = new HoodiePartitionMetadata(metaClient.getFs, latestCommit, basePath, partitionPath, metaClient.getTableConfig.getPartitionMetafileFormat)
+        partitionMetadata.trySave(0)
+        Row(partition, "No", "Repaired")
+      }
+    }.toList
+  }
+
+  override def build: Procedure = new RepairAddpartitionmetaProcedure()
+}
+
+object RepairAddpartitionmetaProcedure {
+  val NAME = "repair_add_partition_meta"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new RepairAddpartitionmetaProcedure()
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairCorruptedCleanFilesProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairCorruptedCleanFilesProcedure.scala
new file mode 100644
index 000000000..ff185d1bd
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairCorruptedCleanFilesProcedure.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.avro.AvroRuntimeException
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
+import org.apache.hudi.common.util.CleanerUtils
+import org.apache.hudi.exception.HoodieIOException
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.io.IOException
+import java.util.function.Supplier
+import scala.collection.JavaConverters.asScalaIteratorConverter
+
+/**
+ * Procedure that scans the pending (requested / inflight) clean instants of a table and deletes
+ * the instant files whose cleaner plan cannot be read because the file is corrupted.
+ */
+class RepairCorruptedCleanFilesProcedure extends BaseProcedure with ProcedureBuilder with Logging {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("result", DataTypes.BooleanType, nullable = true, Metadata.empty))
+  )
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  /**
+   * Tries to load the cleaner plan of every pending clean instant and removes the instants that
+   * turn out to be corrupted.
+   *
+   * @return a single row holding `true`; any unexpected I/O failure aborts the call instead
+   * @throws HoodieIOException if reading a plan fails for a reason other than file corruption
+   */
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val tablePath = getBasePath(tableName)
+
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(tablePath).build
+
+    val cleanerTimeline = metaClient.getActiveTimeline.getCleanerTimeline
+    logInfo("Inspecting pending clean metadata in timeline for corrupted files")
+
+    // Shared by both corruption signals below.
+    def removeCorruptedInstant(instant: HoodieInstant): Unit = {
+      logWarning("Corruption found. Trying to remove corrupted clean instant file: " + instant)
+      HoodieActiveTimeline.deleteInstantFile(metaClient.getFs, metaClient.getMetaPath, instant)
+    }
+
+    cleanerTimeline.filterInflightsAndRequested.getInstants.iterator().asScala.foreach((instant: HoodieInstant) => {
+      try {
+        CleanerUtils.getCleanerPlan(metaClient, instant)
+      } catch {
+        case _: AvroRuntimeException =>
+          removeCorruptedInstant(instant)
+        case ioe: IOException =>
+          // getMessage may be null; treat that as "not a corruption signal" instead of failing
+          // with a NullPointerException inside the handler.
+          if (Option(ioe.getMessage).exists(_.contains("Not an Avro data file"))) {
+            removeCorruptedInstant(instant)
+          } else {
+            throw new HoodieIOException(ioe.getMessage, ioe)
+          }
+      }
+    })
+    // Reaching this point means every pending instant was either readable or cleaned up.
+    Seq(Row(true))
+  }
+
+  override def build: Procedure = new RepairCorruptedCleanFilesProcedure()
+}
+
+object RepairCorruptedCleanFilesProcedure {
+  val NAME = "repair_corrupted_clean_files"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new RepairCorruptedCleanFilesProcedure()
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairDeduplicateProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairDeduplicateProcedure.scala
new file mode 100644
index 000000000..8ee5055e1
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairDeduplicateProcedure.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+import java.util.function.Supplier
+
+import org.apache.spark.sql.hudi.{DeDupeType, DedupeSparkJob}
+
+import scala.util.{Failure, Success, Try}
+
+/**
+ * Procedure that runs DedupeSparkJob against one partition of a table.
+ */
+class RepairDeduplicateProcedure extends BaseProcedure with ProcedureBuilder with Logging {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(1, "duplicated_partition_path", DataTypes.StringType, None),
+    ProcedureParameter.required(2, "repaired_output_path", DataTypes.StringType, None),
+    ProcedureParameter.optional(3, "dry_run", DataTypes.BooleanType, true),
+    ProcedureParameter.optional(4, "dedupe_type", DataTypes.StringType, "insert_type")
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("result", DataTypes.StringType, nullable = true, Metadata.empty))
+  )
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  /**
+   * Validates the arguments, runs the de-duplication job and reports where the fixed files ended up.
+   *
+   * @return a single row with a message naming the output location: the repair output path for a
+   *         dry run, the table partition otherwise
+   * @throws IllegalArgumentException if `dedupe_type` is not one of the names defined by DeDupeType
+   * @throws HoodieException          if the de-duplication job fails
+   */
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val duplicatedPartitionPath = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
+    val repairedOutputPath = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String]
+    val dryRun = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[Boolean]
+    val dedupeTypeName = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[String]
+
+    // Enumeration.withName throws NoSuchElementException for an unknown name, so the value is
+    // looked up explicitly to report the intended IllegalArgumentException instead.
+    val dedupeType = DeDupeType.values.find(_.toString == dedupeTypeName).getOrElse(
+      throw new IllegalArgumentException("Please provide valid dedupe type!"))
+    val basePath = getBasePath(tableName)
+
+    Try {
+      val job = new DedupeSparkJob(basePath, duplicatedPartitionPath, repairedOutputPath, spark.sqlContext,
+        FSUtils.getFs(basePath, jsc.hadoopConfiguration), dedupeType)
+      job.fixDuplicates(dryRun)
+    } match {
+      case Success(_) =>
+        if (dryRun) {
+          Seq(Row(s"Deduplicated files placed in: $repairedOutputPath."))
+        } else {
+          Seq(Row(s"Deduplicated files placed in: $duplicatedPartitionPath."))
+        }
+      case Failure(e) =>
+        throw new HoodieException("Deduplication failed!", e)
+    }
+  }
+
+  override def build: Procedure = new RepairDeduplicateProcedure()
+}
+
+object RepairDeduplicateProcedure {
+  val NAME = "repair_deduplicate"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new RepairDeduplicateProcedure()
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
new file mode 100644
index 000000000..7daacb2f1
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hudi.command.procedures

import org.apache.hadoop.fs.Path
import org.apache.hudi.common.engine.HoodieLocalEngineContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodiePartitionMetadata
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.util.Option
import org.apache.hudi.exception.HoodieIOException
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}

import java.io.IOException
import java.util
import java.util.Properties
import java.util.function.{Consumer, Supplier}
import scala.collection.JavaConversions._

/**
 * Procedure `repair_migrate_partition_meta`: for every partition of a table, reports whether a
 * text-format and/or base-format partition metafile exists and, unless `dry_run` is set, writes
 * the base-format metafile (when missing) and deletes the text-format one (when present).
 *
 * Parameters:
 *  - `table` (required): name of the table to repair.
 *  - `dry_run` (optional, default `true`): when `true` nothing is written, deleted or updated.
 *
 * Output: one row per partition with `partition_path`, `text_metafile_present`,
 * `base_metafile_present` and `action` (`MIGRATE`/`NONE` in a dry run, `MIGRATED` otherwise).
 */
class RepairMigratePartitionMetaProcedure extends BaseProcedure with ProcedureBuilder with Logging {
  private val PARAMETERS = Array[ProcedureParameter](
    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
    ProcedureParameter.optional(1, "dry_run", DataTypes.BooleanType, true)
  )

  private val OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("partition_path", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("text_metafile_present", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("base_metafile_present", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("action", DataTypes.StringType, nullable = true, Metadata.empty))
  )

  def parameters: Array[ProcedureParameter] = PARAMETERS

  def outputType: StructType = OUTPUT_TYPE

  /**
   * Runs the migration (or its dry-run report) over all partitions of the table.
   *
   * @param args procedure arguments; validated against [[PARAMETERS]]
   * @return one [[Row]] per partition path, in the order returned by `FSUtils.getAllPartitionPaths`
   * @throws HoodieIOException if deleting a text-format metafile fails with an `IOException`
   */
  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(PARAMETERS, args)

    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
    val dryRun = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Boolean]
    val tablePath = getBasePath(tableName)

    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(tablePath).build
    val fs = metaClient.getFs

    val engineContext: HoodieLocalEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf)
    val partitionPaths: util.List[String] = FSUtils.getAllPartitionPaths(engineContext, tablePath, false, false)
    val basePath: Path = new Path(tablePath)

    // Resolved at most once, and only when a base-format metafile actually has to be written;
    // a dry run therefore no longer requires the commit timeline to be non-empty.
    lazy val latestCommit: String =
      metaClient.getActiveTimeline.getCommitTimeline.lastInstant.get.getTimestamp

    val rows = partitionPaths.map { partitionPath =>
      val partition: Path = FSUtils.getPartitionPath(tablePath, partitionPath)
      // Presence is captured before any change so the row reports the state that was found.
      val textFormatFile: Option[Path] = HoodiePartitionMetadata.textFormatMetaPathIfExists(fs, partition)
      val baseFormatFile: Option[Path] = HoodiePartitionMetadata.baseFormatMetaPathIfExists(fs, partition)

      val action =
        if (dryRun) {
          if (textFormatFile.isPresent) "MIGRATE" else "NONE"
        } else {
          if (!baseFormatFile.isPresent) {
            val partitionMetadata: HoodiePartitionMetadata = new HoodiePartitionMetadata(fs, latestCommit,
              basePath, partition, Option.of(metaClient.getTableConfig.getBaseFileFormat))
            partitionMetadata.trySave(0)
          }
          // delete it, in case we failed midway last time.
          if (textFormatFile.isPresent) {
            try fs.delete(textFormatFile.get, false)
            catch {
              case e: IOException =>
                throw new HoodieIOException(e.getMessage, e)
            }
          }
          "MIGRATED"
        }

      Row(partitionPath, String.valueOf(textFormatFile.isPresent),
        String.valueOf(baseFormatFile.isPresent), action)
    }.toList

    // Only a real run may flip the table config; a dry run must leave hoodie.properties untouched.
    if (!dryRun) {
      val props: Properties = new Properties
      props.setProperty(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key, "true")
      HoodieTableConfig.update(fs, new Path(metaClient.getMetaPath), props)
    }

    rows
  }

  override def build: Procedure = new RepairMigratePartitionMetaProcedure()
}

/** Registration name and builder supplier for [[RepairMigratePartitionMetaProcedure]]. */
object RepairMigratePartitionMetaProcedure {
  val NAME = "repair_migrate_partition_meta"

  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
    override def get() = new RepairMigratePartitionMetaProcedure()
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hudi.command.procedures

import org.apache.hadoop.fs.Path
import org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}

import java.io.FileInputStream
import java.util
import java.util.Properties
import java.util.function.Supplier
import scala.collection.JavaConversions._
import scala.collection.JavaConverters.asScalaIteratorConverter

/**
 * Procedure `repair_overwrite_hoodie_props`: replaces the table's config with the properties read
 * from a local file and reports, per property key, the value before and after the overwrite.
 *
 * Parameters:
 *  - `table` (required): name of the table whose config is overwritten.
 *  - `new_props_file_path` (required): path opened with `java.io.FileInputStream`.
 *
 * Output: one row per key (union of old and new keys, sorted) with `property`, `old_value` and
 * `new_value`; a key missing on one side is reported as the string `"null"`.
 */
class RepairOverwriteHoodiePropsProcedure extends BaseProcedure with ProcedureBuilder with Logging {
  private val PARAMETERS = Array[ProcedureParameter](
    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
    ProcedureParameter.required(1, "new_props_file_path", DataTypes.StringType, None)
  )

  private val OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("property", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("old_value", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("new_value", DataTypes.StringType, nullable = true, Metadata.empty))
  )

  def parameters: Array[ProcedureParameter] = PARAMETERS

  def outputType: StructType = OUTPUT_TYPE

  /**
   * Overwrites the table config and returns the old/new value comparison.
   *
   * @param args procedure arguments; validated against [[PARAMETERS]]
   * @return one [[Row]] per property key, ordered by key
   */
  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(PARAMETERS, args)

    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
    val overwriteFilePath = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
    val tablePath = getBasePath(tableName)

    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(tablePath).build

    // Properties.load leaves the stream open, so it is closed here even when loading fails.
    val requestedProps = new Properties
    val propsStream = new FileInputStream(overwriteFilePath)
    try requestedProps.load(propsStream)
    finally propsStream.close()

    // Capture the current config before it is replaced on storage.
    val oldProps = metaClient.getTableConfig.propsMap
    val metaPathDir = new Path(tablePath, METAFOLDER_NAME)
    HoodieTableConfig.create(metaClient.getFs, metaPathDir, requestedProps)
    // reload new props as checksum would have been added
    val newProps = HoodieTableMetaClient.reload(metaClient).getTableConfig.getProps

    // TreeSet gives the sorted union of keys from both sides.
    val allPropKeys = new util.TreeSet[String]
    newProps.keySet.iterator().asScala.foreach(key => allPropKeys.add(key.toString))
    allPropKeys.addAll(oldProps.keySet)

    allPropKeys.iterator().asScala.map { propKey =>
      Row(propKey, oldProps.getOrDefault(propKey, "null"),
        newProps.getOrDefault(propKey, "null").toString)
    }.toList
  }

  override def build: Procedure = new RepairOverwriteHoodiePropsProcedure()
}

/** Registration name and builder supplier for [[RepairOverwriteHoodiePropsProcedure]]. */
object RepairOverwriteHoodiePropsProcedure {
  val NAME = "repair_overwrite_hoodie_props"

  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
    override def get() = new RepairOverwriteHoodiePropsProcedure()
  }
}
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +### +hoodie.table.name=test_table +hoodie.table.type=COPY_ON_WRITE +hoodie.archivelog.folder=archive +hoodie.timeline.layout.version=1 diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala new file mode 100644 index 000000000..587f7a4bd --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala @@ -0,0 +1,507 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hudi.procedure

import org.apache.avro.Schema
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieFileFormat
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, SchemaTestUtil}
import org.apache.hudi.testutils.HoodieSparkWriteableTestTable
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase

import java.io.IOException
import java.net.URL
import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters.asScalaIteratorConverter

/**
 * Tests for the `repair_*` call procedures: `repair_add_partition_meta`,
 * `repair_overwrite_hoodie_props`, `repair_corrupted_clean_files`, `repair_deduplicate`
 * and `repair_migrate_partition_meta`.
 */
class TestRepairsProcedure extends HoodieSparkSqlTestBase {

  // Populated by generateRecords for the repair_deduplicate tests.
  private var duplicatedPartitionPath: String = null
  private var duplicatedPartitionPathWithUpdates: String = null
  private var duplicatedPartitionPathWithUpserts: String = null
  private var repairedOutputPath: String = null
  private var fileFormat: HoodieFileFormat = null

  test("Test Call repair_add_partition_meta Procedure") {
    withTempDir { tmp =>
      val tableName = generateTableName
      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
      createIdKeyedTable(tableName, tablePath)
      // create commit instant
      Files.createFile(Paths.get(tablePath, ".hoodie", "100.commit"))

      val metaClient = newMetaClient(tablePath)

      // create partition paths
      Seq("2016/03/15", "2015/03/16", "2015/03/17").foreach { relativePartition =>
        val partitionDir = Paths.get(tablePath, relativePartition).toString
        assertResult(true) {
          metaClient.getFs.mkdirs(new Path(partitionDir))
        }
      }

      // default is dry run
      val dryResult = spark.sql(s"""call repair_add_partition_meta(table => '$tableName')""").collect()
      assertResult(3) {
        dryResult.length
      }

      // real run
      val realRunResult = spark.sql(s"""call repair_add_partition_meta(table => '$tableName', dry_run => false)""").collect()
      assertResult(3) {
        realRunResult.length
      }
    }
  }

  test("Test Call repair_overwrite_hoodie_props Procedure") {
    withTempDir { tmp =>
      val tableName = generateTableName
      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
      createIdKeyedTable(tableName, tablePath)
      // locate the replacement properties file on the test classpath
      val newProps: URL = this.getClass.getClassLoader.getResource("table-config.properties")

      // overwrite hoodie props
      val result = spark.sql(s"""call repair_overwrite_hoodie_props(table => '$tableName', new_props_file_path => '${newProps.getPath}')""").collect()
      assertResult(15) {
        result.length
      }
    }
  }

  test("Test Call repair_corrupted_clean_files Procedure") {
    withTempDir { tmp =>
      val tableName = generateTableName
      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
      createIdKeyedTable(tableName, tablePath)
      var metaClient = newMetaClient(tablePath)

      // Create four requested files
      for (i <- 100 until 104) {
        val timestamp = String.valueOf(i)
        // Write corrupted requested Clean File
        createEmptyCleanRequestedFile(tablePath, timestamp, metaClient.getHadoopConf)
      }

      // reload meta client
      metaClient = HoodieTableMetaClient.reload(metaClient)
      // first, there are four instants
      assertResult(4) {
        metaClient.getActiveTimeline.filterInflightsAndRequested.getInstants.count
      }

      checkAnswer(s"""call repair_corrupted_clean_files(table => '$tableName')""")(Seq(true))

      // reload meta client
      metaClient = HoodieTableMetaClient.reload(metaClient)
      // after clearing, there should be 0 instant
      assertResult(0) {
        metaClient.getActiveTimeline.filterInflightsAndRequested.getInstants.count
      }
    }
  }

  test("Test Call repair_deduplicate Procedure with insert") {
    withDedupeTable { (tableName, metaClient) =>
      // there should be 3 files
      val files = latestBaseFilePaths(metaClient, duplicatedPartitionPath)
      assertResult(3) {
        files.length
      }

      // before deduplicate, all files contain 210 records
      assertResult(210) {
        getRecordCount(files)
      }

      val partitionPath = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH
      val result = spark.sql(
        s"""call repair_deduplicate(table => '$tableName',
           | duplicated_partition_path => '$partitionPath',
           | repaired_output_path => '$repairedOutputPath')""".stripMargin).collect()
      assertResult(1) {
        result.length
      }

      // after deduplicate, there are 200 records
      assertResult(200) {
        getRecordCount(filePathsIn(metaClient, repairedOutputPath))
      }
    }
  }

  test("Test Call repair_deduplicate Procedure with update") {
    withDedupeTable { (tableName, metaClient) =>
      // there should be 2 files
      val files = latestBaseFilePaths(metaClient, duplicatedPartitionPathWithUpdates)
      assertResult(2) {
        files.length
      }

      // before deduplicate, all files contain 110 records
      assertResult(110) {
        getRecordCount(files)
      }

      val partitionPath = HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH
      val result = spark.sql(
        s"""call repair_deduplicate(table => '$tableName',
           | duplicated_partition_path => '$partitionPath',
           | repaired_output_path => '$repairedOutputPath',
           | dedupe_type => 'update_type')""".stripMargin).collect()
      assertResult(1) {
        result.length
      }

      // after deduplicate, there are 100 records
      assertResult(100) {
        getRecordCount(filePathsIn(metaClient, repairedOutputPath))
      }
    }
  }

  test("Test Call repair_deduplicate Procedure with upsert") {
    withDedupeTable { (tableName, metaClient) =>
      // there should be 3 files
      val files = latestBaseFilePaths(metaClient, duplicatedPartitionPathWithUpserts)
      assertResult(3) {
        files.length
      }

      // before deduplicate, all files contain 120 records
      assertResult(120) {
        getRecordCount(files)
      }

      val partitionPath = HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH
      val result = spark.sql(
        s"""call repair_deduplicate(table => '$tableName',
           | duplicated_partition_path => '$partitionPath',
           | repaired_output_path => '$repairedOutputPath',
           | dedupe_type => 'upsert_type')""".stripMargin).collect()
      assertResult(1) {
        result.length
      }

      // after deduplicate, there are 100 records
      assertResult(100) {
        getRecordCount(filePathsIn(metaClient, repairedOutputPath))
      }
    }
  }

  test("Test Call repair_deduplicate Procedure with real") {
    withDedupeTable { (tableName, metaClient) =>
      // there should be 3 files
      val files = latestBaseFilePaths(metaClient, duplicatedPartitionPath)
      assertResult(3) {
        files.length
      }

      // before deduplicate, all files contain 210 records
      assertResult(210) {
        getRecordCount(files)
      }

      val partitionPath = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH
      val result = spark.sql(
        s"""call repair_deduplicate(table => '$tableName',
           | duplicated_partition_path => '$partitionPath',
           | repaired_output_path => '$repairedOutputPath',
           | dry_run => false)""".stripMargin).collect()
      assertResult(1) {
        result.length
      }

      // after deduplicate, there are 200 records; only the parquet files in the partition are counted
      val repairedFiles = filePathsIn(metaClient, duplicatedPartitionPath).filter(p => p.endsWith(".parquet"))
      assertResult(200) {
        getRecordCount(repairedFiles)
      }
    }
  }

  test("Test Call repair_migrate_partition_meta Procedure") {
    withTempDir { tmp =>
      val tableName = generateTableName
      // create table
      spark.sql(
        s"""
           |create table $tableName (
           |  id int,
           |  name string,
           |  price double,
           |  ts long
           |) using hudi
           | location '${tmp.getCanonicalPath}/$tableName'
           | partitioned by (ts)
           | tblproperties (
           |  primaryKey = 'id',
           |  preCombineField = 'ts'
           | )
       """.stripMargin)
      // insert data to table
      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")

      // default is dry run
      val dryResult = spark.sql(s"""call repair_migrate_partition_meta(table => '$tableName')""").collect()
      assertResult(2) {
        dryResult.length
      }

      // real run
      val realRunResult = spark.sql(s"""call repair_migrate_partition_meta(table => '$tableName', dry_run => false)""").collect()
      assertResult(2) {
        realRunResult.length
      }
    }
  }

  /** Creates the non-partitioned (id, name, price, ts) table used by the non-dedupe tests. */
  private def createIdKeyedTable(tableName: String, tablePath: String): Unit = {
    spark.sql(
      s"""
         |create table $tableName (
         |  id int,
         |  name string,
         |  price double,
         |  ts long
         |) using hudi
         | location '$tablePath'
         | tblproperties (
         |  primaryKey = 'id',
         |  preCombineField = 'ts'
         | )
       """.stripMargin)
  }

  /** Creates the copy-on-write (name, favorite_number, favorite_color) table used by the dedupe tests. */
  private def createDedupeTable(tableName: String, tablePath: String): Unit = {
    spark.sql(
      s"""
         |create table $tableName (
         |  name string,
         |  favorite_number int,
         |  favorite_color string
         |) using hudi
         | location '$tablePath'
         | tblproperties (
         |  primaryKey = 'name',
         |  type = 'cow'
         | )
       """.stripMargin)
  }

  /** Builds a meta client for `tablePath` using the Spark context's Hadoop configuration. */
  private def newMetaClient(tablePath: String): HoodieTableMetaClient = {
    HoodieTableMetaClient.builder
      .setConf(new JavaSparkContext(spark.sparkContext).hadoopConfiguration())
      .setBasePath(tablePath)
      .build
  }

  /**
   * Shared fixture of the repair_deduplicate tests: creates the table in a temp dir, writes the
   * duplicated records, and hands the table name plus a reloaded meta client to `testBody`.
   */
  private def withDedupeTable(testBody: (String, HoodieTableMetaClient) => Unit): Unit = {
    withTempDir { tmp =>
      val tableName = generateTableName
      val basePath = tmp.getCanonicalPath
      val tablePath = s"$basePath/$tableName"
      createDedupeTable(tableName, tablePath)

      val metaClient = newMetaClient(tablePath)
      generateRecords(tablePath, basePath, metaClient)

      // reload meta client so the commits written above are visible
      testBody(tableName, HoodieTableMetaClient.reload(metaClient))
    }
  }

  /** Paths of the latest base files under `partitionDir`, based on completed commit instants. */
  private def latestBaseFilePaths(metaClient: HoodieTableMetaClient, partitionDir: String): Array[String] = {
    val fsView = new HoodieTableFileSystemView(metaClient,
      metaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants,
      metaClient.getFs.listStatus(new Path(partitionDir)))
    fsView.getLatestBaseFiles.iterator().asScala.map(baseFile => baseFile.getPath).toArray
  }

  /** String paths of every entry directly under `dir`. */
  private def filePathsIn(metaClient: HoodieTableMetaClient, dir: String): Array[String] = {
    metaClient.getFs.listStatus(new Path(dir)).map((status: FileStatus) => status.getPath.toString)
  }

  /**
   * Writes the fixture data for the dedupe tests and records the partition/output paths and the
   * table's base file format in the class-level fields.
   */
  private def generateRecords(tablePath: String, basePath: String, metaClient: HoodieTableMetaClient): Unit = {
    duplicatedPartitionPath = Paths.get(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).toString
    duplicatedPartitionPathWithUpdates = Paths.get(tablePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).toString
    duplicatedPartitionPathWithUpserts = Paths.get(tablePath, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH).toString
    repairedOutputPath = Paths.get(basePath, "tmp").toString

    // generate 200 records
    val schema: Schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema)
    val testTable: HoodieSparkWriteableTestTable = HoodieSparkWriteableTestTable.of(metaClient, schema)

    val hoodieRecords1 = SchemaTestUtil.generateHoodieTestRecords(0, 100, schema)
    val hoodieRecords2 = SchemaTestUtil.generateHoodieTestRecords(100, 100, schema)
    testTable.addCommit("20160401010101")
      .withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "1", hoodieRecords1)
    testTable.withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "2", hoodieRecords2)
    testTable.getFileIdWithLogFile(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)

    testTable.withInserts(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "4", hoodieRecords1)
    testTable.withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "6", hoodieRecords1)

    // read records and get 10 to generate duplicates
    val dupRecords = hoodieRecords1.subList(0, 10)
    testTable.withInserts(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "5", dupRecords)
    testTable.addCommit("20160401010202")
      .withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "3", dupRecords)
    testTable.withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "7", dupRecords)
    testTable.withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "8", dupRecords)

    fileFormat = metaClient.getTableConfig.getBaseFileFormat
  }

  /**
   * Total number of rows across `files`, read with the reader matching `fileFormat`.
   *
   * @throws UnsupportedOperationException if `fileFormat` is neither PARQUET nor ORC
   */
  private def getRecordCount(files: Array[String]): Long = {
    files.map { file =>
      fileFormat match {
        case HoodieFileFormat.PARQUET => spark.sqlContext.read.parquet(file).count()
        case HoodieFileFormat.ORC => spark.sqlContext.read.orc(file).count()
        case other => throw new UnsupportedOperationException(other.name + " format not supported yet.")
      }
    }.sum
  }

  /** Creates (overwriting if present) an empty requested-clean instant file for `instantTime`. */
  @throws[IOException]
  def createEmptyCleanRequestedFile(basePath: String, instantTime: String, configuration: Configuration): Unit = {
    val commitFilePath = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeRequestedCleanerFileName(instantTime))
    val fs = FSUtils.getFs(basePath, configuration)
    val os = fs.create(commitFilePath, true)
    os.close()
  }
}