1
0

CodeStyle formatting to conform to basic Checkstyle rules.

The code-style rules follow google style with some changes:

1. Increase line length from 100 to 120
2. Disable JavaDoc related checkstyles as this needs more manual work.

Both source and test code are checked for code-style
This commit is contained in:
Balaji Varadarajan
2018-03-20 16:29:20 -07:00
committed by vinoth chandar
parent 987f5d6b96
commit 788e4f2d2e
200 changed files with 6209 additions and 5975 deletions

View File

@@ -75,23 +75,425 @@ import scala.Tuple2;
/**
* Implementation of a very heavily read-optimized Hoodie Table where
*
* <p>
* INSERTS - Produce new files, block aligned to desired size (or) Merge with the smallest existing
* file, to expand it
*
* <p>
* UPDATES - Produce a new version of the file, just replacing the updated records with new values
*/
public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends HoodieTable<T> {
private static Logger logger = LogManager.getLogger(HoodieCopyOnWriteTable.class);
public HoodieCopyOnWriteTable(HoodieWriteConfig config, HoodieTableMetaClient metaClient) {
super(config, metaClient);
}
private static Logger logger = LogManager.getLogger(HoodieCopyOnWriteTable.class);
private static PairFlatMapFunction<Iterator<Tuple2<String, String>>, String,
PartitionCleanStat> deleteFilesFunc(
HoodieTable table) {
return (PairFlatMapFunction<Iterator<Tuple2<String, String>>, String, PartitionCleanStat>)
iter -> {
Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
FileSystem fs = table.getMetaClient().getFs();
while (iter.hasNext()) {
Tuple2<String, String> partitionDelFileTuple = iter.next();
String partitionPath = partitionDelFileTuple._1();
String deletePathStr = partitionDelFileTuple._2();
Boolean deletedFileResult = deleteFileAndGetResult(fs, deletePathStr);
if (!partitionCleanStatMap.containsKey(partitionPath)) {
partitionCleanStatMap.put(partitionPath, new PartitionCleanStat(partitionPath));
}
PartitionCleanStat partitionCleanStat = partitionCleanStatMap.get(partitionPath);
partitionCleanStat.addDeleteFilePatterns(deletePathStr);
partitionCleanStat.addDeletedFileResult(deletePathStr, deletedFileResult);
}
return partitionCleanStatMap.entrySet().stream()
.map(e -> new Tuple2<>(e.getKey(), e.getValue()))
.collect(Collectors.toList()).iterator();
};
}
private static PairFlatMapFunction<String, String, String> getFilesToDeleteFunc(HoodieTable table,
HoodieWriteConfig config) {
return (PairFlatMapFunction<String, String, String>) partitionPathToClean -> {
HoodieCleanHelper cleaner = new HoodieCleanHelper(table, config);
return cleaner.getDeletePaths(partitionPathToClean).stream()
.map(deleteFile -> new Tuple2<>(partitionPathToClean, deleteFile.toString())).iterator();
};
}
private static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr)
throws IOException {
Path deletePath = new Path(deletePathStr);
logger.debug("Working on delete path :" + deletePath);
boolean deleteResult = fs.delete(deletePath, false);
if (deleteResult) {
logger.debug("Cleaned file at path :" + deletePath);
}
return deleteResult;
}
@Override
public Partitioner getUpsertPartitioner(WorkloadProfile profile) {
if (profile == null) {
throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
}
return new UpsertPartitioner(profile);
}
@Override
public Partitioner getInsertPartitioner(WorkloadProfile profile) {
return getUpsertPartitioner(profile);
}
@Override
public boolean isWorkloadProfileNeeded() {
return true;
}
@Override
public JavaRDD<WriteStatus> compact(JavaSparkContext jsc, String commitTime) {
throw new HoodieNotSupportedException("Compaction is not supported from a CopyOnWrite table");
}
public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileLoc,
Iterator<HoodieRecord<T>> recordItr) throws IOException {
// these are updates
HoodieMergeHandle upsertHandle = getUpdateHandle(commitTime, fileLoc, recordItr);
return handleUpdateInternal(upsertHandle, commitTime, fileLoc);
}
public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileLoc,
Map<String, HoodieRecord<T>> keyToNewRecords) throws IOException {
// these are updates
HoodieMergeHandle upsertHandle = getUpdateHandle(commitTime, fileLoc, keyToNewRecords);
return handleUpdateInternal(upsertHandle, commitTime, fileLoc);
}
protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle upsertHandle,
String commitTime, String fileLoc) throws IOException {
if (upsertHandle.getOldFilePath() == null) {
throw new HoodieUpsertException(
"Error in finding the old file path at commit " + commitTime + " at fileLoc: " + fileLoc);
} else {
AvroReadSupport.setAvroReadSchema(getHadoopConf(), upsertHandle.getSchema());
ParquetReader<IndexedRecord> reader = AvroParquetReader.builder(upsertHandle.getOldFilePath())
.withConf(getHadoopConf()).build();
try {
IndexedRecord record;
while ((record = reader.read()) != null) {
// Two types of writes here (new record, and old record).
// We have already catch the exception during writing new records.
// But for old records, we should fail if any exception happens.
upsertHandle.write((GenericRecord) record);
}
} catch (IOException e) {
throw new HoodieUpsertException(
"Failed to read record from " + upsertHandle.getOldFilePath() + " with new Schema "
+ upsertHandle.getSchema(), e);
} finally {
reader.close();
upsertHandle.close();
}
}
//TODO(vc): This needs to be revisited
if (upsertHandle.getWriteStatus().getPartitionPath() == null) {
logger.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", "
+ upsertHandle.getWriteStatus());
}
return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus()))
.iterator();
}
protected HoodieMergeHandle getUpdateHandle(String commitTime, String fileLoc,
Iterator<HoodieRecord<T>> recordItr) {
return new HoodieMergeHandle<>(config, commitTime, this, recordItr, fileLoc);
}
protected HoodieMergeHandle getUpdateHandle(String commitTime, String fileLoc,
Map<String, HoodieRecord<T>> keyToNewRecords) {
return new HoodieMergeHandle<>(config, commitTime, this, keyToNewRecords, fileLoc);
}
public Iterator<List<WriteStatus>> handleInsert(String commitTime,
Iterator<HoodieRecord<T>> recordItr) throws Exception {
return new LazyInsertIterable<>(recordItr, config, commitTime, this);
}
@SuppressWarnings("unchecked")
@Override
public Iterator<List<WriteStatus>> handleUpsertPartition(String commitTime, Integer partition,
Iterator recordItr, Partitioner partitioner) {
UpsertPartitioner upsertPartitioner = (UpsertPartitioner) partitioner;
BucketInfo binfo = upsertPartitioner.getBucketInfo(partition);
BucketType btype = binfo.bucketType;
try {
if (btype.equals(BucketType.INSERT)) {
return handleInsert(commitTime, recordItr);
} else if (btype.equals(BucketType.UPDATE)) {
return handleUpdate(commitTime, binfo.fileLoc, recordItr);
} else {
throw new HoodieUpsertException(
"Unknown bucketType " + btype + " for partition :" + partition);
}
} catch (Throwable t) {
String msg = "Error upserting bucketType " + btype + " for partition :" + partition;
logger.error(msg, t);
throw new HoodieUpsertException(msg, t);
}
}
@Override
public Iterator<List<WriteStatus>> handleInsertPartition(String commitTime, Integer partition,
Iterator recordItr, Partitioner partitioner) {
return handleUpsertPartition(commitTime, partition, recordItr, partitioner);
}
/**
* Performs cleaning of partition paths according to cleaning policy and returns the number of
* files cleaned. Handles skews in partitions to clean by making files to clean as the unit of
* task distribution.
*
* @throws IllegalArgumentException if unknown cleaning policy is provided
*/
@Override
public List<HoodieCleanStat> clean(JavaSparkContext jsc) {
try {
FileSystem fs = getMetaClient().getFs();
List<String> partitionsToClean = FSUtils
.getAllPartitionPaths(fs, getMetaClient().getBasePath(),
config.shouldAssumeDatePartitioning());
logger.info("Partitions to clean up : " + partitionsToClean + ", with policy " + config
.getCleanerPolicy());
if (partitionsToClean.isEmpty()) {
logger.info("Nothing to clean here mom. It is already clean");
return Collections.emptyList();
}
return cleanPartitionPaths(partitionsToClean, jsc);
} catch (IOException e) {
throw new HoodieIOException("Failed to clean up after commit", e);
}
}
/**
* Common method used for cleaning out parquet files under a partition path during rollback of a
* set of commits
*/
protected Map<FileStatus, Boolean> deleteCleanedFiles(String partitionPath, List<String> commits)
throws IOException {
logger.info("Cleaning path " + partitionPath);
FileSystem fs = getMetaClient().getFs();
FileStatus[] toBeDeleted = fs
.listStatus(new Path(config.getBasePath(), partitionPath), path -> {
if (!path.toString().contains(".parquet")) {
return false;
}
String fileCommitTime = FSUtils.getCommitTime(path.getName());
return commits.contains(fileCommitTime);
});
Map<FileStatus, Boolean> results = Maps.newHashMap();
for (FileStatus file : toBeDeleted) {
boolean success = fs.delete(file.getPath(), false);
results.put(file, success);
logger.info("Delete file " + file.getPath() + "\t" + success);
}
return results;
}
@Override
public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, List<String> commits)
throws IOException {
String actionType = this.getCommitActionType();
HoodieActiveTimeline activeTimeline = this.getActiveTimeline();
List<String> inflights = this.getInflightCommitTimeline().getInstants()
.map(HoodieInstant::getTimestamp).collect(Collectors.toList());
// Atomically unpublish all the commits
commits.stream().filter(s -> !inflights.contains(s))
.map(s -> new HoodieInstant(false, actionType, s))
.forEach(activeTimeline::revertToInflight);
logger.info("Unpublished " + commits);
// delete all the data files for all these commits
logger.info("Clean out all parquet files generated for commits: " + commits);
List<HoodieRollbackStat> stats = jsc.parallelize(FSUtils
.getAllPartitionPaths(metaClient.getFs(), getMetaClient().getBasePath(),
config.shouldAssumeDatePartitioning()))
.map((Function<String, HoodieRollbackStat>) partitionPath -> {
// Scan all partitions files with this commit time
Map<FileStatus, Boolean> results = deleteCleanedFiles(partitionPath, commits);
return HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath)
.withDeletedFileResults(results).build();
}).collect();
// clean temporary data files
cleanTemporaryDataFiles(jsc);
// Remove the rolled back inflight commits
commits.stream().map(s -> new HoodieInstant(true, actionType, s))
.forEach(activeTimeline::deleteInflight);
logger.info("Deleted inflight commits " + commits);
return stats;
}
/**
* Finalize the written data files
*
* @param writeStatuses List of WriteStatus
* @return number of files finalized
*/
@Override
@SuppressWarnings("unchecked")
public Optional<Integer> finalizeWrite(JavaSparkContext jsc, List writeStatuses) {
if (!config.shouldUseTempFolderForCopyOnWrite()) {
return Optional.empty();
}
// This is to rename each data file from temporary path to its final location
List<Tuple2<String, Boolean>> results = jsc
.parallelize(writeStatuses, config.getFinalizeWriteParallelism()).map(writeStatus -> {
Tuple2<String, HoodieWriteStat> writeStatTuple2 = (Tuple2<String, HoodieWriteStat>)
writeStatus;
HoodieWriteStat writeStat = writeStatTuple2._2();
final FileSystem fs = getMetaClient().getFs();
final Path finalPath = new Path(config.getBasePath(), writeStat.getPath());
if (writeStat.getTempPath() != null) {
final Path tempPath = new Path(config.getBasePath(), writeStat.getTempPath());
boolean success;
try {
logger.info("Renaming temporary file: " + tempPath + " to " + finalPath);
success = fs.rename(tempPath, finalPath);
} catch (IOException e) {
throw new HoodieIOException(
"Failed to rename file: " + tempPath + " to " + finalPath);
}
if (!success) {
throw new HoodieIOException(
"Failed to rename file: " + tempPath + " to " + finalPath);
}
}
return new Tuple2<>(writeStat.getPath(), true);
}).collect();
// clean temporary data files
cleanTemporaryDataFiles(jsc);
return Optional.of(results.size());
}
/**
* Clean temporary data files that are produced from previous failed commit or retried spark
* stages.
*/
private void cleanTemporaryDataFiles(JavaSparkContext jsc) {
if (!config.shouldUseTempFolderForCopyOnWrite()) {
return;
}
final FileSystem fs = getMetaClient().getFs();
final Path temporaryFolder = new Path(config.getBasePath(),
HoodieTableMetaClient.TEMPFOLDER_NAME);
try {
if (!fs.exists(temporaryFolder)) {
logger.info("Temporary folder does not exist: " + temporaryFolder);
return;
}
List<FileStatus> fileStatusesList = Arrays.asList(fs.listStatus(temporaryFolder));
List<Tuple2<String, Boolean>> results = jsc
.parallelize(fileStatusesList, config.getFinalizeWriteParallelism()).map(fileStatus -> {
FileSystem fs1 = getMetaClient().getFs();
boolean success = fs1.delete(fileStatus.getPath(), false);
logger
.info("Deleting file in temporary folder" + fileStatus.getPath() + "\t" + success);
return new Tuple2<>(fileStatus.getPath().toString(), success);
}).collect();
for (Tuple2<String, Boolean> result : results) {
if (!result._2()) {
logger.info("Failed to delete file: " + result._1());
throw new HoodieIOException("Failed to delete file in temporary folder: " + result._1());
}
}
} catch (IOException e) {
throw new HoodieIOException(
"Failed to clean data files in temporary folder: " + temporaryFolder);
}
}
private List<HoodieCleanStat> cleanPartitionPaths(List<String> partitionsToClean,
JavaSparkContext jsc) {
int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
logger.info("Using cleanerParallelism: " + cleanerParallelism);
List<Tuple2<String, PartitionCleanStat>> partitionCleanStats = jsc
.parallelize(partitionsToClean, cleanerParallelism)
.flatMapToPair(getFilesToDeleteFunc(this, config))
.repartition(cleanerParallelism) // repartition to remove skews
.mapPartitionsToPair(deleteFilesFunc(this)).reduceByKey(
// merge partition level clean stats below
(Function2<PartitionCleanStat, PartitionCleanStat, PartitionCleanStat>) (e1, e2) -> e1
.merge(e2)).collect();
Map<String, PartitionCleanStat> partitionCleanStatsMap = partitionCleanStats.stream()
.collect(Collectors.toMap(e -> e._1(), e -> e._2()));
HoodieCleanHelper cleaner = new HoodieCleanHelper(this, config);
// Return PartitionCleanStat for each partition passed.
return partitionsToClean.stream().map(partitionPath -> {
PartitionCleanStat partitionCleanStat =
(partitionCleanStatsMap.containsKey(partitionPath)) ? partitionCleanStatsMap
.get(partitionPath) : new PartitionCleanStat(partitionPath);
return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy())
.withPartitionPath(partitionPath)
.withEarliestCommitRetained(cleaner.getEarliestCommitToRetain())
.withDeletePathPattern(partitionCleanStat.deletePathPatterns)
.withSuccessfulDeletes(partitionCleanStat.successDeleteFiles)
.withFailedDeletes(partitionCleanStat.failedDeleteFiles).build();
}).collect(Collectors.toList());
}
enum BucketType {
UPDATE,
INSERT
UPDATE, INSERT
}
private static class PartitionCleanStat implements Serializable {
private final String partitionPath;
private final List<String> deletePathPatterns = new ArrayList<>();
private final List<String> successDeleteFiles = new ArrayList<>();
private final List<String> failedDeleteFiles = new ArrayList<>();
private PartitionCleanStat(String partitionPath) {
this.partitionPath = partitionPath;
}
private void addDeletedFileResult(String deletePathStr, Boolean deletedFileResult) {
if (deletedFileResult) {
successDeleteFiles.add(deletePathStr);
} else {
failedDeleteFiles.add(deletePathStr);
}
}
private void addDeleteFilePatterns(String deletePathStr) {
deletePathPatterns.add(deletePathStr);
}
private PartitionCleanStat merge(PartitionCleanStat other) {
if (!this.partitionPath.equals(other.partitionPath)) {
throw new RuntimeException(String
.format("partitionPath is not a match: (%s, %s)", partitionPath, other.partitionPath));
}
successDeleteFiles.addAll(other.successDeleteFiles);
deletePathPatterns.addAll(other.deletePathPatterns);
failedDeleteFiles.addAll(other.failedDeleteFiles);
return this;
}
}
/**
@@ -150,45 +552,37 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
}
}
/**
* Packs incoming records to be upserted, into buckets (1 bucket = 1 RDD partition)
*/
class UpsertPartitioner extends Partitioner {
/**
* List of all small files to be corrected
*/
List<SmallFile> smallFiles = new ArrayList<SmallFile>();
/**
* Total number of RDD partitions, is determined by total buckets we want to pack the incoming
* workload into
*/
private int totalBuckets = 0;
/**
* Stat for the current workload. Helps in determining total inserts, upserts etc.
*/
private WorkloadStat globalStat;
/**
* Helps decide which bucket an incoming update should go to.
*/
private HashMap<String, Integer> updateLocationToBucket;
/**
* Helps us pack inserts into 1 or more buckets depending on number of incoming records.
*/
private HashMap<String, List<InsertBucket>> partitionPathToInsertBuckets;
/**
* Remembers what type each bucket is for later.
*/
private HashMap<Integer, BucketInfo> bucketInfoMap;
/**
* List of all small files to be corrected
*/
List<SmallFile> smallFiles = new ArrayList<SmallFile>();
UpsertPartitioner(WorkloadProfile profile) {
updateLocationToBucket = new HashMap<>();
partitionPathToInsertBuckets = new HashMap<>();
@@ -198,16 +592,17 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
assignUpdates(profile);
assignInserts(profile);
logger.info("Total Buckets :" + totalBuckets + ", " +
"buckets info => " + bucketInfoMap + ", \n" +
"Partition to insert buckets => " + partitionPathToInsertBuckets + ", \n" +
"UpdateLocations mapped to buckets =>" + updateLocationToBucket);
logger.info(
"Total Buckets :" + totalBuckets + ", " + "buckets info => " + bucketInfoMap + ", \n"
+ "Partition to insert buckets => " + partitionPathToInsertBuckets + ", \n"
+ "UpdateLocations mapped to buckets =>" + updateLocationToBucket);
}
private void assignUpdates(WorkloadProfile profile) {
// each update location gets a partition
WorkloadStat gStat = profile.getGlobalStat();
for (Map.Entry<String, Pair<String, Long>> updateLocEntry : gStat.getUpdateLocationToCount().entrySet()) {
for (Map.Entry<String, Pair<String, Long>> updateLocEntry : gStat.getUpdateLocationToCount()
.entrySet()) {
addUpdateBucket(updateLocEntry.getKey());
}
}
@@ -270,10 +665,10 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
}
int insertBuckets = (int) Math.max(totalUnassignedInserts / insertRecordsPerBucket, 1L);
logger
.info("After small file assignment: unassignedInserts => " + totalUnassignedInserts
+ ", totalInsertBuckets => " + insertBuckets
+ ", recordsPerBucket => " + insertRecordsPerBucket);
logger.info(
"After small file assignment: unassignedInserts => " + totalUnassignedInserts
+ ", totalInsertBuckets => " + insertBuckets + ", recordsPerBucket => "
+ insertRecordsPerBucket);
for (int b = 0; b < insertBuckets; b++) {
bucketNumbers.add(totalBuckets);
recordsPerBucket.add(totalUnassignedInserts / insertBuckets);
@@ -339,8 +734,8 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
*/
private long averageBytesPerRecord() {
long avgSize = 0L;
HoodieTimeline commitTimeline =
metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
HoodieTimeline commitTimeline = metaClient.getActiveTimeline().getCommitTimeline()
.filterCompletedInstants();
try {
if (!commitTimeline.empty()) {
HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
@@ -372,7 +767,8 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
@Override
public int getPartition(Object key) {
Tuple2<HoodieKey, Option<HoodieRecordLocation>> keyLocation = (Tuple2<HoodieKey, Option<HoodieRecordLocation>>) key;
Tuple2<HoodieKey, Option<HoodieRecordLocation>> keyLocation = (Tuple2<HoodieKey,
Option<HoodieRecordLocation>>) key;
if (keyLocation._2().isDefined()) {
HoodieRecordLocation location = keyLocation._2().get();
return updateLocationToBucket.get(location.getFileId());
@@ -396,420 +792,4 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
}
}
}
@Override
public Partitioner getUpsertPartitioner(WorkloadProfile profile) {
if (profile == null) {
throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
}
return new UpsertPartitioner(profile);
}
@Override
public Partitioner getInsertPartitioner(WorkloadProfile profile) {
return getUpsertPartitioner(profile);
}
@Override
public boolean isWorkloadProfileNeeded() {
return true;
}
@Override
public JavaRDD<WriteStatus> compact(JavaSparkContext jsc, String commitTime) {
throw new HoodieNotSupportedException("Compaction is not supported from a CopyOnWrite table");
}
public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileLoc,
Iterator<HoodieRecord<T>> recordItr)
throws IOException {
// these are updates
HoodieMergeHandle upsertHandle = getUpdateHandle(commitTime, fileLoc, recordItr);
return handleUpdateInternal(upsertHandle, commitTime, fileLoc);
}
public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileLoc,
Map<String, HoodieRecord<T>> keyToNewRecords)
throws IOException {
// these are updates
HoodieMergeHandle upsertHandle = getUpdateHandle(commitTime, fileLoc, keyToNewRecords);
return handleUpdateInternal(upsertHandle, commitTime, fileLoc);
}
protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle upsertHandle, String commitTime, String fileLoc)
throws IOException {
if (upsertHandle.getOldFilePath() == null) {
throw new HoodieUpsertException("Error in finding the old file path at commit " +
commitTime + " at fileLoc: " + fileLoc);
} else {
AvroReadSupport.setAvroReadSchema(getHadoopConf(), upsertHandle.getSchema());
ParquetReader<IndexedRecord> reader =
AvroParquetReader.builder(upsertHandle.getOldFilePath()).withConf(getHadoopConf())
.build();
try {
IndexedRecord record;
while ((record = reader.read()) != null) {
// Two types of writes here (new record, and old record).
// We have already catch the exception during writing new records.
// But for old records, we should fail if any exception happens.
upsertHandle.write((GenericRecord) record);
}
} catch (IOException e) {
throw new HoodieUpsertException(
"Failed to read record from " + upsertHandle.getOldFilePath()
+ " with new Schema " + upsertHandle.getSchema(), e);
} finally {
reader.close();
upsertHandle.close();
}
}
//TODO(vc): This needs to be revisited
if (upsertHandle.getWriteStatus().getPartitionPath() == null) {
logger.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath()
+ ", " + upsertHandle.getWriteStatus());
}
return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus()))
.iterator();
}
protected HoodieMergeHandle getUpdateHandle(String commitTime, String fileLoc,
Iterator<HoodieRecord<T>> recordItr) {
return new HoodieMergeHandle<>(config, commitTime, this, recordItr, fileLoc);
}
protected HoodieMergeHandle getUpdateHandle(String commitTime, String fileLoc,
Map<String, HoodieRecord<T>> keyToNewRecords) {
return new HoodieMergeHandle<>(config, commitTime, this, keyToNewRecords, fileLoc);
}
public Iterator<List<WriteStatus>> handleInsert(String commitTime,
Iterator<HoodieRecord<T>> recordItr) throws Exception {
return new LazyInsertIterable<>(recordItr, config, commitTime, this);
}
@SuppressWarnings("unchecked")
@Override
public Iterator<List<WriteStatus>> handleUpsertPartition(String commitTime, Integer partition,
Iterator recordItr, Partitioner partitioner) {
UpsertPartitioner upsertPartitioner = (UpsertPartitioner) partitioner;
BucketInfo binfo = upsertPartitioner.getBucketInfo(partition);
BucketType btype = binfo.bucketType;
try {
if (btype.equals(BucketType.INSERT)) {
return handleInsert(commitTime, recordItr);
} else if (btype.equals(BucketType.UPDATE)) {
return handleUpdate(commitTime, binfo.fileLoc, recordItr);
} else {
throw new HoodieUpsertException(
"Unknown bucketType " + btype + " for partition :" + partition);
}
} catch (Throwable t) {
String msg = "Error upserting bucketType " + btype + " for partition :" + partition;
logger.error(msg, t);
throw new HoodieUpsertException(msg, t);
}
}
@Override
public Iterator<List<WriteStatus>> handleInsertPartition(String commitTime, Integer partition,
Iterator recordItr,
Partitioner partitioner) {
return handleUpsertPartition(commitTime, partition, recordItr, partitioner);
}
/**
* Performs cleaning of partition paths according to cleaning policy and returns the number of
* files cleaned. Handles skews in partitions to clean by making files to clean as the unit of
* task distribution.
*
* @throws IllegalArgumentException if unknown cleaning policy is provided
*/
@Override
public List<HoodieCleanStat> clean(JavaSparkContext jsc) {
try {
FileSystem fs = getMetaClient().getFs();
List<String> partitionsToClean =
FSUtils.getAllPartitionPaths(fs, getMetaClient().getBasePath(),
config.shouldAssumeDatePartitioning());
logger.info("Partitions to clean up : " + partitionsToClean + ", with policy " + config
.getCleanerPolicy());
if (partitionsToClean.isEmpty()) {
logger.info("Nothing to clean here mom. It is already clean");
return Collections.emptyList();
}
return cleanPartitionPaths(partitionsToClean, jsc);
} catch (IOException e) {
throw new HoodieIOException("Failed to clean up after commit", e);
}
}
/**
* Common method used for cleaning out parquet files under a partition path during rollback of a
* set of commits
*/
protected Map<FileStatus, Boolean> deleteCleanedFiles(String partitionPath, List<String> commits)
throws IOException {
logger.info("Cleaning path " + partitionPath);
FileSystem fs = getMetaClient().getFs();
FileStatus[] toBeDeleted =
fs.listStatus(new Path(config.getBasePath(), partitionPath), path -> {
if (!path.toString().contains(".parquet")) {
return false;
}
String fileCommitTime = FSUtils.getCommitTime(path.getName());
return commits.contains(fileCommitTime);
});
Map<FileStatus, Boolean> results = Maps.newHashMap();
for (FileStatus file : toBeDeleted) {
boolean success = fs.delete(file.getPath(), false);
results.put(file, success);
logger.info("Delete file " + file.getPath() + "\t" + success);
}
return results;
}
@Override
public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, List<String> commits)
throws IOException {
String actionType = this.getCommitActionType();
HoodieActiveTimeline activeTimeline = this.getActiveTimeline();
List<String> inflights = this.getInflightCommitTimeline().getInstants()
.map(HoodieInstant::getTimestamp)
.collect(Collectors.toList());
// Atomically unpublish all the commits
commits.stream().filter(s -> !inflights.contains(s))
.map(s -> new HoodieInstant(false, actionType, s))
.forEach(activeTimeline::revertToInflight);
logger.info("Unpublished " + commits);
// delete all the data files for all these commits
logger.info("Clean out all parquet files generated for commits: " + commits);
List<HoodieRollbackStat> stats = jsc.parallelize(
FSUtils.getAllPartitionPaths(metaClient.getFs(), getMetaClient().getBasePath(),
config.shouldAssumeDatePartitioning()))
.map((Function<String, HoodieRollbackStat>) partitionPath -> {
// Scan all partitions files with this commit time
Map<FileStatus, Boolean> results = deleteCleanedFiles(partitionPath, commits);
return HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath)
.withDeletedFileResults(results).build();
}).collect();
// clean temporary data files
cleanTemporaryDataFiles(jsc);
// Remove the rolled back inflight commits
commits.stream().map(s -> new HoodieInstant(true, actionType, s))
.forEach(activeTimeline::deleteInflight);
logger.info("Deleted inflight commits " + commits);
return stats;
}
/**
* Finalize the written data files
*
* @param writeStatuses List of WriteStatus
* @return number of files finalized
*/
@Override
@SuppressWarnings("unchecked")
public Optional<Integer> finalizeWrite(JavaSparkContext jsc, List writeStatuses) {
if (!config.shouldUseTempFolderForCopyOnWrite()) {
return Optional.empty();
}
// This is to rename each data file from temporary path to its final location
List<Tuple2<String, Boolean>> results = jsc.parallelize(writeStatuses, config.getFinalizeWriteParallelism())
.map(writeStatus -> {
Tuple2<String, HoodieWriteStat> writeStatTuple2 = (Tuple2<String, HoodieWriteStat>) writeStatus;
HoodieWriteStat writeStat = writeStatTuple2._2();
final FileSystem fs = getMetaClient().getFs();
final Path finalPath = new Path(config.getBasePath(), writeStat.getPath());
if (writeStat.getTempPath() != null) {
final Path tempPath = new Path(config.getBasePath(), writeStat.getTempPath());
boolean success;
try {
logger.info("Renaming temporary file: " + tempPath + " to " + finalPath);
success = fs.rename(tempPath, finalPath);
} catch (IOException e) {
throw new HoodieIOException("Failed to rename file: " + tempPath + " to " + finalPath);
}
if (!success) {
throw new HoodieIOException("Failed to rename file: " + tempPath + " to " + finalPath);
}
}
return new Tuple2<>(writeStat.getPath(), true);
}).collect();
// clean temporary data files
cleanTemporaryDataFiles(jsc);
return Optional.of(results.size());
}
/**
* Clean temporary data files that are produced from previous failed commit or retried spark
* stages.
*/
private void cleanTemporaryDataFiles(JavaSparkContext jsc) {
if (!config.shouldUseTempFolderForCopyOnWrite()) {
return;
}
final FileSystem fs = getMetaClient().getFs();
final Path temporaryFolder = new Path(config.getBasePath(),
HoodieTableMetaClient.TEMPFOLDER_NAME);
try {
if (!fs.exists(temporaryFolder)) {
logger.info("Temporary folder does not exist: " + temporaryFolder);
return;
}
List<FileStatus> fileStatusesList = Arrays.asList(fs.listStatus(temporaryFolder));
List<Tuple2<String, Boolean>> results = jsc
.parallelize(fileStatusesList, config.getFinalizeWriteParallelism())
.map(fileStatus -> {
FileSystem fs1 = getMetaClient().getFs();
boolean success = fs1.delete(fileStatus.getPath(), false);
logger.info("Deleting file in temporary folder" + fileStatus.getPath() + "\t"
+ success);
return new Tuple2<>(fileStatus.getPath().toString(), success);
}).collect();
for (Tuple2<String, Boolean> result : results) {
if (!result._2()) {
logger.info("Failed to delete file: " + result._1());
throw new HoodieIOException(
"Failed to delete file in temporary folder: " + result._1());
}
}
} catch (IOException e) {
throw new HoodieIOException(
"Failed to clean data files in temporary folder: " + temporaryFolder);
}
}
private static class PartitionCleanStat implements Serializable {
private final String partitionPath;
private final List<String> deletePathPatterns = new ArrayList<>();
private final List<String> successDeleteFiles = new ArrayList<>();
private final List<String> failedDeleteFiles = new ArrayList<>();
private PartitionCleanStat(String partitionPath) {
this.partitionPath = partitionPath;
}
private void addDeletedFileResult(String deletePathStr, Boolean deletedFileResult) {
if (deletedFileResult) {
successDeleteFiles.add(deletePathStr);
} else {
failedDeleteFiles.add(deletePathStr);
}
}
private void addDeleteFilePatterns(String deletePathStr) {
deletePathPatterns.add(deletePathStr);
}
private PartitionCleanStat merge(PartitionCleanStat other) {
if (!this.partitionPath.equals(other.partitionPath)) {
throw new RuntimeException(String.format(
"partitionPath is not a match: (%s, %s)",
partitionPath, other.partitionPath));
}
successDeleteFiles.addAll(other.successDeleteFiles);
deletePathPatterns.addAll(other.deletePathPatterns);
failedDeleteFiles.addAll(other.failedDeleteFiles);
return this;
}
}
private List<HoodieCleanStat> cleanPartitionPaths(List<String> partitionsToClean,
JavaSparkContext jsc) {
int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
logger.info("Using cleanerParallelism: " + cleanerParallelism);
List<Tuple2<String, PartitionCleanStat>> partitionCleanStats = jsc
.parallelize(partitionsToClean, cleanerParallelism)
.flatMapToPair(getFilesToDeleteFunc(this, config))
.repartition(cleanerParallelism) // repartition to remove skews
.mapPartitionsToPair(deleteFilesFunc(this))
.reduceByKey(
// merge partition level clean stats below
(Function2<PartitionCleanStat, PartitionCleanStat, PartitionCleanStat>) (e1, e2) -> e1
.merge(e2))
.collect();
Map<String, PartitionCleanStat> partitionCleanStatsMap = partitionCleanStats
.stream().collect(Collectors.toMap(e -> e._1(), e -> e._2()));
HoodieCleanHelper cleaner = new HoodieCleanHelper(this, config);
// Return PartitionCleanStat for each partition passed.
return partitionsToClean.stream().map(partitionPath -> {
PartitionCleanStat partitionCleanStat =
(partitionCleanStatsMap.containsKey(partitionPath)) ?
partitionCleanStatsMap.get(partitionPath)
: new PartitionCleanStat(partitionPath);
return HoodieCleanStat.newBuilder()
.withPolicy(config.getCleanerPolicy())
.withPartitionPath(partitionPath)
.withEarliestCommitRetained(cleaner.getEarliestCommitToRetain())
.withDeletePathPattern(partitionCleanStat.deletePathPatterns)
.withSuccessfulDeletes(partitionCleanStat.successDeleteFiles)
.withFailedDeletes(partitionCleanStat.failedDeleteFiles)
.build();
}).collect(Collectors.toList());
}
private static PairFlatMapFunction<Iterator<Tuple2<String, String>>, String, PartitionCleanStat> deleteFilesFunc(
HoodieTable table) {
return (PairFlatMapFunction<Iterator<Tuple2<String, String>>, String, PartitionCleanStat>) iter -> {
Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
FileSystem fs = table.getMetaClient().getFs();
while (iter.hasNext()) {
Tuple2<String, String> partitionDelFileTuple = iter.next();
String partitionPath = partitionDelFileTuple._1();
String deletePathStr = partitionDelFileTuple._2();
Boolean deletedFileResult = deleteFileAndGetResult(fs, deletePathStr);
if (!partitionCleanStatMap.containsKey(partitionPath)) {
partitionCleanStatMap.put(partitionPath,
new PartitionCleanStat(partitionPath));
}
PartitionCleanStat partitionCleanStat = partitionCleanStatMap.get(partitionPath);
partitionCleanStat.addDeleteFilePatterns(deletePathStr);
partitionCleanStat.addDeletedFileResult(deletePathStr, deletedFileResult);
}
return partitionCleanStatMap.entrySet().stream()
.map(e -> new Tuple2<>(e.getKey(), e.getValue()))
.collect(Collectors.toList()).iterator();
};
}
private static PairFlatMapFunction<String, String, String> getFilesToDeleteFunc(
HoodieTable table, HoodieWriteConfig config) {
return (PairFlatMapFunction<String, String, String>) partitionPathToClean -> {
HoodieCleanHelper cleaner = new HoodieCleanHelper(table, config);
return cleaner.getDeletePaths(partitionPathToClean).stream()
.map(deleteFile -> new Tuple2<>(partitionPathToClean, deleteFile.toString()))
.iterator();
};
}
private static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr)
throws IOException {
Path deletePath = new Path(deletePathStr);
logger.debug("Working on delete path :" + deletePath);
boolean deleteResult = fs.delete(deletePath, false);
if (deleteResult) {
logger.debug("Cleaned file at path :" + deletePath);
}
return deleteResult;
}
}