[HUDI-690] Filter out inflight compaction in exporter (#1667)
This commit is contained in:
@@ -152,8 +152,8 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get all instants (commits, delta commits, clean, savepoint, rollback) that result in actions, in the active
|
* Get all instants (commits, delta commits, compaction, clean, savepoint, rollback) that result in actions,
|
||||||
* timeline.
|
* in the active timeline.
|
||||||
*/
|
*/
|
||||||
public HoodieTimeline getAllCommitsTimeline() {
|
public HoodieTimeline getAllCommitsTimeline() {
|
||||||
return getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION,
|
return getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION,
|
||||||
|
|||||||
@@ -74,10 +74,10 @@ public class HoodieSnapshotCopier implements Serializable {
|
|||||||
final SerializableConfiguration serConf = new SerializableConfiguration(jsc.hadoopConfiguration());
|
final SerializableConfiguration serConf = new SerializableConfiguration(jsc.hadoopConfiguration());
|
||||||
final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), baseDir);
|
final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), baseDir);
|
||||||
final BaseFileOnlyView fsView = new HoodieTableFileSystemView(tableMetadata,
|
final BaseFileOnlyView fsView = new HoodieTableFileSystemView(tableMetadata,
|
||||||
tableMetadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
|
tableMetadata.getActiveTimeline().getCommitsAndCompactionTimeline().filterCompletedInstants());
|
||||||
// Get the latest commit
|
// Get the latest commit
|
||||||
Option<HoodieInstant> latestCommit =
|
Option<HoodieInstant> latestCommit =
|
||||||
tableMetadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
|
tableMetadata.getActiveTimeline().getCommitsAndCompactionTimeline().filterCompletedInstants().lastInstant();
|
||||||
if (!latestCommit.isPresent()) {
|
if (!latestCommit.isPresent()) {
|
||||||
LOG.warn("No commits present. Nothing to snapshot");
|
LOG.warn("No commits present. Nothing to snapshot");
|
||||||
return;
|
return;
|
||||||
|
|||||||
@@ -146,7 +146,8 @@ public class HoodieSnapshotExporter {
|
|||||||
|
|
||||||
private Option<String> getLatestCommitTimestamp(FileSystem fs, Config cfg) {
|
private Option<String> getLatestCommitTimestamp(FileSystem fs, Config cfg) {
|
||||||
final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), cfg.sourceBasePath);
|
final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), cfg.sourceBasePath);
|
||||||
Option<HoodieInstant> latestCommit = tableMetadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
|
Option<HoodieInstant> latestCommit = tableMetadata.getActiveTimeline().getCommitsAndCompactionTimeline()
|
||||||
|
.filterCompletedInstants().lastInstant();
|
||||||
return latestCommit.isPresent() ? Option.of(latestCommit.get().getTimestamp()) : Option.empty();
|
return latestCommit.isPresent() ? Option.of(latestCommit.get().getTimestamp()) : Option.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -251,7 +252,7 @@ public class HoodieSnapshotExporter {
|
|||||||
FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
|
FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
|
||||||
HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), cfg.sourceBasePath);
|
HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), cfg.sourceBasePath);
|
||||||
return new HoodieTableFileSystemView(tableMetadata, tableMetadata
|
return new HoodieTableFileSystemView(tableMetadata, tableMetadata
|
||||||
.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
|
.getActiveTimeline().getCommitsAndCompactionTimeline().filterCompletedInstants());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void main(String[] args) throws IOException {
|
public static void main(String[] args) throws IOException {
|
||||||
|
|||||||
Reference in New Issue
Block a user