HUDI-147 Compaction Inflight Rollback not deleting Marker directory
This commit is contained in:
committed by
vinoth chandar
parent
479908fd20
commit
065173211e
@@ -389,21 +389,13 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
|
|||||||
*/
|
*/
|
||||||
protected void deleteInflightInstant(boolean deleteInstant, HoodieActiveTimeline activeTimeline,
|
protected void deleteInflightInstant(boolean deleteInstant, HoodieActiveTimeline activeTimeline,
|
||||||
HoodieInstant instantToBeDeleted) {
|
HoodieInstant instantToBeDeleted) {
|
||||||
|
// Remove marker files always on rollback
|
||||||
|
deleteMarkerDir(instantToBeDeleted.getTimestamp());
|
||||||
|
|
||||||
// Remove the rolled back inflight commits
|
// Remove the rolled back inflight commits
|
||||||
if (deleteInstant) {
|
if (deleteInstant) {
|
||||||
try {
|
activeTimeline.deleteInflight(instantToBeDeleted);
|
||||||
//TODO: Cleanup Hoodie 1.0 rollback to simply call super.cleanFailedWrites with consistency check disabled
|
logger.info("Deleted inflight commit " + instantToBeDeleted);
|
||||||
// and empty WriteStat list.
|
|
||||||
Path markerDir = new Path(metaClient.getMarkerFolderPath(instantToBeDeleted.getTimestamp()));
|
|
||||||
logger.info("Removing marker directory=" + markerDir);
|
|
||||||
if (metaClient.getFs().exists(markerDir)) {
|
|
||||||
metaClient.getFs().delete(markerDir, true);
|
|
||||||
}
|
|
||||||
activeTimeline.deleteInflight(instantToBeDeleted);
|
|
||||||
logger.info("Deleted inflight commit " + instantToBeDeleted);
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new HoodieIOException(e.getMessage(), e);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
logger.warn("Rollback finished without deleting inflight instant file. Instant=" + instantToBeDeleted);
|
logger.warn("Rollback finished without deleting inflight instant file. Instant=" + instantToBeDeleted);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -294,6 +294,24 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
|
|||||||
cleanFailedWrites(jsc, instantTs, stats, config.isConsistencyCheckEnabled());
|
cleanFailedWrites(jsc, instantTs, stats, config.isConsistencyCheckEnabled());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delete Marker directory corresponding to an instant
|
||||||
|
* @param instantTs Instant Time
|
||||||
|
*/
|
||||||
|
protected void deleteMarkerDir(String instantTs) {
|
||||||
|
try {
|
||||||
|
FileSystem fs = getMetaClient().getFs();
|
||||||
|
Path markerDir = new Path(metaClient.getMarkerFolderPath(instantTs));
|
||||||
|
if (fs.exists(markerDir)) {
|
||||||
|
// For append only case, we do not write to marker dir. Hence, the above check
|
||||||
|
logger.info("Removing marker directory=" + markerDir);
|
||||||
|
fs.delete(markerDir, true);
|
||||||
|
}
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
throw new HoodieIOException(ioe.getMessage(), ioe);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reconciles WriteStats and marker files to detect and safely delete duplicate data files created because of Spark
|
* Reconciles WriteStats and marker files to detect and safely delete duplicate data files created because of Spark
|
||||||
* retries.
|
* retries.
|
||||||
@@ -364,11 +382,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Now delete the marker directory
|
// Now delete the marker directory
|
||||||
if (fs.exists(markerDir)) {
|
deleteMarkerDir(instantTs);
|
||||||
// For append only case, we do not write to marker dir. Hence, the above check
|
|
||||||
logger.info("Removing marker directory=" + markerDir);
|
|
||||||
fs.delete(markerDir, true);
|
|
||||||
}
|
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
throw new HoodieIOException(ioe.getMessage(), ioe);
|
throw new HoodieIOException(ioe.getMessage(), ioe);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user