1
0

[HUDI-1737][hudi-client] Code Cleanup: Extract common method in HoodieCreateHandle & FlinkCreateHandle (#2745)

This commit is contained in:
Roc Marshal
2021-04-02 11:39:05 +08:00
committed by GitHub
parent 684622c7c9
commit 94a5e72f16
2 changed files with 47 additions and 43 deletions

View File

@@ -179,29 +179,47 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload, I, K, O> extends
fileWriter.close(); fileWriter.close();
HoodieWriteStat stat = new HoodieWriteStat(); setupWriteStatus();
stat.setPartitionPath(writeStatus.getPartitionPath());
stat.setNumWrites(recordsWritten);
stat.setNumDeletes(recordsDeleted);
stat.setNumInserts(insertRecordsWritten);
stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
stat.setFileId(writeStatus.getFileId());
stat.setPath(new Path(config.getBasePath()), path);
long fileSizeInBytes = FSUtils.getFileSize(fs, path);
stat.setTotalWriteBytes(fileSizeInBytes);
stat.setFileSizeInBytes(fileSizeInBytes);
stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
RuntimeStats runtimeStats = new RuntimeStats();
runtimeStats.setTotalCreateTime(timer.endTimer());
stat.setRuntimeStats(runtimeStats);
writeStatus.setStat(stat);
LOG.info(String.format("CreateHandle for partitionPath %s fileID %s, took %d ms.", stat.getPartitionPath(), LOG.info(String.format("CreateHandle for partitionPath %s fileID %s, took %d ms.",
stat.getFileId(), runtimeStats.getTotalCreateTime())); writeStatus.getStat().getPartitionPath(), writeStatus.getStat().getFileId(),
writeStatus.getStat().getRuntimeStats().getTotalCreateTime()));
return Collections.singletonList(writeStatus); return Collections.singletonList(writeStatus);
} catch (IOException e) { } catch (IOException e) {
throw new HoodieInsertException("Failed to close the Insert Handle for path " + path, e); throw new HoodieInsertException("Failed to close the Insert Handle for path " + path, e);
} }
} }
/**
* Set up the write status.
*
* @throws IOException if error occurs
*/
protected void setupWriteStatus() throws IOException {
HoodieWriteStat stat = new HoodieWriteStat();
stat.setPartitionPath(writeStatus.getPartitionPath());
stat.setNumWrites(recordsWritten);
stat.setNumDeletes(recordsDeleted);
stat.setNumInserts(insertRecordsWritten);
stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
stat.setFileId(writeStatus.getFileId());
stat.setPath(new Path(config.getBasePath()), path);
stat.setTotalWriteBytes(computeTotalWriteBytes());
stat.setFileSizeInBytes(computeFileSizeInBytes());
stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
RuntimeStats runtimeStats = new RuntimeStats();
runtimeStats.setTotalCreateTime(timer.endTimer());
stat.setRuntimeStats(runtimeStats);
writeStatus.setStat(stat);
}
protected long computeTotalWriteBytes() throws IOException {
return FSUtils.getFileSize(fs, path);
}
protected long computeFileSizeInBytes() throws IOException {
return FSUtils.getFileSize(fs, path);
}
} }

View File

@@ -22,7 +22,6 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
@@ -30,7 +29,6 @@ import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
@@ -89,7 +87,7 @@ public class FlinkCreateHandle<T extends HoodieRecordPayload, I, K, O>
*/ */
private WriteStatus getIncrementalWriteStatus() { private WriteStatus getIncrementalWriteStatus() {
try { try {
setUpWriteStatus(); setupWriteStatus();
// reset the write status // reset the write status
recordsWritten = 0; recordsWritten = 0;
recordsDeleted = 0; recordsDeleted = 0;
@@ -102,32 +100,20 @@ public class FlinkCreateHandle<T extends HoodieRecordPayload, I, K, O>
} }
} }
/** @Override
* Set up the write status. protected long computeTotalWriteBytes() throws IOException {
* long fileSizeInBytes = computeFileSizeInBytes();
* @throws IOException if error occurs
*/
private void setUpWriteStatus() throws IOException {
long fileSizeInBytes = fileWriter.getBytesWritten();
long incFileSizeInBytes = fileSizeInBytes - lastFileSize; long incFileSizeInBytes = fileSizeInBytes - lastFileSize;
this.lastFileSize = fileSizeInBytes; this.lastFileSize = fileSizeInBytes;
HoodieWriteStat stat = new HoodieWriteStat(); return incFileSizeInBytes;
stat.setPartitionPath(writeStatus.getPartitionPath());
stat.setNumWrites(recordsWritten);
stat.setNumDeletes(recordsDeleted);
stat.setNumInserts(insertRecordsWritten);
stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
stat.setFileId(writeStatus.getFileId());
stat.setPath(new Path(config.getBasePath()), path);
stat.setTotalWriteBytes(incFileSizeInBytes);
stat.setFileSizeInBytes(fileSizeInBytes);
stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
HoodieWriteStat.RuntimeStats runtimeStats = new HoodieWriteStat.RuntimeStats();
runtimeStats.setTotalCreateTime(timer.endTimer());
stat.setRuntimeStats(runtimeStats);
writeStatus.setStat(stat);
} }
@Override
protected long computeFileSizeInBytes() throws IOException {
return fileWriter.getBytesWritten();
}
@Override
public void finishWrite() { public void finishWrite() {
LOG.info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + recordsWritten); LOG.info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + recordsWritten);
try { try {