cleaner should now use commit timeline and not include deltacomits
This commit is contained in:
committed by
vinoth chandar
parent
68723764ed
commit
994d42d307
@@ -584,11 +584,11 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
|||||||
public boolean savepoint(String user, String comment) {
|
public boolean savepoint(String user, String comment) {
|
||||||
HoodieTable<T> table = HoodieTable.getHoodieTable(
|
HoodieTable<T> table = HoodieTable.getHoodieTable(
|
||||||
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
|
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
|
||||||
if (table.getCompletedCommitTimeline().empty()) {
|
if (table.getCompletedCommitsTimeline().empty()) {
|
||||||
throw new HoodieSavepointException("Could not savepoint. Commit timeline is empty");
|
throw new HoodieSavepointException("Could not savepoint. Commit timeline is empty");
|
||||||
}
|
}
|
||||||
|
|
||||||
String latestCommit = table.getCompletedCommitTimeline().lastInstant().get().getTimestamp();
|
String latestCommit = table.getCompletedCommitsTimeline().lastInstant().get().getTimestamp();
|
||||||
logger.info("Savepointing latest commit " + latestCommit);
|
logger.info("Savepointing latest commit " + latestCommit);
|
||||||
return savepoint(latestCommit, user, comment);
|
return savepoint(latestCommit, user, comment);
|
||||||
}
|
}
|
||||||
@@ -615,7 +615,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
|||||||
|
|
||||||
HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION,
|
HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION,
|
||||||
commitTime);
|
commitTime);
|
||||||
if (!table.getCompletedCommitTimeline().containsInstant(commitInstant)) {
|
if (!table.getCompletedCommitsTimeline().containsInstant(commitInstant)) {
|
||||||
throw new HoodieSavepointException(
|
throw new HoodieSavepointException(
|
||||||
"Could not savepoint non-existing commit " + commitInstant);
|
"Could not savepoint non-existing commit " + commitInstant);
|
||||||
}
|
}
|
||||||
@@ -628,7 +628,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
|||||||
table.getActiveTimeline().getInstantDetails(cleanInstant.get()).get());
|
table.getActiveTimeline().getInstantDetails(cleanInstant.get()).get());
|
||||||
lastCommitRetained = cleanMetadata.getEarliestCommitToRetain();
|
lastCommitRetained = cleanMetadata.getEarliestCommitToRetain();
|
||||||
} else {
|
} else {
|
||||||
lastCommitRetained = table.getCompletedCommitTimeline().firstInstant().get().getTimestamp();
|
lastCommitRetained = table.getCompletedCommitsTimeline().firstInstant().get().getTimestamp();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cannot allow savepoint time on a commit that could have been cleaned
|
// Cannot allow savepoint time on a commit that could have been cleaned
|
||||||
@@ -792,7 +792,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
|||||||
table.getActiveTimeline().filterPendingCompactionTimeline().getInstants()
|
table.getActiveTimeline().filterPendingCompactionTimeline().getInstants()
|
||||||
.map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
|
.map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
|
||||||
HoodieTimeline inflightCommitTimeline = table.getInflightCommitTimeline();
|
HoodieTimeline inflightCommitTimeline = table.getInflightCommitTimeline();
|
||||||
HoodieTimeline commitTimeline = table.getCompletedCommitTimeline();
|
HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
|
||||||
|
|
||||||
// Check if any of the commits is a savepoint - do not allow rollback on those commits
|
// Check if any of the commits is a savepoint - do not allow rollback on those commits
|
||||||
List<String> savepoints = table.getCompletedSavepointTimeline().getInstants()
|
List<String> savepoints = table.getCompletedSavepointTimeline().getInstants()
|
||||||
|
|||||||
@@ -150,7 +150,7 @@ public class HoodieCommitArchiveLog {
|
|||||||
|
|
||||||
//TODO (na) : Add a way to return actions associated with a timeline and then merge/unify
|
//TODO (na) : Add a way to return actions associated with a timeline and then merge/unify
|
||||||
// with logic above to avoid Stream.concats
|
// with logic above to avoid Stream.concats
|
||||||
HoodieTimeline commitTimeline = table.getCompletedCommitTimeline();
|
HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
|
||||||
Optional<HoodieInstant> oldestPendingCompactionInstant =
|
Optional<HoodieInstant> oldestPendingCompactionInstant =
|
||||||
table.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
|
table.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
|
||||||
|
|
||||||
|
|||||||
@@ -53,7 +53,7 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
|
|||||||
this.config = config;
|
this.config = config;
|
||||||
this.fs = hoodieTable.getMetaClient().getFs();
|
this.fs = hoodieTable.getMetaClient().getFs();
|
||||||
this.hoodieTable = hoodieTable;
|
this.hoodieTable = hoodieTable;
|
||||||
this.hoodieTimeline = hoodieTable.getCompletedCommitTimeline();
|
this.hoodieTimeline = hoodieTable.getCompletedCommitsTimeline();
|
||||||
this.schema = createHoodieWriteSchema(config);
|
this.schema = createHoodieWriteSchema(config);
|
||||||
this.timer = new HoodieTimer().startTimer();
|
this.timer = new HoodieTimer().startTimer();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -773,7 +773,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
|
|||||||
// smallFiles only for partitionPath
|
// smallFiles only for partitionPath
|
||||||
List<SmallFile> smallFileLocations = new ArrayList<>();
|
List<SmallFile> smallFileLocations = new ArrayList<>();
|
||||||
|
|
||||||
HoodieTimeline commitTimeline = getCompletedCommitTimeline();
|
HoodieTimeline commitTimeline = getCompletedCommitsTimeline();
|
||||||
|
|
||||||
if (!commitTimeline.empty()) { // if we have some commits
|
if (!commitTimeline.empty()) { // if we have some commits
|
||||||
HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
|
HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
|
||||||
|
|||||||
@@ -327,7 +327,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
|
|||||||
List<SmallFile> smallFileLocations = new ArrayList<>();
|
List<SmallFile> smallFileLocations = new ArrayList<>();
|
||||||
|
|
||||||
// Init here since this class (and member variables) might not have been initialized
|
// Init here since this class (and member variables) might not have been initialized
|
||||||
HoodieTimeline commitTimeline = getCompletedCommitTimeline();
|
HoodieTimeline commitTimeline = getCompletedCommitsTimeline();
|
||||||
|
|
||||||
// Find out all eligible small file slices
|
// Find out all eligible small file slices
|
||||||
if (!commitTimeline.empty()) {
|
if (!commitTimeline.empty()) {
|
||||||
|
|||||||
@@ -111,14 +111,14 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
|
|||||||
* Get the view of the file system for this table
|
* Get the view of the file system for this table
|
||||||
*/
|
*/
|
||||||
public TableFileSystemView getFileSystemView() {
|
public TableFileSystemView getFileSystemView() {
|
||||||
return new HoodieTableFileSystemView(metaClient, getCompletedCommitTimeline());
|
return new HoodieTableFileSystemView(metaClient, getCompletedCommitsTimeline());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the read optimized view of the file system for this table
|
* Get the read optimized view of the file system for this table
|
||||||
*/
|
*/
|
||||||
public TableFileSystemView.ReadOptimizedView getROFileSystemView() {
|
public TableFileSystemView.ReadOptimizedView getROFileSystemView() {
|
||||||
return new HoodieTableFileSystemView(metaClient, getCompletedCommitTimeline());
|
return new HoodieTableFileSystemView(metaClient, getCompletedCommitsTimeline());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -136,11 +136,18 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
|
|||||||
return new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline());
|
return new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get only the completed (no-inflights) commit + deltacommit timeline
|
||||||
|
*/
|
||||||
|
public HoodieTimeline getCompletedCommitsTimeline() {
|
||||||
|
return metaClient.getCommitsTimeline().filterCompletedInstants();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get only the completed (no-inflights) commit timeline
|
* Get only the completed (no-inflights) commit timeline
|
||||||
*/
|
*/
|
||||||
public HoodieTimeline getCompletedCommitTimeline() {
|
public HoodieTimeline getCompletedCommitTimeline() {
|
||||||
return metaClient.getCommitsTimeline().filterCompletedInstants();
|
return metaClient.getCommitTimeline().filterCompletedInstants();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -493,7 +493,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
|
|||||||
private List<HoodieDataFile> getCurrentLatestDataFiles(HoodieTable table, HoodieWriteConfig cfg) throws IOException {
|
private List<HoodieDataFile> getCurrentLatestDataFiles(HoodieTable table, HoodieWriteConfig cfg) throws IOException {
|
||||||
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(table.getMetaClient().getFs(), cfg.getBasePath());
|
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(table.getMetaClient().getFs(), cfg.getBasePath());
|
||||||
HoodieTableFileSystemView
|
HoodieTableFileSystemView
|
||||||
view = new HoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitTimeline(), allFiles);
|
view = new HoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allFiles);
|
||||||
List<HoodieDataFile> dataFilesToRead = view.getLatestDataFiles().collect(Collectors.toList());
|
List<HoodieDataFile> dataFilesToRead = view.getLatestDataFiles().collect(Collectors.toList());
|
||||||
return dataFilesToRead;
|
return dataFilesToRead;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -126,8 +126,8 @@ public class TestCleaner extends TestHoodieClientBase {
|
|||||||
// Should have 100 records in table (check using Index), all in locations marked at commit
|
// Should have 100 records in table (check using Index), all in locations marked at commit
|
||||||
HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
|
HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
|
||||||
|
|
||||||
assertFalse(table.getCompletedCommitTimeline().empty());
|
assertFalse(table.getCompletedCommitsTimeline().empty());
|
||||||
String commitTime = table.getCompletedCommitTimeline().getInstants().findFirst().get().getTimestamp();
|
String commitTime = table.getCompletedCommitsTimeline().getInstants().findFirst().get().getTimestamp();
|
||||||
assertFalse(table.getCompletedCleanTimeline().empty());
|
assertFalse(table.getCompletedCleanTimeline().empty());
|
||||||
assertEquals("The clean instant should be the same as the commit instant", commitTime,
|
assertEquals("The clean instant should be the same as the commit instant", commitTime,
|
||||||
table.getCompletedCleanTimeline().getInstants().findFirst().get().getTimestamp());
|
table.getCompletedCleanTimeline().getInstants().findFirst().get().getTimestamp());
|
||||||
@@ -380,7 +380,7 @@ public class TestCleaner extends TestHoodieClientBase {
|
|||||||
|
|
||||||
HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
|
HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
|
||||||
HoodieTable table1 = HoodieTable.getHoodieTable(metadata, cfg, jsc);
|
HoodieTable table1 = HoodieTable.getHoodieTable(metadata, cfg, jsc);
|
||||||
HoodieTimeline activeTimeline = table1.getCompletedCommitTimeline();
|
HoodieTimeline activeTimeline = table1.getCompletedCommitsTimeline();
|
||||||
Optional<HoodieInstant> earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits - 1);
|
Optional<HoodieInstant> earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits - 1);
|
||||||
Set<HoodieInstant> acceptableCommits = activeTimeline.getInstants().collect(Collectors.toSet());
|
Set<HoodieInstant> acceptableCommits = activeTimeline.getInstants().collect(Collectors.toSet());
|
||||||
if (earliestRetainedCommit.isPresent()) {
|
if (earliestRetainedCommit.isPresent()) {
|
||||||
|
|||||||
@@ -176,7 +176,7 @@ public class TestMergeOnReadTable {
|
|||||||
Stream<HoodieDataFile> dataFilesToRead = roView.getLatestDataFiles();
|
Stream<HoodieDataFile> dataFilesToRead = roView.getLatestDataFiles();
|
||||||
assertTrue(!dataFilesToRead.findAny().isPresent());
|
assertTrue(!dataFilesToRead.findAny().isPresent());
|
||||||
|
|
||||||
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles);
|
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
|
||||||
dataFilesToRead = roView.getLatestDataFiles();
|
dataFilesToRead = roView.getLatestDataFiles();
|
||||||
assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit",
|
assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit",
|
||||||
dataFilesToRead.findAny().isPresent());
|
dataFilesToRead.findAny().isPresent());
|
||||||
@@ -210,7 +210,7 @@ public class TestMergeOnReadTable {
|
|||||||
client.compact(compactionCommitTime);
|
client.compact(compactionCommitTime);
|
||||||
|
|
||||||
allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
|
allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
|
||||||
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles);
|
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
|
||||||
dataFilesToRead = roView.getLatestDataFiles();
|
dataFilesToRead = roView.getLatestDataFiles();
|
||||||
assertTrue(dataFilesToRead.findAny().isPresent());
|
assertTrue(dataFilesToRead.findAny().isPresent());
|
||||||
|
|
||||||
@@ -283,7 +283,7 @@ public class TestMergeOnReadTable {
|
|||||||
Stream<HoodieDataFile> dataFilesToRead = roView.getLatestDataFiles();
|
Stream<HoodieDataFile> dataFilesToRead = roView.getLatestDataFiles();
|
||||||
assertTrue(!dataFilesToRead.findAny().isPresent());
|
assertTrue(!dataFilesToRead.findAny().isPresent());
|
||||||
|
|
||||||
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles);
|
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
|
||||||
dataFilesToRead = roView.getLatestDataFiles();
|
dataFilesToRead = roView.getLatestDataFiles();
|
||||||
assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit",
|
assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit",
|
||||||
dataFilesToRead.findAny().isPresent());
|
dataFilesToRead.findAny().isPresent());
|
||||||
@@ -320,7 +320,7 @@ public class TestMergeOnReadTable {
|
|||||||
assertFalse(commit.isPresent());
|
assertFalse(commit.isPresent());
|
||||||
|
|
||||||
allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
|
allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
|
||||||
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles);
|
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
|
||||||
dataFilesToRead = roView.getLatestDataFiles();
|
dataFilesToRead = roView.getLatestDataFiles();
|
||||||
assertTrue(dataFilesToRead.findAny().isPresent());
|
assertTrue(dataFilesToRead.findAny().isPresent());
|
||||||
|
|
||||||
@@ -380,7 +380,7 @@ public class TestMergeOnReadTable {
|
|||||||
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
|
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
|
||||||
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
|
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
|
||||||
HoodieTableFileSystemView roView = new HoodieTableFileSystemView(metaClient,
|
HoodieTableFileSystemView roView = new HoodieTableFileSystemView(metaClient,
|
||||||
hoodieTable.getCompletedCommitTimeline(), allFiles);
|
hoodieTable.getCompletedCommitsTimeline(), allFiles);
|
||||||
|
|
||||||
final String absentCommit = newCommitTime;
|
final String absentCommit = newCommitTime;
|
||||||
assertFalse(roView.getLatestDataFiles().filter(file -> {
|
assertFalse(roView.getLatestDataFiles().filter(file -> {
|
||||||
@@ -430,7 +430,7 @@ public class TestMergeOnReadTable {
|
|||||||
Stream<HoodieDataFile> dataFilesToRead = roView.getLatestDataFiles();
|
Stream<HoodieDataFile> dataFilesToRead = roView.getLatestDataFiles();
|
||||||
assertTrue(!dataFilesToRead.findAny().isPresent());
|
assertTrue(!dataFilesToRead.findAny().isPresent());
|
||||||
|
|
||||||
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles);
|
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
|
||||||
dataFilesToRead = roView.getLatestDataFiles();
|
dataFilesToRead = roView.getLatestDataFiles();
|
||||||
assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit",
|
assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit",
|
||||||
dataFilesToRead.findAny().isPresent());
|
dataFilesToRead.findAny().isPresent());
|
||||||
@@ -504,7 +504,7 @@ public class TestMergeOnReadTable {
|
|||||||
|
|
||||||
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
|
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
|
||||||
hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
|
hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
|
||||||
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles);
|
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
|
||||||
dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
|
dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
|
||||||
recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
|
recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
|
||||||
// check that the number of records read is still correct after rollback operation
|
// check that the number of records read is still correct after rollback operation
|
||||||
@@ -599,7 +599,7 @@ public class TestMergeOnReadTable {
|
|||||||
Map<String, Long> parquetFileIdToSize = dataFilesToRead.collect(
|
Map<String, Long> parquetFileIdToSize = dataFilesToRead.collect(
|
||||||
Collectors.toMap(HoodieDataFile::getFileId, HoodieDataFile::getFileSize));
|
Collectors.toMap(HoodieDataFile::getFileId, HoodieDataFile::getFileSize));
|
||||||
|
|
||||||
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles);
|
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
|
||||||
dataFilesToRead = roView.getLatestDataFiles();
|
dataFilesToRead = roView.getLatestDataFiles();
|
||||||
List<HoodieDataFile> dataFilesList = dataFilesToRead.collect(Collectors.toList());
|
List<HoodieDataFile> dataFilesList = dataFilesToRead.collect(Collectors.toList());
|
||||||
assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit",
|
assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit",
|
||||||
|
|||||||
Reference in New Issue
Block a user