diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java index c89a68982..86f3fe81a 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java @@ -290,9 +290,10 @@ public class CompactionCommand implements CommandMarker { String message = "\n\n\t COMPACTION PLAN " + (valid ? "VALID" : "INVALID") + "\n\n"; List rows = new ArrayList<>(); res.stream().forEach(r -> { - Comparable[] row = new Comparable[] {r.getOperation().getFileId(), r.getOperation().getBaseInstantTime(), - r.getOperation().getDataFilePath().isPresent() ? r.getOperation().getDataFilePath().get() : "", - r.getOperation().getDeltaFilePaths().size(), r.isSuccess(), + Comparable[] row = new Comparable[]{r.getOperation().getFileId(), + r.getOperation().getBaseInstantTime(), + r.getOperation().getDataFileName().isPresent() ? r.getOperation().getDataFileName().get() : "", + r.getOperation().getDeltaFileNames().size(), r.isSuccess(), r.getException().isPresent() ? r.getException().get().getMessage() : ""}; rows.add(row); }); diff --git a/hudi-client/src/main/java/org/apache/hudi/CompactionAdminClient.java b/hudi-client/src/main/java/org/apache/hudi/CompactionAdminClient.java index 7fc34c433..5401c73a4 100644 --- a/hudi-client/src/main/java/org/apache/hudi/CompactionAdminClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/CompactionAdminClient.java @@ -239,8 +239,9 @@ public class CompactionAdminClient extends AbstractHoodieClient { FileSlice merged = fileSystemView.getLatestMergedFileSlicesBeforeOrOn(op.getPartitionPath(), lastInstant.getTimestamp()) .filter(fs -> fs.getFileId().equals(op.getFileId())).findFirst().get(); - final int maxVersion = op.getDeltaFilePaths().stream().map(lf -> FSUtils.getFileVersionFromLog(new Path(lf))) - .reduce((x, y) -> x > y ? 
x : y).orElse(0); + final int maxVersion = + op.getDeltaFileNames().stream().map(lf -> FSUtils.getFileVersionFromLog(new Path(lf))) + .reduce((x, y) -> x > y ? x : y).orElse(0); List logFilesToBeMoved = merged.getLogFiles().filter(lf -> lf.getLogVersion() > maxVersion).collect(Collectors.toList()); return logFilesToBeMoved.stream().map(lf -> { @@ -291,28 +292,34 @@ public class CompactionAdminClient extends AbstractHoodieClient { if (fileSliceOptional.isPresent()) { FileSlice fs = fileSliceOptional.get(); Option df = fs.getDataFile(); - if (operation.getDataFilePath().isPresent()) { - String expPath = - metaClient.getFs().getFileStatus(new Path(operation.getDataFilePath().get())).getPath().toString(); - Preconditions.checkArgument(df.isPresent(), - "Data File must be present. File Slice was : " + fs + ", operation :" + operation); + if (operation.getDataFileName().isPresent()) { + String expPath = metaClient.getFs().getFileStatus(new Path( + FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), + new Path(operation.getDataFileName().get()))).getPath() + .toString(); + Preconditions.checkArgument(df.isPresent(), "Data File must be present. 
File Slice was : " + + fs + ", operation :" + operation); Preconditions.checkArgument(df.get().getPath().equals(expPath), "Base Path in operation is specified as " + expPath + " but got path " + df.get().getPath()); } Set logFilesInFileSlice = fs.getLogFiles().collect(Collectors.toSet()); - Set logFilesInCompactionOp = operation.getDeltaFilePaths().stream().map(dp -> { - try { - FileStatus[] fileStatuses = metaClient.getFs().listStatus(new Path(dp)); - Preconditions.checkArgument(fileStatuses.length == 1, "Expect only 1 file-status"); - return new HoodieLogFile(fileStatuses[0]); - } catch (FileNotFoundException fe) { - throw new CompactionValidationException(fe.getMessage()); - } catch (IOException ioe) { - throw new HoodieIOException(ioe.getMessage(), ioe); - } - }).collect(Collectors.toSet()); - Set missing = logFilesInCompactionOp.stream().filter(lf -> !logFilesInFileSlice.contains(lf)) - .collect(Collectors.toSet()); + Set logFilesInCompactionOp = operation.getDeltaFileNames().stream() + .map(dp -> { + try { + FileStatus[] fileStatuses = metaClient.getFs().listStatus( + new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), + new Path(dp))); + Preconditions.checkArgument(fileStatuses.length == 1, "Expect only 1 file-status"); + return new HoodieLogFile(fileStatuses[0]); + } catch (FileNotFoundException fe) { + throw new CompactionValidationException(fe.getMessage()); + } catch (IOException ioe) { + throw new HoodieIOException(ioe.getMessage(), ioe); + } + }).collect(Collectors.toSet()); + Set missing = + logFilesInCompactionOp.stream().filter(lf -> !logFilesInFileSlice.contains(lf)) + .collect(Collectors.toSet()); Preconditions.checkArgument(missing.isEmpty(), "All log files specified in compaction operation is not present. 
Missing :" + missing + ", Exp :" + logFilesInCompactionOp + ", Got :" + logFilesInFileSlice); diff --git a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java index 86fc7d020..642563167 100644 --- a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java @@ -57,6 +57,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.util.AvroUtils; +import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.FSUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieCompactionConfig; @@ -1176,13 +1177,14 @@ public class HoodieWriteClient extends AbstractHo */ private JavaRDD runCompaction(HoodieInstant compactionInstant, HoodieActiveTimeline activeTimeline, boolean autoCommit) throws IOException { - HoodieCompactionPlan compactionPlan = - AvroUtils.deserializeCompactionPlan(activeTimeline.getInstantAuxiliaryDetails(compactionInstant).get()); + HoodieTableMetaClient metaClient = createMetaClient(true); + HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(metaClient, + compactionInstant.getTimestamp()); + // Mark instant as compaction inflight activeTimeline.transitionCompactionRequestedToInflight(compactionInstant); compactionTimer = metrics.getCompactionCtx(); // Create a Hoodie table which encapsulated the commits and files visible - HoodieTableMetaClient metaClient = createMetaClient(true); HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); JavaRDD statuses = table.compact(jsc, compactionInstant.getTimestamp(), compactionPlan); // Force compaction action diff --git 
a/hudi-client/src/main/java/org/apache/hudi/io/compact/HoodieRealtimeTableCompactor.java b/hudi-client/src/main/java/org/apache/hudi/io/compact/HoodieRealtimeTableCompactor.java index 095b210b7..3fece32f0 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/compact/HoodieRealtimeTableCompactor.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/compact/HoodieRealtimeTableCompactor.java @@ -32,6 +32,7 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; import org.apache.avro.Schema; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hudi.WriteStatus; import org.apache.hudi.avro.model.HoodieCompactionOperation; import org.apache.hudi.avro.model.HoodieCompactionPlan; @@ -98,10 +99,10 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { private List compact(HoodieCopyOnWriteTable hoodieCopyOnWriteTable, HoodieTableMetaClient metaClient, HoodieWriteConfig config, CompactionOperation operation, String commitTime) throws IOException { FileSystem fs = metaClient.getFs(); - Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema())); - log.info("Compacting base " + operation.getDataFilePath() + " with delta files " + operation.getDeltaFilePaths() - + " for commit " + commitTime); + Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema())); + log.info("Compacting base " + operation.getDataFileName() + " with delta files " + operation + .getDeltaFileNames() + " for commit " + commitTime); // TODO - FIX THIS // Reads the entire avro file. Always only specific blocks should be read from the avro file // (failure recover). 
@@ -113,15 +114,21 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION)) .filterCompletedInstants().lastInstant().get().getTimestamp(); log.info("MaxMemoryPerCompaction => " + config.getMaxMemoryPerCompaction()); - HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, metaClient.getBasePath(), - operation.getDeltaFilePaths(), readerSchema, maxInstantTime, config.getMaxMemoryPerCompaction(), - config.getCompactionLazyBlockReadEnabled(), config.getCompactionReverseLogReadEnabled(), - config.getMaxDFSStreamBufferSize(), config.getSpillableMapBasePath()); + + List logFiles = operation.getDeltaFileNames().stream() + .map(p -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), + p).toString()).collect(toList()); + HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, + metaClient.getBasePath(), logFiles, readerSchema, maxInstantTime, + config.getMaxMemoryPerCompaction(), config.getCompactionLazyBlockReadEnabled(), + config.getCompactionReverseLogReadEnabled(), config.getMaxDFSStreamBufferSize(), + config.getSpillableMapBasePath()); if (!scanner.iterator().hasNext()) { return Lists.newArrayList(); } - Option oldDataFileOpt = operation.getBaseFile(); + Option oldDataFileOpt = operation.getBaseFile(metaClient.getBasePath(), + operation.getPartitionPath()); // Compacting is very similar to applying updates to existing file Iterator> result; @@ -182,22 +189,28 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { RealtimeView fileSystemView = hoodieTable.getRTFileSystemView(); log.info("Compaction looking for files to compact in " + partitionPaths + " partitions"); - List operations = jsc.parallelize(partitionPaths, partitionPaths.size()) - .flatMap((FlatMapFunction) partitionPath -> fileSystemView - .getLatestFileSlices(partitionPath) - .filter(slice -> 
!fgIdsInPendingCompactions.contains(slice.getFileGroupId())).map(s -> { - List logFiles = - s.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList()); - totalLogFiles.add((long) logFiles.size()); - totalFileSlices.add(1L); - // Avro generated classes are not inheriting Serializable. Using CompactionOperation POJO - // for spark Map operations and collecting them finally in Avro generated classes for storing - // into meta files. - Option dataFile = s.getDataFile(); - return new CompactionOperation(dataFile, partitionPath, logFiles, - config.getCompactionStrategy().captureMetrics(config, dataFile, partitionPath, logFiles)); - }).filter(c -> !c.getDeltaFilePaths().isEmpty()).collect(toList()).iterator()) - .collect().stream().map(CompactionUtils::buildHoodieCompactionOperation).collect(toList()); + List operations = + jsc.parallelize(partitionPaths, partitionPaths.size()) + .flatMap((FlatMapFunction) partitionPath -> fileSystemView + .getLatestFileSlices(partitionPath) + .filter(slice -> + !fgIdsInPendingCompactions.contains(slice.getFileGroupId())) + .map( + s -> { + List logFiles = s.getLogFiles().sorted(HoodieLogFile + .getLogFileComparator()).collect(Collectors.toList()); + totalLogFiles.add((long) logFiles.size()); + totalFileSlices.add(1L); + // Avro generated classes are not inheriting Serializable. Using CompactionOperation POJO + // for spark Map operations and collecting them finally in Avro generated classes for storing + // into meta files. 
+ Option dataFile = s.getDataFile(); + return new CompactionOperation(dataFile, partitionPath, logFiles, + config.getCompactionStrategy().captureMetrics(config, dataFile, partitionPath, logFiles)); + }) + .filter(c -> !c.getDeltaFileNames().isEmpty()) + .collect(toList()).iterator()).collect().stream().map(CompactionUtils::buildHoodieCompactionOperation) + .collect(toList()); log.info("Total of " + operations.size() + " compactions are retrieved"); log.info("Total number of latest files slices " + totalFileSlices.value()); log.info("Total number of log files " + totalLogFiles.value()); diff --git a/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/CompactionStrategy.java b/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/CompactionStrategy.java index 7fc1634ca..8fa2b57d5 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/CompactionStrategy.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/CompactionStrategy.java @@ -26,6 +26,7 @@ import org.apache.hudi.avro.model.HoodieCompactionOperation; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.common.model.HoodieDataFile; import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.FSUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; @@ -94,7 +95,9 @@ public abstract class CompactionStrategy implements Serializable { List operations, List pendingCompactionPlans) { // Strategy implementation can overload this method to set specific compactor-id return HoodieCompactionPlan.newBuilder() - .setOperations(orderAndFilter(writeConfig, operations, pendingCompactionPlans)).build(); + .setOperations(orderAndFilter(writeConfig, operations, pendingCompactionPlans)) + .setVersion(CompactionUtils.LATEST_COMPACTION_METADATA_VERSION) + .build(); } /** diff --git a/hudi-common/pom.xml b/hudi-common/pom.xml index 
4f3d193cb..7cb6c6428 100644 --- a/hudi-common/pom.xml +++ b/hudi-common/pom.xml @@ -67,11 +67,13 @@ ${basedir}/src/main/avro/HoodieCommitMetadata.avsc + ${basedir}/src/main/avro/HoodieCompactionOperation.avsc ${basedir}/src/main/avro/HoodieSavePointMetadata.avsc ${basedir}/src/main/avro/HoodieCompactionMetadata.avsc ${basedir}/src/main/avro/HoodieCleanMetadata.avsc ${basedir}/src/main/avro/HoodieRollbackMetadata.avsc ${basedir}/src/main/avro/HoodieRestoreMetadata.avsc + ${basedir}/src/main/avro/HoodieArchivedMetaEntry.avsc diff --git a/hudi-common/src/main/avro/HoodieArchivedMetaEntry.avsc b/hudi-common/src/main/avro/HoodieArchivedMetaEntry.avsc index 6594ba0b4..626b478a4 100644 --- a/hudi-common/src/main/avro/HoodieArchivedMetaEntry.avsc +++ b/hudi-common/src/main/avro/HoodieArchivedMetaEntry.avsc @@ -67,6 +67,11 @@ { "name":"actionType", "type":["null","string"] + }, + { + "name":"version", + "type":["int", "null"], + "default": 1 } ] } diff --git a/hudi-common/src/main/avro/HoodieCleanMetadata.avsc b/hudi-common/src/main/avro/HoodieCleanMetadata.avsc index 72ce3e3c2..f6c05c5b5 100644 --- a/hudi-common/src/main/avro/HoodieCleanMetadata.avsc +++ b/hudi-common/src/main/avro/HoodieCleanMetadata.avsc @@ -36,6 +36,11 @@ ] } } + }, + { + "name":"version", + "type":["int", "null"], + "default": 1 } ] } diff --git a/hudi-common/src/main/avro/HoodieCommitMetadata.avsc b/hudi-common/src/main/avro/HoodieCommitMetadata.avsc index 9eaa30d87..7796d99d7 100644 --- a/hudi-common/src/main/avro/HoodieCommitMetadata.avsc +++ b/hudi-common/src/main/avro/HoodieCommitMetadata.avsc @@ -126,6 +126,11 @@ "type":"map", "values":"string" }] + }, + { + "name":"version", + "type":["int", "null"], + "default": 1 } ] } diff --git a/hudi-common/src/main/avro/HoodieCompactionOperation.avsc b/hudi-common/src/main/avro/HoodieCompactionOperation.avsc index 9f0a66800..0036e5739 100644 --- a/hudi-common/src/main/avro/HoodieCompactionOperation.avsc +++ 
b/hudi-common/src/main/avro/HoodieCompactionOperation.avsc @@ -74,6 +74,11 @@ "values":"string" }], "default": null + }, + { + "name":"version", + "type":["int", "null"], + "default": 1 } ] } diff --git a/hudi-common/src/main/avro/HoodieRestoreMetadata.avsc b/hudi-common/src/main/avro/HoodieRestoreMetadata.avsc index a54e67fe2..03defbb13 100644 --- a/hudi-common/src/main/avro/HoodieRestoreMetadata.avsc +++ b/hudi-common/src/main/avro/HoodieRestoreMetadata.avsc @@ -29,6 +29,11 @@ "items": "HoodieRollbackMetadata", "name": "hoodieRollbackMetadata" } - }} + }}, + { + "name":"version", + "type":["int", "null"], + "default": 1 + } ] } diff --git a/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc b/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc index 4d4776758..98d4ec68f 100644 --- a/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc +++ b/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc @@ -34,6 +34,11 @@ ] } } + }, + { + "name":"version", + "type":["int", "null"], + "default": 1 } ] } diff --git a/hudi-common/src/main/avro/HoodieSavePointMetadata.avsc b/hudi-common/src/main/avro/HoodieSavePointMetadata.avsc index 5bc977b35..901a73a9d 100644 --- a/hudi-common/src/main/avro/HoodieSavePointMetadata.avsc +++ b/hudi-common/src/main/avro/HoodieSavePointMetadata.avsc @@ -32,6 +32,11 @@ ] } } + }, + { + "name":"version", + "type":["int", "null"], + "default": 1 } ] } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/CompactionOperation.java b/hudi-common/src/main/java/org/apache/hudi/common/model/CompactionOperation.java index 4b1a457df..d5c07d40a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/CompactionOperation.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/CompactionOperation.java @@ -39,8 +39,8 @@ public class CompactionOperation implements Serializable { private String baseInstantTime; private Option dataFileCommitTime; - private List deltaFilePaths; - private Option dataFilePath; + private List 
deltaFileNames; + private Option dataFileName; private HoodieFileGroupId id; private Map metrics; @@ -49,12 +49,12 @@ public class CompactionOperation implements Serializable { public CompactionOperation() {} public CompactionOperation(String fileId, String partitionPath, String baseInstantTime, - Option dataFileCommitTime, List deltaFilePaths, Option dataFilePath, + Option dataFileCommitTime, List deltaFileNames, Option dataFileName, Map metrics) { this.baseInstantTime = baseInstantTime; this.dataFileCommitTime = dataFileCommitTime; - this.deltaFilePaths = deltaFilePaths; - this.dataFilePath = dataFilePath; + this.deltaFileNames = deltaFileNames; + this.dataFileName = dataFileName; this.id = new HoodieFileGroupId(partitionPath, fileId); this.metrics = metrics; } @@ -63,18 +63,18 @@ public class CompactionOperation implements Serializable { Map metrics) { if (dataFile.isPresent()) { this.baseInstantTime = dataFile.get().getCommitTime(); - this.dataFilePath = Option.of(dataFile.get().getPath()); + this.dataFileName = Option.of(dataFile.get().getFileName()); this.id = new HoodieFileGroupId(partitionPath, dataFile.get().getFileId()); this.dataFileCommitTime = Option.of(dataFile.get().getCommitTime()); } else { assert logFiles.size() > 0; - this.dataFilePath = Option.empty(); + this.dataFileName = Option.empty(); this.baseInstantTime = FSUtils.getBaseCommitTimeFromLogPath(logFiles.get(0).getPath()); this.id = new HoodieFileGroupId(partitionPath, FSUtils.getFileIdFromLogPath(logFiles.get(0).getPath())); this.dataFileCommitTime = Option.empty(); } - this.deltaFilePaths = logFiles.stream().map(s -> s.getPath().toString()).collect(Collectors.toList()); + this.deltaFileNames = logFiles.stream().map(s -> s.getPath().getName()).collect(Collectors.toList()); this.metrics = metrics; } @@ -86,12 +86,12 @@ public class CompactionOperation implements Serializable { return dataFileCommitTime; } - public List getDeltaFilePaths() { - return deltaFilePaths; + public List 
getDeltaFileNames() { + return deltaFileNames; } - public Option getDataFilePath() { - return dataFilePath; + public Option getDataFileName() { + return dataFileName; } public String getFileId() { @@ -110,9 +110,9 @@ public class CompactionOperation implements Serializable { return id; } - public Option getBaseFile() { - // TODO: HUDI-130 - Paths return in compaction plan needs to be relative to base-path - return dataFilePath.map(df -> new HoodieDataFile(df)); + public Option getBaseFile(String basePath, String partitionPath) { + Path dirPath = FSUtils.getPartitionPath(basePath, partitionPath); + return dataFileName.map(df -> new HoodieDataFile(new Path(dirPath, df).toString())); } /** @@ -124,9 +124,9 @@ public class CompactionOperation implements Serializable { public static CompactionOperation convertFromAvroRecordInstance(HoodieCompactionOperation operation) { CompactionOperation op = new CompactionOperation(); op.baseInstantTime = operation.getBaseInstantTime(); - op.dataFilePath = Option.ofNullable(operation.getDataFilePath()); - op.dataFileCommitTime = op.dataFilePath.map(p -> FSUtils.getCommitTime(new Path(p).getName())); - op.deltaFilePaths = new ArrayList<>(operation.getDeltaFilePaths()); + op.dataFileName = Option.ofNullable(operation.getDataFilePath()); + op.dataFileCommitTime = op.dataFileName.map(p -> FSUtils.getCommitTime(new Path(p).getName())); + op.deltaFileNames = new ArrayList<>(operation.getDeltaFilePaths()); op.id = new HoodieFileGroupId(operation.getPartitionPath(), operation.getFileId()); op.metrics = operation.getMetrics() == null ? 
new HashMap<>() : new HashMap<>(operation.getMetrics()); return op; @@ -134,9 +134,14 @@ public class CompactionOperation implements Serializable { @Override public String toString() { - return "CompactionOperation{" + "baseInstantTime='" + baseInstantTime + '\'' + ", dataFileCommitTime=" - + dataFileCommitTime + ", deltaFilePaths=" + deltaFilePaths + ", dataFilePath=" + dataFilePath + ", id='" + id - + '\'' + ", metrics=" + metrics + '}'; + return "CompactionOperation{" + + "baseInstantTime='" + baseInstantTime + '\'' + + ", dataFileCommitTime=" + dataFileCommitTime + + ", deltaFileNames=" + deltaFileNames + + ", dataFileName=" + dataFileName + + ", id='" + id + '\'' + + ", metrics=" + metrics + + '}'; } @Override @@ -150,8 +155,9 @@ public class CompactionOperation implements Serializable { CompactionOperation operation = (CompactionOperation) o; return Objects.equals(baseInstantTime, operation.baseInstantTime) && Objects.equals(dataFileCommitTime, operation.dataFileCommitTime) - && Objects.equals(deltaFilePaths, operation.deltaFilePaths) - && Objects.equals(dataFilePath, operation.dataFilePath) && Objects.equals(id, operation.id); + && Objects.equals(deltaFileNames, operation.deltaFileNames) + && Objects.equals(dataFileName, operation.dataFileName) + && Objects.equals(id, operation.id); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/CompactionOpDTO.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/CompactionOpDTO.java index 0466505bc..5fc4d11af 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/CompactionOpDTO.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/CompactionOpDTO.java @@ -61,8 +61,8 @@ public class CompactionOpDTO { dto.compactionInstantTime = compactionInstantTime; dto.baseInstantTime = op.getBaseInstantTime(); dto.dataFileCommitTime = op.getDataFileCommitTime().orElse(null); - dto.dataFilePath = 
op.getDataFilePath().orElse(null); - dto.deltaFilePaths = new ArrayList<>(op.getDeltaFilePaths()); + dto.dataFilePath = op.getDataFileName().orElse(null); + dto.deltaFilePaths = new ArrayList<>(op.getDeltaFileNames()); dto.partitionPath = op.getPartitionPath(); dto.metrics = op.getMetrics() == null ? new HashMap<>() : new HashMap<>(op.getMetrics()); return dto; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/AvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/AvroUtils.java index f6a99acb4..a0185b922 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/AvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/AvroUtils.java @@ -47,6 +47,8 @@ import org.apache.hudi.common.HoodieRollbackStat; public class AvroUtils { + private static final Integer DEFAULT_VERSION = 1; + public static HoodieCleanMetadata convertCleanMetadata(String startCleanTime, Option durationInMs, List cleanStats) { ImmutableMap.Builder partitionMetadataBuilder = ImmutableMap.builder(); @@ -64,7 +66,7 @@ public class AvroUtils { } } return new HoodieCleanMetadata(startCleanTime, durationInMs.orElseGet(() -> -1L), totalDeleted, - earliestCommitToRetain, partitionMetadataBuilder.build()); + earliestCommitToRetain, partitionMetadataBuilder.build(), DEFAULT_VERSION); } public static HoodieRestoreMetadata convertRestoreMetadata(String startRestoreTime, Option durationInMs, @@ -75,7 +77,7 @@ public class AvroUtils { Arrays.asList(convertRollbackMetadata(startRestoreTime, durationInMs, commits, commitToStat.getValue()))); } return new HoodieRestoreMetadata(startRestoreTime, durationInMs.orElseGet(() -> -1L), commits, - commitToStatBuilder.build()); + commitToStatBuilder.build(), DEFAULT_VERSION); } public static HoodieRollbackMetadata convertRollbackMetadata(String startRollbackTime, Option durationInMs, @@ -88,8 +90,9 @@ public class AvroUtils { partitionMetadataBuilder.put(stat.getPartitionPath(), metadata); totalDeleted += 
stat.getSuccessDeleteFiles().size(); } + return new HoodieRollbackMetadata(startRollbackTime, durationInMs.orElseGet(() -> -1L), totalDeleted, commits, - partitionMetadataBuilder.build()); + partitionMetadataBuilder.build(), DEFAULT_VERSION); } public static HoodieSavepointMetadata convertSavepointMetadata(String user, String comment, @@ -99,7 +102,8 @@ public class AvroUtils { HoodieSavepointPartitionMetadata metadata = new HoodieSavepointPartitionMetadata(stat.getKey(), stat.getValue()); partitionMetadataBuilder.put(stat.getKey(), metadata); } - return new HoodieSavepointMetadata(user, System.currentTimeMillis(), comment, partitionMetadataBuilder.build()); + return new HoodieSavepointMetadata(user, System.currentTimeMillis(), comment, partitionMetadataBuilder.build(), + DEFAULT_VERSION); } public static Option serializeCompactionPlan(HoodieCompactionPlan compactionWorkload) throws IOException { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java index acb781d67..56fc4de84 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java @@ -34,6 +34,9 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.common.versioning.compaction.CompactionPlanMigrator; +import org.apache.hudi.common.versioning.compaction.CompactionV1MigrationHandler; +import org.apache.hudi.common.versioning.compaction.CompactionV2MigrationHandler; import org.apache.hudi.exception.HoodieException; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -45,6 +48,10 @@ public class CompactionUtils { private static final Logger LOG = 
LogManager.getLogger(CompactionUtils.class); + public static final Integer COMPACTION_METADATA_VERSION_1 = CompactionV1MigrationHandler.VERSION; + public static final Integer COMPACTION_METADATA_VERSION_2 = CompactionV2MigrationHandler.VERSION; + public static final Integer LATEST_COMPACTION_METADATA_VERSION = COMPACTION_METADATA_VERSION_2; + /** * Generate compaction operation from file-slice * @@ -59,9 +66,9 @@ public class CompactionUtils { builder.setPartitionPath(partitionPath); builder.setFileId(fileSlice.getFileId()); builder.setBaseInstantTime(fileSlice.getBaseInstantTime()); - builder.setDeltaFilePaths(fileSlice.getLogFiles().map(lf -> lf.getPath().toString()).collect(Collectors.toList())); + builder.setDeltaFilePaths(fileSlice.getLogFiles().map(lf -> lf.getPath().getName()).collect(Collectors.toList())); if (fileSlice.getDataFile().isPresent()) { - builder.setDataFilePath(fileSlice.getDataFile().get().getPath()); + builder.setDataFilePath(fileSlice.getDataFile().get().getFileName()); } if (metricsCaptureFunction.isPresent()) { @@ -82,9 +89,11 @@ public class CompactionUtils { Option, Map>> metricsCaptureFunction) { HoodieCompactionPlan.Builder builder = HoodieCompactionPlan.newBuilder(); extraMetadata.ifPresent(m -> builder.setExtraMetadata(m)); + builder.setOperations(partitionFileSlicePairs.stream() .map(pfPair -> buildFromFileSlice(pfPair.getKey(), pfPair.getValue(), metricsCaptureFunction)) .collect(Collectors.toList())); + builder.setVersion(LATEST_COMPACTION_METADATA_VERSION); return builder.build(); } @@ -94,8 +103,8 @@ public class CompactionUtils { public static HoodieCompactionOperation buildHoodieCompactionOperation(CompactionOperation op) { return HoodieCompactionOperation.newBuilder().setFileId(op.getFileId()).setBaseInstantTime(op.getBaseInstantTime()) .setPartitionPath(op.getPartitionPath()) - .setDataFilePath(op.getDataFilePath().isPresent() ? 
op.getDataFilePath().get() : null) - .setDeltaFilePaths(op.getDeltaFilePaths()).setMetrics(op.getMetrics()).build(); + .setDataFilePath(op.getDataFileName().isPresent() ? op.getDataFileName().get() : null) + .setDeltaFilePaths(op.getDeltaFileNames()).setMetrics(op.getMetrics()).build(); } /** @@ -127,9 +136,10 @@ public class CompactionUtils { public static HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClient, String compactionInstant) throws IOException { + CompactionPlanMigrator migrator = new CompactionPlanMigrator(metaClient); HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan(metaClient.getActiveTimeline() .getInstantAuxiliaryDetails(HoodieTimeline.getCompactionRequestedInstant(compactionInstant)).get()); - return compactionPlan; + return migrator.upgradeToLatest(compactionPlan, compactionPlan.getVersion()); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/versioning/AbstractMigratorBase.java b/hudi-common/src/main/java/org/apache/hudi/common/versioning/AbstractMigratorBase.java new file mode 100644 index 000000000..aadc28797 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/versioning/AbstractMigratorBase.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.versioning; + +import org.apache.hudi.common.table.HoodieTableMetaClient; + +public abstract class AbstractMigratorBase implements VersionMigrator { + + protected final HoodieTableMetaClient metaClient; + + public AbstractMigratorBase(HoodieTableMetaClient metaClient) { + this.metaClient = metaClient; + } + +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/versioning/MetadataMigrator.java b/hudi-common/src/main/java/org/apache/hudi/common/versioning/MetadataMigrator.java new file mode 100644 index 000000000..e348084ca --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/versioning/MetadataMigrator.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.common.versioning; + +import com.google.common.base.Preconditions; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.collection.Pair; + +/** + * Migrates a specific metadata type stored in .hoodie folder to latest version + * @param + */ +public class MetadataMigrator { + + private final Map> migrators; + private final Integer latestVersion; + private final Integer oldestVersion; + + public MetadataMigrator(HoodieTableMetaClient metaClient, List> migratorList) { + migrators = migratorList.stream().map(m -> + Pair.of(m.getManagedVersion(), m)).collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + latestVersion = migrators.keySet().stream().reduce((x, y) -> x > y ? x : y).get(); + oldestVersion = migrators.keySet().stream().reduce((x, y) -> x < y ? x : y).get(); + } + + /** + * Upgrade Metadata version to its latest + * @param metadata Metadata + * @param metadataVersion Current version of metadata + * @return Metadata conforming to the latest version of this metadata + */ + public T upgradeToLatest(T metadata, int metadataVersion) { + if (metadataVersion == latestVersion) { + return metadata; + } + + int newVersion = metadataVersion + 1; + while (newVersion <= latestVersion) { + VersionMigrator upgrader = migrators.get(newVersion); + metadata = upgrader.upgradeFrom(metadata); + newVersion += 1; + } + return metadata; + } + + /** + * Migrate metadata to a specific version + * @param metadata Metadata to be migrated + * @param metadataVersion Metadata Version + * @param targetVersion Target Version + * @return Metadata conforming to the target version + */ + public T migrateToVersion(T metadata, int metadataVersion, int targetVersion) { + Preconditions.checkArgument(targetVersion >= oldestVersion); + Preconditions.checkArgument(targetVersion <= latestVersion); + if (metadataVersion == targetVersion) { 
return metadata; + } else if (metadataVersion > targetVersion) { + return downgradeToVersion(metadata, metadataVersion, targetVersion); + } else { + return upgradeToVersion(metadata, metadataVersion, targetVersion); + } + } + + private T upgradeToVersion(T metadata, int metadataVersion, int targetVersion) { + int newVersion = metadataVersion + 1; + while (newVersion <= targetVersion) { + VersionMigrator upgrader = migrators.get(newVersion); + metadata = upgrader.upgradeFrom(metadata); + newVersion += 1; + } + return metadata; + } + + private T downgradeToVersion(T metadata, int metadataVersion, int targetVersion) { + int newVersion = metadataVersion - 1; + while (newVersion >= targetVersion) { + VersionMigrator downgrader = migrators.get(newVersion); + metadata = downgrader.downgradeFrom(metadata); + newVersion -= 1; + } + return metadata; + } + +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/versioning/VersionMigrator.java b/hudi-common/src/main/java/org/apache/hudi/common/versioning/VersionMigrator.java new file mode 100644 index 000000000..cac865055 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/versioning/VersionMigrator.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.versioning; + +import java.io.Serializable; + +/** + * Responsible for upgrading and downgrading metadata versions for a specific metadata + * @param Metadata Type + */ +public interface VersionMigrator extends Serializable { + + /** + * Version of Metadata that this class will handle + * @return + */ + Integer getManagedVersion(); + + /** + * Upgrades metadata of type T from previous version to this version + * @param input Metadata as of previous version. + * @return Metadata compatible with the version managed by this class + */ + T upgradeFrom(T input); + + /** + * Downgrades metadata of type T from next version to this version + * @param input Metadata as of next highest version + * @return Metadata compatible with the version managed by this class + */ + T downgradeFrom(T input); +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/versioning/compaction/CompactionPlanMigrator.java b/hudi-common/src/main/java/org/apache/hudi/common/versioning/compaction/CompactionPlanMigrator.java new file mode 100644 index 000000000..ae6525c40 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/versioning/compaction/CompactionPlanMigrator.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.versioning.compaction; + +import java.util.Arrays; +import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.versioning.MetadataMigrator; + +/** + * Responsible for handling different versions of compaction plan + */ +public class CompactionPlanMigrator extends MetadataMigrator { + + public CompactionPlanMigrator(HoodieTableMetaClient metaClient) { + super(metaClient, Arrays.asList( + new CompactionV1MigrationHandler(metaClient), + new CompactionV2MigrationHandler(metaClient))); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/versioning/compaction/CompactionV1MigrationHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/versioning/compaction/CompactionV1MigrationHandler.java new file mode 100644 index 000000000..893b02562 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/versioning/compaction/CompactionV1MigrationHandler.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.versioning.compaction; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.avro.model.HoodieCompactionOperation; +import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.FSUtils; +import org.apache.hudi.common.versioning.AbstractMigratorBase; + +/** + * Compaction V1 has absolute paths as part of compaction operations + */ +public class CompactionV1MigrationHandler extends AbstractMigratorBase { + + public static final Integer VERSION = 1; + + public CompactionV1MigrationHandler(HoodieTableMetaClient metaClient) { + super(metaClient); + } + + @Override + public Integer getManagedVersion() { + return VERSION; + } + + @Override + public HoodieCompactionPlan upgradeFrom(HoodieCompactionPlan input) { + throw new IllegalArgumentException("This is the lowest version. Input cannot be any lower version"); + } + + @Override + public HoodieCompactionPlan downgradeFrom(HoodieCompactionPlan input) { + Preconditions.checkArgument(input.getVersion() == 2, "Input version is " + + input.getVersion() + ". 
Must be 2"); + HoodieCompactionPlan compactionPlan = new HoodieCompactionPlan(); + final Path basePath = new Path(metaClient.getBasePath()); + List v1CompactionOperationList = new ArrayList<>(); + if (null != input.getOperations()) { + v1CompactionOperationList = input.getOperations().stream().map(inp -> { + return HoodieCompactionOperation.newBuilder() + .setBaseInstantTime(inp.getBaseInstantTime()) + .setFileId(inp.getFileId()) + .setPartitionPath(inp.getPartitionPath()) + .setMetrics(inp.getMetrics()) + .setDataFilePath(convertToV1Path(basePath, inp.getPartitionPath(), inp.getDataFilePath())) + .setDeltaFilePaths(inp.getDeltaFilePaths().stream().map(s -> convertToV1Path(basePath, + inp.getPartitionPath(), s)).collect(Collectors.toList())) + .build(); + }).collect(Collectors.toList()); + } + compactionPlan.setOperations(v1CompactionOperationList); + compactionPlan.setExtraMetadata(input.getExtraMetadata()); + compactionPlan.setVersion(getManagedVersion()); + return compactionPlan; + } + + private static String convertToV1Path(Path basePath, String partitionPath, String fileName) { + if ((fileName == null) || (fileName.isEmpty())) { + return fileName; + } + + return new Path(FSUtils.getPartitionPath(basePath, partitionPath), fileName).toString(); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/versioning/compaction/CompactionV2MigrationHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/versioning/compaction/CompactionV2MigrationHandler.java new file mode 100644 index 000000000..7a5416afb --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/versioning/compaction/CompactionV2MigrationHandler.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.versioning.compaction; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.avro.model.HoodieCompactionOperation; +import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.versioning.AbstractMigratorBase; + +/** + * With version 2 of compaction plan, paths are no longer absolute. + */ +public class CompactionV2MigrationHandler extends AbstractMigratorBase { + + public static final Integer VERSION = 2; + + public CompactionV2MigrationHandler(HoodieTableMetaClient metaClient) { + super(metaClient); + } + + @Override + public Integer getManagedVersion() { + return VERSION; + } + + @Override + public HoodieCompactionPlan upgradeFrom(HoodieCompactionPlan input) { + Preconditions.checkArgument(input.getVersion() == 1, "Input version is " + + input.getVersion() + ". 
Must be 1"); + HoodieCompactionPlan compactionPlan = new HoodieCompactionPlan(); + List v2CompactionOperationList = new ArrayList<>(); + if (null != input.getOperations()) { + v2CompactionOperationList = input.getOperations().stream().map(inp -> { + return HoodieCompactionOperation.newBuilder() + .setBaseInstantTime(inp.getBaseInstantTime()) + .setFileId(inp.getFileId()) + .setPartitionPath(inp.getPartitionPath()) + .setMetrics(inp.getMetrics()) + .setDataFilePath(new Path(inp.getDataFilePath()).getName()) + .setDeltaFilePaths(inp.getDeltaFilePaths().stream().map(s -> new Path(s).getName()) + .collect(Collectors.toList())) + .build(); + }).collect(Collectors.toList()); + } + compactionPlan.setOperations(v2CompactionOperationList); + compactionPlan.setExtraMetadata(input.getExtraMetadata()); + compactionPlan.setVersion(getManagedVersion()); + return compactionPlan; + } + + @Override + public HoodieCompactionPlan downgradeFrom(HoodieCompactionPlan input) { + throw new IllegalArgumentException("This is the current highest version. Input cannot be any higher version"); + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/CompactionTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/CompactionTestUtils.java index 56a40c29d..4dbf810f2 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/CompactionTestUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/CompactionTestUtils.java @@ -171,7 +171,8 @@ public class CompactionTestUtils { throw new HoodieIOException(e.getMessage(), e); } }).collect(Collectors.toList()); - return new HoodieCompactionPlan(ops.isEmpty() ? null : ops, new HashMap<>()); + return new HoodieCompactionPlan(ops.isEmpty() ? 
null : ops, new HashMap<>(), + CompactionUtils.LATEST_COMPACTION_METADATA_VERSION); } public static class TestHoodieDataFile extends HoodieDataFile { @@ -179,7 +180,7 @@ public class CompactionTestUtils { private final String path; public TestHoodieDataFile(String path) { - super("/tmp/ce481ee7-9e53-4a2e-9992-f9e295fa79c0_11_20180918020003.parquet"); + super(path); this.path = path; } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java index 3bb46fbcc..71373409a 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java @@ -22,6 +22,8 @@ import static org.apache.hudi.common.model.HoodieTestUtils.DEFAULT_PARTITION_PAT import static org.apache.hudi.common.util.CompactionTestUtils.createCompactionPlan; import static org.apache.hudi.common.util.CompactionTestUtils.scheduleCompaction; import static org.apache.hudi.common.util.CompactionTestUtils.setupAndValidateCompactionOperations; +import static org.apache.hudi.common.util.CompactionUtils.COMPACTION_METADATA_VERSION_1; +import static org.apache.hudi.common.util.CompactionUtils.LATEST_COMPACTION_METADATA_VERSION; import com.google.common.collect.ImmutableMap; import java.io.IOException; @@ -36,12 +38,14 @@ import org.apache.hudi.avro.model.HoodieCompactionOperation; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.common.HoodieCommonTestHarness; import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieDataFile; import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.CompactionTestUtils.TestHoodieDataFile; import 
org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.common.versioning.compaction.CompactionPlanMigrator; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -59,20 +63,39 @@ public class TestCompactionUtils extends HoodieCommonTestHarness { initMetaClient(); } + @Test + public void testUpgradeDowngrade() { + Pair>, HoodieCompactionPlan> inputAndPlan = buildCompactionPlan(); + testFileSlicesCompactionPlanEquality(inputAndPlan.getKey(), inputAndPlan.getValue()); + + CompactionPlanMigrator migrator = new CompactionPlanMigrator(metaClient); + HoodieCompactionPlan plan = inputAndPlan.getRight(); + System.out.println("Plan=" + plan.getOperations()); + Assert.assertEquals(LATEST_COMPACTION_METADATA_VERSION, plan.getVersion()); + HoodieCompactionPlan oldPlan = migrator.migrateToVersion(plan, plan.getVersion(), COMPACTION_METADATA_VERSION_1); + // Check with older version of compaction plan + Assert.assertEquals(COMPACTION_METADATA_VERSION_1, oldPlan.getVersion()); + testFileSlicesCompactionPlanEquality(inputAndPlan.getKey(), oldPlan); + HoodieCompactionPlan newPlan = migrator.upgradeToLatest(plan, plan.getVersion()); + Assert.assertEquals(LATEST_COMPACTION_METADATA_VERSION, newPlan.getVersion()); + testFileSlicesCompactionPlanEquality(inputAndPlan.getKey(), newPlan); + } + @Test public void testBuildFromFileSlice() { // Empty File-Slice with no data and log files FileSlice emptyFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "empty1"); HoodieCompactionOperation op = CompactionUtils.buildFromFileSlice(DEFAULT_PARTITION_PATHS[0], emptyFileSlice, Option.of(metricsCaptureFn)); - testFileSliceCompactionOpEquality(emptyFileSlice, op, DEFAULT_PARTITION_PATHS[0]); + testFileSliceCompactionOpEquality(emptyFileSlice, op, DEFAULT_PARTITION_PATHS[0], + LATEST_COMPACTION_METADATA_VERSION); // File Slice with data-file but no log files FileSlice noLogFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noLog1"); 
noLogFileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog_1_000.parquet")); op = CompactionUtils.buildFromFileSlice(DEFAULT_PARTITION_PATHS[0], noLogFileSlice, Option.of(metricsCaptureFn)); - testFileSliceCompactionOpEquality(noLogFileSlice, op, DEFAULT_PARTITION_PATHS[0]); - + testFileSliceCompactionOpEquality(noLogFileSlice, op, DEFAULT_PARTITION_PATHS[0], + LATEST_COMPACTION_METADATA_VERSION); // File Slice with no data-file but log files present FileSlice noDataFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noData1"); noDataFileSlice.addLogFile( @@ -80,7 +103,8 @@ public class TestCompactionUtils extends HoodieCommonTestHarness { noDataFileSlice.addLogFile( new HoodieLogFile(new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN)))); op = CompactionUtils.buildFromFileSlice(DEFAULT_PARTITION_PATHS[0], noDataFileSlice, Option.of(metricsCaptureFn)); - testFileSliceCompactionOpEquality(noDataFileSlice, op, DEFAULT_PARTITION_PATHS[0]); + testFileSliceCompactionOpEquality(noDataFileSlice, op, DEFAULT_PARTITION_PATHS[0], + LATEST_COMPACTION_METADATA_VERSION); // File Slice with data-file and log files present FileSlice fileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noData1"); @@ -90,27 +114,29 @@ public class TestCompactionUtils extends HoodieCommonTestHarness { fileSlice.addLogFile( new HoodieLogFile(new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN)))); op = CompactionUtils.buildFromFileSlice(DEFAULT_PARTITION_PATHS[0], fileSlice, Option.of(metricsCaptureFn)); - testFileSliceCompactionOpEquality(fileSlice, op, DEFAULT_PARTITION_PATHS[0]); + testFileSliceCompactionOpEquality(fileSlice, op, DEFAULT_PARTITION_PATHS[0], + LATEST_COMPACTION_METADATA_VERSION); } /** * Generate input for compaction plan tests */ private Pair>, HoodieCompactionPlan> buildCompactionPlan() { + Path fullPartitionPath = new Path(new Path(metaClient.getBasePath()), DEFAULT_PARTITION_PATHS[0]); FileSlice 
emptyFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "empty1"); FileSlice fileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noData1"); - fileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog_1_000.parquet")); + fileSlice.setDataFile(new TestHoodieDataFile(fullPartitionPath.toString() + "/data1_1_000.parquet")); fileSlice.addLogFile( - new HoodieLogFile(new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN)))); + new HoodieLogFile(new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN))))); fileSlice.addLogFile( - new HoodieLogFile(new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN)))); + new HoodieLogFile(new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN))))); FileSlice noLogFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noLog1"); - noLogFileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog_1_000.parquet")); + noLogFileSlice.setDataFile(new TestHoodieDataFile(fullPartitionPath.toString() + "/noLog_1_000.parquet")); FileSlice noDataFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noData1"); noDataFileSlice.addLogFile( - new HoodieLogFile(new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN)))); + new HoodieLogFile(new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN))))); noDataFileSlice.addLogFile( - new HoodieLogFile(new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN)))); + new HoodieLogFile(new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN))))); List fileSliceList = Arrays.asList(emptyFileSlice, noDataFileSlice, fileSlice, noLogFileSlice); List> input = fileSliceList.stream().map(f -> Pair.of(DEFAULT_PARTITION_PATHS[0], f)).collect(Collectors.toList()); @@ -195,8 +221,9 @@ public 
class TestCompactionUtils extends HoodieCommonTestHarness { */ private void testFileSlicesCompactionPlanEquality(List> input, HoodieCompactionPlan plan) { Assert.assertEquals("All file-slices present", input.size(), plan.getOperations().size()); - IntStream.range(0, input.size()).boxed().forEach(idx -> testFileSliceCompactionOpEquality(input.get(idx).getValue(), - plan.getOperations().get(idx), input.get(idx).getKey())); + IntStream.range(0, input.size()).boxed().forEach(idx -> + testFileSliceCompactionOpEquality(input.get(idx).getValue(), plan.getOperations().get(idx), + input.get(idx).getKey(), plan.getVersion())); } /** @@ -207,16 +234,20 @@ public class TestCompactionUtils extends HoodieCommonTestHarness { * @param expPartitionPath Partition path */ private void testFileSliceCompactionOpEquality(FileSlice slice, HoodieCompactionOperation op, - String expPartitionPath) { + String expPartitionPath, int version) { Assert.assertEquals("Partition path is correct", expPartitionPath, op.getPartitionPath()); Assert.assertEquals("Same base-instant", slice.getBaseInstantTime(), op.getBaseInstantTime()); Assert.assertEquals("Same file-id", slice.getFileId(), op.getFileId()); if (slice.getDataFile().isPresent()) { - Assert.assertEquals("Same data-file", slice.getDataFile().get().getPath(), op.getDataFilePath()); + HoodieDataFile df = slice.getDataFile().get(); + Assert.assertEquals("Same data-file", + version == COMPACTION_METADATA_VERSION_1 ? df.getPath() : df.getFileName(), op.getDataFilePath()); } List paths = slice.getLogFiles().map(l -> l.getPath().toString()).collect(Collectors.toList()); IntStream.range(0, paths.size()).boxed().forEach(idx -> { - Assert.assertEquals("Log File Index " + idx, paths.get(idx), op.getDeltaFilePaths().get(idx)); + Assert.assertEquals("Log File Index " + idx, + version == COMPACTION_METADATA_VERSION_1 ? 
paths.get(idx) : new Path(paths.get(idx)).getName(), + op.getDeltaFilePaths().get(idx)); }); Assert.assertEquals("Metrics set", metrics, op.getMetrics()); }