[HUDI-2641] Avoid deleting all inflight commits heartbeats while rolling back failed writes (#3956)
This commit is contained in:
@@ -852,18 +852,13 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
|
||||
if (HoodieTimeline.compareTimestamps(instant, HoodieTimeline.LESSER_THAN_OR_EQUALS,
|
||||
HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) {
|
||||
rollbackFailedBootstrap();
|
||||
HeartbeatUtils.deleteHeartbeatFile(fs, basePath, instant, config);
|
||||
break;
|
||||
} else {
|
||||
rollback(instant, skipLocking);
|
||||
HeartbeatUtils.deleteHeartbeatFile(fs, basePath, instant, config);
|
||||
}
|
||||
}
|
||||
// Delete any heartbeat files for already rolled back commits
|
||||
try {
|
||||
HeartbeatUtils.cleanExpiredHeartbeats(this.heartbeatClient.getAllExistingHeartbeatInstants(),
|
||||
createMetaClient(true), basePath);
|
||||
} catch (IOException io) {
|
||||
LOG.error("Unable to delete heartbeat files", io);
|
||||
}
|
||||
}
|
||||
|
||||
protected List<String> getInstantsToRollback(HoodieTableMetaClient metaClient, HoodieFailedWritesCleaningPolicy cleaningPolicy, Option<String> curInstantTime) {
|
||||
|
||||
@@ -21,7 +21,6 @@ package org.apache.hudi.client.heartbeat;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
@@ -30,9 +29,6 @@ import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Helper class to delete heartbeat for completed or failed instants with expired heartbeats.
|
||||
@@ -55,6 +51,8 @@ public class HeartbeatUtils {
|
||||
deleted = fs.delete(new Path(heartbeatFolderPath + Path.SEPARATOR + instantTime), false);
|
||||
if (!deleted) {
|
||||
LOG.error("Failed to delete heartbeat for instant " + instantTime);
|
||||
} else {
|
||||
LOG.info("Deleted the heartbeat for instant " + instantTime);
|
||||
}
|
||||
} catch (IOException io) {
|
||||
LOG.error("Unable to delete heartbeat for instant " + instantTime, io);
|
||||
@@ -63,20 +61,19 @@ public class HeartbeatUtils {
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes the heartbeat files for instants with expired heartbeats without any active instant.
|
||||
* @param allExistingHeartbeatInstants
|
||||
* @param metaClient
|
||||
* @param basePath
|
||||
* Deletes the heartbeat file for the specified instant.
|
||||
* @param fs Hadoop FileSystem instance
|
||||
* @param basePath Hoodie table base path
|
||||
* @param instantTime Commit instant time
|
||||
* @param config HoodieWriteConfig instance
|
||||
* @return Boolean indicating whether heartbeat file was deleted or not
|
||||
*/
|
||||
public static void cleanExpiredHeartbeats(List<String> allExistingHeartbeatInstants,
|
||||
HoodieTableMetaClient metaClient, String basePath) {
|
||||
Set<String> nonExpiredHeartbeatInstants = metaClient.getActiveTimeline()
|
||||
.filterCompletedInstants().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
|
||||
allExistingHeartbeatInstants.stream().forEach(instant -> {
|
||||
if (!nonExpiredHeartbeatInstants.contains(instant)) {
|
||||
deleteHeartbeatFile(metaClient.getFs(), basePath, instant);
|
||||
}
|
||||
});
|
||||
public static boolean deleteHeartbeatFile(FileSystem fs, String basePath, String instantTime, HoodieWriteConfig config) {
|
||||
if (config.getFailedWritesCleanPolicy().isLazy()) {
|
||||
return deleteHeartbeatFile(fs, basePath, instantTime);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -18,7 +18,9 @@
|
||||
|
||||
package org.apache.hudi.client;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
@@ -136,6 +138,62 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
|
||||
testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType.MERGE_ON_READ);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE", "MERGE_ON_READ"})
|
||||
public void testMultiWriterWithInsertsToDistinctPartitions(HoodieTableType tableType) throws Exception {
|
||||
if (tableType == HoodieTableType.MERGE_ON_READ) {
|
||||
setUpMORTestTable();
|
||||
}
|
||||
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
|
||||
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3");
|
||||
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "5000");
|
||||
|
||||
HoodieWriteConfig cfg = getConfigBuilder()
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
|
||||
.withInlineCompaction(false)
|
||||
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
|
||||
.withMaxNumDeltaCommitsBeforeCompaction(2)
|
||||
.build())
|
||||
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
|
||||
.withLockConfig(HoodieLockConfig.newBuilder()
|
||||
.withLockProvider(FileSystemBasedLockProviderTestClass.class)
|
||||
.build())
|
||||
.withAutoCommit(false)
|
||||
.withProperties(properties)
|
||||
.build();
|
||||
|
||||
// Create the first commit
|
||||
SparkRDDWriteClient<?> client = getHoodieWriteClient(cfg);
|
||||
createCommitWithInsertsForPartition(cfg, client, "000", "001", 100, "2016/03/01");
|
||||
|
||||
int numConcurrentWriters = 5;
|
||||
ExecutorService executors = Executors.newFixedThreadPool(numConcurrentWriters);
|
||||
|
||||
List<Future<?>> futures = new ArrayList<>(numConcurrentWriters);
|
||||
for (int loop = 0; loop < numConcurrentWriters; loop++) {
|
||||
String newCommitTime = "00" + (loop + 2);
|
||||
String partition = "2016/03/0" + (loop + 2);
|
||||
futures.add(executors.submit(() -> {
|
||||
try {
|
||||
SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg);
|
||||
createCommitWithInsertsForPartition(cfg, writeClient, "001", newCommitTime, 100, partition);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
futures.forEach(f -> {
|
||||
try {
|
||||
f.get();
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType tableType) throws Exception {
|
||||
// create inserts X 1
|
||||
if (tableType == HoodieTableType.MERGE_ON_READ) {
|
||||
@@ -294,6 +352,14 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
|
||||
}
|
||||
}
|
||||
|
||||
private void createCommitWithInsertsForPartition(HoodieWriteConfig cfg, SparkRDDWriteClient client,
|
||||
String prevCommitTime, String newCommitTime, int numRecords,
|
||||
String partition) throws Exception {
|
||||
JavaRDD<WriteStatus> result = insertBatch(cfg, client, newCommitTime, prevCommitTime, numRecords, SparkRDDWriteClient::insert,
|
||||
false, false, numRecords, numRecords, 1, Option.of(partition));
|
||||
assertTrue(client.commit(newCommitTime, result), "Commit should succeed");
|
||||
}
|
||||
|
||||
private void createCommitWithInserts(HoodieWriteConfig cfg, SparkRDDWriteClient client,
|
||||
String prevCommitTime, String newCommitTime, int numRecords) throws Exception {
|
||||
// Finish first base commmit
|
||||
|
||||
@@ -303,7 +303,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
|
||||
|
||||
// Insert with original schema is allowed now
|
||||
insertBatch(hoodieWriteConfig, client, "009", "008", numRecords, SparkRDDWriteClient::insert,
|
||||
false, false, 0, 0, 0);
|
||||
false, false, 0, 0, 0, Option.empty());
|
||||
checkLatestDeltaCommit("009");
|
||||
checkReadRecords("000", 3 * numRecords);
|
||||
}
|
||||
@@ -438,7 +438,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
|
||||
|
||||
// Insert with original schema is allowed now
|
||||
insertBatch(hoodieWriteConfig, client, "007", "003", numRecords, SparkRDDWriteClient::insert,
|
||||
false, true, numRecords, 2 * numRecords, 1);
|
||||
false, true, numRecords, 2 * numRecords, 1, Option.empty());
|
||||
checkReadRecords("000", 2 * numRecords);
|
||||
|
||||
// Update with original schema is allowed now
|
||||
|
||||
@@ -244,6 +244,18 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
|
||||
};
|
||||
}
|
||||
|
||||
private Function3<List<HoodieRecord>, String, Integer, String> wrapRecordsGenFunctionForPreppedCalls(
|
||||
final HoodieWriteConfig writeConfig, final Function3<List<HoodieRecord>, String, Integer, String> recordGenFunction) {
|
||||
return (commit, numRecords, partition) -> {
|
||||
final HoodieIndex index = SparkHoodieIndexFactory.createIndex(writeConfig);
|
||||
List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords, partition);
|
||||
final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
|
||||
HoodieSparkTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
|
||||
JavaRDD<HoodieRecord> taggedRecords = tagLocation(index, jsc.parallelize(records, 1), table);
|
||||
return taggedRecords.collect();
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper to generate delete keys generation function for testing Prepped version of API. Prepped APIs expect the keys
|
||||
* to be already de-duped and have location set. This wrapper takes care of record-location setting. Uniqueness is
|
||||
@@ -285,6 +297,15 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
|
||||
}
|
||||
}
|
||||
|
||||
public Function3<List<HoodieRecord>, String, Integer, String> generateWrapRecordsForPartitionFn(boolean isPreppedAPI,
|
||||
HoodieWriteConfig writeConfig, Function3<List<HoodieRecord>, String, Integer, String> wrapped) {
|
||||
if (isPreppedAPI) {
|
||||
return wrapRecordsGenFunctionForPreppedCalls(writeConfig, wrapped);
|
||||
} else {
|
||||
return wrapped;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate wrapper for delete key generation function for testing Prepped APIs.
|
||||
*
|
||||
@@ -355,12 +376,22 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
|
||||
public JavaRDD<WriteStatus> insertBatch(HoodieWriteConfig writeConfig, SparkRDDWriteClient client, String newCommitTime,
|
||||
String initCommitTime, int numRecordsInThisCommit,
|
||||
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
|
||||
boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception {
|
||||
final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
|
||||
generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateInserts);
|
||||
boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits, Option<String> partition) throws Exception {
|
||||
|
||||
return writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime, numRecordsInThisCommit,
|
||||
recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords, expTotalCommits, false);
|
||||
if (partition.isPresent()) {
|
||||
final Function3<List<HoodieRecord>, String, Integer, String> recordGenFunction =
|
||||
generateWrapRecordsForPartitionFn(isPreppedAPI, writeConfig, dataGen::generateInsertsForPartition);
|
||||
|
||||
return writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime, numRecordsInThisCommit,
|
||||
recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords, expTotalCommits, false,
|
||||
partition.get());
|
||||
} else {
|
||||
final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
|
||||
generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateInserts);
|
||||
|
||||
return writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime, numRecordsInThisCommit,
|
||||
recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords, expTotalCommits, false);
|
||||
}
|
||||
}
|
||||
|
||||
public JavaRDD<WriteStatus> updateBatch(HoodieWriteConfig writeConfig, SparkRDDWriteClient client, String newCommitTime,
|
||||
@@ -453,6 +484,16 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
|
||||
writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords, expTotalCommits, doCommit, true);
|
||||
}
|
||||
|
||||
public JavaRDD<WriteStatus> writeBatch(SparkRDDWriteClient client, String newCommitTime, String prevCommitTime,
|
||||
Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit,
|
||||
Function3<List<HoodieRecord>, String, Integer, String> recordGenFunction,
|
||||
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
|
||||
boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits,
|
||||
boolean doCommit, String partition) throws Exception {
|
||||
return writeBatch(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime, numRecordsInThisCommit, recordGenFunction,
|
||||
writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords, expTotalCommits, doCommit, true, partition);
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper to insert/upsert batch of records and do regular assertions on the state after successful completion.
|
||||
*
|
||||
@@ -478,10 +519,35 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
|
||||
boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits, boolean doCommit,
|
||||
boolean filterForCommitTimeWithAssert) throws Exception {
|
||||
|
||||
List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, numRecordsInThisCommit);
|
||||
return writeBatchHelper(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime,
|
||||
numRecordsInThisCommit, records, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords,
|
||||
expTotalCommits, doCommit, filterForCommitTimeWithAssert);
|
||||
}
|
||||
|
||||
public JavaRDD<WriteStatus> writeBatch(SparkRDDWriteClient client, String newCommitTime, String prevCommitTime,
|
||||
Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit,
|
||||
Function3<List<HoodieRecord>, String, Integer, String> recordGenFunction,
|
||||
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
|
||||
boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits, boolean doCommit,
|
||||
boolean filterForCommitTimeWithAssert,
|
||||
String partition) throws Exception {
|
||||
|
||||
List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, numRecordsInThisCommit, partition);
|
||||
return writeBatchHelper(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime,
|
||||
numRecordsInThisCommit, records, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords,
|
||||
expTotalCommits, doCommit, filterForCommitTimeWithAssert);
|
||||
}
|
||||
|
||||
private JavaRDD<WriteStatus> writeBatchHelper(SparkRDDWriteClient client, String newCommitTime, String prevCommitTime,
|
||||
Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime,
|
||||
int numRecordsInThisCommit, List<HoodieRecord> records,
|
||||
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
|
||||
boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords,
|
||||
int expTotalCommits, boolean doCommit, boolean filterForCommitTimeWithAssert) throws IOException {
|
||||
// Write 1 (only inserts)
|
||||
client.startCommitWithTime(newCommitTime);
|
||||
|
||||
List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, numRecordsInThisCommit);
|
||||
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
|
||||
|
||||
JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
|
||||
|
||||
Reference in New Issue
Block a user