1
0

[HUDI-3713] Guarding archival for multi-writer (#5138)

This commit is contained in:
Sivabalan Narayanan
2022-03-30 22:44:31 -07:00
committed by GitHub
parent f6ff95f97c
commit 4569734d60
6 changed files with 138 additions and 20 deletions

View File

@@ -214,7 +214,8 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
try { try {
preCommit(inflightInstant, metadata); preCommit(inflightInstant, metadata);
commit(table, commitActionType, instantTime, metadata, stats); commit(table, commitActionType, instantTime, metadata, stats);
postCommit(table, metadata, instantTime, extraMetadata); // already within lock, and so no lock required for archival
postCommit(table, metadata, instantTime, extraMetadata, false);
LOG.info("Committed " + instantTime); LOG.info("Committed " + instantTime);
releaseResources(); releaseResources();
} catch (IOException e) { } catch (IOException e) {
@@ -474,14 +475,16 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
* @param metadata Commit Metadata corresponding to committed instant * @param metadata Commit Metadata corresponding to committed instant
* @param instantTime Instant Time * @param instantTime Instant Time
* @param extraMetadata Additional Metadata passed by user * @param extraMetadata Additional Metadata passed by user
* @param acquireLockForArchival true if lock has to be acquired for archival. false otherwise.
*/ */
protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) { protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata,
boolean acquireLockForArchival) {
try { try {
// Delete the marker directory for the instant. // Delete the marker directory for the instant.
WriteMarkersFactory.get(config.getMarkersType(), table, instantTime) WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
autoCleanOnCommit(); autoCleanOnCommit();
autoArchiveOnCommit(table); autoArchiveOnCommit(table, acquireLockForArchival);
} finally { } finally {
this.heartbeatClient.stop(instantTime); this.heartbeatClient.stop(instantTime);
} }
@@ -565,7 +568,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
} }
} }
protected void autoArchiveOnCommit(HoodieTable table) { protected void autoArchiveOnCommit(HoodieTable table, boolean acquireLockForArchival) {
if (!config.isAutoArchive()) { if (!config.isAutoArchive()) {
return; return;
} }
@@ -576,7 +579,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
LOG.info("Async archiver has finished"); LOG.info("Async archiver has finished");
} else { } else {
LOG.info("Start to archive synchronously."); LOG.info("Start to archive synchronously.");
archive(table); archive(table, acquireLockForArchival);
} }
} }
@@ -845,15 +848,16 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
* Trigger archival for the table. This ensures that the number of commits do not explode * Trigger archival for the table. This ensures that the number of commits do not explode
* and keep increasing unbounded over time. * and keep increasing unbounded over time.
* @param table table to commit on. * @param table table to commit on.
* @param acquireLockForArchival true if lock has to be acquired for archival. false otherwise.
*/ */
protected void archive(HoodieTable table) { protected void archive(HoodieTable table, boolean acquireLockForArchival) {
if (!tableServicesEnabled(config)) { if (!tableServicesEnabled(config)) {
return; return;
} }
try { try {
// We cannot have unbounded commit files. Archive commits if we have to archive // We cannot have unbounded commit files. Archive commits if we have to archive
HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(config, table); HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(config, table);
archiver.archiveIfRequired(context); archiver.archiveIfRequired(context, acquireLockForArchival);
} catch (IOException ioe) { } catch (IOException ioe) {
throw new HoodieIOException("Failed to archive", ioe); throw new HoodieIOException("Failed to archive", ioe);
} }
@@ -866,7 +870,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
public void archive() { public void archive() {
// Create a Hoodie table which encapsulated the commits and files visible // Create a Hoodie table which encapsulated the commits and files visible
HoodieTable table = createTable(config, hadoopConf); HoodieTable table = createTable(config, hadoopConf);
archive(table); archive(table, true);
} }
/** /**

View File

@@ -21,6 +21,7 @@ package org.apache.hudi.client;
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry; import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.avro.model.HoodieMergeArchiveFilePlan; import org.apache.hudi.avro.model.HoodieMergeArchiveFilePlan;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.client.utils.MetadataConversionUtils; import org.apache.hudi.client.utils.MetadataConversionUtils;
import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.FSUtils;
@@ -71,6 +72,7 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@@ -96,6 +98,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
private final int minInstantsToKeep; private final int minInstantsToKeep;
private final HoodieTable<T, I, K, O> table; private final HoodieTable<T, I, K, O> table;
private final HoodieTableMetaClient metaClient; private final HoodieTableMetaClient metaClient;
private final TransactionManager txnManager;
public HoodieTimelineArchiver(HoodieWriteConfig config, HoodieTable<T, I, K, O> table) { public HoodieTimelineArchiver(HoodieWriteConfig config, HoodieTable<T, I, K, O> table) {
this.config = config; this.config = config;
@@ -104,6 +107,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
this.archiveFilePath = HoodieArchivedTimeline.getArchiveLogPath(metaClient.getArchivePath()); this.archiveFilePath = HoodieArchivedTimeline.getArchiveLogPath(metaClient.getArchivePath());
this.maxInstantsToKeep = config.getMaxCommitsToKeep(); this.maxInstantsToKeep = config.getMaxCommitsToKeep();
this.minInstantsToKeep = config.getMinCommitsToKeep(); this.minInstantsToKeep = config.getMinCommitsToKeep();
this.txnManager = new TransactionManager(config, table.getMetaClient().getFs());
} }
private Writer openWriter() { private Writer openWriter() {
@@ -143,11 +147,18 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
} }
} }
public boolean archiveIfRequired(HoodieEngineContext context) throws IOException {
return archiveIfRequired(context, false);
}
/** /**
* Check if commits need to be archived. If yes, archive commits. * Check if commits need to be archived. If yes, archive commits.
*/ */
public boolean archiveIfRequired(HoodieEngineContext context) throws IOException { public boolean archiveIfRequired(HoodieEngineContext context, boolean acquireLock) throws IOException {
try { try {
if (acquireLock) {
txnManager.beginTransaction();
}
List<HoodieInstant> instantsToArchive = getInstantsToArchive().collect(Collectors.toList()); List<HoodieInstant> instantsToArchive = getInstantsToArchive().collect(Collectors.toList());
verifyLastMergeArchiveFilesIfNecessary(context); verifyLastMergeArchiveFilesIfNecessary(context);
boolean success = true; boolean success = true;
@@ -167,6 +178,9 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
return success; return success;
} finally { } finally {
close(); close();
if (acquireLock) {
txnManager.endTransaction();
}
} }
} }
@@ -485,9 +499,16 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
} }
} }
return instants.flatMap(hoodieInstant -> return instants.flatMap(hoodieInstant -> {
groupByTsAction.get(Pair.of(hoodieInstant.getTimestamp(), List<HoodieInstant> instantsToStream = groupByTsAction.get(Pair.of(hoodieInstant.getTimestamp(),
HoodieInstant.getComparableAction(hoodieInstant.getAction()))).stream()); HoodieInstant.getComparableAction(hoodieInstant.getAction())));
if (instantsToStream != null) {
return instantsToStream.stream();
} else {
// if a concurrent writer archived the instant
return Collections.EMPTY_LIST.stream();
}
});
} }
private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants, HoodieEngineContext context) throws IOException { private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants, HoodieEngineContext context) throws IOException {

View File

@@ -329,12 +329,13 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
protected void postCommit(HoodieTable table, protected void postCommit(HoodieTable table,
HoodieCommitMetadata metadata, HoodieCommitMetadata metadata,
String instantTime, String instantTime,
Option<Map<String, String>> extraMetadata) { Option<Map<String, String>> extraMetadata,
boolean acquireLockForArchival) {
try { try {
// Delete the marker directory for the instant. // Delete the marker directory for the instant.
WriteMarkersFactory.get(config.getMarkersType(), createTable(config, hadoopConf), instantTime) WriteMarkersFactory.get(config.getMarkersType(), createTable(config, hadoopConf), instantTime)
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
autoArchiveOnCommit(table); autoArchiveOnCommit(table, acquireLockForArchival);
} finally { } finally {
this.heartbeatClient.stop(instantTime); this.heartbeatClient.stop(instantTime);
} }

View File

@@ -200,7 +200,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
result.getWriteStats().get().size()); result.getWriteStats().get().size());
} }
postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, Option.empty()); postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, Option.empty(), true);
emitCommitMetrics(instantTime, result.getCommitMetadata().get(), hoodieTable.getMetaClient().getCommitActionType()); emitCommitMetrics(instantTime, result.getCommitMetadata().get(), hoodieTable.getMetaClient().getCommitActionType());
} }

View File

@@ -285,7 +285,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
result.getWriteStats().get().size()); result.getWriteStats().get().size());
} }
postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, Option.empty()); postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, Option.empty(), true);
emitCommitMetrics(instantTime, result.getCommitMetadata().get(), hoodieTable.getMetaClient().getCommitActionType()); emitCommitMetrics(instantTime, result.getCommitMetadata().get(), hoodieTable.getMetaClient().getCommitActionType());
} }

View File

@@ -20,11 +20,14 @@ package org.apache.hudi.io;
import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.HoodieTimelineArchiver; import org.apache.hudi.client.HoodieTimelineArchiver;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.client.utils.MetadataConversionUtils; import org.apache.hudi.client.utils.MetadataConversionUtils;
import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem; import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.HoodieLogFormat;
@@ -42,6 +45,7 @@ import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.metadata.HoodieTableMetadata;
@@ -71,6 +75,12 @@ import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import java.util.stream.Stream; import java.util.stream.Stream;
@@ -131,7 +141,8 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
int maxDeltaCommitsMetadataTable, int maxDeltaCommitsMetadataTable,
HoodieTableType tableType) throws Exception { HoodieTableType tableType) throws Exception {
return initTestTableAndGetWriteConfig(enableMetadata, minArchivalCommits, maxArchivalCommits, return initTestTableAndGetWriteConfig(enableMetadata, minArchivalCommits, maxArchivalCommits,
maxDeltaCommits, maxDeltaCommitsMetadataTable, tableType, false, 10, 209715200); maxDeltaCommits, maxDeltaCommitsMetadataTable, tableType, false, 10, 209715200,
HoodieFailedWritesCleaningPolicy.EAGER, WriteConcurrencyMode.SINGLE_WRITER);
} }
private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata, private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
@@ -140,7 +151,8 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
int maxDeltaCommitsMetadataTable, int maxDeltaCommitsMetadataTable,
HoodieTableType tableType) throws Exception { HoodieTableType tableType) throws Exception {
return initTestTableAndGetWriteConfig(enableMetadata, minArchivalCommits, maxArchivalCommits, return initTestTableAndGetWriteConfig(enableMetadata, minArchivalCommits, maxArchivalCommits,
5, maxDeltaCommitsMetadataTable, tableType, false, 10, 209715200); 5, maxDeltaCommitsMetadataTable, tableType, false, 10, 209715200,
HoodieFailedWritesCleaningPolicy.EAGER, WriteConcurrencyMode.SINGLE_WRITER);
} }
private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata, private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
@@ -151,7 +163,8 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
int archiveFilesBatch, int archiveFilesBatch,
long size) throws Exception { long size) throws Exception {
return initTestTableAndGetWriteConfig(enableMetadata, minArchivalCommits, maxArchivalCommits, 5, return initTestTableAndGetWriteConfig(enableMetadata, minArchivalCommits, maxArchivalCommits, 5,
maxDeltaCommitsMetadataTable, HoodieTableType.COPY_ON_WRITE, enableArchiveMerge, archiveFilesBatch, size); maxDeltaCommitsMetadataTable, HoodieTableType.COPY_ON_WRITE, enableArchiveMerge, archiveFilesBatch, size,
HoodieFailedWritesCleaningPolicy.EAGER, WriteConcurrencyMode.SINGLE_WRITER);
} }
private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata, private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
@@ -162,7 +175,9 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
HoodieTableType tableType, HoodieTableType tableType,
boolean enableArchiveMerge, boolean enableArchiveMerge,
int archiveFilesBatch, int archiveFilesBatch,
long size) throws Exception { long size,
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
WriteConcurrencyMode writeConcurrencyMode) throws Exception {
init(tableType); init(tableType);
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(basePath) HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(basePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
@@ -171,11 +186,15 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
.withArchiveMergeEnable(enableArchiveMerge) .withArchiveMergeEnable(enableArchiveMerge)
.withArchiveMergeFilesBatchSize(archiveFilesBatch) .withArchiveMergeFilesBatchSize(archiveFilesBatch)
.withArchiveMergeSmallFileLimit(size) .withArchiveMergeSmallFileLimit(size)
.withFailedWritesCleaningPolicy(failedWritesCleaningPolicy)
.build()) .build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build()) .withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadata) .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadata)
.withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsMetadataTable).build()) .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsMetadataTable).build())
.withWriteConcurrencyMode(writeConcurrencyMode)
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
.build())
.forTable("test-trip-table").build(); .forTable("test-trip-table").build();
initWriteConfigAndMetatableWriter(writeConfig, enableMetadata); initWriteConfigAndMetatableWriter(writeConfig, enableMetadata);
return writeConfig; return writeConfig;
@@ -404,6 +423,79 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
assertThrows(HoodieException.class, () -> metaClient.getArchivedTimeline().reload()); assertThrows(HoodieException.class, () -> metaClient.getArchivedTimeline().reload());
} }
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testArchivalWithMultiWriters(boolean enableMetadata) throws Exception {
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(enableMetadata, 2, 4, 5, 2,
HoodieTableType.COPY_ON_WRITE, false, 10, 209715200,
HoodieFailedWritesCleaningPolicy.LAZY, WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL);
final ExecutorService executors = Executors.newFixedThreadPool(2);
List<CompletableFuture<Boolean>> completableFutureList = new ArrayList<>();
CountDownLatch countDownLatch = new CountDownLatch(1);
IntStream.range(0, 2).forEach(index -> {
completableFutureList.add(CompletableFuture.supplyAsync(() -> {
HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
try {
// wait until 4 commits are available so that archival thread will have something to archive.
countDownLatch.await(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new HoodieException("Should not have thrown InterruptedException ", e);
}
metaClient.reloadActiveTimeline();
while (!metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get().getTimestamp().endsWith("29")
|| metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() > 4) {
try {
HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, table);
archiver.archiveIfRequired(context, true);
// if not for below sleep, both archiving threads acquires lock in quick succession and does not give space for main thread
// to complete the write operation when metadata table is enabled.
if (enableMetadata) {
Thread.sleep(2);
}
} catch (IOException e) {
throw new HoodieException("IOException thrown while archiving ", e);
} catch (InterruptedException e) {
throw new HoodieException("Should not have thrown InterruptedException ", e);
}
table.getMetaClient().reloadActiveTimeline();
}
return true;
}, executors));
});
// do ingestion and trigger archive actions here.
for (int i = 1; i < 30; i++) {
testTable.doWriteOperation("0000000" + String.format("%02d", i), WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
if (i == 5) {
// start up archival threads only after 4 commits.
countDownLatch.countDown();
}
}
try {
CompletableFuture completableFuture = allOfTerminateOnFailure(completableFutureList);
completableFuture.get();
} finally {
executors.shutdownNow();
}
}
public static CompletableFuture allOfTerminateOnFailure(List<CompletableFuture<Boolean>> futures) {
CompletableFuture<?> failure = new CompletableFuture();
AtomicBoolean jobFailed = new AtomicBoolean(false);
for (CompletableFuture<?> f : futures) {
f.exceptionally(ex -> {
if (!jobFailed.getAndSet(true)) {
LOG.warn("One of the jobs failed; cancelling all other futures. " + ex.getCause() + ", " + ex.getMessage());
futures.forEach(future -> future.cancel(true));
}
return null;
});
}
return CompletableFuture.anyOf(failure, CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])));
}
@ParameterizedTest @ParameterizedTest
@ValueSource(booleans = {true, false}) @ValueSource(booleans = {true, false})
public void testLoadArchiveTimelineWithUncompletedMergeArchiveFile(boolean enableArchiveMerge) throws Exception { public void testLoadArchiveTimelineWithUncompletedMergeArchiveFile(boolean enableArchiveMerge) throws Exception {