1
0

[HUDI-2077] Fix flakiness in TestHoodieDeltaStreamer (#3829)

This commit is contained in:
Raymond Xu
2021-10-20 20:57:12 -07:00
committed by GitHub
parent b68c5a68f9
commit f5d7362ee8
5 changed files with 371 additions and 288 deletions

View File

@@ -51,6 +51,7 @@ import org.apache.hudi.testutils.providers.SparkProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
@@ -114,6 +115,10 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe
return jsc.hadoopConfiguration();
}
public FileSystem fs() {
return FSUtils.getFs(basePath(), hadoopConf());
}
@Override
public HoodieSparkEngineContext context() {
return context;

View File

@@ -26,9 +26,8 @@ import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
@@ -38,7 +37,7 @@ import org.junit.jupiter.api.BeforeEach;
import java.io.IOException;
import java.util.Random;
public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase {
public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
static final Random RANDOM = new Random();
@@ -78,7 +77,6 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase {
static final String HOODIE_CONF_PARAM = "--hoodie-conf";
static final String HOODIE_CONF_VALUE1 = "hoodie.datasource.hive_sync.table=test_table";
static final String HOODIE_CONF_VALUE2 = "hoodie.datasource.write.recordkey.field=Field1,Field2,Field3";
static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamerBase.class);
public static KafkaTestUtils testUtils;
protected static String topicName;
protected static String defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
@@ -93,7 +91,13 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase {
testUtils = new KafkaTestUtils();
testUtils.setup();
topicName = "topic" + testNum;
prepareInitialConfigs(dfs, dfsBasePath, testUtils.brokerAddress());
prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT);
prepareORCDFSFiles(ORC_NUM_RECORDS, ORC_SOURCE_ROOT);
}
protected static void prepareInitialConfigs(FileSystem dfs, String dfsBasePath, String brokerAddress) throws IOException {
// prepare the configs.
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", dfs, dfsBasePath + "/base.properties");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", dfs, dfsBasePath + "/config/base.properties");
@@ -114,7 +118,7 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase {
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/short_trip_uber_config.properties", dfs, dfsBasePath + "/config/short_trip_uber_config.properties");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/clusteringjob.properties", dfs, dfsBasePath + "/clusteringjob.properties");
writeCommonPropsToFile();
writeCommonPropsToFile(dfs, dfsBasePath);
// Properties used for the delta-streamer which incrementally pulls from upstream Hudi source table and writes to
// downstream hudi table
@@ -139,23 +143,20 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase {
UtilitiesTestBase.Helpers.savePropsToDFS(invalidProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_INVALID);
TypedProperties props1 = new TypedProperties();
populateAllCommonProps(props1);
populateAllCommonProps(props1, dfsBasePath, brokerAddress);
UtilitiesTestBase.Helpers.savePropsToDFS(props1, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE1);
TypedProperties properties = new TypedProperties();
populateInvalidTableConfigFilePathProps(properties);
populateInvalidTableConfigFilePathProps(properties, dfsBasePath);
UtilitiesTestBase.Helpers.savePropsToDFS(properties, dfs, dfsBasePath + "/" + PROPS_INVALID_TABLE_CONFIG_FILE);
TypedProperties invalidHiveSyncProps = new TypedProperties();
invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "uber_db.dummy_table_uber");
invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/invalid_hive_sync_uber_config.properties");
UtilitiesTestBase.Helpers.savePropsToDFS(invalidHiveSyncProps, dfs, dfsBasePath + "/" + PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1);
prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT);
prepareORCDFSFiles(ORC_NUM_RECORDS, ORC_SOURCE_ROOT);
}
protected static void writeCommonPropsToFile() throws IOException {
protected static void writeCommonPropsToFile(FileSystem dfs, String dfsBasePath) throws IOException {
TypedProperties props = new TypedProperties();
props.setProperty("include", "sql-transformer.properties");
props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
@@ -192,20 +193,20 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase {
super.teardown();
}
private static void populateInvalidTableConfigFilePathProps(TypedProperties props) {
protected static void populateInvalidTableConfigFilePathProps(TypedProperties props, String dfsBasePath) {
props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyyMMdd");
props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "uber_db.dummy_table_uber");
props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/invalid_uber_config.properties");
}
static void populateAllCommonProps(TypedProperties props) {
populateCommonProps(props);
populateCommonKafkaProps(props);
protected static void populateAllCommonProps(TypedProperties props, String dfsBasePath, String brokerAddress) {
populateCommonProps(props, dfsBasePath);
populateCommonKafkaProps(props, brokerAddress);
populateCommonHiveProps(props);
}
protected static void populateCommonProps(TypedProperties props) {
protected static void populateCommonProps(TypedProperties props, String dfsBasePath) {
props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyyMMdd");
props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "short_trip_db.dummy_table_short_trip,uber_db.dummy_table_uber");
@@ -213,9 +214,9 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase {
props.setProperty("hoodie.deltastreamer.ingestion.short_trip_db.dummy_table_short_trip.configFile", dfsBasePath + "/config/short_trip_uber_config.properties");
}
protected static void populateCommonKafkaProps(TypedProperties props) {
protected static void populateCommonKafkaProps(TypedProperties props, String brokerAddress) {
//Kafka source properties
props.setProperty("bootstrap.servers", testUtils.brokerAddress());
props.setProperty("bootstrap.servers", brokerAddress);
props.setProperty("auto.offset.reset", "earliest");
props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

View File

@@ -18,14 +18,11 @@
package org.apache.hudi.utilities.functional;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -73,7 +70,9 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -90,7 +89,6 @@ import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -104,11 +102,9 @@ import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -131,38 +127,10 @@ import static org.junit.jupiter.params.provider.Arguments.arguments;
* Basic tests against {@link HoodieDeltaStreamer}, by issuing bulk_inserts, upserts, inserts. Check counts at the end.
*/
public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamer.class);
protected static TypedProperties prepareMultiWriterProps(String propsFileName) throws IOException {
TypedProperties props = new TypedProperties();
populateAllCommonProps(props);
props.setProperty("include", "sql-transformer.properties");
props.setProperty("hoodie.datasource.write.keygenerator.class", TestGenerator.class.getName());
props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
props.setProperty("include", "base.properties");
props.setProperty("hoodie.write.concurrency.mode", "optimistic_concurrency_control");
props.setProperty("hoodie.cleaner.policy.failed.writes", "LAZY");
props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider");
props.setProperty("hoodie.write.lock.hivemetastore.database", "testdb1");
props.setProperty("hoodie.write.lock.hivemetastore.table", "table1");
props.setProperty("hoodie.write.lock.zookeeper.url", "127.0.0.1");
props.setProperty("hoodie.write.lock.zookeeper.port", "2828");
props.setProperty("hoodie.write.lock.wait_time_ms", "1200000");
props.setProperty("hoodie.write.lock.num_retries", "10");
props.setProperty("hoodie.write.lock.zookeeper.lock_key", "test_table");
props.setProperty("hoodie.write.lock.zookeeper.base_path", "/test");
UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + propsFileName);
return props;
}
protected HoodieDeltaStreamer initialHoodieDeltaStreamer(String tableBasePath, int totalRecords, String asyncCluster) throws IOException {
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
cfg.continuousMode = true;
@@ -272,18 +240,23 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
static void assertRecordCount(long expected, String tablePath, SQLContext sqlContext) {
long recordCount = sqlContext.read().format("org.apache.hudi").load(tablePath).count();
sqlContext.clearCache();
assertEquals(expected, recordCount);
}
static List<Row> countsPerCommit(String tablePath, SQLContext sqlContext) {
return sqlContext.read().format("org.apache.hudi").load(tablePath).groupBy("_hoodie_commit_time").count()
List<Row> rows = sqlContext.read().format("org.apache.hudi").load(tablePath)
.groupBy("_hoodie_commit_time").count()
.sort("_hoodie_commit_time").collectAsList();
sqlContext.clearCache();
return rows;
}
static void assertDistanceCount(long expected, String tablePath, SQLContext sqlContext) {
sqlContext.read().format("org.apache.hudi").load(tablePath).registerTempTable("tmp_trips");
long recordCount =
sqlContext.sparkSession().sql("select * from tmp_trips where haversine_distance is not NULL").count();
sqlContext.clearCache();
assertEquals(expected, recordCount);
}
@@ -291,6 +264,7 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
sqlContext.read().format("org.apache.hudi").load(tablePath).registerTempTable("tmp_trips");
long recordCount =
sqlContext.sparkSession().sql("select * from tmp_trips where haversine_distance = 1.0").count();
sqlContext.clearCache();
assertEquals(expected, recordCount);
}
@@ -343,7 +317,7 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
boolean ret = false;
while (!ret && !dsFuture.isDone()) {
try {
Thread.sleep(3000);
Thread.sleep(5000);
ret = condition.apply(true);
} catch (Throwable error) {
LOG.warn("Got error :", error);
@@ -681,7 +655,7 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
// clean up and reinit
UtilitiesTestBase.Helpers.deleteFileFromDfs(FSUtils.getFs(cfg.targetBasePath, jsc.hadoopConfiguration()), dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE);
writeCommonPropsToFile();
writeCommonPropsToFile(dfs, dfsBasePath);
defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
}
@@ -695,21 +669,6 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_mor");
}
@Test
public void testUpsertsCOWContinuousModeWithMultipleWriters() throws Exception {
testUpsertsContinuousModeWithMultipleWriters(HoodieTableType.COPY_ON_WRITE, "continuous_cow_mulitwriter");
}
@Test
public void testUpsertsMORContinuousModeWithMultipleWriters() throws Exception {
testUpsertsContinuousModeWithMultipleWriters(HoodieTableType.MERGE_ON_READ, "continuous_mor_mulitwriter");
}
@Test
public void testLatestCheckpointCarryOverWithMultipleWriters() throws Exception {
testLatestCheckpointCarryOverWithMultipleWriters(HoodieTableType.COPY_ON_WRITE, "continuous_cow_checkpoint");
}
private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir) throws Exception {
String tableBasePath = dfsBasePath + "/" + tempDir;
// Keep it higher than batch-size to test continuous mode
@@ -734,218 +693,7 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
});
}
private void testUpsertsContinuousModeWithMultipleWriters(HoodieTableType tableType, String tempDir) throws Exception {
// NOTE : Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts
String tableBasePath = dfsBasePath + "/" + tempDir;
// enable carrying forward latest checkpoint
TypedProperties props = prepareMultiWriterProps(PROPS_FILENAME_TEST_MULTI_WRITER);
props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass");
props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath);
props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,"3");
props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,"5000");
UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER);
// Keep it higher than batch-size to test continuous mode
int totalRecords = 3000;
HoodieDeltaStreamer.Config cfgIngestionJob = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
Arrays.asList(TripsWithDistanceTransformer.class.getName()), PROPS_FILENAME_TEST_MULTI_WRITER, false);
cfgIngestionJob.continuousMode = true;
cfgIngestionJob.tableType = tableType.name();
cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
HoodieDeltaStreamer ingestionJob = new HoodieDeltaStreamer(cfgIngestionJob, jsc);
// Prepare base dataset with some commits
deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, (r) -> {
if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, dfs);
TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, dfs);
} else {
TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, dfs);
}
TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
return true;
});
// create a backfill job
HoodieDeltaStreamer.Config cfgBackfillJob = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
Arrays.asList(TripsWithDistanceTransformer.class.getName()), PROPS_FILENAME_TEST_MULTI_WRITER, false);
cfgBackfillJob.continuousMode = false;
cfgBackfillJob.tableType = tableType.name();
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build();
HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class);
cfgBackfillJob.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY);
cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc);
// re-init ingestion job to start sync service
HoodieDeltaStreamer ingestionJob2 = new HoodieDeltaStreamer(cfgIngestionJob, jsc);
// run ingestion & backfill in parallel, create conflict and fail one
runJobsInParallel(tableBasePath, tableType, totalRecords, ingestionJob2,
cfgIngestionJob, backfillJob, cfgBackfillJob, true);
// create new ingestion & backfill job config to generate only INSERTS to avoid conflict
props = prepareMultiWriterProps(PROPS_FILENAME_TEST_MULTI_WRITER);
props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass");
props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath);
props.setProperty("hoodie.test.source.generate.inserts", "true");
UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER);
cfgBackfillJob = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT,
Arrays.asList(TestIdentityTransformer.class.getName()), PROPS_FILENAME_TEST_MULTI_WRITER, false);
cfgBackfillJob.continuousMode = false;
cfgBackfillJob.tableType = tableType.name();
meta = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build();
timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
commitMetadata = HoodieCommitMetadata
.fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class);
cfgBackfillJob.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY);
cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
cfgIngestionJob = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
Arrays.asList(TestIdentityTransformer.class.getName()), PROPS_FILENAME_TEST_MULTI_WRITER, false);
cfgIngestionJob.continuousMode = true;
cfgIngestionJob.tableType = tableType.name();
cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
// re-init ingestion job
HoodieDeltaStreamer ingestionJob3 = new HoodieDeltaStreamer(cfgIngestionJob, jsc);
// re-init backfill job
HoodieDeltaStreamer backfillJob2 = new HoodieDeltaStreamer(cfgBackfillJob, jsc);
// run ingestion & backfill in parallel, avoid conflict and succeed both
runJobsInParallel(tableBasePath, tableType, totalRecords, ingestionJob3,
cfgIngestionJob, backfillJob2, cfgBackfillJob, false);
}
private void testLatestCheckpointCarryOverWithMultipleWriters(HoodieTableType tableType, String tempDir) throws Exception {
// NOTE : Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts
String tableBasePath = dfsBasePath + "/" + tempDir;
// enable carrying forward latest checkpoint
TypedProperties props = prepareMultiWriterProps(PROPS_FILENAME_TEST_MULTI_WRITER);
props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass");
props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath);
UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER);
// Keep it higher than batch-size to test continuous mode
int totalRecords = 3000;
HoodieDeltaStreamer.Config cfgIngestionJob = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
Arrays.asList(TripsWithDistanceTransformer.class.getName()), PROPS_FILENAME_TEST_MULTI_WRITER, false);
cfgIngestionJob.continuousMode = true;
cfgIngestionJob.tableType = tableType.name();
cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
HoodieDeltaStreamer ingestionJob = new HoodieDeltaStreamer(cfgIngestionJob, jsc);
// Prepare base dataset with some commits
deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, (r) -> {
if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, dfs);
TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, dfs);
} else {
TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, dfs);
}
TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
return true;
});
// create a backfill job with checkpoint from the first instant
HoodieDeltaStreamer.Config cfgBackfillJob = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
Arrays.asList(TripsWithDistanceTransformer.class.getName()), PROPS_FILENAME_TEST_MULTI_WRITER, false);
cfgBackfillJob.continuousMode = false;
cfgBackfillJob.tableType = tableType.name();
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build();
HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
HoodieCommitMetadata commitMetadataForFirstInstant = HoodieCommitMetadata
.fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class);
// get current checkpoint after preparing base dataset with some commits
HoodieCommitMetadata commitMetadataForLastInstant = HoodieCommitMetadata
.fromBytes(timeline.getInstantDetails(timeline.lastInstant().get()).get(), HoodieCommitMetadata.class);
String lastCheckpointBeforeParallelBackfill = commitMetadataForLastInstant.getMetadata(CHECKPOINT_KEY);
// run the backfill job, enable overriding checkpoint from the latest commit
props = prepareMultiWriterProps(PROPS_FILENAME_TEST_MULTI_WRITER);
props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass");
props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath);
props.setProperty("hoodie.write.meta.key.prefixes", CHECKPOINT_KEY);
UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER);
// reset checkpoint to first instant to simulate a random checkpoint for backfill job
// checkpoint will move from 00000 to 00001 for this backfill job
cfgBackfillJob.checkpoint = commitMetadataForFirstInstant.getMetadata(CHECKPOINT_KEY);
cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc);
backfillJob.sync();
// check if the checkpoint is carried over
timeline = meta.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
commitMetadataForLastInstant = HoodieCommitMetadata
.fromBytes(timeline.getInstantDetails(timeline.lastInstant().get()).get(), HoodieCommitMetadata.class);
String lastCheckpointAfterParallelBackfill = commitMetadataForLastInstant.getMetadata(CHECKPOINT_KEY);
Assertions.assertEquals(lastCheckpointBeforeParallelBackfill, lastCheckpointAfterParallelBackfill);
}
private void runJobsInParallel(String tableBasePath, HoodieTableType tableType, int totalRecords,
HoodieDeltaStreamer ingestionJob, HoodieDeltaStreamer.Config cfgIngestionJob, HoodieDeltaStreamer backfillJob,
HoodieDeltaStreamer.Config cfgBackfillJob, boolean expectConflict) throws Exception {
ExecutorService service = Executors.newFixedThreadPool(2);
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build();
HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
String lastSuccessfulCommit = timeline.lastInstant().get().getTimestamp();
// Condition for parallel ingestion job
Function<Boolean, Boolean> conditionForRegularIngestion = (r) -> {
if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
TestHelpers.assertAtleastNDeltaCommitsAfterCommit(3, lastSuccessfulCommit, tableBasePath, dfs);
} else {
TestHelpers.assertAtleastNCompactionCommitsAfterCommit(3, lastSuccessfulCommit, tableBasePath, dfs);
}
TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
return true;
};
try {
Future regularIngestionJobFuture = service.submit(() -> {
try {
deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, conditionForRegularIngestion);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
});
Future backfillJobFuture = service.submit(() -> {
try {
backfillJob.sync();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
});
backfillJobFuture.get();
regularIngestionJobFuture.get();
if (expectConflict) {
Assertions.fail("Failed to handle concurrent writes");
}
} catch (Exception e) {
/**
* Need to perform getMessage().contains since the exception coming
* from {@link org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.DeltaSyncService} gets wrapped many times into RuntimeExceptions.
*/
if (expectConflict && e.getCause().getMessage().contains(ConcurrentModificationException.class.getName())) {
// expected ConcurrentModificationException since ingestion & backfill will have overlapping writes
} else {
throw e;
}
}
}
private void deltaStreamerTestRunner(HoodieDeltaStreamer ds, HoodieDeltaStreamer.Config cfg, Function<Boolean, Boolean> condition) throws Exception {
static void deltaStreamerTestRunner(HoodieDeltaStreamer ds, HoodieDeltaStreamer.Config cfg, Function<Boolean, Boolean> condition) throws Exception {
Future dsFuture = Executors.newSingleThreadExecutor().submit(() -> {
try {
ds.sync();
@@ -959,7 +707,7 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
dsFuture.get();
}
private void deltaStreamerTestRunner(HoodieDeltaStreamer ds, Function<Boolean, Boolean> condition) throws Exception {
static void deltaStreamerTestRunner(HoodieDeltaStreamer ds, Function<Boolean, Boolean> condition) throws Exception {
deltaStreamerTestRunner(ds, null, condition);
}
@@ -1461,7 +1209,7 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
TypedProperties parquetProps = new TypedProperties();
if (addCommonProps) {
populateCommonProps(parquetProps);
populateCommonProps(parquetProps, dfsBasePath);
}
parquetProps.setProperty("include", "base.properties");
@@ -1521,7 +1269,7 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
private void prepareJsonKafkaDFSSource(String propsFileName, String autoResetValue, String topicName) throws IOException {
// Properties used for testing delta-streamer with JsonKafka source
TypedProperties props = new TypedProperties();
populateAllCommonProps(props);
populateAllCommonProps(props, dfsBasePath, testUtils.brokerAddress());
props.setProperty("include", "base.properties");
props.setProperty("hoodie.embed.timeline.server", "false");
props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");

View File

@@ -0,0 +1,329 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.utilities.functional;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs;
import org.apache.hadoop.fs.FileSystem;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import java.io.IOException;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
import static org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.PROPS_FILENAME_TEST_MULTI_WRITER;
import static org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.defaultSchemaProviderClassName;
import static org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.prepareInitialConfigs;
import static org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer.deltaStreamerTestRunner;
@Tag("functional")
public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctionalTestHarness {
@ParameterizedTest
@EnumSource(HoodieTableType.class)
void testUpsertsContinuousModeWithMultipleWriters(HoodieTableType tableType) throws Exception {
// NOTE : Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts
final String basePath = basePath().replaceAll("/$", "");
final String propsFilePath = basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER;
final String tableBasePath = basePath + "/testtable_" + tableType;
prepareInitialConfigs(fs(), basePath, "foo");
// enable carrying forward latest checkpoint
TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass");
props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath);
props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,"3");
props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,"5000");
UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath);
// Keep it higher than batch-size to test continuous mode
int totalRecords = 3000;
HoodieDeltaStreamer.Config cfgIngestionJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
cfgIngestionJob.continuousMode = true;
cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
HoodieDeltaStreamer ingestionJob = new HoodieDeltaStreamer(cfgIngestionJob, jsc());
// Prepare base dataset with some commits
deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, (r) -> {
if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
TestHoodieDeltaStreamer.TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs());
TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs());
} else {
TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, fs());
}
TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext());
TestHoodieDeltaStreamer.TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext());
return true;
});
// create a backfill job
HoodieDeltaStreamer.Config cfgBackfillJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
cfgBackfillJob.continuousMode = false;
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build();
HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class);
cfgBackfillJob.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY);
cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc());
// re-init ingestion job to start sync service
HoodieDeltaStreamer ingestionJob2 = new HoodieDeltaStreamer(cfgIngestionJob, jsc());
// run ingestion & backfill in parallel, create conflict and fail one
runJobsInParallel(tableBasePath, tableType, totalRecords, ingestionJob2,
cfgIngestionJob, backfillJob, cfgBackfillJob, true);
// create new ingestion & backfill job config to generate only INSERTS to avoid conflict
props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass");
props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath);
props.setProperty("hoodie.test.source.generate.inserts", "true");
UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER);
cfgBackfillJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.INSERT,
propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TestIdentityTransformer.class.getName()));
cfgBackfillJob.continuousMode = false;
meta = HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build();
timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
commitMetadata = HoodieCommitMetadata
.fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class);
cfgBackfillJob.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY);
cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
cfgIngestionJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TestIdentityTransformer.class.getName()));
cfgIngestionJob.continuousMode = true;
cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
// re-init ingestion job
HoodieDeltaStreamer ingestionJob3 = new HoodieDeltaStreamer(cfgIngestionJob, jsc());
// re-init backfill job
HoodieDeltaStreamer backfillJob2 = new HoodieDeltaStreamer(cfgBackfillJob, jsc());
// run ingestion & backfill in parallel, avoid conflict and succeed both
runJobsInParallel(tableBasePath, tableType, totalRecords, ingestionJob3,
cfgIngestionJob, backfillJob2, cfgBackfillJob, false);
}
@ParameterizedTest
@EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE"})
void testLatestCheckpointCarryOverWithMultipleWriters(HoodieTableType tableType) throws Exception {
// NOTE : Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts
final String basePath = basePath().replaceAll("/$", "");
final String propsFilePath = basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER;
final String tableBasePath = basePath + "/testtable_" + tableType;
prepareInitialConfigs(fs(), basePath, "foo");
// enable carrying forward latest checkpoint
TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass");
props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath);
UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath);
// Keep it higher than batch-size to test continuous mode
int totalRecords = 3000;
HoodieDeltaStreamer.Config cfgIngestionJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
cfgIngestionJob.continuousMode = true;
cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
HoodieDeltaStreamer ingestionJob = new HoodieDeltaStreamer(cfgIngestionJob, jsc());
// Prepare base dataset with some commits
deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, (r) -> {
if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
TestHoodieDeltaStreamer.TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs());
TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs());
} else {
TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, fs());
}
TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext());
TestHoodieDeltaStreamer.TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext());
return true;
});
// create a backfill job with checkpoint from the first instant
HoodieDeltaStreamer.Config cfgBackfillJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
cfgBackfillJob.continuousMode = false;
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build();
HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
HoodieCommitMetadata commitMetadataForFirstInstant = HoodieCommitMetadata
.fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class);
// get current checkpoint after preparing base dataset with some commits
HoodieCommitMetadata commitMetadataForLastInstant = HoodieCommitMetadata
.fromBytes(timeline.getInstantDetails(timeline.lastInstant().get()).get(), HoodieCommitMetadata.class);
String lastCheckpointBeforeParallelBackfill = commitMetadataForLastInstant.getMetadata(CHECKPOINT_KEY);
// run the backfill job, enable overriding checkpoint from the latest commit
props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass");
props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath);
props.setProperty("hoodie.write.meta.key.prefixes", CHECKPOINT_KEY);
UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath);
// reset checkpoint to first instant to simulate a random checkpoint for backfill job
// checkpoint will move from 00000 to 00001 for this backfill job
cfgBackfillJob.checkpoint = commitMetadataForFirstInstant.getMetadata(CHECKPOINT_KEY);
cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc());
backfillJob.sync();
// check if the checkpoint is carried over
timeline = meta.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
commitMetadataForLastInstant = HoodieCommitMetadata
.fromBytes(timeline.getInstantDetails(timeline.lastInstant().get()).get(), HoodieCommitMetadata.class);
String lastCheckpointAfterParallelBackfill = commitMetadataForLastInstant.getMetadata(CHECKPOINT_KEY);
Assertions.assertEquals(lastCheckpointBeforeParallelBackfill, lastCheckpointAfterParallelBackfill);
}
private static TypedProperties prepareMultiWriterProps(FileSystem fs, String basePath, String propsFilePath) throws IOException {
TypedProperties props = new TypedProperties();
HoodieDeltaStreamerTestBase.populateCommonProps(props, basePath);
HoodieDeltaStreamerTestBase.populateCommonHiveProps(props);
props.setProperty("include", "sql-transformer.properties");
props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", basePath + "/source.avsc");
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", basePath + "/target.avsc");
props.setProperty("include", "base.properties");
props.setProperty("hoodie.write.concurrency.mode", "optimistic_concurrency_control");
props.setProperty("hoodie.cleaner.policy.failed.writes", "LAZY");
props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider");
props.setProperty("hoodie.write.lock.hivemetastore.database", "testdb1");
props.setProperty("hoodie.write.lock.hivemetastore.table", "table1");
props.setProperty("hoodie.write.lock.zookeeper.url", "127.0.0.1");
props.setProperty("hoodie.write.lock.zookeeper.port", "2828");
props.setProperty("hoodie.write.lock.wait_time_ms", "1200000");
props.setProperty("hoodie.write.lock.num_retries", "10");
props.setProperty("hoodie.write.lock.zookeeper.lock_key", "test_table");
props.setProperty("hoodie.write.lock.zookeeper.base_path", "/test");
UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, propsFilePath);
return props;
}
private static HoodieDeltaStreamer.Config getDeltaStreamerConfig(String basePath,
String tableType, WriteOperationType op, String propsFilePath, List<String> transformerClassNames) {
HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
cfg.targetBasePath = basePath;
cfg.targetTableName = "hoodie_trips";
cfg.tableType = tableType;
cfg.sourceClassName = TestDataSource.class.getName();
cfg.transformerClassNames = transformerClassNames;
cfg.operation = op;
cfg.enableHiveSync = false;
cfg.sourceOrderingField = "timestamp";
cfg.propsFilePath = propsFilePath;
cfg.sourceLimit = 1000;
cfg.schemaProviderClassName = defaultSchemaProviderClassName;
cfg.deltaSyncSchedulingWeight = 1;
cfg.deltaSyncSchedulingMinShare = 1;
cfg.compactSchedulingWeight = 2;
cfg.compactSchedulingMinShare = 1;
cfg.configs.add(String.format("%s=%s", HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key(), 10));
cfg.configs.add(String.format("%s=%s", HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key(), 10));
cfg.configs.add(String.format("%s=%s", HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key(), 10));
return cfg;
}
private void runJobsInParallel(String tableBasePath, HoodieTableType tableType, int totalRecords,
HoodieDeltaStreamer ingestionJob, HoodieDeltaStreamer.Config cfgIngestionJob, HoodieDeltaStreamer backfillJob,
HoodieDeltaStreamer.Config cfgBackfillJob, boolean expectConflict) throws Exception {
ExecutorService service = Executors.newFixedThreadPool(2);
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build();
HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
String lastSuccessfulCommit = timeline.lastInstant().get().getTimestamp();
// Condition for parallel ingestion job
Function<Boolean, Boolean> conditionForRegularIngestion = (r) -> {
if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
TestHoodieDeltaStreamer.TestHelpers.assertAtleastNDeltaCommitsAfterCommit(3, lastSuccessfulCommit, tableBasePath, fs());
} else {
TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommitsAfterCommit(3, lastSuccessfulCommit, tableBasePath, fs());
}
TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext());
TestHoodieDeltaStreamer.TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext());
return true;
};
try {
Future regularIngestionJobFuture = service.submit(() -> {
try {
deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, conditionForRegularIngestion);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
});
Future backfillJobFuture = service.submit(() -> {
try {
backfillJob.sync();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
});
backfillJobFuture.get();
regularIngestionJobFuture.get();
if (expectConflict) {
Assertions.fail("Failed to handle concurrent writes");
}
} catch (Exception e) {
/*
* Need to perform getMessage().contains since the exception coming
* from {@link org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.DeltaSyncService} gets wrapped many times into RuntimeExceptions.
*/
if (expectConflict && e.getCause().getMessage().contains(ConcurrentModificationException.class.getName())) {
// expected ConcurrentModificationException since ingestion & backfill will have overlapping writes
} else {
throw e;
}
}
}
}

View File

@@ -43,7 +43,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamerBase {
public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBase {
private static volatile Logger log = LogManager.getLogger(TestHoodieMultiTableDeltaStreamer.class);
@@ -237,7 +237,7 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamerBa
private String populateCommonPropsAndWriteToFile() throws IOException {
TypedProperties commonProps = new TypedProperties();
populateCommonProps(commonProps);
populateCommonProps(commonProps, dfsBasePath);
UtilitiesTestBase.Helpers.savePropsToDFS(commonProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_PARQUET);
return PROPS_FILENAME_TEST_PARQUET;
}