1
0

[HUDI-130] Paths written in compaction plan needs to be relative to base-path

This commit is contained in:
Balaji Varadarajan
2019-10-23 01:03:29 -07:00
committed by Balaji Varadarajan
parent e4c91ed13f
commit d8be818ac9
25 changed files with 600 additions and 104 deletions

View File

@@ -290,9 +290,10 @@ public class CompactionCommand implements CommandMarker {
String message = "\n\n\t COMPACTION PLAN " + (valid ? "VALID" : "INVALID") + "\n\n"; String message = "\n\n\t COMPACTION PLAN " + (valid ? "VALID" : "INVALID") + "\n\n";
List<Comparable[]> rows = new ArrayList<>(); List<Comparable[]> rows = new ArrayList<>();
res.stream().forEach(r -> { res.stream().forEach(r -> {
Comparable[] row = new Comparable[] {r.getOperation().getFileId(), r.getOperation().getBaseInstantTime(), Comparable[] row = new Comparable[]{r.getOperation().getFileId(),
r.getOperation().getDataFilePath().isPresent() ? r.getOperation().getDataFilePath().get() : "", r.getOperation().getBaseInstantTime(),
r.getOperation().getDeltaFilePaths().size(), r.isSuccess(), r.getOperation().getDataFileName().isPresent() ? r.getOperation().getDataFileName().get() : "",
r.getOperation().getDeltaFileNames().size(), r.isSuccess(),
r.getException().isPresent() ? r.getException().get().getMessage() : ""}; r.getException().isPresent() ? r.getException().get().getMessage() : ""};
rows.add(row); rows.add(row);
}); });

View File

@@ -239,8 +239,9 @@ public class CompactionAdminClient extends AbstractHoodieClient {
FileSlice merged = FileSlice merged =
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(op.getPartitionPath(), lastInstant.getTimestamp()) fileSystemView.getLatestMergedFileSlicesBeforeOrOn(op.getPartitionPath(), lastInstant.getTimestamp())
.filter(fs -> fs.getFileId().equals(op.getFileId())).findFirst().get(); .filter(fs -> fs.getFileId().equals(op.getFileId())).findFirst().get();
final int maxVersion = op.getDeltaFilePaths().stream().map(lf -> FSUtils.getFileVersionFromLog(new Path(lf))) final int maxVersion =
.reduce((x, y) -> x > y ? x : y).orElse(0); op.getDeltaFileNames().stream().map(lf -> FSUtils.getFileVersionFromLog(new Path(lf)))
.reduce((x, y) -> x > y ? x : y).orElse(0);
List<HoodieLogFile> logFilesToBeMoved = List<HoodieLogFile> logFilesToBeMoved =
merged.getLogFiles().filter(lf -> lf.getLogVersion() > maxVersion).collect(Collectors.toList()); merged.getLogFiles().filter(lf -> lf.getLogVersion() > maxVersion).collect(Collectors.toList());
return logFilesToBeMoved.stream().map(lf -> { return logFilesToBeMoved.stream().map(lf -> {
@@ -291,28 +292,34 @@ public class CompactionAdminClient extends AbstractHoodieClient {
if (fileSliceOptional.isPresent()) { if (fileSliceOptional.isPresent()) {
FileSlice fs = fileSliceOptional.get(); FileSlice fs = fileSliceOptional.get();
Option<HoodieDataFile> df = fs.getDataFile(); Option<HoodieDataFile> df = fs.getDataFile();
if (operation.getDataFilePath().isPresent()) { if (operation.getDataFileName().isPresent()) {
String expPath = String expPath = metaClient.getFs().getFileStatus(new Path(
metaClient.getFs().getFileStatus(new Path(operation.getDataFilePath().get())).getPath().toString(); FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()),
Preconditions.checkArgument(df.isPresent(), new Path(operation.getDataFileName().get()))).getPath()
"Data File must be present. File Slice was : " + fs + ", operation :" + operation); .toString();
Preconditions.checkArgument(df.isPresent(), "Data File must be present. File Slice was : "
+ fs + ", operation :" + operation);
Preconditions.checkArgument(df.get().getPath().equals(expPath), Preconditions.checkArgument(df.get().getPath().equals(expPath),
"Base Path in operation is specified as " + expPath + " but got path " + df.get().getPath()); "Base Path in operation is specified as " + expPath + " but got path " + df.get().getPath());
} }
Set<HoodieLogFile> logFilesInFileSlice = fs.getLogFiles().collect(Collectors.toSet()); Set<HoodieLogFile> logFilesInFileSlice = fs.getLogFiles().collect(Collectors.toSet());
Set<HoodieLogFile> logFilesInCompactionOp = operation.getDeltaFilePaths().stream().map(dp -> { Set<HoodieLogFile> logFilesInCompactionOp = operation.getDeltaFileNames().stream()
try { .map(dp -> {
FileStatus[] fileStatuses = metaClient.getFs().listStatus(new Path(dp)); try {
Preconditions.checkArgument(fileStatuses.length == 1, "Expect only 1 file-status"); FileStatus[] fileStatuses = metaClient.getFs().listStatus(
return new HoodieLogFile(fileStatuses[0]); new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()),
} catch (FileNotFoundException fe) { new Path(dp)));
throw new CompactionValidationException(fe.getMessage()); Preconditions.checkArgument(fileStatuses.length == 1, "Expect only 1 file-status");
} catch (IOException ioe) { return new HoodieLogFile(fileStatuses[0]);
throw new HoodieIOException(ioe.getMessage(), ioe); } catch (FileNotFoundException fe) {
} throw new CompactionValidationException(fe.getMessage());
}).collect(Collectors.toSet()); } catch (IOException ioe) {
Set<HoodieLogFile> missing = logFilesInCompactionOp.stream().filter(lf -> !logFilesInFileSlice.contains(lf)) throw new HoodieIOException(ioe.getMessage(), ioe);
.collect(Collectors.toSet()); }
}).collect(Collectors.toSet());
Set<HoodieLogFile> missing =
logFilesInCompactionOp.stream().filter(lf -> !logFilesInFileSlice.contains(lf))
.collect(Collectors.toSet());
Preconditions.checkArgument(missing.isEmpty(), Preconditions.checkArgument(missing.isEmpty(),
"All log files specified in compaction operation is not present. Missing :" + missing + ", Exp :" "All log files specified in compaction operation is not present. Missing :" + missing + ", Exp :"
+ logFilesInCompactionOp + ", Got :" + logFilesInFileSlice); + logFilesInCompactionOp + ", Got :" + logFilesInFileSlice);

View File

@@ -57,6 +57,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.util.AvroUtils; import org.apache.hudi.common.util.AvroUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.FSUtils; import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieCompactionConfig;
@@ -1176,13 +1177,14 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
*/ */
private JavaRDD<WriteStatus> runCompaction(HoodieInstant compactionInstant, HoodieActiveTimeline activeTimeline, private JavaRDD<WriteStatus> runCompaction(HoodieInstant compactionInstant, HoodieActiveTimeline activeTimeline,
boolean autoCommit) throws IOException { boolean autoCommit) throws IOException {
HoodieCompactionPlan compactionPlan = HoodieTableMetaClient metaClient = createMetaClient(true);
AvroUtils.deserializeCompactionPlan(activeTimeline.getInstantAuxiliaryDetails(compactionInstant).get()); HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(metaClient,
compactionInstant.getTimestamp());
// Mark instant as compaction inflight // Mark instant as compaction inflight
activeTimeline.transitionCompactionRequestedToInflight(compactionInstant); activeTimeline.transitionCompactionRequestedToInflight(compactionInstant);
compactionTimer = metrics.getCompactionCtx(); compactionTimer = metrics.getCompactionCtx();
// Create a Hoodie table which encapsulated the commits and files visible // Create a Hoodie table which encapsulated the commits and files visible
HoodieTableMetaClient metaClient = createMetaClient(true);
HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc); HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
JavaRDD<WriteStatus> statuses = table.compact(jsc, compactionInstant.getTimestamp(), compactionPlan); JavaRDD<WriteStatus> statuses = table.compact(jsc, compactionInstant.getTimestamp(), compactionPlan);
// Force compaction action // Force compaction action

View File

@@ -32,6 +32,7 @@ import java.util.stream.Collectors;
import java.util.stream.StreamSupport; import java.util.stream.StreamSupport;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.WriteStatus; import org.apache.hudi.WriteStatus;
import org.apache.hudi.avro.model.HoodieCompactionOperation; import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan;
@@ -98,10 +99,10 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
private List<WriteStatus> compact(HoodieCopyOnWriteTable hoodieCopyOnWriteTable, HoodieTableMetaClient metaClient, private List<WriteStatus> compact(HoodieCopyOnWriteTable hoodieCopyOnWriteTable, HoodieTableMetaClient metaClient,
HoodieWriteConfig config, CompactionOperation operation, String commitTime) throws IOException { HoodieWriteConfig config, CompactionOperation operation, String commitTime) throws IOException {
FileSystem fs = metaClient.getFs(); FileSystem fs = metaClient.getFs();
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
log.info("Compacting base " + operation.getDataFilePath() + " with delta files " + operation.getDeltaFilePaths() Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
+ " for commit " + commitTime); log.info("Compacting base " + operation.getDataFileName() + " with delta files " + operation
.getDeltaFileNames() + " for commit " + commitTime);
// TODO - FIX THIS // TODO - FIX THIS
// Reads the entire avro file. Always only specific blocks should be read from the avro file // Reads the entire avro file. Always only specific blocks should be read from the avro file
// (failure recover). // (failure recover).
@@ -113,15 +114,21 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION)) HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
.filterCompletedInstants().lastInstant().get().getTimestamp(); .filterCompletedInstants().lastInstant().get().getTimestamp();
log.info("MaxMemoryPerCompaction => " + config.getMaxMemoryPerCompaction()); log.info("MaxMemoryPerCompaction => " + config.getMaxMemoryPerCompaction());
HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, metaClient.getBasePath(),
operation.getDeltaFilePaths(), readerSchema, maxInstantTime, config.getMaxMemoryPerCompaction(), List<String> logFiles = operation.getDeltaFileNames().stream()
config.getCompactionLazyBlockReadEnabled(), config.getCompactionReverseLogReadEnabled(), .map(p -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()),
config.getMaxDFSStreamBufferSize(), config.getSpillableMapBasePath()); p).toString()).collect(toList());
HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs,
metaClient.getBasePath(), logFiles, readerSchema, maxInstantTime,
config.getMaxMemoryPerCompaction(), config.getCompactionLazyBlockReadEnabled(),
config.getCompactionReverseLogReadEnabled(), config.getMaxDFSStreamBufferSize(),
config.getSpillableMapBasePath());
if (!scanner.iterator().hasNext()) { if (!scanner.iterator().hasNext()) {
return Lists.<WriteStatus>newArrayList(); return Lists.<WriteStatus>newArrayList();
} }
Option<HoodieDataFile> oldDataFileOpt = operation.getBaseFile(); Option<HoodieDataFile> oldDataFileOpt = operation.getBaseFile(metaClient.getBasePath(),
operation.getPartitionPath());
// Compacting is very similar to applying updates to existing file // Compacting is very similar to applying updates to existing file
Iterator<List<WriteStatus>> result; Iterator<List<WriteStatus>> result;
@@ -182,22 +189,28 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
RealtimeView fileSystemView = hoodieTable.getRTFileSystemView(); RealtimeView fileSystemView = hoodieTable.getRTFileSystemView();
log.info("Compaction looking for files to compact in " + partitionPaths + " partitions"); log.info("Compaction looking for files to compact in " + partitionPaths + " partitions");
List<HoodieCompactionOperation> operations = jsc.parallelize(partitionPaths, partitionPaths.size()) List<HoodieCompactionOperation> operations =
.flatMap((FlatMapFunction<String, CompactionOperation>) partitionPath -> fileSystemView jsc.parallelize(partitionPaths, partitionPaths.size())
.getLatestFileSlices(partitionPath) .flatMap((FlatMapFunction<String, CompactionOperation>) partitionPath -> fileSystemView
.filter(slice -> !fgIdsInPendingCompactions.contains(slice.getFileGroupId())).map(s -> { .getLatestFileSlices(partitionPath)
List<HoodieLogFile> logFiles = .filter(slice ->
s.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList()); !fgIdsInPendingCompactions.contains(slice.getFileGroupId()))
totalLogFiles.add((long) logFiles.size()); .map(
totalFileSlices.add(1L); s -> {
// Avro generated classes are not inheriting Serializable. Using CompactionOperation POJO List<HoodieLogFile> logFiles = s.getLogFiles().sorted(HoodieLogFile
// for spark Map operations and collecting them finally in Avro generated classes for storing .getLogFileComparator()).collect(Collectors.toList());
// into meta files. totalLogFiles.add((long) logFiles.size());
Option<HoodieDataFile> dataFile = s.getDataFile(); totalFileSlices.add(1L);
return new CompactionOperation(dataFile, partitionPath, logFiles, // Avro generated classes are not inheriting Serializable. Using CompactionOperation POJO
config.getCompactionStrategy().captureMetrics(config, dataFile, partitionPath, logFiles)); // for spark Map operations and collecting them finally in Avro generated classes for storing
}).filter(c -> !c.getDeltaFilePaths().isEmpty()).collect(toList()).iterator()) // into meta files.
.collect().stream().map(CompactionUtils::buildHoodieCompactionOperation).collect(toList()); Option<HoodieDataFile> dataFile = s.getDataFile();
return new CompactionOperation(dataFile, partitionPath, logFiles,
config.getCompactionStrategy().captureMetrics(config, dataFile, partitionPath, logFiles));
})
.filter(c -> !c.getDeltaFileNames().isEmpty())
.collect(toList()).iterator()).collect().stream().map(CompactionUtils::buildHoodieCompactionOperation)
.collect(toList());
log.info("Total of " + operations.size() + " compactions are retrieved"); log.info("Total of " + operations.size() + " compactions are retrieved");
log.info("Total number of latest files slices " + totalFileSlices.value()); log.info("Total number of latest files slices " + totalFileSlices.value());
log.info("Total number of log files " + totalLogFiles.value()); log.info("Total number of log files " + totalLogFiles.value());

View File

@@ -26,6 +26,7 @@ import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.model.HoodieDataFile; import org.apache.hudi.common.model.HoodieDataFile;
import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.FSUtils; import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
@@ -94,7 +95,9 @@ public abstract class CompactionStrategy implements Serializable {
List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) { List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) {
// Strategy implementation can overload this method to set specific compactor-id // Strategy implementation can overload this method to set specific compactor-id
return HoodieCompactionPlan.newBuilder() return HoodieCompactionPlan.newBuilder()
.setOperations(orderAndFilter(writeConfig, operations, pendingCompactionPlans)).build(); .setOperations(orderAndFilter(writeConfig, operations, pendingCompactionPlans))
.setVersion(CompactionUtils.LATEST_COMPACTION_METADATA_VERSION)
.build();
} }
/** /**

View File

@@ -67,11 +67,13 @@
<imports> <imports>
<!-- import avro files --> <!-- import avro files -->
<import>${basedir}/src/main/avro/HoodieCommitMetadata.avsc</import> <import>${basedir}/src/main/avro/HoodieCommitMetadata.avsc</import>
<import>${basedir}/src/main/avro/HoodieCompactionOperation.avsc</import>
<import>${basedir}/src/main/avro/HoodieSavePointMetadata.avsc</import> <import>${basedir}/src/main/avro/HoodieSavePointMetadata.avsc</import>
<import>${basedir}/src/main/avro/HoodieCompactionMetadata.avsc</import> <import>${basedir}/src/main/avro/HoodieCompactionMetadata.avsc</import>
<import>${basedir}/src/main/avro/HoodieCleanMetadata.avsc</import> <import>${basedir}/src/main/avro/HoodieCleanMetadata.avsc</import>
<import>${basedir}/src/main/avro/HoodieRollbackMetadata.avsc</import> <import>${basedir}/src/main/avro/HoodieRollbackMetadata.avsc</import>
<import>${basedir}/src/main/avro/HoodieRestoreMetadata.avsc</import> <import>${basedir}/src/main/avro/HoodieRestoreMetadata.avsc</import>
<import>${basedir}/src/main/avro/HoodieArchivedMetaEntry.avsc</import>
</imports> </imports>
</configuration> </configuration>
</plugin> </plugin>

View File

@@ -67,6 +67,11 @@
{ {
"name":"actionType", "name":"actionType",
"type":["null","string"] "type":["null","string"]
},
{
"name":"version",
"type":["int", "null"],
"default": 1
} }
] ]
} }

View File

@@ -36,6 +36,11 @@
] ]
} }
} }
},
{
"name":"version",
"type":["int", "null"],
"default": 1
} }
] ]
} }

View File

@@ -126,6 +126,11 @@
"type":"map", "type":"map",
"values":"string" "values":"string"
}] }]
},
{
"name":"version",
"type":["int", "null"],
"default": 1
} }
] ]
} }

View File

@@ -74,6 +74,11 @@
"values":"string" "values":"string"
}], }],
"default": null "default": null
},
{
"name":"version",
"type":["int", "null"],
"default": 1
} }
] ]
} }

View File

@@ -29,6 +29,11 @@
"items": "HoodieRollbackMetadata", "items": "HoodieRollbackMetadata",
"name": "hoodieRollbackMetadata" "name": "hoodieRollbackMetadata"
} }
}} }},
{
"name":"version",
"type":["int", "null"],
"default": 1
}
] ]
} }

View File

@@ -34,6 +34,11 @@
] ]
} }
} }
},
{
"name":"version",
"type":["int", "null"],
"default": 1
} }
] ]
} }

View File

@@ -32,6 +32,11 @@
] ]
} }
} }
},
{
"name":"version",
"type":["int", "null"],
"default": 1
} }
] ]
} }

View File

@@ -39,8 +39,8 @@ public class CompactionOperation implements Serializable {
private String baseInstantTime; private String baseInstantTime;
private Option<String> dataFileCommitTime; private Option<String> dataFileCommitTime;
private List<String> deltaFilePaths; private List<String> deltaFileNames;
private Option<String> dataFilePath; private Option<String> dataFileName;
private HoodieFileGroupId id; private HoodieFileGroupId id;
private Map<String, Double> metrics; private Map<String, Double> metrics;
@@ -49,12 +49,12 @@ public class CompactionOperation implements Serializable {
public CompactionOperation() {} public CompactionOperation() {}
public CompactionOperation(String fileId, String partitionPath, String baseInstantTime, public CompactionOperation(String fileId, String partitionPath, String baseInstantTime,
Option<String> dataFileCommitTime, List<String> deltaFilePaths, Option<String> dataFilePath, Option<String> dataFileCommitTime, List<String> deltaFileNames, Option<String> dataFileName,
Map<String, Double> metrics) { Map<String, Double> metrics) {
this.baseInstantTime = baseInstantTime; this.baseInstantTime = baseInstantTime;
this.dataFileCommitTime = dataFileCommitTime; this.dataFileCommitTime = dataFileCommitTime;
this.deltaFilePaths = deltaFilePaths; this.deltaFileNames = deltaFileNames;
this.dataFilePath = dataFilePath; this.dataFileName = dataFileName;
this.id = new HoodieFileGroupId(partitionPath, fileId); this.id = new HoodieFileGroupId(partitionPath, fileId);
this.metrics = metrics; this.metrics = metrics;
} }
@@ -63,18 +63,18 @@ public class CompactionOperation implements Serializable {
Map<String, Double> metrics) { Map<String, Double> metrics) {
if (dataFile.isPresent()) { if (dataFile.isPresent()) {
this.baseInstantTime = dataFile.get().getCommitTime(); this.baseInstantTime = dataFile.get().getCommitTime();
this.dataFilePath = Option.of(dataFile.get().getPath()); this.dataFileName = Option.of(dataFile.get().getFileName());
this.id = new HoodieFileGroupId(partitionPath, dataFile.get().getFileId()); this.id = new HoodieFileGroupId(partitionPath, dataFile.get().getFileId());
this.dataFileCommitTime = Option.of(dataFile.get().getCommitTime()); this.dataFileCommitTime = Option.of(dataFile.get().getCommitTime());
} else { } else {
assert logFiles.size() > 0; assert logFiles.size() > 0;
this.dataFilePath = Option.empty(); this.dataFileName = Option.empty();
this.baseInstantTime = FSUtils.getBaseCommitTimeFromLogPath(logFiles.get(0).getPath()); this.baseInstantTime = FSUtils.getBaseCommitTimeFromLogPath(logFiles.get(0).getPath());
this.id = new HoodieFileGroupId(partitionPath, FSUtils.getFileIdFromLogPath(logFiles.get(0).getPath())); this.id = new HoodieFileGroupId(partitionPath, FSUtils.getFileIdFromLogPath(logFiles.get(0).getPath()));
this.dataFileCommitTime = Option.empty(); this.dataFileCommitTime = Option.empty();
} }
this.deltaFilePaths = logFiles.stream().map(s -> s.getPath().toString()).collect(Collectors.toList()); this.deltaFileNames = logFiles.stream().map(s -> s.getPath().getName()).collect(Collectors.toList());
this.metrics = metrics; this.metrics = metrics;
} }
@@ -86,12 +86,12 @@ public class CompactionOperation implements Serializable {
return dataFileCommitTime; return dataFileCommitTime;
} }
public List<String> getDeltaFilePaths() { public List<String> getDeltaFileNames() {
return deltaFilePaths; return deltaFileNames;
} }
public Option<String> getDataFilePath() { public Option<String> getDataFileName() {
return dataFilePath; return dataFileName;
} }
public String getFileId() { public String getFileId() {
@@ -110,9 +110,9 @@ public class CompactionOperation implements Serializable {
return id; return id;
} }
public Option<HoodieDataFile> getBaseFile() { public Option<HoodieDataFile> getBaseFile(String basePath, String partitionPath) {
// TODO: HUDI-130 - Paths return in compaction plan needs to be relative to base-path Path dirPath = FSUtils.getPartitionPath(basePath, partitionPath);
return dataFilePath.map(df -> new HoodieDataFile(df)); return dataFileName.map(df -> new HoodieDataFile(new Path(dirPath, df).toString()));
} }
/** /**
@@ -124,9 +124,9 @@ public class CompactionOperation implements Serializable {
public static CompactionOperation convertFromAvroRecordInstance(HoodieCompactionOperation operation) { public static CompactionOperation convertFromAvroRecordInstance(HoodieCompactionOperation operation) {
CompactionOperation op = new CompactionOperation(); CompactionOperation op = new CompactionOperation();
op.baseInstantTime = operation.getBaseInstantTime(); op.baseInstantTime = operation.getBaseInstantTime();
op.dataFilePath = Option.ofNullable(operation.getDataFilePath()); op.dataFileName = Option.ofNullable(operation.getDataFilePath());
op.dataFileCommitTime = op.dataFilePath.map(p -> FSUtils.getCommitTime(new Path(p).getName())); op.dataFileCommitTime = op.dataFileName.map(p -> FSUtils.getCommitTime(new Path(p).getName()));
op.deltaFilePaths = new ArrayList<>(operation.getDeltaFilePaths()); op.deltaFileNames = new ArrayList<>(operation.getDeltaFilePaths());
op.id = new HoodieFileGroupId(operation.getPartitionPath(), operation.getFileId()); op.id = new HoodieFileGroupId(operation.getPartitionPath(), operation.getFileId());
op.metrics = operation.getMetrics() == null ? new HashMap<>() : new HashMap<>(operation.getMetrics()); op.metrics = operation.getMetrics() == null ? new HashMap<>() : new HashMap<>(operation.getMetrics());
return op; return op;
@@ -134,9 +134,14 @@ public class CompactionOperation implements Serializable {
@Override @Override
public String toString() { public String toString() {
return "CompactionOperation{" + "baseInstantTime='" + baseInstantTime + '\'' + ", dataFileCommitTime=" return "CompactionOperation{"
+ dataFileCommitTime + ", deltaFilePaths=" + deltaFilePaths + ", dataFilePath=" + dataFilePath + ", id='" + id + "baseInstantTime='" + baseInstantTime + '\''
+ '\'' + ", metrics=" + metrics + '}'; + ", dataFileCommitTime=" + dataFileCommitTime
+ ", deltaFileNames=" + deltaFileNames
+ ", dataFileName=" + dataFileName
+ ", id='" + id + '\''
+ ", metrics=" + metrics
+ '}';
} }
@Override @Override
@@ -150,8 +155,9 @@ public class CompactionOperation implements Serializable {
CompactionOperation operation = (CompactionOperation) o; CompactionOperation operation = (CompactionOperation) o;
return Objects.equals(baseInstantTime, operation.baseInstantTime) return Objects.equals(baseInstantTime, operation.baseInstantTime)
&& Objects.equals(dataFileCommitTime, operation.dataFileCommitTime) && Objects.equals(dataFileCommitTime, operation.dataFileCommitTime)
&& Objects.equals(deltaFilePaths, operation.deltaFilePaths) && Objects.equals(deltaFileNames, operation.deltaFileNames)
&& Objects.equals(dataFilePath, operation.dataFilePath) && Objects.equals(id, operation.id); && Objects.equals(dataFileName, operation.dataFileName)
&& Objects.equals(id, operation.id);
} }
@Override @Override

View File

@@ -61,8 +61,8 @@ public class CompactionOpDTO {
dto.compactionInstantTime = compactionInstantTime; dto.compactionInstantTime = compactionInstantTime;
dto.baseInstantTime = op.getBaseInstantTime(); dto.baseInstantTime = op.getBaseInstantTime();
dto.dataFileCommitTime = op.getDataFileCommitTime().orElse(null); dto.dataFileCommitTime = op.getDataFileCommitTime().orElse(null);
dto.dataFilePath = op.getDataFilePath().orElse(null); dto.dataFilePath = op.getDataFileName().orElse(null);
dto.deltaFilePaths = new ArrayList<>(op.getDeltaFilePaths()); dto.deltaFilePaths = new ArrayList<>(op.getDeltaFileNames());
dto.partitionPath = op.getPartitionPath(); dto.partitionPath = op.getPartitionPath();
dto.metrics = op.getMetrics() == null ? new HashMap<>() : new HashMap<>(op.getMetrics()); dto.metrics = op.getMetrics() == null ? new HashMap<>() : new HashMap<>(op.getMetrics());
return dto; return dto;

View File

@@ -47,6 +47,8 @@ import org.apache.hudi.common.HoodieRollbackStat;
public class AvroUtils { public class AvroUtils {
private static final Integer DEFAULT_VERSION = 1;
public static HoodieCleanMetadata convertCleanMetadata(String startCleanTime, Option<Long> durationInMs, public static HoodieCleanMetadata convertCleanMetadata(String startCleanTime, Option<Long> durationInMs,
List<HoodieCleanStat> cleanStats) { List<HoodieCleanStat> cleanStats) {
ImmutableMap.Builder<String, HoodieCleanPartitionMetadata> partitionMetadataBuilder = ImmutableMap.builder(); ImmutableMap.Builder<String, HoodieCleanPartitionMetadata> partitionMetadataBuilder = ImmutableMap.builder();
@@ -64,7 +66,7 @@ public class AvroUtils {
} }
} }
return new HoodieCleanMetadata(startCleanTime, durationInMs.orElseGet(() -> -1L), totalDeleted, return new HoodieCleanMetadata(startCleanTime, durationInMs.orElseGet(() -> -1L), totalDeleted,
earliestCommitToRetain, partitionMetadataBuilder.build()); earliestCommitToRetain, partitionMetadataBuilder.build(), DEFAULT_VERSION);
} }
public static HoodieRestoreMetadata convertRestoreMetadata(String startRestoreTime, Option<Long> durationInMs, public static HoodieRestoreMetadata convertRestoreMetadata(String startRestoreTime, Option<Long> durationInMs,
@@ -75,7 +77,7 @@ public class AvroUtils {
Arrays.asList(convertRollbackMetadata(startRestoreTime, durationInMs, commits, commitToStat.getValue()))); Arrays.asList(convertRollbackMetadata(startRestoreTime, durationInMs, commits, commitToStat.getValue())));
} }
return new HoodieRestoreMetadata(startRestoreTime, durationInMs.orElseGet(() -> -1L), commits, return new HoodieRestoreMetadata(startRestoreTime, durationInMs.orElseGet(() -> -1L), commits,
commitToStatBuilder.build()); commitToStatBuilder.build(), DEFAULT_VERSION);
} }
public static HoodieRollbackMetadata convertRollbackMetadata(String startRollbackTime, Option<Long> durationInMs, public static HoodieRollbackMetadata convertRollbackMetadata(String startRollbackTime, Option<Long> durationInMs,
@@ -88,8 +90,9 @@ public class AvroUtils {
partitionMetadataBuilder.put(stat.getPartitionPath(), metadata); partitionMetadataBuilder.put(stat.getPartitionPath(), metadata);
totalDeleted += stat.getSuccessDeleteFiles().size(); totalDeleted += stat.getSuccessDeleteFiles().size();
} }
return new HoodieRollbackMetadata(startRollbackTime, durationInMs.orElseGet(() -> -1L), totalDeleted, commits, return new HoodieRollbackMetadata(startRollbackTime, durationInMs.orElseGet(() -> -1L), totalDeleted, commits,
partitionMetadataBuilder.build()); partitionMetadataBuilder.build(), DEFAULT_VERSION);
} }
public static HoodieSavepointMetadata convertSavepointMetadata(String user, String comment, public static HoodieSavepointMetadata convertSavepointMetadata(String user, String comment,
@@ -99,7 +102,8 @@ public class AvroUtils {
HoodieSavepointPartitionMetadata metadata = new HoodieSavepointPartitionMetadata(stat.getKey(), stat.getValue()); HoodieSavepointPartitionMetadata metadata = new HoodieSavepointPartitionMetadata(stat.getKey(), stat.getValue());
partitionMetadataBuilder.put(stat.getKey(), metadata); partitionMetadataBuilder.put(stat.getKey(), metadata);
} }
return new HoodieSavepointMetadata(user, System.currentTimeMillis(), comment, partitionMetadataBuilder.build()); return new HoodieSavepointMetadata(user, System.currentTimeMillis(), comment, partitionMetadataBuilder.build(),
DEFAULT_VERSION);
} }
public static Option<byte[]> serializeCompactionPlan(HoodieCompactionPlan compactionWorkload) throws IOException { public static Option<byte[]> serializeCompactionPlan(HoodieCompactionPlan compactionWorkload) throws IOException {

View File

@@ -34,6 +34,9 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline; import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.versioning.compaction.CompactionPlanMigrator;
import org.apache.hudi.common.versioning.compaction.CompactionV1MigrationHandler;
import org.apache.hudi.common.versioning.compaction.CompactionV2MigrationHandler;
import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieException;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
@@ -45,6 +48,10 @@ public class CompactionUtils {
private static final Logger LOG = LogManager.getLogger(CompactionUtils.class); private static final Logger LOG = LogManager.getLogger(CompactionUtils.class);
public static final Integer COMPACTION_METADATA_VERSION_1 = CompactionV1MigrationHandler.VERSION;
public static final Integer COMPACTION_METADATA_VERSION_2 = CompactionV2MigrationHandler.VERSION;
public static final Integer LATEST_COMPACTION_METADATA_VERSION = COMPACTION_METADATA_VERSION_2;
/** /**
* Generate compaction operation from file-slice * Generate compaction operation from file-slice
* *
@@ -59,9 +66,9 @@ public class CompactionUtils {
builder.setPartitionPath(partitionPath); builder.setPartitionPath(partitionPath);
builder.setFileId(fileSlice.getFileId()); builder.setFileId(fileSlice.getFileId());
builder.setBaseInstantTime(fileSlice.getBaseInstantTime()); builder.setBaseInstantTime(fileSlice.getBaseInstantTime());
builder.setDeltaFilePaths(fileSlice.getLogFiles().map(lf -> lf.getPath().toString()).collect(Collectors.toList())); builder.setDeltaFilePaths(fileSlice.getLogFiles().map(lf -> lf.getPath().getName()).collect(Collectors.toList()));
if (fileSlice.getDataFile().isPresent()) { if (fileSlice.getDataFile().isPresent()) {
builder.setDataFilePath(fileSlice.getDataFile().get().getPath()); builder.setDataFilePath(fileSlice.getDataFile().get().getFileName());
} }
if (metricsCaptureFunction.isPresent()) { if (metricsCaptureFunction.isPresent()) {
@@ -82,9 +89,11 @@ public class CompactionUtils {
Option<Function<Pair<String, FileSlice>, Map<String, Double>>> metricsCaptureFunction) { Option<Function<Pair<String, FileSlice>, Map<String, Double>>> metricsCaptureFunction) {
HoodieCompactionPlan.Builder builder = HoodieCompactionPlan.newBuilder(); HoodieCompactionPlan.Builder builder = HoodieCompactionPlan.newBuilder();
extraMetadata.ifPresent(m -> builder.setExtraMetadata(m)); extraMetadata.ifPresent(m -> builder.setExtraMetadata(m));
builder.setOperations(partitionFileSlicePairs.stream() builder.setOperations(partitionFileSlicePairs.stream()
.map(pfPair -> buildFromFileSlice(pfPair.getKey(), pfPair.getValue(), metricsCaptureFunction)) .map(pfPair -> buildFromFileSlice(pfPair.getKey(), pfPair.getValue(), metricsCaptureFunction))
.collect(Collectors.toList())); .collect(Collectors.toList()));
builder.setVersion(LATEST_COMPACTION_METADATA_VERSION);
return builder.build(); return builder.build();
} }
@@ -94,8 +103,8 @@ public class CompactionUtils {
public static HoodieCompactionOperation buildHoodieCompactionOperation(CompactionOperation op) { public static HoodieCompactionOperation buildHoodieCompactionOperation(CompactionOperation op) {
return HoodieCompactionOperation.newBuilder().setFileId(op.getFileId()).setBaseInstantTime(op.getBaseInstantTime()) return HoodieCompactionOperation.newBuilder().setFileId(op.getFileId()).setBaseInstantTime(op.getBaseInstantTime())
.setPartitionPath(op.getPartitionPath()) .setPartitionPath(op.getPartitionPath())
.setDataFilePath(op.getDataFilePath().isPresent() ? op.getDataFilePath().get() : null) .setDataFilePath(op.getDataFileName().isPresent() ? op.getDataFileName().get() : null)
.setDeltaFilePaths(op.getDeltaFilePaths()).setMetrics(op.getMetrics()).build(); .setDeltaFilePaths(op.getDeltaFileNames()).setMetrics(op.getMetrics()).build();
} }
/** /**
@@ -127,9 +136,10 @@ public class CompactionUtils {
public static HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClient, String compactionInstant) public static HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClient, String compactionInstant)
throws IOException { throws IOException {
CompactionPlanMigrator migrator = new CompactionPlanMigrator(metaClient);
HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan(metaClient.getActiveTimeline() HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan(metaClient.getActiveTimeline()
.getInstantAuxiliaryDetails(HoodieTimeline.getCompactionRequestedInstant(compactionInstant)).get()); .getInstantAuxiliaryDetails(HoodieTimeline.getCompactionRequestedInstant(compactionInstant)).get());
return compactionPlan; return migrator.upgradeToLatest(compactionPlan, compactionPlan.getVersion());
} }
/** /**

View File

@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.versioning;
import org.apache.hudi.common.table.HoodieTableMetaClient;
/**
 * Convenience base class for {@link VersionMigrator} implementations that need access to the
 * table's meta client (e.g. to resolve the table base-path when converting file paths).
 */
public abstract class AbstractMigratorBase<T> implements VersionMigrator<T> {

  // Meta client of the table whose metadata is being migrated; available to subclasses.
  protected final HoodieTableMetaClient metaClient;

  public AbstractMigratorBase(HoodieTableMetaClient metaClient) {
    this.metaClient = metaClient;
  }
}

View File

@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.versioning;
import com.google.common.base.Preconditions;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.collection.Pair;
/**
 * Migrates a specific metadata type stored in the .hoodie folder between metadata versions.
 *
 * <p>Each version is handled by a dedicated {@link VersionMigrator}; upgrades and downgrades
 * are applied one version step at a time through the chain of registered migrators.
 *
 * @param <T> Type of metadata being migrated
 */
public class MetadataMigrator<T> {

  // Managed version number -> migrator responsible for that version.
  private final Map<Integer, VersionMigrator<T>> migrators;
  private final Integer latestVersion;
  private final Integer oldestVersion;

  public MetadataMigrator(HoodieTableMetaClient metaClient, List<VersionMigrator<T>> migratorList) {
    // NOTE(review): metaClient is currently unused here; kept so subclasses share a uniform
    // constructor shape (migrator implementations receive it via AbstractMigratorBase).
    migrators = migratorList.stream()
        .collect(Collectors.toMap(VersionMigrator::getManagedVersion, m -> m));
    latestVersion = migrators.keySet().stream().max(Integer::compare).get();
    oldestVersion = migrators.keySet().stream().min(Integer::compare).get();
  }

  /**
   * Upgrade metadata to the latest version handled by this migrator.
   *
   * @param metadata        Metadata
   * @param metadataVersion Current version of the metadata
   * @return Metadata conforming to the latest version
   */
  public T upgradeToLatest(T metadata, int metadataVersion) {
    return upgradeToVersion(metadata, metadataVersion, latestVersion);
  }

  /**
   * Migrate metadata to a specific version, upgrading or downgrading as needed.
   *
   * @param metadata        Metadata
   * @param metadataVersion Current version of the metadata
   * @param targetVersion   Target version; must lie within [oldestVersion, latestVersion]
   * @return Metadata conforming to the target version
   */
  public T migrateToVersion(T metadata, int metadataVersion, int targetVersion) {
    Preconditions.checkArgument(targetVersion >= oldestVersion);
    Preconditions.checkArgument(targetVersion <= latestVersion);
    if (metadataVersion == targetVersion) {
      return metadata;
    }
    return metadataVersion > targetVersion
        ? downgradeToVersion(metadata, metadataVersion, targetVersion)
        : upgradeToVersion(metadata, metadataVersion, targetVersion);
  }

  // Applies upgradeFrom handlers one version at a time until targetVersion is reached.
  private T upgradeToVersion(T metadata, int metadataVersion, int targetVersion) {
    for (int version = metadataVersion + 1; version <= targetVersion; version++) {
      metadata = migrators.get(version).upgradeFrom(metadata);
    }
    return metadata;
  }

  // Applies downgradeFrom handlers one version at a time until targetVersion is reached.
  // (Renamed from the original typo "dowgradeToVersion"; private, so no external callers.)
  private T downgradeToVersion(T metadata, int metadataVersion, int targetVersion) {
    for (int version = metadataVersion - 1; version >= targetVersion; version--) {
      metadata = migrators.get(version).downgradeFrom(metadata);
    }
    return metadata;
  }
}

View File

@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.versioning;
import java.io.Serializable;
/**
 * Handles upgrading and downgrading of a single version of a specific metadata type.
 *
 * <p>An implementation manages exactly one version N: it can upgrade metadata from
 * version N-1 to N, and downgrade metadata from version N+1 to N.
 *
 * @param <T> Metadata Type
 */
public interface VersionMigrator<T> extends Serializable {

  /**
   * Version of metadata that this implementation handles.
   *
   * @return the managed version number
   */
  Integer getManagedVersion();

  /**
   * Upgrades metadata of type T from the previous version (managed version - 1) to this version.
   *
   * @param input Metadata as of the previous version
   * @return Metadata compatible with the version managed by this class
   */
  T upgradeFrom(T input);

  /**
   * Downgrades metadata of type T from the next version (managed version + 1) to this version.
   *
   * @param input Metadata as of the next highest version
   * @return Metadata compatible with the version managed by this class
   */
  T downgradeFrom(T input);
}

View File

@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.versioning.compaction;
import java.util.Arrays;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.versioning.MetadataMigrator;
/**
 * Responsible for handling different versions of the compaction plan.
 *
 * <p>Registers the V1 handler (absolute file paths) and the V2 handler (file names relative
 * to the partition path) with the generic {@link MetadataMigrator} machinery.
 */
public class CompactionPlanMigrator extends MetadataMigrator<HoodieCompactionPlan> {

  public CompactionPlanMigrator(HoodieTableMetaClient metaClient) {
    super(metaClient, Arrays.asList(
        new CompactionV1MigrationHandler(metaClient),
        new CompactionV2MigrationHandler(metaClient)));
  }
}

View File

@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.versioning.compaction;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.versioning.AbstractMigratorBase;
/**
 * Handles version 1 of the compaction plan, in which file locations are stored as
 * absolute paths under the table base-path.
 */
public class CompactionV1MigrationHandler extends AbstractMigratorBase<HoodieCompactionPlan> {

  public static final Integer VERSION = 1;

  public CompactionV1MigrationHandler(HoodieTableMetaClient metaClient) {
    super(metaClient);
  }

  @Override
  public Integer getManagedVersion() {
    return VERSION;
  }

  @Override
  public HoodieCompactionPlan upgradeFrom(HoodieCompactionPlan input) {
    // V1 is the oldest known format; there is nothing below it to upgrade from.
    throw new IllegalArgumentException("This is the lowest version. Input cannot be any lower version");
  }

  /**
   * Converts a V2 plan (relative file names) back to V1 (absolute paths).
   *
   * @param input compaction plan at version 2
   * @return equivalent plan at version 1 with absolute file paths
   */
  @Override
  public HoodieCompactionPlan downgradeFrom(HoodieCompactionPlan input) {
    Preconditions.checkArgument(input.getVersion() == 2, "Input version is "
        + input.getVersion() + ". Must be 2");
    final Path basePath = new Path(metaClient.getBasePath());
    List<HoodieCompactionOperation> v1Operations = new ArrayList<>();
    if (input.getOperations() != null) {
      v1Operations = input.getOperations().stream()
          .map(op -> HoodieCompactionOperation.newBuilder()
              .setBaseInstantTime(op.getBaseInstantTime())
              .setFileId(op.getFileId())
              .setPartitionPath(op.getPartitionPath())
              .setMetrics(op.getMetrics())
              .setDataFilePath(convertToV1Path(basePath, op.getPartitionPath(), op.getDataFilePath()))
              .setDeltaFilePaths(op.getDeltaFilePaths().stream()
                  .map(name -> convertToV1Path(basePath, op.getPartitionPath(), name))
                  .collect(Collectors.toList()))
              .build())
          .collect(Collectors.toList());
    }
    HoodieCompactionPlan plan = new HoodieCompactionPlan();
    plan.setOperations(v1Operations);
    plan.setExtraMetadata(input.getExtraMetadata());
    plan.setVersion(getManagedVersion());
    return plan;
  }

  // Rebuilds an absolute path from base-path + partition-path + file name.
  // Null/empty file names (e.g. operations with no base data file) pass through unchanged.
  private static String convertToV1Path(Path basePath, String partitionPath, String fileName) {
    if (fileName == null || fileName.isEmpty()) {
      return fileName;
    }
    return new Path(FSUtils.getPartitionPath(basePath, partitionPath), fileName).toString();
  }
}

View File

@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.versioning.compaction;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.versioning.AbstractMigratorBase;
/**
 * Handles version 2 of the compaction plan, in which file locations are no longer absolute
 * paths but bare file names relative to the operation's partition path.
 */
public class CompactionV2MigrationHandler extends AbstractMigratorBase<HoodieCompactionPlan> {

  public static final Integer VERSION = 2;

  public CompactionV2MigrationHandler(HoodieTableMetaClient metaClient) {
    super(metaClient);
  }

  @Override
  public Integer getManagedVersion() {
    return VERSION;
  }

  /**
   * Upgrades a V1 plan (absolute paths) to V2 (bare file names).
   *
   * @param input compaction plan at version 1
   * @return equivalent plan at version 2 with relative file names
   */
  @Override
  public HoodieCompactionPlan upgradeFrom(HoodieCompactionPlan input) {
    Preconditions.checkArgument(input.getVersion() == 1, "Input version is "
        + input.getVersion() + ". Must be 1");
    HoodieCompactionPlan compactionPlan = new HoodieCompactionPlan();
    List<HoodieCompactionOperation> v2CompactionOperationList = new ArrayList<>();
    if (null != input.getOperations()) {
      v2CompactionOperationList = input.getOperations().stream().map(inp ->
          HoodieCompactionOperation.newBuilder()
              .setBaseInstantTime(inp.getBaseInstantTime())
              .setFileId(inp.getFileId())
              .setPartitionPath(inp.getPartitionPath())
              .setMetrics(inp.getMetrics())
              // An operation may have no base data file (log-only file slice stores null here);
              // the null-safe helper avoids an NPE from new Path(null).
              .setDataFilePath(convertToFileName(inp.getDataFilePath()))
              .setDeltaFilePaths(inp.getDeltaFilePaths().stream()
                  .map(CompactionV2MigrationHandler::convertToFileName)
                  .collect(Collectors.toList()))
              .build()).collect(Collectors.toList());
    }
    compactionPlan.setOperations(v2CompactionOperationList);
    compactionPlan.setExtraMetadata(input.getExtraMetadata());
    compactionPlan.setVersion(getManagedVersion());
    return compactionPlan;
  }

  @Override
  public HoodieCompactionPlan downgradeFrom(HoodieCompactionPlan input) {
    // V2 is the newest known format; there is nothing above it to downgrade from.
    throw new IllegalArgumentException("This is the current highest version. Input cannot be any higher version");
  }

  // Strips any directory components, returning just the file name; null/empty pass through.
  private static String convertToFileName(String path) {
    if (path == null || path.isEmpty()) {
      return path;
    }
    return new Path(path).getName();
  }
}

View File

@@ -171,7 +171,8 @@ public class CompactionTestUtils {
throw new HoodieIOException(e.getMessage(), e); throw new HoodieIOException(e.getMessage(), e);
} }
}).collect(Collectors.toList()); }).collect(Collectors.toList());
return new HoodieCompactionPlan(ops.isEmpty() ? null : ops, new HashMap<>()); return new HoodieCompactionPlan(ops.isEmpty() ? null : ops, new HashMap<>(),
CompactionUtils.LATEST_COMPACTION_METADATA_VERSION);
} }
public static class TestHoodieDataFile extends HoodieDataFile { public static class TestHoodieDataFile extends HoodieDataFile {
@@ -179,7 +180,7 @@ public class CompactionTestUtils {
private final String path; private final String path;
public TestHoodieDataFile(String path) { public TestHoodieDataFile(String path) {
super("/tmp/ce481ee7-9e53-4a2e-9992-f9e295fa79c0_11_20180918020003.parquet"); super(path);
this.path = path; this.path = path;
} }

View File

@@ -22,6 +22,8 @@ import static org.apache.hudi.common.model.HoodieTestUtils.DEFAULT_PARTITION_PAT
import static org.apache.hudi.common.util.CompactionTestUtils.createCompactionPlan; import static org.apache.hudi.common.util.CompactionTestUtils.createCompactionPlan;
import static org.apache.hudi.common.util.CompactionTestUtils.scheduleCompaction; import static org.apache.hudi.common.util.CompactionTestUtils.scheduleCompaction;
import static org.apache.hudi.common.util.CompactionTestUtils.setupAndValidateCompactionOperations; import static org.apache.hudi.common.util.CompactionTestUtils.setupAndValidateCompactionOperations;
import static org.apache.hudi.common.util.CompactionUtils.COMPACTION_METADATA_VERSION_1;
import static org.apache.hudi.common.util.CompactionUtils.LATEST_COMPACTION_METADATA_VERSION;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import java.io.IOException; import java.io.IOException;
@@ -36,12 +38,14 @@ import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.HoodieCommonTestHarness; import org.apache.hudi.common.HoodieCommonTestHarness;
import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieDataFile;
import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.CompactionTestUtils.TestHoodieDataFile; import org.apache.hudi.common.util.CompactionTestUtils.TestHoodieDataFile;
import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.versioning.compaction.CompactionPlanMigrator;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@@ -59,20 +63,39 @@ public class TestCompactionUtils extends HoodieCommonTestHarness {
initMetaClient(); initMetaClient();
} }
@Test
public void testUpgradeDowngrade() {
Pair<List<Pair<String, FileSlice>>, HoodieCompactionPlan> inputAndPlan = buildCompactionPlan();
testFileSlicesCompactionPlanEquality(inputAndPlan.getKey(), inputAndPlan.getValue());
CompactionPlanMigrator migrator = new CompactionPlanMigrator(metaClient);
HoodieCompactionPlan plan = inputAndPlan.getRight();
System.out.println("Plan=" + plan.getOperations());
Assert.assertEquals(LATEST_COMPACTION_METADATA_VERSION, plan.getVersion());
HoodieCompactionPlan oldPlan = migrator.migrateToVersion(plan, plan.getVersion(), COMPACTION_METADATA_VERSION_1);
// Check with older version of compaction plan
Assert.assertEquals(COMPACTION_METADATA_VERSION_1, oldPlan.getVersion());
testFileSlicesCompactionPlanEquality(inputAndPlan.getKey(), oldPlan);
HoodieCompactionPlan newPlan = migrator.upgradeToLatest(plan, plan.getVersion());
Assert.assertEquals(LATEST_COMPACTION_METADATA_VERSION, newPlan.getVersion());
testFileSlicesCompactionPlanEquality(inputAndPlan.getKey(), newPlan);
}
@Test @Test
public void testBuildFromFileSlice() { public void testBuildFromFileSlice() {
// Empty File-Slice with no data and log files // Empty File-Slice with no data and log files
FileSlice emptyFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "empty1"); FileSlice emptyFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "empty1");
HoodieCompactionOperation op = HoodieCompactionOperation op =
CompactionUtils.buildFromFileSlice(DEFAULT_PARTITION_PATHS[0], emptyFileSlice, Option.of(metricsCaptureFn)); CompactionUtils.buildFromFileSlice(DEFAULT_PARTITION_PATHS[0], emptyFileSlice, Option.of(metricsCaptureFn));
testFileSliceCompactionOpEquality(emptyFileSlice, op, DEFAULT_PARTITION_PATHS[0]); testFileSliceCompactionOpEquality(emptyFileSlice, op, DEFAULT_PARTITION_PATHS[0],
LATEST_COMPACTION_METADATA_VERSION);
// File Slice with data-file but no log files // File Slice with data-file but no log files
FileSlice noLogFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noLog1"); FileSlice noLogFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noLog1");
noLogFileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog_1_000.parquet")); noLogFileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog_1_000.parquet"));
op = CompactionUtils.buildFromFileSlice(DEFAULT_PARTITION_PATHS[0], noLogFileSlice, Option.of(metricsCaptureFn)); op = CompactionUtils.buildFromFileSlice(DEFAULT_PARTITION_PATHS[0], noLogFileSlice, Option.of(metricsCaptureFn));
testFileSliceCompactionOpEquality(noLogFileSlice, op, DEFAULT_PARTITION_PATHS[0]); testFileSliceCompactionOpEquality(noLogFileSlice, op, DEFAULT_PARTITION_PATHS[0],
LATEST_COMPACTION_METADATA_VERSION);
// File Slice with no data-file but log files present // File Slice with no data-file but log files present
FileSlice noDataFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noData1"); FileSlice noDataFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noData1");
noDataFileSlice.addLogFile( noDataFileSlice.addLogFile(
@@ -80,7 +103,8 @@ public class TestCompactionUtils extends HoodieCommonTestHarness {
noDataFileSlice.addLogFile( noDataFileSlice.addLogFile(
new HoodieLogFile(new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN)))); new HoodieLogFile(new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN))));
op = CompactionUtils.buildFromFileSlice(DEFAULT_PARTITION_PATHS[0], noDataFileSlice, Option.of(metricsCaptureFn)); op = CompactionUtils.buildFromFileSlice(DEFAULT_PARTITION_PATHS[0], noDataFileSlice, Option.of(metricsCaptureFn));
testFileSliceCompactionOpEquality(noDataFileSlice, op, DEFAULT_PARTITION_PATHS[0]); testFileSliceCompactionOpEquality(noDataFileSlice, op, DEFAULT_PARTITION_PATHS[0],
LATEST_COMPACTION_METADATA_VERSION);
// File Slice with data-file and log files present // File Slice with data-file and log files present
FileSlice fileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noData1"); FileSlice fileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noData1");
@@ -90,27 +114,29 @@ public class TestCompactionUtils extends HoodieCommonTestHarness {
fileSlice.addLogFile( fileSlice.addLogFile(
new HoodieLogFile(new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN)))); new HoodieLogFile(new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN))));
op = CompactionUtils.buildFromFileSlice(DEFAULT_PARTITION_PATHS[0], fileSlice, Option.of(metricsCaptureFn)); op = CompactionUtils.buildFromFileSlice(DEFAULT_PARTITION_PATHS[0], fileSlice, Option.of(metricsCaptureFn));
testFileSliceCompactionOpEquality(fileSlice, op, DEFAULT_PARTITION_PATHS[0]); testFileSliceCompactionOpEquality(fileSlice, op, DEFAULT_PARTITION_PATHS[0],
LATEST_COMPACTION_METADATA_VERSION);
} }
/** /**
* Generate input for compaction plan tests * Generate input for compaction plan tests
*/ */
private Pair<List<Pair<String, FileSlice>>, HoodieCompactionPlan> buildCompactionPlan() { private Pair<List<Pair<String, FileSlice>>, HoodieCompactionPlan> buildCompactionPlan() {
Path fullPartitionPath = new Path(new Path(metaClient.getBasePath()), DEFAULT_PARTITION_PATHS[0]);
FileSlice emptyFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "empty1"); FileSlice emptyFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "empty1");
FileSlice fileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noData1"); FileSlice fileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noData1");
fileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog_1_000.parquet")); fileSlice.setDataFile(new TestHoodieDataFile(fullPartitionPath.toString() + "/data1_1_000.parquet"));
fileSlice.addLogFile( fileSlice.addLogFile(
new HoodieLogFile(new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN)))); new HoodieLogFile(new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN)))));
fileSlice.addLogFile( fileSlice.addLogFile(
new HoodieLogFile(new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN)))); new HoodieLogFile(new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN)))));
FileSlice noLogFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noLog1"); FileSlice noLogFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noLog1");
noLogFileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog_1_000.parquet")); noLogFileSlice.setDataFile(new TestHoodieDataFile(fullPartitionPath.toString() + "/noLog_1_000.parquet"));
FileSlice noDataFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noData1"); FileSlice noDataFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noData1");
noDataFileSlice.addLogFile( noDataFileSlice.addLogFile(
new HoodieLogFile(new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN)))); new HoodieLogFile(new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN)))));
noDataFileSlice.addLogFile( noDataFileSlice.addLogFile(
new HoodieLogFile(new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN)))); new HoodieLogFile(new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN)))));
List<FileSlice> fileSliceList = Arrays.asList(emptyFileSlice, noDataFileSlice, fileSlice, noLogFileSlice); List<FileSlice> fileSliceList = Arrays.asList(emptyFileSlice, noDataFileSlice, fileSlice, noLogFileSlice);
List<Pair<String, FileSlice>> input = List<Pair<String, FileSlice>> input =
fileSliceList.stream().map(f -> Pair.of(DEFAULT_PARTITION_PATHS[0], f)).collect(Collectors.toList()); fileSliceList.stream().map(f -> Pair.of(DEFAULT_PARTITION_PATHS[0], f)).collect(Collectors.toList());
@@ -195,8 +221,9 @@ public class TestCompactionUtils extends HoodieCommonTestHarness {
*/ */
private void testFileSlicesCompactionPlanEquality(List<Pair<String, FileSlice>> input, HoodieCompactionPlan plan) { private void testFileSlicesCompactionPlanEquality(List<Pair<String, FileSlice>> input, HoodieCompactionPlan plan) {
Assert.assertEquals("All file-slices present", input.size(), plan.getOperations().size()); Assert.assertEquals("All file-slices present", input.size(), plan.getOperations().size());
IntStream.range(0, input.size()).boxed().forEach(idx -> testFileSliceCompactionOpEquality(input.get(idx).getValue(), IntStream.range(0, input.size()).boxed().forEach(idx ->
plan.getOperations().get(idx), input.get(idx).getKey())); testFileSliceCompactionOpEquality(input.get(idx).getValue(), plan.getOperations().get(idx),
input.get(idx).getKey(), plan.getVersion()));
} }
/** /**
@@ -207,16 +234,20 @@ public class TestCompactionUtils extends HoodieCommonTestHarness {
* @param expPartitionPath Partition path * @param expPartitionPath Partition path
*/ */
private void testFileSliceCompactionOpEquality(FileSlice slice, HoodieCompactionOperation op, private void testFileSliceCompactionOpEquality(FileSlice slice, HoodieCompactionOperation op,
String expPartitionPath) { String expPartitionPath, int version) {
Assert.assertEquals("Partition path is correct", expPartitionPath, op.getPartitionPath()); Assert.assertEquals("Partition path is correct", expPartitionPath, op.getPartitionPath());
Assert.assertEquals("Same base-instant", slice.getBaseInstantTime(), op.getBaseInstantTime()); Assert.assertEquals("Same base-instant", slice.getBaseInstantTime(), op.getBaseInstantTime());
Assert.assertEquals("Same file-id", slice.getFileId(), op.getFileId()); Assert.assertEquals("Same file-id", slice.getFileId(), op.getFileId());
if (slice.getDataFile().isPresent()) { if (slice.getDataFile().isPresent()) {
Assert.assertEquals("Same data-file", slice.getDataFile().get().getPath(), op.getDataFilePath()); HoodieDataFile df = slice.getDataFile().get();
Assert.assertEquals("Same data-file",
version == COMPACTION_METADATA_VERSION_1 ? df.getPath() : df.getFileName(), op.getDataFilePath());
} }
List<String> paths = slice.getLogFiles().map(l -> l.getPath().toString()).collect(Collectors.toList()); List<String> paths = slice.getLogFiles().map(l -> l.getPath().toString()).collect(Collectors.toList());
IntStream.range(0, paths.size()).boxed().forEach(idx -> { IntStream.range(0, paths.size()).boxed().forEach(idx -> {
Assert.assertEquals("Log File Index " + idx, paths.get(idx), op.getDeltaFilePaths().get(idx)); Assert.assertEquals("Log File Index " + idx,
version == COMPACTION_METADATA_VERSION_1 ? paths.get(idx) : new Path(paths.get(idx)).getName(),
op.getDeltaFilePaths().get(idx));
}); });
Assert.assertEquals("Metrics set", metrics, op.getMetrics()); Assert.assertEquals("Metrics set", metrics, op.getMetrics());
} }