Performing commit archiving in batches to avoid keeping a huge chunk in memory
This commit is contained in:
committed by
Balaji Varadarajan
parent
b07110b9fd
commit
a8feee9293
@@ -45,6 +45,7 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
|
||||
public static final String CLEANER_COMMITS_RETAINED_PROP = "hoodie.cleaner.commits.retained";
|
||||
public static final String MAX_COMMITS_TO_KEEP_PROP = "hoodie.keep.max.commits";
|
||||
public static final String MIN_COMMITS_TO_KEEP_PROP = "hoodie.keep.min.commits";
|
||||
public static final String COMMITS_ARCHIVAL_BATCH_SIZE_PROP = "hoodie.commits.archival.batch";
|
||||
// Upsert uses this file size to compact new data onto existing files..
|
||||
public static final String PARQUET_SMALL_FILE_LIMIT_BYTES = "hoodie.parquet.small.file.limit";
|
||||
// By default, treat any file <= 100MB as a small file.
|
||||
@@ -104,6 +105,7 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
|
||||
private static final String DEFAULT_CLEANER_COMMITS_RETAINED = "10";
|
||||
private static final String DEFAULT_MAX_COMMITS_TO_KEEP = "30";
|
||||
private static final String DEFAULT_MIN_COMMITS_TO_KEEP = "20";
|
||||
private static final String DEFAULT_COMMITS_ARCHIVAL_BATCH_SIZE = String.valueOf(10);
|
||||
public static final String TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP = "hoodie.compaction.daybased.target"
|
||||
+ ".partitions";
|
||||
// 500GB of target IO per compaction (both read and write)
|
||||
@@ -240,6 +242,11 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withCommitsArchivalBatchSize(int batchSize) {
|
||||
props.setProperty(COMMITS_ARCHIVAL_BATCH_SIZE_PROP, String.valueOf(batchSize));
|
||||
return this;
|
||||
}
|
||||
|
||||
public HoodieCompactionConfig build() {
|
||||
HoodieCompactionConfig config = new HoodieCompactionConfig(props);
|
||||
setDefaultOnCondition(props, !props.containsKey(AUTO_CLEAN_PROP), AUTO_CLEAN_PROP,
|
||||
@@ -281,6 +288,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
|
||||
COMPACTION_REVERSE_LOG_READ_ENABLED_PROP, DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED);
|
||||
setDefaultOnCondition(props, !props.containsKey(TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP),
|
||||
TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP, DEFAULT_TARGET_PARTITIONS_PER_DAYBASED_COMPACTION);
|
||||
setDefaultOnCondition(props, !props.containsKey(COMMITS_ARCHIVAL_BATCH_SIZE_PROP),
|
||||
COMMITS_ARCHIVAL_BATCH_SIZE_PROP, DEFAULT_COMMITS_ARCHIVAL_BATCH_SIZE);
|
||||
|
||||
HoodieCleaningPolicy.valueOf(props.getProperty(CLEANER_POLICY_PROP));
|
||||
|
||||
|
||||
@@ -249,6 +249,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
||||
.parseInt(props.getProperty(HoodieCompactionConfig.TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP));
|
||||
}
|
||||
|
||||
public int getCommitArchivalBatchSize() {
|
||||
return Integer
|
||||
.parseInt(props.getProperty(HoodieCompactionConfig.COMMITS_ARCHIVAL_BATCH_SIZE_PROP));
|
||||
}
|
||||
|
||||
/**
|
||||
* index properties
|
||||
**/
|
||||
|
||||
@@ -245,11 +245,11 @@ public class HoodieCommitArchiveLog {
|
||||
List<IndexedRecord> records = new ArrayList<>();
|
||||
for (HoodieInstant hoodieInstant : instants) {
|
||||
records.add(convertToAvroRecord(commitTimeline, hoodieInstant));
|
||||
if (records.size() >= this.config.getCommitArchivalBatchSize()) {
|
||||
writeToFile(wrapperSchema, records);
|
||||
}
|
||||
}
|
||||
Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
|
||||
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, wrapperSchema.toString());
|
||||
HoodieAvroDataBlock block = new HoodieAvroDataBlock(records, header);
|
||||
this.writer = writer.appendBlock(block);
|
||||
writeToFile(wrapperSchema, records);
|
||||
} catch (Exception e) {
|
||||
throw new HoodieCommitException("Failed to archive commits", e);
|
||||
}
|
||||
@@ -259,6 +259,16 @@ public class HoodieCommitArchiveLog {
|
||||
return archiveFilePath;
|
||||
}
|
||||
|
||||
private void writeToFile(Schema wrapperSchema, List<IndexedRecord> records) throws Exception {
|
||||
if (records.size() > 0) {
|
||||
Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
|
||||
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, wrapperSchema.toString());
|
||||
HoodieAvroDataBlock block = new HoodieAvroDataBlock(records, header);
|
||||
this.writer = writer.appendBlock(block);
|
||||
records.clear();
|
||||
}
|
||||
}
|
||||
|
||||
private IndexedRecord convertToAvroRecord(HoodieTimeline commitTimeline,
|
||||
HoodieInstant hoodieInstant) throws IOException {
|
||||
HoodieArchivedMetaEntry archivedMetaWrapper = new HoodieArchivedMetaEntry();
|
||||
|
||||
Reference in New Issue
Block a user