[HUDI-3435] Do not throw exception when instant to rollback does not exist in metadata table active timeline (#4821)
This commit is contained in:
@@ -469,7 +469,22 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
|
||||
throw new HoodieException("Error limiting instant archival based on metadata table", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// If this is a metadata table, do not archive the commits that live in data set
|
||||
// active timeline. This is required by metadata table,
|
||||
// see HoodieTableMetadataUtil#processRollbackMetadata for details.
|
||||
if (HoodieTableMetadata.isMetadataTable(config.getBasePath())) {
|
||||
HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder()
|
||||
.setBasePath(HoodieTableMetadata.getDatasetBasePath(config.getBasePath()))
|
||||
.setConf(metaClient.getHadoopConf())
|
||||
.build();
|
||||
Option<String> earliestActiveDatasetCommit = dataMetaClient.getActiveTimeline().firstInstant().map(HoodieInstant::getTimestamp);
|
||||
if (earliestActiveDatasetCommit.isPresent()) {
|
||||
instants = instants.filter(instant ->
|
||||
HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN, earliestActiveDatasetCommit.get()));
|
||||
}
|
||||
}
|
||||
|
||||
return instants.flatMap(hoodieInstant ->
|
||||
groupByTsAction.get(Pair.of(hoodieInstant.getTimestamp(),
|
||||
HoodieInstant.getComparableAction(hoodieInstant.getAction()))).stream());
|
||||
|
||||
@@ -294,24 +294,29 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
|
||||
|
||||
AtomicInteger commitTime = new AtomicInteger(1);
|
||||
// trigger 2 regular writes(1 bootstrap commit). just 1 before archival can get triggered.
|
||||
int i = 1;
|
||||
for (; i <= 2; i++) {
|
||||
for (int i = 1; i <= 2; i++) {
|
||||
doWriteOperation(testTable, "000000" + (commitTime.getAndIncrement()), INSERT);
|
||||
}
|
||||
// expected num commits = 1 (bootstrap) + 2 (writes)
|
||||
HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
|
||||
HoodieActiveTimeline metadataTimeline = metadataMetaClient.reloadActiveTimeline();
|
||||
assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 3);
|
||||
assertEquals(3, metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants());
|
||||
|
||||
// trigger a async table service, archival should not kick in, even though conditions are met.
|
||||
// trigger an async table service, archival should not kick in, even though conditions are met.
|
||||
doCluster(testTable, "000000" + commitTime.getAndIncrement());
|
||||
metadataTimeline = metadataMetaClient.reloadActiveTimeline();
|
||||
assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 4);
|
||||
assertEquals(4, metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants());
|
||||
|
||||
// trigger a regular write operation. archival should kick in.
|
||||
// start the timeline server for MARKERS cleaning up
|
||||
getHoodieWriteClient(writeConfig);
|
||||
// trigger a regular write operation. data set timeline archival should kick in.
|
||||
doWriteOperation(testTable, "000000" + (commitTime.getAndIncrement()), INSERT);
|
||||
archiveDataTable(writeConfig, HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build());
|
||||
|
||||
// trigger a regular write operation. metadata timeline archival should kick in.
|
||||
doWriteOperation(testTable, "000000" + (commitTime.getAndIncrement()), INSERT);
|
||||
metadataTimeline = metadataMetaClient.reloadActiveTimeline();
|
||||
assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 3);
|
||||
assertEquals(4, metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants());
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@@ -942,7 +947,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
|
||||
}
|
||||
}
|
||||
|
||||
assertTrue(exceptionRaised, "Rollback of archived instants should fail");
|
||||
assertFalse(exceptionRaised, "Metadata table should not archive instants that are in dataset active timeline");
|
||||
// Since each rollback also creates a deltacommit, we can only support rolling back of half of the original
|
||||
// instants present before rollback started.
|
||||
assertTrue(numRollbacks >= Math.max(minArchiveCommitsDataset, minArchiveCommitsMetadata) / 2,
|
||||
|
||||
@@ -1054,7 +1054,7 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
|
||||
.setBasePath(HoodieTableMetadata.getMetadataTableBasePath(basePath))
|
||||
.setLoadActiveTimelineOnLoad(true).build();
|
||||
|
||||
for (int i = 1; i <= 16; i++) {
|
||||
for (int i = 1; i <= 17; i++) {
|
||||
testTable.doWriteOperation("000000" + String.format("%02d", i), WriteOperationType.UPSERT,
|
||||
i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
|
||||
// archival
|
||||
@@ -1075,6 +1075,30 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
|
||||
IntStream.range(1, i + 1).forEach(j ->
|
||||
assertTrue(metadataTableInstants.contains(
|
||||
new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j))));
|
||||
} else if (i == 8) {
|
||||
// i == 8
|
||||
// The instant "00000000000000" was archived since it's less than
|
||||
// the earliest instant on the dataset active timeline,
|
||||
// the dataset active timeline has instants of range [00000001 ~ 00000008]
|
||||
// because when it does the archiving, no compaction instant on the
|
||||
// metadata active timeline exists yet.
|
||||
assertEquals(9, metadataTableInstants.size());
|
||||
assertTrue(metadataTableInstants.contains(
|
||||
new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "00000007001")));
|
||||
IntStream.range(1, i + 1).forEach(j ->
|
||||
assertTrue(metadataTableInstants.contains(
|
||||
new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j))));
|
||||
} else if (i <= 11) {
|
||||
// In the metadata table timeline, the first delta commit is "00000007"
|
||||
// because it equals with the earliest commit on the dataset timeline, after archival,
|
||||
// delta commits "00000008" till "00000011" are added later on without archival or compaction
|
||||
assertEquals(i - 5, metadataTableInstants.size());
|
||||
assertTrue(metadataTableInstants.contains(
|
||||
new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "00000007001")));
|
||||
IntStream.range(7, i + 1).forEach(j ->
|
||||
assertTrue(metadataTableInstants.contains(
|
||||
new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION,
|
||||
"000000" + String.format("%02d", j)))));
|
||||
} else if (i <= 14) {
|
||||
// In the metadata table timeline, the first delta commit is "00000007001"
|
||||
// from metadata table compaction, after archival, delta commits "00000008"
|
||||
@@ -1095,14 +1119,27 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
|
||||
assertTrue(metadataTableInstants.contains(
|
||||
new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION,
|
||||
"000000" + String.format("%02d", j)))));
|
||||
} else {
|
||||
} else if (i == 16) {
|
||||
// i == 16
|
||||
// Only commit "00000015001" and delta commit "00000016" are in the active timeline
|
||||
assertEquals(2, metadataTableInstants.size());
|
||||
// dataset timeline has commits "00000015" and "00000016",
|
||||
// the metadata timeline has commits [00000008, 00000016] and "00000015001"
|
||||
assertEquals(10, metadataTableInstants.size());
|
||||
assertTrue(metadataTableInstants.contains(
|
||||
new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "00000015001")));
|
||||
IntStream.range(8, 17).forEach(j ->
|
||||
assertTrue(metadataTableInstants.contains(
|
||||
new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION,
|
||||
"000000" + String.format("%02d", j)))));
|
||||
} else {
|
||||
// i == 17
|
||||
// Only commits [00000015, 00000017] and "00000015001" are on the metadata timeline
|
||||
assertEquals(4, metadataTableInstants.size());
|
||||
assertTrue(metadataTableInstants.contains(
|
||||
new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "00000016")));
|
||||
new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "00000015001")));
|
||||
IntStream.range(15, 18).forEach(j ->
|
||||
assertTrue(metadataTableInstants.contains(
|
||||
new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION,
|
||||
"000000" + String.format("%02d", j)))));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,6 +27,7 @@ import org.apache.hudi.common.config.SerializableConfiguration;
|
||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.exception.HoodieMetadataException;
|
||||
|
||||
@@ -36,6 +37,7 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
|
||||
import static org.apache.hudi.common.util.ValidationUtils.checkState;
|
||||
|
||||
/**
|
||||
* Interface that supports querying various pieces of metadata about a hudi table.
|
||||
@@ -72,6 +74,17 @@ public interface HoodieTableMetadata extends Serializable, AutoCloseable {
|
||||
return metadataTableBasePath.substring(0, metadataTableBasePath.lastIndexOf(HoodieTableMetaClient.METADATA_TABLE_FOLDER_PATH) - 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the base path of the dataset.
|
||||
*
|
||||
* @param metadataTableBasePath The base path of the metadata table
|
||||
*/
|
||||
static String getDatasetBasePath(String metadataTableBasePath) {
|
||||
int endPos = metadataTableBasePath.lastIndexOf(Path.SEPARATOR + HoodieTableMetaClient.METADATA_TABLE_FOLDER_PATH);
|
||||
checkState(endPos != -1, metadataTableBasePath + " should be base path of the metadata table");
|
||||
return metadataTableBasePath.substring(0, endPos);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns {@code True} if the given path contains a metadata table.
|
||||
*
|
||||
|
||||
@@ -531,14 +531,35 @@ public class HoodieTableMetadataUtil {
|
||||
}
|
||||
|
||||
// Case 2: The instant-to-rollback was never committed to Metadata Table. This can happen if the instant-to-rollback
|
||||
// was a failed commit (never completed) as only completed instants are synced to Metadata Table.
|
||||
// But the required Metadata Table instants should not have been archived
|
||||
// was a failed commit (never completed).
|
||||
//
|
||||
// There are two cases for failed commit that we need to take care of:
|
||||
// 1) The commit was synced to metadata table successfully but the dataset meta file switches state failed
|
||||
// (from INFLIGHT to COMPLETED), the committed files should be rolled back thus the rollback metadata
|
||||
// can not be skipped, usually a failover should be triggered and the metadata active timeline expects
|
||||
// to contain the commit, we could check whether the commit was synced to metadata table
|
||||
// through HoodieActiveTimeline#containsInstant.
|
||||
//
|
||||
// 2) The commit synced to metadata table failed or was never synced to metadata table,
|
||||
// in this case, the rollback metadata should be skipped.
|
||||
//
|
||||
// And in which case,
|
||||
// metadataTableTimeline.getCommitsTimeline().isBeforeTimelineStarts(syncedInstant.getTimestamp())
|
||||
// returns true ?
|
||||
// It is most probably because of compaction rollback, we schedule a compaction plan early in the timeline (say t1)
|
||||
// then after a long time schedule and execute the plan then try to rollback it.
|
||||
//
|
||||
// scheduled execution rollback compaction actions
|
||||
// ----- t1 ----- t3 ----- t4 ----- dataset timeline
|
||||
//
|
||||
// ---------- t2 (archive) ----------- metadata timeline
|
||||
//
|
||||
// when at time t4, we commit the compaction rollback,the above check returns true.
|
||||
HoodieInstant syncedInstant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, instantToRollback);
|
||||
if (metadataTableTimeline.getCommitsTimeline().isBeforeTimelineStarts(syncedInstant.getTimestamp())) {
|
||||
throw new HoodieMetadataException(String.format("The instant %s required to sync rollback of %s has been archived",
|
||||
syncedInstant, instantToRollback));
|
||||
}
|
||||
|
||||
shouldSkip = !metadataTableTimeline.containsInstant(syncedInstant);
|
||||
if (!hasNonZeroRollbackLogFiles && shouldSkip) {
|
||||
LOG.info(String.format("Skipping syncing of rollbackMetadata at %s, since this instant was never committed to Metadata Table",
|
||||
|
||||
Reference in New Issue
Block a user