diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/RepairsCommand.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/RepairsCommand.java index 839cd5ce1..8967f2b67 100644 --- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/RepairsCommand.java +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/RepairsCommand.java @@ -86,7 +86,7 @@ public class RepairsCommand implements CommandMarker { int ind = 0; for (String partition : partitionPaths) { - Path partitionPath = new Path(basePath, partition); + Path partitionPath = FSUtils.getPartitionPath(basePath, partition); String[] row = new String[3]; row[0] = partition; row[1] = "Yes"; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java index cc98f5621..99d3c638d 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java @@ -274,7 +274,7 @@ public class HoodieAppendHandle extends HoodieWri Optional latestLogFile = fileSlice.get().getLatestLogFile(); return HoodieLogFormat.newWriterBuilder() - .onParentPath(new Path(hoodieTable.getMetaClient().getBasePath(), partitionPath)) + .onParentPath(FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(), partitionPath)) .withFileId(fileId).overBaseCommit(baseCommitTime).withLogVersion( latestLogFile.map(HoodieLogFile::getLogVersion).orElse(HoodieLogFile.LOGFILE_BASE_VERSION)) .withSizeThreshold(config.getLogFileMaxSize()).withFs(fs) diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java index 6b9f4f6ab..e1c4b99df 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java @@ -315,7 +315,7 @@ public class HoodieCopyOnWriteTable extends Hoodi throws IOException { logger.info("Cleaning path " + partitionPath); FileSystem fs = getMetaClient().getFs(); - FileStatus[] toBeDeleted = fs.listStatus(new Path(config.getBasePath(), partitionPath), filter); + FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter); for (FileStatus file : toBeDeleted) { boolean success = fs.delete(file.getPath(), false); results.put(file, success); @@ -340,7 +340,7 @@ public class HoodieCopyOnWriteTable extends Hoodi } return false; }; - FileStatus[] toBeDeleted = fs.listStatus(new Path(config.getBasePath(), partitionPath), filter); + FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter); for (FileStatus file : toBeDeleted) { boolean success = fs.delete(file.getPath(), false); results.put(file, success); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java index 589b7090e..af4505216 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java @@ -507,7 +507,7 @@ public class HoodieMergeOnReadTable extends String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()); try { writer = HoodieLogFormat.newWriterBuilder().onParentPath( - new Path(this.getMetaClient().getBasePath(), partitionPath)) + FSUtils.getPartitionPath(this.getMetaClient().getBasePath(), partitionPath)) .withFileId(wStat.getFileId()).overBaseCommit(baseCommitTime) .withFs(this.metaClient.getFs()) .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java index 42ad091cd..b76ba46ce 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; +import com.uber.hoodie.common.util.FSUtils; import java.io.IOException; import java.io.Serializable; import java.nio.charset.Charset; @@ -30,7 +31,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -107,7 +107,7 @@ public class HoodieCommitMetadata implements Serializable { HashMap fullPaths = new HashMap<>(); for (Map.Entry entry : getFileIdAndRelativePaths().entrySet()) { String fullPath = - (entry.getValue() != null) ? (new Path(basePath, entry.getValue())).toString() : null; + (entry.getValue() != null) ? (FSUtils.getPartitionPath(basePath, entry.getValue())).toString() : null; fullPaths.put(entry.getKey(), fullPath); } return fullPaths; diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java index 24cab8df9..642c08d64 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java @@ -105,6 +105,12 @@ public interface HoodieTimeline extends Serializable { */ HoodieTimeline filterCompletedAndCompactionInstants(); + /** + * Timeline to just include commits (commit/deltacommit) and compaction actions + * @return + */ + HoodieTimeline getCommitsAndCompactionTimeline(); + /** * Filter this timeline to just include requested and inflight compaction instants * @return diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieDefaultTimeline.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieDefaultTimeline.java index d77b9fe7e..ed80c3c14 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieDefaultTimeline.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieDefaultTimeline.java @@ -18,6 +18,7 @@ package com.uber.hoodie.common.table.timeline; +import com.google.common.collect.Sets; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.util.StringUtils; import com.uber.hoodie.exception.HoodieException; @@ -25,6 +26,7 @@ import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -102,6 +104,12 @@ public class HoodieDefaultTimeline implements HoodieTimeline { }), details); } + @Override + public HoodieTimeline getCommitsAndCompactionTimeline() { + Set validActions = Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION); + return new HoodieDefaultTimeline(instants.stream().filter(s -> validActions.contains(s.getAction())), details); + } + @Override public HoodieTimeline filterPendingCompactionTimeline() { return new HoodieDefaultTimeline( diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java index 8b05b695b..62410acef 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java @@ -71,8 +71,8 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV protected HoodieTableMetaClient metaClient; - // This is the commits that will be visible for all views extending this view - protected HoodieTimeline visibleActiveTimeline; + // This is the commits timeline that will be visible for all views extending this view + private HoodieTimeline visibleCommitsAndCompactionTimeline; // Used to concurrently load and populate partition views private ConcurrentHashMap addedPartitions = new ConcurrentHashMap<>(4096); @@ -92,7 +92,8 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV */ protected void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline) { this.metaClient = metaClient; - this.visibleActiveTimeline = visibleActiveTimeline; + refreshTimeline(visibleActiveTimeline); + // Load Pending Compaction Operations resetPendingCompactionOperations( CompactionUtils.getAllPendingCompactionOperations(metaClient).values() @@ -100,12 +101,20 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV CompactionOperation.convertFromAvroRecordInstance(e.getValue())))); } + /** + * Refresh commits timeline + * @param visibleActiveTimeline Visible Active Timeline + */ + protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) { + this.visibleCommitsAndCompactionTimeline = visibleActiveTimeline.getCommitsAndCompactionTimeline(); + } + /** * Adds the provided statuses into the file system view, and also caches it inside this object. */ protected List addFilesToView(FileStatus[] statuses) { HoodieTimer timer = new HoodieTimer().startTimer(); - List fileGroups = buildFileGroups(statuses, visibleActiveTimeline, true); + List fileGroups = buildFileGroups(statuses, visibleCommitsAndCompactionTimeline, true); long fgBuildTimeTakenMs = timer.endTimer(); timer.startTimer(); // Group by partition for efficient updates for both InMemory and DiskBased stuctures. @@ -133,7 +142,6 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV protected List buildFileGroups(Stream dataFileStream, Stream logFileStream, HoodieTimeline timeline, boolean addPendingCompactionFileSlice) { - Map, List> dataFiles = dataFileStream .collect(Collectors.groupingBy((dataFile) -> { String partitionPathStr = getPartitionPathFromFilePath(dataFile.getPath()); @@ -187,7 +195,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV resetViewState(); // Initialize with new Hoodie timeline. - init(metaClient, visibleActiveTimeline); + init(metaClient, getTimeline()); } finally { writeLock.unlock(); } @@ -288,6 +296,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV protected boolean isFileSliceAfterPendingCompaction(FileSlice fileSlice) { Option> compactionWithInstantTime = getPendingCompactionOperationWithInstant(fileSlice.getFileGroupId()); + log.info("Pending Compaction instant for (" + fileSlice + ") is :" + compactionWithInstantTime); return (compactionWithInstantTime.isPresent()) && fileSlice.getBaseInstantTime().equals(compactionWithInstantTime.get().getKey()); } @@ -300,6 +309,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV */ protected FileSlice filterDataFileAfterPendingCompaction(FileSlice fileSlice) { if (isFileSliceAfterPendingCompaction(fileSlice)) { + log.info("File Slice (" + fileSlice + ") is in pending compaction"); // Data file is filtered out of the file-slice as the corresponding compaction // instant not completed yet. FileSlice transformed = new FileSlice(fileSlice.getPartitionPath(), @@ -417,7 +427,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV String partitionPath = formatPartitionKey(partitionStr); ensurePartitionLoadedCorrectly(partitionPath); return fetchAllDataFiles(partitionPath) - .filter(df -> visibleActiveTimeline.containsOrBeforeTimelineStarts(df.getCommitTime())) + .filter(df -> visibleCommitsAndCompactionTimeline.containsOrBeforeTimelineStarts(df.getCommitTime())) .filter(df -> !isDataFileDueToPendingCompaction(df)); } finally { readLock.unlock(); @@ -794,12 +804,12 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV @Override public Option getLastInstant() { - return Option.fromJavaOptional(visibleActiveTimeline.lastInstant()); + return Option.fromJavaOptional(getTimeline().lastInstant()); } @Override public HoodieTimeline getTimeline() { - return visibleActiveTimeline; + return visibleCommitsAndCompactionTimeline; } @Override @@ -822,10 +832,18 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV * @param newTimeline New Hoodie Timeline */ protected void runSync(HoodieTimeline oldTimeline, HoodieTimeline newTimeline) { - visibleActiveTimeline = newTimeline; + refreshTimeline(newTimeline); addedPartitions.clear(); resetViewState(); // Initialize with new Hoodie timeline. init(metaClient, newTimeline); } + + /** + * Return Only Commits and Compaction timeline for building file-groups + * @return + */ + public HoodieTimeline getVisibleCommitsAndCompactionTimeline() { + return visibleCommitsAndCompactionTimeline; + } } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java index 2e52b9f3d..741d7eee2 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java @@ -198,7 +198,6 @@ public class HoodieTableFileSystemView extends IncrementalTimelineSyncFileSystem partitionToFileGroupsMap.put(partitionPath, newList); } - @Override public Stream fetchAllStoredFileGroups() { return partitionToFileGroupsMap.values().stream().flatMap(fg -> { diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/IncrementalTimelineSyncFileSystemView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/IncrementalTimelineSyncFileSystemView.java index 76e7a5389..c087de09d 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/IncrementalTimelineSyncFileSystemView.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/IncrementalTimelineSyncFileSystemView.java @@ -56,10 +56,19 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl // Allows incremental Timeline syncing private final boolean incrementalTimelineSyncEnabled; + // This is the visible active timeline used only for incremental view syncing + private HoodieTimeline visibleActiveTimeline; + protected IncrementalTimelineSyncFileSystemView(boolean enableIncrementalTimelineSync) { this.incrementalTimelineSyncEnabled = enableIncrementalTimelineSync; } + @Override + protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) { + this.visibleActiveTimeline = visibleActiveTimeline; + super.refreshTimeline(visibleActiveTimeline); + } + @Override protected void runSync(HoodieTimeline oldTimeline, HoodieTimeline newTimeline) { try { @@ -70,7 +79,7 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl runIncrementalSync(newTimeline, diffResult); log.info("Finished incremental sync"); // Reset timeline to latest - visibleActiveTimeline = newTimeline; + refreshTimeline(newTimeline); return; } } @@ -337,4 +346,9 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl buildFileGroups(viewDataFiles.values().stream(), viewLogFiles.values().stream(), timeline, true); storePartitionView(partition, fgs); } + + @Override + public HoodieTimeline getTimeline() { + return visibleActiveTimeline; + } } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RocksDbBasedFileSystemView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RocksDbBasedFileSystemView.java index 9edbb941c..ab598e881 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RocksDbBasedFileSystemView.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RocksDbBasedFileSystemView.java @@ -319,7 +319,8 @@ public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSyste return sliceStream.map(s -> Pair.of(Pair.of(s.getPartitionPath(), s.getFileId()), s)) .collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream().map(slicePair -> { HoodieFileGroup fg = - new HoodieFileGroup(slicePair.getKey().getKey(), slicePair.getKey().getValue(), visibleActiveTimeline); + new HoodieFileGroup(slicePair.getKey().getKey(), slicePair.getKey().getValue(), + getVisibleCommitsAndCompactionTimeline()); slicePair.getValue().forEach(e -> fg.addFileSlice(e.getValue())); return fg; }); diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java index 4cae7a657..7a21a3dac 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java @@ -103,8 +103,7 @@ public class HoodieTableFileSystemViewTest { fsView.close(); fsView = null; } - fsView = getFileSystemView( - metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants()); + fsView = getFileSystemView(metaClient.getActiveTimeline().filterCompletedAndCompactionInstants()); roView = (TableFileSystemView.ReadOptimizedView) fsView; rtView = (TableFileSystemView.RealtimeView) fsView; } @@ -615,10 +614,12 @@ public class HoodieTableFileSystemViewTest { // Put some files in the partition String fullPartitionPath = basePath + "/2016/05/01/"; new File(fullPartitionPath).mkdirs(); - String commitTime1 = "1"; - String commitTime2 = "2"; - String commitTime3 = "3"; - String commitTime4 = "4"; + String cleanTime1 = "1"; + String commitTime1 = "2"; + String commitTime2 = "3"; + String commitTime3 = "4"; + String commitTime4 = "5"; + String fileId1 = UUID.randomUUID().toString(); String fileId2 = UUID.randomUUID().toString(); String fileId3 = UUID.randomUUID().toString(); @@ -640,11 +641,29 @@ public class HoodieTableFileSystemViewTest { new File(fullPartitionPath + FSUtils.makeLogFileName(fileId4, HoodieLogFile.DELTA_EXTENSION, commitTime4, 0, TEST_WRITE_TOKEN)).createNewFile(); + // Create commit/clean files + new File(basePath + "/.hoodie/" + cleanTime1 + ".clean").createNewFile(); new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile(); new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile(); new File(basePath + "/.hoodie/" + commitTime3 + ".commit").createNewFile(); new File(basePath + "/.hoodie/" + commitTime4 + ".commit").createNewFile(); + testStreamLatestVersionInPartition(isLatestFileSliceOnly, fullPartitionPath, commitTime1, commitTime2, commitTime3, + commitTime4, fileId1, fileId2, fileId3, fileId4); + + // Now create a scenario where archiving deleted commits (1,2, and 3) but retained cleaner clean1. Now clean1 is + // the lowest commit time. Scenario for HUDI-162 - Here clean is the earliest action in active timeline + new File(basePath + "/.hoodie/" + commitTime1 + ".commit").delete(); + new File(basePath + "/.hoodie/" + commitTime2 + ".commit").delete(); + new File(basePath + "/.hoodie/" + commitTime3 + ".commit").delete(); + testStreamLatestVersionInPartition(isLatestFileSliceOnly, fullPartitionPath, commitTime1, commitTime2, commitTime3, + commitTime4, fileId1, fileId2, fileId3, fileId4); + } + + private void testStreamLatestVersionInPartition(boolean isLatestFileSliceOnly, String fullPartitionPath, + String commitTime1, String commitTime2, String commitTime3, String commitTime4, String fileId1, String fileId2, + String fileId3, String fileId4) throws IOException { + // Now we list the entire partition FileStatus[] statuses = metaClient.getFs().listStatus(new Path(fullPartitionPath)); assertEquals(11, statuses.length); @@ -711,7 +730,6 @@ public class HoodieTableFileSystemViewTest { assertEquals(logFilesList.size(), 1); assertTrue(logFilesList.get(0).getFileName() .equals(FSUtils.makeLogFileName(fileId2, HoodieLogFile.DELTA_EXTENSION, commitTime3, 0, TEST_WRITE_TOKEN))); - } @Test