1
0

1. Small file size handling for inserts into log files. In summary, the total size of the log file is compared with the parquet max file size and if there is scope to add inserts the add it.

This commit is contained in:
Nishith Agarwal
2018-06-11 20:27:56 -07:00
committed by vinoth chandar
parent 324de298bc
commit 459e523d9e
21 changed files with 824 additions and 123 deletions

View File

@@ -32,6 +32,8 @@ import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.model.HoodieRollingStat;
import com.uber.hoodie.common.model.HoodieRollingStatMetadata;
import com.uber.hoodie.common.model.HoodieWriteStat;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
@@ -501,9 +503,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
writeStatus.getPartitionPath(), writeStatus.getStat())).collect();
HoodieCommitMetadata metadata = new HoodieCommitMetadata();
for (Tuple2<String, HoodieWriteStat> stat : stats) {
metadata.addWriteStat(stat._1(), stat._2());
}
updateMetadataAndRollingStats(actionType, metadata, stats);
// Finalize write
final Timer.Context finalizeCtx = metrics.getFinalizeCtx();
@@ -1256,4 +1257,42 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
});
return compactionInstantTimeOpt;
}
private void updateMetadataAndRollingStats(String actionType, HoodieCommitMetadata metadata, List<Tuple2<String,
HoodieWriteStat>> stats) {
// TODO : make sure we cannot rollback / archive last commit file
try {
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
// 0. All of the rolling stat management is only done by the DELTA commit for MOR and COMMIT for COW other wise
// there may be race conditions
HoodieRollingStatMetadata rollingStatMetadata = new HoodieRollingStatMetadata(actionType);
// 1. Look up the previous compaction/commit and get the HoodieCommitMetadata from there.
// 2. Now, first read the existing rolling stats and merge with the result of current metadata.
// Need to do this on every commit (delta or commit) to support COW and MOR.
for (Tuple2<String, HoodieWriteStat> stat : stats) {
metadata.addWriteStat(stat._1(), stat._2());
HoodieRollingStat hoodieRollingStat = new HoodieRollingStat(stat._2().getFileId(),
stat._2().getNumWrites() - (stat._2().getNumUpdateWrites() - stat._2.getNumDeletes()),
stat._2().getNumUpdateWrites(), stat._2.getNumDeletes(), stat._2().getTotalWriteBytes());
rollingStatMetadata.addRollingStat(stat._1, hoodieRollingStat);
}
// The last rolling stat should be present in the completed timeline
Optional<HoodieInstant> lastInstant = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
.lastInstant();
if (lastInstant.isPresent()) {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(table.getActiveTimeline().getInstantDetails(lastInstant
.get()).get(), HoodieCommitMetadata.class);
rollingStatMetadata = rollingStatMetadata
.merge(HoodieCommitMetadata.fromBytes(commitMetadata.getExtraMetadata()
.get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(), HoodieRollingStatMetadata.class));
}
metadata.addMetadata(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY, rollingStatMetadata.toJsonString());
} catch (IOException io) {
throw new HoodieCommitException("Unable to save rolling stats");
}
}
}

View File

@@ -45,7 +45,11 @@ public class InMemoryHashIndex<T extends HoodieRecordPayload> extends HoodieInde
public InMemoryHashIndex(HoodieWriteConfig config) {
super(config);
recordLocationMap = new ConcurrentHashMap<>();
synchronized (InMemoryHashIndex.class) {
if (recordLocationMap == null) {
recordLocationMap = new ConcurrentHashMap<>();
}
}
}
@Override

View File

@@ -75,6 +75,8 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
private long recordsWritten = 0;
// Total number of records deleted during an append
private long recordsDeleted = 0;
// Total number of records updated during an append
private long updatedRecordsWritten = 0;
// Average record size for a HoodieRecord. This size is updated at the end of every log block flushed to disk
private long averageRecordSize = 0;
private HoodieLogFile currentLogFile;
@@ -89,6 +91,8 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
private int maxBlockSize = config.getLogFileDataBlockMaxSize();
// Header metadata for a log block
private Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
// Total number of new records inserted into the delta file
private long insertRecordsWritten = 0;
public HoodieAppendHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
String fileId, Iterator<HoodieRecord<T>> recordItr) {
@@ -111,6 +115,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
// extract some information from the first record
Optional<FileSlice> fileSlice = fileSystemView.getLatestFileSlices(partitionPath)
.filter(fileSlice1 -> fileSlice1.getFileId().equals(fileId)).findFirst();
// Set the base commit time as the current commitTime for new inserts into log files
String baseInstantTime = commitTime;
if (fileSlice.isPresent()) {
baseInstantTime = fileSlice.get().getBaseInstantTime();
@@ -156,6 +161,12 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
hoodieRecord.getPartitionPath(), fileId);
HoodieAvroUtils
.addCommitMetadataToRecord((GenericRecord) avroRecord.get(), commitTime, seqId);
// If currentLocation is present, then this is an update
if (hoodieRecord.getCurrentLocation() != null) {
updatedRecordsWritten++;
} else {
insertRecordsWritten++;
}
recordsWritten++;
} else {
recordsDeleted++;
@@ -238,6 +249,8 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
writeStatus.getStat().setPrevCommit(commitTime);
writeStatus.getStat().setFileId(this.fileId);
writeStatus.getStat().setNumWrites(recordsWritten);
writeStatus.getStat().setNumUpdateWrites(updatedRecordsWritten);
writeStatus.getStat().setNumInserts(insertRecordsWritten);
writeStatus.getStat().setNumDeletes(recordsDeleted);
writeStatus.getStat().setTotalWriteBytes(estimatedNumberOfBytesWritten);
writeStatus.getStat().setTotalWriteErrors(writeStatus.getFailedRecords().size());

View File

@@ -31,6 +31,7 @@ import com.uber.hoodie.avro.model.HoodieSavepointMetadata;
import com.uber.hoodie.common.model.ActionType;
import com.uber.hoodie.common.model.HoodieArchivedLogFile;
import com.uber.hoodie.common.model.HoodieCommitMetadata;
import com.uber.hoodie.common.model.HoodieRollingStatMetadata;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.log.HoodieLogFormat;
@@ -55,6 +56,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -273,7 +275,7 @@ public class HoodieCommitArchiveLog {
}
case COMMIT_ACTION: {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get());
.fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get(), HoodieCommitMetadata.class);
archivedMetaWrapper.setHoodieCommitMetadata(commitMetadataConverter(commitMetadata));
archivedMetaWrapper.setActionType(ActionType.commit.name());
break;
@@ -294,7 +296,7 @@ public class HoodieCommitArchiveLog {
}
case HoodieTimeline.DELTA_COMMIT_ACTION: {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get());
.fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get(), HoodieCommitMetadata.class);
archivedMetaWrapper.setHoodieCommitMetadata(commitMetadataConverter(commitMetadata));
archivedMetaWrapper.setActionType(ActionType.commit.name());
break;
@@ -312,6 +314,8 @@ public class HoodieCommitArchiveLog {
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
com.uber.hoodie.avro.model.HoodieCommitMetadata avroMetaData = mapper
.convertValue(hoodieCommitMetadata, com.uber.hoodie.avro.model.HoodieCommitMetadata.class);
// Do not archive Rolling Stats, cannot set to null since AVRO will throw null pointer
avroMetaData.getExtraMetadata().put(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY, StringUtils.EMPTY);
return avroMetaData;
}
}

View File

@@ -28,6 +28,7 @@ import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordLocation;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.model.HoodieRollingStatMetadata;
import com.uber.hoodie.common.model.HoodieWriteStat;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
@@ -643,12 +644,18 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
*/
private HashMap<Integer, BucketInfo> bucketInfoMap;
/**
* Rolling stats for files
*/
protected HoodieRollingStatMetadata rollingStatMetadata;
protected long averageRecordSize;
UpsertPartitioner(WorkloadProfile profile) {
updateLocationToBucket = new HashMap<>();
partitionPathToInsertBuckets = new HashMap<>();
bucketInfoMap = new HashMap<>();
globalStat = profile.getGlobalStat();
rollingStatMetadata = getRollingStats();
assignUpdates(profile);
assignInserts(profile);
@@ -792,7 +799,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
* Obtains the average record size based on records written during last commit. Used for
* estimating how many records pack into one file.
*/
private long averageBytesPerRecord() {
protected long averageBytesPerRecord() {
long avgSize = 0L;
HoodieTimeline commitTimeline = metaClient.getActiveTimeline().getCommitTimeline()
.filterCompletedInstants();
@@ -800,7 +807,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
if (!commitTimeline.empty()) {
HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(commitTimeline.getInstantDetails(latestCommitTime).get());
.fromBytes(commitTimeline.getInstantDetails(latestCommitTime).get(), HoodieCommitMetadata.class);
avgSize = (long) Math.ceil(
(1.0 * commitMetadata.fetchTotalBytesWritten()) / commitMetadata
.fetchTotalRecordsWritten());
@@ -852,4 +859,8 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
}
}
}
protected HoodieRollingStatMetadata getRollingStats() {
return null;
}
}

View File

@@ -16,6 +16,7 @@
package com.uber.hoodie.table;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.uber.hoodie.WriteStatus;
@@ -27,10 +28,11 @@ import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordLocation;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.model.HoodieRollingStat;
import com.uber.hoodie.common.model.HoodieRollingStatMetadata;
import com.uber.hoodie.common.model.HoodieWriteStat;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.log.HoodieLogFormat;
import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer;
import com.uber.hoodie.common.table.log.block.HoodieCommandBlock;
import com.uber.hoodie.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
@@ -40,6 +42,7 @@ import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieCompactionException;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieRollbackException;
import com.uber.hoodie.exception.HoodieUpsertException;
import com.uber.hoodie.func.MergeOnReadLazyInsertIterable;
@@ -103,7 +106,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
Iterator<HoodieRecord<T>> recordItr) throws IOException {
logger.info("Merging updates for commit " + commitTime + " for file " + fileId);
if (mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) {
if (!index.canIndexLogFiles() && mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) {
logger.info(
"Small file corrections for updates for commit " + commitTime + " for file " + fileId);
return super.handleUpdate(commitTime, fileId, recordItr);
@@ -188,8 +191,6 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
.filter(i -> !i.isInflight()).forEach(this.getActiveTimeline()::revertToInflight);
logger.info("Unpublished " + commits);
Long startTime = System.currentTimeMillis();
// TODO (NA) : remove this once HoodieIndex is a member of HoodieTable
HoodieIndex hoodieIndex = HoodieIndex.createIndex(config, jsc);
List<HoodieRollbackStat> allRollbackStats = jsc.parallelize(FSUtils
.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(),
config.shouldAssumeDatePartitioning()))
@@ -224,90 +225,53 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
throw new UncheckedIOException("Failed to rollback for commit " + commit, io);
}
case HoodieTimeline.DELTA_COMMIT_ACTION:
// --------------------------------------------------------------------------------------------------
// (A) The following cases are possible if index.canIndexLogFiles and/or index.isGlobal
// --------------------------------------------------------------------------------------------------
// (A.1) Failed first commit - Inserts were written to log files and HoodieWriteStat has no entries. In
// this scenario we would want to delete these log files.
// (A.2) Failed recurring commit - Inserts/Updates written to log files. In this scenario,
// HoodieWriteStat will have the baseCommitTime for the first log file written, add rollback blocks.
// (A.3) Rollback triggered for first commit - Inserts were written to the log files but the commit is
// being reverted. In this scenario, HoodieWriteStat will be `null` for the attribute prevCommitTime and
// and hence will end up deleting these log files. This is done so there are no orphan log files
// lying around.
// (A.4) Rollback triggered for recurring commits - Inserts/Updates are being rolled back, the actions
// taken in this scenario is a combination of (A.2) and (A.3)
// ---------------------------------------------------------------------------------------------------
// (B) The following cases are possible if !index.canIndexLogFiles and/or !index.isGlobal
// ---------------------------------------------------------------------------------------------------
// (B.1) Failed first commit - Inserts were written to parquet files and HoodieWriteStat has no entries.
// In this scenario, we delete all the parquet files written for the failed commit.
// (B.2) Failed recurring commits - Inserts were written to parquet files and updates to log files. In
// this scenario, perform (A.1) and for updates written to log files, write rollback blocks.
// (B.3) Rollback triggered for first commit - Same as (B.1)
// (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files
// as well if the base parquet file gets deleted.
try {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
metaClient.getCommitTimeline().getInstantDetails(
new HoodieInstant(true, instant.getAction(), instant.getTimestamp()))
.get());
.get(), HoodieCommitMetadata.class);
// read commit file and (either append delete blocks or delete file)
final Map<FileStatus, Boolean> filesToDeletedStatus = new HashMap<>();
Map<FileStatus, Long> filesToNumBlocksRollback = new HashMap<>();
// In case all data was inserts and the commit failed, delete the file belonging to that commit
// We do not know fileIds for inserts (first inserts are either log files or parquet files),
// delete all files for the corresponding failed commit, if present (same as COW)
super.deleteCleanedFiles(filesToDeletedStatus, partitionPath, filter);
final Set<String> deletedFiles = filesToDeletedStatus.entrySet().stream()
.map(entry -> {
Path filePath = entry.getKey().getPath();
return FSUtils.getFileIdFromFilePath(filePath);
}).collect(Collectors.toSet());
// append rollback blocks for updates
if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
// This needs to be done since GlobalIndex at the moment does not store the latest commit time
Map<String, String> fileIdToLatestCommitTimeMap =
hoodieIndex.isGlobal() ? this.getRTFileSystemView().getLatestFileSlices(partitionPath)
.collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime)) : null;
commitMetadata.getPartitionToWriteStats().get(partitionPath).stream()
.filter(wStat -> {
if (wStat != null
&& wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT
&& wStat.getPrevCommit() != null && !deletedFiles.contains(wStat.getFileId())) {
return true;
}
return false;
})
.forEach(wStat -> {
Writer writer = null;
String baseCommitTime = wStat.getPrevCommit();
if (hoodieIndex.isGlobal()) {
baseCommitTime = fileIdToLatestCommitTimeMap.get(wStat.getFileId());
}
try {
// TODO : wStat.getPrevCommit() might not give the right commit time in the following
// scenario if a compaction was scheduled, the new commitTime will be used to write the
// new log file. In this case, the commit time for the log file is the
// getBaseCommitTime()
writer = HoodieLogFormat.newWriterBuilder().onParentPath(
new Path(this.getMetaClient().getBasePath(), partitionPath))
.withFileId(wStat.getFileId()).overBaseCommit(baseCommitTime)
.withFs(this.metaClient.getFs())
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
Long numRollbackBlocks = 0L;
// generate metadata
Map<HeaderMetadataType, String> header =
Maps.newHashMap();
header.put(HeaderMetadataType.INSTANT_TIME,
metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
header.put(HeaderMetadataType.TARGET_INSTANT_TIME,
commit);
header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE, String
.valueOf(
HoodieCommandBlockTypeEnum
.ROLLBACK_PREVIOUS_BLOCK
.ordinal()));
// if update belongs to an existing log file
writer = writer.appendBlock(new HoodieCommandBlock(header));
numRollbackBlocks++;
filesToNumBlocksRollback.put(this.getMetaClient().getFs()
.getFileStatus(writer.getLogFile().getPath()), numRollbackBlocks);
} catch (IOException | InterruptedException io) {
throw new HoodieRollbackException(
"Failed to rollback for commit " + commit, io);
} finally {
try {
if (writer != null) {
writer.close();
}
} catch (IOException io) {
throw new UncheckedIOException(io);
}
}
});
hoodieRollbackStats = HoodieRollbackStat.newBuilder()
.withPartitionPath(partitionPath)
.withDeletedFileResults(filesToDeletedStatus)
.withRollbackBlockAppendResults(filesToNumBlocksRollback).build();
hoodieRollbackStats = rollback(index, partitionPath, commit, commitMetadata, filesToDeletedStatus,
filesToNumBlocksRollback, deletedFiles);
}
break;
} catch (IOException io) {
@@ -335,6 +299,25 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
return Optional.empty();
}
@Override
protected HoodieRollingStatMetadata getRollingStats() {
try {
Optional<HoodieInstant> lastInstant = this.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants()
.lastInstant();
if (lastInstant.isPresent()) {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
this.getActiveTimeline().getInstantDetails(lastInstant.get()).get(), HoodieCommitMetadata.class);
HoodieRollingStatMetadata rollingStatMetadata = HoodieCommitMetadata
.fromBytes(commitMetadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY)
.getBytes(), HoodieRollingStatMetadata.class);
return rollingStatMetadata;
}
return null;
} catch (IOException e) {
throw new HoodieException();
}
}
/**
* UpsertPartitioner for MergeOnRead table type, this allows auto correction of small parquet
* files to larger ones without the need for an index in the logFile.
@@ -345,7 +328,6 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
super(profile);
}
@Override
protected List<SmallFile> getSmallFiles(String partitionPath) {
// smallFiles only for partitionPath
@@ -354,32 +336,59 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
// Init here since this class (and member variables) might not have been initialized
HoodieTimeline commitTimeline = getCompletedCommitTimeline();
// Find out all eligible small file slices
if (!commitTimeline.empty()) {
HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
// find smallest file in partition and append to it
// TODO - check if index.isglobal then small files are log files too
Optional<FileSlice> smallFileSlice = getRTFileSystemView()
// Use the merged file-slice for small file selection
.getLatestMergedFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).filter(
fileSlice -> fileSlice.getLogFiles().count() < 1
&& fileSlice.getDataFile().get().getFileSize() < config
.getParquetSmallFileLimit()).sorted((FileSlice left, FileSlice right) ->
left.getDataFile().get().getFileSize() < right.getDataFile().get().getFileSize()
? -1 : 1).findFirst();
if (smallFileSlice.isPresent()) {
String filename = smallFileSlice.get().getDataFile().get().getFileName();
List<FileSlice> allSmallFileSlices = new ArrayList<>();
// If we cannot index log files, then we choose the smallest parquet file in the partition and add inserts to
// it. Doing this overtime for a partition, we ensure that we handle small file issues
if (!index.canIndexLogFiles()) {
// TODO : choose last N small files since there can be multiple small files written to a single partition
// by different spark partitions in a single batch
Optional<FileSlice> smallFileSlice = getRTFileSystemView()
.getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).filter(
fileSlice -> fileSlice.getLogFiles().count() < 1
&& fileSlice.getDataFile().get().getFileSize() < config
.getParquetSmallFileLimit()).sorted((FileSlice left, FileSlice right) ->
left.getDataFile().get().getFileSize() < right.getDataFile().get().getFileSize()
? -1 : 1).findFirst();
if (smallFileSlice.isPresent()) {
allSmallFileSlices.add(smallFileSlice.get());
}
} else {
// If we can index log files, we can add more inserts to log files.
List<FileSlice> allFileSlices = getRTFileSystemView()
.getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp())
.collect(Collectors.toList());
for (FileSlice fileSlice : allFileSlices) {
if (isSmallFile(partitionPath, fileSlice)) {
allSmallFileSlices.add(fileSlice);
}
}
}
// Create SmallFiles from the eligible file slices
for (FileSlice smallFileSlice : allSmallFileSlices) {
SmallFile sf = new SmallFile();
sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename),
FSUtils.getFileId(filename));
sf.sizeBytes = smallFileSlice.get().getDataFile().get().getFileSize();
smallFileLocations.add(sf);
// Update the global small files list
smallFiles.add(sf);
if (smallFileSlice.getDataFile().isPresent()) {
// TODO : Move logic of file name, file id, base commit time handling inside file slice
String filename = smallFileSlice.getDataFile().get().getFileName();
sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
sf.sizeBytes = getTotalFileSize(partitionPath, smallFileSlice);
smallFileLocations.add(sf);
// Update the global small files list
smallFiles.add(sf);
} else {
HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
FSUtils.getFileIdFromLogPath(logFile.getPath()));
sf.sizeBytes = getTotalFileSize(partitionPath, smallFileSlice);
smallFileLocations.add(sf);
// Update the global small files list
smallFiles.add(sf);
}
}
}
return smallFileLocations;
}
@@ -388,6 +397,116 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
.map(smallFile -> ((SmallFile) smallFile).location.getFileId())
.collect(Collectors.toList());
}
private long getTotalFileSize(String partitionPath, FileSlice fileSlice) {
if (rollingStatMetadata != null) {
Map<String, HoodieRollingStat> partitionRollingStats =
rollingStatMetadata.getPartitionToRollingStats().get(partitionPath);
if (partitionRollingStats != null) {
HoodieRollingStat rollingStatForFile = partitionRollingStats.get(fileSlice.getFileId());
if (rollingStatForFile != null) {
long inserts = rollingStatForFile.getInserts();
long totalSize = averageRecordSize * inserts;
return totalSize;
}
}
}
// In case Rolling Stats is not present, fall back to sizing log files based on heuristics
if (!fileSlice.getDataFile().isPresent()) {
return convertLogFilesSizeToExpectedParquetSize(fileSlice.getLogFiles().collect(Collectors.toList()));
} else {
return fileSlice.getDataFile().get().getFileSize() + convertLogFilesSizeToExpectedParquetSize(fileSlice
.getLogFiles().collect(Collectors.toList()));
}
}
private boolean isSmallFile(String partitionPath, FileSlice fileSlice) {
long totalSize = getTotalFileSize(partitionPath, fileSlice);
if (totalSize < config.getParquetMaxFileSize()) {
return true;
}
return false;
}
// TODO (NA) : Make this static part of utility
@VisibleForTesting
public long convertLogFilesSizeToExpectedParquetSize(List<HoodieLogFile> hoodieLogFiles) {
long totalSizeOfLogFiles = hoodieLogFiles.stream().map(hoodieLogFile -> hoodieLogFile.getFileSize().get())
.reduce((a, b) -> (a + b)).orElse(0L);
// Here we assume that if there is no base parquet file, all log files contain only inserts.
// We can then just get the parquet equivalent size of these log files, compare that with
// {@link config.getParquetMaxFileSize()} and decide if there is scope to insert more rows
long logFilesEquivalentParquetFileSize = (long) (totalSizeOfLogFiles * config
.getLogFileToParquetCompressionRatio());
return logFilesEquivalentParquetFileSize;
}
}
private Map<HeaderMetadataType, String> generateHeader(String commit) {
// generate metadata
Map<HeaderMetadataType, String> header = Maps.newHashMap();
header.put(HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
header.put(HeaderMetadataType.TARGET_INSTANT_TIME, commit);
header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK
.ordinal()));
return header;
}
private HoodieRollbackStat rollback(HoodieIndex hoodieIndex, String partitionPath, String commit,
HoodieCommitMetadata commitMetadata, final Map<FileStatus, Boolean> filesToDeletedStatus,
Map<FileStatus, Long> filesToNumBlocksRollback, Set<String> deletedFiles) {
// The following needs to be done since GlobalIndex at the moment does not store the latest commit time.
// Also, wStat.getPrevCommit() might not give the right commit time in the following
// scenario : If a compaction was scheduled, the new commitTime associated with the requested compaction will be
// used to write the new log files. In this case, the commit time for the log file is the compaction requested time.
Map<String, String> fileIdToBaseCommitTimeForLogMap =
hoodieIndex.isGlobal() ? this.getRTFileSystemView().getLatestFileSlices(partitionPath)
.collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime)) : null;
commitMetadata.getPartitionToWriteStats().get(partitionPath).stream()
.filter(wStat -> {
// Filter out stats without prevCommit since they are all inserts
if (wStat != null && wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT && wStat.getPrevCommit() != null
&& !deletedFiles.contains(wStat.getFileId())) {
return true;
}
return false;
}).forEach(wStat -> {
HoodieLogFormat.Writer writer = null;
String baseCommitTime = wStat.getPrevCommit();
if (hoodieIndex.isGlobal()) {
baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId());
}
try {
writer = HoodieLogFormat.newWriterBuilder().onParentPath(
new Path(this.getMetaClient().getBasePath(), partitionPath))
.withFileId(wStat.getFileId()).overBaseCommit(baseCommitTime)
.withFs(this.metaClient.getFs())
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
Long numRollbackBlocks = 0L;
// generate metadata
Map<HeaderMetadataType, String> header = generateHeader(commit);
// if update belongs to an existing log file
writer = writer.appendBlock(new HoodieCommandBlock(header));
numRollbackBlocks++;
filesToNumBlocksRollback.put(this.getMetaClient().getFs()
.getFileStatus(writer.getLogFile().getPath()), numRollbackBlocks);
} catch (IOException | InterruptedException io) {
throw new HoodieRollbackException(
"Failed to rollback for commit " + commit, io);
} finally {
try {
if (writer != null) {
writer.close();
}
} catch (IOException io) {
throw new UncheckedIOException(io);
}
}
});
return HoodieRollbackStat.newBuilder()
.withPartitionPath(partitionPath)
.withDeletedFileResults(filesToDeletedStatus)
.withRollbackBlockAppendResults(filesToNumBlocksRollback).build();
}
}