1
0

[HUDI-3072] Fixing conflict resolution in transaction management code path for auto commit code path (#4588)

* Fixing conflict resolution in transaction management code path for auto commit code path

* Addressing comments

* Fixing test failures
This commit is contained in:
Sivabalan Narayanan
2022-01-24 05:43:28 -05:00
committed by GitHub
parent cfde45b548
commit e00a9042e9
8 changed files with 205 additions and 24 deletions

View File

@@ -60,9 +60,31 @@ public class TransactionUtils {
final Option<HoodieCommitMetadata> thisCommitMetadata, final Option<HoodieCommitMetadata> thisCommitMetadata,
final HoodieWriteConfig config, final HoodieWriteConfig config,
Option<HoodieInstant> lastCompletedTxnOwnerInstant) throws HoodieWriteConflictException { Option<HoodieInstant> lastCompletedTxnOwnerInstant) throws HoodieWriteConflictException {
return resolveWriteConflictIfAny(table, currentTxnOwnerInstant, thisCommitMetadata, config, lastCompletedTxnOwnerInstant, false);
}
/**
* Resolve any write conflicts when committing data.
*
* @param table
* @param currentTxnOwnerInstant
* @param thisCommitMetadata
* @param config
* @param lastCompletedTxnOwnerInstant
* @return
* @throws HoodieWriteConflictException
*/
public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny(
final HoodieTable table,
final Option<HoodieInstant> currentTxnOwnerInstant,
final Option<HoodieCommitMetadata> thisCommitMetadata,
final HoodieWriteConfig config,
Option<HoodieInstant> lastCompletedTxnOwnerInstant,
boolean reloadActiveTimeline) throws HoodieWriteConflictException {
if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) { if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
ConflictResolutionStrategy resolutionStrategy = config.getWriteConflictResolutionStrategy(); ConflictResolutionStrategy resolutionStrategy = config.getWriteConflictResolutionStrategy();
Stream<HoodieInstant> instantStream = resolutionStrategy.getCandidateInstants(table.getActiveTimeline(), currentTxnOwnerInstant.get(), lastCompletedTxnOwnerInstant); Stream<HoodieInstant> instantStream = resolutionStrategy.getCandidateInstants(reloadActiveTimeline
? table.getMetaClient().reloadActiveTimeline() : table.getActiveTimeline(), currentTxnOwnerInstant.get(), lastCompletedTxnOwnerInstant);
final ConcurrentOperation thisOperation = new ConcurrentOperation(currentTxnOwnerInstant.get(), thisCommitMetadata.orElse(new HoodieCommitMetadata())); final ConcurrentOperation thisOperation = new ConcurrentOperation(currentTxnOwnerInstant.get(), thisCommitMetadata.orElse(new HoodieCommitMetadata()));
instantStream.forEach(instant -> { instantStream.forEach(instant -> {
try { try {

View File

@@ -31,7 +31,6 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.common.util.collection.Pair;
@@ -152,18 +151,22 @@ public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload, I,
protected void autoCommit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<O> result) { protected void autoCommit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<O> result) {
final Option<HoodieInstant> inflightInstant = Option.of(new HoodieInstant(State.INFLIGHT, final Option<HoodieInstant> inflightInstant = Option.of(new HoodieInstant(State.INFLIGHT,
HoodieTimeline.COMMIT_ACTION, instantTime)); getCommitActionType(), instantTime));
this.txnManager.beginTransaction(inflightInstant, this.txnManager.beginTransaction(inflightInstant,
lastCompletedTxn.isPresent() ? Option.of(lastCompletedTxn.get().getLeft()) : Option.empty()); lastCompletedTxn.isPresent() ? Option.of(lastCompletedTxn.get().getLeft()) : Option.empty());
try { try {
setCommitMetadata(result);
// reload active timeline so as to get all updates after current transaction have started. hence setting last arg to true.
TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(), TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(),
result.getCommitMetadata(), config, this.txnManager.getLastCompletedTransactionOwner()); result.getCommitMetadata(), config, this.txnManager.getLastCompletedTransactionOwner(), true);
commit(extraMetadata, result); commit(extraMetadata, result);
} finally { } finally {
this.txnManager.endTransaction(inflightInstant); this.txnManager.endTransaction(inflightInstant);
} }
} }
protected abstract void setCommitMetadata(HoodieWriteMetadata<O> result);
protected abstract void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<O> result); protected abstract void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<O> result);
/** /**

View File

@@ -134,6 +134,12 @@ public abstract class BaseFlinkCommitActionExecutor<T extends HoodieRecordPayloa
commit(extraMetadata, result, result.getWriteStatuses().stream().map(WriteStatus::getStat).collect(Collectors.toList())); commit(extraMetadata, result, result.getWriteStatuses().stream().map(WriteStatus::getStat).collect(Collectors.toList()));
} }
protected void setCommitMetadata(HoodieWriteMetadata<List<WriteStatus>> result) {
result.setCommitMetadata(Option.of(CommitUtils.buildMetadata(result.getWriteStatuses().stream().map(WriteStatus::getStat).collect(Collectors.toList()),
result.getPartitionToReplaceFileIds(),
extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType())));
}
protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<List<WriteStatus>> result, List<HoodieWriteStat> writeStats) { protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<List<WriteStatus>> result, List<HoodieWriteStat> writeStats) {
String actionType = getCommitActionType(); String actionType = getCommitActionType();
LOG.info("Committing " + instantTime + ", action Type " + actionType); LOG.info("Committing " + instantTime + ", action Type " + actionType);
@@ -144,8 +150,7 @@ public abstract class BaseFlinkCommitActionExecutor<T extends HoodieRecordPayloa
try { try {
LOG.info("Committing " + instantTime + ", action Type " + getCommitActionType()); LOG.info("Committing " + instantTime + ", action Type " + getCommitActionType());
HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
HoodieCommitMetadata metadata = CommitUtils.buildMetadata(writeStats, result.getPartitionToReplaceFileIds(), HoodieCommitMetadata metadata = result.getCommitMetadata().get();
extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());
writeTableMetadata(metadata, actionType); writeTableMetadata(metadata, actionType);

View File

@@ -196,6 +196,11 @@ public abstract class BaseJavaCommitActionExecutor<T extends HoodieRecordPayload
commit(extraMetadata, result, result.getWriteStatuses().stream().map(WriteStatus::getStat).collect(Collectors.toList())); commit(extraMetadata, result, result.getWriteStatuses().stream().map(WriteStatus::getStat).collect(Collectors.toList()));
} }
protected void setCommitMetadata(HoodieWriteMetadata<List<WriteStatus>> result) {
result.setCommitMetadata(Option.of(CommitUtils.buildMetadata(result.getWriteStatuses().stream().map(WriteStatus::getStat).collect(Collectors.toList()),
result.getPartitionToReplaceFileIds(), extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType())));
}
protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<List<WriteStatus>> result, List<HoodieWriteStat> writeStats) { protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<List<WriteStatus>> result, List<HoodieWriteStat> writeStats) {
String actionType = getCommitActionType(); String actionType = getCommitActionType();
LOG.info("Committing " + instantTime + ", action Type " + actionType); LOG.info("Committing " + instantTime + ", action Type " + actionType);
@@ -206,8 +211,7 @@ public abstract class BaseJavaCommitActionExecutor<T extends HoodieRecordPayload
try { try {
LOG.info("Committing " + instantTime + ", action Type " + getCommitActionType()); LOG.info("Committing " + instantTime + ", action Type " + getCommitActionType());
HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
HoodieCommitMetadata metadata = CommitUtils.buildMetadata(writeStats, result.getPartitionToReplaceFileIds(), HoodieCommitMetadata metadata = result.getCommitMetadata().get();
extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());
writeTableMetadata(metadata, actionType); writeTableMetadata(metadata, actionType);

View File

@@ -181,6 +181,11 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
return null; return null;
} }
@Override
protected void setCommitMetadata(HoodieWriteMetadata<JavaRDD<WriteStatus>> result) {
result.setCommitMetadata(Option.of(new HoodieCommitMetadata()));
}
@Override @Override
protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<JavaRDD<WriteStatus>> result) { protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<JavaRDD<WriteStatus>> result) {
// Perform bootstrap index write and then commit. Make sure both record-key and bootstrap-index // Perform bootstrap index write and then commit. Make sure both record-key and bootstrap-index

View File

@@ -273,6 +273,13 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
return table.getMetaClient().getCommitActionType(); return table.getMetaClient().getCommitActionType();
} }
@Override
protected void setCommitMetadata(HoodieWriteMetadata<JavaRDD<WriteStatus>> result) {
result.setCommitMetadata(Option.of(CommitUtils.buildMetadata(result.getWriteStatuses().map(WriteStatus::getStat).collect(),
result.getPartitionToReplaceFileIds(),
extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType())));
}
@Override @Override
protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<JavaRDD<WriteStatus>> result) { protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<JavaRDD<WriteStatus>> result) {
context.setJobStatus(this.getClass().getSimpleName(), "Commit write status collect"); context.setJobStatus(this.getClass().getSimpleName(), "Commit write status collect");
@@ -288,8 +295,7 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
finalizeWrite(instantTime, writeStats, result); finalizeWrite(instantTime, writeStats, result);
try { try {
HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
HoodieCommitMetadata metadata = CommitUtils.buildMetadata(writeStats, result.getPartitionToReplaceFileIds(), HoodieCommitMetadata metadata = result.getCommitMetadata().get();
extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());
writeTableMetadata(metadata, actionType); writeTableMetadata(metadata, actionType);
activeTimeline.saveAsComplete(new HoodieInstant(true, getCommitActionType(), instantTime), activeTimeline.saveAsComplete(new HoodieInstant(true, getCommitActionType(), instantTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));

View File

@@ -18,8 +18,8 @@
package org.apache.hudi.client; package org.apache.hudi.client;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider; import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.LockConfiguration; import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieFileFormat;
@@ -39,6 +39,8 @@ import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieWriteConflictException; import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.testutils.HoodieClientTestBase; import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@@ -54,6 +56,7 @@ import java.util.Properties;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
@@ -64,6 +67,7 @@ import java.util.stream.Collectors;
import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY; import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail; import static org.junit.jupiter.api.Assertions.fail;
@@ -94,7 +98,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
} }
Properties properties = new Properties(); Properties properties = new Properties();
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000"); properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
HoodieWriteConfig writeConfig = getConfigBuilder() HoodieWriteConfig writeConfig = getConfigBuilder()
.withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build()) .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
@@ -105,7 +109,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
.build()).withAutoCommit(false).withProperties(properties).build(); .build()).withAutoCommit(false).withProperties(properties).build();
// Create the first commit // Create the first commit
createCommitWithInserts(writeConfig, getHoodieWriteClient(writeConfig), "000", "001", 200); createCommitWithInserts(writeConfig, getHoodieWriteClient(writeConfig), "000", "001", 200, true);
final int threadCount = 2; final int threadCount = 2;
final ExecutorService executors = Executors.newFixedThreadPool(2); final ExecutorService executors = Executors.newFixedThreadPool(2);
@@ -182,9 +186,9 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
Properties properties = new Properties(); Properties properties = new Properties();
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000"); properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,"3000"); properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "3000");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,"20"); properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "20");
HoodieWriteConfig cfg = getConfigBuilder() HoodieWriteConfig cfg = getConfigBuilder()
.withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder()
@@ -257,7 +261,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
Properties properties = new Properties(); Properties properties = new Properties();
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath); properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath);
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000"); properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
// Disabling embedded timeline server, it doesn't work with multiwriter // Disabling embedded timeline server, it doesn't work with multiwriter
HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder() HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withAutoClean(false) .withCompactionConfig(HoodieCompactionConfig.newBuilder().withAutoClean(false)
@@ -276,7 +280,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
// Create the first commit with inserts // Create the first commit with inserts
HoodieWriteConfig cfg = writeConfigBuilder.build(); HoodieWriteConfig cfg = writeConfigBuilder.build();
SparkRDDWriteClient client = getHoodieWriteClient(cfg); SparkRDDWriteClient client = getHoodieWriteClient(cfg);
createCommitWithInserts(cfg, client, "000", "001", 200); createCommitWithInserts(cfg, client, "000", "001", 200, true);
validInstants.add("001"); validInstants.add("001");
// Create 2 commits with upserts // Create 2 commits with upserts
createCommitWithUpserts(cfg, client, "001", "000", "002", 100); createCommitWithUpserts(cfg, client, "001", "000", "002", 100);
@@ -351,7 +355,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
final int numRecords = 100; final int numRecords = 100;
latchCountDownAndWait(runCountDownLatch, 30000); latchCountDownAndWait(runCountDownLatch, 30000);
assertDoesNotThrow(() -> { assertDoesNotThrow(() -> {
createCommitWithInserts(cfg, client1, "003", newCommitTime, numRecords); createCommitWithInserts(cfg, client1, "003", newCommitTime, numRecords, true);
validInstants.add("007"); validInstants.add("007");
}); });
}); });
@@ -395,7 +399,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
} }
Properties properties = new Properties(); Properties properties = new Properties();
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000"); properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder() HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY) .withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.withAutoClean(false).build()) .withAutoClean(false).build())
@@ -411,7 +415,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
.build(); .build();
// Create the first commit // Create the first commit
createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 200); createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 200, true);
// Start another inflight commit // Start another inflight commit
String newCommitTime = "003"; String newCommitTime = "003";
int numRecords = 100; int numRecords = 100;
@@ -441,6 +445,134 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
} }
} }
@Test
public void testHoodieClientMultiWriterAutoCommitForConflict() throws Exception {
Properties properties = new Properties();
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "100");
HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.withAutoClean(false).build())
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
// Timeline-server-based markers are not used for multi-writer tests
.withMarkersType(MarkerType.DIRECT.name())
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
.build()).withAutoCommit(true).withProperties(properties);
HoodieWriteConfig cfg = writeConfigBuilder.build();
HoodieWriteConfig cfg2 = writeConfigBuilder.build();
// Create the first commit
createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 5000, false);
// Start another inflight commit
String newCommitTime1 = "003";
String newCommitTime2 = "004";
SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
SparkRDDWriteClient client2 = getHoodieWriteClient(cfg2);
List<HoodieRecord> updates1 = dataGen.generateUpdates(newCommitTime1, 5000);
List<HoodieRecord> updates2 = dataGen.generateUpdates(newCommitTime2, 5000);
JavaRDD<HoodieRecord> writeRecords1 = jsc.parallelize(updates1, 4);
JavaRDD<HoodieRecord> writeRecords2 = jsc.parallelize(updates2, 4);
runConcurrentAndAssert(writeRecords1, writeRecords2, client1, client2, SparkRDDWriteClient::upsert, true);
}
private void runConcurrentAndAssert(JavaRDD<HoodieRecord> writeRecords1, JavaRDD<HoodieRecord> writeRecords2,
SparkRDDWriteClient client1, SparkRDDWriteClient client2,
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
boolean assertForConflict) throws ExecutionException, InterruptedException {
CountDownLatch runCountDownLatch = new CountDownLatch(2);
final ExecutorService executors = Executors.newFixedThreadPool(2);
String newCommitTime1 = "003";
String newCommitTime2 = "004";
AtomicBoolean client1Succeeded = new AtomicBoolean(true);
AtomicBoolean client2Succeeded = new AtomicBoolean(true);
Future future1 = executors.submit(() -> {
try {
ingestBatch(writeFn, client1, newCommitTime1, writeRecords1, runCountDownLatch);
} catch (IOException e) {
LOG.error("IOException thrown " + e.getMessage());
} catch (InterruptedException e) {
LOG.error("Interrupted Exception thrown " + e.getMessage());
} catch (Exception e) {
client1Succeeded.set(false);
}
}
);
Future future2 = executors.submit(() -> {
try {
ingestBatch(writeFn, client2, newCommitTime2, writeRecords2, runCountDownLatch);
} catch (IOException e) {
LOG.error("IOException thrown " + e.getMessage());
} catch (InterruptedException e) {
LOG.error("Interrupted Exception thrown " + e.getMessage());
} catch (Exception e) {
client2Succeeded.set(false);
}
}
);
future1.get();
future2.get();
if (assertForConflict) {
assertFalse(client1Succeeded.get() && client2Succeeded.get());
assertTrue(client1Succeeded.get() || client2Succeeded.get());
} else {
assertTrue(client2Succeeded.get() && client1Succeeded.get());
}
}
@Test
public void testHoodieClientMultiWriterAutoCommitNonConflict() throws Exception {
Properties properties = new Properties();
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "100");
HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.withAutoClean(false).build())
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
// Timeline-server-based markers are not used for multi-writer tests
.withMarkersType(MarkerType.DIRECT.name())
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
.build()).withAutoCommit(true).withProperties(properties);
HoodieWriteConfig cfg = writeConfigBuilder.build();
HoodieWriteConfig cfg2 = writeConfigBuilder.build();
// Create the first commit
createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 200, false);
// Start another inflight commit
String newCommitTime1 = "003";
String newCommitTime2 = "004";
SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
SparkRDDWriteClient client2 = getHoodieWriteClient(cfg2);
List<HoodieRecord> updates1 = dataGen.generateInserts(newCommitTime1, 200);
List<HoodieRecord> updates2 = dataGen.generateInserts(newCommitTime2, 200);
JavaRDD<HoodieRecord> writeRecords1 = jsc.parallelize(updates1, 1);
JavaRDD<HoodieRecord> writeRecords2 = jsc.parallelize(updates2, 1);
runConcurrentAndAssert(writeRecords1, writeRecords2, client1, client2, SparkRDDWriteClient::bulkInsert, false);
}
private void ingestBatch(Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
SparkRDDWriteClient writeClient, String commitTime, JavaRDD<HoodieRecord> records,
CountDownLatch countDownLatch) throws IOException, InterruptedException {
writeClient.startCommitWithTime(commitTime);
countDownLatch.countDown();
countDownLatch.await();
JavaRDD<WriteStatus> statusJavaRDD = writeFn.apply(writeClient, records, commitTime);
statusJavaRDD.collect();
}
private void createCommitWithInsertsForPartition(HoodieWriteConfig cfg, SparkRDDWriteClient client, private void createCommitWithInsertsForPartition(HoodieWriteConfig cfg, SparkRDDWriteClient client,
String prevCommitTime, String newCommitTime, int numRecords, String prevCommitTime, String newCommitTime, int numRecords,
String partition) throws Exception { String partition) throws Exception {
@@ -450,11 +582,14 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
} }
private void createCommitWithInserts(HoodieWriteConfig cfg, SparkRDDWriteClient client, private void createCommitWithInserts(HoodieWriteConfig cfg, SparkRDDWriteClient client,
String prevCommitTime, String newCommitTime, int numRecords) throws Exception { String prevCommitTime, String newCommitTime, int numRecords,
boolean doCommit) throws Exception {
// Finish first base commit // Finish first base commit
JavaRDD<WriteStatus> result = insertFirstBatch(cfg, client, newCommitTime, prevCommitTime, numRecords, SparkRDDWriteClient::bulkInsert, JavaRDD<WriteStatus> result = insertFirstBatch(cfg, client, newCommitTime, prevCommitTime, numRecords, SparkRDDWriteClient::bulkInsert,
false, false, numRecords); false, false, numRecords);
assertTrue(client.commit(newCommitTime, result), "Commit should succeed"); if (doCommit) {
assertTrue(client.commit(newCommitTime, result), "Commit should succeed");
}
} }
private void createCommitWithUpserts(HoodieWriteConfig cfg, SparkRDDWriteClient client, String prevCommit, private void createCommitWithUpserts(HoodieWriteConfig cfg, SparkRDDWriteClient client, String prevCommit,

View File

@@ -892,7 +892,8 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
Properties properties = new Properties(); Properties properties = new Properties();
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000"); properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"1000");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,"20");
HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false) HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
.withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build()) .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())