1
0

[HUDI-2527] Multi writer test with conflicting async table services (#4046)

This commit is contained in:
Manoj Govindassamy
2021-12-10 17:01:19 -08:00
committed by GitHub
parent 2d864f7524
commit c48a2a125a

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.client;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
@@ -37,11 +38,8 @@ import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
@@ -53,12 +51,16 @@ import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -132,7 +134,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
}
}
@Disabled
@Test
public void testMultiWriterWithAsyncTableServicesWithConflictCOW() throws Exception {
testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType.COPY_ON_WRITE);
}
@@ -202,6 +204,21 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
});
}
/**
* Count down the latch and await for all the needed threads to join.
*
* @param latch - Count down latch
* @param waitTimeMillis - Max wait time in millis for waiting
*/
private void latchCountDownAndWait(CountDownLatch latch, long waitTimeMillis) {
latch.countDown();
try {
latch.await(waitTimeMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
//
}
}
private void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType tableType) throws Exception {
// create inserts X 1
if (tableType == HoodieTableType.MERGE_ON_READ) {
@@ -238,82 +255,101 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
createCommitWithUpserts(cfg, client, "002", "000", "003", 100);
validInstants.add("002");
validInstants.add("003");
ExecutorService executors = Executors.newFixedThreadPool(2);
// write config with clustering enabled
HoodieWriteConfig cfg2 = writeConfigBuilder
.withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(true).withInlineClusteringNumCommits(1).build())
// Three clients running actions in parallel
final int threadCount = 3;
final CountDownLatch scheduleCountDownLatch = new CountDownLatch(threadCount);
final ExecutorService executors = Executors.newFixedThreadPool(threadCount);
// Write config with clustering enabled
final HoodieWriteConfig cfg2 = writeConfigBuilder
.withClusteringConfig(HoodieClusteringConfig.newBuilder()
.withInlineClustering(true)
.withInlineClusteringNumCommits(1)
.build())
.build();
SparkRDDWriteClient client1 = getHoodieWriteClient(cfg2);
SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
final SparkRDDWriteClient client1 = getHoodieWriteClient(cfg2);
final SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
final SparkRDDWriteClient client3 = getHoodieWriteClient(cfg);
// Create upserts, schedule cleaning, schedule compaction in parallel
Future future1 = executors.submit(() -> {
String newCommitTime = "004";
int numRecords = 100;
String commitTimeBetweenPrevAndNew = "002";
try {
createCommitWithUpserts(cfg2, client1, "003", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
if (tableType == HoodieTableType.MERGE_ON_READ) {
fail("Conflicts not handled correctly");
}
validInstants.add("004");
} catch (Exception e1) {
if (tableType == HoodieTableType.MERGE_ON_READ) {
assertTrue(e1 instanceof HoodieWriteConflictException);
}
final String newCommitTime = "004";
final int numRecords = 100;
final String commitTimeBetweenPrevAndNew = "002";
// We want the upsert to go through only after the compaction
// and cleaning schedule completion. So, waiting on latch here.
latchCountDownAndWait(scheduleCountDownLatch, 30000);
if (tableType == HoodieTableType.MERGE_ON_READ) {
// Since the compaction already went in, this upsert has
// to fail
assertThrows(IllegalArgumentException.class, () -> {
createCommitWithUpserts(cfg, client1, "003", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
});
} else {
// We don't have the compaction for COW and so this upsert
// has to pass
assertDoesNotThrow(() -> {
createCommitWithUpserts(cfg, client1, "003", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
});
validInstants.add(newCommitTime);
}
});
Future future2 = executors.submit(() -> {
try {
client2.scheduleTableService("005", Option.empty(), TableServiceType.COMPACT);
} catch (Exception e2) {
if (tableType == HoodieTableType.MERGE_ON_READ) {
throw new RuntimeException(e2);
}
if (tableType == HoodieTableType.MERGE_ON_READ) {
assertDoesNotThrow(() -> {
client2.scheduleTableService("005", Option.empty(), TableServiceType.COMPACT);
});
}
latchCountDownAndWait(scheduleCountDownLatch, 30000);
});
Future future3 = executors.submit(() -> {
try {
client2.scheduleTableService("006", Option.empty(), TableServiceType.CLEAN);
} catch (Exception e2) {
throw new RuntimeException(e2);
}
assertDoesNotThrow(() -> {
latchCountDownAndWait(scheduleCountDownLatch, 30000);
client3.scheduleTableService("006", Option.empty(), TableServiceType.CLEAN);
});
});
future1.get();
future2.get();
future3.get();
CountDownLatch runCountDownLatch = new CountDownLatch(threadCount);
// Create inserts, run cleaning, run compaction in parallel
future1 = executors.submit(() -> {
String newCommitTime = "007";
int numRecords = 100;
try {
createCommitWithInserts(cfg2, client1, "003", newCommitTime, numRecords);
final String newCommitTime = "007";
final int numRecords = 100;
latchCountDownAndWait(runCountDownLatch, 30000);
assertDoesNotThrow(() -> {
createCommitWithInserts(cfg, client1, "003", newCommitTime, numRecords);
validInstants.add("007");
} catch (Exception e1) {
throw new RuntimeException(e1);
}
});
});
future2 = executors.submit(() -> {
try {
JavaRDD<WriteStatus> writeStatusJavaRDD = (JavaRDD<WriteStatus>) client2.compact("005");
client2.commitCompaction("005", writeStatusJavaRDD, Option.empty());
validInstants.add("005");
} catch (Exception e2) {
if (tableType == HoodieTableType.MERGE_ON_READ) {
throw new RuntimeException(e2);
}
latchCountDownAndWait(runCountDownLatch, 30000);
if (tableType == HoodieTableType.MERGE_ON_READ) {
assertDoesNotThrow(() -> {
JavaRDD<WriteStatus> writeStatusJavaRDD = (JavaRDD<WriteStatus>) client2.compact("005");
client2.commitCompaction("005", writeStatusJavaRDD, Option.empty());
validInstants.add("005");
});
}
});
future3 = executors.submit(() -> {
try {
client2.clean("006", false);
latchCountDownAndWait(runCountDownLatch, 30000);
assertDoesNotThrow(() -> {
client3.clean("006", false);
validInstants.add("006");
} catch (Exception e2) {
throw new RuntimeException(e2);
}
});
});
future1.get();
future2.get();
future3.get();
validInstants.addAll(
metaClient.reloadActiveTimeline().getCompletedReplaceTimeline()
.filterCompletedInstants().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()));