Enable multi/nested rollbacks for MOR table type
This commit is contained in:
@@ -34,6 +34,7 @@ import com.uber.hoodie.common.model.HoodieRecord;
|
||||
import com.uber.hoodie.common.model.HoodieRecordPayload;
|
||||
import com.uber.hoodie.common.model.HoodieRollingStat;
|
||||
import com.uber.hoodie.common.model.HoodieRollingStatMetadata;
|
||||
import com.uber.hoodie.common.model.HoodieTableType;
|
||||
import com.uber.hoodie.common.model.HoodieWriteStat;
|
||||
import com.uber.hoodie.common.table.HoodieTableMetaClient;
|
||||
import com.uber.hoodie.common.table.HoodieTimeline;
|
||||
@@ -503,7 +504,6 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
|
||||
updateMetadataAndRollingStats(actionType, metadata, stats);
|
||||
|
||||
|
||||
// Finalize write
|
||||
final Timer.Context finalizeCtx = metrics.getFinalizeCtx();
|
||||
try {
|
||||
@@ -587,6 +587,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
if (table.getCompletedCommitsTimeline().empty()) {
|
||||
throw new HoodieSavepointException("Could not savepoint. Commit timeline is empty");
|
||||
}
|
||||
if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
|
||||
throw new UnsupportedOperationException("Savepointing is not supported or MergeOnRead table types");
|
||||
}
|
||||
|
||||
String latestCommit = table.getCompletedCommitsTimeline().lastInstant().get().getTimestamp();
|
||||
logger.info("Savepointing latest commit " + latestCommit);
|
||||
@@ -611,6 +614,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
public boolean savepoint(String commitTime, String user, String comment) {
|
||||
HoodieTable<T> table = HoodieTable.getHoodieTable(
|
||||
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
|
||||
if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
|
||||
throw new UnsupportedOperationException("Savepointing is not supported or MergeOnRead table types");
|
||||
}
|
||||
Optional<HoodieInstant> cleanInstant = table.getCompletedCleanTimeline().lastInstant();
|
||||
|
||||
HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION,
|
||||
@@ -672,6 +678,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
public void deleteSavepoint(String savepointTime) {
|
||||
HoodieTable<T> table = HoodieTable.getHoodieTable(
|
||||
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
|
||||
if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
|
||||
throw new UnsupportedOperationException("Savepointing is not supported or MergeOnRead table types");
|
||||
}
|
||||
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
|
||||
|
||||
HoodieInstant savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION,
|
||||
@@ -696,30 +705,25 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
* Otherwise, async compactor could fail with errors
|
||||
*
|
||||
* @param compactionTime - delete the compaction time
|
||||
* @return
|
||||
*/
|
||||
private void deletePendingCompaction(String compactionTime) {
|
||||
private void deleteRequestedCompaction(String compactionTime) {
|
||||
HoodieTable<T> table = HoodieTable.getHoodieTable(
|
||||
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
|
||||
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
|
||||
|
||||
HoodieInstant compactionRequestedInstant =
|
||||
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionTime);
|
||||
HoodieInstant compactionInflightInstant =
|
||||
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionTime);
|
||||
boolean isCompactionInstantInRequestedState = table.getActiveTimeline().filterPendingCompactionTimeline()
|
||||
.containsInstant(compactionRequestedInstant);
|
||||
boolean isCompactionInstantInInflightState = table.getActiveTimeline().filterPendingCompactionTimeline()
|
||||
.containsInstant(compactionInflightInstant);
|
||||
|
||||
HoodieTimeline commitTimeline = table.getCompletedCommitTimeline();
|
||||
if (commitTimeline.empty() && !commitTimeline
|
||||
.findInstantsAfter(compactionTime, Integer.MAX_VALUE).empty()) {
|
||||
throw new HoodieRollbackException(
|
||||
"Found commits after time :" + compactionTime + ", please rollback greater commits first");
|
||||
}
|
||||
if (isCompactionInstantInRequestedState) {
|
||||
activeTimeline.deleteCompactionRequested(compactionRequestedInstant);
|
||||
} else if (isCompactionInstantInInflightState) {
|
||||
activeTimeline.revertCompactionInflightToRequested(compactionInflightInstant);
|
||||
activeTimeline.deleteCompactionRequested(compactionRequestedInstant);
|
||||
} else {
|
||||
logger.error("No compaction present " + compactionTime);
|
||||
throw new IllegalArgumentException("No compaction present " + compactionTime);
|
||||
throw new IllegalArgumentException("Compaction is not in requested state " + compactionTime);
|
||||
}
|
||||
logger.info("Compaction " + compactionTime + " deleted");
|
||||
}
|
||||
@@ -776,8 +780,55 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
return true;
|
||||
}
|
||||
|
||||
private void rollback(List<String> commits) {
|
||||
if (commits.isEmpty()) {
|
||||
/**
|
||||
* NOTE : This action requires all writers (ingest and compact) to a dataset to be stopped before proceeding.
|
||||
* Revert the (inflight/committed) record changes for all commits after the provided @param.
|
||||
* Three steps: (1) Atomically unpublish this commit (2) clean indexing data, (3) clean new generated parquet/log
|
||||
* files and/or append rollback to existing log files. (4) Finally delete .commit, .inflight, .compaction.inflight
|
||||
* or .compaction.requested file
|
||||
*/
|
||||
public void restoreToCommit(final String commitTime) throws HoodieRollbackException {
|
||||
|
||||
// Create a Hoodie table which encapsulated the commits and files visible
|
||||
HoodieTable<T> table = HoodieTable.getHoodieTable(
|
||||
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
|
||||
// Get all the commits on the timeline after the provided commit time
|
||||
List<HoodieInstant> commitsToRollback = table.getActiveTimeline().getCommitsAndCompactionTimeline().getInstants()
|
||||
.filter(instant -> HoodieActiveTimeline.GREATER.test(instant.getTimestamp(), commitTime))
|
||||
.collect(Collectors.toList());
|
||||
// reverse the commits to descending order of commit time
|
||||
Collections.reverse(commitsToRollback);
|
||||
commitsToRollback.stream().forEach(instant -> {
|
||||
switch (instant.getAction()) {
|
||||
case HoodieTimeline.COMMIT_ACTION:
|
||||
case HoodieTimeline.DELTA_COMMIT_ACTION:
|
||||
rollback(instant.getTimestamp());
|
||||
break;
|
||||
case HoodieTimeline.COMPACTION_ACTION:
|
||||
if (instant.isRequested()) {
|
||||
// TODO : Get file status and create a rollback stat and file
|
||||
// TODO : Delete the .aux files along with the instant file, okay for now since the archival process will
|
||||
// delete these files when it does not see a corresponding instant file under .hoodie
|
||||
deleteRequestedCompaction(instant.getTimestamp());
|
||||
logger.info("Deleted pending scheduled compaction " + instant.getTimestamp());
|
||||
} else {
|
||||
rollback(instant.getTimestamp());
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("invalid action name " + instant.getAction());
|
||||
}
|
||||
try {
|
||||
// Ensure unique rollback instants for seconds granularity
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
throw new HoodieRollbackException("unable to rollback instant " + instant, e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void rollback(List<String> commitsToRollback) {
|
||||
if (commitsToRollback.isEmpty()) {
|
||||
logger.info("List of commits to rollback is empty");
|
||||
return;
|
||||
}
|
||||
@@ -797,7 +848,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
// Check if any of the commits is a savepoint - do not allow rollback on those commits
|
||||
List<String> savepoints = table.getCompletedSavepointTimeline().getInstants()
|
||||
.map(HoodieInstant::getTimestamp).collect(Collectors.toList());
|
||||
commits.forEach(s -> {
|
||||
commitsToRollback.forEach(s -> {
|
||||
if (savepoints.contains(s)) {
|
||||
throw new HoodieRollbackException(
|
||||
"Could not rollback a savepointed commit. Delete savepoint first before rolling back"
|
||||
@@ -805,11 +856,6 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
}
|
||||
});
|
||||
|
||||
List<String> pendingCompactionToRollback =
|
||||
commits.stream().filter(pendingCompactions::contains).collect(Collectors.toList());
|
||||
List<String> commitsToRollback =
|
||||
commits.stream().filter(c -> !pendingCompactions.contains(c)).collect(Collectors.toList());
|
||||
|
||||
try {
|
||||
if (commitTimeline.empty() && inflightCommitTimeline.empty()) {
|
||||
// nothing to rollback
|
||||
@@ -836,11 +882,10 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
+ ", please rollback greater commits first");
|
||||
}
|
||||
|
||||
// Remove interleaving pending compactions before rolling back commits
|
||||
pendingCompactionToRollback.forEach(this::deletePendingCompaction);
|
||||
|
||||
List<HoodieRollbackStat> stats = table.rollback(jsc, commitsToRollback, true);
|
||||
|
||||
logger.info("Deleted inflight commits " + commitsToRollback);
|
||||
|
||||
// cleanup index entries
|
||||
commitsToRollback.forEach(s -> {
|
||||
if (!index.rollbackCommit(s)) {
|
||||
@@ -974,8 +1019,6 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
|
||||
/**
|
||||
* Schedules a new compaction instant
|
||||
* @param extraMetadata
|
||||
* @return
|
||||
*/
|
||||
public Optional<String> scheduleCompaction(Optional<Map<String, String>> extraMetadata) throws IOException {
|
||||
String instantTime = HoodieActiveTimeline.createNewCommitTime();
|
||||
@@ -986,8 +1029,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
|
||||
/**
|
||||
* Schedules a new compaction instant with passed-in instant time
|
||||
* @param instantTime Compaction Instant Time
|
||||
* @param extraMetadata Extra Metadata to be stored
|
||||
*
|
||||
* @param instantTime Compaction Instant Time
|
||||
* @param extraMetadata Extra Metadata to be stored
|
||||
*/
|
||||
public boolean scheduleCompactionAtInstant(String instantTime, Optional<Map<String, String>> extraMetadata)
|
||||
throws IOException {
|
||||
@@ -1003,8 +1047,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
// Committed and pending compaction instants should have strictly lower timestamps
|
||||
List<HoodieInstant> conflictingInstants =
|
||||
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().getInstants().filter(instant ->
|
||||
HoodieTimeline.compareTimestamps(instant.getTimestamp(), instantTime,
|
||||
HoodieTimeline.GREATER_OR_EQUAL)).collect(Collectors.toList());
|
||||
HoodieTimeline.compareTimestamps(instant.getTimestamp(), instantTime,
|
||||
HoodieTimeline.GREATER_OR_EQUAL)).collect(Collectors.toList());
|
||||
Preconditions.checkArgument(conflictingInstants.isEmpty(),
|
||||
"Following instants have timestamps >= compactionInstant. Instants :"
|
||||
+ conflictingInstants);
|
||||
@@ -1023,9 +1067,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
|
||||
/**
|
||||
* Performs Compaction for the workload stored in instant-time
|
||||
* @param compactionInstantTime Compaction Instant Time
|
||||
* @return
|
||||
* @throws IOException
|
||||
*
|
||||
* @param compactionInstantTime Compaction Instant Time
|
||||
*/
|
||||
public JavaRDD<WriteStatus> compact(String compactionInstantTime) throws IOException {
|
||||
return compact(compactionInstantTime, config.shouldAutoCommit());
|
||||
@@ -1113,9 +1156,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
|
||||
/**
|
||||
* Ensures compaction instant is in expected state and performs Compaction for the workload stored in instant-time
|
||||
* @param compactionInstantTime Compaction Instant Time
|
||||
* @return
|
||||
* @throws IOException
|
||||
*
|
||||
* @param compactionInstantTime Compaction Instant Time
|
||||
*/
|
||||
private JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean autoCommit) throws IOException {
|
||||
// Create a Hoodie table which encapsulated the commits and files visible
|
||||
@@ -1145,9 +1187,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
/**
|
||||
* Perform compaction operations as specified in the compaction commit file
|
||||
*
|
||||
* @param compactionInstant Compaction Instant time
|
||||
* @param compactionInstant Compaction Instant time
|
||||
* @param activeTimeline Active Timeline
|
||||
* @param autoCommit Commit after compaction
|
||||
* @param autoCommit Commit after compaction
|
||||
* @return RDD of Write Status
|
||||
*/
|
||||
private JavaRDD<WriteStatus> runCompaction(
|
||||
@@ -1173,11 +1215,11 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
/**
|
||||
* Commit Compaction and track metrics
|
||||
*
|
||||
* @param compactedStatuses Compaction Write status
|
||||
* @param table Hoodie Table
|
||||
* @param compactedStatuses Compaction Write status
|
||||
* @param table Hoodie Table
|
||||
* @param compactionCommitTime Compaction Commit Time
|
||||
* @param autoCommit Auto Commit
|
||||
* @param extraMetadata Extra Metadata to store
|
||||
* @param autoCommit Auto Commit
|
||||
* @param extraMetadata Extra Metadata to store
|
||||
*/
|
||||
protected void commitCompaction(JavaRDD<WriteStatus> compactedStatuses, HoodieTable<T> table,
|
||||
String compactionCommitTime, boolean autoCommit, Optional<Map<String, String>> extraMetadata) {
|
||||
@@ -1202,13 +1244,14 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
}
|
||||
|
||||
/**
|
||||
* Rollback partial compactions
|
||||
* Rollback failed compactions. Inflight rollbacks for compactions revert the .inflight file to the .requested file
|
||||
*
|
||||
* @param inflightInstant Inflight Compaction Instant
|
||||
* @param table Hoodie Table
|
||||
*/
|
||||
@VisibleForTesting
|
||||
void rollbackInflightCompaction(HoodieInstant inflightInstant, HoodieTable table) throws IOException {
|
||||
table.rollback(jsc, ImmutableList.copyOf(new String[] { inflightInstant.getTimestamp() }), false);
|
||||
table.rollback(jsc, ImmutableList.copyOf(new String[]{inflightInstant.getTimestamp()}), false);
|
||||
// Revert instant state file
|
||||
table.getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
|
||||
}
|
||||
@@ -1305,4 +1348,5 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
throw new HoodieCommitException("Unable to save rolling stats");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -16,7 +16,6 @@
|
||||
|
||||
package com.uber.hoodie.table;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.hash.Hashing;
|
||||
import com.uber.hoodie.WriteStatus;
|
||||
import com.uber.hoodie.avro.model.HoodieCompactionPlan;
|
||||
@@ -203,9 +202,9 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
|
||||
AvroReadSupport.setAvroReadSchema(getHadoopConf(), upsertHandle.getSchema());
|
||||
BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
|
||||
try (ParquetReader<IndexedRecord> reader = AvroParquetReader.<IndexedRecord>builder(upsertHandle.getOldFilePath())
|
||||
.withConf(getHadoopConf()).build()) {
|
||||
.withConf(getHadoopConf()).build()) {
|
||||
wrapper = new SparkBoundedInMemoryExecutor(config, new ParquetReaderIterator(reader),
|
||||
new UpdateHandler(upsertHandle), x -> x);
|
||||
new UpdateHandler(upsertHandle), x -> x);
|
||||
wrapper.execute();
|
||||
} catch (Exception e) {
|
||||
throw new HoodieException(e);
|
||||
@@ -313,18 +312,17 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
|
||||
* Common method used for cleaning out parquet files under a partition path during rollback of a
|
||||
* set of commits
|
||||
*/
|
||||
protected Map<FileStatus, Boolean> deleteCleanedFiles(String partitionPath, List<String> commits)
|
||||
protected Map<FileStatus, Boolean> deleteCleanedFiles(Map<FileStatus, Boolean> results, String partitionPath,
|
||||
PathFilter filter)
|
||||
throws IOException {
|
||||
Map<FileStatus, Boolean> results = Maps.newHashMap();
|
||||
// PathFilter to get all parquet files and log files that need to be deleted
|
||||
PathFilter filter = (path) -> {
|
||||
if (path.toString().contains(".parquet")) {
|
||||
String fileCommitTime = FSUtils.getCommitTime(path.getName());
|
||||
return commits.contains(fileCommitTime);
|
||||
}
|
||||
return false;
|
||||
};
|
||||
deleteCleanedFiles(results, partitionPath, filter);
|
||||
logger.info("Cleaning path " + partitionPath);
|
||||
FileSystem fs = getMetaClient().getFs();
|
||||
FileStatus[] toBeDeleted = fs.listStatus(new Path(config.getBasePath(), partitionPath), filter);
|
||||
for (FileStatus file : toBeDeleted) {
|
||||
boolean success = fs.delete(file.getPath(), false);
|
||||
results.put(file, success);
|
||||
logger.info("Delete file " + file.getPath() + "\t" + success);
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
@@ -332,11 +330,18 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
|
||||
* Common method used for cleaning out parquet files under a partition path during rollback of a
|
||||
* set of commits
|
||||
*/
|
||||
protected Map<FileStatus, Boolean> deleteCleanedFiles(Map<FileStatus, Boolean> results, String partitionPath,
|
||||
PathFilter filter)
|
||||
protected Map<FileStatus, Boolean> deleteCleanedFiles(Map<FileStatus, Boolean> results, List<String> commits, String
|
||||
partitionPath)
|
||||
throws IOException {
|
||||
logger.info("Cleaning path " + partitionPath);
|
||||
FileSystem fs = getMetaClient().getFs();
|
||||
PathFilter filter = (path) -> {
|
||||
if (path.toString().contains(".parquet")) {
|
||||
String fileCommitTime = FSUtils.getCommitTime(path.getName());
|
||||
return commits.contains(fileCommitTime);
|
||||
}
|
||||
return false;
|
||||
};
|
||||
FileStatus[] toBeDeleted = fs.listStatus(new Path(config.getBasePath(), partitionPath), filter);
|
||||
for (FileStatus file : toBeDeleted) {
|
||||
boolean success = fs.delete(file.getPath(), false);
|
||||
@@ -367,9 +372,10 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
|
||||
config.shouldAssumeDatePartitioning()))
|
||||
.map((Function<String, HoodieRollbackStat>) partitionPath -> {
|
||||
// Scan all partitions files with this commit time
|
||||
Map<FileStatus, Boolean> results = deleteCleanedFiles(partitionPath, commits);
|
||||
final Map<FileStatus, Boolean> filesToDeletedStatus = new HashMap<>();
|
||||
deleteCleanedFiles(filesToDeletedStatus, commits, partitionPath);
|
||||
return HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath)
|
||||
.withDeletedFileResults(results).build();
|
||||
.withDeletedFileResults(filesToDeletedStatus).build();
|
||||
}).collect();
|
||||
|
||||
// clean temporary data files
|
||||
|
||||
@@ -52,7 +52,16 @@ import com.uber.hoodie.io.HoodieAppendHandle;
|
||||
import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor;
|
||||
import java.io.IOException;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.util.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
@@ -168,10 +177,15 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
|
||||
public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, List<String> commits, boolean deleteInstants)
|
||||
throws IOException {
|
||||
|
||||
//At the moment, MOR table type does not support nested rollbacks
|
||||
// At the moment, MOR table type does not support bulk nested rollbacks. Nested rollbacks is an experimental
|
||||
// feature that is expensive. To perform nested rollbacks, initiate multiple requests of client.rollback
|
||||
// (commitToRollback).
|
||||
// NOTE {@link HoodieCompactionConfig#withCompactionLazyBlockReadEnabled} needs to be set to TRUE. This is
|
||||
// required to avoid OOM when merging multiple LogBlocks performed during nested rollbacks.
|
||||
if (commits.size() > 1) {
|
||||
throw new UnsupportedOperationException("Nested Rollbacks are not supported");
|
||||
throw new UnsupportedOperationException("Bulk Nested Rollbacks are not supported");
|
||||
}
|
||||
// Atomically un-publish all non-inflight commits
|
||||
Map<String, HoodieInstant> commitsAndCompactions = this.getActiveTimeline()
|
||||
.getTimelineOfActions(Sets.newHashSet(HoodieActiveTimeline.COMMIT_ACTION,
|
||||
HoodieActiveTimeline.DELTA_COMMIT_ACTION, HoodieActiveTimeline.COMPACTION_ACTION)).getInstants()
|
||||
@@ -188,6 +202,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
|
||||
config.shouldAssumeDatePartitioning()))
|
||||
.map((Function<String, List<HoodieRollbackStat>>) partitionPath -> commits.stream().map(commit -> {
|
||||
HoodieInstant instant = commitsAndCompactions.get(commit);
|
||||
HoodieActiveTimeline activeTimeline = this.getActiveTimeline().reload();
|
||||
HoodieRollbackStat hoodieRollbackStats = null;
|
||||
// Need to put the path filter here since Filter is not serializable
|
||||
// PathFilter to get all parquet files and log files that need to be deleted
|
||||
@@ -203,14 +218,43 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
|
||||
return false;
|
||||
};
|
||||
|
||||
final Map<FileStatus, Boolean> filesToDeletedStatus = new HashMap<>();
|
||||
|
||||
switch (instant.getAction()) {
|
||||
case HoodieTimeline.COMMIT_ACTION:
|
||||
try {
|
||||
// Rollback of a commit should delete the newly created parquet files along with any log
|
||||
// files created with this as baseCommit. This is required to support multi-rollbacks in a MOR table.
|
||||
super.deleteCleanedFiles(filesToDeletedStatus, partitionPath, filter);
|
||||
hoodieRollbackStats = HoodieRollbackStat.newBuilder()
|
||||
.withPartitionPath(partitionPath).withDeletedFileResults(filesToDeletedStatus).build();
|
||||
break;
|
||||
} catch (IOException io) {
|
||||
throw new UncheckedIOException("Failed to rollback for commit " + commit, io);
|
||||
}
|
||||
case HoodieTimeline.COMPACTION_ACTION:
|
||||
try {
|
||||
Map<FileStatus, Boolean> results = super
|
||||
.deleteCleanedFiles(partitionPath, Collections.singletonList(commit));
|
||||
hoodieRollbackStats = HoodieRollbackStat.newBuilder()
|
||||
.withPartitionPath(partitionPath).withDeletedFileResults(results).build();
|
||||
// If there is no delta commit present after the current commit (if compaction), no action, else we
|
||||
// need to make sure that a compaction commit rollback also deletes any log files written as part of the
|
||||
// succeeding deltacommit.
|
||||
boolean higherDeltaCommits = !activeTimeline.getDeltaCommitTimeline()
|
||||
.filterCompletedInstants().findInstantsAfter(commit, 1).empty();
|
||||
if (higherDeltaCommits) {
|
||||
// Rollback of a compaction action with no higher deltacommit means that the compaction is scheduled
|
||||
// and has not yet finished. In this scenario we should delete only the newly created parquet files
|
||||
// and not corresponding base commit log files created with this as baseCommit since updates would
|
||||
// have been written to the log files.
|
||||
super.deleteCleanedFiles(filesToDeletedStatus, commits, partitionPath);
|
||||
hoodieRollbackStats = HoodieRollbackStat.newBuilder()
|
||||
.withPartitionPath(partitionPath).withDeletedFileResults(filesToDeletedStatus).build();
|
||||
} else {
|
||||
// No deltacommits present after this compaction commit (inflight or requested). In this case, we
|
||||
// can also delete any log files that were created with this compaction commit as base
|
||||
// commit.
|
||||
super.deleteCleanedFiles(filesToDeletedStatus, partitionPath, filter);
|
||||
hoodieRollbackStats = HoodieRollbackStat.newBuilder()
|
||||
.withPartitionPath(partitionPath).withDeletedFileResults(filesToDeletedStatus).build();
|
||||
}
|
||||
break;
|
||||
} catch (IOException io) {
|
||||
throw new UncheckedIOException("Failed to rollback for commit " + commit, io);
|
||||
@@ -246,7 +290,6 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
|
||||
.get(), HoodieCommitMetadata.class);
|
||||
|
||||
// read commit file and (either append delete blocks or delete file)
|
||||
final Map<FileStatus, Boolean> filesToDeletedStatus = new HashMap<>();
|
||||
Map<FileStatus, Long> filesToNumBlocksRollback = new HashMap<>();
|
||||
|
||||
// In case all data was inserts and the commit failed, delete the file belonging to that commit
|
||||
@@ -457,7 +500,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
|
||||
.filter(wStat -> {
|
||||
// Filter out stats without prevCommit since they are all inserts
|
||||
return wStat != null && wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT && wStat.getPrevCommit() != null
|
||||
&& !deletedFiles.contains(wStat.getFileId());
|
||||
&& !deletedFiles.contains(wStat.getFileId());
|
||||
}).forEach(wStat -> {
|
||||
HoodieLogFormat.Writer writer = null;
|
||||
String baseCommitTime = wStat.getPrevCommit();
|
||||
|
||||
@@ -33,6 +33,7 @@ import com.uber.hoodie.common.minicluster.HdfsTestService;
|
||||
import com.uber.hoodie.common.model.FileSlice;
|
||||
import com.uber.hoodie.common.model.HoodieCommitMetadata;
|
||||
import com.uber.hoodie.common.model.HoodieDataFile;
|
||||
import com.uber.hoodie.common.model.HoodieFileGroup;
|
||||
import com.uber.hoodie.common.model.HoodieKey;
|
||||
import com.uber.hoodie.common.model.HoodieRecord;
|
||||
import com.uber.hoodie.common.model.HoodieRollingStat;
|
||||
@@ -564,6 +565,178 @@ public class TestMergeOnReadTable {
|
||||
}).findAny().isPresent());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception {
|
||||
|
||||
HoodieWriteConfig cfg = getConfig(false);
|
||||
final HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
|
||||
List<String> allCommits = new ArrayList<>();
|
||||
/**
|
||||
* Write 1 (only inserts)
|
||||
*/
|
||||
String newCommitTime = "001";
|
||||
allCommits.add(newCommitTime);
|
||||
client.startCommitWithTime(newCommitTime);
|
||||
|
||||
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
|
||||
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
|
||||
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
|
||||
|
||||
JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
|
||||
client.commit(newCommitTime, writeStatusJavaRDD);
|
||||
List<WriteStatus> statuses = writeStatusJavaRDD.collect();
|
||||
assertNoWriteErrors(statuses);
|
||||
|
||||
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
|
||||
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
|
||||
|
||||
Optional<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
|
||||
assertTrue(deltaCommit.isPresent());
|
||||
assertEquals("Delta commit should be 001", "001", deltaCommit.get().getTimestamp());
|
||||
|
||||
Optional<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
|
||||
assertFalse(commit.isPresent());
|
||||
|
||||
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
|
||||
TableFileSystemView.ReadOptimizedView roView = new HoodieTableFileSystemView(metaClient,
|
||||
metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
|
||||
Stream<HoodieDataFile> dataFilesToRead = roView.getLatestDataFiles();
|
||||
assertTrue(!dataFilesToRead.findAny().isPresent());
|
||||
|
||||
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
|
||||
dataFilesToRead = roView.getLatestDataFiles();
|
||||
assertTrue("ReadOptimizedTableView should list the parquet files we wrote in the delta commit",
|
||||
dataFilesToRead.findAny().isPresent());
|
||||
|
||||
/**
|
||||
* Write 2 (inserts + updates)
|
||||
*/
|
||||
newCommitTime = "002";
|
||||
allCommits.add(newCommitTime);
|
||||
// WriteClient with custom config (disable small file handling)
|
||||
HoodieWriteClient nClient = new HoodieWriteClient(jsc, HoodieWriteConfig.newBuilder().withPath(basePath)
|
||||
.withSchema(TRIP_EXAMPLE_SCHEMA)
|
||||
.withParallelism(2, 2)
|
||||
.withAutoCommit(false).withAssumeDatePartitioning(true).withCompactionConfig(HoodieCompactionConfig.newBuilder()
|
||||
.compactionSmallFileSize(1 * 1024).withInlineCompaction(false)
|
||||
.withMaxNumDeltaCommitsBeforeCompaction(1).build())
|
||||
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1 * 1024).build())
|
||||
.forTable("test-trip-table").build());
|
||||
nClient.startCommitWithTime(newCommitTime);
|
||||
|
||||
List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
|
||||
copyOfRecords = dataGen.generateUpdates(newCommitTime, copyOfRecords);
|
||||
copyOfRecords.addAll(dataGen.generateInserts(newCommitTime, 200));
|
||||
|
||||
List<String> dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
|
||||
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
|
||||
assertEquals(recordsRead.size(), 200);
|
||||
|
||||
statuses = nClient.upsert(jsc.parallelize(copyOfRecords, 1), newCommitTime).collect();
|
||||
// Verify there are no errors
|
||||
assertNoWriteErrors(statuses);
|
||||
nClient.commit(newCommitTime, writeStatusJavaRDD);
|
||||
copyOfRecords.clear();
|
||||
|
||||
|
||||
// Schedule a compaction
|
||||
/**
|
||||
* Write 3 (inserts + updates)
|
||||
*/
|
||||
newCommitTime = "003";
|
||||
allCommits.add(newCommitTime);
|
||||
client.startCommitWithTime(newCommitTime);
|
||||
|
||||
List<HoodieRecord> newInserts = dataGen.generateInserts(newCommitTime, 100);
|
||||
records = dataGen.generateUpdates(newCommitTime, records);
|
||||
records.addAll(newInserts);
|
||||
writeRecords = jsc.parallelize(records, 1);
|
||||
|
||||
writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
|
||||
client.commit(newCommitTime, writeStatusJavaRDD);
|
||||
statuses = writeStatusJavaRDD.collect();
|
||||
// Verify there are no errors
|
||||
assertNoWriteErrors(statuses);
|
||||
|
||||
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
|
||||
|
||||
String compactionInstantTime = "004";
|
||||
allCommits.add(compactionInstantTime);
|
||||
client.scheduleCompactionAtInstant(compactionInstantTime, Optional.empty());
|
||||
|
||||
// Compaction commit
|
||||
/**
|
||||
* Write 4 (updates)
|
||||
*/
|
||||
newCommitTime = "005";
|
||||
allCommits.add(newCommitTime);
|
||||
client.startCommitWithTime(newCommitTime);
|
||||
|
||||
records = dataGen.generateUpdates(newCommitTime, records);
|
||||
writeRecords = jsc.parallelize(records, 1);
|
||||
|
||||
writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
|
||||
client.commit(newCommitTime, writeStatusJavaRDD);
|
||||
statuses = writeStatusJavaRDD.collect();
|
||||
// Verify there are no errors
|
||||
assertNoWriteErrors(statuses);
|
||||
|
||||
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
|
||||
|
||||
compactionInstantTime = "006";
|
||||
allCommits.add(compactionInstantTime);
|
||||
client.scheduleCompactionAtInstant(compactionInstantTime, Optional.empty());
|
||||
JavaRDD<WriteStatus> ws = client.compact(compactionInstantTime);
|
||||
client.commitCompaction(compactionInstantTime, ws, Optional.empty());
|
||||
|
||||
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
|
||||
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
|
||||
roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
|
||||
|
||||
final String compactedCommitTime = metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant().get()
|
||||
.getTimestamp();
|
||||
|
||||
assertTrue(roView.getLatestDataFiles().filter(file -> {
|
||||
if (compactedCommitTime.equals(file.getCommitTime())) {
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}).findAny().isPresent());
|
||||
|
||||
/**
|
||||
* Write 5 (updates)
|
||||
*/
|
||||
newCommitTime = "007";
|
||||
allCommits.add(newCommitTime);
|
||||
client.startCommitWithTime(newCommitTime);
|
||||
copyOfRecords = new ArrayList<>(records);
|
||||
copyOfRecords = dataGen.generateUpdates(newCommitTime, copyOfRecords);
|
||||
copyOfRecords.addAll(dataGen.generateInserts(newCommitTime, 200));
|
||||
|
||||
statuses = client.upsert(jsc.parallelize(copyOfRecords, 1), newCommitTime).collect();
|
||||
// Verify there are no errors
|
||||
assertNoWriteErrors(statuses);
|
||||
client.commit(newCommitTime, writeStatusJavaRDD);
|
||||
copyOfRecords.clear();
|
||||
|
||||
// Restore to instant "000" (earlier than the writes above); no data files or file groups should remain
|
||||
client.restoreToCommit("000");
|
||||
|
||||
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
|
||||
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
|
||||
roView = new HoodieTableFileSystemView(metaClient,
|
||||
metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
|
||||
dataFilesToRead = roView.getLatestDataFiles();
|
||||
assertTrue(!dataFilesToRead.findAny().isPresent());
|
||||
HoodieTableFileSystemView.RealtimeView rtView = new HoodieTableFileSystemView(metaClient,
|
||||
metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
|
||||
List<HoodieFileGroup> fileGroups = ((HoodieTableFileSystemView) rtView).getAllFileGroups().collect(Collectors
|
||||
.toList());
|
||||
assertTrue(fileGroups.isEmpty());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testUpsertPartitioner() throws Exception {
|
||||
HoodieWriteConfig cfg = getConfig(true);
|
||||
|
||||
@@ -98,7 +98,7 @@ public interface HoodieTimeline extends Serializable {
|
||||
HoodieTimeline filterCompletedAndCompactionInstants();
|
||||
|
||||
/**
|
||||
* Filter this timeline to just include inflight and requested compaction instants
|
||||
* Filter this timeline to just include requested and inflight compaction instants
|
||||
* @return
|
||||
*/
|
||||
HoodieTimeline filterPendingCompactionTimeline();
|
||||
|
||||
@@ -262,6 +262,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
||||
Preconditions.checkArgument(inflightInstant.isInflight());
|
||||
HoodieInstant requestedInstant =
|
||||
new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, inflightInstant.getTimestamp());
|
||||
// Pass empty data since it is read from the corresponding .aux/.compaction instant file
|
||||
transitionState(inflightInstant, requestedInstant, Optional.empty());
|
||||
return requestedInstant;
|
||||
}
|
||||
@@ -310,7 +311,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
||||
Preconditions.checkArgument(fromInstant.getTimestamp().equals(toInstant.getTimestamp()));
|
||||
Path commitFilePath = new Path(metaClient.getMetaPath(), toInstant.getFileName());
|
||||
try {
|
||||
// open a new file and write the commit metadata in
|
||||
// Re-create the .inflight file by opening a new file and write the commit metadata in
|
||||
Path inflightCommitFile = new Path(metaClient.getMetaPath(), fromInstant.getFileName());
|
||||
createFileInMetaPath(fromInstant.getFileName(), data);
|
||||
boolean success = metaClient.getFs().rename(inflightCommitFile, commitFilePath);
|
||||
|
||||
Reference in New Issue
Block a user