diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java index b8b470434..74ab52daa 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java @@ -51,6 +51,7 @@ import org.apache.hudi.testutils.providers.SparkProvider; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; @@ -114,6 +115,10 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe return jsc.hadoopConfiguration(); } + public FileSystem fs() { + return FSUtils.getFs(basePath(), hadoopConf()); + } + @Override public HoodieSparkEngineContext context() { return context; diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java similarity index 93% rename from hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerBase.java rename to hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java index 51b51d865..043b0a4e0 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java @@ -26,9 +26,8 @@ import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; import org.apache.hudi.utilities.testutils.UtilitiesTestBase; import org.apache.avro.Schema; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.streaming.kafka010.KafkaTestUtils; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -38,7 +37,7 @@ import org.junit.jupiter.api.BeforeEach; import java.io.IOException; import java.util.Random; -public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase { +public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { static final Random RANDOM = new Random(); @@ -78,7 +77,6 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase { static final String HOODIE_CONF_PARAM = "--hoodie-conf"; static final String HOODIE_CONF_VALUE1 = "hoodie.datasource.hive_sync.table=test_table"; static final String HOODIE_CONF_VALUE2 = "hoodie.datasource.write.recordkey.field=Field1,Field2,Field3"; - static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamerBase.class); public static KafkaTestUtils testUtils; protected static String topicName; protected static String defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName(); @@ -93,7 +91,13 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase { testUtils = new KafkaTestUtils(); testUtils.setup(); topicName = "topic" + testNum; + prepareInitialConfigs(dfs, dfsBasePath, testUtils.brokerAddress()); + prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT); + prepareORCDFSFiles(ORC_NUM_RECORDS, ORC_SOURCE_ROOT); + } + + protected static void prepareInitialConfigs(FileSystem dfs, String dfsBasePath, String brokerAddress) throws IOException { // prepare the configs. UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", dfs, dfsBasePath + "/base.properties"); UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", dfs, dfsBasePath + "/config/base.properties"); @@ -114,7 +118,7 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase { UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/short_trip_uber_config.properties", dfs, dfsBasePath + "/config/short_trip_uber_config.properties"); UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/clusteringjob.properties", dfs, dfsBasePath + "/clusteringjob.properties"); - writeCommonPropsToFile(); + writeCommonPropsToFile(dfs, dfsBasePath); // Properties used for the delta-streamer which incrementally pulls from upstream Hudi source table and writes to // downstream hudi table @@ -139,23 +143,20 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase { UtilitiesTestBase.Helpers.savePropsToDFS(invalidProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_INVALID); TypedProperties props1 = new TypedProperties(); - populateAllCommonProps(props1); + populateAllCommonProps(props1, dfsBasePath, brokerAddress); UtilitiesTestBase.Helpers.savePropsToDFS(props1, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE1); TypedProperties properties = new TypedProperties(); - populateInvalidTableConfigFilePathProps(properties); + populateInvalidTableConfigFilePathProps(properties, dfsBasePath); UtilitiesTestBase.Helpers.savePropsToDFS(properties, dfs, dfsBasePath + "/" + PROPS_INVALID_TABLE_CONFIG_FILE); TypedProperties invalidHiveSyncProps = new TypedProperties(); invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "uber_db.dummy_table_uber"); invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/invalid_hive_sync_uber_config.properties"); UtilitiesTestBase.Helpers.savePropsToDFS(invalidHiveSyncProps, dfs, dfsBasePath + "/" + PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1); - - prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT); - prepareORCDFSFiles(ORC_NUM_RECORDS, ORC_SOURCE_ROOT); } - protected static void writeCommonPropsToFile() throws IOException { + protected static void writeCommonPropsToFile(FileSystem dfs, String dfsBasePath) throws IOException { TypedProperties props = new TypedProperties(); props.setProperty("include", "sql-transformer.properties"); props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName()); @@ -192,20 +193,20 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase { super.teardown(); } - private static void populateInvalidTableConfigFilePathProps(TypedProperties props) { + protected static void populateInvalidTableConfigFilePathProps(TypedProperties props, String dfsBasePath) { props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName()); props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyyMMdd"); props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "uber_db.dummy_table_uber"); props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/invalid_uber_config.properties"); } - static void populateAllCommonProps(TypedProperties props) { - populateCommonProps(props); - populateCommonKafkaProps(props); + protected static void populateAllCommonProps(TypedProperties props, String dfsBasePath, String brokerAddress) { + populateCommonProps(props, dfsBasePath); + populateCommonKafkaProps(props, brokerAddress); populateCommonHiveProps(props); } - protected static void populateCommonProps(TypedProperties props) { + protected static void populateCommonProps(TypedProperties props, String dfsBasePath) { props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName()); props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyyMMdd"); props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "short_trip_db.dummy_table_short_trip,uber_db.dummy_table_uber"); @@ -213,9 +214,9 @@ public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase { props.setProperty("hoodie.deltastreamer.ingestion.short_trip_db.dummy_table_short_trip.configFile", dfsBasePath + "/config/short_trip_uber_config.properties"); } - protected static void populateCommonKafkaProps(TypedProperties props) { + protected static void populateCommonKafkaProps(TypedProperties props, String brokerAddress) { //Kafka source properties - props.setProperty("bootstrap.servers", testUtils.brokerAddress()); + props.setProperty("bootstrap.servers", brokerAddress); props.setProperty("auto.offset.reset", "earliest"); props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index 58b665cc8..f49c14899 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -18,14 +18,11 @@ package org.apache.hudi.utilities.functional; -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.RemoteIterator; import org.apache.hudi.AvroConversionUtils; import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.common.config.DFSPropertiesConfiguration; import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.config.HoodieMetadataConfig; -import org.apache.hudi.common.config.LockConfiguration; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -73,7 +70,9 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.kafka.common.errors.TopicExistsException; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -90,7 +89,6 @@ import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -104,11 +102,9 @@ import java.sql.DriverManager; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.ConcurrentModificationException; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -131,38 +127,10 @@ import static org.junit.jupiter.params.provider.Arguments.arguments; * Basic tests against {@link HoodieDeltaStreamer}, by issuing bulk_inserts, upserts, inserts. Check counts at the end. */ -public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase { +public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamer.class); - protected static TypedProperties prepareMultiWriterProps(String propsFileName) throws IOException { - TypedProperties props = new TypedProperties(); - populateAllCommonProps(props); - - props.setProperty("include", "sql-transformer.properties"); - props.setProperty("hoodie.datasource.write.keygenerator.class", TestGenerator.class.getName()); - props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key"); - props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there"); - props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc"); - props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc"); - - props.setProperty("include", "base.properties"); - props.setProperty("hoodie.write.concurrency.mode", "optimistic_concurrency_control"); - props.setProperty("hoodie.cleaner.policy.failed.writes", "LAZY"); - props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider"); - props.setProperty("hoodie.write.lock.hivemetastore.database", "testdb1"); - props.setProperty("hoodie.write.lock.hivemetastore.table", "table1"); - props.setProperty("hoodie.write.lock.zookeeper.url", "127.0.0.1"); - props.setProperty("hoodie.write.lock.zookeeper.port", "2828"); - props.setProperty("hoodie.write.lock.wait_time_ms", "1200000"); - props.setProperty("hoodie.write.lock.num_retries", "10"); - props.setProperty("hoodie.write.lock.zookeeper.lock_key", "test_table"); - props.setProperty("hoodie.write.lock.zookeeper.base_path", "/test"); - - UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + propsFileName); - return props; - } - protected HoodieDeltaStreamer initialHoodieDeltaStreamer(String tableBasePath, int totalRecords, String asyncCluster) throws IOException { HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT); cfg.continuousMode = true; @@ -272,18 +240,23 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase { static void assertRecordCount(long expected, String tablePath, SQLContext sqlContext) { long recordCount = sqlContext.read().format("org.apache.hudi").load(tablePath).count(); + sqlContext.clearCache(); assertEquals(expected, recordCount); } static List countsPerCommit(String tablePath, SQLContext sqlContext) { - return sqlContext.read().format("org.apache.hudi").load(tablePath).groupBy("_hoodie_commit_time").count() + List rows = sqlContext.read().format("org.apache.hudi").load(tablePath) + .groupBy("_hoodie_commit_time").count() .sort("_hoodie_commit_time").collectAsList(); + sqlContext.clearCache(); + return rows; } static void assertDistanceCount(long expected, String tablePath, SQLContext sqlContext) { sqlContext.read().format("org.apache.hudi").load(tablePath).registerTempTable("tmp_trips"); long recordCount = sqlContext.sparkSession().sql("select * from tmp_trips where haversine_distance is not NULL").count(); + sqlContext.clearCache(); assertEquals(expected, recordCount); } @@ -291,6 +264,7 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase { sqlContext.read().format("org.apache.hudi").load(tablePath).registerTempTable("tmp_trips"); long recordCount = sqlContext.sparkSession().sql("select * from tmp_trips where haversine_distance = 1.0").count(); + sqlContext.clearCache(); assertEquals(expected, recordCount); } @@ -343,7 +317,7 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase { boolean ret = false; while (!ret && !dsFuture.isDone()) { try { - Thread.sleep(3000); + Thread.sleep(5000); ret = condition.apply(true); } catch (Throwable error) { LOG.warn("Got error :", error); @@ -681,7 +655,7 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase { // clean up and reinit UtilitiesTestBase.Helpers.deleteFileFromDfs(FSUtils.getFs(cfg.targetBasePath, jsc.hadoopConfiguration()), dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE); - writeCommonPropsToFile(); + writeCommonPropsToFile(dfs, dfsBasePath); defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName(); } @@ -695,21 +669,6 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase { testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_mor"); } - @Test - public void testUpsertsCOWContinuousModeWithMultipleWriters() throws Exception { - testUpsertsContinuousModeWithMultipleWriters(HoodieTableType.COPY_ON_WRITE, "continuous_cow_mulitwriter"); - } - - @Test - public void testUpsertsMORContinuousModeWithMultipleWriters() throws Exception { - testUpsertsContinuousModeWithMultipleWriters(HoodieTableType.MERGE_ON_READ, "continuous_mor_mulitwriter"); - } - - @Test - public void testLatestCheckpointCarryOverWithMultipleWriters() throws Exception { - testLatestCheckpointCarryOverWithMultipleWriters(HoodieTableType.COPY_ON_WRITE, "continuous_cow_checkpoint"); - } - private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir) throws Exception { String tableBasePath = dfsBasePath + "/" + tempDir; // Keep it higher than batch-size to test continuous mode @@ -734,218 +693,7 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase { }); } - private void testUpsertsContinuousModeWithMultipleWriters(HoodieTableType tableType, String tempDir) throws Exception { - // NOTE : Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts - String tableBasePath = dfsBasePath + "/" + tempDir; - // enable carrying forward latest checkpoint - TypedProperties props = prepareMultiWriterProps(PROPS_FILENAME_TEST_MULTI_WRITER); - props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass"); - props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath); - props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,"3"); - props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,"5000"); - UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER); - // Keep it higher than batch-size to test continuous mode - int totalRecords = 3000; - - HoodieDeltaStreamer.Config cfgIngestionJob = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, - Arrays.asList(TripsWithDistanceTransformer.class.getName()), PROPS_FILENAME_TEST_MULTI_WRITER, false); - cfgIngestionJob.continuousMode = true; - cfgIngestionJob.tableType = tableType.name(); - cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); - cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key())); - HoodieDeltaStreamer ingestionJob = new HoodieDeltaStreamer(cfgIngestionJob, jsc); - - // Prepare base dataset with some commits - deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, (r) -> { - if (tableType.equals(HoodieTableType.MERGE_ON_READ)) { - TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, dfs); - TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, dfs); - } else { - TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, dfs); - } - TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext); - TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext); - return true; - }); - - // create a backfill job - HoodieDeltaStreamer.Config cfgBackfillJob = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, - Arrays.asList(TripsWithDistanceTransformer.class.getName()), PROPS_FILENAME_TEST_MULTI_WRITER, false); - cfgBackfillJob.continuousMode = false; - cfgBackfillJob.tableType = tableType.name(); - HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build(); - HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata - .fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class); - cfgBackfillJob.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY); - cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); - cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key())); - HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc); - - // re-init ingestion job to start sync service - HoodieDeltaStreamer ingestionJob2 = new HoodieDeltaStreamer(cfgIngestionJob, jsc); - - // run ingestion & backfill in parallel, create conflict and fail one - runJobsInParallel(tableBasePath, tableType, totalRecords, ingestionJob2, - cfgIngestionJob, backfillJob, cfgBackfillJob, true); - - // create new ingestion & backfill job config to generate only INSERTS to avoid conflict - props = prepareMultiWriterProps(PROPS_FILENAME_TEST_MULTI_WRITER); - props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass"); - props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath); - props.setProperty("hoodie.test.source.generate.inserts", "true"); - UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER); - cfgBackfillJob = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, - Arrays.asList(TestIdentityTransformer.class.getName()), PROPS_FILENAME_TEST_MULTI_WRITER, false); - cfgBackfillJob.continuousMode = false; - cfgBackfillJob.tableType = tableType.name(); - meta = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build(); - timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); - commitMetadata = HoodieCommitMetadata - .fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class); - cfgBackfillJob.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY); - cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); - cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key())); - - cfgIngestionJob = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, - Arrays.asList(TestIdentityTransformer.class.getName()), PROPS_FILENAME_TEST_MULTI_WRITER, false); - cfgIngestionJob.continuousMode = true; - cfgIngestionJob.tableType = tableType.name(); - cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); - cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key())); - // re-init ingestion job - HoodieDeltaStreamer ingestionJob3 = new HoodieDeltaStreamer(cfgIngestionJob, jsc); - // re-init backfill job - HoodieDeltaStreamer backfillJob2 = new HoodieDeltaStreamer(cfgBackfillJob, jsc); - - // run ingestion & backfill in parallel, avoid conflict and succeed both - runJobsInParallel(tableBasePath, tableType, totalRecords, ingestionJob3, - cfgIngestionJob, backfillJob2, cfgBackfillJob, false); - } - - private void testLatestCheckpointCarryOverWithMultipleWriters(HoodieTableType tableType, String tempDir) throws Exception { - // NOTE : Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts - String tableBasePath = dfsBasePath + "/" + tempDir; - // enable carrying forward latest checkpoint - TypedProperties props = prepareMultiWriterProps(PROPS_FILENAME_TEST_MULTI_WRITER); - props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass"); - props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath); - UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER); - // Keep it higher than batch-size to test continuous mode - int totalRecords = 3000; - - HoodieDeltaStreamer.Config cfgIngestionJob = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, - Arrays.asList(TripsWithDistanceTransformer.class.getName()), PROPS_FILENAME_TEST_MULTI_WRITER, false); - cfgIngestionJob.continuousMode = true; - cfgIngestionJob.tableType = tableType.name(); - cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); - cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key())); - HoodieDeltaStreamer ingestionJob = new HoodieDeltaStreamer(cfgIngestionJob, jsc); - - // Prepare base dataset with some commits - deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, (r) -> { - if (tableType.equals(HoodieTableType.MERGE_ON_READ)) { - TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, dfs); - TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, dfs); - } else { - TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, dfs); - } - TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext); - TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext); - return true; - }); - - // create a backfill job with checkpoint from the first instant - HoodieDeltaStreamer.Config cfgBackfillJob = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, - Arrays.asList(TripsWithDistanceTransformer.class.getName()), PROPS_FILENAME_TEST_MULTI_WRITER, false); - cfgBackfillJob.continuousMode = false; - cfgBackfillJob.tableType = tableType.name(); - HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build(); - HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); - HoodieCommitMetadata commitMetadataForFirstInstant = HoodieCommitMetadata - .fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class); - - // get current checkpoint after preparing base dataset with some commits - HoodieCommitMetadata commitMetadataForLastInstant = HoodieCommitMetadata - .fromBytes(timeline.getInstantDetails(timeline.lastInstant().get()).get(), HoodieCommitMetadata.class); - String lastCheckpointBeforeParallelBackfill = commitMetadataForLastInstant.getMetadata(CHECKPOINT_KEY); - - // run the backfill job, enable overriding checkpoint from the latest commit - props = prepareMultiWriterProps(PROPS_FILENAME_TEST_MULTI_WRITER); - props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass"); - props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath); - props.setProperty("hoodie.write.meta.key.prefixes", CHECKPOINT_KEY); - UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER); - - // reset checkpoint to first instant to simulate a random checkpoint for backfill job - // checkpoint will move from 00000 to 00001 for this backfill job - cfgBackfillJob.checkpoint = commitMetadataForFirstInstant.getMetadata(CHECKPOINT_KEY); - cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); - cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key())); - HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc); - backfillJob.sync(); - - // check if the checkpoint is carried over - timeline = meta.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(); - commitMetadataForLastInstant = HoodieCommitMetadata - .fromBytes(timeline.getInstantDetails(timeline.lastInstant().get()).get(), HoodieCommitMetadata.class); - String lastCheckpointAfterParallelBackfill = commitMetadataForLastInstant.getMetadata(CHECKPOINT_KEY); - Assertions.assertEquals(lastCheckpointBeforeParallelBackfill, lastCheckpointAfterParallelBackfill); - } - - private void runJobsInParallel(String tableBasePath, HoodieTableType tableType, int totalRecords, - HoodieDeltaStreamer ingestionJob, HoodieDeltaStreamer.Config cfgIngestionJob, HoodieDeltaStreamer backfillJob, - HoodieDeltaStreamer.Config cfgBackfillJob, boolean expectConflict) throws Exception { - ExecutorService service = Executors.newFixedThreadPool(2); - HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build(); - HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); - String lastSuccessfulCommit = timeline.lastInstant().get().getTimestamp(); - // Condition for parallel ingestion job - Function conditionForRegularIngestion = (r) -> { - if (tableType.equals(HoodieTableType.MERGE_ON_READ)) { - TestHelpers.assertAtleastNDeltaCommitsAfterCommit(3, lastSuccessfulCommit, tableBasePath, dfs); - } else { - TestHelpers.assertAtleastNCompactionCommitsAfterCommit(3, lastSuccessfulCommit, tableBasePath, dfs); - } - TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext); - TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext); - return true; - }; - - try { - Future regularIngestionJobFuture = service.submit(() -> { - try { - deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, conditionForRegularIngestion); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - }); - Future backfillJobFuture = service.submit(() -> { - try { - backfillJob.sync(); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - }); - backfillJobFuture.get(); - regularIngestionJobFuture.get(); - if (expectConflict) { - Assertions.fail("Failed to handle concurrent writes"); - } - } catch (Exception e) { - /** - * Need to perform getMessage().contains since the exception coming - * from {@link org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.DeltaSyncService} gets wrapped many times into RuntimeExceptions. - */ - if (expectConflict && e.getCause().getMessage().contains(ConcurrentModificationException.class.getName())) { - // expected ConcurrentModificationException since ingestion & backfill will have overlapping writes - } else { - throw e; - } - } - } - - private void deltaStreamerTestRunner(HoodieDeltaStreamer ds, HoodieDeltaStreamer.Config cfg, Function condition) throws Exception { + static void deltaStreamerTestRunner(HoodieDeltaStreamer ds, HoodieDeltaStreamer.Config cfg, Function condition) throws Exception { Future dsFuture = Executors.newSingleThreadExecutor().submit(() -> { try { ds.sync(); @@ -959,7 +707,7 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase { dsFuture.get(); } - private void deltaStreamerTestRunner(HoodieDeltaStreamer ds, Function condition) throws Exception { + static void deltaStreamerTestRunner(HoodieDeltaStreamer ds, Function condition) throws Exception { deltaStreamerTestRunner(ds, null, condition); } @@ -1461,7 +1209,7 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase { TypedProperties parquetProps = new TypedProperties(); if (addCommonProps) { - populateCommonProps(parquetProps); + populateCommonProps(parquetProps, dfsBasePath); } parquetProps.setProperty("include", "base.properties"); @@ -1521,7 +1269,7 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase { private void prepareJsonKafkaDFSSource(String propsFileName, String autoResetValue, String topicName) throws IOException { // Properties used for testing delta-streamer with JsonKafka source TypedProperties props = new TypedProperties(); - populateAllCommonProps(props); + populateAllCommonProps(props, dfsBasePath, testUtils.brokerAddress()); props.setProperty("include", "base.properties"); props.setProperty("hoodie.embed.timeline.server", "false"); props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key"); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java new file mode 100644 index 000000000..c93b7d998 --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.utilities.functional; + +import org.apache.hudi.common.config.LockConfiguration; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.config.HoodieCompactionConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; +import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer; +import org.apache.hudi.utilities.sources.TestDataSource; +import org.apache.hudi.utilities.testutils.UtilitiesTestBase; +import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs; + +import org.apache.hadoop.fs.FileSystem; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.io.IOException; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.function.Function; + +import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY; +import static org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.PROPS_FILENAME_TEST_MULTI_WRITER; +import static org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.defaultSchemaProviderClassName; +import static org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.prepareInitialConfigs; +import static org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer.deltaStreamerTestRunner; + +@Tag("functional") +public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctionalTestHarness { + + @ParameterizedTest + @EnumSource(HoodieTableType.class) + void testUpsertsContinuousModeWithMultipleWriters(HoodieTableType tableType) throws Exception { + // NOTE : Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts + final String basePath = basePath().replaceAll("/$", ""); + final String propsFilePath = basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER; + final String tableBasePath = basePath + "/testtable_" + tableType; + prepareInitialConfigs(fs(), basePath, "foo"); + // enable carrying forward latest checkpoint + TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath); + props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass"); + props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath); + props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,"3"); + props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,"5000"); + UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath); + // Keep it higher than batch-size to test continuous mode + int totalRecords = 3000; + + HoodieDeltaStreamer.Config cfgIngestionJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT, + propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName())); + cfgIngestionJob.continuousMode = true; + cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); + cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key())); + HoodieDeltaStreamer ingestionJob = new HoodieDeltaStreamer(cfgIngestionJob, jsc()); + + // Prepare base dataset with some commits + deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, (r) -> { + if (tableType.equals(HoodieTableType.MERGE_ON_READ)) { + TestHoodieDeltaStreamer.TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs()); + TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs()); + } else { + TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, fs()); + } + TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext()); + TestHoodieDeltaStreamer.TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext()); + return true; + }); + + // create a backfill job + HoodieDeltaStreamer.Config cfgBackfillJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT, + propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName())); + cfgBackfillJob.continuousMode = false; + HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build(); + HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata + .fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class); + cfgBackfillJob.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY); + cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); + cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key())); + HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc()); + + // re-init ingestion job to start sync service + HoodieDeltaStreamer ingestionJob2 = new HoodieDeltaStreamer(cfgIngestionJob, jsc()); + + // run ingestion & backfill in parallel, create conflict and fail one + runJobsInParallel(tableBasePath, tableType, totalRecords, ingestionJob2, + cfgIngestionJob, backfillJob, cfgBackfillJob, true); + + // create new ingestion & backfill job config to generate only INSERTS to avoid conflict + props = prepareMultiWriterProps(fs(), basePath, propsFilePath); + props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass"); + props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath); + props.setProperty("hoodie.test.source.generate.inserts", "true"); + UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER); + cfgBackfillJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.INSERT, + propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TestIdentityTransformer.class.getName())); + cfgBackfillJob.continuousMode = false; + meta = HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build(); + timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); + commitMetadata = HoodieCommitMetadata + .fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class); + cfgBackfillJob.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY); + cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); + cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key())); + + cfgIngestionJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT, + propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TestIdentityTransformer.class.getName())); + cfgIngestionJob.continuousMode = true; + cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); + cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key())); + // re-init ingestion job + HoodieDeltaStreamer ingestionJob3 = new HoodieDeltaStreamer(cfgIngestionJob, jsc()); + // re-init backfill job + HoodieDeltaStreamer backfillJob2 = new HoodieDeltaStreamer(cfgBackfillJob, jsc()); + + // run ingestion & backfill in parallel, avoid conflict and succeed both + runJobsInParallel(tableBasePath, tableType, totalRecords, ingestionJob3, + cfgIngestionJob, backfillJob2, cfgBackfillJob, false); + } + + @ParameterizedTest + @EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE"}) + void testLatestCheckpointCarryOverWithMultipleWriters(HoodieTableType tableType) throws Exception { + // NOTE : Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts + final String basePath = basePath().replaceAll("/$", ""); + final String propsFilePath = basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER; + final String tableBasePath = basePath + "/testtable_" + tableType; + prepareInitialConfigs(fs(), basePath, "foo"); + // enable carrying forward latest checkpoint + TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath); + props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass"); + props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath); + UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath); + // Keep it higher than batch-size to test continuous mode + int totalRecords = 3000; + + HoodieDeltaStreamer.Config cfgIngestionJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT, + propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName())); + cfgIngestionJob.continuousMode = true; + cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); + cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key())); + HoodieDeltaStreamer ingestionJob = new HoodieDeltaStreamer(cfgIngestionJob, jsc()); + + // Prepare base dataset with some commits + deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, (r) -> { + if (tableType.equals(HoodieTableType.MERGE_ON_READ)) { + TestHoodieDeltaStreamer.TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs()); + TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs()); + } else { + TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, fs()); + } + TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext()); + TestHoodieDeltaStreamer.TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext()); + return true; + }); + + // create a backfill job with checkpoint from the first instant + HoodieDeltaStreamer.Config cfgBackfillJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT, + propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName())); + cfgBackfillJob.continuousMode = false; + HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build(); + HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); + HoodieCommitMetadata commitMetadataForFirstInstant = HoodieCommitMetadata + .fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class); + + // get current checkpoint after preparing base dataset with some commits + HoodieCommitMetadata commitMetadataForLastInstant = HoodieCommitMetadata + .fromBytes(timeline.getInstantDetails(timeline.lastInstant().get()).get(), HoodieCommitMetadata.class); + String lastCheckpointBeforeParallelBackfill = commitMetadataForLastInstant.getMetadata(CHECKPOINT_KEY); + + // run the backfill job, enable overriding checkpoint from the latest commit + props = prepareMultiWriterProps(fs(), basePath, propsFilePath); + props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass"); + props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath); + props.setProperty("hoodie.write.meta.key.prefixes", CHECKPOINT_KEY); + UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath); + + // reset checkpoint to first instant to simulate a random checkpoint for backfill job + // checkpoint will move from 00000 to 00001 for this backfill job + cfgBackfillJob.checkpoint = commitMetadataForFirstInstant.getMetadata(CHECKPOINT_KEY); + cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); + cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key())); + HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc()); + backfillJob.sync(); + + // check if the checkpoint is carried over + timeline = meta.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(); + commitMetadataForLastInstant = HoodieCommitMetadata + .fromBytes(timeline.getInstantDetails(timeline.lastInstant().get()).get(), HoodieCommitMetadata.class); + String lastCheckpointAfterParallelBackfill = commitMetadataForLastInstant.getMetadata(CHECKPOINT_KEY); + Assertions.assertEquals(lastCheckpointBeforeParallelBackfill, lastCheckpointAfterParallelBackfill); + } + + private static TypedProperties prepareMultiWriterProps(FileSystem fs, String basePath, String propsFilePath) throws IOException { + TypedProperties props = new TypedProperties(); + HoodieDeltaStreamerTestBase.populateCommonProps(props, basePath); + HoodieDeltaStreamerTestBase.populateCommonHiveProps(props); + + props.setProperty("include", "sql-transformer.properties"); + props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName()); + props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key"); + props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there"); + props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", basePath + "/source.avsc"); + props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", basePath + "/target.avsc"); + + props.setProperty("include", "base.properties"); + props.setProperty("hoodie.write.concurrency.mode", "optimistic_concurrency_control"); + props.setProperty("hoodie.cleaner.policy.failed.writes", "LAZY"); + props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider"); + props.setProperty("hoodie.write.lock.hivemetastore.database", "testdb1"); + props.setProperty("hoodie.write.lock.hivemetastore.table", "table1"); + props.setProperty("hoodie.write.lock.zookeeper.url", "127.0.0.1"); + props.setProperty("hoodie.write.lock.zookeeper.port", "2828"); + props.setProperty("hoodie.write.lock.wait_time_ms", "1200000"); + props.setProperty("hoodie.write.lock.num_retries", "10"); + props.setProperty("hoodie.write.lock.zookeeper.lock_key", "test_table"); + props.setProperty("hoodie.write.lock.zookeeper.base_path", "/test"); + + UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, propsFilePath); + return props; + } + + private static HoodieDeltaStreamer.Config getDeltaStreamerConfig(String basePath, + String tableType, WriteOperationType op, String propsFilePath, List transformerClassNames) { + HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config(); + cfg.targetBasePath = basePath; + cfg.targetTableName = "hoodie_trips"; + cfg.tableType = tableType; + cfg.sourceClassName = TestDataSource.class.getName(); + cfg.transformerClassNames = transformerClassNames; + cfg.operation = op; + cfg.enableHiveSync = false; + cfg.sourceOrderingField = "timestamp"; + cfg.propsFilePath = propsFilePath; + cfg.sourceLimit = 1000; + cfg.schemaProviderClassName = defaultSchemaProviderClassName; + cfg.deltaSyncSchedulingWeight = 1; + cfg.deltaSyncSchedulingMinShare = 1; + cfg.compactSchedulingWeight = 2; + cfg.compactSchedulingMinShare = 1; + cfg.configs.add(String.format("%s=%s", HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key(), 10)); + cfg.configs.add(String.format("%s=%s", HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key(), 10)); + cfg.configs.add(String.format("%s=%s", HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key(), 10)); + return cfg; + } + + private void runJobsInParallel(String tableBasePath, HoodieTableType tableType, int totalRecords, + HoodieDeltaStreamer ingestionJob, HoodieDeltaStreamer.Config cfgIngestionJob, HoodieDeltaStreamer backfillJob, + HoodieDeltaStreamer.Config cfgBackfillJob, boolean expectConflict) throws Exception { + ExecutorService service = Executors.newFixedThreadPool(2); + HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build(); + HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); + String lastSuccessfulCommit = timeline.lastInstant().get().getTimestamp(); + // Condition for parallel ingestion job + Function conditionForRegularIngestion = (r) -> { + if (tableType.equals(HoodieTableType.MERGE_ON_READ)) { + TestHoodieDeltaStreamer.TestHelpers.assertAtleastNDeltaCommitsAfterCommit(3, lastSuccessfulCommit, tableBasePath, fs()); + } else { + TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommitsAfterCommit(3, lastSuccessfulCommit, tableBasePath, fs()); + } + TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext()); + TestHoodieDeltaStreamer.TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext()); + return true; + }; + + try { + Future regularIngestionJobFuture = service.submit(() -> { + try { + deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, conditionForRegularIngestion); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + }); + Future backfillJobFuture = service.submit(() -> { + try { + backfillJob.sync(); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + }); + backfillJobFuture.get(); + regularIngestionJobFuture.get(); + if (expectConflict) { + Assertions.fail("Failed to handle concurrent writes"); + } + } catch (Exception e) { + /* + * Need to perform getMessage().contains since the exception coming + * from {@link org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.DeltaSyncService} gets wrapped many times into RuntimeExceptions. + */ + if (expectConflict && e.getCause().getMessage().contains(ConcurrentModificationException.class.getName())) { + // expected ConcurrentModificationException since ingestion & backfill will have overlapping writes + } else { + throw e; + } + } + } + +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java index c941e8792..8eb91d246 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java @@ -43,7 +43,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamerBase { +public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBase { private static volatile Logger log = LogManager.getLogger(TestHoodieMultiTableDeltaStreamer.class); @@ -237,7 +237,7 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamerBa private String populateCommonPropsAndWriteToFile() throws IOException { TypedProperties commonProps = new TypedProperties(); - populateCommonProps(commonProps); + populateCommonProps(commonProps, dfsBasePath); UtilitiesTestBase.Helpers.savePropsToDFS(commonProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_PARQUET); return PROPS_FILENAME_TEST_PARQUET; }