
Import from Hoodie private repo: Part 1

Prasanna Rajaperumal
2016-12-16 14:03:59 -08:00
commit 0512da094b
56 changed files with 8868 additions and 0 deletions


@@ -0,0 +1,179 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.cli
import com.uber.hoodie.common.model.{HoodieRecord, HoodieTableMetadata}
import com.uber.hoodie.common.util.FSUtils
import com.uber.hoodie.exception.HoodieException
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.log4j.Logger
import org.apache.spark.sql.{DataFrame, SQLContext}
import scala.collection.JavaConversions._
import scala.collection.mutable._
/**
* Spark job to de-duplicate data present in a partition path
* (a brief spark-shell usage sketch follows the class definition)
*/
class DedupeSparkJob (basePath: String,
duplicatedPartitionPath: String,
repairOutputPath: String,
sqlContext: SQLContext,
fs: FileSystem) {
val sparkHelper = new SparkHelper(sqlContext, fs)
val LOG = Logger.getLogger(this.getClass)
/**
* Build a DataFrame of duplicated record keys and how often each occurs in the table
*
* @param tblName temp table registered over the partition's latest parquet files
* @return DataFrame with columns (dupe_key, dupe_cnt), one row per key occurring more than once
*/
def getDupeKeyDF(tblName: String) : DataFrame = {
val dupeSql = s"""
select `${HoodieRecord.RECORD_KEY_METADATA_FIELD}` as dupe_key,
count(*) as dupe_cnt
from ${tblName}
group by `${HoodieRecord.RECORD_KEY_METADATA_FIELD}`
having dupe_cnt > 1
"""
return sqlContext.sql(dupeSql)
}
/**
* Check a given partition for duplicates and suggest the deletions that need to be done in each file,
* in order to set things right.
*
* @return map from file id to the set of record keys to delete from that file
*/
private def planDuplicateFix() : HashMap[String, HashSet[String]] = {
val tmpTableName = s"htbl_${System.currentTimeMillis()}"
val dedupeTblName = s"${tmpTableName}_dupeKeys"
val metadata = new HoodieTableMetadata(fs, basePath)
val allFiles = fs.listStatus(new org.apache.hadoop.fs.Path(s"${basePath}/${duplicatedPartitionPath}"))
val filteredStatuses = metadata.getLatestVersions(allFiles).map(f => f.getPath.toString);
LOG.info(s" List of files under partition: ${} => ${filteredStatuses.mkString(" ")}")
val df = sqlContext.parquetFile(filteredStatuses:_*)
df.registerTempTable(tmpTableName)
val dupeKeyDF = getDupeKeyDF(tmpTableName)
dupeKeyDF.registerTempTable(dedupeTblName)
// Obtain necessary satellite information for duplicate rows
val dupeDataSql = s"""
SELECT `_hoodie_record_key`, `_hoodie_partition_path`, `_hoodie_file_name`, `_hoodie_commit_time`
FROM ${tmpTableName} h
JOIN ${dedupeTblName} d
ON h.`_hoodie_record_key` = d.dupe_key
"""
val dupeMap = sqlContext.sql(dupeDataSql).collectAsList().groupBy(r => r.getString(0))
val fileToDeleteKeyMap = new HashMap[String, HashSet[String]]()
// For each duplicated key, mark it for deletion in every file except the one written by the latest commit
dupeMap.foreach(rt => {
val key = rt._1
val rows = rt._2
var maxCommit = -1L
rows.foreach(r => {
val c = r(3).asInstanceOf[String].toLong
if (c > maxCommit)
maxCommit = c
})
rows.foreach(r => {
val c = r(3).asInstanceOf[String].toLong
if (c != maxCommit){
val f = r(2).asInstanceOf[String].split("_")(0)
if (!fileToDeleteKeyMap.contains(f)){
fileToDeleteKeyMap(f) = HashSet[String]()
}
fileToDeleteKeyMap(f).add(key)
}
})
})
return fileToDeleteKeyMap
}
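/**
* Fix the duplicates in the partition: copy the latest files into the repair path, rewrite the
* affected files with the duplicate keys dropped, verify that no duplicates or missing keys
* remain, and finally (unless dryRun is set) copy the fixed files back into the partition.
*/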
def fixDuplicates(dryRun: Boolean = true) = {
val metadata = new HoodieTableMetadata(fs, basePath)
val allFiles = fs.listStatus(new Path(s"${basePath}/${duplicatedPartitionPath}"))
val fileNameToPathMap = metadata.getLatestVersions(allFiles).map(f => (FSUtils.getFileId(f.getPath.getName), f.getPath)).toMap;
val dupeFixPlan = planDuplicateFix()
// 1. Copy all latest files into the temp fix path
fileNameToPathMap.foreach{ case(fileName, filePath) => {
val badSuffix = if (dupeFixPlan.contains(fileName)) ".bad" else ""
val dstPath = new Path(s"${repairOutputPath}/${filePath.getName}${badSuffix}")
LOG.info(s"Copying from ${filePath} to ${dstPath}")
FileUtil.copy(fs, filePath, fs, dstPath, false, true, fs.getConf)
}}
// 2. Remove duplicates from the bad files
dupeFixPlan.foreach{case(fileName, keysToSkip) => {
val commitTime = FSUtils.getCommitTime(fileNameToPathMap(fileName).getName)
val badFilePath = new Path(s"${repairOutputPath}/${fileNameToPathMap(fileName).getName}.bad")
val newFilePath = new Path(s"${repairOutputPath}/${fileNameToPathMap(fileName).getName}")
LOG.info(" Skipping and writing new file for : " + fileName)
SparkHelpers.skipKeysAndWriteNewFile(commitTime, fs, badFilePath, newFilePath, dupeFixPlan(fileName))
fs.delete(badFilePath, false)
}}
// 3. Check that there are no duplicates anymore.
val df = sqlContext.read.parquet(s"${repairOutputPath}/*.parquet")
df.registerTempTable("fixedTbl")
val dupeKeyDF = getDupeKeyDF("fixedTbl")
val dupeCnt = dupeKeyDF.count();
if (dupeCnt != 0) {
dupeKeyDF.show()
throw new HoodieException("Still found some duplicates!!.. Inspect output")
}
// 4. Additionally ensure no record keys are left behind.
val sourceDF = sparkHelper.getDistinctKeyDF(fileNameToPathMap.map(t => t._2.toString).toList)
val fixedDF = sparkHelper.getDistinctKeyDF(fileNameToPathMap.map(t => s"${repairOutputPath}/${t._2.getName}").toList)
val missedRecordKeysDF = sourceDF.except(fixedDF)
val missedCnt = missedRecordKeysDF.count()
if (missedCnt != 0) {
missedRecordKeysDF.show()
throw new HoodieException("Some records in source are not found in fixed files. Inspect output!!")
}
println("No duplicates found & counts are in check!!!! ")
// 5. Prepare to copy the fixed files back.
fileNameToPathMap.foreach { case (fileName, filePath) => {
val srcPath = new Path(s"${repairOutputPath}/${filePath.getName}")
val dstPath = new Path(s"${basePath}/${duplicatedPartitionPath}/${filePath.getName}")
if (dryRun) {
LOG.info(s"[JUST KIDDING!!!] Copying from ${srcPath} to ${dstPath}")
} else {
// for real
LOG.info(s"[FOR REAL!!!] Copying from ${srcPath} to ${dstPath}")
FileUtil.copy(fs, srcPath, fs, dstPath, false, true, fs.getConf)
}
}}
}
}
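
A minimal spark-shell sketch of how this job could be driven; the base path, partition path and repair output location below are hypothetical examples, and an active SparkContext `sc` with the hoodie-cli classes on the classpath is assumed.

import com.uber.hoodie.cli.DedupeSparkJob
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.SQLContext

val fs = FileSystem.get(sc.hadoopConfiguration)
val sqlContext = new SQLContext(sc)
// all three paths below are made-up examples
val job = new DedupeSparkJob(
  "hdfs://namenode/data/hoodie/trips",  // basePath
  "2016/12/15",                         // duplicatedPartitionPath
  "/tmp/hoodie_dedupe_repair",          // repairOutputPath
  sqlContext,
  fs)
job.fixDuplicates(dryRun = true)   // plan, rewrite and verify only; no copy-back
job.fixDuplicates(dryRun = false)  // copy the fixed files back into the partition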


@@ -0,0 +1,141 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.cli
import com.uber.hoodie.avro.HoodieAvroWriteSupport
import com.uber.hoodie.common.BloomFilter
import com.uber.hoodie.common.model.HoodieRecord
import com.uber.hoodie.common.util.ParquetUtils
import com.uber.hoodie.config.{HoodieIndexConfig, HoodieStorageConfig}
import com.uber.hoodie.io.storage.{HoodieParquetConfig, HoodieParquetWriter}
import com.uber.hoodie.stream.GenericHoodiePayload
import org.apache.avro.Schema
import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.avro.AvroSchemaConverter
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.sql.{DataFrame, SQLContext}
import scala.collection.JavaConversions._
import scala.collection.mutable._
object SparkHelpers {
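/**
* Rewrite sourceFile into destinationFile, dropping every record whose key is in keysToSkip;
* the surviving records are written with a fresh bloom filter under the given commitTime.
*/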
@throws[Exception]
def skipKeysAndWriteNewFile(commitTime: String, fs: FileSystem, sourceFile: Path, destinationFile: Path, keysToSkip: Set[String]) {
val sourceRecords = ParquetUtils.readAvroRecords(sourceFile)
val schema: Schema = sourceRecords.get(0).getSchema
val filter: BloomFilter = new BloomFilter(HoodieIndexConfig.DEFAULT_BLOOM_FILTER_NUM_ENTRIES.toInt, HoodieIndexConfig.DEFAULT_BLOOM_FILTER_FPP.toDouble)
val writeSupport: HoodieAvroWriteSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter)
val parquetConfig: HoodieParquetConfig = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.DEFAULT_PARQUET_BLOCK_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_PAGE_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_FILE_MAX_BYTES.toInt, fs.getConf)
val writer = new HoodieParquetWriter[GenericHoodiePayload, IndexedRecord](commitTime, destinationFile, parquetConfig, schema)
for (rec <- sourceRecords) {
val key: String = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
if (!keysToSkip.contains(key)) {
writer.writeAvro(key, rec)
}
}
writer.close
}
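/**
* Fetch the serialized bloom filter stored in the parquet file's footer metadata.
*/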
def getBloomFilter(file: String, conf: Configuration): String = {
val footer = ParquetFileReader.readFooter(conf, new Path(file));
return footer.getFileMetaData().getKeyValueMetaData().get(HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY)
}
}
/**
* Spark shell / Scala helpers useful for debugging (a brief usage sketch follows the class)
*/
class SparkHelper(sqlContext: SQLContext, fs: FileSystem) {
/**
* Print keys from a file
*
* @param file
*/
def printKeysFromFile(file: String) = {
getRowKeyDF(file).collect().foreach(println(_))
}
/**
* Project just the Hoodie record key column out of a parquet file
*
* @param file parquet file path
* @return single-column DataFrame of record keys
*/
def getRowKeyDF(file: String): DataFrame = {
sqlContext.read.parquet(file).select(s"`${HoodieRecord.RECORD_KEY_METADATA_FIELD}`")
}
/**
* Does the rowKey actually exist in the file.
*
* @param rowKey
* @param file
* @return
*/
def isFileContainsKey(rowKey: String, file: String): Boolean = {
println(s"Checking ${file} for key ${rowKey}")
val ff = getRowKeyDF(file).filter(s"`${HoodieRecord.RECORD_KEY_METADATA_FIELD}` = '${rowKey}'")
ff.count() > 0
}
/**
* Number of keys in a given file
*
* @param file
* @param sqlContext
* @return number of record keys in the file
*/
def getKeyCount(file: String, sqlContext: org.apache.spark.sql.SQLContext): Long = {
getRowKeyDF(file).count()
}
/**
* Checks that all the keys in the file have been added to the bloom filter
* in the footer
*
* @param conf
* @param sqlContext
* @param file
* @return true if every record key in the file is found in the footer's bloom filter
*/
def fileKeysAgainstBF(conf: Configuration, sqlContext: SQLContext, file: String) : Boolean = {
val bfStr = SparkHelpers.getBloomFilter(file, conf)
val bf = new com.uber.hoodie.common.BloomFilter(bfStr)
val foundCount = sqlContext.parquetFile(file)
.select(s"`${HoodieRecord.RECORD_KEY_METADATA_FIELD}`")
.collect()
.count(r => bf.mightContain(r.getString(0)))
val totalCount = getKeyCount(file, sqlContext)
println(s"totalCount: ${totalCount}, foundCount: ${foundCount}")
totalCount == foundCount
}
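/**
* Distinct Hoodie record keys across the given set of parquet paths.
*/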
def getDistinctKeyDF(paths: List[String]) : DataFrame = {
sqlContext.read.parquet(paths:_*).select(s"`${HoodieRecord.RECORD_KEY_METADATA_FIELD}`").distinct()
}
}
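
Similarly, a minimal spark-shell sketch for the debugging helpers above; the record key and parquet file path are hypothetical examples, and `sc` is assumed to be an active SparkContext with the hoodie-cli classes on the classpath.

import com.uber.hoodie.cli.SparkHelper
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.SQLContext

val fs = FileSystem.get(sc.hadoopConfiguration)
val sqlContext = new SQLContext(sc)
val helper = new SparkHelper(sqlContext, fs)
val file = "hdfs://namenode/data/hoodie/trips/2016/12/15/some-file.parquet"  // made-up path
helper.printKeysFromFile(file)                          // dump all record keys
helper.isFileContainsKey("some-record-key", file)       // made-up key
helper.fileKeysAgainstBF(fs.getConf, sqlContext, file)  // verify keys against the footer bloom filter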