1
0

[HUDI-2119] Ensure the rolled-back instance was previously synced to the Metadata Table when syncing a Rollback Instant. (#3210)

* [HUDI-2119] Ensure the rolled-back instance was previously synced to the Metadata Table when syncing a Rollback Instant.

If the rolled-back instant was synced to the Metadata Table, a corresponding deltacommit with the same timestamp should have been created on the Metadata Table timeline. To ensure we can always perfomr this check, the Metadata Table instants should not be archived until their corresponding instants are present in the dataset timeline. But ensuring this requires a large number of instants to be kept on the metadata table.

In this change, the metadata table will keep atleast the number of instants that the main dataset is keeping. If the instant being rolled back was before the metadata table timeline, the code will throw an exception and the metadata table will have to be re-bootstrapped. This should be a very rare occurance and should occur only when the dataset is being repaired by rolling back multiple commits or restoring to an much older time.

* Fixed checkstyle

* Improvements from review comments.

Fixed  checkstyle
Replaced explicit null check with Option.ofNullable
Removed redundant function getSynedInstantTime

* Renamed getSyncedInstantTime and getSyncedInstantTimeForReader.

Sync is confusing so renamed to getUpdateTime() and getReaderTime().

* Removed getReaderTime which is only for testing as the same method can be accessed during testing differently without making it part of the public interface.

* Fix compilation error

* Reverting changes to HoodieMetadataFileSystemView

Co-authored-by: Vinoth Chandar <vinoth@apache.org>
This commit is contained in:
Prashant Wason
2021-08-13 21:23:34 -07:00
committed by GitHub
parent 642b1b671d
commit 8eed440694
13 changed files with 295 additions and 129 deletions

View File

@@ -29,8 +29,6 @@ import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
@@ -134,23 +132,6 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
});
}
/**
* Return the timestamp of the latest instant synced.
*
* To sync a instant on dataset, we create a corresponding delta-commit on the metadata table. So return the latest
* delta-commit.
*/
@Override
public Option<String> getLatestSyncedInstantTime() {
if (!enabled) {
return Option.empty();
}
HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline();
return timeline.getDeltaCommitTimeline().filterCompletedInstants()
.lastInstant().map(HoodieInstant::getTimestamp);
}
/**
* Tag each record with the location.
*

View File

@@ -18,6 +18,24 @@
package org.apache.hudi.client.functional;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
@@ -50,9 +68,11 @@ import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieMetricsConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieMetadataMetrics;
import org.apache.hudi.metadata.HoodieTableMetadata;
@@ -61,9 +81,6 @@ import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -76,22 +93,6 @@ import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Tag("functional")
public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
@@ -515,6 +516,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
}
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
// Commit with metadata disabled
@@ -530,6 +532,144 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
client.syncTableMetadata();
validateMetadata(client);
}
// If an unsynced commit is automatically rolled back during next commit, the rollback commit gets a timestamp
// greater than than the new commit which is started. Ensure that in this case the rollback is not processed
// as the earlier failed commit would not have been committed.
//
// Dataset: C1 C2 C3.inflight[failed] C4 R5[rolls back C3]
// Metadata: C1.delta C2.delta
//
// When R5 completes, C3.xxx will be deleted. When C4 completes, C4 and R5 will be committed to Metadata Table in
// that order. R5 should be neglected as C3 was never committed to metadata table.
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(false, false), true)) {
// Metadata disabled and no auto-commit
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 10);
List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
// Not committed so left in inflight state
client.syncTableMetadata();
assertTrue(metadata(client).isInSync());
validateMetadata(client);
}
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
try (SparkRDDWriteClient client = new SparkRDDWriteClient<>(engineContext, getWriteConfig(true, true), true)) {
// Metadata enabled
// The previous commit will be rolled back automatically
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 10);
List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
assertTrue(metadata(client).isInSync());
validateMetadata(client);
}
// In this scenario an async operations is started and completes around the same time of the failed commit.
// Rest of the reasoning is same as above test.
// C4.clean was an asynchronous clean started along with C3. The clean completed but C3 commit failed.
//
// Dataset: C1 C2 C3.inflight[failed] C4.clean C5 R6[rolls back C3]
// Metadata: C1.delta C2.delta
//
// When R6 completes, C3.xxx will be deleted. When C5 completes, C4, C5 and R6 will be committed to Metadata Table
// in that order. R6 should be neglected as C3 was never committed to metadata table.
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(false, false), true)) {
// Metadata disabled and no auto-commit
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 10);
List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
// Not committed so left in inflight state
client.clean();
client.syncTableMetadata();
assertTrue(metadata(client).isInSync());
validateMetadata(client);
}
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
try (SparkRDDWriteClient client = new SparkRDDWriteClient<>(engineContext, getWriteConfig(true, true), true)) {
// Metadata enabled
// The previous commit will be rolled back automatically
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 10);
List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
assertTrue(metadata(client).isInSync());
validateMetadata(client);
}
}
/**
* Test that manual rollbacks work correctly and enough timeline history is maintained on the metadata table
* timeline.
*/
@ParameterizedTest
@EnumSource(HoodieTableType.class)
public void testManualRollbacks(HoodieTableType tableType) throws Exception {
init(tableType);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
// Setting to archive more aggressively on the Metadata Table than the Dataset
final int maxDeltaCommitsBeforeCompaction = 4;
final int minArchiveCommitsMetadata = 2;
final int minArchiveCommitsDataset = 4;
HoodieWriteConfig config = getWriteConfigBuilder(true, true, false)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true)
.archiveCommitsWith(minArchiveCommitsMetadata, minArchiveCommitsMetadata + 1).retainCommits(1)
.withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(minArchiveCommitsDataset, minArchiveCommitsDataset + 1)
.retainCommits(1).retainFileVersions(1).withAutoClean(false).withAsyncClean(true).build())
.build();
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, config)) {
// Initialize table with metadata
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
client.startCommitWithTime(newCommitTime);
List<WriteStatus> writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
// Perform multiple commits
for (int i = 1; i < 10; ++i) {
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
if (i == 1) {
records = dataGen.generateInserts(newCommitTime, 5);
} else {
records = dataGen.generateUpdates(newCommitTime, 2);
}
client.startCommitWithTime(newCommitTime);
writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
}
// We can only rollback those commits whose deltacommit have not been archived yet.
int numRollbacks = 0;
boolean exceptionRaised = false;
List<HoodieInstant> allInstants = metaClient.reloadActiveTimeline().getCommitsTimeline().getReverseOrderedInstants()
.collect(Collectors.toList());
for (HoodieInstant instantToRollback : allInstants) {
try {
client.rollback(instantToRollback.getTimestamp());
client.syncTableMetadata();
++numRollbacks;
} catch (HoodieMetadataException e) {
exceptionRaised = true;
break;
}
}
assertTrue(exceptionRaised, "Rollback of archived instants should fail");
// Since each rollback also creates a deltacommit, we can only support rolling back of half of the original
// instants present before rollback started.
assertTrue(numRollbacks >= Math.max(minArchiveCommitsDataset, minArchiveCommitsMetadata) / 2,
"Rollbacks of non archived instants should work");
}
}
/**
@@ -657,12 +797,12 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
// Table should sync only before the inflightActionTimestamp
HoodieBackedTableMetadataWriter writer =
(HoodieBackedTableMetadataWriter) SparkHoodieBackedTableMetadataWriter.create(hadoopConf, client.getConfig(), context);
assertEquals(writer.getLatestSyncedInstantTime().get(), beforeInflightActionTimestamp);
assertEquals(writer.getMetadataReader().getUpdateTime().get(), beforeInflightActionTimestamp);
// Reader should sync to all the completed instants
HoodieTableMetadata metadata = HoodieTableMetadata.create(context, client.getConfig().getMetadataConfig(),
client.getConfig().getBasePath(), FileSystemViewStorageConfig.FILESYSTEM_VIEW_SPILLABLE_DIR.defaultValue());
assertEquals(metadata.getSyncedInstantTime().get(), newCommitTime);
assertEquals(((HoodieBackedTableMetadata)metadata).getReaderTime().get(), newCommitTime);
// Remove the inflight instance holding back table sync
fs.delete(inflightCleanPath, false);
@@ -670,12 +810,12 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
writer =
(HoodieBackedTableMetadataWriter)SparkHoodieBackedTableMetadataWriter.create(hadoopConf, client.getConfig(), context);
assertEquals(writer.getLatestSyncedInstantTime().get(), newCommitTime);
assertEquals(writer.getMetadataReader().getUpdateTime().get(), newCommitTime);
// Reader should sync to all the completed instants
metadata = HoodieTableMetadata.create(context, client.getConfig().getMetadataConfig(),
client.getConfig().getBasePath(), FileSystemViewStorageConfig.FILESYSTEM_VIEW_SPILLABLE_DIR.defaultValue());
assertEquals(metadata.getSyncedInstantTime().get(), newCommitTime);
assertEquals(writer.getMetadataReader().getUpdateTime().get(), newCommitTime);
}
// Enable metadata table and ensure it is synced
@@ -693,7 +833,8 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
}
/**
* Instants on Metadata Table should be archived as per config. Metadata Table should be automatically compacted as per config.
* Instants on Metadata Table should be archived as per config but we always keep atlest the number of instants
* as on the dataset. Metadata Table should be automatically compacted as per config.
*/
@Test
public void testCleaningArchivingAndCompaction() throws Exception {
@@ -701,12 +842,13 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
final int maxDeltaCommitsBeforeCompaction = 4;
final int minArchiveLimit = 4;
final int maxArchiveLimit = 6;
HoodieWriteConfig config = getWriteConfigBuilder(true, true, false)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true)
.archiveCommitsWith(6, 8).retainCommits(1)
.archiveCommitsWith(minArchiveLimit - 2, maxArchiveLimit - 2).retainCommits(1)
.withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction).build())
// don't archive the data timeline at all.
.withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(Integer.MAX_VALUE - 1, Integer.MAX_VALUE)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(minArchiveLimit, maxArchiveLimit)
.retainCommits(1).retainFileVersions(1).withAutoClean(true).withAsyncClean(true).build())
.build();
@@ -736,6 +878,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
// ensure archiving has happened
long numDataCompletedInstants = datasetMetaClient.getActiveTimeline().filterCompletedInstants().countInstants();
long numDeltaCommits = metadataTimeline.getDeltaCommitTimeline().filterCompletedInstants().countInstants();
assertTrue(numDeltaCommits >= minArchiveLimit);
assertTrue(numDeltaCommits < numDataCompletedInstants, "Must have less delta commits than total completed instants on data timeline.");
}
@@ -1030,14 +1173,6 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
// File sizes should be valid
Arrays.stream(metaStatuses).forEach(s -> assertTrue(s.getLen() > 0));
// Block sizes should be valid
Arrays.stream(metaStatuses).forEach(s -> assertTrue(s.getBlockSize() > 0));
List<Long> fsBlockSizes = Arrays.stream(fsStatuses).map(FileStatus::getBlockSize).collect(Collectors.toList());
Collections.sort(fsBlockSizes);
List<Long> metadataBlockSizes = Arrays.stream(metaStatuses).map(FileStatus::getBlockSize).collect(Collectors.toList());
Collections.sort(metadataBlockSizes);
assertEquals(fsBlockSizes, metadataBlockSizes);
if ((fsFileNames.size() != metadataFilenames.size()) || (!fsFileNames.equals(metadataFilenames))) {
LOG.info("*** File system listing = " + Arrays.toString(fsFileNames.toArray()));
LOG.info("*** Metadata listing = " + Arrays.toString(metadataFilenames.toArray()));
@@ -1054,6 +1189,14 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
}
}
// Block sizes should be valid
Arrays.stream(metaStatuses).forEach(s -> assertTrue(s.getBlockSize() > 0));
List<Long> fsBlockSizes = Arrays.stream(fsStatuses).map(FileStatus::getBlockSize).collect(Collectors.toList());
Collections.sort(fsBlockSizes);
List<Long> metadataBlockSizes = Arrays.stream(metaStatuses).map(FileStatus::getBlockSize).collect(Collectors.toList());
Collections.sort(metadataBlockSizes);
assertEquals(fsBlockSizes, metadataBlockSizes);
assertEquals(fsFileNames.size(), metadataFilenames.size(), "Files within partition " + partition + " should match");
assertTrue(fsFileNames.equals(metadataFilenames), "Files within partition " + partition + " should match");