1
0

[HUDI-2286] Handle the case of failed deltacommit on the metadata table. (#3428)

A failed deltacommit on the metadata table will be automatically rolled back. Assuming the failed commit was "t10", the rollback will happen the next time, at "t11". After the rollback, when we try to sync the dataset to the metadata table, we should look for all unsynced instants, including t11. The current code ignores t11 because the latest commit timestamp on the metadata table is t11 (due to the rollback).
This commit is contained in:
Prashant Wason
2021-08-11 07:39:48 -07:00
committed by GitHub
parent c9fa3cffaf
commit aa11989ead
3 changed files with 87 additions and 3 deletions

View File

@@ -35,10 +35,12 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.HoodieTimer;
@@ -857,6 +859,62 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
validateMetadata(unsyncedClient);
}
/**
 * Test that a failed deltacommit on the metadata table does not lead to a missed sync.
 *
 * A failed (inflight) deltacommit on the metadata table is rolled back on the next commit.
 * The sync logic must still pick up every dataset instant after the last COMPLETED
 * deltacommit on the metadata timeline, not after the rollback instant.
 */
@Test
public void testMetdataTableCommitFailure() throws Exception {
  init(HoodieTableType.COPY_ON_WRITE);
  HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);

  try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
    // Write 1
    String newCommitTime = "001";
    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
    client.startCommitWithTime(newCommitTime);
    List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
    assertNoWriteErrors(writeStatuses);

    // Write 2
    newCommitTime = "002";
    client.startCommitWithTime(newCommitTime);
    records = dataGen.generateInserts(newCommitTime, 20);
    writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
    assertNoWriteErrors(writeStatuses);
  }

  // At this time both commits 001 and 002 must be synced to the metadata table
  HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
  HoodieActiveTimeline timeline = metadataMetaClient.getActiveTimeline();
  assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "001")));
  assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "002")));

  // Delete the 002 deltacommit completed instant to make it inflight,
  // simulating a deltacommit that failed before completion.
  FileCreateUtils.deleteDeltaCommit(metadataTableBasePath, "002");
  timeline = metadataMetaClient.reloadActiveTimeline();
  assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "001")));
  assertTrue(timeline.containsInstant(new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, "002")));

  // In this commit deltacommit "002" will be rolled back and attempted again.
  String latestCommitTime = HoodieActiveTimeline.createNewInstantTime();
  try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
    String newCommitTime = "003";
    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
    client.startCommitWithTime(newCommitTime);
    // Check this write's statuses too; the original test silently discarded them.
    List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
    assertNoWriteErrors(writeStatuses);

    records = dataGen.generateInserts(latestCommitTime, 20);
    client.startCommitWithTime(latestCommitTime);
    writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), latestCommitTime).collect();
    assertNoWriteErrors(writeStatuses);
  }

  // All dataset instants — including the re-attempted "002" — must now be synced,
  // and exactly one rollback (of the failed "002") must have taken place.
  timeline = metadataMetaClient.reloadActiveTimeline();
  assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "001")));
  assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "002")));
  assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, latestCommitTime)));
  assertTrue(timeline.getRollbackTimeline().countInstants() == 1);
}
/**
* Validate the metadata tables contents to ensure it matches what is on the file system.

View File

@@ -299,9 +299,11 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
return Collections.EMPTY_LIST;
}
// All instants on the data timeline, which are greater than the last instant on metadata timeline
// are candidates for sync.
String latestMetadataInstantTime = metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().get().getTimestamp();
// All instants on the data timeline, which are greater than the last deltacommit instant on metadata timeline
// are candidates for sync. We only consider delta-commit instants as each actions on dataset leads to a
// deltacommit on the metadata table.
String latestMetadataInstantTime = metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants()
.lastInstant().get().getTimestamp();
HoodieDefaultTimeline candidateTimeline = datasetMetaClient.getActiveTimeline().findInstantsAfter(latestMetadataInstantTime, Integer.MAX_VALUE);
Option<HoodieInstant> earliestIncompleteInstant = ignoreIncompleteInstants ? Option.empty()
: candidateTimeline.filterInflightsAndRequested().firstInstant();

View File

@@ -274,6 +274,30 @@ public class FileCreateUtils {
return markerFilePath.toAbsolutePath().toString();
}
/**
 * Removes the meta file {@code <instantTime><suffix>} from the table's meta folder, if it exists.
 *
 * @param basePath    base path of the table
 * @param instantTime timestamp of the instant whose meta file is removed
 * @param suffix      meta file extension (e.g. ".commit", ".deltacommit")
 * @throws IOException if the deletion fails for a reason other than the file being absent
 */
private static void removeMetaFile(String basePath, String instantTime, String suffix) throws IOException {
  Path parentPath = Paths.get(basePath, HoodieTableMetaClient.METAFOLDER_NAME);
  Path metaFilePath = parentPath.resolve(instantTime + suffix);
  // deleteIfExists avoids the exists()/delete() TOCTOU race and a redundant filesystem call.
  Files.deleteIfExists(metaFilePath);
}
/**
 * Deletes the completed commit meta file for the given instant, if present.
 *
 * @param basePath    base path of the table
 * @param instantTime timestamp of the instant whose completed commit file is removed
 */
public static void deleteCommit(String basePath, String instantTime) throws IOException {
removeMetaFile(basePath, instantTime, HoodieTimeline.COMMIT_EXTENSION);
}
/**
 * Deletes the requested commit meta file for the given instant, if present.
 *
 * @param basePath    base path of the table
 * @param instantTime timestamp of the instant whose requested commit file is removed
 */
public static void deleteRequestedCommit(String basePath, String instantTime) throws IOException {
removeMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_COMMIT_EXTENSION);
}
/**
 * Deletes the inflight commit meta file for the given instant, if present.
 *
 * @param basePath    base path of the table
 * @param instantTime timestamp of the instant whose inflight commit file is removed
 */
public static void deleteInflightCommit(String basePath, String instantTime) throws IOException {
removeMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_COMMIT_EXTENSION);
}
/**
 * Deletes the completed deltacommit meta file for the given instant, if present.
 * Deleting only the completed file leaves the instant in an inflight state on the
 * timeline — useful for simulating a deltacommit that failed before completion.
 *
 * @param basePath    base path of the table
 * @param instantTime timestamp of the instant whose completed deltacommit file is removed
 */
public static void deleteDeltaCommit(String basePath, String instantTime) throws IOException {
removeMetaFile(basePath, instantTime, HoodieTimeline.DELTA_COMMIT_EXTENSION);
}
public static long getTotalMarkerFileCount(String basePath, String partitionPath, String instantTime, IOType ioType) throws IOException {
Path parentPath = Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath);
if (Files.notExists(parentPath)) {