From 3ae6cb4ed5820d6407663ffa74c6d201feaa8b45 Mon Sep 17 00:00:00 2001
From: Balaji Varadarajan
Date: Tue, 12 Feb 2019 21:29:14 -0800
Subject: [PATCH] FileSystem View must treat same fileIds present in different
 partitions as different file-groups and handle pending compaction correctly

---
 .../cli/commands/FileSystemViewCommand.java   |   2 +-
 .../uber/hoodie/CompactionAdminClient.java    |  27 +--
 .../uber/hoodie/io/HoodieAppendHandle.java    |   2 +-
 .../com/uber/hoodie/io/HoodieCleanHelper.java |   9 +-
 .../hoodie/io/compact/HoodieCompactor.java    |   4 +-
 .../compact/HoodieRealtimeTableCompactor.java |  12 +-
 .../hoodie/table/HoodieMergeOnReadTable.java  |   2 +-
 .../com/uber/hoodie/TestAsyncCompaction.java  |  11 +-
 .../java/com/uber/hoodie/TestCleaner.java     |  15 +-
 .../hoodie/TestCompactionAdminClient.java     |   2 +-
 .../common/model/CompactionOperation.java     |  19 +-
 .../uber/hoodie/common/model/FileSlice.java   |  23 ++-
 .../hoodie/common/model/HoodieFileGroup.java  |  32 ++--
 .../common/model/HoodieFileGroupId.java       |  69 ++++++++
 .../table/view/HoodieTableFileSystemView.java |  55 +++---
 .../hoodie/common/util/CompactionUtils.java   |  19 +-
 .../view/HoodieTableFileSystemViewTest.java   | 166 +++++++++++++++++-
 .../common/util/CompactionTestUtils.java      |  14 +-
 .../common/util/TestCompactionUtils.java      |  19 +-
 .../utilities/HoodieCompactionAdminTool.java  |   6 +-
 20 files changed, 388 insertions(+), 120 deletions(-)
 create mode 100644 hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieFileGroupId.java

diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/FileSystemViewCommand.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/FileSystemViewCommand.java
index dd42c47e2..5d3d098a0 100644
--- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/FileSystemViewCommand.java
+++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/FileSystemViewCommand.java
@@ -78,7 +78,7 @@ public class FileSystemViewCommand implements CommandMarker {
       // For ReadOptimized Views, do not display any delta-file related columns
       Comparable[] row = new Comparable[readOptimizedOnly ? 5 : 8];
       row[idx++] = fg.getPartitionPath();
-      row[idx++] = fg.getId();
+      row[idx++] = fg.getFileGroupId().getFileId();
       row[idx++] = fs.getBaseInstantTime();
       row[idx++] = fs.getDataFile().isPresent() ? fs.getDataFile().get().getPath() : "";
       row[idx++] = fs.getDataFile().isPresent() ? fs.getDataFile().get().getFileSize() : -1;
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java b/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java
index 6a43c9c88..715bb3d4f 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java
@@ -25,6 +25,7 @@ import com.uber.hoodie.avro.model.HoodieCompactionPlan;
 import com.uber.hoodie.common.model.CompactionOperation;
 import com.uber.hoodie.common.model.FileSlice;
 import com.uber.hoodie.common.model.HoodieDataFile;
+import com.uber.hoodie.common.model.HoodieFileGroupId;
 import com.uber.hoodie.common.model.HoodieLogFile;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
@@ -145,26 +146,28 @@ public class CompactionAdminClient implements Serializable {
   *
   * This operation MUST be executed with compactions and writer turned OFF.
   *
-  * @param fileId FileId to be unscheduled
+  * @param fgId FileGroupId to be unscheduled
   * @param skipValidation Skip validation
   * @param dryRun Dry Run Mode
   */
-  public List<RenameOpResult> unscheduleCompactionFileId(String fileId,
+  public List<RenameOpResult> unscheduleCompactionFileId(HoodieFileGroupId fgId,
      boolean skipValidation, boolean dryRun) throws Exception {
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
    List<Pair<HoodieLogFile, HoodieLogFile>> renameActions =
-        getRenamingActionsForUnschedulingCompactionForFileId(metaClient, fileId, Optional.absent(), skipValidation);
+        getRenamingActionsForUnschedulingCompactionForFileId(metaClient, fgId,
+            Optional.absent(), skipValidation);
    List<RenameOpResult> res = runRenamingOps(metaClient, renameActions, 1, dryRun);
    if (!dryRun && !res.isEmpty() && res.get(0).isExecuted() && res.get(0).isSuccess()) {
      // Ready to remove this file-Id from compaction request
      Pair<String, HoodieCompactionOperation> compactionOperationWithInstant =
-          CompactionUtils.getAllPendingCompactionOperations(metaClient).get(fileId);
+          CompactionUtils.getAllPendingCompactionOperations(metaClient).get(fgId);
      HoodieCompactionPlan plan = CompactionUtils
          .getCompactionPlan(metaClient, compactionOperationWithInstant.getKey());
      List<HoodieCompactionOperation> newOps = plan.getOperations().stream()
-          .filter(op -> !op.getFileId().equals(fileId)).collect(Collectors.toList());
+          .filter(op -> (!op.getFileId().equals(fgId.getFileId()))
+              && (!op.getPartitionPath().equals(fgId.getPartitionPath()))).collect(Collectors.toList());
      HoodieCompactionPlan newPlan =
          HoodieCompactionPlan.newBuilder().setOperations(newOps).setExtraMetadata(plan.getExtraMetadata()).build();
      HoodieInstant inflight = new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION,
@@ -465,23 +468,23 @@ public class CompactionAdminClient implements Serializable {
   * writer (ingestion/compaction) is running.
   *
   * @param metaClient Hoodie Table MetaClient
-  * @param fileId FileId to remove compaction
+  * @param fgId FileGroupId to remove compaction
   * @param fsViewOpt Cached File System View
   * @param skipValidation Skip Validation
   * @return list of pairs of log-files (old, new) and for each pair, rename must be done to successfully unschedule
   * compaction.
   */
  public List<Pair<HoodieLogFile, HoodieLogFile>> getRenamingActionsForUnschedulingCompactionForFileId(
-      HoodieTableMetaClient metaClient, String fileId, Optional<HoodieTableFileSystemView> fsViewOpt,
-      boolean skipValidation) throws IOException {
-    Map<String, Pair<String, HoodieCompactionOperation>> allPendingCompactions =
+      HoodieTableMetaClient metaClient, HoodieFileGroupId fgId,
+      Optional<HoodieTableFileSystemView> fsViewOpt, boolean skipValidation) throws IOException {
+    Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> allPendingCompactions =
        CompactionUtils.getAllPendingCompactionOperations(metaClient);
-    if (allPendingCompactions.containsKey(fileId)) {
-      Pair<String, HoodieCompactionOperation> opWithInstant = allPendingCompactions.get(fileId);
+    if (allPendingCompactions.containsKey(fgId)) {
+      Pair<String, HoodieCompactionOperation> opWithInstant = allPendingCompactions.get(fgId);
      return getRenamingActionsForUnschedulingCompactionOperation(metaClient, opWithInstant.getKey(),
          CompactionOperation.convertFromAvroRecordInstance(opWithInstant.getValue()), fsViewOpt, skipValidation);
    }
-    throw new HoodieException("FileId " + fileId + " not in pending compaction");
+    throw new HoodieException("FileGroupId " + fgId + " not in pending compaction");
  }

  /**
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
index 516ab5b78..69e36ee6b 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
@@ -118,7 +118,7 @@ public class HoodieAppendHandle extends HoodieIOH
      baseInstantTime = fileSlice.get().getBaseInstantTime();
    } else {
      // This means there is no base data file, start appending to a new log file
-      fileSlice = Optional.of(new FileSlice(baseInstantTime, this.fileId));
+      fileSlice = Optional.of(new FileSlice(partitionPath, baseInstantTime, this.fileId));
      logger.info("New InsertHandle for partition :" + partitionPath);
    }
    writeStatus.getStat().setPrevCommit(baseInstantTime);
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleanHelper.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleanHelper.java
index 537bbedaf..aeb995547 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleanHelper.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleanHelper.java
@@ -21,6 +21,7 @@ import com.uber.hoodie.common.model.FileSlice;
 import com.uber.hoodie.common.model.HoodieCleaningPolicy;
 import com.uber.hoodie.common.model.HoodieDataFile;
 import com.uber.hoodie.common.model.HoodieFileGroup;
+import com.uber.hoodie.common.model.HoodieFileGroupId;
 import com.uber.hoodie.common.model.HoodieRecordPayload;
 import com.uber.hoodie.common.model.HoodieTableType;
 import com.uber.hoodie.common.table.HoodieTimeline;
@@ -52,7 +53,7 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> {
  private final TableFileSystemView fileSystemView;
  private final HoodieTimeline commitTimeline;
-  private final Map<String, CompactionOperation> fileIdToPendingCompactionOperations;
+  private final Map<HoodieFileGroupId, CompactionOperation> fgIdToPendingCompactionOperations;
  private HoodieTable<T> hoodieTable;
  private HoodieWriteConfig config;
@@ -61,8 +62,8 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> {
    this.fileSystemView = hoodieTable.getCompletedFileSystemView();
    this.commitTimeline = hoodieTable.getCompletedCommitTimeline();
    this.config = config;
-    this.fileIdToPendingCompactionOperations =
-        ((HoodieTableFileSystemView)hoodieTable.getRTFileSystemView()).getFileIdToPendingCompaction().entrySet()
+    this.fgIdToPendingCompactionOperations =
+        ((HoodieTableFileSystemView)hoodieTable.getRTFileSystemView()).getFgIdToPendingCompaction().entrySet()
        .stream().map(entry -> Pair.of(entry.getKey(), entry.getValue().getValue()))
        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
  }
@@ -249,7 +250,7 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> {
   * @return true if file slice needs to be preserved, false otherwise.
   */
  private boolean isFileSliceNeededForPendingCompaction(FileSlice fileSlice) {
-    CompactionOperation op = fileIdToPendingCompactionOperations.get(fileSlice.getFileId());
+    CompactionOperation op = fgIdToPendingCompactionOperations.get(fileSlice.getFileGroupId());
    if (null != op) {
      // If file slice's instant time is newer or same as that of operation, do not clean
      return HoodieTimeline.compareTimestamps(fileSlice.getBaseInstantTime(), op.getBaseInstantTime(),
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java
index 27f04c426..84cadbfed 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java
@@ -18,6 +18,7 @@ package com.uber.hoodie.io.compact;

 import com.uber.hoodie.WriteStatus;
 import com.uber.hoodie.avro.model.HoodieCompactionPlan;
+import com.uber.hoodie.common.model.HoodieFileGroupId;
 import com.uber.hoodie.config.HoodieWriteConfig;
 import com.uber.hoodie.table.HoodieTable;
 import java.io.IOException;
@@ -38,12 +39,13 @@ public interface HoodieCompactor extends Serializable {
   * @param hoodieTable Hoodie Table
   * @param config Hoodie Write Configuration
   * @param compactionCommitTime scheduled compaction commit time
+   * @param fgIdsInPendingCompactions partition-fileId pairs for which compaction is pending
   * @return Compaction Plan
   * @throws IOException when encountering errors
   */
  HoodieCompactionPlan generateCompactionPlan(JavaSparkContext jsc,
      HoodieTable hoodieTable, HoodieWriteConfig config, String compactionCommitTime,
-      Set<String> fileIdsWithPendingCompactions)
+      Set<HoodieFileGroupId> fgIdsInPendingCompactions)
      throws IOException;

  /**
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java
index 549034e3b..9848631aa 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java
@@ -26,6 +26,7 @@ import com.uber.hoodie.avro.model.HoodieCompactionOperation;
 import com.uber.hoodie.avro.model.HoodieCompactionPlan;
 import com.uber.hoodie.common.model.CompactionOperation;
 import com.uber.hoodie.common.model.HoodieDataFile;
+import com.uber.hoodie.common.model.HoodieFileGroupId;
 import com.uber.hoodie.common.model.HoodieLogFile;
 import com.uber.hoodie.common.model.HoodieTableType;
 import com.uber.hoodie.common.model.HoodieWriteStat.RuntimeStats;
@@ -160,7 +161,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
  @Override
  public HoodieCompactionPlan generateCompactionPlan(JavaSparkContext jsc,
      HoodieTable hoodieTable, HoodieWriteConfig config, String compactionCommitTime,
-      Set<String> fileIdsWithPendingCompactions) throws IOException {
+      Set<HoodieFileGroupId> fgIdsInPendingCompactions) throws IOException {
    totalLogFiles = new LongAccumulator();
    totalFileSlices = new LongAccumulator();
@@ -190,7 +191,8 @@
        jsc.parallelize(partitionPaths, partitionPaths.size())
            .flatMap((FlatMapFunction<String, CompactionOperation>) partitionPath -> fileSystemView
                .getLatestFileSlices(partitionPath)
-                .filter(slice -> !fileIdsWithPendingCompactions.contains(slice.getFileId()))
+                .filter(slice ->
+                    !fgIdsInPendingCompactions.contains(slice.getFileGroupId()))
                .map(
                    s -> {
                      List<HoodieLogFile> logFiles = s.getLogFiles().sorted(HoodieLogFile
@@ -215,11 +217,11 @@
    // compactions only
    HoodieCompactionPlan compactionPlan = config.getCompactionStrategy().generateCompactionPlan(config, operations,
        CompactionUtils.getAllPendingCompactionPlans(metaClient).stream().map(Pair::getValue).collect(toList()));
-    Preconditions.checkArgument(compactionPlan.getOperations().stream()
-        .filter(op -> fileIdsWithPendingCompactions.contains(op.getFileId())).count() == 0,
+    Preconditions.checkArgument(compactionPlan.getOperations().stream().noneMatch(
+        op -> fgIdsInPendingCompactions.contains(new HoodieFileGroupId(op.getPartitionPath(), op.getFileId()))),
        "Bad Compaction Plan. FileId MUST NOT have multiple pending compactions. "
            + "Please fix your strategy implementation."
-            + "FileIdsWithPendingCompactions :" + fileIdsWithPendingCompactions
+            + "FileIdsWithPendingCompactions :" + fgIdsInPendingCompactions
            + ", Selected workload :" + compactionPlan);
    if (compactionPlan.getOperations().isEmpty()) {
      log.warn("After filtering, Nothing to compact for " + metaClient.getBasePath());
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
index 29280a49e..83c9fb64c 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
@@ -147,7 +147,7 @@ public class HoodieMergeOnReadTable extends
    try {
      return compactor.generateCompactionPlan(jsc, this, config, instantTime,
          new HashSet<>(((HoodieTableFileSystemView)getRTFileSystemView())
-              .getFileIdToPendingCompaction().keySet()));
+              .getFgIdToPendingCompaction().keySet()));
    } catch (IOException e) {
      throw new HoodieCompactionException("Could not schedule compaction " + config.getBasePath(), e);
    }
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestAsyncCompaction.java b/hoodie-client/src/test/java/com/uber/hoodie/TestAsyncCompaction.java
index 0df9da859..4684eef1e 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestAsyncCompaction.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestAsyncCompaction.java
@@ -27,6 +27,7 @@ import com.uber.hoodie.common.HoodieClientTestUtils;
 import com.uber.hoodie.common.HoodieTestDataGenerator;
 import com.uber.hoodie.common.model.FileSlice;
 import com.uber.hoodie.common.model.HoodieDataFile;
+import com.uber.hoodie.common.model.HoodieFileGroupId;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieTableType;
 import com.uber.hoodie.common.model.HoodieTestUtils;
@@ -398,13 +399,13 @@
   **/
  private void validateDeltaCommit(String latestDeltaCommit,
-      final Map<String, Pair<String, HoodieCompactionOperation>> fileIdToCompactionOperation,
+      final Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> fgIdToCompactionOperation,
      HoodieWriteConfig cfg) throws IOException {
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
    HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
    List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table, cfg);
    fileSliceList.forEach(fileSlice -> {
-      Pair<String, HoodieCompactionOperation> opPair = fileIdToCompactionOperation.get(fileSlice.getFileId());
+      Pair<String, HoodieCompactionOperation> opPair = fgIdToCompactionOperation.get(fileSlice.getFileGroupId());
      if (opPair != null) {
        System.out.println("FileSlice :" + fileSlice);
        assertTrue("Expect baseInstant to match compaction Instant",
@@ -430,7 +431,7 @@
        pendingCompactions.stream().map(pc -> pc.getKey().getTimestamp()).sorted().collect(Collectors.toList());
    assertEquals(expPendingCompactionInstants, gotPendingCompactionInstants);
-    Map<String, Pair<String, HoodieCompactionOperation>> fileIdToCompactionOperation =
+    Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> fgIdToCompactionOperation =
        CompactionUtils.getAllPendingCompactionOperations(metaClient);
    if (insertFirst) {
@@ -451,7 +452,7 @@
      List<HoodieDataFile> dataFilesToRead = getCurrentLatestDataFiles(hoodieTable, cfg);
      assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit",
          dataFilesToRead.stream().findAny().isPresent());
-      validateDeltaCommit(firstInstant, fileIdToCompactionOperation, cfg);
+      validateDeltaCommit(firstInstant, fgIdToCompactionOperation, cfg);
    }
    int numRecords = records.size();
@@ -459,7 +460,7 @@
      records = dataGen.generateUpdates(instantTime, numRecords);
      metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
      createNextDeltaCommit(instantTime, records, client, metaClient, cfg, false);
-      validateDeltaCommit(instantTime, fileIdToCompactionOperation, cfg);
+      validateDeltaCommit(instantTime, fgIdToCompactionOperation, cfg);
    }
    return records;
  }
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java b/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java
index 2ef3f7960..76c01097f 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java
@@ -33,6 +33,7 @@ import com.uber.hoodie.common.model.HoodieCleaningPolicy;
 import com.uber.hoodie.common.model.HoodieCommitMetadata;
 import com.uber.hoodie.common.model.HoodieDataFile;
 import com.uber.hoodie.common.model.HoodieFileGroup;
+import com.uber.hoodie.common.model.HoodieFileGroupId;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieTableType;
 import com.uber.hoodie.common.model.HoodieTestUtils;
@@ -204,16 +205,14 @@
    insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn);
-    Map<String, String> selectedFileIdForCompaction = new HashMap<>();
-    Map<String, FileSlice> compactionFileIdToLatestFileSlice = new HashMap<>();
+    Map<HoodieFileGroupId, FileSlice> compactionFileIdToLatestFileSlice = new HashMap<>();
    HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
    HoodieTable table = HoodieTable.getHoodieTable(metadata, getConfig(), jsc);
    for (String partitionPath : dataGen.getPartitionPaths()) {
      TableFileSystemView fsView = table.getFileSystemView();
      Optional<Boolean> added = fsView.getAllFileGroups(partitionPath).findFirst()
          .map(fg -> {
-            selectedFileIdForCompaction.put(fg.getId(), partitionPath);
-            fg.getLatestFileSlice().map(fs -> compactionFileIdToLatestFileSlice.put(fg.getId(), fs));
+            fg.getLatestFileSlice().map(fs -> compactionFileIdToLatestFileSlice.put(fg.getFileGroupId(), fs));
            return true;
          });
      if (added.isPresent()) {
@@ -224,7 +223,7 @@
    // Create workload with selected file-slices
    List<Pair<String, FileSlice>> partitionFileSlicePairs = compactionFileIdToLatestFileSlice.entrySet().stream()
-        .map(e -> Pair.of(selectedFileIdForCompaction.get(e.getKey()), e.getValue())).collect(Collectors.toList());
+        .map(e -> Pair.of(e.getKey().getPartitionPath(), e.getValue())).collect(Collectors.toList());
    HoodieCompactionPlan compactionPlan =
        CompactionUtils.buildFromFileSlices(partitionFileSlicePairs, Optional.empty(), Optional.empty());
    List<String> instantTimes = HoodieTestUtils.monotonicIncreasingCommitTimestamps(9, 1);
@@ -270,18 +269,18 @@
        List<HoodieFileGroup> fileGroups = fsView.getAllFileGroups(partitionPath).collect(Collectors.toList());
        for (HoodieFileGroup fileGroup : fileGroups) {
-          if (selectedFileIdForCompaction.containsKey(fileGroup.getId())) {
+          if (compactionFileIdToLatestFileSlice.containsKey(fileGroup.getFileGroupId())) {
            // Ensure latest file-slice selected for compaction is retained
            Optional<HoodieDataFile> dataFileForCompactionPresent = fileGroup.getAllDataFiles().filter(df -> {
-              return compactionFileIdToLatestFileSlice.get(fileGroup.getId())
+              return compactionFileIdToLatestFileSlice.get(fileGroup.getFileGroupId())
                  .getBaseInstantTime().equals(df.getCommitTime());
            }).findAny();
            Assert.assertTrue("Data File selected for compaction is retained",
                dataFileForCompactionPresent.isPresent());
          } else {
            // file has no more than max versions
-            String fileId = fileGroup.getId();
+            String fileId = fileGroup.getFileGroupId().getFileId();
            List<HoodieDataFile> dataFiles = fileGroup.getAllDataFiles().collect(Collectors.toList());
            assertTrue("fileId " + fileId + " has more than " + maxVersions + " versions",
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestCompactionAdminClient.java b/hoodie-client/src/test/java/com/uber/hoodie/TestCompactionAdminClient.java
index a385f0947..0ed1f7c88 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestCompactionAdminClient.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestCompactionAdminClient.java
@@ -334,7 +334,7 @@
        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
    // Call the main unschedule API
-    client.unscheduleCompactionFileId(op.getFileId(), false, false);
+    client.unscheduleCompactionFileId(op.getFileGroupId(), false, false);
    metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
    final HoodieTableFileSystemView newFsView =
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/CompactionOperation.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/CompactionOperation.java
index b094c49c5..93a7f2e87 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/CompactionOperation.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/CompactionOperation.java
@@ -38,8 +38,7 @@ public class CompactionOperation implements Serializable {
  private Optional<String> dataFileCommitTime;
  private List<String> deltaFilePaths;
  private Optional<String> dataFilePath;
-  private String fileId;
-  private String partitionPath;
+  private HoodieFileGroupId id;
  private Map<String, Double> metrics;

  //Only for serialization/de-serialization
@@ -52,17 +51,16 @@
    if (dataFile.isPresent()) {
      this.baseInstantTime = dataFile.get().getCommitTime();
      this.dataFilePath = Optional.of(dataFile.get().getPath());
-      this.fileId = dataFile.get().getFileId();
+      this.id = new HoodieFileGroupId(partitionPath, dataFile.get().getFileId());
      this.dataFileCommitTime = Optional.of(dataFile.get().getCommitTime());
    } else {
      assert logFiles.size() > 0;
      this.dataFilePath = Optional.absent();
      this.baseInstantTime = FSUtils.getBaseCommitTimeFromLogPath(logFiles.get(0).getPath());
-      this.fileId = FSUtils.getFileIdFromLogPath(logFiles.get(0).getPath());
+      this.id = new HoodieFileGroupId(partitionPath, FSUtils.getFileIdFromLogPath(logFiles.get(0).getPath()));
      this.dataFileCommitTime = Optional.absent();
    }
-    this.partitionPath = partitionPath;
    this.deltaFilePaths = logFiles.stream().map(s -> s.getPath().toString())
        .collect(Collectors.toList());
    this.metrics = metrics;
@@ -85,17 +83,21 @@
  }

  public String getFileId() {
-    return fileId;
+    return id.getFileId();
  }

  public String getPartitionPath() {
-    return partitionPath;
+    return id.getPartitionPath();
  }

  public Map<String, Double> getMetrics() {
    return metrics;
  }

+  public HoodieFileGroupId getFileGroupId() {
+    return id;
+  }
+
  /**
   * Convert Avro generated Compaction operation to POJO for Spark RDD operation
   * @param operation Hoodie Compaction Operation
@@ -106,9 +108,8 @@
    op.baseInstantTime = operation.getBaseInstantTime();
    op.dataFilePath = Optional.fromNullable(operation.getDataFilePath());
    op.deltaFilePaths = new ArrayList<>(operation.getDeltaFilePaths());
-    op.fileId = operation.getFileId();
+    op.id = new HoodieFileGroupId(operation.getPartitionPath(), operation.getFileId());
    op.metrics = operation.getMetrics() == null ? new HashMap<>() : new HashMap<>(operation.getMetrics());
-    op.partitionPath = operation.getPartitionPath();
    return op;
  }
 }
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/FileSlice.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/FileSlice.java
index 0720fc99b..30ef2dcfd 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/FileSlice.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/FileSlice.java
@@ -30,9 +30,9 @@ import java.util.stream.Stream;
 public class FileSlice implements Serializable {

  /**
-   * id of the slice
+   * File Group Id of the Slice
   */
-  private String fileId;
+  private HoodieFileGroupId fileGroupId;

  /**
   * Point in the timeline, at which the slice was created
@@ -50,8 +50,12 @@
   */
  private final TreeSet<HoodieLogFile> logFiles;

-  public FileSlice(String baseInstantTime, String fileId) {
-    this.fileId = fileId;
+  public FileSlice(String partitionPath, String baseInstantTime, String fileId) {
+    this(new HoodieFileGroupId(partitionPath, fileId), baseInstantTime);
+  }
+
+  public FileSlice(HoodieFileGroupId fileGroupId, String baseInstantTime) {
+    this.fileGroupId = fileGroupId;
    this.baseInstantTime = baseInstantTime;
    this.dataFile = null;
    this.logFiles = new TreeSet<>(HoodieLogFile.getBaseInstantAndLogVersionComparator());
@@ -73,8 +77,16 @@
    return baseInstantTime;
  }

+  public String getPartitionPath() {
+    return fileGroupId.getPartitionPath();
+  }
+
  public String getFileId() {
-    return fileId;
+    return fileGroupId.getFileId();
+  }
+
+  public HoodieFileGroupId getFileGroupId() {
+    return fileGroupId;
  }

  public Optional<HoodieDataFile> getDataFile() {
@@ -84,6 +96,7 @@
  @Override
  public String toString() {
    final StringBuilder sb = new StringBuilder("FileSlice {");
+    sb.append("fileGroupId=").append(fileGroupId);
    sb.append("baseCommitTime=").append(baseInstantTime);
    sb.append(", dataFile='").append(dataFile).append('\'');
    sb.append(", logFiles='").append(logFiles).append('\'');
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieFileGroup.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieFileGroup.java
index 01aac4127..798a6714e 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieFileGroup.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieFileGroup.java
@@ -34,14 +34,9 @@ public class HoodieFileGroup implements Serializable {
  }

  /**
-   * Partition containing the file group.
+   * file group id
   */
-  private final String partitionPath;
-
-  /**
-   * uniquely identifies the file group
-   */
-  private final String id;
+  private final HoodieFileGroupId fileGroupId;

  /**
   * Slices of files in this group, sorted with greater commit first.
@@ -59,8 +54,11 @@
  private final Optional<HoodieInstant> lastInstant;

  public HoodieFileGroup(String partitionPath, String id, HoodieTimeline timeline) {
-    this.partitionPath = partitionPath;
-    this.id = id;
+    this(new HoodieFileGroupId(partitionPath, id), timeline);
+  }
+
+  public HoodieFileGroup(HoodieFileGroupId fileGroupId, HoodieTimeline timeline) {
+    this.fileGroupId = fileGroupId;
    this.fileSlices = new TreeMap<>(HoodieFileGroup.getReverseCommitTimeComparator());
    this.timeline = timeline;
    this.lastInstant = timeline.lastInstant();
@@ -72,7 +70,7 @@
   */
  public void addNewFileSliceAtInstant(String baseInstantTime) {
    if (!fileSlices.containsKey(baseInstantTime)) {
-      fileSlices.put(baseInstantTime, new FileSlice(baseInstantTime, id));
+      fileSlices.put(baseInstantTime, new FileSlice(fileGroupId, baseInstantTime));
    }
  }

@@ -81,7 +79,7 @@
   */
  public void addDataFile(HoodieDataFile dataFile) {
    if (!fileSlices.containsKey(dataFile.getCommitTime())) {
-      fileSlices.put(dataFile.getCommitTime(), new FileSlice(dataFile.getCommitTime(), id));
+      fileSlices.put(dataFile.getCommitTime(), new FileSlice(fileGroupId, dataFile.getCommitTime()));
    }
    fileSlices.get(dataFile.getCommitTime()).setDataFile(dataFile);
  }

@@ -91,17 +89,17 @@
   */
  public void addLogFile(HoodieLogFile logFile) {
    if (!fileSlices.containsKey(logFile.getBaseCommitTime())) {
-      fileSlices.put(logFile.getBaseCommitTime(), new FileSlice(logFile.getBaseCommitTime(), id));
+      fileSlices.put(logFile.getBaseCommitTime(), new FileSlice(fileGroupId, logFile.getBaseCommitTime()));
    }
    fileSlices.get(logFile.getBaseCommitTime()).addLogFile(logFile);
  }

-  public String getId() {
-    return id;
+  public String getPartitionPath() {
+    return fileGroupId.getPartitionPath();
  }

-  public String getPartitionPath() {
-    return partitionPath;
+  public HoodieFileGroupId getFileGroupId() {
+    return fileGroupId;
  }

  /**
@@ -197,7 +195,7 @@
  @Override
  public String toString() {
    final StringBuilder sb = new StringBuilder("HoodieFileGroup {");
-    sb.append("id=").append(id);
+    sb.append("id=").append(fileGroupId);
    sb.append(", fileSlices='").append(fileSlices).append('\'');
    sb.append('}');
    return sb.toString();
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieFileGroupId.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieFileGroupId.java
new file mode 100644
index 000000000..98f51b439
--- /dev/null
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieFileGroupId.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common.model;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Unique ID to identify a file-group in a data-set
+ */
+public class HoodieFileGroupId implements Serializable {
+
+  private final String partitionPath;
+
+  private final String fileId;
+
+  public HoodieFileGroupId(String partitionPath, String fileId) {
+    this.partitionPath = partitionPath;
+    this.fileId = fileId;
+  }
+
+  public String getPartitionPath() {
+    return partitionPath;
+  }
+
+  public String getFileId() {
+    return fileId;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    HoodieFileGroupId that = (HoodieFileGroupId) o;
+    return Objects.equals(partitionPath, that.partitionPath)
+        && Objects.equals(fileId, that.fileId);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(partitionPath, fileId);
+  }
+
+  @Override
+  public String toString() {
+    return "HoodieFileGroupId{"
+        + "partitionPath='" + partitionPath + '\''
+        + ", fileId='" + fileId + '\''
+        + '}';
+  }
+}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java
index cf5cb9eec..93f61b7c3 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java
@@ -21,6 +21,7 @@ import com.uber.hoodie.common.model.CompactionOperation;
 import com.uber.hoodie.common.model.FileSlice;
 import com.uber.hoodie.common.model.HoodieDataFile;
 import com.uber.hoodie.common.model.HoodieFileGroup;
+import com.uber.hoodie.common.model.HoodieFileGroupId;
 import com.uber.hoodie.common.model.HoodieLogFile;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
@@ -70,12 +71,12 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
  // mapping from partition paths to file groups contained within them
  protected HashMap<String, List<HoodieFileGroup>> partitionToFileGroupsMap;
  // mapping from file id to the file group.
-  protected HashMap<String, HoodieFileGroup> fileGroupMap;
+  protected HashMap<HoodieFileGroupId, HoodieFileGroup> fileGroupMap;

  /**
-   * File Id to pending compaction instant time
+   * PartitionPath + File-Id to pending compaction instant time
   */
-  private final Map<String, Pair<String, CompactionOperation>> fileIdToPendingCompaction;
+  private final Map<HoodieFileGroupId, Pair<String, CompactionOperation>> fgIdToPendingCompaction;

  /**
   * Create a file system view, as of the given timeline
@@ -90,7 +91,7 @@
    // Build fileId to Pending Compaction Instants
    List<HoodieInstant> pendingCompactionInstants =
        metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants().collect(Collectors.toList());
-    this.fileIdToPendingCompaction = ImmutableMap.copyOf(
+    this.fgIdToPendingCompaction = ImmutableMap.copyOf(
        CompactionUtils.getAllPendingCompactionOperations(metaClient).entrySet().stream()
            .map(entry -> Pair.of(entry.getKey(), Pair.of(entry.getValue().getKey(),
                CompactionOperation.convertFromAvroRecordInstance(entry.getValue().getValue()))))
@@ -123,6 +124,10 @@
    out.defaultWriteObject();
  }

+  private String getPartitionPathFromFileStatus(FileStatus fileStatus) {
+    return FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), fileStatus.getPath().getParent());
+  }
+
  /**
   * Adds the provided statuses into the file system view, and also caches it inside this object.
   */
@@ -130,9 +135,7 @@
    Map<Pair<String, String>, List<HoodieDataFile>> dataFiles = convertFileStatusesToDataFiles(
        statuses)
        .collect(Collectors.groupingBy((dataFile) -> {
-          String partitionPathStr = FSUtils.getRelativePartitionPath(
-              new Path(metaClient.getBasePath()),
-              dataFile.getFileStatus().getPath().getParent());
+          String partitionPathStr = getPartitionPathFromFileStatus(dataFile.getFileStatus());
          return Pair.of(partitionPathStr, dataFile.getFileId());
        }));
    Map<Pair<String, String>, List<HoodieLogFile>> logFiles = convertFileStatusesToLogFiles(
        statuses)
@@ -157,17 +160,18 @@
      if (logFiles.containsKey(pair)) {
        logFiles.get(pair).forEach(group::addLogFile);
      }
-      if (fileIdToPendingCompaction.containsKey(fileId)) {
+      HoodieFileGroupId fgId = group.getFileGroupId();
+      if (fgIdToPendingCompaction.containsKey(fgId)) {
        // If there is no delta-commit after compaction request, this step would ensure a new file-slice appears
        // so that any new ingestion uses the correct base-instant
-        group.addNewFileSliceAtInstant(fileIdToPendingCompaction.get(fileId).getKey());
+        group.addNewFileSliceAtInstant(fgIdToPendingCompaction.get(fgId).getKey());
      }
      fileGroups.add(group);
    });

    // add to the cache.
    fileGroups.forEach(group -> {
-      fileGroupMap.put(group.getId(), group);
+      fileGroupMap.put(group.getFileGroupId(), group);
      if (!partitionToFileGroupsMap.containsKey(group.getPartitionPath())) {
        partitionToFileGroupsMap.put(group.getPartitionPath(), new ArrayList<>());
      }
@@ -198,7 +202,9 @@
   * @param dataFile Data File
   */
  private boolean isDataFileDueToPendingCompaction(HoodieDataFile dataFile) {
-    Pair<String, CompactionOperation> compactionWithInstantTime = fileIdToPendingCompaction.get(dataFile.getFileId());
+    final String partitionPath = getPartitionPathFromFileStatus(dataFile.getFileStatus());
+    HoodieFileGroupId fgId = new HoodieFileGroupId(partitionPath, dataFile.getFileId());
+    Pair<String, CompactionOperation> compactionWithInstantTime = fgIdToPendingCompaction.get(fgId);
    if ((null != compactionWithInstantTime) && (null != compactionWithInstantTime.getLeft())
        && dataFile.getCommitTime().equals(compactionWithInstantTime.getKey())) {
      return true;
@@ -210,7 +216,8 @@
  public Stream<HoodieDataFile> getLatestDataFiles(final String partitionPath) {
    return getAllFileGroups(partitionPath)
        .map(fileGroup -> {
-          return fileGroup.getAllDataFiles().filter(df -> !isDataFileDueToPendingCompaction(df)).findFirst();
+          return fileGroup.getAllDataFiles()
+              .filter(df -> !isDataFileDueToPendingCompaction(df)).findFirst();
        })
        .filter(Optional::isPresent)
        .map(Optional::get);
@@ -278,7 +285,7 @@
        .map(HoodieFileGroup::getLatestFileSlice)
        .filter(Optional::isPresent)
        .map(Optional::get)
-        .map(this::filterDataFileAfterPendingCompaction);
+        .map(fs -> filterDataFileAfterPendingCompaction(fs));
  }

  @Override
@@ -288,7 +295,7 @@
      FileSlice fileSlice = fileGroup.getLatestFileSlice().get();
      // if the file-group is under compaction, pick the latest before compaction instant time.
      if (isFileSliceAfterPendingCompaction(fileSlice)) {
-        String compactionInstantTime = fileIdToPendingCompaction.get(fileSlice.getFileId()).getLeft();
+        String compactionInstantTime = fgIdToPendingCompaction.get(fileSlice.getFileGroupId()).getLeft();
        return fileGroup.getLatestFileSliceBefore(compactionInstantTime);
      }
      return Optional.of(fileSlice);
@@ -303,7 +310,8 @@
   * @return
   */
  private boolean isFileSliceAfterPendingCompaction(FileSlice fileSlice) {
-    Pair<String, CompactionOperation> compactionWithInstantTime = fileIdToPendingCompaction.get(fileSlice.getFileId());
+    Pair<String, CompactionOperation> compactionWithInstantTime =
+        fgIdToPendingCompaction.get(fileSlice.getFileGroupId());
    return (null != compactionWithInstantTime)
        && fileSlice.getBaseInstantTime().equals(compactionWithInstantTime.getKey());
  }
@@ -318,7 +326,8 @@
    if (isFileSliceAfterPendingCompaction(fileSlice)) {
      // Data file is filtered out of the file-slice as the corresponding compaction
      // instant not completed yet.
-      FileSlice transformed = new FileSlice(fileSlice.getBaseInstantTime(), fileSlice.getFileId());
+      FileSlice transformed = new FileSlice(fileSlice.getPartitionPath(),
+          fileSlice.getBaseInstantTime(), fileSlice.getFileId());
      fileSlice.getLogFiles().forEach(transformed::addLogFile);
      return transformed;
    }
@@ -332,7 +341,7 @@
        .map(fileGroup -> fileGroup.getLatestFileSliceBeforeOrOn(maxCommitTime))
        .filter(Optional::isPresent)
        .map(Optional::get)
-        .map(this::filterDataFileAfterPendingCompaction);
+        .map(fs -> filterDataFileAfterPendingCompaction(fs));
  }

  /**
@@ -342,7 +351,8 @@
   * @param penultimateSlice Penultimate file slice for a file-group in commit timeline order
   */
  private static FileSlice mergeCompactionPendingFileSlices(FileSlice lastSlice, FileSlice penultimateSlice) {
-    FileSlice merged = new FileSlice(penultimateSlice.getBaseInstantTime(), penultimateSlice.getFileId());
+    FileSlice merged = new FileSlice(penultimateSlice.getPartitionPath(),
+        penultimateSlice.getBaseInstantTime(), penultimateSlice.getFileId());
    if (penultimateSlice.getDataFile().isPresent()) {
      merged.setDataFile(penultimateSlice.getDataFile().get());
    }
@@ -361,8 +371,9 @@
   */
  private FileSlice getMergedFileSlice(HoodieFileGroup fileGroup, FileSlice fileSlice) {
    // if the file-group is under construction, pick the latest before compaction instant time.
-    if (fileIdToPendingCompaction.containsKey(fileSlice.getFileId())) {
-      String compactionInstantTime = fileIdToPendingCompaction.get(fileSlice.getFileId()).getKey();
+    HoodieFileGroupId fgId = fileSlice.getFileGroupId();
+    if (fgIdToPendingCompaction.containsKey(fgId)) {
+      String compactionInstantTime = fgIdToPendingCompaction.get(fgId).getKey();
      if (fileSlice.getBaseInstantTime().equals(compactionInstantTime)) {
        Optional<FileSlice> prevFileSlice = fileGroup.getLatestFileSliceBefore(compactionInstantTime);
        if (prevFileSlice.isPresent()) {
@@ -426,8 +437,8 @@
    }
  }

-  public Map<String, Pair<String, CompactionOperation>> getFileIdToPendingCompaction() {
-    return fileIdToPendingCompaction;
+  public Map<HoodieFileGroupId, Pair<String, CompactionOperation>> getFgIdToPendingCompaction() {
+    return fgIdToPendingCompaction;
  }

  public Stream<HoodieFileGroup> getAllFileGroups() {
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/CompactionUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/CompactionUtils.java
index dec7244db..f3552b452 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/CompactionUtils.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/CompactionUtils.java
@@ -20,6 +20,7 @@ import com.uber.hoodie.avro.model.HoodieCompactionOperation;
 import com.uber.hoodie.avro.model.HoodieCompactionPlan;
 import com.uber.hoodie.common.model.CompactionOperation;
 import com.uber.hoodie.common.model.FileSlice;
+import com.uber.hoodie.common.model.HoodieFileGroupId;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.table.timeline.HoodieInstant;
@@ -134,36 +135,38 @@ public class CompactionUtils {
  }

  /**
-   * Get all file-ids with pending Compaction operations and their target compaction instant time
+   * Get all PartitionPath + file-ids with pending Compaction operations and their target compaction instant time
   *
   * @param metaClient Hoodie Table Meta Client
   */
-  public static Map<String, Pair<String, HoodieCompactionOperation>> getAllPendingCompactionOperations(
+  public static Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> getAllPendingCompactionOperations(
      HoodieTableMetaClient metaClient) {
    List<Pair<HoodieInstant, HoodieCompactionPlan>> pendingCompactionPlanWithInstants =
        getAllPendingCompactionPlans(metaClient);

-    Map<String, Pair<String, HoodieCompactionOperation>> fileIdToPendingCompactionWithInstantMap = new HashMap<>();
+    Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> fgIdToPendingCompactionWithInstantMap =
+        new HashMap<>();
    pendingCompactionPlanWithInstants.stream().flatMap(instantPlanPair -> {
      HoodieInstant instant = instantPlanPair.getKey();
      HoodieCompactionPlan compactionPlan = instantPlanPair.getValue();
      List<HoodieCompactionOperation> ops = compactionPlan.getOperations();
      if (null != ops) {
        return ops.stream().map(op -> {
-          return Pair.of(op.getFileId(), Pair.of(instant.getTimestamp(), op));
+          return Pair.of(new HoodieFileGroupId(op.getPartitionPath(), op.getFileId()),
+              Pair.of(instant.getTimestamp(), op));
        });
      } else {
        return Stream.empty();
      }
    }).forEach(pair -> {
      // Defensive check to ensure a single-fileId does not have more than one pending compaction
-      if (fileIdToPendingCompactionWithInstantMap.containsKey(pair.getKey())) {
+      if (fgIdToPendingCompactionWithInstantMap.containsKey(pair.getKey())) {
        String msg = "Hoodie File Id (" + pair.getKey() + ") has more thant 1 pending compactions. Instants: "
-            + pair.getValue() + ", " + fileIdToPendingCompactionWithInstantMap.get(pair.getKey());
+            + pair.getValue() + ", " + fgIdToPendingCompactionWithInstantMap.get(pair.getKey());
        throw new IllegalStateException(msg);
      }
-      fileIdToPendingCompactionWithInstantMap.put(pair.getKey(), pair.getValue());
+      fgIdToPendingCompactionWithInstantMap.put(pair.getKey(), pair.getValue());
    });
-    return fileIdToPendingCompactionWithInstantMap;
+    return fgIdToPendingCompactionWithInstantMap;
  }
 }
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java
index b650695f8..bfd931092 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java
@@ -26,6 +26,7 @@ import com.uber.hoodie.avro.model.HoodieCompactionPlan;
 import com.uber.hoodie.common.model.FileSlice;
 import com.uber.hoodie.common.model.HoodieDataFile;
 import com.uber.hoodie.common.model.HoodieFileGroup;
+import com.uber.hoodie.common.model.HoodieFileGroupId;
 import com.uber.hoodie.common.model.HoodieLogFile;
 import com.uber.hoodie.common.model.HoodieTestUtils;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
@@ -51,6 +52,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -676,7 +678,7 @@ public class HoodieTableFileSystemViewTest {
    assertEquals(3, fileGroups.size());

    for (HoodieFileGroup fileGroup : fileGroups) {
-      String fileId = fileGroup.getId();
+      String fileId = fileGroup.getFileGroupId().getFileId();
      Set<String> filenames = Sets.newHashSet();
      fileGroup.getAllDataFiles().forEach(dataFile -> {
        assertEquals("All same fileId should be grouped", fileId, dataFile.getFileId());
@@ -850,16 +852,17 @@ public class HoodieTableFileSystemViewTest {
    assertEquals(3, fileGroups.size());
    for (HoodieFileGroup fileGroup : fileGroups) {
      List<FileSlice> slices = fileGroup.getAllFileSlices().collect(Collectors.toList());
-      if (fileGroup.getId().equals(fileId1)) {
+      String fileId = fileGroup.getFileGroupId().getFileId();
+      if (fileId.equals(fileId1)) {
        assertEquals(2, slices.size());
        assertEquals(commitTime4, slices.get(0).getBaseInstantTime());
        assertEquals(commitTime1, slices.get(1).getBaseInstantTime());
-      } else if (fileGroup.getId().equals(fileId2)) {
+      } else if (fileId.equals(fileId2)) {
        assertEquals(3, slices.size());
        assertEquals(commitTime3, slices.get(0).getBaseInstantTime());
        assertEquals(commitTime2, slices.get(1).getBaseInstantTime());
        assertEquals(commitTime1, slices.get(2).getBaseInstantTime());
-      } else if (fileGroup.getId().equals(fileId3)) {
+      } else if (fileId.equals(fileId3)) {
        assertEquals(2, slices.size());
        assertEquals(commitTime4, slices.get(0).getBaseInstantTime());
        assertEquals(commitTime3, slices.get(1).getBaseInstantTime());
@@ -876,4 +879,159 @@ public class HoodieTableFileSystemViewTest {
    assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, 1, fileId2)));
    assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime4, 1, fileId3)));
  }
+
+  @Test
+  public void testPendingCompactionWithDuplicateFileIdsAcrossPartitions() throws Exception {
+    // Put some files in the partition
+    String partitionPath1 = "2016/05/01";
+    String partitionPath2 = "2016/05/02";
+    String partitionPath3 = "2016/05/03";
+
+    String fullPartitionPath1 = basePath + "/" + partitionPath1 + "/";
+    new File(fullPartitionPath1).mkdirs();
+    String fullPartitionPath2 = basePath + "/" + partitionPath2 + "/";
+    new File(fullPartitionPath2).mkdirs();
+    String fullPartitionPath3 = basePath + "/" + partitionPath3 + "/";
+    new File(fullPartitionPath3).mkdirs();
+    String instantTime1 = "1";
+    String deltaInstantTime1 = "2";
+    String deltaInstantTime2 = "3";
+    String fileId = UUID.randomUUID().toString();
+
+    String dataFileName = FSUtils.makeDataFileName(instantTime1, 1, fileId);
+    new File(fullPartitionPath1 + dataFileName).createNewFile();
+
+    String fileName1 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, instantTime1, 0);
+    new File(fullPartitionPath1 + fileName1)
+        .createNewFile();
+    new File(fullPartitionPath2 + FSUtils.makeDataFileName(instantTime1, 1, fileId)).createNewFile();
+    new File(fullPartitionPath2 + fileName1)
+        .createNewFile();
+    new File(fullPartitionPath3 + FSUtils.makeDataFileName(instantTime1, 1, fileId)).createNewFile();
+    new File(fullPartitionPath3 + fileName1)
+        .createNewFile();
+
+    HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline();
+    HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, instantTime1);
+    HoodieInstant deltaInstant2 = new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, deltaInstantTime1);
+    HoodieInstant deltaInstant3 = new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, deltaInstantTime2);
+
+    commitTimeline.saveAsComplete(instant1, Optional.empty());
+    commitTimeline.saveAsComplete(deltaInstant2, Optional.empty());
+    commitTimeline.saveAsComplete(deltaInstant3, Optional.empty());
+
+    // Now we list all partitions
+    FileStatus[] statuses = metaClient.getFs().listStatus(new Path[] {
+        new Path(fullPartitionPath1), new Path(fullPartitionPath2), new Path(fullPartitionPath3)
+    });
+    assertEquals(6, statuses.length);
+    refreshFsView(statuses);
+
+    List<HoodieFileGroup> groups = fsView.getAllFileGroups().collect(Collectors.toList());
+    Assert.assertEquals("Expected number of file-groups", 3, groups.size());
+    Assert.assertEquals("Partitions must be different for file-groups", 3,
+        groups.stream().map(HoodieFileGroup::getPartitionPath).collect(Collectors.toSet()).size());
+    Set<String> fileIds = groups.stream().map(HoodieFileGroup::getFileGroupId)
+        .map(HoodieFileGroupId::getFileId).collect(Collectors.toSet());
+    Assert.assertEquals("File Id must be same", 1, fileIds.size());
+    Assert.assertTrue("Expected FileId", fileIds.contains(fileId));
+
+    // Setup Pending compaction for all of these fileIds.
+    List<Pair<String, FileSlice>> partitionFileSlicesPairs = new ArrayList<>();
+    List<FileSlice> fileSlices = rtView.getLatestFileSlices(partitionPath1).collect(Collectors.toList());
+    partitionFileSlicesPairs.add(Pair.of(partitionPath1, fileSlices.get(0)));
+    fileSlices = rtView.getLatestFileSlices(partitionPath2).collect(Collectors.toList());
+    partitionFileSlicesPairs.add(Pair.of(partitionPath2, fileSlices.get(0)));
+    fileSlices = rtView.getLatestFileSlices(partitionPath3).collect(Collectors.toList());
+    partitionFileSlicesPairs.add(Pair.of(partitionPath3, fileSlices.get(0)));
+
+    String compactionRequestedTime = "2";
+    String compactDataFileName = FSUtils.makeDataFileName(compactionRequestedTime, 1, fileId);
+    HoodieCompactionPlan compactionPlan = CompactionUtils.buildFromFileSlices(partitionFileSlicesPairs,
+        Optional.empty(), Optional.empty());
+
+    // Create a Data-file for some of the partitions but this should be skipped by view
+    new File(basePath + "/" + partitionPath1 + "/" + compactDataFileName).createNewFile();
+    new File(basePath + "/" + partitionPath2 + "/" + compactDataFileName).createNewFile();
+
+    HoodieInstant compactionInstant =
+        new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionRequestedTime);
+    HoodieInstant requested = HoodieTimeline.getCompactionRequestedInstant(compactionInstant.getTimestamp());
+    metaClient.getActiveTimeline().saveToCompactionRequested(requested,
+        AvroUtils.serializeCompactionPlan(compactionPlan));
+    metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(requested);
+
+    // Fake delta-ingestion after compaction-requested
+    String deltaInstantTime4 = "3";
+    String deltaInstantTime5 = "6";
+    List<String> allInstantTimes = Arrays.asList(instantTime1, deltaInstantTime1, deltaInstantTime2,
+        compactionRequestedTime, deltaInstantTime4, deltaInstantTime5);
+    String fileName3 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, compactionRequestedTime, 0);
+    String fileName4 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, compactionRequestedTime, 1);
+    new File(basePath + "/" + partitionPath1 + "/" + fileName3).createNewFile();
+    new File(basePath + "/" + partitionPath1 + "/" + fileName4).createNewFile();
+    new File(basePath + "/" + partitionPath2 + "/" + fileName3).createNewFile();
+    new File(basePath + "/" + partitionPath2 + "/" + fileName4).createNewFile();
+    new File(basePath + "/" + partitionPath3 + "/" + fileName3).createNewFile();
+    new File(basePath + "/" + partitionPath3 + "/" + fileName4).createNewFile();
+
+    HoodieInstant deltaInstant4 = new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, deltaInstantTime4);
+    HoodieInstant deltaInstant5 = new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, deltaInstantTime5);
+    commitTimeline.saveAsComplete(deltaInstant4, Optional.empty());
+    commitTimeline.saveAsComplete(deltaInstant5, Optional.empty());
+    refreshFsView(null);
+
+    // Test Data Files
+    List<HoodieDataFile> dataFiles = roView.getAllDataFiles(partitionPath1).collect(Collectors.toList());
+    assertEquals("One data-file is expected as there is only one file-group", 1, dataFiles.size());
+    assertEquals("Expect only valid commit", "1", dataFiles.get(0).getCommitTime());
+    dataFiles = roView.getAllDataFiles(partitionPath2).collect(Collectors.toList());
+    assertEquals("One data-file is expected as there is only one file-group", 1, dataFiles.size());
+    assertEquals("Expect only valid commit", "1", dataFiles.get(0).getCommitTime());
+
+    /** Merge API Tests **/
+    Arrays.asList(partitionPath1, partitionPath2, partitionPath3).stream().forEach(partitionPath -> {
+      List<FileSlice> fileSliceList = rtView.getLatestMergedFileSlicesBeforeOrOn(partitionPath, deltaInstantTime5)
+          .collect(Collectors.toList());
+      assertEquals("Expect file-slice to be merged", 1, fileSliceList.size());
+      FileSlice fileSlice = fileSliceList.get(0);
+      assertEquals(fileId, fileSlice.getFileId());
+      assertEquals("Data file must be present", dataFileName, fileSlice.getDataFile().get().getFileName());
+      assertEquals("Base Instant of penultimate file-slice must be base instant", instantTime1,
+          fileSlice.getBaseInstantTime());
+      List<HoodieLogFile> logFiles = fileSlice.getLogFiles().collect(Collectors.toList());
+      assertEquals("Log files must include those after compaction request", 3, logFiles.size());
+      assertEquals("Log File Order check", fileName4, logFiles.get(0).getFileName());
+      assertEquals("Log File Order check", fileName3, logFiles.get(1).getFileName());
+      assertEquals("Log File Order check", fileName1, logFiles.get(2).getFileName());
+
+      fileSliceList = rtView.getLatestFileSlicesBeforeOrOn(partitionPath, deltaInstantTime5)
+          .collect(Collectors.toList());
+      assertEquals("Expect only one file-id", 1, fileSliceList.size());
+      fileSlice = fileSliceList.get(0);
+      assertEquals(fileId, fileSlice.getFileId());
+      assertFalse("No data-file expected in latest file-slice", fileSlice.getDataFile().isPresent());
+      assertEquals("Compaction requested instant must be base instant", compactionRequestedTime,
+          fileSlice.getBaseInstantTime());
+      logFiles = fileSlice.getLogFiles().collect(Collectors.toList());
+      assertEquals("Log files must include only those after compaction request", 2, logFiles.size());
+      assertEquals("Log File Order check", fileName4, logFiles.get(0).getFileName());
+      assertEquals("Log File Order check", fileName3, logFiles.get(1).getFileName());
+    });
+
+    Assert.assertEquals(3, fsView.getFgIdToPendingCompaction().size());
+    Set<String> partitionsInCompaction =
+        fsView.getFgIdToPendingCompaction().keySet().stream().map(HoodieFileGroupId::getPartitionPath)
+            .collect(Collectors.toSet());
+    Assert.assertEquals(3, partitionsInCompaction.size());
+    Assert.assertTrue(partitionsInCompaction.contains(partitionPath1));
+    Assert.assertTrue(partitionsInCompaction.contains(partitionPath2));
+    Assert.assertTrue(partitionsInCompaction.contains(partitionPath3));
+
+    Set<String> fileIdsInCompaction =
+        fsView.getFgIdToPendingCompaction().keySet().stream().map(HoodieFileGroupId::getFileId)
+            .collect(Collectors.toSet());
+    Assert.assertEquals(1, fileIdsInCompaction.size());
+    Assert.assertTrue(fileIdsInCompaction.contains(fileId));
+  }
 }
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/CompactionTestUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/CompactionTestUtils.java
index 5fae7a3b3..e2718c47e 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/util/CompactionTestUtils.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/CompactionTestUtils.java
@@ -26,6 +26,7 @@ import com.uber.hoodie.avro.model.HoodieCompactionOperation;
 import com.uber.hoodie.avro.model.HoodieCompactionPlan;
 import com.uber.hoodie.common.model.FileSlice;
 import com.uber.hoodie.common.model.HoodieDataFile;
+import com.uber.hoodie.common.model.HoodieFileGroupId;
 import com.uber.hoodie.common.model.HoodieLogFile;
 import com.uber.hoodie.common.model.HoodieTestUtils;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
@@ -48,7 +49,7 @@ import org.junit.Assert;

 public class CompactionTestUtils {

-  public static Map<String, Pair<String, HoodieCompactionOperation>> setupAndValidateCompactionOperations(
+  public static Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> setupAndValidateCompactionOperations(
      HoodieTableMetaClient metaClient, boolean inflight, int numEntriesInPlan1, int numEntriesInPlan2,
      int numEntriesInPlan3, int numEntriesInPlan4) throws IOException {
@@ -91,10 +92,10 @@
    });

    metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath(), true);
-    Map<String, Pair<String, HoodieCompactionOperation>> pendingCompactionMap =
+    Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> pendingCompactionMap =
        CompactionUtils.getAllPendingCompactionOperations(metaClient);

-    Map<String, Pair<String, HoodieCompactionOperation>> expPendingCompactionMap =
+    Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> expPendingCompactionMap =
        generateExpectedCompactionOperations(Arrays.asList(plan1, plan2, plan3, plan4), baseInstantsToCompaction);

    // Ensure Compaction operations are fine.
@@ -102,12 +103,13 @@
    return expPendingCompactionMap;
  }

-  public static Map<String, Pair<String, HoodieCompactionOperation>> generateExpectedCompactionOperations(
+  public static Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> generateExpectedCompactionOperations(
      List<HoodieCompactionPlan> plans, Map<String, String> baseInstantsToCompaction) {
    return plans.stream()
        .flatMap(plan -> {
          if (plan.getOperations() != null) {
-            return plan.getOperations().stream().map(op -> Pair.of(op.getFileId(),
+            return plan.getOperations().stream().map(op -> Pair.of(
+                new HoodieFileGroupId(op.getPartitionPath(), op.getFileId()),
                Pair.of(baseInstantsToCompaction.get(op.getBaseInstantTime()), op)));
          }
          return Stream.empty();
@@ -146,7 +148,7 @@
        instantId, fileId, Optional.of(1));
    HoodieTestUtils.createNewLogFile(metaClient.getFs(), metaClient.getBasePath(), DEFAULT_PARTITION_PATHS[0],
        instantId, fileId, Optional.of(2));
-    FileSlice slice = new FileSlice(instantId, fileId);
+    FileSlice slice = new FileSlice(DEFAULT_PARTITION_PATHS[0], instantId, fileId);
    if (createDataFile) {
      slice.setDataFile(new TestHoodieDataFile(metaClient.getBasePath() + "/" + DEFAULT_PARTITION_PATHS[0]
          + "/" + FSUtils.makeDataFileName(instantId, 1, fileId)));
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestCompactionUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestCompactionUtils.java
index c65216260..90c183249 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestCompactionUtils.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestCompactionUtils.java
@@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableMap;
 import com.uber.hoodie.avro.model.HoodieCompactionOperation;
 import com.uber.hoodie.avro.model.HoodieCompactionPlan;
 import com.uber.hoodie.common.model.FileSlice;
+import com.uber.hoodie.common.model.HoodieFileGroupId;
 import com.uber.hoodie.common.model.HoodieLogFile;
 import com.uber.hoodie.common.model.HoodieTableType;
 import com.uber.hoodie.common.model.HoodieTestUtils;
@@ -69,20 +70,20 @@ public class TestCompactionUtils {
  @Test
  public void testBuildFromFileSlice() {
    // Empty File-Slice with no data and log files
-    FileSlice emptyFileSlice = new FileSlice("000", "empty1");
+    FileSlice emptyFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0],"000", "empty1");
    HoodieCompactionOperation op = CompactionUtils.buildFromFileSlice(
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestCompactionUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestCompactionUtils.java
index c65216260..90c183249 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestCompactionUtils.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestCompactionUtils.java
@@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableMap;
 import com.uber.hoodie.avro.model.HoodieCompactionOperation;
 import com.uber.hoodie.avro.model.HoodieCompactionPlan;
 import com.uber.hoodie.common.model.FileSlice;
+import com.uber.hoodie.common.model.HoodieFileGroupId;
 import com.uber.hoodie.common.model.HoodieLogFile;
 import com.uber.hoodie.common.model.HoodieTableType;
 import com.uber.hoodie.common.model.HoodieTestUtils;
@@ -69,20 +70,20 @@ public class TestCompactionUtils {
   @Test
   public void testBuildFromFileSlice() {
     // Empty File-Slice with no data and log files
-    FileSlice emptyFileSlice = new FileSlice("000", "empty1");
+    FileSlice emptyFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "empty1");
     HoodieCompactionOperation op = CompactionUtils.buildFromFileSlice(
         DEFAULT_PARTITION_PATHS[0], emptyFileSlice, Optional.of(metricsCaptureFn));
     testFileSliceCompactionOpEquality(emptyFileSlice, op, DEFAULT_PARTITION_PATHS[0]);
 
     // File Slice with data-file but no log files
-    FileSlice noLogFileSlice = new FileSlice("000", "noLog1");
+    FileSlice noLogFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noLog1");
     noLogFileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog.parquet"));
     op = CompactionUtils.buildFromFileSlice(
         DEFAULT_PARTITION_PATHS[0], noLogFileSlice, Optional.of(metricsCaptureFn));
     testFileSliceCompactionOpEquality(noLogFileSlice, op, DEFAULT_PARTITION_PATHS[0]);
 
     // File Slice with no data-file but log files present
-    FileSlice noDataFileSlice = new FileSlice("000", "noData1");
+    FileSlice noDataFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noData1");
     noDataFileSlice.addLogFile(new HoodieLogFile(new Path(
         FSUtils.makeLogFileName("noData1", ".log", "000", 1))));
     noDataFileSlice.addLogFile(new HoodieLogFile(new Path(
@@ -92,7 +93,7 @@ public class TestCompactionUtils {
     testFileSliceCompactionOpEquality(noDataFileSlice, op, DEFAULT_PARTITION_PATHS[0]);
 
     // File Slice with data-file and log files present
-    FileSlice fileSlice = new FileSlice("000", "noData1");
+    FileSlice fileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noData1");
     fileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog.parquet"));
     fileSlice.addLogFile(new HoodieLogFile(new Path(
         FSUtils.makeLogFileName("noData1", ".log", "000", 1))));
@@ -107,16 +108,16 @@
    * Generate input for compaction plan tests
    */
   private Pair<List<Pair<String, FileSlice>>, HoodieCompactionPlan> buildCompactionPlan() {
-    FileSlice emptyFileSlice = new FileSlice("000", "empty1");
-    FileSlice fileSlice = new FileSlice("000", "noData1");
+    FileSlice emptyFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "empty1");
+    FileSlice fileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noData1");
     fileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog.parquet"));
     fileSlice.addLogFile(new HoodieLogFile(new Path(
         FSUtils.makeLogFileName("noData1", ".log", "000", 1))));
     fileSlice.addLogFile(new HoodieLogFile(new Path(
         FSUtils.makeLogFileName("noData1", ".log", "000", 2))));
-    FileSlice noLogFileSlice = new FileSlice("000", "noLog1");
+    FileSlice noLogFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noLog1");
     noLogFileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog.parquet"));
-    FileSlice noDataFileSlice = new FileSlice("000", "noData1");
+    FileSlice noDataFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noData1");
     noDataFileSlice.addLogFile(new HoodieLogFile(new Path(
         FSUtils.makeLogFileName("noData1", ".log", "000", 1))));
     noDataFileSlice.addLogFile(new HoodieLogFile(new Path(
@@ -161,7 +162,7 @@
     // schedule same plan again so that there will be duplicates
     scheduleCompaction(metaClient, "005", plan1);
     metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
-    Map<String, Pair<String, HoodieCompactionOperation>> res =
+    Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> res =
         CompactionUtils.getAllPendingCompactionOperations(metaClient);
   }
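All of these call sites move from the old two-argument FileSlice constructor to a three-argument one that names the owning partition first, so a slice is identified by (partition, base instant, file-id) rather than (base instant, file-id) alone. A short usage sketch (the demo class and partition value are placeholders):

    import com.uber.hoodie.common.model.FileSlice;

    public class FileSliceConstructionDemo {
      public static void main(String[] args) {
        // Before this patch: new FileSlice("000", "empty1");
        // The slice is now anchored to its partition, completing the file-group id.
        FileSlice slice = new FileSlice("2019/02/12", "000", "empty1");
        System.out.println(slice.getFileId());
      }
    }
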
diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactionAdminTool.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactionAdminTool.java
index 0a225e91a..da464c682 100644
--- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactionAdminTool.java
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactionAdminTool.java
@@ -5,6 +5,7 @@ import com.beust.jcommander.Parameter;
 import com.uber.hoodie.CompactionAdminClient;
 import com.uber.hoodie.CompactionAdminClient.RenameOpResult;
 import com.uber.hoodie.CompactionAdminClient.ValidationOpResult;
+import com.uber.hoodie.common.model.HoodieFileGroupId;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.util.FSUtils;
 import java.io.ObjectOutputStream;
@@ -60,7 +61,8 @@
         break;
       case UNSCHEDULE_FILE:
         List<RenameOpResult> r =
-            admin.unscheduleCompactionFileId(cfg.fileId, cfg.skipValidation, cfg.dryRun);
+            admin.unscheduleCompactionFileId(new HoodieFileGroupId(cfg.partitionPath, cfg.fileId),
+                cfg.skipValidation, cfg.dryRun);
         if (cfg.printOutput) {
           System.out.println(r);
         }
@@ -132,6 +134,8 @@
     public String basePath = null;
     @Parameter(names = {"--instant-time", "-in"}, description = "Compaction Instant time", required = false)
     public String compactionInstantTime = null;
+    @Parameter(names = {"--partition-path", "-pp"}, description = "Partition Path", required = false)
+    public String partitionPath = null;
     @Parameter(names = {"--file-id", "-id"}, description = "File Id", required = false)
     public String fileId = null;
     @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = false)
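With the new --partition-path flag, unscheduling targets exactly one file-group instead of every partition that happens to share the file-id. A hypothetical sketch of driving the same operation programmatically, mirroring what UNSCHEDULE_FILE now does (the CompactionAdminClient constructor arguments, Spark context setup, base path, partition, and file-id below are assumptions for illustration, not from this patch):

    import com.uber.hoodie.CompactionAdminClient;
    import com.uber.hoodie.CompactionAdminClient.RenameOpResult;
    import com.uber.hoodie.common.model.HoodieFileGroupId;
    import java.util.List;
    import org.apache.spark.api.java.JavaSparkContext;

    public class UnscheduleFileGroupDemo {
      public static void main(String[] args) throws Exception {
        JavaSparkContext jsc = new JavaSparkContext("local[1]", "unschedule-demo");
        CompactionAdminClient admin = new CompactionAdminClient(jsc, "/tmp/hoodie/basePath");
        // The partition path disambiguates file-ids that repeat across partitions,
        // so only this file-group is removed from its pending compaction plan.
        List<RenameOpResult> result = admin.unscheduleCompactionFileId(
            new HoodieFileGroupId("2019/02/12", "fg-1"), /* skipValidation */ false, /* dryRun */ true);
        System.out.println(result);
        jsc.stop();
      }
    }
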