[HUDI-2766] Cluster update strategy should not be fenced by write config (#4093)
Fix pending clustering rollback test
This commit is contained in:
@@ -115,35 +115,31 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
|
||||
}
|
||||
|
||||
private JavaRDD<HoodieRecord<T>> clusteringHandleUpdate(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
|
||||
if (config.isClusteringEnabled()) {
|
||||
Set<HoodieFileGroupId> fileGroupsInPendingClustering =
|
||||
table.getFileSystemView().getFileGroupsInPendingClustering().map(entry -> entry.getKey()).collect(Collectors.toSet());
|
||||
UpdateStrategy updateStrategy = (UpdateStrategy)ReflectionUtils
|
||||
.loadClass(config.getClusteringUpdatesStrategyClass(), this.context, fileGroupsInPendingClustering);
|
||||
Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>> recordsAndPendingClusteringFileGroups =
|
||||
(Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>>)updateStrategy.handleUpdate(inputRecordsRDD);
|
||||
Set<HoodieFileGroupId> fileGroupsWithUpdatesAndPendingClustering = recordsAndPendingClusteringFileGroups.getRight();
|
||||
if (fileGroupsWithUpdatesAndPendingClustering.isEmpty()) {
|
||||
return recordsAndPendingClusteringFileGroups.getLeft();
|
||||
}
|
||||
// There are file groups that are pending clustering and also receiving updates, so roll back the pending clustering instants.
|
||||
// A race condition is possible here: for example, the clustering may complete after the instants are fetched but before the rollback completes.
|
||||
if (config.isRollbackPendingClustering()) {
|
||||
Set<HoodieInstant> pendingClusteringInstantsToRollback = getAllFileGroupsInPendingClusteringPlans(table.getMetaClient()).entrySet().stream()
|
||||
.filter(e -> fileGroupsWithUpdatesAndPendingClustering.contains(e.getKey()))
|
||||
.map(Map.Entry::getValue)
|
||||
.collect(Collectors.toSet());
|
||||
pendingClusteringInstantsToRollback.forEach(instant -> {
|
||||
String commitTime = HoodieActiveTimeline.createNewInstantTime();
|
||||
table.scheduleRollback(context, commitTime, instant, false, config.shouldRollbackUsingMarkers());
|
||||
table.rollback(context, commitTime, instant, true, true);
|
||||
});
|
||||
table.getMetaClient().reloadActiveTimeline();
|
||||
}
|
||||
Set<HoodieFileGroupId> fileGroupsInPendingClustering =
|
||||
table.getFileSystemView().getFileGroupsInPendingClustering().map(entry -> entry.getKey()).collect(Collectors.toSet());
|
||||
UpdateStrategy updateStrategy = (UpdateStrategy) ReflectionUtils
|
||||
.loadClass(config.getClusteringUpdatesStrategyClass(), this.context, fileGroupsInPendingClustering);
|
||||
Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>> recordsAndPendingClusteringFileGroups =
|
||||
(Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>>) updateStrategy.handleUpdate(inputRecordsRDD);
|
||||
Set<HoodieFileGroupId> fileGroupsWithUpdatesAndPendingClustering = recordsAndPendingClusteringFileGroups.getRight();
|
||||
if (fileGroupsWithUpdatesAndPendingClustering.isEmpty()) {
|
||||
return recordsAndPendingClusteringFileGroups.getLeft();
|
||||
} else {
|
||||
return inputRecordsRDD;
|
||||
}
|
||||
// There are file groups that are pending clustering and also receiving updates, so roll back the pending clustering instants.
|
||||
// A race condition is possible here: for example, the clustering may complete after the instants are fetched but before the rollback completes.
|
||||
if (config.isRollbackPendingClustering()) {
|
||||
Set<HoodieInstant> pendingClusteringInstantsToRollback = getAllFileGroupsInPendingClusteringPlans(table.getMetaClient()).entrySet().stream()
|
||||
.filter(e -> fileGroupsWithUpdatesAndPendingClustering.contains(e.getKey()))
|
||||
.map(Map.Entry::getValue)
|
||||
.collect(Collectors.toSet());
|
||||
pendingClusteringInstantsToRollback.forEach(instant -> {
|
||||
String commitTime = HoodieActiveTimeline.createNewInstantTime();
|
||||
table.scheduleRollback(context, commitTime, instant, false, config.shouldRollbackUsingMarkers());
|
||||
table.rollback(context, commitTime, instant, true, true);
|
||||
});
|
||||
table.getMetaClient().reloadActiveTimeline();
|
||||
}
|
||||
return recordsAndPendingClusteringFileGroups.getLeft();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -18,12 +18,6 @@
|
||||
|
||||
package org.apache.hudi.client;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass;
|
||||
import org.apache.hudi.common.config.LockConfiguration;
|
||||
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
|
||||
@@ -42,23 +36,31 @@ import org.apache.hudi.config.HoodieLockConfig;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.exception.HoodieWriteConflictException;
|
||||
import org.apache.hudi.testutils.HoodieClientTestBase;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.EnumSource;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.fail;
|
||||
|
||||
public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
|
||||
|
||||
@@ -105,7 +107,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
|
||||
try {
|
||||
createCommitWithUpserts(cfg, client1, "002", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
|
||||
} catch (Exception e1) {
|
||||
Assertions.assertTrue(e1 instanceof HoodieWriteConflictException);
|
||||
assertTrue(e1 instanceof HoodieWriteConflictException);
|
||||
throw new RuntimeException(e1);
|
||||
}
|
||||
});
|
||||
@@ -116,13 +118,13 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
|
||||
try {
|
||||
createCommitWithUpserts(cfg, client2, "002", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
|
||||
} catch (Exception e2) {
|
||||
Assertions.assertTrue(e2 instanceof HoodieWriteConflictException);
|
||||
assertTrue(e2 instanceof HoodieWriteConflictException);
|
||||
throw new RuntimeException(e2);
|
||||
}
|
||||
});
|
||||
future1.get();
|
||||
future2.get();
|
||||
Assertions.fail("Should not reach here, this means concurrent writes were handled incorrectly");
|
||||
fail("Should not reach here, this means concurrent writes were handled incorrectly");
|
||||
} catch (Exception e) {
|
||||
// Expected to fail due to overlapping commits
|
||||
}
|
||||
@@ -206,7 +208,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
|
||||
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3");
|
||||
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "5000");
|
||||
// Disable the embedded timeline server; it doesn't work with multi-writer.
|
||||
HoodieWriteConfig cfg = getConfigBuilder()
|
||||
HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withAutoClean(false)
|
||||
.withInlineCompaction(false).withAsyncClean(true)
|
||||
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
|
||||
@@ -214,12 +216,12 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
|
||||
.withEmbeddedTimelineServerEnabled(false)
|
||||
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType(
|
||||
FileSystemViewStorageType.MEMORY).build())
|
||||
.withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClusteringNumCommits(1).build())
|
||||
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
|
||||
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class)
|
||||
.build()).withAutoCommit(false).withProperties(properties).build();
|
||||
.build()).withAutoCommit(false).withProperties(properties);
|
||||
Set<String> validInstants = new HashSet<>();
|
||||
// Create the first commit with inserts
|
||||
HoodieWriteConfig cfg = writeConfigBuilder.build();
|
||||
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
|
||||
createCommitWithInserts(cfg, client, "000", "001", 200);
|
||||
validInstants.add("001");
|
||||
@@ -229,7 +231,11 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
|
||||
validInstants.add("002");
|
||||
validInstants.add("003");
|
||||
ExecutorService executors = Executors.newFixedThreadPool(2);
|
||||
SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
|
||||
// write config with clustering enabled
|
||||
HoodieWriteConfig cfg2 = writeConfigBuilder
|
||||
.withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(true).withInlineClusteringNumCommits(1).build())
|
||||
.build();
|
||||
SparkRDDWriteClient client1 = getHoodieWriteClient(cfg2);
|
||||
SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
|
||||
// Create upserts, schedule cleaning, schedule compaction in parallel
|
||||
Future future1 = executors.submit(() -> {
|
||||
@@ -237,14 +243,14 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
|
||||
int numRecords = 100;
|
||||
String commitTimeBetweenPrevAndNew = "002";
|
||||
try {
|
||||
createCommitWithUpserts(cfg, client1, "003", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
|
||||
createCommitWithUpserts(cfg2, client1, "003", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
|
||||
if (tableType == HoodieTableType.MERGE_ON_READ) {
|
||||
Assertions.fail("Conflicts not handled correctly");
|
||||
fail("Conflicts not handled correctly");
|
||||
}
|
||||
validInstants.add("004");
|
||||
} catch (Exception e1) {
|
||||
if (tableType == HoodieTableType.MERGE_ON_READ) {
|
||||
Assertions.assertTrue(e1 instanceof HoodieWriteConflictException);
|
||||
assertTrue(e1 instanceof HoodieWriteConflictException);
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -272,7 +278,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
|
||||
String newCommitTime = "007";
|
||||
int numRecords = 100;
|
||||
try {
|
||||
createCommitWithInserts(cfg, client1, "003", newCommitTime, numRecords);
|
||||
createCommitWithInserts(cfg2, client1, "003", newCommitTime, numRecords);
|
||||
validInstants.add("007");
|
||||
} catch (Exception e1) {
|
||||
throw new RuntimeException(e1);
|
||||
@@ -300,10 +306,13 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
|
||||
future1.get();
|
||||
future2.get();
|
||||
future3.get();
|
||||
Set<String> completedInstants = metaClient.getActiveTimeline().getCommitsTimeline()
|
||||
validInstants.addAll(
|
||||
metaClient.reloadActiveTimeline().getCompletedReplaceTimeline()
|
||||
.filterCompletedInstants().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()));
|
||||
Set<String> completedInstants = metaClient.reloadActiveTimeline().getCommitsTimeline()
|
||||
.filterCompletedInstants().getInstants().map(HoodieInstant::getTimestamp)
|
||||
.collect(Collectors.toSet());
|
||||
Assertions.assertTrue(validInstants.containsAll(completedInstants));
|
||||
assertTrue(validInstants.containsAll(completedInstants));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@@ -314,13 +323,16 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
|
||||
}
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
|
||||
HoodieWriteConfig cfg = getConfigBuilder()
|
||||
HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
|
||||
.withAutoClean(false).build())
|
||||
.withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClusteringNumCommits(1).build())
|
||||
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
|
||||
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class)
|
||||
.build()).withAutoCommit(false).withProperties(properties).build();
|
||||
.build()).withAutoCommit(false).withProperties(properties);
|
||||
HoodieWriteConfig cfg = writeConfigBuilder.build();
|
||||
HoodieWriteConfig cfg2 = writeConfigBuilder
|
||||
.withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(true).withInlineClusteringNumCommits(1).build())
|
||||
.build();
|
||||
// Create the first commit
|
||||
createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 200);
|
||||
// Start another inflight commit
|
||||
@@ -333,8 +345,8 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
|
||||
numRecords, 200, 2);
|
||||
// Start and finish another commit while the previous writer for commit 003 is running
|
||||
newCommitTime = "004";
|
||||
SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
|
||||
JavaRDD<WriteStatus> result2 = updateBatch(cfg, client2, newCommitTime, "001",
|
||||
SparkRDDWriteClient client2 = getHoodieWriteClient(cfg2);
|
||||
JavaRDD<WriteStatus> result2 = updateBatch(cfg2, client2, newCommitTime, "001",
|
||||
Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), "000", numRecords, SparkRDDWriteClient::upsert, false, false,
|
||||
numRecords, 200, 2);
|
||||
client2.commit(newCommitTime, result2);
|
||||
@@ -342,11 +354,11 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
|
||||
SparkRDDWriteClient client3 = getHoodieWriteClient(cfg);
|
||||
// schedule clustering
|
||||
Option<String> clusterInstant = client3.scheduleTableService(Option.empty(), TableServiceType.CLUSTER);
|
||||
client3.cluster(clusterInstant.get(), true);
|
||||
assertFalse(clusterInstant.isPresent());
|
||||
// Attempt to commit the inflight commit 003
|
||||
try {
|
||||
client1.commit("003", result1);
|
||||
Assertions.fail("Should have thrown a concurrent conflict exception");
|
||||
fail("Should have thrown a concurrent conflict exception");
|
||||
} catch (Exception e) {
|
||||
// Expected
|
||||
}
|
||||
@@ -376,5 +388,4 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
|
||||
numRecords, 200, 2);
|
||||
client.commit(newCommitTime, result);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1418,8 +1418,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
||||
dataGen = new HoodieTestDataGenerator();
|
||||
String commitTime = HoodieActiveTimeline.createNewInstantTime();
|
||||
allRecords.addAll(dataGen.generateInserts(commitTime, 200));
|
||||
writeAndVerifyBatch(client, allRecords, commitTime, populateMetaFields);
|
||||
|
||||
assertThrows(HoodieUpsertException.class, () -> writeAndVerifyBatch(client, allRecords, commitTime, populateMetaFields));
|
||||
// verify pending clustering can be rolled back (even though there is a completed commit greater than pending clustering)
|
||||
client.rollback(pendingClusteringInstant.getTimestamp());
|
||||
metaClient.reloadActiveTimeline();
|
||||
|
||||
Reference in New Issue
Block a user