HUDI-168 Ensure getFileStatus calls for files getting written is done after close() is called (#788)
This commit is contained in:
committed by
vinoth chandar
parent
c0593e7a13
commit
3d408ee96b
@@ -505,20 +505,18 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
|
|||||||
}).forEach(wStat -> {
|
}).forEach(wStat -> {
|
||||||
HoodieLogFormat.Writer writer = null;
|
HoodieLogFormat.Writer writer = null;
|
||||||
String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId());
|
String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId());
|
||||||
|
boolean success = false;
|
||||||
try {
|
try {
|
||||||
writer = HoodieLogFormat.newWriterBuilder().onParentPath(
|
writer = HoodieLogFormat.newWriterBuilder().onParentPath(
|
||||||
FSUtils.getPartitionPath(this.getMetaClient().getBasePath(), partitionPath))
|
FSUtils.getPartitionPath(this.getMetaClient().getBasePath(), partitionPath))
|
||||||
.withFileId(wStat.getFileId()).overBaseCommit(baseCommitTime)
|
.withFileId(wStat.getFileId()).overBaseCommit(baseCommitTime)
|
||||||
.withFs(this.metaClient.getFs())
|
.withFs(this.metaClient.getFs())
|
||||||
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
|
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
|
||||||
Long numRollbackBlocks = 0L;
|
|
||||||
// generate metadata
|
// generate metadata
|
||||||
Map<HeaderMetadataType, String> header = generateHeader(commit);
|
Map<HeaderMetadataType, String> header = generateHeader(commit);
|
||||||
// if update belongs to an existing log file
|
// if update belongs to an existing log file
|
||||||
writer = writer.appendBlock(new HoodieCommandBlock(header));
|
writer = writer.appendBlock(new HoodieCommandBlock(header));
|
||||||
numRollbackBlocks++;
|
success = true;
|
||||||
filesToNumBlocksRollback.put(this.getMetaClient().getFs()
|
|
||||||
.getFileStatus(writer.getLogFile().getPath()), numRollbackBlocks);
|
|
||||||
} catch (IOException | InterruptedException io) {
|
} catch (IOException | InterruptedException io) {
|
||||||
throw new HoodieRollbackException(
|
throw new HoodieRollbackException(
|
||||||
"Failed to rollback for commit " + commit, io);
|
"Failed to rollback for commit " + commit, io);
|
||||||
@@ -527,6 +525,13 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
|
|||||||
if (writer != null) {
|
if (writer != null) {
|
||||||
writer.close();
|
writer.close();
|
||||||
}
|
}
|
||||||
|
if (success) {
|
||||||
|
// This step is intentionally done after writer is closed. Guarantees that
|
||||||
|
// getFileStatus would reflect correct stats and FileNotFoundException is not thrown in
|
||||||
|
// cloud-storage : HUDI-168
|
||||||
|
filesToNumBlocksRollback.put(this.getMetaClient().getFs()
|
||||||
|
.getFileStatus(writer.getLogFile().getPath()), 1L);
|
||||||
|
}
|
||||||
} catch (IOException io) {
|
} catch (IOException io) {
|
||||||
throw new UncheckedIOException(io);
|
throw new UncheckedIOException(io);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user