[HUDI-164] Fixes incorrect averageBytesPerRecord
When number of records written is zero, averageBytesPerRecord results in a huge size (division by zero and ceiled to Long.MAX_VALUE) causing OOM. This commit fixes this issue by reverse traversing the commits until a more reasonable average record size can be computed and if that is not possible returns the default configured record size.
This commit is contained in:
committed by
vinoth chandar
parent
93bc5e2153
commit
64df98fc4a
@@ -628,7 +628,8 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
|
||||
private void assignInserts(WorkloadProfile profile) {
|
||||
// for new inserts, compute buckets depending on how many records we have for each partition
|
||||
Set<String> partitionPaths = profile.getPartitionPaths();
|
||||
long averageRecordSize = averageBytesPerRecord();
|
||||
long averageRecordSize = averageBytesPerRecord(metaClient.getActiveTimeline().getCommitTimeline()
|
||||
.filterCompletedInstants(), config.getCopyOnWriteRecordSizeEstimate());
|
||||
logger.info("AvgRecordSize => " + averageRecordSize);
|
||||
for (String partitionPath : partitionPaths) {
|
||||
WorkloadStat pStat = profile.getWorkloadStat(partitionPath);
|
||||
@@ -735,30 +736,6 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
|
||||
return smallFileLocations;
|
||||
}
|
||||
|
||||
/**
|
||||
* Obtains the average record size based on records written during last commit. Used for
|
||||
* estimating how many records pack into one file.
|
||||
*/
|
||||
protected long averageBytesPerRecord() {
|
||||
long avgSize = 0L;
|
||||
HoodieTimeline commitTimeline = metaClient.getActiveTimeline().getCommitTimeline()
|
||||
.filterCompletedInstants();
|
||||
try {
|
||||
if (!commitTimeline.empty()) {
|
||||
HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
|
||||
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
|
||||
.fromBytes(commitTimeline.getInstantDetails(latestCommitTime).get(), HoodieCommitMetadata.class);
|
||||
avgSize = (long) Math.ceil(
|
||||
(1.0 * commitMetadata.fetchTotalBytesWritten()) / commitMetadata
|
||||
.fetchTotalRecordsWritten());
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
// make this fail safe.
|
||||
logger.error("Error trying to compute average bytes/record ", t);
|
||||
}
|
||||
return avgSize <= 0L ? config.getCopyOnWriteRecordSizeEstimate() : avgSize;
|
||||
}
|
||||
|
||||
public BucketInfo getBucketInfo(int bucketNumber) {
|
||||
return bucketInfoMap.get(bucketNumber);
|
||||
}
|
||||
@@ -803,4 +780,33 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
|
||||
protected HoodieRollingStatMetadata getRollingStats() {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Obtains the average record size based on records written during previous commits. Used for
|
||||
* estimating how many records pack into one file.
|
||||
*/
|
||||
protected static long averageBytesPerRecord(HoodieTimeline commitTimeline, int defaultRecordSizeEstimate) {
|
||||
long avgSize = defaultRecordSizeEstimate;
|
||||
try {
|
||||
if (!commitTimeline.empty()) {
|
||||
// Go over the reverse ordered commits to get a more recent estimate of average record size.
|
||||
Iterator<HoodieInstant> instants = commitTimeline.getReverseOrderedInstants().iterator();
|
||||
while (instants.hasNext()) {
|
||||
HoodieInstant instant = instants.next();
|
||||
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
|
||||
.fromBytes(commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
|
||||
long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
|
||||
long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
|
||||
if (totalBytesWritten > 0 && totalRecordsWritten > 0) {
|
||||
avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
// make this fail safe.
|
||||
logger.error("Error trying to compute average bytes/record ", t);
|
||||
}
|
||||
return avgSize;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user