[HUDI-52] Enabling savepoint and restore for MOR table (#4507)
* Enabling restore for MOR table
* Fixing savepoint for compaction commits in MOR
commit 2954027b92 (parent b6891d253f)
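
With this commit, savepoint and restore work on merge-on-read (MOR) tables: the hard rejection of MERGE_ON_READ in SavepointActionExecutor is removed, and the CLI and validation paths become deltacommit-aware. A minimal usage sketch through the Spark write client, not part of the diff (the instant time, user, and comment are hypothetical; cfg and getHoodieWriteClient mirror the test harness used below):

    // Sketch: savepoint an instant on an MOR table, then restore to it.
    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
      client.savepoint("003", "admin", "savepoint before backfill"); // "003" must be a completed commit/deltacommit
      // ... further deltacommits and/or compaction ...
      client.restoreToSavepoint("003"); // rolls the table back to the savepointed state
    }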
@@ -78,11 +78,9 @@ public class SavepointsCommand implements CommandMarker {
       throws Exception {
     HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
     HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
-    HoodieTimeline timeline = activeTimeline.getCommitTimeline().filterCompletedInstants();
-    HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime);
 
-    if (!timeline.containsInstant(commitInstant)) {
-      return "Commit " + commitTime + " not found in Commits " + timeline;
+    if (!activeTimeline.getCommitsTimeline().filterCompletedInstants().containsInstant(commitTime)) {
+      return "Commit " + commitTime + " not found in Commits " + activeTimeline;
     }
 
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
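
Why this hunk matters: getCommitTimeline() returns COMMIT actions only, but a merge-on-read writer produces DELTA_COMMIT actions, so a valid MOR instant was reported as "not found". getCommitsTimeline() spans both action types. A minimal sketch of the distinction (instant time hypothetical; metaClient as above):

    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
    // COMMIT actions only -- misses the deltacommits an MOR table writes.
    HoodieTimeline commitOnly = activeTimeline.getCommitTimeline().filterCompletedInstants();
    // COMMIT plus DELTA_COMMIT -- covers copy-on-write and merge-on-read writers.
    HoodieTimeline commits = activeTimeline.getCommitsTimeline().filterCompletedInstants();
    commitOnly.containsInstant("002"); // false when "002" is a deltacommit
    commits.containsInstant("002");    // true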
@@ -112,10 +110,10 @@ public class SavepointsCommand implements CommandMarker {
       throw new HoodieException("There are no completed instants to run rollback");
     }
     HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
-    HoodieTimeline timeline = activeTimeline.getCommitTimeline().filterCompletedInstants();
-    HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime);
+    HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
+    List<HoodieInstant> instants = timeline.getInstants().filter(instant -> instant.getTimestamp().equals(instantTime)).collect(Collectors.toList());
 
-    if (!timeline.containsInstant(commitInstant)) {
+    if (instants.isEmpty()) {
       return "Commit " + instantTime + " not found in Commits " + timeline;
     }
 
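
The rollback path had the same bug, fixed differently: the old code constructed a HoodieInstant hard-coded to COMMIT_ACTION, which can never equal a deltacommit instant, while the new code looks the instant up by timestamp so any completed action at that time matches. In sketch form (assumes java.util.List and java.util.stream.Collectors are imported):

    List<HoodieInstant> instants = timeline.getInstants()
        .filter(instant -> instant.getTimestamp().equals(instantTime))
        .collect(Collectors.toList());
    boolean exists = !instants.isEmpty(); // true for COMMIT and DELTA_COMMIT alike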
@@ -24,7 +24,6 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -65,13 +64,9 @@ public class SavepointActionExecutor<T extends HoodieRecordPayload, I, K, O> ext
 
   @Override
   public HoodieSavepointMetadata execute() {
-    if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
-      throw new UnsupportedOperationException("Savepointing is not supported or MergeOnRead table types");
-    }
     Option<HoodieInstant> cleanInstant = table.getCompletedCleanTimeline().lastInstant();
-    HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime);
-    if (!table.getCompletedCommitsTimeline().containsInstant(commitInstant)) {
-      throw new HoodieSavepointException("Could not savepoint non-existing commit " + commitInstant);
+    if (!table.getCompletedCommitsTimeline().containsInstant(instantTime)) {
+      throw new HoodieSavepointException("Could not savepoint non-existing commit " + instantTime);
     }
 
     try {
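
With the MERGE_ON_READ guard gone, validation relies on the containsInstant(String) overload, which matches on timestamp alone rather than on a fully specified (state, action, timestamp) triple, so deltacommit and compaction commit instants pass equally. A condensed sketch of the accepted path:

    HoodieTimeline completed = table.getCompletedCommitsTimeline();
    // Matches by timestamp regardless of action type; the old comparison against
    // new HoodieInstant(false, COMMIT_ACTION, instantTime) could not match a deltacommit.
    if (!completed.containsInstant(instantTime)) {
      throw new HoodieSavepointException("Could not savepoint non-existing commit " + instantTime);
    }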
@@ -464,6 +464,100 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testMORTableRestore(boolean restoreAfterCompaction) throws Exception {
+    HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false)
+        // Timeline-server-based markers are not used for multi-rollback tests
+        .withMarkersType(MarkerType.DIRECT.name())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build());
+    HoodieWriteConfig cfg = cfgBuilder.build();
+
+    Properties properties = new Properties();
+    properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
+    HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
+
+    try (final SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+
+      HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+      List<HoodieRecord> records = insertAndGetRecords("001", client, dataGen, 200);
+      List<HoodieRecord> updates1 = updateAndGetRecords("002", client, dataGen, records);
+      List<HoodieRecord> updates2 = updateAndGetRecords("003", client, dataGen, records);
+      List<HoodieRecord> updates3 = updateAndGetRecords("004", client, dataGen, records);
+      validateRecords(cfg, metaClient, updates3);
+
+      if (!restoreAfterCompaction) {
+        // restore to 002 and validate records.
+        client.restoreToInstant("002");
+        validateRecords(cfg, metaClient, updates1);
+      } else {
+        // trigger compaction and then trigger couple of upserts followed by restore.
+        metaClient = HoodieTableMetaClient.reload(metaClient);
+        String compactionInstantTime = "005";
+        client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
+        JavaRDD<WriteStatus> ws = (JavaRDD<WriteStatus>) client.compact(compactionInstantTime);
+        client.commitCompaction(compactionInstantTime, ws, Option.empty());
+
+        validateRecords(cfg, metaClient, updates3);
+        List<HoodieRecord> updates4 = updateAndGetRecords("006", client, dataGen, records);
+        List<HoodieRecord> updates5 = updateAndGetRecords("007", client, dataGen, records);
+        validateRecords(cfg, metaClient, updates5);
+
+        // restore to 003 and validate records.
+        client.restoreToInstant("003");
+        validateRecords(cfg, metaClient, updates2);
+      }
+    }
+  }
+
+  private List<HoodieRecord> insertAndGetRecords(String newCommitTime, SparkRDDWriteClient client, HoodieTestDataGenerator dataGen, int count) {
+    client.startCommitWithTime(newCommitTime);
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, count);
+    JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
+    JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
+    client.commit(newCommitTime, writeStatusJavaRDD);
+    List<WriteStatus> statuses = writeStatusJavaRDD.collect();
+    assertNoWriteErrors(statuses);
+    return records;
+  }
+
+  private List<HoodieRecord> updateAndGetRecords(String newCommitTime, SparkRDDWriteClient client, HoodieTestDataGenerator dataGen, List<HoodieRecord> records) throws IOException {
+    client.startCommitWithTime(newCommitTime);
+    List<HoodieRecord> updates = dataGen.generateUpdates(newCommitTime, records);
+    JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(jsc().parallelize(updates, 1), newCommitTime);
+    client.commit(newCommitTime, writeStatusJavaRDD);
+    return updates;
+  }
+
+  private void validateRecords(HoodieWriteConfig cfg, HoodieTableMetaClient metaClient, List<HoodieRecord> expectedRecords) throws IOException {
+    HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
+    FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
+    HoodieTableFileSystemView tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+    List<String> dataFiles = tableView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
+    List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles, basePath());
+    assertRecords(expectedRecords, recordsRead);
+  }
+
+  private void assertRecords(List<HoodieRecord> inputRecords, List<GenericRecord> recordsRead) {
+    assertEquals(recordsRead.size(), inputRecords.size());
+    Map<String, GenericRecord> expectedRecords = new HashMap<>();
+    inputRecords.forEach(entry -> {
+      try {
+        expectedRecords.put(entry.getRecordKey(), ((GenericRecord) entry.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA).get()));
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    });
+
+    Map<String, GenericRecord> actualRecords = new HashMap<>();
+    recordsRead.forEach(entry -> actualRecords.put(String.valueOf(entry.get("_row_key")), entry));
+    for (Map.Entry<String, GenericRecord> entry : expectedRecords.entrySet()) {
+      assertEquals(String.valueOf(entry.getValue().get("driver")), String.valueOf(actualRecords.get(entry.getKey()).get("driver")));
+    }
+  }
+
   private HoodieWriteConfig getHoodieWriteConfigWithSmallFileHandlingOff(boolean populateMetaFields) {
     return getHoodieWriteConfigWithSmallFileHandlingOffBuilder(populateMetaFields).build();
   }
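
The parameterized test covers both restore paths: with restoreAfterCompaction=false it restores across deltacommits only; with true it commits a compaction at "005", adds two more deltacommits ("006", "007"), and restores to "003", so the restore must undo the compaction commit as well as the deltacommits around it. A sketch of an additional timeline check one could make after that restore (not part of the diff; assertTrue/assertFalse assumed from JUnit):

    HoodieTableMetaClient reloaded = HoodieTableMetaClient.reload(metaClient);
    HoodieTimeline commits = reloaded.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
    assertFalse(commits.containsInstant("005")); // compaction commit rolled back
    assertFalse(commits.containsInstant("007")); // later deltacommits rolled back
    assertTrue(commits.containsInstant("003"));  // restore target retained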