1
0

Table rollback for inflight compactions MUST NOT delete instant files at any time, to avoid race conditions

This commit is contained in:
Balaji Varadarajan
2019-02-05 16:54:31 -08:00
committed by vinoth chandar
parent defcf6a0b9
commit 8adaca3454
6 changed files with 97 additions and 18 deletions

View File

@@ -839,7 +839,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
// Remove interleaving pending compactions before rolling back commits // Remove interleaving pending compactions before rolling back commits
pendingCompactionToRollback.forEach(this::deletePendingCompaction); pendingCompactionToRollback.forEach(this::deletePendingCompaction);
List<HoodieRollbackStat> stats = table.rollback(jsc, commitsToRollback); List<HoodieRollbackStat> stats = table.rollback(jsc, commitsToRollback, true);
// cleanup index entries // cleanup index entries
commitsToRollback.forEach(s -> { commitsToRollback.forEach(s -> {
@@ -1206,8 +1206,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
* @param inflightInstant Inflight Compaction Instant * @param inflightInstant Inflight Compaction Instant
* @param table Hoodie Table * @param table Hoodie Table
*/ */
private void rollbackInflightCompaction(HoodieInstant inflightInstant, HoodieTable table) throws IOException { @VisibleForTesting
table.rollback(jsc, ImmutableList.copyOf(new String[] { inflightInstant.getTimestamp() })); void rollbackInflightCompaction(HoodieInstant inflightInstant, HoodieTable table) throws IOException {
table.rollback(jsc, ImmutableList.copyOf(new String[] { inflightInstant.getTimestamp() }), false);
// Revert instant state file // Revert instant state file
table.getActiveTimeline().revertCompactionInflightToRequested(inflightInstant); table.getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
} }

View File

@@ -347,7 +347,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
} }
@Override @Override
public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, List<String> commits) public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, List<String> commits, boolean deleteInstants)
throws IOException { throws IOException {
String actionType = metaClient.getCommitActionType(); String actionType = metaClient.getCommitActionType();
HoodieActiveTimeline activeTimeline = this.getActiveTimeline(); HoodieActiveTimeline activeTimeline = this.getActiveTimeline();
@@ -375,13 +375,29 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
// clean temporary data files // clean temporary data files
cleanTemporaryDataFiles(jsc); cleanTemporaryDataFiles(jsc);
// Remove the rolled back inflight commits // Delete Inflight instants if enabled
commits.stream().map(s -> new HoodieInstant(true, actionType, s)) deleteInflightInstants(deleteInstants, activeTimeline,
.forEach(activeTimeline::deleteInflight); commits.stream().map(s -> new HoodieInstant(true, actionType, s)).collect(Collectors.toList()));
logger.info("Deleted inflight commits " + commits);
return stats; return stats;
} }
/**
* Delete Inflight instants if enabled
* @param deleteInstants Enable Deletion of Inflight instants
* @param activeTimeline Hoodie active timeline
* @param instantsToBeDeleted Instants to be deleted
*/
protected static void deleteInflightInstants(boolean deleteInstants, HoodieActiveTimeline activeTimeline,
List<HoodieInstant> instantsToBeDeleted) {
// Remove the rolled back inflight commits
if (deleteInstants) {
instantsToBeDeleted.forEach(activeTimeline::deleteInflight);
logger.info("Deleted inflight commits " + instantsToBeDeleted);
} else {
logger.warn("Rollback finished without deleting inflight instant files. Instants=" + instantsToBeDeleted);
}
}
/** /**
* Finalize the written data files * Finalize the written data files
* *

View File

@@ -165,7 +165,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
} }
@Override @Override
public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, List<String> commits) public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, List<String> commits, boolean deleteInstants)
throws IOException { throws IOException {
//At the moment, MOR table type does not support nested rollbacks //At the moment, MOR table type does not support nested rollbacks
@@ -274,11 +274,13 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
return hoodieRollbackStats; return hoodieRollbackStats;
}).collect(Collectors.toList())).flatMap(List::iterator).filter(Objects::nonNull).collect(); }).collect(Collectors.toList())).flatMap(List::iterator).filter(Objects::nonNull).collect();
// Delete Inflight instants if enabled
deleteInflightInstants(deleteInstants, this.getActiveTimeline(),
commitsAndCompactions.entrySet().stream().map( commitsAndCompactions.entrySet().stream().map(
entry -> new HoodieInstant(true, entry.getValue().getAction(), entry -> new HoodieInstant(true, entry.getValue().getAction(), entry.getValue().getTimestamp()))
entry.getValue().getTimestamp())).forEach(this.getActiveTimeline()::deleteInflight); .collect(Collectors.toList()));
logger
.debug("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime)); logger.debug("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime));
return allRollbackStats; return allRollbackStats;
} }

View File

@@ -251,9 +251,9 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
/** /**
* Rollback the (inflight/committed) record changes with the given commit time. Four steps: (1) * Rollback the (inflight/committed) record changes with the given commit time. Four steps: (1)
* Atomically unpublish this commit (2) clean indexing data (3) clean new generated parquet files * Atomically unpublish this commit (2) clean indexing data (3) clean new generated parquet files
* / log blocks (4) Finally, delete .<action>.commit or .<action>.inflight file * / log blocks (4) Finally, delete .<action>.commit or .<action>.inflight file if deleteInstants = true
*/ */
public abstract List<HoodieRollbackStat> rollback(JavaSparkContext jsc, List<String> commits) public abstract List<HoodieRollbackStat> rollback(JavaSparkContext jsc, List<String> commits, boolean deleteInstants)
throws IOException; throws IOException;
/** /**

View File

@@ -33,6 +33,7 @@ import com.uber.hoodie.common.model.HoodieTestUtils;
import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.timeline.HoodieInstant.State;
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView; import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
import com.uber.hoodie.common.util.AvroUtils; import com.uber.hoodie.common.util.AvroUtils;
import com.uber.hoodie.common.util.CompactionUtils; import com.uber.hoodie.common.util.CompactionUtils;
@@ -51,6 +52,7 @@ import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.junit.Test; import org.junit.Test;
@@ -78,6 +80,64 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
super.tearDown(); super.tearDown();
} }
@Test
public void testRollbackForInflightCompaction() throws Exception {
// Rollback inflight compaction
HoodieWriteConfig cfg = getConfig(false);
HoodieWriteClient client = new HoodieWriteClient(jsc, cfg, true);
String firstInstantTime = "001";
String secondInstantTime = "004";
String compactionInstantTime = "005";
int numRecs = 2000;
List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime),
records, cfg, true, new ArrayList<>());
// Schedule compaction but do not run them
scheduleCompaction(compactionInstantTime, client, cfg);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
HoodieInstant pendingCompactionInstant =
metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
assertTrue("Pending Compaction instant has expected instant time",
pendingCompactionInstant.getTimestamp().equals(compactionInstantTime));
assertTrue("Pending Compaction instant has expected state",
pendingCompactionInstant.getState().equals(State.REQUESTED));
moveCompactionFromRequestedToInflight(compactionInstantTime, client, cfg);
// Reload and rollback inflight compaction
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
hoodieTable.rollback(jsc, Arrays.asList(compactionInstantTime), false);
client.rollbackInflightCompaction(
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime), hoodieTable);
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
pendingCompactionInstant = metaClient.getCommitsAndCompactionTimeline().filterPendingCompactionTimeline()
.getInstants().findFirst().get();
assertEquals("compaction", pendingCompactionInstant.getAction());
assertEquals(State.REQUESTED, pendingCompactionInstant.getState());
assertEquals(compactionInstantTime, pendingCompactionInstant.getTimestamp());
// We indirectly test for the race condition where an inflight instant was first deleted and then created anew. Every
// time this happens, the pending compaction instant file in Hoodie Meta path becomes an empty file (Note: Hoodie
// reads compaction plan from aux path which is untouched). To test for regression, we simply get file status
// and look at the file size
FileStatus fstatus =
metaClient.getFs().getFileStatus(new Path(metaClient.getMetaPath(), pendingCompactionInstant.getFileName()));
assertTrue(fstatus.getLen() > 0);
}
private Path getInstantPath(HoodieTableMetaClient metaClient, String timestamp, String action, State state) {
HoodieInstant instant = new HoodieInstant(state, action, timestamp);
return new Path(metaClient.getMetaPath(), instant.getFileName());
}
@Test @Test
public void testRollbackInflightIngestionWithPendingCompaction() throws Exception { public void testRollbackInflightIngestionWithPendingCompaction() throws Exception {
// Rollback inflight ingestion when there is pending compaction // Rollback inflight ingestion when there is pending compaction

View File

@@ -634,14 +634,14 @@ public class TestCleaner extends TestHoodieClientBase {
HoodieTable table = HoodieTable.getHoodieTable( HoodieTable table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
jsc); jsc);
table.rollback(jsc, Collections.emptyList()); table.rollback(jsc, Collections.emptyList(), true);
assertEquals("Some temp files are created.", tempFiles.size(), getTotalTempFiles()); assertEquals("Some temp files are created.", tempFiles.size(), getTotalTempFiles());
config = HoodieWriteConfig.newBuilder().withPath(basePath).withUseTempFolderCopyOnWriteForCreate(true) config = HoodieWriteConfig.newBuilder().withPath(basePath).withUseTempFolderCopyOnWriteForCreate(true)
.withUseTempFolderCopyOnWriteForMerge(false).build(); .withUseTempFolderCopyOnWriteForMerge(false).build();
table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true),
config, jsc); config, jsc);
table.rollback(jsc, Collections.emptyList()); table.rollback(jsc, Collections.emptyList(), true);
assertEquals("All temp files are deleted.", 0, getTotalTempFiles()); assertEquals("All temp files are deleted.", 0, getTotalTempFiles());
} }