From 7bdae69053afc5ef604a15806d78317cb976f2ce Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Fri, 30 Jul 2021 01:22:26 -0400 Subject: [PATCH] [HUDI-2253] Refactoring few tests to reduce runningtime. DeltaStreamer and MultiDeltaStreamer tests. Bulk insert row writer tests (#3371) Co-authored-by: Sivabalan Narayanan --- ...estHoodieBulkInsertDataInternalWriter.java | 4 +- .../TestHoodieDataSourceInternalWriter.java | 9 +- ...estHoodieBulkInsertDataInternalWriter.java | 4 +- ...estHoodieDataSourceInternalBatchWrite.java | 7 +- .../functional/TestHoodieDeltaStreamer.java | 226 +--------------- .../TestHoodieDeltaStreamerBase.java | 245 ++++++++++++++++++ .../TestHoodieMultiTableDeltaStreamer.java | 4 +- 7 files changed, 266 insertions(+), 233 deletions(-) create mode 100644 hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerBase.java diff --git a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java index 9735379d4..fd943b72e 100644 --- a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java +++ b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java @@ -74,7 +74,7 @@ public class TestHoodieBulkInsertDataInternalWriter extends HoodieWriteConfig cfg = getWriteConfig(populateMetaFields); HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); // execute N rounds - for (int i = 0; i < 3; i++) { + for (int i = 0; i < 2; i++) { String instantTime = "00" + i; // init writer HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), @@ -82,7 +82,7 @@ public class TestHoodieBulkInsertDataInternalWriter extends int size = 10 + RANDOM.nextInt(1000); // write N rows to partition1, N rows to partition2 and N rows to partition3 ... Each batch should create a new RowCreateHandle and a new file - int batches = 5; + int batches = 3; Dataset totalInputRows = null; for (int j = 0; j < batches; j++) { diff --git a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java index 342e2ae90..eea49e667 100644 --- a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java +++ b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java @@ -30,6 +30,7 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.sources.v2.DataSourceOptions; import org.apache.spark.sql.sources.v2.writer.DataWriter; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -87,7 +88,7 @@ public class TestHoodieDataSourceInternalWriter extends } int size = 10 + RANDOM.nextInt(1000); - int batches = 5; + int batches = 2; Dataset totalInputRows = null; for (int j = 0; j < batches; j++) { String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3]; @@ -158,7 +159,7 @@ public class TestHoodieDataSourceInternalWriter extends int partitionCounter = 0; // execute N rounds - for (int i = 0; i < 5; i++) { + for (int i = 0; i < 2; i++) { String instantTime = "00" + i; // init writer HoodieDataSourceInternalWriter dataSourceInternalWriter = @@ -168,7 +169,7 @@ public class TestHoodieDataSourceInternalWriter extends DataWriter writer = dataSourceInternalWriter.createWriterFactory().createDataWriter(partitionCounter++, RANDOM.nextLong(), RANDOM.nextLong()); int size = 10 + RANDOM.nextInt(1000); - int batches = 5; // one batch per partition + int batches = 2; // one batch per partition for (int j = 0; j < batches; j++) { String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3]; @@ -195,6 +196,8 @@ public class TestHoodieDataSourceInternalWriter extends } } + // takes up lot of running time with CI. + @Disabled @ParameterizedTest @MethodSource("bulkInsertTypeParams") public void testLargeWrites(boolean populateMetaFields) throws Exception { diff --git a/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java b/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java index 3b7fb97ff..a3d0e3237 100644 --- a/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java +++ b/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java @@ -75,7 +75,7 @@ public class TestHoodieBulkInsertDataInternalWriter extends HoodieWriteConfig cfg = getWriteConfig(populateMetaFields); HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); // execute N rounds - for (int i = 0; i < 5; i++) { + for (int i = 0; i < 2; i++) { String instantTime = "00" + i; // init writer HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000), @@ -83,7 +83,7 @@ public class TestHoodieBulkInsertDataInternalWriter extends int size = 10 + RANDOM.nextInt(1000); // write N rows to partition1, N rows to partition2 and N rows to partition3 ... Each batch should create a new RowCreateHandle and a new file - int batches = 5; + int batches = 3; Dataset totalInputRows = null; for (int j = 0; j < batches; j++) { diff --git a/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java b/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java index 3c3866ee5..ae4980461 100644 --- a/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java +++ b/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java @@ -32,6 +32,7 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.write.DataWriter; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -161,7 +162,7 @@ public class TestHoodieDataSourceInternalBatchWrite extends int partitionCounter = 0; // execute N rounds - for (int i = 0; i < 5; i++) { + for (int i = 0; i < 2; i++) { String instantTime = "00" + i; // init writer HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite = @@ -171,7 +172,7 @@ public class TestHoodieDataSourceInternalBatchWrite extends DataWriter writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(partitionCounter++, RANDOM.nextLong()); int size = 10 + RANDOM.nextInt(1000); - int batches = 5; // one batch per partition + int batches = 3; // one batch per partition for (int j = 0; j < batches; j++) { String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3]; @@ -197,6 +198,8 @@ public class TestHoodieDataSourceInternalBatchWrite extends } } + // Large writes are not required to be executed w/ regular CI jobs. Takes lot of running time. + @Disabled @ParameterizedTest @MethodSource("bulkInsertTypeParams") public void testLargeWrites(boolean populateMetaFields) throws Exception { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index 80f471e31..1c68476f7 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -18,11 +18,6 @@ package org.apache.hudi.utilities.functional; -import java.sql.Connection; -import java.sql.DriverManager; -import java.util.ConcurrentModificationException; -import java.util.concurrent.ExecutorService; -import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.common.config.DFSPropertiesConfiguration; import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.config.TypedProperties; @@ -45,7 +40,6 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.TableNotFoundException; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HoodieHiveClient; -import org.apache.hudi.hive.MultiPartKeysValueExtractor; import org.apache.hudi.keygen.SimpleKeyGenerator; import org.apache.hudi.utilities.DummySchemaProvider; import org.apache.hudi.utilities.HoodieClusteringJob; @@ -65,7 +59,6 @@ import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs; import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer; import org.apache.hudi.utilities.transform.Transformer; -import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; @@ -84,24 +77,22 @@ import org.apache.spark.sql.api.java.UDF4; import org.apache.spark.sql.functions; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; -import org.apache.spark.streaming.kafka010.KafkaTestUtils; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.ConcurrentModificationException; import java.util.List; import java.util.Properties; -import java.util.Random; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -119,176 +110,9 @@ import static org.junit.jupiter.api.Assertions.fail; /** * Basic tests against {@link HoodieDeltaStreamer}, by issuing bulk_inserts, upserts, inserts. Check counts at the end. */ -public class TestHoodieDeltaStreamer extends UtilitiesTestBase { +public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase { - private static final Random RANDOM = new Random(); - private static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties"; - public static final String PROPS_FILENAME_TEST_SOURCE1 = "test-source1.properties"; - public static final String PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1 = "test-invalid-hive-sync-source1.properties"; - public static final String PROPS_INVALID_FILE = "test-invalid-props.properties"; - public static final String PROPS_INVALID_TABLE_CONFIG_FILE = "test-invalid-table-config.properties"; - private static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties"; - private static final String PROPS_FILENAME_TEST_CSV = "test-csv-dfs-source.properties"; - protected static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties"; - private static final String PROPS_FILENAME_TEST_JSON_KAFKA = "test-json-kafka-dfs-source.properties"; - private static final String PROPS_FILENAME_TEST_MULTI_WRITER = "test-multi-writer.properties"; - private static final String FIRST_PARQUET_FILE_NAME = "1.parquet"; - private static String PARQUET_SOURCE_ROOT; - private static String JSON_KAFKA_SOURCE_ROOT; - private static final int PARQUET_NUM_RECORDS = 5; - private static final int CSV_NUM_RECORDS = 3; - private static final int JSON_KAFKA_NUM_RECORDS = 5; - private String kafkaCheckpointType = "string"; - // Required fields - private static final String TGT_BASE_PATH_PARAM = "--target-base-path"; - private static final String TGT_BASE_PATH_VALUE = "s3://mybucket/blah"; - private static final String TABLE_TYPE_PARAM = "--table-type"; - private static final String TABLE_TYPE_VALUE = "COPY_ON_WRITE"; - private static final String TARGET_TABLE_PARAM = "--target-table"; - private static final String TARGET_TABLE_VALUE = "test"; - private static final String BASE_FILE_FORMAT_PARAM = "--base-file-format"; - private static final String BASE_FILE_FORMAT_VALUE = "PARQUET"; - private static final String SOURCE_LIMIT_PARAM = "--source-limit"; - private static final String SOURCE_LIMIT_VALUE = "500"; - private static final String ENABLE_HIVE_SYNC_PARAM = "--enable-hive-sync"; - private static final String HOODIE_CONF_PARAM = "--hoodie-conf"; - private static final String HOODIE_CONF_VALUE1 = "hoodie.datasource.hive_sync.table=test_table"; - private static final String HOODIE_CONF_VALUE2 = "hoodie.datasource.write.recordkey.field=Field1,Field2,Field3"; private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamer.class); - public static KafkaTestUtils testUtils; - protected static String topicName; - - protected static int testNum = 1; - - @BeforeAll - public static void initClass() throws Exception { - UtilitiesTestBase.initClass(true); - PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles"; - JSON_KAFKA_SOURCE_ROOT = dfsBasePath + "/jsonKafkaFiles"; - testUtils = new KafkaTestUtils(); - testUtils.setup(); - topicName = "topic" + testNum; - - // prepare the configs. - UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", dfs, dfsBasePath + "/base.properties"); - UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", dfs, dfsBasePath + "/config/base.properties"); - UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties", dfs, - dfsBasePath + "/sql-transformer.properties"); - UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc"); - UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source-flattened.avsc", dfs, dfsBasePath + "/source-flattened.avsc"); - UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc", dfs, dfsBasePath + "/target.avsc"); - UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target-flattened.avsc", dfs, dfsBasePath + "/target-flattened.avsc"); - - UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source_short_trip_uber.avsc", dfs, dfsBasePath + "/source_short_trip_uber.avsc"); - UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source_uber.avsc", dfs, dfsBasePath + "/source_uber.avsc"); - UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target_short_trip_uber.avsc", dfs, dfsBasePath + "/target_short_trip_uber.avsc"); - UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target_uber.avsc", dfs, dfsBasePath + "/target_uber.avsc"); - UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/invalid_hive_sync_uber_config.properties", dfs, dfsBasePath + "/config/invalid_hive_sync_uber_config.properties"); - UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/uber_config.properties", dfs, dfsBasePath + "/config/uber_config.properties"); - UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/short_trip_uber_config.properties", dfs, dfsBasePath + "/config/short_trip_uber_config.properties"); - UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/clusteringjob.properties", dfs, dfsBasePath + "/clusteringjob.properties"); - - TypedProperties props = new TypedProperties(); - props.setProperty("include", "sql-transformer.properties"); - props.setProperty("hoodie.datasource.write.keygenerator.class", TestGenerator.class.getName()); - props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key"); - props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there"); - props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc"); - props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc"); - - // Hive Configs - props.setProperty(DataSourceWriteOptions.HIVE_URL_OPT_KEY().key(), "jdbc:hive2://127.0.0.1:9999/"); - props.setProperty(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY().key(), "testdb1"); - props.setProperty(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY().key(), "hive_trips"); - props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY().key(), "datestr"); - props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY().key(), - MultiPartKeysValueExtractor.class.getName()); - UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE); - - // Properties used for the delta-streamer which incrementally pulls from upstream Hudi source table and writes to - // downstream hudi table - TypedProperties downstreamProps = new TypedProperties(); - downstreamProps.setProperty("include", "base.properties"); - downstreamProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key"); - downstreamProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there"); - - // Source schema is the target schema of upstream table - downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/target.avsc"); - downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc"); - UtilitiesTestBase.Helpers.savePropsToDFS(downstreamProps, dfs, dfsBasePath + "/test-downstream-source.properties"); - - // Properties used for testing invalid key generator - TypedProperties invalidProps = new TypedProperties(); - invalidProps.setProperty("include", "sql-transformer.properties"); - invalidProps.setProperty("hoodie.datasource.write.keygenerator.class", "invalid"); - invalidProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key"); - invalidProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there"); - invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc"); - invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc"); - UtilitiesTestBase.Helpers.savePropsToDFS(invalidProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_INVALID); - - TypedProperties props1 = new TypedProperties(); - populateAllCommonProps(props1); - UtilitiesTestBase.Helpers.savePropsToDFS(props1, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE1); - - TypedProperties properties = new TypedProperties(); - populateInvalidTableConfigFilePathProps(properties); - UtilitiesTestBase.Helpers.savePropsToDFS(properties, dfs, dfsBasePath + "/" + PROPS_INVALID_TABLE_CONFIG_FILE); - - TypedProperties invalidHiveSyncProps = new TypedProperties(); - invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "uber_db.dummy_table_uber"); - invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/invalid_hive_sync_uber_config.properties"); - UtilitiesTestBase.Helpers.savePropsToDFS(invalidHiveSyncProps, dfs, dfsBasePath + "/" + PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1); - - prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT); - } - - @AfterAll - public static void release() { - if (testUtils != null) { - testUtils.teardown(); - } - } - - private static void populateInvalidTableConfigFilePathProps(TypedProperties props) { - props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName()); - props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyyMMdd"); - props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "uber_db.dummy_table_uber"); - props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/invalid_uber_config.properties"); - } - - private static void populateAllCommonProps(TypedProperties props) { - populateCommonProps(props); - populateCommonKafkaProps(props); - populateCommonHiveProps(props); - } - - protected static void populateCommonProps(TypedProperties props) { - props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName()); - props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyyMMdd"); - props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "short_trip_db.dummy_table_short_trip,uber_db.dummy_table_uber"); - props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/uber_config.properties"); - props.setProperty("hoodie.deltastreamer.ingestion.short_trip_db.dummy_table_short_trip.configFile", dfsBasePath + "/config/short_trip_uber_config.properties"); - } - - protected static void populateCommonKafkaProps(TypedProperties props) { - //Kafka source properties - props.setProperty("bootstrap.servers", testUtils.brokerAddress()); - props.setProperty("auto.offset.reset", "earliest"); - props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", String.valueOf(5000)); - } - - protected static void populateCommonHiveProps(TypedProperties props) { - // Hive Configs - props.setProperty(DataSourceWriteOptions.HIVE_URL_OPT_KEY().key(), "jdbc:hive2://127.0.0.1:9999/"); - props.setProperty(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY().key(), "testdb2"); - props.setProperty(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY().key(), "false"); - props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY().key(), "datestr"); - props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY().key(), - MultiPartKeysValueExtractor.class.getName()); - } protected static TypedProperties prepareMultiWriterProps(String propsFileName) throws IOException { TypedProperties props = new TypedProperties(); @@ -318,24 +142,6 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { return props; } - @AfterAll - public static void cleanupClass() { - UtilitiesTestBase.cleanupClass(); - if (testUtils != null) { - testUtils.teardown(); - } - } - - @BeforeEach - public void setup() throws Exception { - super.setup(); - } - - @AfterEach - public void teardown() throws Exception { - super.teardown(); - } - static class TestHelpers { static HoodieDeltaStreamer.Config makeDropAllConfig(String basePath, WriteOperationType op) { @@ -1333,28 +1139,6 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { assertEquals(1000, c); } - private static void prepareParquetDFSFiles(int numRecords) throws IOException { - prepareParquetDFSFiles(numRecords, PARQUET_SOURCE_ROOT); - } - - protected static void prepareParquetDFSFiles(int numRecords, String baseParquetPath) throws IOException { - prepareParquetDFSFiles(numRecords, baseParquetPath, FIRST_PARQUET_FILE_NAME, false, null, null); - } - - protected static void prepareParquetDFSFiles(int numRecords, String baseParquetPath, String fileName, boolean useCustomSchema, - String schemaStr, Schema schema) throws IOException { - String path = baseParquetPath + "/" + fileName; - HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); - if (useCustomSchema) { - Helpers.saveParquetToDFS(Helpers.toGenericRecords( - dataGenerator.generateInsertsAsPerSchema("000", numRecords, schemaStr), - schema), new Path(path), HoodieTestDataGenerator.AVRO_TRIP_SCHEMA); - } else { - Helpers.saveParquetToDFS(Helpers.toGenericRecords( - dataGenerator.generateInserts("000", numRecords)), new Path(path)); - } - } - private static void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName) throws IOException { if (createTopic) { try { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerBase.java new file mode 100644 index 000000000..ae477dc9a --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerBase.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.functional; + +import org.apache.hudi.DataSourceWriteOptions; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.hive.MultiPartKeysValueExtractor; +import org.apache.hudi.utilities.testutils.UtilitiesTestBase; + +import org.apache.avro.Schema; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.streaming.kafka010.KafkaTestUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; + +import java.io.IOException; +import java.util.Random; + +public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase { + + + static final Random RANDOM = new Random(); + static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties"; + static final String PROPS_FILENAME_TEST_SOURCE1 = "test-source1.properties"; + static final String PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1 = "test-invalid-hive-sync-source1.properties"; + static final String PROPS_INVALID_FILE = "test-invalid-props.properties"; + static final String PROPS_INVALID_TABLE_CONFIG_FILE = "test-invalid-table-config.properties"; + static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties"; + static final String PROPS_FILENAME_TEST_CSV = "test-csv-dfs-source.properties"; + static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties"; + static final String PROPS_FILENAME_TEST_JSON_KAFKA = "test-json-kafka-dfs-source.properties"; + static final String PROPS_FILENAME_TEST_MULTI_WRITER = "test-multi-writer.properties"; + static final String FIRST_PARQUET_FILE_NAME = "1.parquet"; + static String PARQUET_SOURCE_ROOT; + static String JSON_KAFKA_SOURCE_ROOT; + static final int PARQUET_NUM_RECORDS = 5; + static final int CSV_NUM_RECORDS = 3; + static final int JSON_KAFKA_NUM_RECORDS = 5; + String kafkaCheckpointType = "string"; + // Required fields + static final String TGT_BASE_PATH_PARAM = "--target-base-path"; + static final String TGT_BASE_PATH_VALUE = "s3://mybucket/blah"; + static final String TABLE_TYPE_PARAM = "--table-type"; + static final String TABLE_TYPE_VALUE = "COPY_ON_WRITE"; + static final String TARGET_TABLE_PARAM = "--target-table"; + static final String TARGET_TABLE_VALUE = "test"; + static final String BASE_FILE_FORMAT_PARAM = "--base-file-format"; + static final String BASE_FILE_FORMAT_VALUE = "PARQUET"; + static final String SOURCE_LIMIT_PARAM = "--source-limit"; + static final String SOURCE_LIMIT_VALUE = "500"; + static final String ENABLE_HIVE_SYNC_PARAM = "--enable-hive-sync"; + static final String HOODIE_CONF_PARAM = "--hoodie-conf"; + static final String HOODIE_CONF_VALUE1 = "hoodie.datasource.hive_sync.table=test_table"; + static final String HOODIE_CONF_VALUE2 = "hoodie.datasource.write.recordkey.field=Field1,Field2,Field3"; + static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamerBase.class); + public static KafkaTestUtils testUtils; + protected static String topicName; + + protected static int testNum = 1; + + @BeforeAll + public static void initClass() throws Exception { + UtilitiesTestBase.initClass(true); + PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles"; + JSON_KAFKA_SOURCE_ROOT = dfsBasePath + "/jsonKafkaFiles"; + testUtils = new KafkaTestUtils(); + testUtils.setup(); + topicName = "topic" + testNum; + + // prepare the configs. + UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", dfs, dfsBasePath + "/base.properties"); + UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", dfs, dfsBasePath + "/config/base.properties"); + UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties", dfs, + dfsBasePath + "/sql-transformer.properties"); + UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc"); + UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source-flattened.avsc", dfs, dfsBasePath + "/source-flattened.avsc"); + UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc", dfs, dfsBasePath + "/target.avsc"); + UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target-flattened.avsc", dfs, dfsBasePath + "/target-flattened.avsc"); + + UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source_short_trip_uber.avsc", dfs, dfsBasePath + "/source_short_trip_uber.avsc"); + UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source_uber.avsc", dfs, dfsBasePath + "/source_uber.avsc"); + UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target_short_trip_uber.avsc", dfs, dfsBasePath + "/target_short_trip_uber.avsc"); + UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target_uber.avsc", dfs, dfsBasePath + "/target_uber.avsc"); + UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/invalid_hive_sync_uber_config.properties", dfs, dfsBasePath + "/config/invalid_hive_sync_uber_config.properties"); + UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/uber_config.properties", dfs, dfsBasePath + "/config/uber_config.properties"); + UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/short_trip_uber_config.properties", dfs, dfsBasePath + "/config/short_trip_uber_config.properties"); + UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/clusteringjob.properties", dfs, dfsBasePath + "/clusteringjob.properties"); + + TypedProperties props = new TypedProperties(); + props.setProperty("include", "sql-transformer.properties"); + props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName()); + props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key"); + props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there"); + props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc"); + props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc"); + + // Hive Configs + props.setProperty(DataSourceWriteOptions.HIVE_URL_OPT_KEY().key(), "jdbc:hive2://127.0.0.1:9999/"); + props.setProperty(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY().key(), "testdb1"); + props.setProperty(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY().key(), "hive_trips"); + props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY().key(), "datestr"); + props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY().key(), + MultiPartKeysValueExtractor.class.getName()); + UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE); + + // Properties used for the delta-streamer which incrementally pulls from upstream Hudi source table and writes to + // downstream hudi table + TypedProperties downstreamProps = new TypedProperties(); + downstreamProps.setProperty("include", "base.properties"); + downstreamProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key"); + downstreamProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there"); + + // Source schema is the target schema of upstream table + downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/target.avsc"); + downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc"); + UtilitiesTestBase.Helpers.savePropsToDFS(downstreamProps, dfs, dfsBasePath + "/test-downstream-source.properties"); + + // Properties used for testing invalid key generator + TypedProperties invalidProps = new TypedProperties(); + invalidProps.setProperty("include", "sql-transformer.properties"); + invalidProps.setProperty("hoodie.datasource.write.keygenerator.class", "invalid"); + invalidProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key"); + invalidProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there"); + invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc"); + invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc"); + UtilitiesTestBase.Helpers.savePropsToDFS(invalidProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_INVALID); + + TypedProperties props1 = new TypedProperties(); + populateAllCommonProps(props1); + UtilitiesTestBase.Helpers.savePropsToDFS(props1, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE1); + + TypedProperties properties = new TypedProperties(); + populateInvalidTableConfigFilePathProps(properties); + UtilitiesTestBase.Helpers.savePropsToDFS(properties, dfs, dfsBasePath + "/" + PROPS_INVALID_TABLE_CONFIG_FILE); + + TypedProperties invalidHiveSyncProps = new TypedProperties(); + invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "uber_db.dummy_table_uber"); + invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/invalid_hive_sync_uber_config.properties"); + UtilitiesTestBase.Helpers.savePropsToDFS(invalidHiveSyncProps, dfs, dfsBasePath + "/" + PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1); + + prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT); + } + + @BeforeEach + public void setup() throws Exception { + super.setup(); + } + + @AfterAll + public static void cleanupClass() { + UtilitiesTestBase.cleanupClass(); + if (testUtils != null) { + testUtils.teardown(); + } + } + + @AfterEach + public void teardown() throws Exception { + super.teardown(); + } + + private static void populateInvalidTableConfigFilePathProps(TypedProperties props) { + props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName()); + props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyyMMdd"); + props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "uber_db.dummy_table_uber"); + props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/invalid_uber_config.properties"); + } + + static void populateAllCommonProps(TypedProperties props) { + populateCommonProps(props); + populateCommonKafkaProps(props); + populateCommonHiveProps(props); + } + + protected static void populateCommonProps(TypedProperties props) { + props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName()); + props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyyMMdd"); + props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "short_trip_db.dummy_table_short_trip,uber_db.dummy_table_uber"); + props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/uber_config.properties"); + props.setProperty("hoodie.deltastreamer.ingestion.short_trip_db.dummy_table_short_trip.configFile", dfsBasePath + "/config/short_trip_uber_config.properties"); + } + + protected static void populateCommonKafkaProps(TypedProperties props) { + //Kafka source properties + props.setProperty("bootstrap.servers", testUtils.brokerAddress()); + props.setProperty("auto.offset.reset", "earliest"); + props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", String.valueOf(5000)); + } + + protected static void populateCommonHiveProps(TypedProperties props) { + // Hive Configs + props.setProperty(DataSourceWriteOptions.HIVE_URL_OPT_KEY().key(), "jdbc:hive2://127.0.0.1:9999/"); + props.setProperty(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY().key(), "testdb2"); + props.setProperty(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY().key(), "false"); + props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY().key(), "datestr"); + props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY().key(), + MultiPartKeysValueExtractor.class.getName()); + } + + protected static void prepareParquetDFSFiles(int numRecords) throws IOException { + prepareParquetDFSFiles(numRecords, PARQUET_SOURCE_ROOT); + } + + protected static void prepareParquetDFSFiles(int numRecords, String baseParquetPath) throws IOException { + prepareParquetDFSFiles(numRecords, baseParquetPath, FIRST_PARQUET_FILE_NAME, false, null, null); + } + + protected static void prepareParquetDFSFiles(int numRecords, String baseParquetPath, String fileName, boolean useCustomSchema, + String schemaStr, Schema schema) throws IOException { + String path = baseParquetPath + "/" + fileName; + HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); + if (useCustomSchema) { + Helpers.saveParquetToDFS(Helpers.toGenericRecords( + dataGenerator.generateInsertsAsPerSchema("000", numRecords, schemaStr), + schema), new Path(path), HoodieTestDataGenerator.AVRO_TRIP_SCHEMA); + } else { + Helpers.saveParquetToDFS(Helpers.toGenericRecords( + dataGenerator.generateInserts("000", numRecords)), new Path(path)); + } + } + +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java index eed7c4c79..f264ec662 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java @@ -38,16 +38,14 @@ import org.junit.jupiter.api.Test; import java.io.IOException; import java.util.Arrays; import java.util.List; -import java.util.Random; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer { +public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamerBase { private static volatile Logger log = LogManager.getLogger(TestHoodieMultiTableDeltaStreamer.class); - private static final Random RANDOM = new Random(); static class TestHelpers {