1
0

FileSystem View must treat same fileIds present in different partitions as different file-groups and handle pending compaction correctly

This commit is contained in:
Balaji Varadarajan
2019-02-12 21:29:14 -08:00
committed by vinoth chandar
parent 363df2c12e
commit 3ae6cb4ed5
20 changed files with 388 additions and 120 deletions

View File

@@ -78,7 +78,7 @@ public class FileSystemViewCommand implements CommandMarker {
// For ReadOptimized Views, do not display any delta-file related columns // For ReadOptimized Views, do not display any delta-file related columns
Comparable[] row = new Comparable[readOptimizedOnly ? 5 : 8]; Comparable[] row = new Comparable[readOptimizedOnly ? 5 : 8];
row[idx++] = fg.getPartitionPath(); row[idx++] = fg.getPartitionPath();
row[idx++] = fg.getId(); row[idx++] = fg.getFileGroupId().getFileId();
row[idx++] = fs.getBaseInstantTime(); row[idx++] = fs.getBaseInstantTime();
row[idx++] = fs.getDataFile().isPresent() ? fs.getDataFile().get().getPath() : ""; row[idx++] = fs.getDataFile().isPresent() ? fs.getDataFile().get().getPath() : "";
row[idx++] = fs.getDataFile().isPresent() ? fs.getDataFile().get().getFileSize() : -1; row[idx++] = fs.getDataFile().isPresent() ? fs.getDataFile().get().getFileSize() : -1;

View File

@@ -25,6 +25,7 @@ import com.uber.hoodie.avro.model.HoodieCompactionPlan;
import com.uber.hoodie.common.model.CompactionOperation; import com.uber.hoodie.common.model.CompactionOperation;
import com.uber.hoodie.common.model.FileSlice; import com.uber.hoodie.common.model.FileSlice;
import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieFileGroupId;
import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.HoodieTimeline;
@@ -145,26 +146,28 @@ public class CompactionAdminClient implements Serializable {
* *
* This operation MUST be executed with compactions and writer turned OFF. * This operation MUST be executed with compactions and writer turned OFF.
* *
* @param fileId FileId to be unscheduled * @param fgId FileGroupId to be unscheduled
* @param skipValidation Skip validation * @param skipValidation Skip validation
* @param dryRun Dry Run Mode * @param dryRun Dry Run Mode
*/ */
public List<RenameOpResult> unscheduleCompactionFileId(String fileId, public List<RenameOpResult> unscheduleCompactionFileId(HoodieFileGroupId fgId,
boolean skipValidation, boolean dryRun) throws Exception { boolean skipValidation, boolean dryRun) throws Exception {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
List<Pair<HoodieLogFile, HoodieLogFile>> renameActions = List<Pair<HoodieLogFile, HoodieLogFile>> renameActions =
getRenamingActionsForUnschedulingCompactionForFileId(metaClient, fileId, Optional.absent(), skipValidation); getRenamingActionsForUnschedulingCompactionForFileId(metaClient, fgId,
Optional.absent(), skipValidation);
List<RenameOpResult> res = runRenamingOps(metaClient, renameActions, 1, dryRun); List<RenameOpResult> res = runRenamingOps(metaClient, renameActions, 1, dryRun);
if (!dryRun && !res.isEmpty() && res.get(0).isExecuted() && res.get(0).isSuccess()) { if (!dryRun && !res.isEmpty() && res.get(0).isExecuted() && res.get(0).isSuccess()) {
// Ready to remove this file-Id from compaction request // Ready to remove this file-Id from compaction request
Pair<String, HoodieCompactionOperation> compactionOperationWithInstant = Pair<String, HoodieCompactionOperation> compactionOperationWithInstant =
CompactionUtils.getAllPendingCompactionOperations(metaClient).get(fileId); CompactionUtils.getAllPendingCompactionOperations(metaClient).get(fgId);
HoodieCompactionPlan plan = CompactionUtils HoodieCompactionPlan plan = CompactionUtils
.getCompactionPlan(metaClient, compactionOperationWithInstant.getKey()); .getCompactionPlan(metaClient, compactionOperationWithInstant.getKey());
List<HoodieCompactionOperation> newOps = plan.getOperations().stream() List<HoodieCompactionOperation> newOps = plan.getOperations().stream()
.filter(op -> !op.getFileId().equals(fileId)).collect(Collectors.toList()); .filter(op -> (!op.getFileId().equals(fgId.getFileId()))
&& (!op.getPartitionPath().equals(fgId.getPartitionPath()))).collect(Collectors.toList());
HoodieCompactionPlan newPlan = HoodieCompactionPlan newPlan =
HoodieCompactionPlan.newBuilder().setOperations(newOps).setExtraMetadata(plan.getExtraMetadata()).build(); HoodieCompactionPlan.newBuilder().setOperations(newOps).setExtraMetadata(plan.getExtraMetadata()).build();
HoodieInstant inflight = new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, HoodieInstant inflight = new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION,
@@ -465,23 +468,23 @@ public class CompactionAdminClient implements Serializable {
* writer (ingestion/compaction) is running. * writer (ingestion/compaction) is running.
* *
* @param metaClient Hoodie Table MetaClient * @param metaClient Hoodie Table MetaClient
* @param fileId FileId to remove compaction * @param fgId FileGroupId to remove compaction
* @param fsViewOpt Cached File System View * @param fsViewOpt Cached File System View
* @param skipValidation Skip Validation * @param skipValidation Skip Validation
* @return list of pairs of log-files (old, new) and for each pair, rename must be done to successfully unschedule * @return list of pairs of log-files (old, new) and for each pair, rename must be done to successfully unschedule
* compaction. * compaction.
*/ */
public List<Pair<HoodieLogFile, HoodieLogFile>> getRenamingActionsForUnschedulingCompactionForFileId( public List<Pair<HoodieLogFile, HoodieLogFile>> getRenamingActionsForUnschedulingCompactionForFileId(
HoodieTableMetaClient metaClient, String fileId, Optional<HoodieTableFileSystemView> fsViewOpt, HoodieTableMetaClient metaClient, HoodieFileGroupId fgId,
boolean skipValidation) throws IOException { Optional<HoodieTableFileSystemView> fsViewOpt, boolean skipValidation) throws IOException {
Map<String, Pair<String, HoodieCompactionOperation>> allPendingCompactions = Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> allPendingCompactions =
CompactionUtils.getAllPendingCompactionOperations(metaClient); CompactionUtils.getAllPendingCompactionOperations(metaClient);
if (allPendingCompactions.containsKey(fileId)) { if (allPendingCompactions.containsKey(fgId)) {
Pair<String, HoodieCompactionOperation> opWithInstant = allPendingCompactions.get(fileId); Pair<String, HoodieCompactionOperation> opWithInstant = allPendingCompactions.get(fgId);
return getRenamingActionsForUnschedulingCompactionOperation(metaClient, opWithInstant.getKey(), return getRenamingActionsForUnschedulingCompactionOperation(metaClient, opWithInstant.getKey(),
CompactionOperation.convertFromAvroRecordInstance(opWithInstant.getValue()), fsViewOpt, skipValidation); CompactionOperation.convertFromAvroRecordInstance(opWithInstant.getValue()), fsViewOpt, skipValidation);
} }
throw new HoodieException("FileId " + fileId + " not in pending compaction"); throw new HoodieException("FileGroupId " + fgId + " not in pending compaction");
} }
/** /**

View File

@@ -118,7 +118,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
baseInstantTime = fileSlice.get().getBaseInstantTime(); baseInstantTime = fileSlice.get().getBaseInstantTime();
} else { } else {
// This means there is no base data file, start appending to a new log file // This means there is no base data file, start appending to a new log file
fileSlice = Optional.of(new FileSlice(baseInstantTime, this.fileId)); fileSlice = Optional.of(new FileSlice(partitionPath, baseInstantTime, this.fileId));
logger.info("New InsertHandle for partition :" + partitionPath); logger.info("New InsertHandle for partition :" + partitionPath);
} }
writeStatus.getStat().setPrevCommit(baseInstantTime); writeStatus.getStat().setPrevCommit(baseInstantTime);

View File

@@ -21,6 +21,7 @@ import com.uber.hoodie.common.model.FileSlice;
import com.uber.hoodie.common.model.HoodieCleaningPolicy; import com.uber.hoodie.common.model.HoodieCleaningPolicy;
import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieFileGroup; import com.uber.hoodie.common.model.HoodieFileGroup;
import com.uber.hoodie.common.model.HoodieFileGroupId;
import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.model.HoodieTableType; import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.HoodieTimeline;
@@ -52,7 +53,7 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> {
private final TableFileSystemView fileSystemView; private final TableFileSystemView fileSystemView;
private final HoodieTimeline commitTimeline; private final HoodieTimeline commitTimeline;
private final Map<String, CompactionOperation> fileIdToPendingCompactionOperations; private final Map<HoodieFileGroupId, CompactionOperation> fgIdToPendingCompactionOperations;
private HoodieTable<T> hoodieTable; private HoodieTable<T> hoodieTable;
private HoodieWriteConfig config; private HoodieWriteConfig config;
@@ -61,8 +62,8 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> {
this.fileSystemView = hoodieTable.getCompletedFileSystemView(); this.fileSystemView = hoodieTable.getCompletedFileSystemView();
this.commitTimeline = hoodieTable.getCompletedCommitTimeline(); this.commitTimeline = hoodieTable.getCompletedCommitTimeline();
this.config = config; this.config = config;
this.fileIdToPendingCompactionOperations = this.fgIdToPendingCompactionOperations =
((HoodieTableFileSystemView)hoodieTable.getRTFileSystemView()).getFileIdToPendingCompaction().entrySet() ((HoodieTableFileSystemView)hoodieTable.getRTFileSystemView()).getFgIdToPendingCompaction().entrySet()
.stream().map(entry -> Pair.of(entry.getKey(), entry.getValue().getValue())) .stream().map(entry -> Pair.of(entry.getKey(), entry.getValue().getValue()))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue)); .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
} }
@@ -249,7 +250,7 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> {
* @return true if file slice needs to be preserved, false otherwise. * @return true if file slice needs to be preserved, false otherwise.
*/ */
private boolean isFileSliceNeededForPendingCompaction(FileSlice fileSlice) { private boolean isFileSliceNeededForPendingCompaction(FileSlice fileSlice) {
CompactionOperation op = fileIdToPendingCompactionOperations.get(fileSlice.getFileId()); CompactionOperation op = fgIdToPendingCompactionOperations.get(fileSlice.getFileGroupId());
if (null != op) { if (null != op) {
// If file slice's instant time is newer or same as that of operation, do not clean // If file slice's instant time is newer or same as that of operation, do not clean
return HoodieTimeline.compareTimestamps(fileSlice.getBaseInstantTime(), op.getBaseInstantTime(), return HoodieTimeline.compareTimestamps(fileSlice.getBaseInstantTime(), op.getBaseInstantTime(),

View File

@@ -18,6 +18,7 @@ package com.uber.hoodie.io.compact;
import com.uber.hoodie.WriteStatus; import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.avro.model.HoodieCompactionPlan; import com.uber.hoodie.avro.model.HoodieCompactionPlan;
import com.uber.hoodie.common.model.HoodieFileGroupId;
import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.table.HoodieTable; import com.uber.hoodie.table.HoodieTable;
import java.io.IOException; import java.io.IOException;
@@ -38,12 +39,13 @@ public interface HoodieCompactor extends Serializable {
* @param hoodieTable Hoodie Table * @param hoodieTable Hoodie Table
* @param config Hoodie Write Configuration * @param config Hoodie Write Configuration
* @param compactionCommitTime scheduled compaction commit time * @param compactionCommitTime scheduled compaction commit time
* @param fgIdsInPendingCompactions partition-fileId pairs for which compaction is pending
* @return Compaction Plan * @return Compaction Plan
* @throws IOException when encountering errors * @throws IOException when encountering errors
*/ */
HoodieCompactionPlan generateCompactionPlan(JavaSparkContext jsc, HoodieCompactionPlan generateCompactionPlan(JavaSparkContext jsc,
HoodieTable hoodieTable, HoodieWriteConfig config, String compactionCommitTime, HoodieTable hoodieTable, HoodieWriteConfig config, String compactionCommitTime,
Set<String> fileIdsWithPendingCompactions) Set<HoodieFileGroupId> fgIdsInPendingCompactions)
throws IOException; throws IOException;
/** /**

View File

@@ -26,6 +26,7 @@ import com.uber.hoodie.avro.model.HoodieCompactionOperation;
import com.uber.hoodie.avro.model.HoodieCompactionPlan; import com.uber.hoodie.avro.model.HoodieCompactionPlan;
import com.uber.hoodie.common.model.CompactionOperation; import com.uber.hoodie.common.model.CompactionOperation;
import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieFileGroupId;
import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.model.HoodieTableType; import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.model.HoodieWriteStat.RuntimeStats; import com.uber.hoodie.common.model.HoodieWriteStat.RuntimeStats;
@@ -160,7 +161,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
@Override @Override
public HoodieCompactionPlan generateCompactionPlan(JavaSparkContext jsc, public HoodieCompactionPlan generateCompactionPlan(JavaSparkContext jsc,
HoodieTable hoodieTable, HoodieWriteConfig config, String compactionCommitTime, HoodieTable hoodieTable, HoodieWriteConfig config, String compactionCommitTime,
Set<String> fileIdsWithPendingCompactions) throws IOException { Set<HoodieFileGroupId> fgIdsInPendingCompactions) throws IOException {
totalLogFiles = new LongAccumulator(); totalLogFiles = new LongAccumulator();
totalFileSlices = new LongAccumulator(); totalFileSlices = new LongAccumulator();
@@ -190,7 +191,8 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
jsc.parallelize(partitionPaths, partitionPaths.size()) jsc.parallelize(partitionPaths, partitionPaths.size())
.flatMap((FlatMapFunction<String, CompactionOperation>) partitionPath -> fileSystemView .flatMap((FlatMapFunction<String, CompactionOperation>) partitionPath -> fileSystemView
.getLatestFileSlices(partitionPath) .getLatestFileSlices(partitionPath)
.filter(slice -> !fileIdsWithPendingCompactions.contains(slice.getFileId())) .filter(slice ->
!fgIdsInPendingCompactions.contains(slice.getFileGroupId()))
.map( .map(
s -> { s -> {
List<HoodieLogFile> logFiles = s.getLogFiles().sorted(HoodieLogFile List<HoodieLogFile> logFiles = s.getLogFiles().sorted(HoodieLogFile
@@ -215,11 +217,11 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
// compactions only // compactions only
HoodieCompactionPlan compactionPlan = config.getCompactionStrategy().generateCompactionPlan(config, operations, HoodieCompactionPlan compactionPlan = config.getCompactionStrategy().generateCompactionPlan(config, operations,
CompactionUtils.getAllPendingCompactionPlans(metaClient).stream().map(Pair::getValue).collect(toList())); CompactionUtils.getAllPendingCompactionPlans(metaClient).stream().map(Pair::getValue).collect(toList()));
Preconditions.checkArgument(compactionPlan.getOperations().stream() Preconditions.checkArgument(compactionPlan.getOperations().stream().noneMatch(
.filter(op -> fileIdsWithPendingCompactions.contains(op.getFileId())).count() == 0, op -> fgIdsInPendingCompactions.contains(new HoodieFileGroupId(op.getPartitionPath(), op.getFileId()))),
"Bad Compaction Plan. FileId MUST NOT have multiple pending compactions. " "Bad Compaction Plan. FileId MUST NOT have multiple pending compactions. "
+ "Please fix your strategy implementation." + "Please fix your strategy implementation."
+ "FileIdsWithPendingCompactions :" + fileIdsWithPendingCompactions + "FileIdsWithPendingCompactions :" + fgIdsInPendingCompactions
+ ", Selected workload :" + compactionPlan); + ", Selected workload :" + compactionPlan);
if (compactionPlan.getOperations().isEmpty()) { if (compactionPlan.getOperations().isEmpty()) {
log.warn("After filtering, Nothing to compact for " + metaClient.getBasePath()); log.warn("After filtering, Nothing to compact for " + metaClient.getBasePath());

View File

@@ -147,7 +147,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
try { try {
return compactor.generateCompactionPlan(jsc, this, config, instantTime, return compactor.generateCompactionPlan(jsc, this, config, instantTime,
new HashSet<>(((HoodieTableFileSystemView)getRTFileSystemView()) new HashSet<>(((HoodieTableFileSystemView)getRTFileSystemView())
.getFileIdToPendingCompaction().keySet())); .getFgIdToPendingCompaction().keySet()));
} catch (IOException e) { } catch (IOException e) {
throw new HoodieCompactionException("Could not schedule compaction " + config.getBasePath(), e); throw new HoodieCompactionException("Could not schedule compaction " + config.getBasePath(), e);
} }

View File

@@ -27,6 +27,7 @@ import com.uber.hoodie.common.HoodieClientTestUtils;
import com.uber.hoodie.common.HoodieTestDataGenerator; import com.uber.hoodie.common.HoodieTestDataGenerator;
import com.uber.hoodie.common.model.FileSlice; import com.uber.hoodie.common.model.FileSlice;
import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieFileGroupId;
import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieTableType; import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.model.HoodieTestUtils; import com.uber.hoodie.common.model.HoodieTestUtils;
@@ -398,13 +399,13 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
**/ **/
private void validateDeltaCommit(String latestDeltaCommit, private void validateDeltaCommit(String latestDeltaCommit,
final Map<String, Pair<String, HoodieCompactionOperation>> fileIdToCompactionOperation, final Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> fgIdToCompactionOperation,
HoodieWriteConfig cfg) throws IOException { HoodieWriteConfig cfg) throws IOException {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc); HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table, cfg); List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table, cfg);
fileSliceList.forEach(fileSlice -> { fileSliceList.forEach(fileSlice -> {
Pair<String, HoodieCompactionOperation> opPair = fileIdToCompactionOperation.get(fileSlice.getFileId()); Pair<String, HoodieCompactionOperation> opPair = fgIdToCompactionOperation.get(fileSlice.getFileGroupId());
if (opPair != null) { if (opPair != null) {
System.out.println("FileSlice :" + fileSlice); System.out.println("FileSlice :" + fileSlice);
assertTrue("Expect baseInstant to match compaction Instant", assertTrue("Expect baseInstant to match compaction Instant",
@@ -430,7 +431,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
pendingCompactions.stream().map(pc -> pc.getKey().getTimestamp()).sorted().collect(Collectors.toList()); pendingCompactions.stream().map(pc -> pc.getKey().getTimestamp()).sorted().collect(Collectors.toList());
assertEquals(expPendingCompactionInstants, gotPendingCompactionInstants); assertEquals(expPendingCompactionInstants, gotPendingCompactionInstants);
Map<String, Pair<String, HoodieCompactionOperation>> fileIdToCompactionOperation = Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> fgIdToCompactionOperation =
CompactionUtils.getAllPendingCompactionOperations(metaClient); CompactionUtils.getAllPendingCompactionOperations(metaClient);
if (insertFirst) { if (insertFirst) {
@@ -451,7 +452,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
List<HoodieDataFile> dataFilesToRead = getCurrentLatestDataFiles(hoodieTable, cfg); List<HoodieDataFile> dataFilesToRead = getCurrentLatestDataFiles(hoodieTable, cfg);
assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit", assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit",
dataFilesToRead.stream().findAny().isPresent()); dataFilesToRead.stream().findAny().isPresent());
validateDeltaCommit(firstInstant, fileIdToCompactionOperation, cfg); validateDeltaCommit(firstInstant, fgIdToCompactionOperation, cfg);
} }
int numRecords = records.size(); int numRecords = records.size();
@@ -459,7 +460,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
records = dataGen.generateUpdates(instantTime, numRecords); records = dataGen.generateUpdates(instantTime, numRecords);
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
createNextDeltaCommit(instantTime, records, client, metaClient, cfg, false); createNextDeltaCommit(instantTime, records, client, metaClient, cfg, false);
validateDeltaCommit(instantTime, fileIdToCompactionOperation, cfg); validateDeltaCommit(instantTime, fgIdToCompactionOperation, cfg);
} }
return records; return records;
} }

View File

@@ -33,6 +33,7 @@ import com.uber.hoodie.common.model.HoodieCleaningPolicy;
import com.uber.hoodie.common.model.HoodieCommitMetadata; import com.uber.hoodie.common.model.HoodieCommitMetadata;
import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieFileGroup; import com.uber.hoodie.common.model.HoodieFileGroup;
import com.uber.hoodie.common.model.HoodieFileGroupId;
import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieTableType; import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.model.HoodieTestUtils; import com.uber.hoodie.common.model.HoodieTestUtils;
@@ -204,16 +205,14 @@ public class TestCleaner extends TestHoodieClientBase {
insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn); insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn);
Map<String, String> selectedFileIdForCompaction = new HashMap<>(); Map<HoodieFileGroupId, FileSlice> compactionFileIdToLatestFileSlice = new HashMap<>();
Map<String, FileSlice> compactionFileIdToLatestFileSlice = new HashMap<>();
HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieTable table = HoodieTable.getHoodieTable(metadata, getConfig(), jsc); HoodieTable table = HoodieTable.getHoodieTable(metadata, getConfig(), jsc);
for (String partitionPath : dataGen.getPartitionPaths()) { for (String partitionPath : dataGen.getPartitionPaths()) {
TableFileSystemView fsView = table.getFileSystemView(); TableFileSystemView fsView = table.getFileSystemView();
Optional<Boolean> added = fsView.getAllFileGroups(partitionPath).findFirst() Optional<Boolean> added = fsView.getAllFileGroups(partitionPath).findFirst()
.map(fg -> { .map(fg -> {
selectedFileIdForCompaction.put(fg.getId(), partitionPath); fg.getLatestFileSlice().map(fs -> compactionFileIdToLatestFileSlice.put(fg.getFileGroupId(), fs));
fg.getLatestFileSlice().map(fs -> compactionFileIdToLatestFileSlice.put(fg.getId(), fs));
return true; return true;
}); });
if (added.isPresent()) { if (added.isPresent()) {
@@ -224,7 +223,7 @@ public class TestCleaner extends TestHoodieClientBase {
// Create workload with selected file-slices // Create workload with selected file-slices
List<Pair<String, FileSlice>> partitionFileSlicePairs = compactionFileIdToLatestFileSlice.entrySet().stream() List<Pair<String, FileSlice>> partitionFileSlicePairs = compactionFileIdToLatestFileSlice.entrySet().stream()
.map(e -> Pair.of(selectedFileIdForCompaction.get(e.getKey()), e.getValue())).collect(Collectors.toList()); .map(e -> Pair.of(e.getKey().getPartitionPath(), e.getValue())).collect(Collectors.toList());
HoodieCompactionPlan compactionPlan = HoodieCompactionPlan compactionPlan =
CompactionUtils.buildFromFileSlices(partitionFileSlicePairs, Optional.empty(), Optional.empty()); CompactionUtils.buildFromFileSlices(partitionFileSlicePairs, Optional.empty(), Optional.empty());
List<String> instantTimes = HoodieTestUtils.monotonicIncreasingCommitTimestamps(9, 1); List<String> instantTimes = HoodieTestUtils.monotonicIncreasingCommitTimestamps(9, 1);
@@ -270,18 +269,18 @@ public class TestCleaner extends TestHoodieClientBase {
List<HoodieFileGroup> fileGroups = fsView.getAllFileGroups(partitionPath).collect(Collectors.toList()); List<HoodieFileGroup> fileGroups = fsView.getAllFileGroups(partitionPath).collect(Collectors.toList());
for (HoodieFileGroup fileGroup : fileGroups) { for (HoodieFileGroup fileGroup : fileGroups) {
if (selectedFileIdForCompaction.containsKey(fileGroup.getId())) { if (compactionFileIdToLatestFileSlice.containsKey(fileGroup.getFileGroupId())) {
// Ensure latest file-slice selected for compaction is retained // Ensure latest file-slice selected for compaction is retained
Optional<HoodieDataFile> dataFileForCompactionPresent = Optional<HoodieDataFile> dataFileForCompactionPresent =
fileGroup.getAllDataFiles().filter(df -> { fileGroup.getAllDataFiles().filter(df -> {
return compactionFileIdToLatestFileSlice.get(fileGroup.getId()) return compactionFileIdToLatestFileSlice.get(fileGroup.getFileGroupId())
.getBaseInstantTime().equals(df.getCommitTime()); .getBaseInstantTime().equals(df.getCommitTime());
}).findAny(); }).findAny();
Assert.assertTrue("Data File selected for compaction is retained", Assert.assertTrue("Data File selected for compaction is retained",
dataFileForCompactionPresent.isPresent()); dataFileForCompactionPresent.isPresent());
} else { } else {
// file has no more than max versions // file has no more than max versions
String fileId = fileGroup.getId(); String fileId = fileGroup.getFileGroupId().getFileId();
List<HoodieDataFile> dataFiles = fileGroup.getAllDataFiles().collect(Collectors.toList()); List<HoodieDataFile> dataFiles = fileGroup.getAllDataFiles().collect(Collectors.toList());
assertTrue("fileId " + fileId + " has more than " + maxVersions + " versions", assertTrue("fileId " + fileId + " has more than " + maxVersions + " versions",

View File

@@ -334,7 +334,7 @@ public class TestCompactionAdminClient extends TestHoodieClientBase {
.collect(Collectors.toMap(Pair::getKey, Pair::getValue)); .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
// Call the main unschedule API // Call the main unschedule API
client.unscheduleCompactionFileId(op.getFileId(), false, false); client.unscheduleCompactionFileId(op.getFileGroupId(), false, false);
metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true); metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
final HoodieTableFileSystemView newFsView = final HoodieTableFileSystemView newFsView =

View File

@@ -38,8 +38,7 @@ public class CompactionOperation implements Serializable {
private Optional<String> dataFileCommitTime; private Optional<String> dataFileCommitTime;
private List<String> deltaFilePaths; private List<String> deltaFilePaths;
private Optional<String> dataFilePath; private Optional<String> dataFilePath;
private String fileId; private HoodieFileGroupId id;
private String partitionPath;
private Map<String, Double> metrics; private Map<String, Double> metrics;
//Only for serialization/de-serialization //Only for serialization/de-serialization
@@ -52,17 +51,16 @@ public class CompactionOperation implements Serializable {
if (dataFile.isPresent()) { if (dataFile.isPresent()) {
this.baseInstantTime = dataFile.get().getCommitTime(); this.baseInstantTime = dataFile.get().getCommitTime();
this.dataFilePath = Optional.of(dataFile.get().getPath()); this.dataFilePath = Optional.of(dataFile.get().getPath());
this.fileId = dataFile.get().getFileId(); this.id = new HoodieFileGroupId(partitionPath, dataFile.get().getFileId());
this.dataFileCommitTime = Optional.of(dataFile.get().getCommitTime()); this.dataFileCommitTime = Optional.of(dataFile.get().getCommitTime());
} else { } else {
assert logFiles.size() > 0; assert logFiles.size() > 0;
this.dataFilePath = Optional.absent(); this.dataFilePath = Optional.absent();
this.baseInstantTime = FSUtils.getBaseCommitTimeFromLogPath(logFiles.get(0).getPath()); this.baseInstantTime = FSUtils.getBaseCommitTimeFromLogPath(logFiles.get(0).getPath());
this.fileId = FSUtils.getFileIdFromLogPath(logFiles.get(0).getPath()); this.id = new HoodieFileGroupId(partitionPath, FSUtils.getFileIdFromLogPath(logFiles.get(0).getPath()));
this.dataFileCommitTime = Optional.absent(); this.dataFileCommitTime = Optional.absent();
} }
this.partitionPath = partitionPath;
this.deltaFilePaths = logFiles.stream().map(s -> s.getPath().toString()) this.deltaFilePaths = logFiles.stream().map(s -> s.getPath().toString())
.collect(Collectors.toList()); .collect(Collectors.toList());
this.metrics = metrics; this.metrics = metrics;
@@ -85,17 +83,21 @@ public class CompactionOperation implements Serializable {
} }
public String getFileId() { public String getFileId() {
return fileId; return id.getFileId();
} }
public String getPartitionPath() { public String getPartitionPath() {
return partitionPath; return id.getPartitionPath();
} }
public Map<String, Double> getMetrics() { public Map<String, Double> getMetrics() {
return metrics; return metrics;
} }
public HoodieFileGroupId getFileGroupId() {
return id;
}
/** /**
* Convert Avro generated Compaction operation to POJO for Spark RDD operation * Convert Avro generated Compaction operation to POJO for Spark RDD operation
* @param operation Hoodie Compaction Operation * @param operation Hoodie Compaction Operation
@@ -106,9 +108,8 @@ public class CompactionOperation implements Serializable {
op.baseInstantTime = operation.getBaseInstantTime(); op.baseInstantTime = operation.getBaseInstantTime();
op.dataFilePath = Optional.fromNullable(operation.getDataFilePath()); op.dataFilePath = Optional.fromNullable(operation.getDataFilePath());
op.deltaFilePaths = new ArrayList<>(operation.getDeltaFilePaths()); op.deltaFilePaths = new ArrayList<>(operation.getDeltaFilePaths());
op.fileId = operation.getFileId(); op.id = new HoodieFileGroupId(operation.getPartitionPath(), operation.getFileId());
op.metrics = operation.getMetrics() == null ? new HashMap<>() : new HashMap<>(operation.getMetrics()); op.metrics = operation.getMetrics() == null ? new HashMap<>() : new HashMap<>(operation.getMetrics());
op.partitionPath = operation.getPartitionPath();
return op; return op;
} }
} }

View File

@@ -30,9 +30,9 @@ import java.util.stream.Stream;
public class FileSlice implements Serializable { public class FileSlice implements Serializable {
/** /**
* id of the slice * File Group Id of the Slice
*/ */
private String fileId; private HoodieFileGroupId fileGroupId;
/** /**
* Point in the timeline, at which the slice was created * Point in the timeline, at which the slice was created
@@ -50,8 +50,12 @@ public class FileSlice implements Serializable {
*/ */
private final TreeSet<HoodieLogFile> logFiles; private final TreeSet<HoodieLogFile> logFiles;
public FileSlice(String baseInstantTime, String fileId) { public FileSlice(String partitionPath, String baseInstantTime, String fileId) {
this.fileId = fileId; this(new HoodieFileGroupId(partitionPath, fileId), baseInstantTime);
}
public FileSlice(HoodieFileGroupId fileGroupId, String baseInstantTime) {
this.fileGroupId = fileGroupId;
this.baseInstantTime = baseInstantTime; this.baseInstantTime = baseInstantTime;
this.dataFile = null; this.dataFile = null;
this.logFiles = new TreeSet<>(HoodieLogFile.getBaseInstantAndLogVersionComparator()); this.logFiles = new TreeSet<>(HoodieLogFile.getBaseInstantAndLogVersionComparator());
@@ -73,8 +77,16 @@ public class FileSlice implements Serializable {
return baseInstantTime; return baseInstantTime;
} }
public String getPartitionPath() {
return fileGroupId.getPartitionPath();
}
public String getFileId() { public String getFileId() {
return fileId; return fileGroupId.getFileId();
}
public HoodieFileGroupId getFileGroupId() {
return fileGroupId;
} }
public Optional<HoodieDataFile> getDataFile() { public Optional<HoodieDataFile> getDataFile() {
@@ -84,6 +96,7 @@ public class FileSlice implements Serializable {
@Override @Override
public String toString() { public String toString() {
final StringBuilder sb = new StringBuilder("FileSlice {"); final StringBuilder sb = new StringBuilder("FileSlice {");
sb.append("fileGroupId=").append(fileGroupId);
sb.append("baseCommitTime=").append(baseInstantTime); sb.append("baseCommitTime=").append(baseInstantTime);
sb.append(", dataFile='").append(dataFile).append('\''); sb.append(", dataFile='").append(dataFile).append('\'');
sb.append(", logFiles='").append(logFiles).append('\''); sb.append(", logFiles='").append(logFiles).append('\'');

View File

@@ -34,14 +34,9 @@ public class HoodieFileGroup implements Serializable {
} }
/** /**
* Partition containing the file group. * file group id
*/ */
private final String partitionPath; private final HoodieFileGroupId fileGroupId;
/**
* uniquely identifies the file group
*/
private final String id;
/** /**
* Slices of files in this group, sorted with greater commit first. * Slices of files in this group, sorted with greater commit first.
@@ -59,8 +54,11 @@ public class HoodieFileGroup implements Serializable {
private final Optional<HoodieInstant> lastInstant; private final Optional<HoodieInstant> lastInstant;
public HoodieFileGroup(String partitionPath, String id, HoodieTimeline timeline) { public HoodieFileGroup(String partitionPath, String id, HoodieTimeline timeline) {
this.partitionPath = partitionPath; this(new HoodieFileGroupId(partitionPath, id), timeline);
this.id = id; }
public HoodieFileGroup(HoodieFileGroupId fileGroupId, HoodieTimeline timeline) {
this.fileGroupId = fileGroupId;
this.fileSlices = new TreeMap<>(HoodieFileGroup.getReverseCommitTimeComparator()); this.fileSlices = new TreeMap<>(HoodieFileGroup.getReverseCommitTimeComparator());
this.timeline = timeline; this.timeline = timeline;
this.lastInstant = timeline.lastInstant(); this.lastInstant = timeline.lastInstant();
@@ -72,7 +70,7 @@ public class HoodieFileGroup implements Serializable {
*/ */
public void addNewFileSliceAtInstant(String baseInstantTime) { public void addNewFileSliceAtInstant(String baseInstantTime) {
if (!fileSlices.containsKey(baseInstantTime)) { if (!fileSlices.containsKey(baseInstantTime)) {
fileSlices.put(baseInstantTime, new FileSlice(baseInstantTime, id)); fileSlices.put(baseInstantTime, new FileSlice(fileGroupId, baseInstantTime));
} }
} }
@@ -81,7 +79,7 @@ public class HoodieFileGroup implements Serializable {
*/ */
public void addDataFile(HoodieDataFile dataFile) { public void addDataFile(HoodieDataFile dataFile) {
if (!fileSlices.containsKey(dataFile.getCommitTime())) { if (!fileSlices.containsKey(dataFile.getCommitTime())) {
fileSlices.put(dataFile.getCommitTime(), new FileSlice(dataFile.getCommitTime(), id)); fileSlices.put(dataFile.getCommitTime(), new FileSlice(fileGroupId, dataFile.getCommitTime()));
} }
fileSlices.get(dataFile.getCommitTime()).setDataFile(dataFile); fileSlices.get(dataFile.getCommitTime()).setDataFile(dataFile);
} }
@@ -91,17 +89,17 @@ public class HoodieFileGroup implements Serializable {
*/ */
public void addLogFile(HoodieLogFile logFile) { public void addLogFile(HoodieLogFile logFile) {
if (!fileSlices.containsKey(logFile.getBaseCommitTime())) { if (!fileSlices.containsKey(logFile.getBaseCommitTime())) {
fileSlices.put(logFile.getBaseCommitTime(), new FileSlice(logFile.getBaseCommitTime(), id)); fileSlices.put(logFile.getBaseCommitTime(), new FileSlice(fileGroupId, logFile.getBaseCommitTime()));
} }
fileSlices.get(logFile.getBaseCommitTime()).addLogFile(logFile); fileSlices.get(logFile.getBaseCommitTime()).addLogFile(logFile);
} }
public String getId() { public String getPartitionPath() {
return id; return fileGroupId.getPartitionPath();
} }
public String getPartitionPath() { public HoodieFileGroupId getFileGroupId() {
return partitionPath; return fileGroupId;
} }
/** /**
@@ -197,7 +195,7 @@ public class HoodieFileGroup implements Serializable {
@Override @Override
public String toString() { public String toString() {
final StringBuilder sb = new StringBuilder("HoodieFileGroup {"); final StringBuilder sb = new StringBuilder("HoodieFileGroup {");
sb.append("id=").append(id); sb.append("id=").append(fileGroupId);
sb.append(", fileSlices='").append(fileSlices).append('\''); sb.append(", fileSlices='").append(fileSlices).append('\'');
sb.append('}'); sb.append('}');
return sb.toString(); return sb.toString();

View File

@@ -0,0 +1,69 @@
/*
* Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.model;
import java.io.Serializable;
import java.util.Objects;
/**
* Unique ID to identify a file-group in a data-set
*/
public class HoodieFileGroupId implements Serializable {
private final String partitionPath;
private final String fileId;
public HoodieFileGroupId(String partitionPath, String fileId) {
this.partitionPath = partitionPath;
this.fileId = fileId;
}
public String getPartitionPath() {
return partitionPath;
}
public String getFileId() {
return fileId;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
HoodieFileGroupId that = (HoodieFileGroupId) o;
return Objects.equals(partitionPath, that.partitionPath)
&& Objects.equals(fileId, that.fileId);
}
@Override
public int hashCode() {
return Objects.hash(partitionPath, fileId);
}
@Override
public String toString() {
return "HoodieFileGroupId{"
+ "partitionPath='" + partitionPath + '\''
+ ", fileId='" + fileId + '\''
+ '}';
}
}

View File

@@ -21,6 +21,7 @@ import com.uber.hoodie.common.model.CompactionOperation;
import com.uber.hoodie.common.model.FileSlice; import com.uber.hoodie.common.model.FileSlice;
import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieFileGroup; import com.uber.hoodie.common.model.HoodieFileGroup;
import com.uber.hoodie.common.model.HoodieFileGroupId;
import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.HoodieTimeline;
@@ -70,12 +71,12 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
// mapping from partition paths to file groups contained within them // mapping from partition paths to file groups contained within them
protected HashMap<String, List<HoodieFileGroup>> partitionToFileGroupsMap; protected HashMap<String, List<HoodieFileGroup>> partitionToFileGroupsMap;
// mapping from file id to the file group. // mapping from file id to the file group.
protected HashMap<String, HoodieFileGroup> fileGroupMap; protected HashMap<HoodieFileGroupId, HoodieFileGroup> fileGroupMap;
/** /**
* File Id to pending compaction instant time * PartitionPath + File-Id to pending compaction instant time
*/ */
private final Map<String, Pair<String, CompactionOperation>> fileIdToPendingCompaction; private final Map<HoodieFileGroupId, Pair<String, CompactionOperation>> fgIdToPendingCompaction;
/** /**
* Create a file system view, as of the given timeline * Create a file system view, as of the given timeline
@@ -90,7 +91,7 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
// Build fileId to Pending Compaction Instants // Build fileId to Pending Compaction Instants
List<HoodieInstant> pendingCompactionInstants = List<HoodieInstant> pendingCompactionInstants =
metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants().collect(Collectors.toList()); metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants().collect(Collectors.toList());
this.fileIdToPendingCompaction = ImmutableMap.copyOf( this.fgIdToPendingCompaction = ImmutableMap.copyOf(
CompactionUtils.getAllPendingCompactionOperations(metaClient).entrySet().stream() CompactionUtils.getAllPendingCompactionOperations(metaClient).entrySet().stream()
.map(entry -> Pair.of(entry.getKey(), Pair.of(entry.getValue().getKey(), .map(entry -> Pair.of(entry.getKey(), Pair.of(entry.getValue().getKey(),
CompactionOperation.convertFromAvroRecordInstance(entry.getValue().getValue())))) CompactionOperation.convertFromAvroRecordInstance(entry.getValue().getValue()))))
@@ -123,6 +124,10 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
out.defaultWriteObject(); out.defaultWriteObject();
} }
private String getPartitionPathFromFileStatus(FileStatus fileStatus) {
return FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), fileStatus.getPath().getParent());
}
/** /**
* Adds the provided statuses into the file system view, and also caches it inside this object. * Adds the provided statuses into the file system view, and also caches it inside this object.
*/ */
@@ -130,9 +135,7 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
Map<Pair<String, String>, List<HoodieDataFile>> dataFiles = convertFileStatusesToDataFiles( Map<Pair<String, String>, List<HoodieDataFile>> dataFiles = convertFileStatusesToDataFiles(
statuses) statuses)
.collect(Collectors.groupingBy((dataFile) -> { .collect(Collectors.groupingBy((dataFile) -> {
String partitionPathStr = FSUtils.getRelativePartitionPath( String partitionPathStr = getPartitionPathFromFileStatus(dataFile.getFileStatus());
new Path(metaClient.getBasePath()),
dataFile.getFileStatus().getPath().getParent());
return Pair.of(partitionPathStr, dataFile.getFileId()); return Pair.of(partitionPathStr, dataFile.getFileId());
})); }));
Map<Pair<String, String>, List<HoodieLogFile>> logFiles = convertFileStatusesToLogFiles( Map<Pair<String, String>, List<HoodieLogFile>> logFiles = convertFileStatusesToLogFiles(
@@ -157,17 +160,18 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
if (logFiles.containsKey(pair)) { if (logFiles.containsKey(pair)) {
logFiles.get(pair).forEach(group::addLogFile); logFiles.get(pair).forEach(group::addLogFile);
} }
if (fileIdToPendingCompaction.containsKey(fileId)) { HoodieFileGroupId fgId = group.getFileGroupId();
if (fgIdToPendingCompaction.containsKey(fgId)) {
// If there is no delta-commit after compaction request, this step would ensure a new file-slice appears // If there is no delta-commit after compaction request, this step would ensure a new file-slice appears
// so that any new ingestion uses the correct base-instant // so that any new ingestion uses the correct base-instant
group.addNewFileSliceAtInstant(fileIdToPendingCompaction.get(fileId).getKey()); group.addNewFileSliceAtInstant(fgIdToPendingCompaction.get(fgId).getKey());
} }
fileGroups.add(group); fileGroups.add(group);
}); });
// add to the cache. // add to the cache.
fileGroups.forEach(group -> { fileGroups.forEach(group -> {
fileGroupMap.put(group.getId(), group); fileGroupMap.put(group.getFileGroupId(), group);
if (!partitionToFileGroupsMap.containsKey(group.getPartitionPath())) { if (!partitionToFileGroupsMap.containsKey(group.getPartitionPath())) {
partitionToFileGroupsMap.put(group.getPartitionPath(), new ArrayList<>()); partitionToFileGroupsMap.put(group.getPartitionPath(), new ArrayList<>());
} }
@@ -198,7 +202,9 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
* @param dataFile Data File * @param dataFile Data File
*/ */
private boolean isDataFileDueToPendingCompaction(HoodieDataFile dataFile) { private boolean isDataFileDueToPendingCompaction(HoodieDataFile dataFile) {
Pair<String, CompactionOperation> compactionWithInstantTime = fileIdToPendingCompaction.get(dataFile.getFileId()); final String partitionPath = getPartitionPathFromFileStatus(dataFile.getFileStatus());
HoodieFileGroupId fgId = new HoodieFileGroupId(partitionPath, dataFile.getFileId());
Pair<String, CompactionOperation> compactionWithInstantTime = fgIdToPendingCompaction.get(fgId);
if ((null != compactionWithInstantTime) && (null != compactionWithInstantTime.getLeft()) if ((null != compactionWithInstantTime) && (null != compactionWithInstantTime.getLeft())
&& dataFile.getCommitTime().equals(compactionWithInstantTime.getKey())) { && dataFile.getCommitTime().equals(compactionWithInstantTime.getKey())) {
return true; return true;
@@ -210,7 +216,8 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
public Stream<HoodieDataFile> getLatestDataFiles(final String partitionPath) { public Stream<HoodieDataFile> getLatestDataFiles(final String partitionPath) {
return getAllFileGroups(partitionPath) return getAllFileGroups(partitionPath)
.map(fileGroup -> { .map(fileGroup -> {
return fileGroup.getAllDataFiles().filter(df -> !isDataFileDueToPendingCompaction(df)).findFirst(); return fileGroup.getAllDataFiles()
.filter(df -> !isDataFileDueToPendingCompaction(df)).findFirst();
}) })
.filter(Optional::isPresent) .filter(Optional::isPresent)
.map(Optional::get); .map(Optional::get);
@@ -278,7 +285,7 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
.map(HoodieFileGroup::getLatestFileSlice) .map(HoodieFileGroup::getLatestFileSlice)
.filter(Optional::isPresent) .filter(Optional::isPresent)
.map(Optional::get) .map(Optional::get)
.map(this::filterDataFileAfterPendingCompaction); .map(fs -> filterDataFileAfterPendingCompaction(fs));
} }
@Override @Override
@@ -288,7 +295,7 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
FileSlice fileSlice = fileGroup.getLatestFileSlice().get(); FileSlice fileSlice = fileGroup.getLatestFileSlice().get();
// if the file-group is under compaction, pick the latest before compaction instant time. // if the file-group is under compaction, pick the latest before compaction instant time.
if (isFileSliceAfterPendingCompaction(fileSlice)) { if (isFileSliceAfterPendingCompaction(fileSlice)) {
String compactionInstantTime = fileIdToPendingCompaction.get(fileSlice.getFileId()).getLeft(); String compactionInstantTime = fgIdToPendingCompaction.get(fileSlice.getFileGroupId()).getLeft();
return fileGroup.getLatestFileSliceBefore(compactionInstantTime); return fileGroup.getLatestFileSliceBefore(compactionInstantTime);
} }
return Optional.of(fileSlice); return Optional.of(fileSlice);
@@ -303,7 +310,8 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
* @return * @return
*/ */
private boolean isFileSliceAfterPendingCompaction(FileSlice fileSlice) { private boolean isFileSliceAfterPendingCompaction(FileSlice fileSlice) {
Pair<String, CompactionOperation> compactionWithInstantTime = fileIdToPendingCompaction.get(fileSlice.getFileId()); Pair<String, CompactionOperation> compactionWithInstantTime =
fgIdToPendingCompaction.get(fileSlice.getFileGroupId());
return (null != compactionWithInstantTime) return (null != compactionWithInstantTime)
&& fileSlice.getBaseInstantTime().equals(compactionWithInstantTime.getKey()); && fileSlice.getBaseInstantTime().equals(compactionWithInstantTime.getKey());
} }
@@ -318,7 +326,8 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
if (isFileSliceAfterPendingCompaction(fileSlice)) { if (isFileSliceAfterPendingCompaction(fileSlice)) {
// Data file is filtered out of the file-slice as the corresponding compaction // Data file is filtered out of the file-slice as the corresponding compaction
// instant not completed yet. // instant not completed yet.
FileSlice transformed = new FileSlice(fileSlice.getBaseInstantTime(), fileSlice.getFileId()); FileSlice transformed = new FileSlice(fileSlice.getPartitionPath(),
fileSlice.getBaseInstantTime(), fileSlice.getFileId());
fileSlice.getLogFiles().forEach(transformed::addLogFile); fileSlice.getLogFiles().forEach(transformed::addLogFile);
return transformed; return transformed;
} }
@@ -332,7 +341,7 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
.map(fileGroup -> fileGroup.getLatestFileSliceBeforeOrOn(maxCommitTime)) .map(fileGroup -> fileGroup.getLatestFileSliceBeforeOrOn(maxCommitTime))
.filter(Optional::isPresent) .filter(Optional::isPresent)
.map(Optional::get) .map(Optional::get)
.map(this::filterDataFileAfterPendingCompaction); .map(fs -> filterDataFileAfterPendingCompaction(fs));
} }
/** /**
@@ -342,7 +351,8 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
* @param penultimateSlice Penultimate file slice for a file-group in commit timeline order * @param penultimateSlice Penultimate file slice for a file-group in commit timeline order
*/ */
private static FileSlice mergeCompactionPendingFileSlices(FileSlice lastSlice, FileSlice penultimateSlice) { private static FileSlice mergeCompactionPendingFileSlices(FileSlice lastSlice, FileSlice penultimateSlice) {
FileSlice merged = new FileSlice(penultimateSlice.getBaseInstantTime(), penultimateSlice.getFileId()); FileSlice merged = new FileSlice(penultimateSlice.getPartitionPath(),
penultimateSlice.getBaseInstantTime(), penultimateSlice.getFileId());
if (penultimateSlice.getDataFile().isPresent()) { if (penultimateSlice.getDataFile().isPresent()) {
merged.setDataFile(penultimateSlice.getDataFile().get()); merged.setDataFile(penultimateSlice.getDataFile().get());
} }
@@ -361,8 +371,9 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
*/ */
private FileSlice getMergedFileSlice(HoodieFileGroup fileGroup, FileSlice fileSlice) { private FileSlice getMergedFileSlice(HoodieFileGroup fileGroup, FileSlice fileSlice) {
// if the file-group is under construction, pick the latest before compaction instant time. // if the file-group is under construction, pick the latest before compaction instant time.
if (fileIdToPendingCompaction.containsKey(fileSlice.getFileId())) { HoodieFileGroupId fgId = fileSlice.getFileGroupId();
String compactionInstantTime = fileIdToPendingCompaction.get(fileSlice.getFileId()).getKey(); if (fgIdToPendingCompaction.containsKey(fgId)) {
String compactionInstantTime = fgIdToPendingCompaction.get(fgId).getKey();
if (fileSlice.getBaseInstantTime().equals(compactionInstantTime)) { if (fileSlice.getBaseInstantTime().equals(compactionInstantTime)) {
Optional<FileSlice> prevFileSlice = fileGroup.getLatestFileSliceBefore(compactionInstantTime); Optional<FileSlice> prevFileSlice = fileGroup.getLatestFileSliceBefore(compactionInstantTime);
if (prevFileSlice.isPresent()) { if (prevFileSlice.isPresent()) {
@@ -426,8 +437,8 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
} }
} }
public Map<String, Pair<String, CompactionOperation>> getFileIdToPendingCompaction() { public Map<HoodieFileGroupId, Pair<String, CompactionOperation>> getFgIdToPendingCompaction() {
return fileIdToPendingCompaction; return fgIdToPendingCompaction;
} }
public Stream<HoodieFileGroup> getAllFileGroups() { public Stream<HoodieFileGroup> getAllFileGroups() {

View File

@@ -20,6 +20,7 @@ import com.uber.hoodie.avro.model.HoodieCompactionOperation;
import com.uber.hoodie.avro.model.HoodieCompactionPlan; import com.uber.hoodie.avro.model.HoodieCompactionPlan;
import com.uber.hoodie.common.model.CompactionOperation; import com.uber.hoodie.common.model.CompactionOperation;
import com.uber.hoodie.common.model.FileSlice; import com.uber.hoodie.common.model.FileSlice;
import com.uber.hoodie.common.model.HoodieFileGroupId;
import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.table.timeline.HoodieInstant;
@@ -134,36 +135,38 @@ public class CompactionUtils {
} }
/** /**
* Get all file-ids with pending Compaction operations and their target compaction instant time * Get all PartitionPath + file-ids with pending Compaction operations and their target compaction instant time
* *
* @param metaClient Hoodie Table Meta Client * @param metaClient Hoodie Table Meta Client
*/ */
public static Map<String, Pair<String, HoodieCompactionOperation>> getAllPendingCompactionOperations( public static Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> getAllPendingCompactionOperations(
HoodieTableMetaClient metaClient) { HoodieTableMetaClient metaClient) {
List<Pair<HoodieInstant, HoodieCompactionPlan>> pendingCompactionPlanWithInstants = List<Pair<HoodieInstant, HoodieCompactionPlan>> pendingCompactionPlanWithInstants =
getAllPendingCompactionPlans(metaClient); getAllPendingCompactionPlans(metaClient);
Map<String, Pair<String, HoodieCompactionOperation>> fileIdToPendingCompactionWithInstantMap = new HashMap<>(); Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> fgIdToPendingCompactionWithInstantMap =
new HashMap<>();
pendingCompactionPlanWithInstants.stream().flatMap(instantPlanPair -> { pendingCompactionPlanWithInstants.stream().flatMap(instantPlanPair -> {
HoodieInstant instant = instantPlanPair.getKey(); HoodieInstant instant = instantPlanPair.getKey();
HoodieCompactionPlan compactionPlan = instantPlanPair.getValue(); HoodieCompactionPlan compactionPlan = instantPlanPair.getValue();
List<HoodieCompactionOperation> ops = compactionPlan.getOperations(); List<HoodieCompactionOperation> ops = compactionPlan.getOperations();
if (null != ops) { if (null != ops) {
return ops.stream().map(op -> { return ops.stream().map(op -> {
return Pair.of(op.getFileId(), Pair.of(instant.getTimestamp(), op)); return Pair.of(new HoodieFileGroupId(op.getPartitionPath(), op.getFileId()),
Pair.of(instant.getTimestamp(), op));
}); });
} else { } else {
return Stream.empty(); return Stream.empty();
} }
}).forEach(pair -> { }).forEach(pair -> {
// Defensive check to ensure a single-fileId does not have more than one pending compaction // Defensive check to ensure a single-fileId does not have more than one pending compaction
if (fileIdToPendingCompactionWithInstantMap.containsKey(pair.getKey())) { if (fgIdToPendingCompactionWithInstantMap.containsKey(pair.getKey())) {
String msg = "Hoodie File Id (" + pair.getKey() + ") has more thant 1 pending compactions. Instants: " String msg = "Hoodie File Id (" + pair.getKey() + ") has more thant 1 pending compactions. Instants: "
+ pair.getValue() + ", " + fileIdToPendingCompactionWithInstantMap.get(pair.getKey()); + pair.getValue() + ", " + fgIdToPendingCompactionWithInstantMap.get(pair.getKey());
throw new IllegalStateException(msg); throw new IllegalStateException(msg);
} }
fileIdToPendingCompactionWithInstantMap.put(pair.getKey(), pair.getValue()); fgIdToPendingCompactionWithInstantMap.put(pair.getKey(), pair.getValue());
}); });
return fileIdToPendingCompactionWithInstantMap; return fgIdToPendingCompactionWithInstantMap;
} }
} }

View File

@@ -26,6 +26,7 @@ import com.uber.hoodie.avro.model.HoodieCompactionPlan;
import com.uber.hoodie.common.model.FileSlice; import com.uber.hoodie.common.model.FileSlice;
import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieFileGroup; import com.uber.hoodie.common.model.HoodieFileGroup;
import com.uber.hoodie.common.model.HoodieFileGroupId;
import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.model.HoodieTestUtils; import com.uber.hoodie.common.model.HoodieTestUtils;
import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTableMetaClient;
@@ -51,6 +52,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
@@ -676,7 +678,7 @@ public class HoodieTableFileSystemViewTest {
assertEquals(3, fileGroups.size()); assertEquals(3, fileGroups.size());
for (HoodieFileGroup fileGroup : fileGroups) { for (HoodieFileGroup fileGroup : fileGroups) {
String fileId = fileGroup.getId(); String fileId = fileGroup.getFileGroupId().getFileId();
Set<String> filenames = Sets.newHashSet(); Set<String> filenames = Sets.newHashSet();
fileGroup.getAllDataFiles().forEach(dataFile -> { fileGroup.getAllDataFiles().forEach(dataFile -> {
assertEquals("All same fileId should be grouped", fileId, dataFile.getFileId()); assertEquals("All same fileId should be grouped", fileId, dataFile.getFileId());
@@ -850,16 +852,17 @@ public class HoodieTableFileSystemViewTest {
assertEquals(3, fileGroups.size()); assertEquals(3, fileGroups.size());
for (HoodieFileGroup fileGroup : fileGroups) { for (HoodieFileGroup fileGroup : fileGroups) {
List<FileSlice> slices = fileGroup.getAllFileSlices().collect(Collectors.toList()); List<FileSlice> slices = fileGroup.getAllFileSlices().collect(Collectors.toList());
if (fileGroup.getId().equals(fileId1)) { String fileId = fileGroup.getFileGroupId().getFileId();
if (fileId.equals(fileId1)) {
assertEquals(2, slices.size()); assertEquals(2, slices.size());
assertEquals(commitTime4, slices.get(0).getBaseInstantTime()); assertEquals(commitTime4, slices.get(0).getBaseInstantTime());
assertEquals(commitTime1, slices.get(1).getBaseInstantTime()); assertEquals(commitTime1, slices.get(1).getBaseInstantTime());
} else if (fileGroup.getId().equals(fileId2)) { } else if (fileId.equals(fileId2)) {
assertEquals(3, slices.size()); assertEquals(3, slices.size());
assertEquals(commitTime3, slices.get(0).getBaseInstantTime()); assertEquals(commitTime3, slices.get(0).getBaseInstantTime());
assertEquals(commitTime2, slices.get(1).getBaseInstantTime()); assertEquals(commitTime2, slices.get(1).getBaseInstantTime());
assertEquals(commitTime1, slices.get(2).getBaseInstantTime()); assertEquals(commitTime1, slices.get(2).getBaseInstantTime());
} else if (fileGroup.getId().equals(fileId3)) { } else if (fileId.equals(fileId3)) {
assertEquals(2, slices.size()); assertEquals(2, slices.size());
assertEquals(commitTime4, slices.get(0).getBaseInstantTime()); assertEquals(commitTime4, slices.get(0).getBaseInstantTime());
assertEquals(commitTime3, slices.get(1).getBaseInstantTime()); assertEquals(commitTime3, slices.get(1).getBaseInstantTime());
@@ -876,4 +879,159 @@ public class HoodieTableFileSystemViewTest {
assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, 1, fileId2))); assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, 1, fileId2)));
assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime4, 1, fileId3))); assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime4, 1, fileId3)));
} }
@Test
public void testPendingCompactionWithDuplicateFileIdsAcrossPartitions() throws Exception {
// Put some files in the partition
String partitionPath1 = "2016/05/01";
String partitionPath2 = "2016/05/02";
String partitionPath3 = "2016/05/03";
String fullPartitionPath1 = basePath + "/" + partitionPath1 + "/";
new File(fullPartitionPath1).mkdirs();
String fullPartitionPath2 = basePath + "/" + partitionPath2 + "/";
new File(fullPartitionPath2).mkdirs();
String fullPartitionPath3 = basePath + "/" + partitionPath3 + "/";
new File(fullPartitionPath3).mkdirs();
String instantTime1 = "1";
String deltaInstantTime1 = "2";
String deltaInstantTime2 = "3";
String fileId = UUID.randomUUID().toString();
String dataFileName = FSUtils.makeDataFileName(instantTime1, 1, fileId);
new File(fullPartitionPath1 + dataFileName).createNewFile();
String fileName1 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, instantTime1, 0);
new File(fullPartitionPath1 + fileName1)
.createNewFile();
new File(fullPartitionPath2 + FSUtils.makeDataFileName(instantTime1, 1, fileId)).createNewFile();
new File(fullPartitionPath2 + fileName1)
.createNewFile();
new File(fullPartitionPath3 + FSUtils.makeDataFileName(instantTime1, 1, fileId)).createNewFile();
new File(fullPartitionPath3 + fileName1)
.createNewFile();
HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline();
HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, instantTime1);
HoodieInstant deltaInstant2 = new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, deltaInstantTime1);
HoodieInstant deltaInstant3 = new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, deltaInstantTime2);
commitTimeline.saveAsComplete(instant1, Optional.empty());
commitTimeline.saveAsComplete(deltaInstant2, Optional.empty());
commitTimeline.saveAsComplete(deltaInstant3, Optional.empty());
// Now we list all partitions
FileStatus[] statuses = metaClient.getFs().listStatus(new Path[] {
new Path(fullPartitionPath1), new Path(fullPartitionPath2), new Path(fullPartitionPath3)
});
assertEquals(6, statuses.length);
refreshFsView(statuses);
List<HoodieFileGroup> groups = fsView.getAllFileGroups().collect(Collectors.toList());
Assert.assertEquals("Expected number of file-groups", 3, groups.size());
Assert.assertEquals("Partitions must be different for file-groups", 3,
groups.stream().map(HoodieFileGroup::getPartitionPath).collect(Collectors.toSet()).size());
Set<String> fileIds = groups.stream().map(HoodieFileGroup::getFileGroupId)
.map(HoodieFileGroupId::getFileId).collect(Collectors.toSet());
Assert.assertEquals("File Id must be same", 1, fileIds.size());
Assert.assertTrue("Expected FileId", fileIds.contains(fileId));
// Setup Pending compaction for all of these fileIds.
List<Pair<String, FileSlice>> partitionFileSlicesPairs = new ArrayList<>();
List<FileSlice> fileSlices = rtView.getLatestFileSlices(partitionPath1).collect(Collectors.toList());
partitionFileSlicesPairs.add(Pair.of(partitionPath1, fileSlices.get(0)));
fileSlices = rtView.getLatestFileSlices(partitionPath2).collect(Collectors.toList());
partitionFileSlicesPairs.add(Pair.of(partitionPath2, fileSlices.get(0)));
fileSlices = rtView.getLatestFileSlices(partitionPath3).collect(Collectors.toList());
partitionFileSlicesPairs.add(Pair.of(partitionPath3, fileSlices.get(0)));
String compactionRequestedTime = "2";
String compactDataFileName = FSUtils.makeDataFileName(compactionRequestedTime, 1, fileId);
HoodieCompactionPlan compactionPlan = CompactionUtils.buildFromFileSlices(partitionFileSlicesPairs,
Optional.empty(), Optional.empty());
// Create a Data-file for some of the partitions but this should be skipped by view
new File(basePath + "/" + partitionPath1 + "/" + compactDataFileName).createNewFile();
new File(basePath + "/" + partitionPath2 + "/" + compactDataFileName).createNewFile();
HoodieInstant compactionInstant =
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionRequestedTime);
HoodieInstant requested = HoodieTimeline.getCompactionRequestedInstant(compactionInstant.getTimestamp());
metaClient.getActiveTimeline().saveToCompactionRequested(requested,
AvroUtils.serializeCompactionPlan(compactionPlan));
metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(requested);
// Fake delta-ingestion after compaction-requested
String deltaInstantTime4 = "3";
String deltaInstantTime5 = "6";
List<String> allInstantTimes = Arrays.asList(instantTime1, deltaInstantTime1, deltaInstantTime2,
compactionRequestedTime, deltaInstantTime4, deltaInstantTime5);
String fileName3 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, compactionRequestedTime, 0);
String fileName4 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, compactionRequestedTime, 1);
new File(basePath + "/" + partitionPath1 + "/" + fileName3).createNewFile();
new File(basePath + "/" + partitionPath1 + "/" + fileName4).createNewFile();
new File(basePath + "/" + partitionPath2 + "/" + fileName3).createNewFile();
new File(basePath + "/" + partitionPath2 + "/" + fileName4).createNewFile();
new File(basePath + "/" + partitionPath3 + "/" + fileName3).createNewFile();
new File(basePath + "/" + partitionPath3 + "/" + fileName4).createNewFile();
HoodieInstant deltaInstant4 = new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, deltaInstantTime4);
HoodieInstant deltaInstant5 = new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, deltaInstantTime5);
commitTimeline.saveAsComplete(deltaInstant4, Optional.empty());
commitTimeline.saveAsComplete(deltaInstant5, Optional.empty());
refreshFsView(null);
// Test Data Files
List<HoodieDataFile> dataFiles = roView.getAllDataFiles(partitionPath1).collect(Collectors.toList());
assertEquals("One data-file is expected as there is only one file-group", 1, dataFiles.size());
assertEquals("Expect only valid commit", "1", dataFiles.get(0).getCommitTime());
dataFiles = roView.getAllDataFiles(partitionPath2).collect(Collectors.toList());
assertEquals("One data-file is expected as there is only one file-group", 1, dataFiles.size());
assertEquals("Expect only valid commit", "1", dataFiles.get(0).getCommitTime());
/** Merge API Tests **/
Arrays.asList(partitionPath1, partitionPath2, partitionPath3).stream().forEach(partitionPath -> {
List<FileSlice> fileSliceList = rtView.getLatestMergedFileSlicesBeforeOrOn(partitionPath, deltaInstantTime5)
.collect(Collectors.toList());
assertEquals("Expect file-slice to be merged", 1, fileSliceList.size());
FileSlice fileSlice = fileSliceList.get(0);
assertEquals(fileId, fileSlice.getFileId());
assertEquals("Data file must be present", dataFileName, fileSlice.getDataFile().get().getFileName());
assertEquals("Base Instant of penultimate file-slice must be base instant", instantTime1,
fileSlice.getBaseInstantTime());
List<HoodieLogFile> logFiles = fileSlice.getLogFiles().collect(Collectors.toList());
assertEquals("Log files must include those after compaction request", 3, logFiles.size());
assertEquals("Log File Order check", fileName4, logFiles.get(0).getFileName());
assertEquals("Log File Order check", fileName3, logFiles.get(1).getFileName());
assertEquals("Log File Order check", fileName1, logFiles.get(2).getFileName());
fileSliceList = rtView.getLatestFileSlicesBeforeOrOn(partitionPath, deltaInstantTime5)
.collect(Collectors.toList());
assertEquals("Expect only one file-id", 1, fileSliceList.size());
fileSlice = fileSliceList.get(0);
assertEquals(fileId, fileSlice.getFileId());
assertFalse("No data-file expected in latest file-slice", fileSlice.getDataFile().isPresent());
assertEquals("Compaction requested instant must be base instant", compactionRequestedTime,
fileSlice.getBaseInstantTime());
logFiles = fileSlice.getLogFiles().collect(Collectors.toList());
assertEquals("Log files must include only those after compaction request", 2, logFiles.size());
assertEquals("Log File Order check", fileName4, logFiles.get(0).getFileName());
assertEquals("Log File Order check", fileName3, logFiles.get(1).getFileName());
});
Assert.assertEquals(3, fsView.getFgIdToPendingCompaction().size());
Set<String> partitionsInCompaction =
fsView.getFgIdToPendingCompaction().keySet().stream().map(HoodieFileGroupId::getPartitionPath)
.collect(Collectors.toSet());
Assert.assertEquals(3, partitionsInCompaction.size());
Assert.assertTrue(partitionsInCompaction.contains(partitionPath1));
Assert.assertTrue(partitionsInCompaction.contains(partitionPath2));
Assert.assertTrue(partitionsInCompaction.contains(partitionPath3));
Set<String> fileIdsInCompaction =
fsView.getFgIdToPendingCompaction().keySet().stream().map(HoodieFileGroupId::getFileId)
.collect(Collectors.toSet());
Assert.assertEquals(1, fileIdsInCompaction.size());
Assert.assertTrue(fileIdsInCompaction.contains(fileId));
}
} }

View File

@@ -26,6 +26,7 @@ import com.uber.hoodie.avro.model.HoodieCompactionOperation;
import com.uber.hoodie.avro.model.HoodieCompactionPlan; import com.uber.hoodie.avro.model.HoodieCompactionPlan;
import com.uber.hoodie.common.model.FileSlice; import com.uber.hoodie.common.model.FileSlice;
import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieFileGroupId;
import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.model.HoodieTestUtils; import com.uber.hoodie.common.model.HoodieTestUtils;
import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTableMetaClient;
@@ -48,7 +49,7 @@ import org.junit.Assert;
public class CompactionTestUtils { public class CompactionTestUtils {
public static Map<String, Pair<String, HoodieCompactionOperation>> setupAndValidateCompactionOperations( public static Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> setupAndValidateCompactionOperations(
HoodieTableMetaClient metaClient, boolean inflight, HoodieTableMetaClient metaClient, boolean inflight,
int numEntriesInPlan1, int numEntriesInPlan2, int numEntriesInPlan1, int numEntriesInPlan2,
int numEntriesInPlan3, int numEntriesInPlan4) throws IOException { int numEntriesInPlan3, int numEntriesInPlan4) throws IOException {
@@ -91,10 +92,10 @@ public class CompactionTestUtils {
}); });
metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath(), true); metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath(), true);
Map<String, Pair<String, HoodieCompactionOperation>> pendingCompactionMap = Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> pendingCompactionMap =
CompactionUtils.getAllPendingCompactionOperations(metaClient); CompactionUtils.getAllPendingCompactionOperations(metaClient);
Map<String, Pair<String, HoodieCompactionOperation>> expPendingCompactionMap = Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> expPendingCompactionMap =
generateExpectedCompactionOperations(Arrays.asList(plan1, plan2, plan3, plan4), baseInstantsToCompaction); generateExpectedCompactionOperations(Arrays.asList(plan1, plan2, plan3, plan4), baseInstantsToCompaction);
// Ensure Compaction operations are fine. // Ensure Compaction operations are fine.
@@ -102,12 +103,13 @@ public class CompactionTestUtils {
return expPendingCompactionMap; return expPendingCompactionMap;
} }
public static Map<String, Pair<String, HoodieCompactionOperation>> generateExpectedCompactionOperations( public static Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> generateExpectedCompactionOperations(
List<HoodieCompactionPlan> plans, Map<String, String> baseInstantsToCompaction) { List<HoodieCompactionPlan> plans, Map<String, String> baseInstantsToCompaction) {
return plans.stream() return plans.stream()
.flatMap(plan -> { .flatMap(plan -> {
if (plan.getOperations() != null) { if (plan.getOperations() != null) {
return plan.getOperations().stream().map(op -> Pair.of(op.getFileId(), return plan.getOperations().stream().map(op -> Pair.of(
new HoodieFileGroupId(op.getPartitionPath(), op.getFileId()),
Pair.of(baseInstantsToCompaction.get(op.getBaseInstantTime()), op))); Pair.of(baseInstantsToCompaction.get(op.getBaseInstantTime()), op)));
} }
return Stream.empty(); return Stream.empty();
@@ -146,7 +148,7 @@ public class CompactionTestUtils {
instantId, fileId, Optional.of(1)); instantId, fileId, Optional.of(1));
HoodieTestUtils.createNewLogFile(metaClient.getFs(), metaClient.getBasePath(), DEFAULT_PARTITION_PATHS[0], HoodieTestUtils.createNewLogFile(metaClient.getFs(), metaClient.getBasePath(), DEFAULT_PARTITION_PATHS[0],
instantId, fileId, Optional.of(2)); instantId, fileId, Optional.of(2));
FileSlice slice = new FileSlice(instantId, fileId); FileSlice slice = new FileSlice(DEFAULT_PARTITION_PATHS[0], instantId, fileId);
if (createDataFile) { if (createDataFile) {
slice.setDataFile(new TestHoodieDataFile(metaClient.getBasePath() + "/" + DEFAULT_PARTITION_PATHS[0] slice.setDataFile(new TestHoodieDataFile(metaClient.getBasePath() + "/" + DEFAULT_PARTITION_PATHS[0]
+ "/" + FSUtils.makeDataFileName(instantId, 1, fileId))); + "/" + FSUtils.makeDataFileName(instantId, 1, fileId)));

View File

@@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableMap;
import com.uber.hoodie.avro.model.HoodieCompactionOperation; import com.uber.hoodie.avro.model.HoodieCompactionOperation;
import com.uber.hoodie.avro.model.HoodieCompactionPlan; import com.uber.hoodie.avro.model.HoodieCompactionPlan;
import com.uber.hoodie.common.model.FileSlice; import com.uber.hoodie.common.model.FileSlice;
import com.uber.hoodie.common.model.HoodieFileGroupId;
import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.model.HoodieTableType; import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.model.HoodieTestUtils; import com.uber.hoodie.common.model.HoodieTestUtils;
@@ -69,20 +70,20 @@ public class TestCompactionUtils {
@Test @Test
public void testBuildFromFileSlice() { public void testBuildFromFileSlice() {
// Empty File-Slice with no data and log files // Empty File-Slice with no data and log files
FileSlice emptyFileSlice = new FileSlice("000", "empty1"); FileSlice emptyFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0],"000", "empty1");
HoodieCompactionOperation op = CompactionUtils.buildFromFileSlice( HoodieCompactionOperation op = CompactionUtils.buildFromFileSlice(
DEFAULT_PARTITION_PATHS[0], emptyFileSlice, Optional.of(metricsCaptureFn)); DEFAULT_PARTITION_PATHS[0], emptyFileSlice, Optional.of(metricsCaptureFn));
testFileSliceCompactionOpEquality(emptyFileSlice, op, DEFAULT_PARTITION_PATHS[0]); testFileSliceCompactionOpEquality(emptyFileSlice, op, DEFAULT_PARTITION_PATHS[0]);
// File Slice with data-file but no log files // File Slice with data-file but no log files
FileSlice noLogFileSlice = new FileSlice("000", "noLog1"); FileSlice noLogFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0],"000", "noLog1");
noLogFileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog.parquet")); noLogFileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog.parquet"));
op = CompactionUtils.buildFromFileSlice( op = CompactionUtils.buildFromFileSlice(
DEFAULT_PARTITION_PATHS[0], noLogFileSlice, Optional.of(metricsCaptureFn)); DEFAULT_PARTITION_PATHS[0], noLogFileSlice, Optional.of(metricsCaptureFn));
testFileSliceCompactionOpEquality(noLogFileSlice, op, DEFAULT_PARTITION_PATHS[0]); testFileSliceCompactionOpEquality(noLogFileSlice, op, DEFAULT_PARTITION_PATHS[0]);
//File Slice with no data-file but log files present //File Slice with no data-file but log files present
FileSlice noDataFileSlice = new FileSlice("000", "noData1"); FileSlice noDataFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0],"000", "noData1");
noDataFileSlice.addLogFile(new HoodieLogFile(new Path( noDataFileSlice.addLogFile(new HoodieLogFile(new Path(
FSUtils.makeLogFileName("noData1", ".log", "000", 1)))); FSUtils.makeLogFileName("noData1", ".log", "000", 1))));
noDataFileSlice.addLogFile(new HoodieLogFile(new Path( noDataFileSlice.addLogFile(new HoodieLogFile(new Path(
@@ -92,7 +93,7 @@ public class TestCompactionUtils {
testFileSliceCompactionOpEquality(noDataFileSlice, op, DEFAULT_PARTITION_PATHS[0]); testFileSliceCompactionOpEquality(noDataFileSlice, op, DEFAULT_PARTITION_PATHS[0]);
//File Slice with data-file and log files present //File Slice with data-file and log files present
FileSlice fileSlice = new FileSlice("000", "noData1"); FileSlice fileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0],"000", "noData1");
fileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog.parquet")); fileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog.parquet"));
fileSlice.addLogFile(new HoodieLogFile(new Path( fileSlice.addLogFile(new HoodieLogFile(new Path(
FSUtils.makeLogFileName("noData1", ".log", "000", 1)))); FSUtils.makeLogFileName("noData1", ".log", "000", 1))));
@@ -107,16 +108,16 @@ public class TestCompactionUtils {
* Generate input for compaction plan tests * Generate input for compaction plan tests
*/ */
private Pair<List<Pair<String, FileSlice>>, HoodieCompactionPlan> buildCompactionPlan() { private Pair<List<Pair<String, FileSlice>>, HoodieCompactionPlan> buildCompactionPlan() {
FileSlice emptyFileSlice = new FileSlice("000", "empty1"); FileSlice emptyFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0],"000", "empty1");
FileSlice fileSlice = new FileSlice("000", "noData1"); FileSlice fileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0],"000", "noData1");
fileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog.parquet")); fileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog.parquet"));
fileSlice.addLogFile(new HoodieLogFile(new Path( fileSlice.addLogFile(new HoodieLogFile(new Path(
FSUtils.makeLogFileName("noData1", ".log", "000", 1)))); FSUtils.makeLogFileName("noData1", ".log", "000", 1))));
fileSlice.addLogFile(new HoodieLogFile(new Path( fileSlice.addLogFile(new HoodieLogFile(new Path(
FSUtils.makeLogFileName("noData1", ".log", "000", 2)))); FSUtils.makeLogFileName("noData1", ".log", "000", 2))));
FileSlice noLogFileSlice = new FileSlice("000", "noLog1"); FileSlice noLogFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0],"000", "noLog1");
noLogFileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog.parquet")); noLogFileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog.parquet"));
FileSlice noDataFileSlice = new FileSlice("000", "noData1"); FileSlice noDataFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0],"000", "noData1");
noDataFileSlice.addLogFile(new HoodieLogFile(new Path( noDataFileSlice.addLogFile(new HoodieLogFile(new Path(
FSUtils.makeLogFileName("noData1", ".log", "000", 1)))); FSUtils.makeLogFileName("noData1", ".log", "000", 1))));
noDataFileSlice.addLogFile(new HoodieLogFile(new Path( noDataFileSlice.addLogFile(new HoodieLogFile(new Path(
@@ -161,7 +162,7 @@ public class TestCompactionUtils {
// schedule same plan again so that there will be duplicates // schedule same plan again so that there will be duplicates
scheduleCompaction(metaClient, "005", plan1); scheduleCompaction(metaClient, "005", plan1);
metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true); metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
Map<String, Pair<String, HoodieCompactionOperation>> res = Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> res =
CompactionUtils.getAllPendingCompactionOperations(metaClient); CompactionUtils.getAllPendingCompactionOperations(metaClient);
} }

View File

@@ -5,6 +5,7 @@ import com.beust.jcommander.Parameter;
import com.uber.hoodie.CompactionAdminClient; import com.uber.hoodie.CompactionAdminClient;
import com.uber.hoodie.CompactionAdminClient.RenameOpResult; import com.uber.hoodie.CompactionAdminClient.RenameOpResult;
import com.uber.hoodie.CompactionAdminClient.ValidationOpResult; import com.uber.hoodie.CompactionAdminClient.ValidationOpResult;
import com.uber.hoodie.common.model.HoodieFileGroupId;
import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.FSUtils;
import java.io.ObjectOutputStream; import java.io.ObjectOutputStream;
@@ -60,7 +61,8 @@ public class HoodieCompactionAdminTool {
break; break;
case UNSCHEDULE_FILE: case UNSCHEDULE_FILE:
List<RenameOpResult> r = List<RenameOpResult> r =
admin.unscheduleCompactionFileId(cfg.fileId, cfg.skipValidation, cfg.dryRun); admin.unscheduleCompactionFileId(new HoodieFileGroupId(cfg.partitionPath, cfg.fileId),
cfg.skipValidation, cfg.dryRun);
if (cfg.printOutput) { if (cfg.printOutput) {
System.out.println(r); System.out.println(r);
} }
@@ -132,6 +134,8 @@ public class HoodieCompactionAdminTool {
public String basePath = null; public String basePath = null;
@Parameter(names = {"--instant-time", "-in"}, description = "Compaction Instant time", required = false) @Parameter(names = {"--instant-time", "-in"}, description = "Compaction Instant time", required = false)
public String compactionInstantTime = null; public String compactionInstantTime = null;
@Parameter(names = {"--partition-path", "-pp"}, description = "Partition Path", required = false)
public String partitionPath = null;
@Parameter(names = {"--file-id", "-id"}, description = "File Id", required = false) @Parameter(names = {"--file-id", "-id"}, description = "File Id", required = false)
public String fileId = null; public String fileId = null;
@Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = false) @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = false)