diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java index 357cf1b3b..6fa9b56a6 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java @@ -179,29 +179,47 @@ public class HoodieCreateHandle extends fileWriter.close(); - HoodieWriteStat stat = new HoodieWriteStat(); - stat.setPartitionPath(writeStatus.getPartitionPath()); - stat.setNumWrites(recordsWritten); - stat.setNumDeletes(recordsDeleted); - stat.setNumInserts(insertRecordsWritten); - stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT); - stat.setFileId(writeStatus.getFileId()); - stat.setPath(new Path(config.getBasePath()), path); - long fileSizeInBytes = FSUtils.getFileSize(fs, path); - stat.setTotalWriteBytes(fileSizeInBytes); - stat.setFileSizeInBytes(fileSizeInBytes); - stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords()); - RuntimeStats runtimeStats = new RuntimeStats(); - runtimeStats.setTotalCreateTime(timer.endTimer()); - stat.setRuntimeStats(runtimeStats); - writeStatus.setStat(stat); + setupWriteStatus(); - LOG.info(String.format("CreateHandle for partitionPath %s fileID %s, took %d ms.", stat.getPartitionPath(), - stat.getFileId(), runtimeStats.getTotalCreateTime())); + LOG.info(String.format("CreateHandle for partitionPath %s fileID %s, took %d ms.", + writeStatus.getStat().getPartitionPath(), writeStatus.getStat().getFileId(), + writeStatus.getStat().getRuntimeStats().getTotalCreateTime())); return Collections.singletonList(writeStatus); } catch (IOException e) { throw new HoodieInsertException("Failed to close the Insert Handle for path " + path, e); } } + + /** + * Set up the write status. + * + * @throws IOException if error occurs + */ + protected void setupWriteStatus() throws IOException { + HoodieWriteStat stat = new HoodieWriteStat(); + stat.setPartitionPath(writeStatus.getPartitionPath()); + stat.setNumWrites(recordsWritten); + stat.setNumDeletes(recordsDeleted); + stat.setNumInserts(insertRecordsWritten); + stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT); + stat.setFileId(writeStatus.getFileId()); + stat.setPath(new Path(config.getBasePath()), path); + stat.setTotalWriteBytes(computeTotalWriteBytes()); + stat.setFileSizeInBytes(computeFileSizeInBytes()); + stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords()); + RuntimeStats runtimeStats = new RuntimeStats(); + runtimeStats.setTotalCreateTime(timer.endTimer()); + stat.setRuntimeStats(runtimeStats); + writeStatus.setStat(stat); + } + + protected long computeTotalWriteBytes() throws IOException { + return FSUtils.getFileSize(fs, path); + } + + protected long computeFileSizeInBytes() throws IOException { + return FSUtils.getFileSize(fs, path); + } + } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java index 6f4638e85..2abefa91f 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java @@ -22,7 +22,6 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; @@ -30,7 +29,6 @@ import org.apache.hudi.exception.HoodieInsertException; import org.apache.hudi.table.HoodieTable; import org.apache.avro.Schema; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -89,7 +87,7 @@ public class FlinkCreateHandle */ private WriteStatus getIncrementalWriteStatus() { try { - setUpWriteStatus(); + setupWriteStatus(); // reset the write status recordsWritten = 0; recordsDeleted = 0; @@ -102,32 +100,20 @@ public class FlinkCreateHandle } } - /** - * Set up the write status. - * - * @throws IOException if error occurs - */ - private void setUpWriteStatus() throws IOException { - long fileSizeInBytes = fileWriter.getBytesWritten(); + @Override + protected long computeTotalWriteBytes() throws IOException { + long fileSizeInBytes = computeFileSizeInBytes(); long incFileSizeInBytes = fileSizeInBytes - lastFileSize; this.lastFileSize = fileSizeInBytes; - HoodieWriteStat stat = new HoodieWriteStat(); - stat.setPartitionPath(writeStatus.getPartitionPath()); - stat.setNumWrites(recordsWritten); - stat.setNumDeletes(recordsDeleted); - stat.setNumInserts(insertRecordsWritten); - stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT); - stat.setFileId(writeStatus.getFileId()); - stat.setPath(new Path(config.getBasePath()), path); - stat.setTotalWriteBytes(incFileSizeInBytes); - stat.setFileSizeInBytes(fileSizeInBytes); - stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords()); - HoodieWriteStat.RuntimeStats runtimeStats = new HoodieWriteStat.RuntimeStats(); - runtimeStats.setTotalCreateTime(timer.endTimer()); - stat.setRuntimeStats(runtimeStats); - writeStatus.setStat(stat); + return incFileSizeInBytes; } + @Override + protected long computeFileSizeInBytes() throws IOException { + return fileWriter.getBytesWritten(); + } + + @Override public void finishWrite() { LOG.info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + recordsWritten); try {