diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 0e372cb21..aba6fad5f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -214,7 +214,8 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K, O> extends BaseHoodieClient
-  protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) {
+  protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata,
+                            boolean acquireLockForArchival) {
     try {
       // Delete the marker directory for the instant.
       WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
           .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
       autoCleanOnCommit();
-      autoArchiveOnCommit(table);
+      autoArchiveOnCommit(table, acquireLockForArchival);
     } finally {
       this.heartbeatClient.stop(instantTime);
     }
@@ -565,7 +568,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K, O> extends BaseHoodieClient
-  protected void autoArchiveOnCommit(HoodieTable table) {
+  protected void autoArchiveOnCommit(HoodieTable table, boolean acquireLockForArchival) {
     if (config.isAutoArchive()) {
       // We cannot have unbounded commit files. Archive commits if we have to archive.
       HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(config, table);
-      archiver.archiveIfRequired(context);
+      archiver.archiveIfRequired(context, acquireLockForArchival);
     }
   }
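The two hunks above thread the new acquireLockForArchival flag from postCommit down into the archiver. As a caller-side sketch of the intended contract (the helper below is hypothetical, for illustration only): writers that may race with other writers pass true so archival runs fenced by the table lock, while a caller that already holds the lock passes false to avoid re-entrant locking.

    // Hypothetical helper inside a BaseHoodieWriteClient subclass, for illustration only.
    private void finishCommit(HoodieTable table, HoodieCommitMetadata metadata, String instantTime,
                              boolean callerHoldsTableLock) {
      // Fence archival against concurrent writers unless the caller already owns the lock.
      postCommit(table, metadata, instantTime, Option.empty(), !callerHoldsTableLock);
    }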
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -96,6 +98,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
   private final int minInstantsToKeep;
   private final HoodieTable<T, I, K, O> table;
   private final HoodieTableMetaClient metaClient;
+  private final TransactionManager txnManager;
 
   public HoodieTimelineArchiver(HoodieWriteConfig config, HoodieTable<T, I, K, O> table) {
     this.config = config;
@@ -104,6 +107,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
     this.archiveFilePath = HoodieArchivedTimeline.getArchiveLogPath(metaClient.getArchivePath());
     this.maxInstantsToKeep = config.getMaxCommitsToKeep();
     this.minInstantsToKeep = config.getMinCommitsToKeep();
+    this.txnManager = new TransactionManager(config, table.getMetaClient().getFs());
   }
 
   private Writer openWriter() {
@@ -143,11 +147,18 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
     }
   }
 
+  public boolean archiveIfRequired(HoodieEngineContext context) throws IOException {
+    return archiveIfRequired(context, false);
+  }
+
   /**
    * Check if commits need to be archived. If yes, archive commits.
    */
-  public boolean archiveIfRequired(HoodieEngineContext context) throws IOException {
+  public boolean archiveIfRequired(HoodieEngineContext context, boolean acquireLock) throws IOException {
     try {
+      if (acquireLock) {
+        txnManager.beginTransaction();
+      }
       List<HoodieInstant> instantsToArchive = getInstantsToArchive().collect(Collectors.toList());
       verifyLastMergeArchiveFilesIfNecessary(context);
       boolean success = true;
@@ -167,6 +178,9 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
       return success;
     } finally {
       close();
+      if (acquireLock) {
+        txnManager.endTransaction();
+      }
     }
   }
@@ -485,9 +499,16 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
       }
     }
 
-    return instants.flatMap(hoodieInstant ->
-        groupByTsAction.get(Pair.of(hoodieInstant.getTimestamp(),
-            HoodieInstant.getComparableAction(hoodieInstant.getAction()))).stream());
+    return instants.flatMap(hoodieInstant -> {
+      List<HoodieInstant> instantsToStream = groupByTsAction.get(Pair.of(hoodieInstant.getTimestamp(),
+          HoodieInstant.getComparableAction(hoodieInstant.getAction())));
+      if (instantsToStream != null) {
+        return instantsToStream.stream();
+      } else {
+        // a concurrent writer has already archived this instant
+        return Collections.<HoodieInstant>emptyList().stream();
+      }
+    });
   }
 
   private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants, HoodieEngineContext context) throws IOException {
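A minimal usage sketch of the new overload (class and method names come from this diff; the surrounding driver wiring is assumed): each writer builds its own archiver and passes acquireLock = true, so the snapshot of instants to archive and the subsequent deletions happen inside one beginTransaction()/endTransaction() bracket, and a null lookup in getInstantsToArchive simply means a concurrent archiver got there first.

    // Sketch: one lock-guarded archival pass; writeConfig, table and engineContext are assumed in scope.
    HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, table);
    boolean archived = archiver.archiveIfRequired(engineContext, true); // throws IOException
    // The one-argument overload preserves the old single-writer behaviour (acquireLock = false).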
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 626727ad0..271ba95d9 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -329,12 +329,13 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
   protected void postCommit(HoodieTable table,
                             HoodieCommitMetadata metadata,
                             String instantTime,
-                            Option<Map<String, String>> extraMetadata) {
+                            Option<Map<String, String>> extraMetadata,
+                            boolean acquireLockForArchival) {
     try {
       // Delete the marker directory for the instant.
       WriteMarkersFactory.get(config.getMarkersType(), createTable(config, hadoopConf), instantTime)
           .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
-      autoArchiveOnCommit(table);
+      autoArchiveOnCommit(table, acquireLockForArchival);
     } finally {
       this.heartbeatClient.stop(instantTime);
     }
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
index a506131cb..7f5dc19ba 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
@@ -200,7 +200,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
           result.getWriteStats().get().size());
     }
 
-    postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, Option.empty());
+    postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, Option.empty(), true);
 
     emitCommitMetrics(instantTime, result.getCommitMetadata().get(), hoodieTable.getMetaClient().getCommitActionType());
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index b3e3c25c8..7b0c8bbc8 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -285,7 +285,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
           result.getWriteStats().get().size());
     }
 
-    postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, Option.empty());
+    postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, Option.empty(), true);
 
    emitCommitMetrics(instantTime, result.getCommitMetadata().get(), hoodieTable.getMetaClient().getCommitActionType());
   }
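All three engine clients now pass true at the end of a successful commit, so commit-time archival is always lock-guarded. A custom engine client subclassing BaseHoodieWriteClient would have to adopt the widened signature and forward the flag; a hypothetical sketch mirroring the Flink change above:

    // Hypothetical custom engine client's override, for illustration only.
    @Override
    protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, String instantTime,
                              Option<Map<String, String>> extraMetadata, boolean acquireLockForArchival) {
      try {
        // Clean up markers first, then archive under the lock if requested.
        WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
            .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
        autoArchiveOnCommit(table, acquireLockForArchival);
      } finally {
        this.heartbeatClient.stop(instantTime);
      }
    }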
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index 0fe602cd0..445780384 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -20,11 +20,14 @@
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.client.HoodieTimelineArchiver;
+import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
 import org.apache.hudi.client.utils.MetadataConversionUtils;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
@@ -42,6 +45,7 @@ import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.metadata.HoodieTableMetadata;
@@ -71,6 +75,12 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
@@ -131,7 +141,8 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
                                                            int maxDeltaCommitsMetadataTable,
                                                            HoodieTableType tableType) throws Exception {
     return initTestTableAndGetWriteConfig(enableMetadata, minArchivalCommits, maxArchivalCommits,
-        maxDeltaCommits, maxDeltaCommitsMetadataTable, tableType, false, 10, 209715200);
+        maxDeltaCommits, maxDeltaCommitsMetadataTable, tableType, false, 10, 209715200,
+        HoodieFailedWritesCleaningPolicy.EAGER, WriteConcurrencyMode.SINGLE_WRITER);
   }
 
   private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
@@ -140,7 +151,8 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
                                                            int maxDeltaCommitsMetadataTable,
                                                            HoodieTableType tableType) throws Exception {
     return initTestTableAndGetWriteConfig(enableMetadata, minArchivalCommits, maxArchivalCommits,
-        5, maxDeltaCommitsMetadataTable, tableType, false, 10, 209715200);
+        5, maxDeltaCommitsMetadataTable, tableType, false, 10, 209715200,
+        HoodieFailedWritesCleaningPolicy.EAGER, WriteConcurrencyMode.SINGLE_WRITER);
   }
 
   private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
@@ -151,7 +163,8 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
                                                            int archiveFilesBatch,
                                                            long size) throws Exception {
     return initTestTableAndGetWriteConfig(enableMetadata, minArchivalCommits, maxArchivalCommits, 5,
-        maxDeltaCommitsMetadataTable, HoodieTableType.COPY_ON_WRITE, enableArchiveMerge, archiveFilesBatch, size);
+        maxDeltaCommitsMetadataTable, HoodieTableType.COPY_ON_WRITE, enableArchiveMerge, archiveFilesBatch, size,
+        HoodieFailedWritesCleaningPolicy.EAGER, WriteConcurrencyMode.SINGLE_WRITER);
   }
 
   private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
@@ -162,7 +175,9 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
                                                            HoodieTableType tableType,
                                                            boolean enableArchiveMerge,
                                                            int archiveFilesBatch,
-                                                           long size) throws Exception {
+                                                           long size,
+                                                           HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+                                                           WriteConcurrencyMode writeConcurrencyMode) throws Exception {
     init(tableType);
     HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(basePath)
         .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
@@ -171,11 +186,15 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
             .withArchiveMergeEnable(enableArchiveMerge)
             .withArchiveMergeFilesBatchSize(archiveFilesBatch)
             .withArchiveMergeSmallFileLimit(size)
+            .withFailedWritesCleaningPolicy(failedWritesCleaningPolicy)
             .build())
         .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
             .withRemoteServerPort(timelineServicePort).build())
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadata)
             .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsMetadataTable).build())
+        .withWriteConcurrencyMode(writeConcurrencyMode)
+        .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
+            .build())
         .forTable("test-trip-table").build();
     initWriteConfigAndMetatableWriter(writeConfig, enableMetadata);
     return writeConfig;
   }
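For reference, the multi-writer configuration that the new test obtains through initTestTableAndGetWriteConfig boils down to a builder chain like the following sketch (values mirror the test; the path and table name are placeholders, and the same imports as the test are assumed):

    // Equivalent standalone config, assuming the test's defaults elsewhere.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/test-trip-table") // placeholder path
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .archiveCommitsWith(2, 4) // min/max commits to keep, as in the test
            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
            .build())
        .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
        .withLockConfig(HoodieLockConfig.newBuilder()
            .withLockProvider(InProcessLockProvider.class)
            .build())
        .forTable("test-trip-table")
        .build();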
@@ -404,6 +423,79 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
     assertThrows(HoodieException.class, () -> metaClient.getArchivedTimeline().reload());
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testArchivalWithMultiWriters(boolean enableMetadata) throws Exception {
+    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(enableMetadata, 2, 4, 5, 2,
+        HoodieTableType.COPY_ON_WRITE, false, 10, 209715200,
+        HoodieFailedWritesCleaningPolicy.LAZY, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL);
+
+    final ExecutorService executors = Executors.newFixedThreadPool(2);
+    List<CompletableFuture<Boolean>> completableFutureList = new ArrayList<>();
+    CountDownLatch countDownLatch = new CountDownLatch(1);
+    IntStream.range(0, 2).forEach(index -> {
+      completableFutureList.add(CompletableFuture.supplyAsync(() -> {
+        HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
+        try {
+          // wait until 4 commits are available so that the archival thread has something to archive.
+          countDownLatch.await(30, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+          throw new HoodieException("Should not have thrown InterruptedException ", e);
+        }
+        metaClient.reloadActiveTimeline();
+        while (!metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get().getTimestamp().endsWith("29")
+            || metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() > 4) {
+          try {
+            HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, table);
+            archiver.archiveIfRequired(context, true);
+            // without this sleep, both archiving threads acquire the lock in quick succession and leave the
+            // main thread no room to complete its write operation when the metadata table is enabled.
+            if (enableMetadata) {
+              Thread.sleep(2);
+            }
+          } catch (IOException e) {
+            throw new HoodieException("IOException thrown while archiving ", e);
+          } catch (InterruptedException e) {
+            throw new HoodieException("Should not have thrown InterruptedException ", e);
+          }
+          table.getMetaClient().reloadActiveTimeline();
+        }
+        return true;
+      }, executors));
+    });
+
+    // do ingestion from the main thread, releasing the archival threads part-way through.
+    for (int i = 1; i < 30; i++) {
+      testTable.doWriteOperation("0000000" + String.format("%02d", i), WriteOperationType.UPSERT,
+          i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
+      if (i == 5) {
+        // start up archival threads only after 4 commits.
+        countDownLatch.countDown();
+      }
+    }
+
+    try {
+      CompletableFuture<Object> completableFuture = allOfTerminateOnFailure(completableFutureList);
+      completableFuture.get();
+    } finally {
+      executors.shutdownNow();
+    }
+  }
+
+  public static CompletableFuture<Object> allOfTerminateOnFailure(List<CompletableFuture<Boolean>> futures) {
+    CompletableFuture<Object> failure = new CompletableFuture<>();
+    AtomicBoolean jobFailed = new AtomicBoolean(false);
+    for (CompletableFuture<Boolean> f : futures) {
+      f.exceptionally(ex -> {
+        if (!jobFailed.getAndSet(true)) {
+          LOG.warn("One of the jobs failed. Cancelling all other futures. "
" + ex.getCause() + ", " + ex.getMessage()); + futures.forEach(future -> future.cancel(true)); + } + return null; + }); + } + return CompletableFuture.anyOf(failure, CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))); + } + @ParameterizedTest @ValueSource(booleans = {true, false}) public void testLoadArchiveTimelineWithUncompletedMergeArchiveFile(boolean enableArchiveMerge) throws Exception {