1
0

[HUDI-1584] Modify maker file path, which should start with the target base path. (#2539)

This commit is contained in:
ZhangChaoMing
2021-03-02 17:52:21 +08:00
committed by GitHub
parent 73fa308ff0
commit 0dde7f9185

View File

@@ -125,7 +125,7 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
super.prepareSnapshotPreBarrier(checkpointId); super.prepareSnapshotPreBarrier(checkpointId);
String instantMarkerFileName = String.format("%d%s%d%s%d", indexOfThisSubtask, DELIMITER, checkpointId, DELIMITER, recordCounter.get()); String instantMarkerFileName = String.format("%d%s%d%s%d", indexOfThisSubtask, DELIMITER, checkpointId, DELIMITER, recordCounter.get());
Path path = new Path(new Path(HoodieTableMetaClient.AUXILIARYFOLDER_NAME, INSTANT_MARKER_FOLDER_NAME), instantMarkerFileName); Path path = generateCurrentMakerFilePath(instantMarkerFileName);
// create marker file // create marker file
fs.create(path, true); fs.create(path, true);
LOG.info("Subtask [{}] at checkpoint [{}] created marker file [{}]", indexOfThisSubtask, checkpointId, instantMarkerFileName); LOG.info("Subtask [{}] at checkpoint [{}] created marker file [{}]", indexOfThisSubtask, checkpointId, instantMarkerFileName);
@@ -231,7 +231,7 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
private boolean checkReceivedData(long checkpointId) throws InterruptedException, IOException { private boolean checkReceivedData(long checkpointId) throws InterruptedException, IOException {
int numberOfParallelSubtasks = runtimeContext.getNumberOfParallelSubtasks(); int numberOfParallelSubtasks = runtimeContext.getNumberOfParallelSubtasks();
FileStatus[] fileStatuses; FileStatus[] fileStatuses;
Path instantMarkerPath = new Path(HoodieTableMetaClient.AUXILIARYFOLDER_NAME, INSTANT_MARKER_FOLDER_NAME); Path instantMarkerPath = generateCurrentMakerDirPath();
// waiting all subtask create marker file ready // waiting all subtask create marker file ready
while (true) { while (true) {
Thread.sleep(500L); Thread.sleep(500L);
@@ -283,4 +283,13 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
fs.delete(fileStatus.getPath(), true); fs.delete(fileStatus.getPath(), true);
} }
} }
private Path generateCurrentMakerDirPath() {
Path auxPath = new Path(cfg.targetBasePath, HoodieTableMetaClient.AUXILIARYFOLDER_NAME);
return new Path(auxPath, INSTANT_MARKER_FOLDER_NAME);
}
private Path generateCurrentMakerFilePath(String instantMarkerFileName) {
return new Path(generateCurrentMakerDirPath(), instantMarkerFileName);
}
} }