diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java index 0735b8752..ac81a4350 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java @@ -389,21 +389,13 @@ public class HoodieCopyOnWriteTable extends Hoodi */ protected void deleteInflightInstant(boolean deleteInstant, HoodieActiveTimeline activeTimeline, HoodieInstant instantToBeDeleted) { + // Remove marker files always on rollback + deleteMarkerDir(instantToBeDeleted.getTimestamp()); + // Remove the rolled back inflight commits if (deleteInstant) { - try { - //TODO: Cleanup Hoodie 1.0 rollback to simply call super.cleanFailedWrites with consistency check disabled - // and empty WriteStat list. - Path markerDir = new Path(metaClient.getMarkerFolderPath(instantToBeDeleted.getTimestamp())); - logger.info("Removing marker directory=" + markerDir); - if (metaClient.getFs().exists(markerDir)) { - metaClient.getFs().delete(markerDir, true); - } - activeTimeline.deleteInflight(instantToBeDeleted); - logger.info("Deleted inflight commit " + instantToBeDeleted); - } catch (IOException e) { - throw new HoodieIOException(e.getMessage(), e); - } + activeTimeline.deleteInflight(instantToBeDeleted); + logger.info("Deleted inflight commit " + instantToBeDeleted); } else { logger.warn("Rollback finished without deleting inflight instant file. Instant=" + instantToBeDeleted); } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java index d8182488f..8e8ded9c4 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java @@ -294,6 +294,24 @@ public abstract class HoodieTable implements Seri cleanFailedWrites(jsc, instantTs, stats, config.isConsistencyCheckEnabled()); } + /** + * Delete Marker directory corresponding to an instant + * @param instantTs Instant Time + */ + protected void deleteMarkerDir(String instantTs) { + try { + FileSystem fs = getMetaClient().getFs(); + Path markerDir = new Path(metaClient.getMarkerFolderPath(instantTs)); + if (fs.exists(markerDir)) { + // For append only case, we do not write to marker dir. Hence, the above check + logger.info("Removing marker directory=" + markerDir); + fs.delete(markerDir, true); + } + } catch (IOException ioe) { + throw new HoodieIOException(ioe.getMessage(), ioe); + } + } + /** * Reconciles WriteStats and marker files to detect and safely delete duplicate data files created because of Spark * retries. @@ -364,11 +382,7 @@ public abstract class HoodieTable implements Seri } } // Now delete the marker directory - if (fs.exists(markerDir)) { - // For append only case, we do not write to marker dir. Hence, the above check - logger.info("Removing marker directory=" + markerDir); - fs.delete(markerDir, true); - } + deleteMarkerDir(instantTs); } catch (IOException ioe) { throw new HoodieIOException(ioe.getMessage(), ioe); }