[HUDI-3029] Transaction manager: avoid deadlock when doing begin and end transactions (#4363)
* [HUDI-3029] Transaction manager: avoid deadlock when doing begin and end transactions - Transaction manager has begin and end transactions as synchronized methods. Based on the lock provider implementation, this can lead to a deadlock situation when the underlying lock() calls are blocking or have a long timeout. - Fixing transaction manager begin and end transactions so they cannot deadlock and do not assume anything about the lock provider implementation.
This commit is contained in:
committed by
GitHub
parent
47852446e8
commit
d1d48ed494
@@ -201,7 +201,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
|
||||
} catch (IOException e) {
|
||||
throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime, e);
|
||||
} finally {
|
||||
this.txnManager.endTransaction();
|
||||
this.txnManager.endTransaction(Option.of(inflightInstant));
|
||||
}
|
||||
// do this outside of lock since compaction, clustering can be time-consuming and we don't need a lock for the entire execution period
|
||||
runTableServicesInline(table, metadata, extraMetadata);
|
||||
@@ -1063,13 +1063,14 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
|
||||
public Option<String> scheduleTableService(String instantTime, Option<Map<String, String>> extraMetadata,
|
||||
TableServiceType tableServiceType) {
|
||||
// A lock is required to guard against race conditions between an on-going writer and scheduling a table service.
|
||||
final Option<HoodieInstant> inflightInstant = Option.of(new HoodieInstant(HoodieInstant.State.REQUESTED,
|
||||
tableServiceType.getAction(), instantTime));
|
||||
try {
|
||||
this.txnManager.beginTransaction(Option.of(new HoodieInstant(HoodieInstant.State.REQUESTED,
|
||||
tableServiceType.getAction(), instantTime)), Option.empty());
|
||||
this.txnManager.beginTransaction(inflightInstant, Option.empty());
|
||||
LOG.info("Scheduling table service " + tableServiceType);
|
||||
return scheduleTableServiceInternal(instantTime, extraMetadata, tableServiceType);
|
||||
} finally {
|
||||
this.txnManager.endTransaction();
|
||||
this.txnManager.endTransaction(inflightInstant);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -35,49 +35,64 @@ import java.io.Serializable;
|
||||
public class TransactionManager implements Serializable {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(TransactionManager.class);
|
||||
|
||||
private final LockManager lockManager;
|
||||
private Option<HoodieInstant> currentTxnOwnerInstant;
|
||||
private Option<HoodieInstant> lastCompletedTxnOwnerInstant;
|
||||
private boolean supportsOptimisticConcurrency;
|
||||
private final boolean isOptimisticConcurrencyControlEnabled;
|
||||
private Option<HoodieInstant> currentTxnOwnerInstant = Option.empty();
|
||||
private Option<HoodieInstant> lastCompletedTxnOwnerInstant = Option.empty();
|
||||
|
||||
public TransactionManager(HoodieWriteConfig config, FileSystem fs) {
|
||||
this.lockManager = new LockManager(config, fs);
|
||||
this.supportsOptimisticConcurrency = config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl();
|
||||
this.isOptimisticConcurrencyControlEnabled = config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl();
|
||||
}
|
||||
|
||||
public synchronized void beginTransaction() {
|
||||
if (supportsOptimisticConcurrency) {
|
||||
public void beginTransaction() {
|
||||
if (isOptimisticConcurrencyControlEnabled) {
|
||||
LOG.info("Transaction starting without a transaction owner");
|
||||
lockManager.lock();
|
||||
LOG.info("Transaction started");
|
||||
LOG.info("Transaction started without a transaction owner");
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void beginTransaction(Option<HoodieInstant> currentTxnOwnerInstant, Option<HoodieInstant> lastCompletedTxnOwnerInstant) {
|
||||
if (supportsOptimisticConcurrency) {
|
||||
this.lastCompletedTxnOwnerInstant = lastCompletedTxnOwnerInstant;
|
||||
lockManager.setLatestCompletedWriteInstant(lastCompletedTxnOwnerInstant);
|
||||
LOG.info("Latest completed transaction instant " + lastCompletedTxnOwnerInstant);
|
||||
this.currentTxnOwnerInstant = currentTxnOwnerInstant;
|
||||
LOG.info("Transaction starting with transaction owner " + currentTxnOwnerInstant);
|
||||
public void beginTransaction(Option<HoodieInstant> newTxnOwnerInstant,
|
||||
Option<HoodieInstant> lastCompletedTxnOwnerInstant) {
|
||||
if (isOptimisticConcurrencyControlEnabled) {
|
||||
LOG.info("Transaction starting for " + newTxnOwnerInstant
|
||||
+ " with latest completed transaction instant " + lastCompletedTxnOwnerInstant);
|
||||
lockManager.lock();
|
||||
LOG.info("Transaction started");
|
||||
reset(currentTxnOwnerInstant, newTxnOwnerInstant, lastCompletedTxnOwnerInstant);
|
||||
LOG.info("Transaction started for " + newTxnOwnerInstant
|
||||
+ " with latest completed transaction instant " + lastCompletedTxnOwnerInstant);
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void endTransaction() {
|
||||
if (supportsOptimisticConcurrency) {
|
||||
LOG.info("Transaction ending with transaction owner " + currentTxnOwnerInstant);
|
||||
public void endTransaction() {
|
||||
if (isOptimisticConcurrencyControlEnabled) {
|
||||
LOG.info("Transaction ending without a transaction owner");
|
||||
lockManager.unlock();
|
||||
LOG.info("Transaction ended");
|
||||
this.lastCompletedTxnOwnerInstant = Option.empty();
|
||||
lockManager.resetLatestCompletedWriteInstant();
|
||||
LOG.info("Transaction ended without a transaction owner");
|
||||
}
|
||||
}
|
||||
|
||||
public void endTransaction(Option<HoodieInstant> currentTxnOwnerInstant) {
|
||||
if (isOptimisticConcurrencyControlEnabled) {
|
||||
LOG.info("Transaction ending with transaction owner " + currentTxnOwnerInstant);
|
||||
reset(currentTxnOwnerInstant, Option.empty(), Option.empty());
|
||||
lockManager.unlock();
|
||||
LOG.info("Transaction ended with transaction owner " + currentTxnOwnerInstant);
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void reset(Option<HoodieInstant> callerInstant,
|
||||
Option<HoodieInstant> newTxnOwnerInstant,
|
||||
Option<HoodieInstant> lastCompletedTxnOwnerInstant) {
|
||||
if (!this.currentTxnOwnerInstant.isPresent() || this.currentTxnOwnerInstant == callerInstant) {
|
||||
this.currentTxnOwnerInstant = newTxnOwnerInstant;
|
||||
this.lastCompletedTxnOwnerInstant = lastCompletedTxnOwnerInstant;
|
||||
}
|
||||
}
|
||||
|
||||
public void close() {
|
||||
if (supportsOptimisticConcurrency) {
|
||||
if (isOptimisticConcurrencyControlEnabled) {
|
||||
lockManager.close();
|
||||
LOG.info("Transaction manager closed");
|
||||
}
|
||||
|
||||
@@ -111,7 +111,7 @@ public class InProcessLockProvider implements LockProvider<ReentrantReadWriteLoc
|
||||
}
|
||||
|
||||
private String getLogMessage(LockState state) {
|
||||
return StringUtils.join(String.valueOf(Thread.currentThread().getId()),
|
||||
state.name(), " local process lock.");
|
||||
return StringUtils.join("Thread ", String.valueOf(Thread.currentThread().getName()), " ",
|
||||
state.name(), " in-process lock.");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,13 +20,10 @@ package org.apache.hudi.client.transaction.lock;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hudi.common.config.LockConfiguration;
|
||||
import org.apache.hudi.common.config.SerializableConfiguration;
|
||||
import org.apache.hudi.common.lock.LockProvider;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.ReflectionUtils;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.exception.HoodieLockException;
|
||||
@@ -46,11 +43,8 @@ public class LockManager implements Serializable, AutoCloseable {
|
||||
private final LockConfiguration lockConfiguration;
|
||||
private final SerializableConfiguration hadoopConf;
|
||||
private volatile LockProvider lockProvider;
|
||||
// Holds the latest completed write instant to know which ones to check conflict against
|
||||
private final AtomicReference<Option<HoodieInstant>> latestCompletedWriteInstant;
|
||||
|
||||
public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) {
|
||||
this.latestCompletedWriteInstant = new AtomicReference<>(Option.empty());
|
||||
this.writeConfig = writeConfig;
|
||||
this.hadoopConf = new SerializableConfiguration(fs.getConf());
|
||||
this.lockConfiguration = new LockConfiguration(writeConfig.getProps());
|
||||
@@ -100,22 +94,6 @@ public class LockManager implements Serializable, AutoCloseable {
|
||||
return lockProvider;
|
||||
}
|
||||
|
||||
public void setLatestCompletedWriteInstant(Option<HoodieInstant> instant) {
|
||||
this.latestCompletedWriteInstant.set(instant);
|
||||
}
|
||||
|
||||
public void compareAndSetLatestCompletedWriteInstant(Option<HoodieInstant> expected, Option<HoodieInstant> newValue) {
|
||||
this.latestCompletedWriteInstant.compareAndSet(expected, newValue);
|
||||
}
|
||||
|
||||
public AtomicReference<Option<HoodieInstant>> getLatestCompletedWriteInstant() {
|
||||
return latestCompletedWriteInstant;
|
||||
}
|
||||
|
||||
public void resetLatestCompletedWriteInstant() {
|
||||
this.latestCompletedWriteInstant.set(Option.empty());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
closeQuietly();
|
||||
|
||||
@@ -217,7 +217,7 @@ public class CleanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends
|
||||
throw new HoodieIOException("Failed to clean up after commit", e);
|
||||
} finally {
|
||||
if (!skipLocking) {
|
||||
this.txnManager.endTransaction();
|
||||
this.txnManager.endTransaction(Option.empty());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -147,14 +147,16 @@ public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload, I,
|
||||
}
|
||||
|
||||
protected void autoCommit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<O> result) {
|
||||
this.txnManager.beginTransaction(Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, instantTime)),
|
||||
final Option<HoodieInstant> inflightInstant = Option.of(new HoodieInstant(State.INFLIGHT,
|
||||
HoodieTimeline.COMMIT_ACTION, instantTime));
|
||||
this.txnManager.beginTransaction(inflightInstant,
|
||||
lastCompletedTxn.isPresent() ? Option.of(lastCompletedTxn.get().getLeft()) : Option.empty());
|
||||
try {
|
||||
TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(),
|
||||
result.getCommitMetadata(), config, this.txnManager.getLastCompletedTransactionOwner());
|
||||
commit(extraMetadata, result);
|
||||
} finally {
|
||||
this.txnManager.endTransaction();
|
||||
this.txnManager.endTransaction(inflightInstant);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -112,7 +112,7 @@ public abstract class BaseRestoreActionExecutor<T extends HoodieRecordPayload, I
|
||||
this.txnManager.beginTransaction(Option.empty(), Option.empty());
|
||||
writeTableMetadata(restoreMetadata);
|
||||
} finally {
|
||||
this.txnManager.endTransaction();
|
||||
this.txnManager.endTransaction(Option.empty());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -266,7 +266,7 @@ public abstract class BaseRollbackActionExecutor<T extends HoodieRecordPayload,
|
||||
throw new HoodieIOException("Error executing rollback at instant " + instantTime, e);
|
||||
} finally {
|
||||
if (!skipLocking) {
|
||||
this.txnManager.endTransaction();
|
||||
this.txnManager.endTransaction(Option.empty());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,198 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.client.transaction;
|
||||
|
||||
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
|
||||
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
|
||||
import org.apache.hudi.common.model.WriteConcurrencyMode;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.config.HoodieCompactionConfig;
|
||||
import org.apache.hudi.config.HoodieLockConfig;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.exception.HoodieLockException;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
public class TestTransactionManager extends HoodieCommonTestHarness {
|
||||
HoodieWriteConfig writeConfig;
|
||||
TransactionManager transactionManager;
|
||||
|
||||
@BeforeEach
|
||||
private void init() throws IOException {
|
||||
initPath();
|
||||
initMetaClient();
|
||||
this.writeConfig = getWriteConfig();
|
||||
this.transactionManager = new TransactionManager(this.writeConfig, this.metaClient.getFs());
|
||||
}
|
||||
|
||||
private HoodieWriteConfig getWriteConfig() {
|
||||
return HoodieWriteConfig.newBuilder()
|
||||
.withPath(basePath)
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
|
||||
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
|
||||
.build())
|
||||
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
|
||||
.withLockConfig(HoodieLockConfig.newBuilder()
|
||||
.withLockProvider(InProcessLockProvider.class)
|
||||
.build())
|
||||
.build();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSingleWriterTransaction() {
|
||||
transactionManager.beginTransaction();
|
||||
transactionManager.endTransaction();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSingleWriterNestedTransaction() {
|
||||
transactionManager.beginTransaction();
|
||||
assertThrows(HoodieLockException.class, () -> {
|
||||
transactionManager.beginTransaction();
|
||||
});
|
||||
|
||||
transactionManager.endTransaction();
|
||||
assertThrows(HoodieLockException.class, () -> {
|
||||
transactionManager.endTransaction();
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiWriterTransactions() {
|
||||
final int threadCount = 3;
|
||||
final long awaitMaxTimeoutMs = 2000L;
|
||||
final CountDownLatch latch = new CountDownLatch(threadCount);
|
||||
final AtomicBoolean writer1Completed = new AtomicBoolean(false);
|
||||
final AtomicBoolean writer2Completed = new AtomicBoolean(false);
|
||||
|
||||
// Let writer1 get the lock first, then wait for others
|
||||
// to join the sync up point.
|
||||
Thread writer1 = new Thread(() -> {
|
||||
assertDoesNotThrow(() -> {
|
||||
transactionManager.beginTransaction();
|
||||
});
|
||||
latch.countDown();
|
||||
try {
|
||||
latch.await(awaitMaxTimeoutMs, TimeUnit.MILLISECONDS);
|
||||
// Following sleep is to make sure writer2 attempts
|
||||
// to try lock and to get blocked on the lock which
|
||||
// this thread is currently holding.
|
||||
Thread.sleep(50);
|
||||
} catch (InterruptedException e) {
|
||||
//
|
||||
}
|
||||
assertDoesNotThrow(() -> {
|
||||
transactionManager.endTransaction();
|
||||
});
|
||||
writer1Completed.set(true);
|
||||
});
|
||||
writer1.start();
|
||||
|
||||
// Writer2 will block on trying to acquire the lock
|
||||
// and will eventually get the lock before the timeout.
|
||||
Thread writer2 = new Thread(() -> {
|
||||
latch.countDown();
|
||||
try {
|
||||
latch.await(awaitMaxTimeoutMs, TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
//
|
||||
}
|
||||
assertDoesNotThrow(() -> {
|
||||
transactionManager.beginTransaction();
|
||||
});
|
||||
assertDoesNotThrow(() -> {
|
||||
transactionManager.endTransaction();
|
||||
});
|
||||
writer2Completed.set(true);
|
||||
});
|
||||
writer2.start();
|
||||
|
||||
// Let writer1 and writer2 wait at the sync up
|
||||
// point to make sure they run in parallel and
|
||||
// one get blocked by the other.
|
||||
latch.countDown();
|
||||
try {
|
||||
writer1.join();
|
||||
writer2.join();
|
||||
} catch (InterruptedException e) {
|
||||
//
|
||||
}
|
||||
|
||||
// Make sure both writers actually completed successfully
|
||||
Assertions.assertTrue(writer1Completed.get());
|
||||
Assertions.assertTrue(writer2Completed.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransactionsWithInstantTime() {
|
||||
// 1. Begin and end by the same transaction owner
|
||||
Option<HoodieInstant> lastCompletedInstant = getInstant("0000001");
|
||||
Option<HoodieInstant> newTxnOwnerInstant = getInstant("0000002");
|
||||
transactionManager.beginTransaction(newTxnOwnerInstant, lastCompletedInstant);
|
||||
Assertions.assertTrue(transactionManager.getCurrentTransactionOwner() == newTxnOwnerInstant);
|
||||
Assertions.assertTrue(transactionManager.getLastCompletedTransactionOwner() == lastCompletedInstant);
|
||||
transactionManager.endTransaction(newTxnOwnerInstant);
|
||||
Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
|
||||
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
|
||||
|
||||
// 2. Begin transaction with a new txn owner, but end transaction with no/wrong owner
|
||||
lastCompletedInstant = getInstant("0000002");
|
||||
newTxnOwnerInstant = getInstant("0000003");
|
||||
transactionManager.beginTransaction(newTxnOwnerInstant, lastCompletedInstant);
|
||||
transactionManager.endTransaction();
|
||||
// Owner reset would not happen as the end txn was invoked with an incorrect current txn owner
|
||||
Assertions.assertTrue(transactionManager.getCurrentTransactionOwner() == newTxnOwnerInstant);
|
||||
Assertions.assertTrue(transactionManager.getLastCompletedTransactionOwner() == lastCompletedInstant);
|
||||
|
||||
// 3. But, we should be able to begin a new transaction for a new owner
|
||||
lastCompletedInstant = getInstant("0000003");
|
||||
newTxnOwnerInstant = getInstant("0000004");
|
||||
transactionManager.beginTransaction(newTxnOwnerInstant, lastCompletedInstant);
|
||||
Assertions.assertTrue(transactionManager.getCurrentTransactionOwner() == newTxnOwnerInstant);
|
||||
Assertions.assertTrue(transactionManager.getLastCompletedTransactionOwner() == lastCompletedInstant);
|
||||
transactionManager.endTransaction(newTxnOwnerInstant);
|
||||
Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
|
||||
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
|
||||
|
||||
// 4. Transactions with no owners should also go through
|
||||
transactionManager.beginTransaction();
|
||||
Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
|
||||
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
|
||||
transactionManager.endTransaction();
|
||||
Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
|
||||
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
|
||||
}
|
||||
|
||||
private Option<HoodieInstant> getInstant(String timestamp) {
|
||||
return Option.of(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMMIT_ACTION, timestamp));
|
||||
}
|
||||
}
|
||||
@@ -359,8 +359,8 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
|
||||
String compactionCommitTime) {
|
||||
this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction");
|
||||
List<HoodieWriteStat> writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
|
||||
final HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime);
|
||||
try {
|
||||
HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime);
|
||||
this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
|
||||
finalizeWrite(table, compactionCommitTime, writeStats);
|
||||
// commit to data table after committing to metadata table.
|
||||
@@ -371,7 +371,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
|
||||
LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata);
|
||||
CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
|
||||
} finally {
|
||||
this.txnManager.endTransaction();
|
||||
this.txnManager.endTransaction(Option.of(compactionInstant));
|
||||
}
|
||||
if (compactionTimer != null) {
|
||||
long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
|
||||
|
||||
@@ -300,8 +300,8 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
|
||||
String compactionCommitTime) {
|
||||
this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction");
|
||||
List<HoodieWriteStat> writeStats = writeStatuses.map(WriteStatus::getStat).collect();
|
||||
final HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime);
|
||||
try {
|
||||
HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime);
|
||||
this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
|
||||
finalizeWrite(table, compactionCommitTime, writeStats);
|
||||
// commit to data table after committing to metadata table.
|
||||
@@ -309,7 +309,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
|
||||
LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata);
|
||||
CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
|
||||
} finally {
|
||||
this.txnManager.endTransaction();
|
||||
this.txnManager.endTransaction(Option.of(compactionInstant));
|
||||
}
|
||||
WriteMarkersFactory.get(config.getMarkersType(), table, compactionCommitTime)
|
||||
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
|
||||
@@ -378,8 +378,8 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
|
||||
throw new HoodieClusteringException("Clustering failed to write to files:"
|
||||
+ writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(s -> s.getFileId()).collect(Collectors.joining(",")));
|
||||
}
|
||||
final HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime);
|
||||
try {
|
||||
HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime);
|
||||
this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty());
|
||||
finalizeWrite(table, clusteringCommitTime, writeStats);
|
||||
writeTableMetadataForTableServices(table, metadata,clusteringInstant);
|
||||
@@ -395,7 +395,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
|
||||
} catch (Exception e) {
|
||||
throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, e);
|
||||
} finally {
|
||||
this.txnManager.endTransaction();
|
||||
this.txnManager.endTransaction(Option.of(clusteringInstant));
|
||||
}
|
||||
WriteMarkersFactory.get(config.getMarkersType(), table, clusteringCommitTime)
|
||||
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
|
||||
|
||||
Reference in New Issue
Block a user