[HUDI-845] Added locking capability to allow multiple writers (#2374)

* [HUDI-845] Added locking capability to allow multiple writers
1. Added LockProvider API for pluggable lock methodologies
2. Added Resolution Strategy API to allow for pluggable conflict resolution
3. Added TableService client API to schedule table services
4. Added Transaction Manager for wrapping actions within transactions
n3nash
2021-03-16 16:43:53 -07:00
committed by GitHub
parent b038623ed3
commit 74241947c1
88 changed files with 4876 additions and 381 deletions
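For orientation before the diff: wiring two writers against the same table with this change amounts to enabling optimistic concurrency control, switching to lazy cleaning of failed writes, and plugging in a LockProvider. The sketch below is illustrative only, assuming the property names used in the test diff that follows (the Zookeeper endpoint, base path, and lock key are placeholders, not defaults):

import org.apache.hudi.common.config.TypedProperties;

public class MultiWriterConfigExample {
  // Minimal multi-writer configuration, mirroring prepareMultiWriterProps below.
  public static TypedProperties multiWriterProps() {
    TypedProperties props = new TypedProperties();
    // Allow concurrent writers to commit via optimistic concurrency control.
    props.setProperty("hoodie.write.concurrency.mode", "optimistic_concurrency_control");
    // Lazy cleaning is needed so one writer's failed commit is not rolled back
    // eagerly while another writer is still inflight.
    props.setProperty("hoodie.failed.writes.cleaner.policy", "LAZY");
    // Plug in one of the LockProvider implementations added by this commit.
    props.setProperty("hoodie.writer.lock.provider",
        "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider");
    props.setProperty("hoodie.writer.lock.zookeeper.url", "127.0.0.1");       // placeholder
    props.setProperty("hoodie.writer.lock.zookeeper.port", "2181");           // placeholder
    props.setProperty("hoodie.writer.lock.zookeeper.zk_base_path", "/locks"); // placeholder
    props.setProperty("hoodie.writer.lock.lock_key", "my_table");             // placeholder
    return props;
  }
}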

View File

@@ -18,6 +18,8 @@
package org.apache.hudi.utilities.functional;
import java.util.ConcurrentModificationException;
import java.util.concurrent.ExecutorService;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.TypedProperties;
@@ -79,6 +81,7 @@ import org.apache.spark.sql.types.StructField;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -100,6 +103,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -120,6 +124,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
private static final String PROPS_FILENAME_TEST_CSV = "test-csv-dfs-source.properties";
protected static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
private static final String PROPS_FILENAME_TEST_JSON_KAFKA = "test-json-kafka-dfs-source.properties";
private static final String PROPS_FILENAME_TEST_MULTI_WRITER = "test-multi-writer.properties";
private static final String FIRST_PARQUET_FILE_NAME = "1.parquet";
private static String PARQUET_SOURCE_ROOT;
private static String JSON_KAFKA_SOURCE_ROOT;
@@ -270,6 +275,34 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
MultiPartKeysValueExtractor.class.getName());
}
protected static TypedProperties prepareMultiWriterProps(String propsFileName) throws IOException {
TypedProperties props = new TypedProperties();
populateAllCommonProps(props);
props.setProperty("include", "sql-transformer.properties");
props.setProperty("hoodie.datasource.write.keygenerator.class", TestGenerator.class.getName());
props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
props.setProperty("include", "base.properties");
props.setProperty("hoodie.write.concurrency.mode", "optimistic_concurrency_control");
props.setProperty("hoodie.failed.writes.cleaner.policy", "LAZY");
props.setProperty("hoodie.writer.lock.provider", "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider");
props.setProperty("hoodie.writer.lock.hivemetastore.database", "testdb1");
props.setProperty("hoodie.writer.lock.hivemetastore.table", "table1");
props.setProperty("hoodie.writer.lock.zookeeper.url", "127.0.0.1");
props.setProperty("hoodie.writer.lock.zookeeper.port", "2828");
props.setProperty("hoodie.writer.lock.wait_time_ms", "1200000");
props.setProperty("hoodie.writer.lock.num_retries", "10");
props.setProperty("hoodie.writer.lock.lock_key", "test_table");
props.setProperty("hoodie.writer.lock.zookeeper.zk_base_path", "/test");
UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + propsFileName);
return props;
}
@AfterAll
public static void cleanupClass() {
UtilitiesTestBase.cleanupClass();
@@ -398,6 +431,22 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected);
}
static void assertAtleastNCompactionCommitsAfterCommit(int minExpected, String lastSuccessfulCommit, String tablePath, FileSystem fs) {
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
HoodieTimeline timeline = meta.getActiveTimeline().getCommitTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants();
LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
int numCompactionCommits = (int) timeline.getInstants().count();
assertTrue(minExpected <= numCompactionCommits, "Got=" + numCompactionCommits + ", exp >=" + minExpected);
}
static void assertAtleastNDeltaCommitsAfterCommit(int minExpected, String lastSuccessfulCommit, String tablePath, FileSystem fs) {
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
HoodieTimeline timeline = meta.getActiveTimeline().getDeltaCommitTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants();
LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
int numDeltaCommits = (int) timeline.getInstants().count();
assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected);
}
static String assertCommitMetadata(String expected, String tablePath, FileSystem fs, int totalCommits)
throws IOException {
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
@@ -406,14 +455,14 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(lastInstant).get(), HoodieCommitMetadata.class);
assertEquals(totalCommits, timeline.countInstants());
- assertEquals(expected, commitMetadata.getMetadata(HoodieDeltaStreamer.CHECKPOINT_KEY));
+ assertEquals(expected, commitMetadata.getMetadata(CHECKPOINT_KEY));
return lastInstant.getTimestamp();
}
- static void waitTillCondition(Function<Boolean, Boolean> condition, long timeoutInSecs) throws Exception {
+ static void waitTillCondition(Function<Boolean, Boolean> condition, Future dsFuture, long timeoutInSecs) throws Exception {
Future<Boolean> res = Executors.newSingleThreadExecutor().submit(() -> {
boolean ret = false;
- while (!ret) {
+ while (!ret && !dsFuture.isDone()) {
try {
Thread.sleep(3000);
ret = condition.apply(true);
@@ -649,6 +698,21 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_mor");
}
@Test
public void testUpsertsCOWContinuousModeWithMultipleWriters() throws Exception {
testUpsertsContinuousModeWithMultipleWriters(HoodieTableType.COPY_ON_WRITE, "continuous_cow_multiwriter");
}
@Test
public void testUpsertsMORContinuousModeWithMultipleWriters() throws Exception {
testUpsertsContinuousModeWithMultipleWriters(HoodieTableType.MERGE_ON_READ, "continuous_mor_multiwriter");
}
@Test
public void testLatestCheckpointCarryOverWithMultipleWriters() throws Exception {
testLatestCheckpointCarryOverWithMultipleWriters(HoodieTableType.COPY_ON_WRITE, "continuous_cow_checkpoint");
}
private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir) throws Exception {
String tableBasePath = dfsBasePath + "/" + tempDir;
// Keep it higher than batch-size to test continuous mode
@@ -673,6 +737,215 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
});
}
private void testUpsertsContinuousModeWithMultipleWriters(HoodieTableType tableType, String tempDir) throws Exception {
// NOTE: Overriding the LockProvider with FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit tests but fail on Jenkins with connection timeouts
String tableBasePath = dfsBasePath + "/" + tempDir;
// enable carrying forward latest checkpoint
TypedProperties props = prepareMultiWriterProps(PROPS_FILENAME_TEST_MULTI_WRITER);
props.setProperty("hoodie.writer.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass");
props.setProperty("hoodie.writer.lock.filesystem.path", tableBasePath);
UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER);
// Keep it higher than batch-size to test continuous mode
int totalRecords = 3000;
HoodieDeltaStreamer.Config cfgIngestionJob = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
Arrays.asList(TripsWithDistanceTransformer.class.getName()), PROPS_FILENAME_TEST_MULTI_WRITER, false);
cfgIngestionJob.continuousMode = true;
cfgIngestionJob.tableType = tableType.name();
cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
HoodieDeltaStreamer ingestionJob = new HoodieDeltaStreamer(cfgIngestionJob, jsc);
// Prepare base dataset with some commits
deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, (r) -> {
if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, dfs);
TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, dfs);
} else {
TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, dfs);
}
TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
return true;
});
// create a backfill job
HoodieDeltaStreamer.Config cfgBackfillJob = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
Arrays.asList(TripsWithDistanceTransformer.class.getName()), PROPS_FILENAME_TEST_MULTI_WRITER, false);
cfgBackfillJob.continuousMode = false;
cfgBackfillJob.tableType = tableType.name();
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build();
HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class);
cfgBackfillJob.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY);
cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc);
// re-init ingestion job to start sync service
HoodieDeltaStreamer ingestionJob2 = new HoodieDeltaStreamer(cfgIngestionJob, jsc);
// run ingestion & backfill in parallel, create conflict and fail one
runJobsInParallel(tableBasePath, tableType, totalRecords, ingestionJob2,
cfgIngestionJob, backfillJob, cfgBackfillJob, true);
// create new ingestion & backfill job config to generate only INSERTS to avoid conflict
props = prepareMultiWriterProps(PROPS_FILENAME_TEST_MULTI_WRITER);
props.setProperty("hoodie.writer.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass");
props.setProperty("hoodie.writer.lock.filesystem.path", tableBasePath);
props.setProperty("hoodie.test.source.generate.inserts", "true");
UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER);
cfgBackfillJob = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT,
Arrays.asList(TestIdentityTransformer.class.getName()), PROPS_FILENAME_TEST_MULTI_WRITER, false);
cfgBackfillJob.continuousMode = false;
cfgBackfillJob.tableType = tableType.name();
meta = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build();
timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
commitMetadata = HoodieCommitMetadata
.fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class);
cfgBackfillJob.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY);
cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
cfgIngestionJob = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
Arrays.asList(TestIdentityTransformer.class.getName()), PROPS_FILENAME_TEST_MULTI_WRITER, false);
cfgIngestionJob.continuousMode = true;
cfgIngestionJob.tableType = tableType.name();
cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
// re-init ingestion job
HoodieDeltaStreamer ingestionJob3 = new HoodieDeltaStreamer(cfgIngestionJob, jsc);
// re-init backfill job
HoodieDeltaStreamer backfillJob2 = new HoodieDeltaStreamer(cfgBackfillJob, jsc);
// run ingestion & backfill in parallel, avoid conflict and succeed both
runJobsInParallel(tableBasePath, tableType, totalRecords, ingestionJob3,
cfgIngestionJob, backfillJob2, cfgBackfillJob, false);
}
private void testLatestCheckpointCarryOverWithMultipleWriters(HoodieTableType tableType, String tempDir) throws Exception {
// NOTE: Overriding the LockProvider with FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit tests but fail on Jenkins with connection timeouts
String tableBasePath = dfsBasePath + "/" + tempDir;
// enable carrying forward latest checkpoint
TypedProperties props = prepareMultiWriterProps(PROPS_FILENAME_TEST_MULTI_WRITER);
props.setProperty("hoodie.writer.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass");
props.setProperty("hoodie.writer.lock.filesystem.path", tableBasePath);
UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER);
// Keep it higher than batch-size to test continuous mode
int totalRecords = 3000;
HoodieDeltaStreamer.Config cfgIngestionJob = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
Arrays.asList(TripsWithDistanceTransformer.class.getName()), PROPS_FILENAME_TEST_MULTI_WRITER, false);
cfgIngestionJob.continuousMode = true;
cfgIngestionJob.tableType = tableType.name();
cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
HoodieDeltaStreamer ingestionJob = new HoodieDeltaStreamer(cfgIngestionJob, jsc);
// Prepare base dataset with some commits
deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, (r) -> {
if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, dfs);
TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, dfs);
} else {
TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, dfs);
}
TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
return true;
});
// create a backfill job with checkpoint from the first instant
HoodieDeltaStreamer.Config cfgBackfillJob = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
Arrays.asList(TripsWithDistanceTransformer.class.getName()), PROPS_FILENAME_TEST_MULTI_WRITER, false);
cfgBackfillJob.continuousMode = false;
cfgBackfillJob.tableType = tableType.name();
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build();
HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
HoodieCommitMetadata commitMetadataForFirstInstant = HoodieCommitMetadata
.fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class);
// get current checkpoint after preparing base dataset with some commits
HoodieCommitMetadata commitMetadataForLastInstant = HoodieCommitMetadata
.fromBytes(timeline.getInstantDetails(timeline.lastInstant().get()).get(), HoodieCommitMetadata.class);
String lastCheckpointBeforeParallelBackfill = commitMetadataForLastInstant.getMetadata(CHECKPOINT_KEY);
// run the backfill job, enable overriding checkpoint from the latest commit
props = prepareMultiWriterProps(PROPS_FILENAME_TEST_MULTI_WRITER);
props.setProperty("hoodie.writer.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass");
props.setProperty("hoodie.writer.lock.filesystem.path", tableBasePath);
props.setProperty("hoodie.write.meta.key.prefixes", CHECKPOINT_KEY);
UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER);
// reset checkpoint to first instant to simulate a random checkpoint for backfill job
// checkpoint will move from 00000 to 00001 for this backfill job
cfgBackfillJob.checkpoint = commitMetadataForFirstInstant.getMetadata(CHECKPOINT_KEY);
cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc);
backfillJob.sync();
// check if the checkpoint is carried over
timeline = meta.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
commitMetadataForLastInstant = HoodieCommitMetadata
.fromBytes(timeline.getInstantDetails(timeline.lastInstant().get()).get(), HoodieCommitMetadata.class);
String lastCheckpointAfterParallelBackfill = commitMetadataForLastInstant.getMetadata(CHECKPOINT_KEY);
Assertions.assertEquals(lastCheckpointBeforeParallelBackfill, lastCheckpointAfterParallelBackfill);
}
private void runJobsInParallel(String tableBasePath, HoodieTableType tableType, int totalRecords,
HoodieDeltaStreamer ingestionJob, HoodieDeltaStreamer.Config cfgIngestionJob, HoodieDeltaStreamer backfillJob,
HoodieDeltaStreamer.Config cfgBackfillJob, boolean expectConflict) throws Exception {
ExecutorService service = Executors.newFixedThreadPool(2);
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build();
HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
String lastSuccessfulCommit = timeline.lastInstant().get().getTimestamp();
// Condition for parallel ingestion job
Function<Boolean, Boolean> conditionForRegularIngestion = (r) -> {
if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
TestHelpers.assertAtleastNDeltaCommitsAfterCommit(3, lastSuccessfulCommit, tableBasePath, dfs);
} else {
TestHelpers.assertAtleastNCompactionCommitsAfterCommit(3, lastSuccessfulCommit, tableBasePath, dfs);
}
TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
return true;
};
try {
Future regularIngestionJobFuture = service.submit(() -> {
try {
deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, conditionForRegularIngestion);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
});
Future backfillJobFuture = service.submit(() -> {
try {
backfillJob.sync();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
});
backfillJobFuture.get();
regularIngestionJobFuture.get();
if (expectConflict) {
Assertions.fail("Failed to handle concurrent writes");
}
} catch (Exception e) {
/**
* Need to perform getMessage().contains since the exception coming
* from {@link org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.DeltaSyncService} gets wrapped many times into RuntimeExceptions.
*/
if (expectConflict && e.getCause().getMessage().contains(ConcurrentModificationException.class.getName())) {
// expected ConcurrentModificationException since ingestion & backfill will have overlapping writes
} else {
throw e;
}
}
}
private void deltaStreamerTestRunner(HoodieDeltaStreamer ds, HoodieDeltaStreamer.Config cfg, Function<Boolean, Boolean> condition) throws Exception {
Future dsFuture = Executors.newSingleThreadExecutor().submit(() -> {
try {
@@ -682,7 +955,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
}
});
- TestHelpers.waitTillCondition(condition, 240);
+ TestHelpers.waitTillCondition(condition, dsFuture, 240);
ds.shutdownGracefully();
dsFuture.get();
}
@@ -1369,4 +1642,13 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
return sparkSession.createDataFrame(jsc.emptyRDD(), rowDataset.schema());
}
}
public static class TestIdentityTransformer implements Transformer {
@Override
public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
TypedProperties properties) {
return rowDataset;
}
}
}
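The FileSystemBasedLockProviderTestClass that the tests above swap in is not part of this excerpt. As a rough mental model only (a hypothetical sketch, not the class from this commit), a filesystem-backed lock can lean on the atomicity of FileSystem.createNewFile, which only one concurrent caller can win:

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical filesystem-backed lock in the spirit of
// FileSystemBasedLockProviderTestClass: createNewFile() is atomic, so at most
// one writer can hold the lock file at a time.
public class SimpleFileSystemLock {
  private final FileSystem fs;
  private final Path lockFile;

  public SimpleFileSystemLock(FileSystem fs, Path lockDir) {
    this.fs = fs;
    this.lockFile = new Path(lockDir, "lock");
  }

  // Poll until the lock file is created or the wait time elapses.
  public boolean tryLock(long time, TimeUnit unit) {
    long deadline = System.currentTimeMillis() + unit.toMillis(time);
    try {
      while (System.currentTimeMillis() < deadline) {
        if (fs.createNewFile(lockFile)) {
          return true; // lock acquired
        }
        Thread.sleep(1000); // lock held by another writer; retry
      }
      return false; // timed out
    } catch (Exception e) {
      throw new RuntimeException("Unable to acquire lock at " + lockFile, e);
    }
  }

  // Release the lock by deleting the lock file.
  public void unlock() {
    try {
      fs.delete(lockFile, false);
    } catch (Exception e) {
      throw new RuntimeException("Unable to release lock at " + lockFile, e);
    }
  }
}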

View File

@@ -29,6 +29,7 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
import org.apache.hudi.common.testutils.minicluster.ZookeeperTestService;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
@@ -92,6 +93,7 @@ public class UtilitiesTestBase {
protected transient SQLContext sqlContext;
protected static HiveServer2 hiveServer;
protected static HiveTestService hiveTestService;
protected static ZookeeperTestService zookeeperTestService;
private static ObjectMapper mapper = new ObjectMapper();
@BeforeAll
@@ -105,6 +107,7 @@ public class UtilitiesTestBase {
public static void initClass(boolean startHiveService) throws Exception {
hdfsTestService = new HdfsTestService();
zookeeperTestService = new ZookeeperTestService(hdfsTestService.getHadoopConf());
dfsCluster = hdfsTestService.start(true);
dfs = dfsCluster.getFileSystem();
dfsBasePath = dfs.getWorkingDirectory().toString();
@@ -114,6 +117,7 @@ public class UtilitiesTestBase {
hiveServer = hiveTestService.start();
clearHiveDb();
}
zookeeperTestService.start();
}
@AfterAll
@@ -127,6 +131,9 @@ public class UtilitiesTestBase {
if (hiveTestService != null) {
hiveTestService.stop();
}
if (zookeeperTestService != null) {
zookeeperTestService.stop();
}
}
@BeforeEach

View File

@@ -126,6 +126,9 @@ public abstract class AbstractBaseTestSource extends AvroSource {
}
Stream<GenericRecord> insertStream = dataGenerator.generateInsertsStream(instantTime, numInserts, false, HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.map(AbstractBaseTestSource::toGenericRecord);
if (Boolean.valueOf(props.getOrDefault("hoodie.test.source.generate.inserts", "false").toString())) {
return insertStream;
}
return Stream.concat(deleteStream, Stream.concat(updateStream, insertStream));
}
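This guard is what the multi-writer tests above rely on: setting the flag makes the test source emit only the insert stream, so a concurrent backfill and ingestion job write non-overlapping data and both are expected to succeed. A usage sketch (the wrapper method is illustrative; the property name comes from this diff):

import org.apache.hudi.common.config.TypedProperties;

class InsertOnlySourceExample {
  // Switch AbstractBaseTestSource to insert-only output so parallel writers
  // do not touch the same file groups and no conflict is raised.
  static TypedProperties makeInsertOnly(TypedProperties props) {
    props.setProperty("hoodie.test.source.generate.inserts", "true");
    return props;
  }
}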