Adding support for optional skipping single archiving failures
This commit is contained in:
committed by
vinoth chandar
parent
66c7fa2322
commit
1b61eb45e0
@@ -70,7 +70,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
||||
private static final String DEFAULT_CONSISTENCY_CHECK_ENABLED = "false";
|
||||
private static final String EMBEDDED_TIMELINE_SERVER_ENABLED = "hoodie.embed.timeline.server";
|
||||
private static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = "false";
|
||||
|
||||
private static final String FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP = "hoodie.fail.on.timeline.archiving";
|
||||
private static final String DEFAULT_FAIL_ON_TIMELINE_ARCHIVING_ENABLED = "true";
|
||||
// time between successive attempts to ensure written data's metadata is consistent on storage
|
||||
private static final String INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP =
|
||||
"hoodie.consistency.check.initial_interval_ms";
|
||||
@@ -169,6 +170,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
||||
return Boolean.parseBoolean(props.getProperty(EMBEDDED_TIMELINE_SERVER_ENABLED));
|
||||
}
|
||||
|
||||
public boolean isFailOnTimelineArchivingEnabled() {
|
||||
return Boolean.parseBoolean(props.getProperty(FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP));
|
||||
}
|
||||
|
||||
public int getMaxConsistencyChecks() {
|
||||
return Integer.parseInt(props.getProperty(MAX_CONSISTENCY_CHECKS_PROP));
|
||||
}
|
||||
@@ -696,6 +701,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
||||
MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP, String.valueOf(DEFAULT_MAX_CONSISTENCY_CHECK_INTERVAL_MS));
|
||||
setDefaultOnCondition(props, !props.containsKey(MAX_CONSISTENCY_CHECKS_PROP),
|
||||
MAX_CONSISTENCY_CHECKS_PROP, String.valueOf(DEFAULT_MAX_CONSISTENCY_CHECKS));
|
||||
setDefaultOnCondition(props, !props.containsKey(FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP),
|
||||
FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP, DEFAULT_FAIL_ON_TIMELINE_ARCHIVING_ENABLED);
|
||||
|
||||
// Make sure the props is propagated
|
||||
setDefaultOnCondition(props, !isIndexConfigSet,
|
||||
|
||||
@@ -246,9 +246,16 @@ public class HoodieCommitArchiveLog {
|
||||
log.info("Wrapper schema " + wrapperSchema.toString());
|
||||
List<IndexedRecord> records = new ArrayList<>();
|
||||
for (HoodieInstant hoodieInstant : instants) {
|
||||
records.add(convertToAvroRecord(commitTimeline, hoodieInstant));
|
||||
if (records.size() >= this.config.getCommitArchivalBatchSize()) {
|
||||
writeToFile(wrapperSchema, records);
|
||||
try {
|
||||
records.add(convertToAvroRecord(commitTimeline, hoodieInstant));
|
||||
if (records.size() >= this.config.getCommitArchivalBatchSize()) {
|
||||
writeToFile(wrapperSchema, records);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to archive commits, .commit file: " + hoodieInstant.getFileName(), e);
|
||||
if (this.config.isFailOnTimelineArchivingEnabled()) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
writeToFile(wrapperSchema, records);
|
||||
|
||||
Reference in New Issue
Block a user