[HUDI-1098] Adding OptimisticConsistencyGuard to be used during FinalizeWrite (#1912)
This commit is contained in:
committed by
GitHub
parent
ff53e8f0b6
commit
858eda85d7
@@ -32,7 +32,9 @@ import org.apache.hudi.client.SparkTaskContextSupplier;
|
||||
import org.apache.hudi.common.config.SerializableConfiguration;
|
||||
import org.apache.hudi.common.fs.ConsistencyGuard;
|
||||
import org.apache.hudi.common.fs.ConsistencyGuard.FileVisibility;
|
||||
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
|
||||
import org.apache.hudi.common.fs.FailSafeConsistencyGuard;
|
||||
import org.apache.hudi.common.fs.OptimisticConsistencyGuard;
|
||||
import org.apache.hudi.common.model.HoodieFileFormat;
|
||||
import org.apache.hudi.common.model.HoodieKey;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
@@ -505,7 +507,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
|
||||
final FileSystem fileSystem = metaClient.getRawFs();
|
||||
List<String> fileList = partitionFilePaths.map(Pair::getValue).collect(Collectors.toList());
|
||||
try {
|
||||
getFailSafeConsistencyGuard(fileSystem).waitTill(partitionPath, fileList, visibility);
|
||||
getConsistencyGuard(fileSystem, config.getConsistencyGuardConfig()).waitTill(partitionPath, fileList, visibility);
|
||||
} catch (IOException | TimeoutException ioe) {
|
||||
LOG.error("Got exception while waiting for files to show up", ioe);
|
||||
return false;
|
||||
@@ -513,8 +515,18 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
|
||||
return true;
|
||||
}
|
||||
|
||||
private ConsistencyGuard getFailSafeConsistencyGuard(FileSystem fileSystem) {
|
||||
return new FailSafeConsistencyGuard(fileSystem, config.getConsistencyGuardConfig());
|
||||
/**
|
||||
* Instantiate {@link ConsistencyGuard} based on configs.
|
||||
* <p>
|
||||
* Default consistencyGuard class is {@link OptimisticConsistencyGuard}.
|
||||
*/
|
||||
public static ConsistencyGuard getConsistencyGuard(FileSystem fs, ConsistencyGuardConfig consistencyGuardConfig) throws IOException {
|
||||
try {
|
||||
return consistencyGuardConfig.shouldEnableOptimisticConsistencyGuard()
|
||||
? new OptimisticConsistencyGuard(fs, consistencyGuardConfig) : new FailSafeConsistencyGuard(fs, consistencyGuardConfig);
|
||||
} catch (Throwable e) {
|
||||
throw new IOException("Could not load ConsistencyGuard ", e);
|
||||
}
|
||||
}
|
||||
|
||||
public SparkTaskContextSupplier getSparkTaskContextSupplier() {
|
||||
|
||||
@@ -44,6 +44,7 @@ import org.apache.hudi.config.HoodieStorageConfig;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.exception.HoodieCommitException;
|
||||
import org.apache.hudi.exception.HoodieIOException;
|
||||
import org.apache.hudi.exception.HoodieRollbackException;
|
||||
import org.apache.hudi.index.HoodieIndex;
|
||||
import org.apache.hudi.index.HoodieIndex.IndexType;
|
||||
import org.apache.hudi.io.IOType;
|
||||
@@ -63,6 +64,7 @@ import org.apache.spark.sql.Row;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.EnumSource;
|
||||
import org.junit.jupiter.params.provider.ValueSource;
|
||||
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
@@ -91,6 +93,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.fail;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@@ -1081,54 +1084,97 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
||||
/**
|
||||
* Tests behavior of committing only when consistency is verified.
|
||||
*/
|
||||
@Test
|
||||
public void testConsistencyCheckDuringFinalize() throws Exception {
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void testConsistencyCheckDuringFinalize(boolean enableOptimisticConsistencyGuard) throws Exception {
|
||||
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
|
||||
String instantTime = "000";
|
||||
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
|
||||
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder()
|
||||
.withEnableOptimisticConsistencyGuard(enableOptimisticConsistencyGuard).build()).build();
|
||||
HoodieWriteClient client = getHoodieWriteClient(cfg);
|
||||
Pair<Path, JavaRDD<WriteStatus>> result = testConsistencyCheck(metaClient, instantTime);
|
||||
Pair<Path, JavaRDD<WriteStatus>> result = testConsistencyCheck(metaClient, instantTime, enableOptimisticConsistencyGuard);
|
||||
|
||||
// Delete orphan marker and commit should succeed
|
||||
metaClient.getFs().delete(result.getKey(), false);
|
||||
assertTrue(client.commit(instantTime, result.getRight()), "Commit should succeed");
|
||||
assertTrue(HoodieTestUtils.doesCommitExist(basePath, instantTime),
|
||||
"After explicit commit, commit file should be created");
|
||||
// Marker directory must be removed
|
||||
assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(instantTime))));
|
||||
if (!enableOptimisticConsistencyGuard) {
|
||||
assertTrue(client.commit(instantTime, result.getRight()), "Commit should succeed");
|
||||
assertTrue(HoodieTestUtils.doesCommitExist(basePath, instantTime),
|
||||
"After explicit commit, commit file should be created");
|
||||
// Marker directory must be removed
|
||||
assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(instantTime))));
|
||||
} else {
|
||||
// with optimistic, first client.commit should have succeeded.
|
||||
assertTrue(HoodieTestUtils.doesCommitExist(basePath, instantTime),
|
||||
"After explicit commit, commit file should be created");
|
||||
// Marker directory must be removed
|
||||
assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(instantTime))));
|
||||
}
|
||||
}
|
||||
|
||||
private void testRollbackAfterConsistencyCheckFailureUsingFileList(boolean rollbackUsingMarkers) throws Exception {
|
||||
private void testRollbackAfterConsistencyCheckFailureUsingFileList(boolean rollbackUsingMarkers, boolean enableOptimisticConsistencyGuard) throws Exception {
|
||||
String instantTime = "000";
|
||||
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
|
||||
HoodieWriteConfig cfg = getConfigBuilder().withRollbackUsingMarkers(rollbackUsingMarkers).withAutoCommit(false).build();
|
||||
HoodieWriteClient client = getHoodieWriteClient(cfg);
|
||||
testConsistencyCheck(metaClient, instantTime);
|
||||
|
||||
// Rollback of this commit should succeed
|
||||
client.rollback(instantTime);
|
||||
assertFalse(HoodieTestUtils.doesCommitExist(basePath, instantTime),
|
||||
"After explicit rollback, commit file should not be present");
|
||||
// Marker directory must be removed after rollback
|
||||
assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(instantTime))));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRollbackAfterConsistencyCheckFailureUsingFileList() throws Exception {
|
||||
testRollbackAfterConsistencyCheckFailureUsingFileList(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRollbackAfterConsistencyCheckFailureUsingMarkers() throws Exception {
|
||||
testRollbackAfterConsistencyCheckFailureUsingFileList(true);
|
||||
}
|
||||
|
||||
private Pair<Path, JavaRDD<WriteStatus>> testConsistencyCheck(HoodieTableMetaClient metaClient, String instantTime)
|
||||
throws Exception {
|
||||
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false)
|
||||
HoodieWriteConfig cfg = !enableOptimisticConsistencyGuard ? getConfigBuilder().withRollbackUsingMarkers(rollbackUsingMarkers).withAutoCommit(false)
|
||||
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true)
|
||||
.withMaxConsistencyCheckIntervalMs(1).withInitialConsistencyCheckIntervalMs(1).build())
|
||||
.build();
|
||||
.withMaxConsistencyCheckIntervalMs(1).withInitialConsistencyCheckIntervalMs(1).withEnableOptimisticConsistencyGuard(enableOptimisticConsistencyGuard).build()).build() :
|
||||
getConfigBuilder().withRollbackUsingMarkers(rollbackUsingMarkers).withAutoCommit(false)
|
||||
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder()
|
||||
.withConsistencyCheckEnabled(true)
|
||||
.withOptimisticConsistencyGuardSleepTimeMs(1).build()).build();
|
||||
HoodieWriteClient client = getHoodieWriteClient(cfg);
|
||||
testConsistencyCheck(metaClient, instantTime, enableOptimisticConsistencyGuard);
|
||||
|
||||
if (!enableOptimisticConsistencyGuard) {
|
||||
// Rollback of this commit should succeed with FailSafeCG
|
||||
client.rollback(instantTime);
|
||||
assertFalse(HoodieTestUtils.doesCommitExist(basePath, instantTime),
|
||||
"After explicit rollback, commit file should not be present");
|
||||
// Marker directory must be removed after rollback
|
||||
assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(instantTime))));
|
||||
} else {
|
||||
// if optimistic CG is enabled, commit should have succeeded.
|
||||
assertTrue(HoodieTestUtils.doesCommitExist(basePath, instantTime),
|
||||
"With optimistic CG, first commit should succeed. commit file should be present");
|
||||
// Marker directory must be removed after rollback
|
||||
assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(instantTime))));
|
||||
if (rollbackUsingMarkers) {
|
||||
// rollback of a completed commit should fail if marked based rollback is used.
|
||||
try {
|
||||
client.rollback(instantTime);
|
||||
fail("Rollback of completed commit should throw exception");
|
||||
} catch (HoodieRollbackException e) {
|
||||
// ignore
|
||||
}
|
||||
} else {
|
||||
// rollback of a completed commit should succeed if using list based rollback
|
||||
client.rollback(instantTime);
|
||||
assertFalse(HoodieTestUtils.doesCommitExist(basePath, instantTime),
|
||||
"After explicit rollback, commit file should not be present");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void testRollbackAfterConsistencyCheckFailureUsingFileList(boolean enableOptimisticConsistencyGuard) throws Exception {
|
||||
testRollbackAfterConsistencyCheckFailureUsingFileList(false, enableOptimisticConsistencyGuard);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void testRollbackAfterConsistencyCheckFailureUsingMarkers(boolean enableOptimisticConsistencyGuard) throws Exception {
|
||||
testRollbackAfterConsistencyCheckFailureUsingFileList(true, enableOptimisticConsistencyGuard);
|
||||
}
|
||||
|
||||
private Pair<Path, JavaRDD<WriteStatus>> testConsistencyCheck(HoodieTableMetaClient metaClient, String instantTime, boolean enableOptimisticConsistencyGuard)
|
||||
throws Exception {
|
||||
HoodieWriteConfig cfg = !enableOptimisticConsistencyGuard ? (getConfigBuilder().withAutoCommit(false)
|
||||
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true)
|
||||
.withMaxConsistencyCheckIntervalMs(1).withInitialConsistencyCheckIntervalMs(1).withEnableOptimisticConsistencyGuard(enableOptimisticConsistencyGuard).build())
|
||||
.build()) : (getConfigBuilder().withAutoCommit(false)
|
||||
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true)
|
||||
.withOptimisticConsistencyGuardSleepTimeMs(1).build())
|
||||
.build());
|
||||
HoodieWriteClient client = getHoodieWriteClient(cfg);
|
||||
|
||||
client.startCommitWithTime(instantTime);
|
||||
@@ -1149,10 +1195,15 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
||||
IOType.MERGE);
|
||||
LOG.info("Created a dummy marker path=" + markerFilePath);
|
||||
|
||||
Exception e = assertThrows(HoodieCommitException.class, () -> {
|
||||
if (!enableOptimisticConsistencyGuard) {
|
||||
Exception e = assertThrows(HoodieCommitException.class, () -> {
|
||||
client.commit(instantTime, result);
|
||||
}, "Commit should fail due to consistency check");
|
||||
assertTrue(e.getCause() instanceof HoodieIOException);
|
||||
} else {
|
||||
// with optimistic CG, commit should succeed
|
||||
client.commit(instantTime, result);
|
||||
}, "Commit should fail due to consistency check");
|
||||
assertTrue(e.getCause() instanceof HoodieIOException);
|
||||
}
|
||||
return Pair.of(markerFilePath, result);
|
||||
}
|
||||
|
||||
|
||||
@@ -21,6 +21,7 @@ package org.apache.hudi.table;
|
||||
import org.apache.hudi.common.fs.ConsistencyGuard;
|
||||
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
|
||||
import org.apache.hudi.common.fs.FailSafeConsistencyGuard;
|
||||
import org.apache.hudi.common.fs.OptimisticConsistencyGuard;
|
||||
import org.apache.hudi.testutils.HoodieClientTestHarness;
|
||||
import org.apache.hudi.testutils.HoodieClientTestUtils;
|
||||
|
||||
@@ -28,14 +29,29 @@ import org.apache.hadoop.fs.Path;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
/**
|
||||
* Unit tests {@link ConsistencyGuard}s.
|
||||
*/
|
||||
public class TestConsistencyGuard extends HoodieClientTestHarness {
|
||||
|
||||
// multiple parameters, uses Collection<Object[]>
|
||||
public static List<Arguments> consistencyGuardType() {
|
||||
return Arrays.asList(
|
||||
Arguments.of(FailSafeConsistencyGuard.class.getName()),
|
||||
Arguments.of(OptimisticConsistencyGuard.class.getName())
|
||||
);
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
public void setup() {
|
||||
initPath();
|
||||
@@ -47,13 +63,16 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
|
||||
cleanupResources();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckPassingAppearAndDisAppear() throws Exception {
|
||||
@ParameterizedTest
|
||||
@MethodSource("consistencyGuardType")
|
||||
public void testCheckPassingAppearAndDisAppear(String consistencyGuardType) throws Exception {
|
||||
HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
|
||||
HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f2");
|
||||
HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f3");
|
||||
|
||||
ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig(1, 1000, 1000));
|
||||
ConsistencyGuardConfig config = getConsistencyGuardConfig(1, 1000, 1000);
|
||||
ConsistencyGuard passing = consistencyGuardType.equals(FailSafeConsistencyGuard.class.getName())
|
||||
? new FailSafeConsistencyGuard(fs, config) : new OptimisticConsistencyGuard(fs, config);
|
||||
passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet"));
|
||||
passing.waitTillFileAppears(new Path(basePath + "/partition/path/f2_1-0-1_000.parquet"));
|
||||
passing.waitTillAllFilesAppear(basePath + "/partition/path", Arrays
|
||||
@@ -68,7 +87,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckFailingAppear() throws Exception {
|
||||
public void testCheckFailingAppearFailSafe() throws Exception {
|
||||
HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
|
||||
ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig());
|
||||
assertThrows(TimeoutException.class, () -> {
|
||||
@@ -78,7 +97,15 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckFailingAppears() throws Exception {
|
||||
public void testCheckFailingAppearTimedWait() throws Exception {
|
||||
HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
|
||||
ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig());
|
||||
passing.waitTillAllFilesAppear(basePath + "/partition/path", Arrays
|
||||
.asList(basePath + "/partition/path/f1_1-0-2_000.parquet", basePath + "/partition/path/f2_1-0-2_000.parquet"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckFailingAppearsFailSafe() throws Exception {
|
||||
HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
|
||||
ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig());
|
||||
assertThrows(TimeoutException.class, () -> {
|
||||
@@ -87,7 +114,14 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckFailingDisappear() throws Exception {
|
||||
public void testCheckFailingAppearsTimedWait() throws Exception {
|
||||
HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
|
||||
ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig());
|
||||
passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-2_000.parquet"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckFailingDisappearFailSafe() throws Exception {
|
||||
HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
|
||||
ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig());
|
||||
assertThrows(TimeoutException.class, () -> {
|
||||
@@ -97,7 +131,15 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckFailingDisappears() throws Exception {
|
||||
public void testCheckFailingDisappearTimedWait() throws Exception {
|
||||
HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
|
||||
ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig());
|
||||
passing.waitTillAllFilesDisappear(basePath + "/partition/path", Arrays
|
||||
.asList(basePath + "/partition/path/f1_1-0-1_000.parquet", basePath + "/partition/path/f2_1-0-2_000.parquet"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckFailingDisappearsFailSafe() throws Exception {
|
||||
HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
|
||||
HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
|
||||
ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig());
|
||||
@@ -106,6 +148,14 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckFailingDisappearsTimedWait() throws Exception {
|
||||
HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
|
||||
HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
|
||||
ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig());
|
||||
passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet"));
|
||||
}
|
||||
|
||||
private ConsistencyGuardConfig getConsistencyGuardConfig() {
|
||||
return getConsistencyGuardConfig(3, 10, 10);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user