diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java index 565e046ea..4ed32b4f8 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -32,7 +32,9 @@ import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.fs.ConsistencyGuard; import org.apache.hudi.common.fs.ConsistencyGuard.FileVisibility; +import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.fs.FailSafeConsistencyGuard; +import org.apache.hudi.common.fs.OptimisticConsistencyGuard; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; @@ -505,7 +507,7 @@ public abstract class HoodieTable implements Seri final FileSystem fileSystem = metaClient.getRawFs(); List fileList = partitionFilePaths.map(Pair::getValue).collect(Collectors.toList()); try { - getFailSafeConsistencyGuard(fileSystem).waitTill(partitionPath, fileList, visibility); + getConsistencyGuard(fileSystem, config.getConsistencyGuardConfig()).waitTill(partitionPath, fileList, visibility); } catch (IOException | TimeoutException ioe) { LOG.error("Got exception while waiting for files to show up", ioe); return false; @@ -513,8 +515,18 @@ public abstract class HoodieTable implements Seri return true; } - private ConsistencyGuard getFailSafeConsistencyGuard(FileSystem fileSystem) { - return new FailSafeConsistencyGuard(fileSystem, config.getConsistencyGuardConfig()); + /** + * Instantiate {@link ConsistencyGuard} based on configs. + *

+ * Default consistencyGuard class is {@link OptimisticConsistencyGuard}. + */ + public static ConsistencyGuard getConsistencyGuard(FileSystem fs, ConsistencyGuardConfig consistencyGuardConfig) throws IOException { + try { + return consistencyGuardConfig.shouldEnableOptimisticConsistencyGuard() + ? new OptimisticConsistencyGuard(fs, consistencyGuardConfig) : new FailSafeConsistencyGuard(fs, consistencyGuardConfig); + } catch (Throwable e) { + throw new IOException("Could not load ConsistencyGuard ", e); + } } public SparkTaskContextSupplier getSparkTaskContextSupplier() { diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java index 7eea3ae3d..51d8a6a43 100644 --- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java @@ -44,6 +44,7 @@ import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCommitException; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.HoodieIndex.IndexType; import org.apache.hudi.io.IOType; @@ -63,6 +64,7 @@ import org.apache.spark.sql.Row; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.ValueSource; import java.io.FileInputStream; import java.io.IOException; @@ -91,6 +93,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -1081,54 +1084,97 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { /** * Tests behavior of committing only when consistency is verified. */ - @Test - public void testConsistencyCheckDuringFinalize() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testConsistencyCheckDuringFinalize(boolean enableOptimisticConsistencyGuard) throws Exception { HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath); String instantTime = "000"; - HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build(); + HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder() + .withEnableOptimisticConsistencyGuard(enableOptimisticConsistencyGuard).build()).build(); HoodieWriteClient client = getHoodieWriteClient(cfg); - Pair> result = testConsistencyCheck(metaClient, instantTime); + Pair> result = testConsistencyCheck(metaClient, instantTime, enableOptimisticConsistencyGuard); // Delete orphan marker and commit should succeed metaClient.getFs().delete(result.getKey(), false); - assertTrue(client.commit(instantTime, result.getRight()), "Commit should succeed"); - assertTrue(HoodieTestUtils.doesCommitExist(basePath, instantTime), - "After explicit commit, commit file should be created"); - // Marker directory must be removed - assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(instantTime)))); + if (!enableOptimisticConsistencyGuard) { + assertTrue(client.commit(instantTime, result.getRight()), "Commit should succeed"); + assertTrue(HoodieTestUtils.doesCommitExist(basePath, instantTime), + "After explicit commit, commit file should be created"); + // Marker directory must be removed + assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(instantTime)))); + } else { + // with optimistic, first client.commit should have succeeded. + assertTrue(HoodieTestUtils.doesCommitExist(basePath, instantTime), + "After explicit commit, commit file should be created"); + // Marker directory must be removed + assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(instantTime)))); + } } - private void testRollbackAfterConsistencyCheckFailureUsingFileList(boolean rollbackUsingMarkers) throws Exception { + private void testRollbackAfterConsistencyCheckFailureUsingFileList(boolean rollbackUsingMarkers, boolean enableOptimisticConsistencyGuard) throws Exception { String instantTime = "000"; HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath); - HoodieWriteConfig cfg = getConfigBuilder().withRollbackUsingMarkers(rollbackUsingMarkers).withAutoCommit(false).build(); - HoodieWriteClient client = getHoodieWriteClient(cfg); - testConsistencyCheck(metaClient, instantTime); - - // Rollback of this commit should succeed - client.rollback(instantTime); - assertFalse(HoodieTestUtils.doesCommitExist(basePath, instantTime), - "After explicit rollback, commit file should not be present"); - // Marker directory must be removed after rollback - assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(instantTime)))); - } - - @Test - public void testRollbackAfterConsistencyCheckFailureUsingFileList() throws Exception { - testRollbackAfterConsistencyCheckFailureUsingFileList(false); - } - - @Test - public void testRollbackAfterConsistencyCheckFailureUsingMarkers() throws Exception { - testRollbackAfterConsistencyCheckFailureUsingFileList(true); - } - - private Pair> testConsistencyCheck(HoodieTableMetaClient metaClient, String instantTime) - throws Exception { - HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false) + HoodieWriteConfig cfg = !enableOptimisticConsistencyGuard ? getConfigBuilder().withRollbackUsingMarkers(rollbackUsingMarkers).withAutoCommit(false) .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true) - .withMaxConsistencyCheckIntervalMs(1).withInitialConsistencyCheckIntervalMs(1).build()) - .build(); + .withMaxConsistencyCheckIntervalMs(1).withInitialConsistencyCheckIntervalMs(1).withEnableOptimisticConsistencyGuard(enableOptimisticConsistencyGuard).build()).build() : + getConfigBuilder().withRollbackUsingMarkers(rollbackUsingMarkers).withAutoCommit(false) + .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder() + .withConsistencyCheckEnabled(true) + .withOptimisticConsistencyGuardSleepTimeMs(1).build()).build(); + HoodieWriteClient client = getHoodieWriteClient(cfg); + testConsistencyCheck(metaClient, instantTime, enableOptimisticConsistencyGuard); + + if (!enableOptimisticConsistencyGuard) { + // Rollback of this commit should succeed with FailSafeCG + client.rollback(instantTime); + assertFalse(HoodieTestUtils.doesCommitExist(basePath, instantTime), + "After explicit rollback, commit file should not be present"); + // Marker directory must be removed after rollback + assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(instantTime)))); + } else { + // if optimistic CG is enabled, commit should have succeeded. + assertTrue(HoodieTestUtils.doesCommitExist(basePath, instantTime), + "With optimistic CG, first commit should succeed. commit file should be present"); + // Marker directory must be removed after rollback + assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(instantTime)))); + if (rollbackUsingMarkers) { + // rollback of a completed commit should fail if marked based rollback is used. + try { + client.rollback(instantTime); + fail("Rollback of completed commit should throw exception"); + } catch (HoodieRollbackException e) { + // ignore + } + } else { + // rollback of a completed commit should succeed if using list based rollback + client.rollback(instantTime); + assertFalse(HoodieTestUtils.doesCommitExist(basePath, instantTime), + "After explicit rollback, commit file should not be present"); + } + } + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testRollbackAfterConsistencyCheckFailureUsingFileList(boolean enableOptimisticConsistencyGuard) throws Exception { + testRollbackAfterConsistencyCheckFailureUsingFileList(false, enableOptimisticConsistencyGuard); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testRollbackAfterConsistencyCheckFailureUsingMarkers(boolean enableOptimisticConsistencyGuard) throws Exception { + testRollbackAfterConsistencyCheckFailureUsingFileList(true, enableOptimisticConsistencyGuard); + } + + private Pair> testConsistencyCheck(HoodieTableMetaClient metaClient, String instantTime, boolean enableOptimisticConsistencyGuard) + throws Exception { + HoodieWriteConfig cfg = !enableOptimisticConsistencyGuard ? (getConfigBuilder().withAutoCommit(false) + .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true) + .withMaxConsistencyCheckIntervalMs(1).withInitialConsistencyCheckIntervalMs(1).withEnableOptimisticConsistencyGuard(enableOptimisticConsistencyGuard).build()) + .build()) : (getConfigBuilder().withAutoCommit(false) + .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true) + .withOptimisticConsistencyGuardSleepTimeMs(1).build()) + .build()); HoodieWriteClient client = getHoodieWriteClient(cfg); client.startCommitWithTime(instantTime); @@ -1149,10 +1195,15 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { IOType.MERGE); LOG.info("Created a dummy marker path=" + markerFilePath); - Exception e = assertThrows(HoodieCommitException.class, () -> { + if (!enableOptimisticConsistencyGuard) { + Exception e = assertThrows(HoodieCommitException.class, () -> { + client.commit(instantTime, result); + }, "Commit should fail due to consistency check"); + assertTrue(e.getCause() instanceof HoodieIOException); + } else { + // with optimistic CG, commit should succeed client.commit(instantTime, result); - }, "Commit should fail due to consistency check"); - assertTrue(e.getCause() instanceof HoodieIOException); + } return Pair.of(markerFilePath, result); } diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java b/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java index 2406d8534..da4224a14 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java @@ -21,6 +21,7 @@ package org.apache.hudi.table; import org.apache.hudi.common.fs.ConsistencyGuard; import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.fs.FailSafeConsistencyGuard; +import org.apache.hudi.common.fs.OptimisticConsistencyGuard; import org.apache.hudi.testutils.HoodieClientTestHarness; import org.apache.hudi.testutils.HoodieClientTestUtils; @@ -28,14 +29,29 @@ import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import java.util.Arrays; +import java.util.List; import java.util.concurrent.TimeoutException; import static org.junit.jupiter.api.Assertions.assertThrows; +/** + * Unit tests {@link ConsistencyGuard}s. + */ public class TestConsistencyGuard extends HoodieClientTestHarness { + // multiple parameters, uses Collection + public static List consistencyGuardType() { + return Arrays.asList( + Arguments.of(FailSafeConsistencyGuard.class.getName()), + Arguments.of(OptimisticConsistencyGuard.class.getName()) + ); + } + @BeforeEach public void setup() { initPath(); @@ -47,13 +63,16 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { cleanupResources(); } - @Test - public void testCheckPassingAppearAndDisAppear() throws Exception { + @ParameterizedTest + @MethodSource("consistencyGuardType") + public void testCheckPassingAppearAndDisAppear(String consistencyGuardType) throws Exception { HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f2"); HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f3"); - ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig(1, 1000, 1000)); + ConsistencyGuardConfig config = getConsistencyGuardConfig(1, 1000, 1000); + ConsistencyGuard passing = consistencyGuardType.equals(FailSafeConsistencyGuard.class.getName()) + ? new FailSafeConsistencyGuard(fs, config) : new OptimisticConsistencyGuard(fs, config); passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet")); passing.waitTillFileAppears(new Path(basePath + "/partition/path/f2_1-0-1_000.parquet")); passing.waitTillAllFilesAppear(basePath + "/partition/path", Arrays @@ -68,7 +87,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { } @Test - public void testCheckFailingAppear() throws Exception { + public void testCheckFailingAppearFailSafe() throws Exception { HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig()); assertThrows(TimeoutException.class, () -> { @@ -78,7 +97,15 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { } @Test - public void testCheckFailingAppears() throws Exception { + public void testCheckFailingAppearTimedWait() throws Exception { + HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); + ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig()); + passing.waitTillAllFilesAppear(basePath + "/partition/path", Arrays + .asList(basePath + "/partition/path/f1_1-0-2_000.parquet", basePath + "/partition/path/f2_1-0-2_000.parquet")); + } + + @Test + public void testCheckFailingAppearsFailSafe() throws Exception { HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig()); assertThrows(TimeoutException.class, () -> { @@ -87,7 +114,14 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { } @Test - public void testCheckFailingDisappear() throws Exception { + public void testCheckFailingAppearsTimedWait() throws Exception { + HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); + ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig()); + passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-2_000.parquet")); + } + + @Test + public void testCheckFailingDisappearFailSafe() throws Exception { HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig()); assertThrows(TimeoutException.class, () -> { @@ -97,7 +131,15 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { } @Test - public void testCheckFailingDisappears() throws Exception { + public void testCheckFailingDisappearTimedWait() throws Exception { + HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); + ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig()); + passing.waitTillAllFilesDisappear(basePath + "/partition/path", Arrays + .asList(basePath + "/partition/path/f1_1-0-1_000.parquet", basePath + "/partition/path/f2_1-0-2_000.parquet")); + } + + @Test + public void testCheckFailingDisappearsFailSafe() throws Exception { HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig()); @@ -106,6 +148,14 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { }); } + @Test + public void testCheckFailingDisappearsTimedWait() throws Exception { + HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); + HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); + ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig()); + passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet")); + } + private ConsistencyGuardConfig getConsistencyGuardConfig() { return getConsistencyGuardConfig(3, 10, 10); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/ConsistencyGuardConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/ConsistencyGuardConfig.java index 55d80df35..e55fb2423 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/ConsistencyGuardConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/ConsistencyGuardConfig.java @@ -36,15 +36,23 @@ public class ConsistencyGuardConfig extends DefaultHoodieConfig { // time between successive attempts to ensure written data's metadata is consistent on storage private static final String INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP = "hoodie.consistency.check.initial_interval_ms"; - private static long DEFAULT_INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = 2000L; + private static long DEFAULT_INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = 400L; // max interval time private static final String MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP = "hoodie.consistency.check.max_interval_ms"; - private static long DEFAULT_MAX_CONSISTENCY_CHECK_INTERVAL_MS = 300000L; + private static long DEFAULT_MAX_CONSISTENCY_CHECK_INTERVAL_MS = 20000L; - // maximum number of checks, for consistency of written data. Will wait upto 256 Secs + // maximum number of checks, for consistency of written data. Will wait upto 140 Secs private static final String MAX_CONSISTENCY_CHECKS_PROP = "hoodie.consistency.check.max_checks"; - private static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7; + private static int DEFAULT_MAX_CONSISTENCY_CHECKS = 6; + + // sleep time for OptimisticConsistencyGuard + private static final String OPTIMISTIC_CONSISTENCY_GUARD_SLEEP_TIME_MS_PROP = "hoodie.optimistic.consistency.guard.sleep_time_ms"; + private static long DEFAULT_OPTIMISTIC_CONSISTENCY_GUARD_SLEEP_TIME_MS_PROP = 500L; + + // config to enable OptimisticConsistencyGuard in finalizeWrite instead of FailSafeConsistencyGuard + private static final String ENABLE_OPTIMISTIC_CONSISTENCY_GUARD = "_hoodie.optimistic.consistency.guard.enable"; + private static boolean DEFAULT_ENABLE_OPTIMISTIC_CONSISTENCY_GUARD = true; public ConsistencyGuardConfig(Properties props) { super(props); @@ -70,6 +78,14 @@ public class ConsistencyGuardConfig extends DefaultHoodieConfig { return Integer.parseInt(props.getProperty(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP)); } + public long getOptimisticConsistencyGuardSleepTimeMs() { + return Long.parseLong(props.getProperty(OPTIMISTIC_CONSISTENCY_GUARD_SLEEP_TIME_MS_PROP)); + } + + public boolean shouldEnableOptimisticConsistencyGuard() { + return Boolean.parseBoolean(props.getProperty(ENABLE_OPTIMISTIC_CONSISTENCY_GUARD)); + } + /** * The builder used to build consistency configurations. */ @@ -109,6 +125,16 @@ public class ConsistencyGuardConfig extends DefaultHoodieConfig { return this; } + public Builder withOptimisticConsistencyGuardSleepTimeMs(long sleepTimeMs) { + props.setProperty(OPTIMISTIC_CONSISTENCY_GUARD_SLEEP_TIME_MS_PROP, String.valueOf(sleepTimeMs)); + return this; + } + + public Builder withEnableOptimisticConsistencyGuard(boolean enableOptimisticConsistencyGuard) { + props.setProperty(ENABLE_OPTIMISTIC_CONSISTENCY_GUARD, String.valueOf(enableOptimisticConsistencyGuard)); + return this; + } + public ConsistencyGuardConfig build() { setDefaultOnCondition(props, !props.containsKey(CONSISTENCY_CHECK_ENABLED_PROP), CONSISTENCY_CHECK_ENABLED_PROP, DEFAULT_CONSISTENCY_CHECK_ENABLED); @@ -118,7 +144,11 @@ public class ConsistencyGuardConfig extends DefaultHoodieConfig { MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP, String.valueOf(DEFAULT_MAX_CONSISTENCY_CHECK_INTERVAL_MS)); setDefaultOnCondition(props, !props.containsKey(MAX_CONSISTENCY_CHECKS_PROP), MAX_CONSISTENCY_CHECKS_PROP, String.valueOf(DEFAULT_MAX_CONSISTENCY_CHECKS)); - + setDefaultOnCondition(props, !props.containsKey(OPTIMISTIC_CONSISTENCY_GUARD_SLEEP_TIME_MS_PROP), + OPTIMISTIC_CONSISTENCY_GUARD_SLEEP_TIME_MS_PROP, String.valueOf(DEFAULT_OPTIMISTIC_CONSISTENCY_GUARD_SLEEP_TIME_MS_PROP)); + setDefaultOnCondition(props, !props.containsKey(ENABLE_OPTIMISTIC_CONSISTENCY_GUARD), + ENABLE_OPTIMISTIC_CONSISTENCY_GUARD, + String.valueOf(DEFAULT_ENABLE_OPTIMISTIC_CONSISTENCY_GUARD)); return new ConsistencyGuardConfig(props); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FailSafeConsistencyGuard.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FailSafeConsistencyGuard.java index ac3a4791e..2247b92a4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FailSafeConsistencyGuard.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FailSafeConsistencyGuard.java @@ -32,7 +32,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeoutException; -import java.util.function.Function; import java.util.stream.Collectors; /** @@ -42,8 +41,8 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard { private static final Logger LOG = LogManager.getLogger(FailSafeConsistencyGuard.class); - private final FileSystem fs; - private final ConsistencyGuardConfig consistencyGuardConfig; + protected final FileSystem fs; + protected final ConsistencyGuardConfig consistencyGuardConfig; public FailSafeConsistencyGuard(FileSystem fs, ConsistencyGuardConfig consistencyGuardConfig) { this.fs = fs; @@ -73,7 +72,7 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard { /** * Helper function to wait for all files belonging to single directory to appear. - * + * * @param dirPath Dir Path * @param files Files to appear/disappear * @param event Appear/Disappear @@ -81,45 +80,19 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard { */ public void waitForFilesVisibility(String dirPath, List files, FileVisibility event) throws TimeoutException { Path dir = new Path(dirPath); - List filesWithoutSchemeAndAuthority = - files.stream().map(f -> Path.getPathWithoutSchemeAndAuthority(new Path(f))).map(Path::toString) - .collect(Collectors.toList()); - - retryTillSuccess((retryNum) -> { - try { - LOG.info("Trying " + retryNum); - FileStatus[] entries = fs.listStatus(dir); - List gotFiles = Arrays.stream(entries).map(e -> Path.getPathWithoutSchemeAndAuthority(e.getPath())) - .map(Path::toString).collect(Collectors.toList()); - List candidateFiles = new ArrayList<>(filesWithoutSchemeAndAuthority); - boolean altered = candidateFiles.removeAll(gotFiles); - - switch (event) { - case DISAPPEAR: - LOG.info("Following files are visible" + candidateFiles); - // If no candidate files gets removed, it means all of them have disappeared - return !altered; - case APPEAR: - default: - // if all files appear, the list is empty - return candidateFiles.isEmpty(); - } - } catch (IOException ioe) { - LOG.warn("Got IOException waiting for file event. Have tried " + retryNum + " time(s)", ioe); - } - return false; - }, "Timed out waiting for files to become visible"); + List filesWithoutSchemeAndAuthority = getFilesWithoutSchemeAndAuthority(files); + retryTillSuccess(dir, filesWithoutSchemeAndAuthority, event); } /** * Helper to check of file visibility. - * + * * @param filePath File Path * @param visibility Visibility * @return true (if file visible in Path), false (otherwise) * @throws IOException - */ - private boolean checkFileVisibility(Path filePath, FileVisibility visibility) throws IOException { + protected boolean checkFileVisibility(Path filePath, FileVisibility visibility) throws IOException { try { FileStatus status = fs.getFileStatus(filePath); switch (visibility) { @@ -142,10 +115,8 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard { /** * Helper function to wait till file either appears/disappears. - * + * * @param filePath File Path - * @param visibility - * @throws TimeoutException */ private void waitForFileVisibility(Path filePath, FileVisibility visibility) throws TimeoutException { long waitMs = consistencyGuardConfig.getInitialConsistencyCheckIntervalMs(); @@ -169,17 +140,18 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard { /** * Retries the predicate for condfigurable number of times till we the predicate returns success. - * - * @param predicate Predicate Function - * @param timedOutMessage Timed-Out message for logging + * + * @param dir directory of interest in which list of files are checked for visibility + * @param files List of files to check for visibility + * @param event {@link org.apache.hudi.common.fs.ConsistencyGuard.FileVisibility} event of interest. * @throws TimeoutException when retries are exhausted */ - private void retryTillSuccess(Function predicate, String timedOutMessage) throws TimeoutException { + private void retryTillSuccess(Path dir, List files, FileVisibility event) throws TimeoutException { long waitMs = consistencyGuardConfig.getInitialConsistencyCheckIntervalMs(); int attempt = 0; LOG.info("Max Attempts=" + consistencyGuardConfig.getMaxConsistencyChecks()); while (attempt < consistencyGuardConfig.getMaxConsistencyChecks()) { - boolean success = predicate.apply(attempt); + boolean success = checkFilesVisibility(attempt, dir, files, event); if (success) { return; } @@ -188,11 +160,55 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard { waitMs = Math.min(waitMs, consistencyGuardConfig.getMaxConsistencyCheckIntervalMs()); attempt++; } - throw new TimeoutException(timedOutMessage); - + throw new TimeoutException("Timed out waiting for files to adhere to event " + event.name()); } - void sleepSafe(long waitMs) { + /** + * Helper to check for file visibility based on {@link org.apache.hudi.common.fs.ConsistencyGuard.FileVisibility} event. + * + * @param retryNum retry attempt count. + * @param dir directory of interest in which list of files are checked for visibility + * @param files List of files to check for visibility + * @param event {@link org.apache.hudi.common.fs.ConsistencyGuard.FileVisibility} event of interest. + * @return {@code true} if condition succeeded. else {@code false}. + */ + protected boolean checkFilesVisibility(int retryNum, Path dir, List files, FileVisibility event) { + try { + LOG.info("Trying " + retryNum); + FileStatus[] entries = fs.listStatus(dir); + List gotFiles = Arrays.stream(entries).map(e -> Path.getPathWithoutSchemeAndAuthority(e.getPath())) + .map(Path::toString).collect(Collectors.toList()); + List candidateFiles = new ArrayList<>(files); + boolean altered = candidateFiles.removeAll(gotFiles); + + switch (event) { + case DISAPPEAR: + LOG.info("Following files are visible" + candidateFiles); + // If no candidate files gets removed, it means all of them have disappeared + return !altered; + case APPEAR: + default: + // if all files appear, the list is empty + return candidateFiles.isEmpty(); + } + } catch (IOException ioe) { + LOG.warn("Got IOException waiting for file event. Have tried " + retryNum + " time(s)", ioe); + } + return false; + } + + /** + * Generate file names without scheme and authority. + * + * @param files list of files of interest. + * @return the filenames without scheme and authority. + */ + protected List getFilesWithoutSchemeAndAuthority(List files) { + return files.stream().map(f -> Path.getPathWithoutSchemeAndAuthority(new Path(f))).map(Path::toString) + .collect(Collectors.toList()); + } + + private void sleepSafe(long waitMs) { try { Thread.sleep(waitMs); } catch (InterruptedException e) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/OptimisticConsistencyGuard.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/OptimisticConsistencyGuard.java new file mode 100644 index 000000000..ded7ccce9 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/OptimisticConsistencyGuard.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.fs; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeoutException; + +/** + * A consistency guard which sleeps for configured period of time only on APPEAR. It is a no-op for DISAPPEAR. + * This is specifically for S3A filesystem and here is the rational. + * This guard is used when deleting data files corresponding to marker files that needs to be deleted. + * There are two tricky cases that needs to be considered. Case 1 : A data file creation is eventually consistent and hence + * when issuing deletes, it may not be found. Case 2: a data file was never created in the first place since the process crashed. + * In S3A, GET and LIST are eventually consistent, and delete() implementation internally does a LIST/EXISTS. + * Prior to this patch, hudi was leveraging {@link FailSafeConsistencyGuard} which was doing the following to delete data files. + * Step1: wait for all files to appear with linear backoff. + * Step2: issue deletes + * Step3: wait for all files to disappear with linear backoff. + * Step1 and Step2 is handled by {@link FailSafeConsistencyGuard}. + * + * We are simplifying these steps with {@link OptimisticConsistencyGuard}. + * Step1: Check if all files adhere to visibility event. If yes, proceed to Sptep 3. + * Step2: If not, Sleep for a configured threshold and then proceed to next step. + * Step3: issue deletes. + * + * With this, if any files that was created, should be available within configured threshold(eventual consistency). + * Delete() will return false if FileNotFound. So, both cases are taken care of this {@link ConsistencyGuard}. + */ +public class OptimisticConsistencyGuard extends FailSafeConsistencyGuard { + + private static final Logger LOG = LogManager.getLogger(OptimisticConsistencyGuard.class); + + public OptimisticConsistencyGuard(FileSystem fs, ConsistencyGuardConfig consistencyGuardConfig) { + super(fs, consistencyGuardConfig); + } + + @Override + public void waitTillFileAppears(Path filePath) throws TimeoutException { + try { + if (!checkFileVisibility(filePath, FileVisibility.APPEAR)) { + Thread.sleep(consistencyGuardConfig.getOptimisticConsistencyGuardSleepTimeMs()); + } + } catch (IOException | InterruptedException ioe) { + LOG.warn("Got IOException or InterruptedException waiting for file visibility. Ignoring", ioe); + } + } + + @Override + public void waitTillFileDisappears(Path filePath) throws TimeoutException { + // no op + } + + @Override + public void waitTillAllFilesAppear(String dirPath, List files) throws TimeoutException { + try { + if (!checkFilesVisibility(1, new Path(dirPath), getFilesWithoutSchemeAndAuthority(files), FileVisibility.APPEAR)) { + Thread.sleep(consistencyGuardConfig.getOptimisticConsistencyGuardSleepTimeMs()); + } + } catch (InterruptedException ie) { + LOG.warn("Got InterruptedException waiting for file visibility. Ignoring", ie); + } + } + + @Override + public void waitTillAllFilesDisappear(String dirPath, List files) throws TimeoutException { + // no op + } +}