[HUDI-1502] MOR rollback and restore support for metadata sync (#2421)

- Adds a field to RollbackMetadata that captures the log files written for rollback blocks (the pattern is sketched after the commit header below)
- Adds a field to RollbackMetadata that captures new log files written by unsynced deltacommits

Co-authored-by: Vinoth Chandar <vinoth@apache.org>
Sivabalan Narayanan authored on 2021-01-11 16:23:13 -05:00, committed by GitHub
parent de42adc230
commit e3d3677b7e
15 changed files with 270 additions and 118 deletions
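
The pattern behind both bullets shows up in each diff below: before mutating anything, the rollback path lists the log files it is about to touch and records their current sizes on the HoodieRollbackStat, so that a later metadata sync can reconcile them. A minimal sketch of that pattern, using the FSUtils and HoodieRollbackStat calls exactly as they appear in the diffs (the class and method names here are illustrative, not the commit's):

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hudi.common.HoodieRollbackStat;
    import org.apache.hudi.common.fs.FSUtils;
    import org.apache.hudi.common.model.HoodieFileFormat;
    import org.apache.hudi.common.model.HoodieLogFile;

    import java.io.IOException;
    import java.util.Map;
    import java.util.stream.Collectors;

    class RollbackLogFileCapture { // illustrative name, not part of the commit
      // Collect the log files the rollback will touch for (partition, fileId),
      // keyed by FileStatus with their current lengths, and attach the map to
      // the rollback stat so the metadata table can be brought back in sync.
      static HoodieRollbackStat captureWrittenLogFiles(FileSystem fs, String basePath,
          String partitionPath, String fileId, String baseInstant) throws IOException {
        Map<FileStatus, Long> writtenLogFileSizeMap = FSUtils.getAllLogFiles(fs,
                FSUtils.getPartitionPath(basePath, partitionPath),
                fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseInstant)
            .collect(Collectors.toMap(HoodieLogFile::getFileStatus,
                logFile -> logFile.getFileStatus().getLen()));
        return HoodieRollbackStat.newBuilder()
            .withPartitionPath(partitionPath)
            .withWrittenLogFileSizeMap(writtenLogFileSizeMap) // field added by this commit
            .build();
      }
    }

Both ListingBasedRollbackHelper and SparkMarkerBasedRollbackStrategy below follow this shape; they differ only in how they enumerate the rollback targets.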

View File: ListingBasedRollbackHelper.java

@@ -22,6 +22,7 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
@@ -49,6 +50,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import scala.Tuple2;
@@ -116,12 +118,22 @@ public class ListingBasedRollbackHelper implements Serializable {
.withDeletedFileResults(filesToDeletedStatus).build());
}
case APPEND_ROLLBACK_BLOCK: {
String fileId = rollbackRequest.getFileId().get();
String latestBaseInstant = rollbackRequest.getLatestBaseInstant().get();
// collect all log files that are supposed to be deleted with this rollback
Map<FileStatus, Long> writtenLogFileSizeMap = FSUtils.getAllLogFiles(metaClient.getFs(),
FSUtils.getPartitionPath(config.getBasePath(), rollbackRequest.getPartitionPath()),
fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), latestBaseInstant)
.collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
Writer writer = null;
try {
writer = HoodieLogFormat.newWriterBuilder()
.onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
.withFileId(rollbackRequest.getFileId().get())
.overBaseCommit(rollbackRequest.getLatestBaseInstant().get()).withFs(metaClient.getFs())
.withFileId(fileId)
.overBaseCommit(latestBaseInstant)
.withFs(metaClient.getFs())
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
// generate metadata
@@ -149,9 +161,11 @@ public class ListingBasedRollbackHelper implements Serializable {
metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()),
1L
);
return new Tuple2<>(rollbackRequest.getPartitionPath(),
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
.withRollbackBlockAppendResults(filesToNumBlocksRollback).build());
.withRollbackBlockAppendResults(filesToNumBlocksRollback)
.withWrittenLogFileSizeMap(writtenLogFileSizeMap).build());
}
default:
throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);
@@ -159,7 +173,6 @@ public class ListingBasedRollbackHelper implements Serializable {
});
}
/**
* Common method used for cleaning out base files under a partition path during rollback of a set of commits.
*/

View File: SparkMarkerBasedRollbackStrategy.java

@@ -22,7 +22,10 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.IOType;
@@ -32,10 +35,14 @@ import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;
import org.apache.hadoop.fs.FileStatus;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import scala.Tuple2;
@@ -75,4 +82,11 @@ public class SparkMarkerBasedRollbackStrategy<T extends HoodieRecordPayload> ext
throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e);
}
}
protected Map<FileStatus, Long> getWrittenLogFileSizeMap(String partitionPathStr, String baseCommitTime, String fileId) throws IOException {
// collect all log files that are supposed to be deleted with this rollback
return FSUtils.getAllLogFiles(table.getMetaClient().getFs(),
FSUtils.getPartitionPath(config.getBasePath(), partitionPathStr), fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime)
.collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
}
}
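
The hunk above introduces the helper but does not show its call site. Presumably the APPEND-marker branch of the strategy invokes it while building its rollback stat, mirroring the listing-based path in the first file. A hedged usage sketch; partitionPath, baseCommitTime, fileId, and filesToNumBlocksRollback are assumed local variables, not shown in this diff:

    // Hypothetical call site: after appending the rollback command block,
    // also record the sizes of the log files this rollback wrote over.
    Map<FileStatus, Long> writtenLogFileSizeMap =
        getWrittenLogFileSizeMap(partitionPath, baseCommitTime, fileId);
    return HoodieRollbackStat.newBuilder()
        .withPartitionPath(partitionPath)
        .withRollbackBlockAppendResults(filesToNumBlocksRollback)
        .withWrittenLogFileSizeMap(writtenLogFileSizeMap)
        .build();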

View File: TestHoodieBackedMetadata.java

@@ -80,6 +80,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
private static final Logger LOG = LogManager.getLogger(TestHoodieBackedMetadata.class);
@TempDir
@@ -261,13 +262,11 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
/**
* Test that rollbacks of various table operations sync to the Metadata Table correctly.
*/
//@ParameterizedTest
//@EnumSource(HoodieTableType.class)
//public void testRollbackOperations(HoodieTableType tableType) throws Exception {
@Test
public void testRollbackOperations() throws Exception {
@ParameterizedTest
@EnumSource(HoodieTableType.class)
public void testRollbackOperations(HoodieTableType tableType) throws Exception {
//FIXME(metadata): This is broken for MOR, until HUDI-1434 is fixed
init(HoodieTableType.COPY_ON_WRITE);
init(tableType);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
@@ -371,13 +370,13 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
}
/**
* Test when syncing rollback to metadata if the commit being rolled back has not been synced that essentially a no-op
* occurs to metadata.
* @throws Exception
* Test when syncing rollback to metadata if the commit being rolled back has not been synced that essentially a no-op occurs to metadata.
* Once explicit sync is called, metadata should match.
*/
@Test
public void testRollbackUnsyncedCommit() throws Exception {
init(HoodieTableType.COPY_ON_WRITE);
@ParameterizedTest
@EnumSource(HoodieTableType.class)
public void testRollbackUnsyncedCommit(HoodieTableType tableType) throws Exception {
init(tableType);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
@@ -389,7 +388,6 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
}
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
// Commit with metadata disabled
@@ -401,6 +399,8 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
}
try (SparkRDDWriteClient client = new SparkRDDWriteClient<>(engineContext, getWriteConfig(true, true))) {
assertFalse(metadata(client).isInSync());
client.syncTableMetadata();
validateMetadata(client);
}
}
@@ -528,8 +528,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
}
/**
* Instants on Metadata Table should be archived as per config.
* Metadata Table should be automatically compacted as per config.
* Instants on Metadata Table should be archived as per config. Metadata Table should be automatically compacted as per config.
*/
@Test
public void testCleaningArchivingAndCompaction() throws Exception {
@@ -752,8 +751,6 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
/**
* Validate the metadata tables contents to ensure it matches what is on the file system.
*
* @throws IOException
*/
private void validateMetadata(SparkRDDWriteClient client) throws IOException {
HoodieWriteConfig config = client.getConfig();
@@ -807,7 +804,19 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
if ((fsFileNames.size() != metadataFilenames.size()) || (!fsFileNames.equals(metadataFilenames))) {
LOG.info("*** File system listing = " + Arrays.toString(fsFileNames.toArray()));
LOG.info("*** Metadata listing = " + Arrays.toString(metadataFilenames.toArray()));
for (String fileName : fsFileNames) {
if (!metadataFilenames.contains(fileName)) {
LOG.error(partition + "FsFilename " + fileName + " not found in Meta data");
}
}
for (String fileName : metadataFilenames) {
if (!fsFileNames.contains(fileName)) {
LOG.error(partition + "Metadata file " + fileName + " not found in original FS");
}
}
}
assertEquals(fsFileNames.size(), metadataFilenames.size(), "Files within partition " + partition + " should match");
assertTrue(fsFileNames.equals(metadataFilenames), "Files within partition " + partition + " should match");
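
The logging added above walks both listings to name each offending file before the assertions fire. The same check can be phrased as a symmetric set difference; a minimal sketch, assuming fsFileNames and metadataFilenames are the collections from the surrounding test:

    import java.util.HashSet;
    import java.util.Set;

    // Files on the file system that the metadata table does not report.
    Set<String> missingFromMetadata = new HashSet<>(fsFileNames);
    missingFromMetadata.removeAll(metadataFilenames);
    // Files the metadata table reports that do not exist on the file system.
    Set<String> missingFromFs = new HashSet<>(metadataFilenames);
    missingFromFs.removeAll(fsFileNames);
    // The two listings match exactly when both differences are empty.
    boolean inSync = missingFromMetadata.isEmpty() && missingFromFs.isEmpty();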

View File: TestMarkerBasedRollbackStrategy.java

@@ -18,20 +18,33 @@
package org.apache.hudi.table.action.rollback;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hadoop.fs.FileStatus;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
@@ -40,13 +53,21 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
private static final String TEST_NAME_WITH_PARAMS = "[{index}] Test with listing metadata enable={0}";
public static Stream<Arguments> configParams() {
return Arrays.stream(new Boolean[][] {{true}, {false}}).map(Arguments::of);
}
private HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;
@BeforeEach
public void setUp() throws Exception {
initPath();
initSparkContexts();
initFileSystem();
initMetaClient();
initDFS();
initMetaClient(tableType);
initTestDataGenerator();
}
@AfterEach
@@ -55,7 +76,7 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
}
@Test
public void testCopyOnWriteRollback() throws Exception {
public void testCopyOnWriteRollbackWithTestTable() throws Exception {
// given: wrote some base files and corresponding markers
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
String f0 = testTable.addRequestedCommit("000")
@@ -85,43 +106,78 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
assertEquals(1, stats.stream().mapToInt(r -> r.getFailedDeleteFiles().size()).sum());
}
@Test
public void testMergeOnReadRollback() throws Exception {
// given: wrote some base + log files and corresponding markers
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
String f2 = testTable.addRequestedDeltaCommit("000")
.getFileIdsWithBaseFilesInPartitions("partA").get("partA");
String f1 = testTable.addDeltaCommit("001")
.withLogFile("partA", f2)
.getFileIdsWithBaseFilesInPartitions("partB").get("partB");
String f3 = "f3";
String f4 = "f4";
testTable.forDeltaCommit("001")
.withMarkerFile("partB", f1, IOType.CREATE)
.withMarkerFile("partA", f3, IOType.CREATE)
.withMarkerFile("partA", f2, IOType.APPEND)
.withMarkerFile("partB", f4, IOType.APPEND);
@Tag("functional")
@ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
@MethodSource("configParams")
public void testCopyOnWriteRollback(boolean useFileListingMetadata) throws Exception {
HoodieWriteConfig writeConfig = getConfigBuilder().withRollbackUsingMarkers(true).withAutoCommit(false)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(useFileListingMetadata).build())
.withPath(basePath).build();
// when
List<HoodieRollbackStat> stats = new SparkMarkerBasedRollbackStrategy(HoodieSparkTable.create(getConfig(), context, metaClient), context, getConfig(), "002")
.execute(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, "001"));
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, writeConfig)) {
// rollback 2nd commit and ensure stats reflect the info.
List<HoodieRollbackStat> stats = testRun(useFileListingMetadata, writeConfig, writeClient);
// then: ensure files are deleted, rollback block is appended (even if append does not exist)
assertEquals(2, stats.size());
// will have the log file
FileStatus[] partBFiles = testTable.listAllFilesInPartition("partB");
assertEquals(1, partBFiles.length);
assertTrue(partBFiles[0].getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension()));
assertTrue(partBFiles[0].getLen() > 0);
FileStatus[] partAFiles = testTable.listAllFilesInPartition("partA");
assertEquals(3, partAFiles.length);
assertEquals(2, Stream.of(partAFiles).filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).count());
assertEquals(1, Stream.of(partAFiles).filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).filter(f -> f.getLen() > 0).count());
// only partB/f1_001 will be deleted
assertEquals(1, stats.stream().mapToInt(r -> r.getSuccessDeleteFiles().size()).sum());
// partA/f3_001 is non existent
assertEquals(1, stats.stream().mapToInt(r -> r.getFailedDeleteFiles().size()).sum());
assertEquals(3, stats.size());
for (HoodieRollbackStat stat : stats) {
assertEquals(1, stat.getSuccessDeleteFiles().size());
assertEquals(0, stat.getFailedDeleteFiles().size());
assertEquals(0, stat.getCommandBlocksCount().size());
assertEquals(0, stat.getWrittenLogFileSizeMap().size());
}
}
}
@Tag("functional")
@ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
@MethodSource("configParams")
public void testMergeOnReadRollback(boolean useFileListingMetadata) throws Exception {
// init MERGE_ON_READ_TABLE
tearDown();
tableType = HoodieTableType.MERGE_ON_READ;
setUp();
HoodieWriteConfig writeConfig = getConfigBuilder().withRollbackUsingMarkers(true).withAutoCommit(false)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(useFileListingMetadata).build())
.withPath(basePath).build();
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, writeConfig)) {
// rollback 2nd commit and ensure stats reflect the info.
List<HoodieRollbackStat> stats = testRun(useFileListingMetadata, writeConfig, writeClient);
assertEquals(3, stats.size());
for (HoodieRollbackStat stat : stats) {
assertEquals(0, stat.getSuccessDeleteFiles().size());
assertEquals(0, stat.getFailedDeleteFiles().size());
assertEquals(1, stat.getCommandBlocksCount().size());
stat.getCommandBlocksCount().forEach((fileStatus, len) -> assertTrue(fileStatus.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())));
assertEquals(1, stat.getWrittenLogFileSizeMap().size());
stat.getWrittenLogFileSizeMap().forEach((fileStatus, len) -> assertTrue(fileStatus.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())));
}
}
}
private List<HoodieRollbackStat> testRun(boolean useFileListingMetadata, HoodieWriteConfig writeConfig, SparkRDDWriteClient writeClient) {
String newCommitTime = "001";
writeClient.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
JavaRDD<WriteStatus> writeStatuses = writeClient.insert(jsc.parallelize(records, 1), newCommitTime);
writeClient.commit(newCommitTime, writeStatuses);
// Updates
newCommitTime = "002";
writeClient.startCommitWithTime(newCommitTime);
records = dataGen.generateUniqueUpdates(newCommitTime, 50);
writeStatuses = writeClient.upsert(jsc.parallelize(records, 1), newCommitTime);
writeStatuses.collect();
// rollback 2nd commit and ensure stats reflect the info.
return new SparkMarkerBasedRollbackStrategy(HoodieSparkTable.create(writeConfig, context, metaClient), context, writeConfig, "003")
.execute(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, "002"));
}
}