1
0

Incorporating code review feedback for finalizeWrite for COW #3

This commit is contained in:
Jian Xu
2017-12-21 09:58:51 -08:00
committed by vinoth chandar
parent 37f2cdd7e4
commit acae6586f3
4 changed files with 8 additions and 13 deletions

View File

@@ -440,9 +440,6 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
);
}
// Clean temp files
table.cleanTemporaryDataFiles(jsc);
// add in extra metadata
if (extraMetadata.isPresent()) {
extraMetadata.get().forEach((k, v) -> metadata.addMetadata(k, v));
@@ -698,9 +695,6 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
}
});
// clean data files in temporary folder
table.cleanTemporaryDataFiles(jsc);
try {
if (commitTimeline.empty() && inflightTimeline.empty()) {
// nothing to rollback

View File

@@ -61,7 +61,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private static final String HOODIE_COPYONWRITE_USE_TEMP_FOLDER = "hoodie.copyonwrite.use.temp.folder";
private static final String DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER = "false";
private static final String FINALIZE_WRITE_PARALLELISM = "hoodie.finalize.write.parallelism";
private static final String DEFAULT_FINALIZE_WRITE_PARALLELISM = "5";
private static final String DEFAULT_FINALIZE_WRITE_PARALLELISM = DEFAULT_PARALLELISM;
private HoodieWriteConfig(Properties props) {
super(props);

View File

@@ -569,6 +569,9 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
.withDeletedFileResults(results).build();
}).collect();
// clean temporary data files
cleanTemporaryDataFiles(jsc);
// Remove the rolled back inflight commits
commits.stream().map(s -> new HoodieInstant(true, actionType, s))
.forEach(activeTimeline::deleteInflight);
@@ -609,11 +612,13 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
return new Tuple2<>(writeStat.getPath(), true);
}).collect();
// clean temporary data files
cleanTemporaryDataFiles(jsc);
return Optional.of(results.size());
}
@Override
public void cleanTemporaryDataFiles(JavaSparkContext jsc) {
private void cleanTemporaryDataFiles(JavaSparkContext jsc) {
if (!config.shouldUseTempFolderForCopyOnWrite()) {
return;
}

View File

@@ -281,8 +281,4 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
*/
public abstract Optional<Integer> finalizeWrite(JavaSparkContext jsc, List<Tuple2<String, HoodieWriteStat>> writeStatuses);
/**
* Clean temporary data files after data files are finalized or commit is rolled back.
*/
public abstract void cleanTemporaryDataFiles(JavaSparkContext jsc);
}