diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java index 9d5ca3ca8..be2fe5425 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java @@ -47,6 +47,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.List; +import java.util.Objects; import java.util.Set; /** @@ -75,9 +76,9 @@ public class HoodieMultiTableDeltaStreamer { FileSystem fs = FSUtils.getFs(commonPropsFile, jssc.hadoopConfiguration()); configFolder = configFolder.charAt(configFolder.length() - 1) == '/' ? configFolder.substring(0, configFolder.length() - 1) : configFolder; checkIfPropsFileAndConfigFolderExist(commonPropsFile, configFolder, fs); - TypedProperties properties = UtilHelpers.readConfig(fs, new Path(commonPropsFile), new ArrayList<>()).getConfig(); + TypedProperties commonProperties = UtilHelpers.readConfig(fs, new Path(commonPropsFile), new ArrayList<>()).getConfig(); //get the tables to be ingested and their corresponding config files from this properties instance - populateTableExecutionContextList(properties, configFolder, fs, config); + populateTableExecutionContextList(commonProperties, configFolder, fs, config); } private void checkIfPropsFileAndConfigFolderExist(String commonPropsFile, String configFolder, FileSystem fs) throws IOException { @@ -147,7 +148,7 @@ public class HoodieMultiTableDeltaStreamer { } private void populateSchemaProviderProps(HoodieDeltaStreamer.Config cfg, TypedProperties typedProperties) { - if (cfg.schemaProviderClassName.equals(SchemaRegistryProvider.class.getName())) { + if (Objects.equals(cfg.schemaProviderClassName, SchemaRegistryProvider.class.getName())) { String schemaRegistryBaseUrl = typedProperties.getString(Constants.SCHEMA_REGISTRY_BASE_URL_PROP); String schemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_URL_SUFFIX_PROP); typedProperties.setProperty(Constants.SOURCE_SCHEMA_REGISTRY_URL_PROP, schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + schemaRegistrySuffix); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index 7fb5b1862..7522c2d29 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -118,8 +118,9 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { public static final String PROPS_INVALID_TABLE_CONFIG_FILE = "test-invalid-table-config.properties"; private static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties"; private static final String PROPS_FILENAME_TEST_CSV = "test-csv-dfs-source.properties"; - private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties"; + protected static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties"; private static final String PROPS_FILENAME_TEST_JSON_KAFKA = "test-json-kafka-dfs-source.properties"; + private static final String FIRST_PARQUET_FILE_NAME = "1.parquet"; private static String PARQUET_SOURCE_ROOT; private static String JSON_KAFKA_SOURCE_ROOT; private static final int PARQUET_NUM_RECORDS = 5; @@ -214,7 +215,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { UtilitiesTestBase.Helpers.savePropsToDFS(invalidProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_INVALID); TypedProperties props1 = new TypedProperties(); - populateCommonProps(props1); + populateAllCommonProps(props1); UtilitiesTestBase.Helpers.savePropsToDFS(props1, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE1); TypedProperties properties = new TypedProperties(); @@ -226,7 +227,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/invalid_hive_sync_uber_config.properties"); UtilitiesTestBase.Helpers.savePropsToDFS(invalidHiveSyncProps, dfs, dfsBasePath + "/" + PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1); - prepareParquetDFSFiles(PARQUET_NUM_RECORDS); + prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT); } private static void populateInvalidTableConfigFilePathProps(TypedProperties props) { @@ -236,20 +237,30 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/invalid_uber_config.properties"); } - private static void populateCommonProps(TypedProperties props) { + private static void populateAllCommonProps(TypedProperties props) { + populateCommonProps(props); + populateCommonKafkaProps(props); + populateCommonHiveProps(props); + } + + protected static void populateCommonProps(TypedProperties props) { props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName()); props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyyMMdd"); props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "short_trip_db.dummy_table_short_trip,uber_db.dummy_table_uber"); props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/uber_config.properties"); props.setProperty("hoodie.deltastreamer.ingestion.short_trip_db.dummy_table_short_trip.configFile", dfsBasePath + "/config/short_trip_uber_config.properties"); + } + protected static void populateCommonKafkaProps(TypedProperties props) { //Kafka source properties props.setProperty("bootstrap.servers", testUtils.brokerAddress()); props.setProperty("hoodie.deltastreamer.source.kafka.auto.reset.offsets", "earliest"); props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", String.valueOf(5000)); + } + protected static void populateCommonHiveProps(TypedProperties props) { // Hive Configs props.setProperty(DataSourceWriteOptions.HIVE_URL_OPT_KEY(), "jdbc:hive2://127.0.0.1:9999/"); props.setProperty(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), "testdb2"); @@ -975,12 +986,16 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { } private static void prepareParquetDFSFiles(int numRecords) throws IOException { - prepareParquetDFSFiles(numRecords, "1.parquet", false, null, null); + prepareParquetDFSFiles(numRecords, PARQUET_SOURCE_ROOT); } - private static void prepareParquetDFSFiles(int numRecords, String fileName, boolean useCustomSchema, + protected static void prepareParquetDFSFiles(int numRecords, String baseParquetPath) throws IOException { + prepareParquetDFSFiles(numRecords, baseParquetPath, FIRST_PARQUET_FILE_NAME, false, null, null); + } + + protected static void prepareParquetDFSFiles(int numRecords, String baseParquetPath, String fileName, boolean useCustomSchema, String schemaStr, Schema schema) throws IOException { - String path = PARQUET_SOURCE_ROOT + "/" + fileName; + String path = baseParquetPath + "/" + fileName; HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); if (useCustomSchema) { Helpers.saveParquetToDFS(Helpers.toGenericRecords( @@ -1006,13 +1021,18 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer) throws IOException { prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", - PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT); + PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false); } private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile, - String propsFileName, String parquetSourceRoot) throws IOException { + String propsFileName, String parquetSourceRoot, boolean addCommonProps) throws IOException { // Properties used for testing delta-streamer with Parquet source TypedProperties parquetProps = new TypedProperties(); + + if (addCommonProps) { + populateCommonProps(parquetProps); + } + parquetProps.setProperty("include", "base.properties"); parquetProps.setProperty("hoodie.embed.timeline.server","false"); parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key"); @@ -1042,7 +1062,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { private void prepareJsonKafkaDFSSource(String propsFileName, String autoResetValue, String topicName) throws IOException { // Properties used for testing delta-streamer with JsonKafka source TypedProperties props = new TypedProperties(); - populateCommonProps(props); + populateAllCommonProps(props); props.setProperty("include", "base.properties"); props.setProperty("hoodie.embed.timeline.server","false"); props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key"); @@ -1065,10 +1085,10 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { // prep parquet source PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFilesDfsToKafka" + testNum; int parquetRecords = 10; - prepareParquetDFSFiles(parquetRecords,"1.parquet", true, HoodieTestDataGenerator.TRIP_SCHEMA, HoodieTestDataGenerator.AVRO_TRIP_SCHEMA); + prepareParquetDFSFiles(parquetRecords, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, true, HoodieTestDataGenerator.TRIP_SCHEMA, HoodieTestDataGenerator.AVRO_TRIP_SCHEMA); prepareParquetDFSSource(true, false,"source_uber.avsc", "target_uber.avsc", PROPS_FILENAME_TEST_PARQUET, - PARQUET_SOURCE_ROOT); + PARQUET_SOURCE_ROOT, false); // delta streamer w/ parquest source String tableBasePath = dfsBasePath + "/test_dfs_to_kakfa" + testNum; HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer( diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java index ad1b75395..7b5ce9d74 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java @@ -26,7 +26,9 @@ import org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer; import org.apache.hudi.utilities.deltastreamer.TableExecutionContext; import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; import org.apache.hudi.utilities.sources.JsonKafkaSource; +import org.apache.hudi.utilities.sources.ParquetDFSSource; import org.apache.hudi.utilities.sources.TestDataSource; +import org.apache.hudi.utilities.testutils.UtilitiesTestBase; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -34,7 +36,9 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.io.IOException; +import java.util.Arrays; import java.util.List; +import java.util.Random; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -43,19 +47,27 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer { private static volatile Logger log = LogManager.getLogger(TestHoodieMultiTableDeltaStreamer.class); + private static final Random RANDOM = new Random(); static class TestHelpers { static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String configFolder, String sourceClassName, boolean enableHiveSync) { + return getConfig(fileName, configFolder, sourceClassName, enableHiveSync, true, "multi_table_dataset"); + } + + static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String configFolder, String sourceClassName, boolean enableHiveSync, + boolean setSchemaProvider, String basePathPrefix) { HoodieMultiTableDeltaStreamer.Config config = new HoodieMultiTableDeltaStreamer.Config(); config.configFolder = configFolder; config.targetTableName = "dummy_table"; - config.basePathPrefix = dfsBasePath + "/multi_table_dataset"; + config.basePathPrefix = dfsBasePath + "/" + basePathPrefix; config.propsFilePath = dfsBasePath + "/" + fileName; config.tableType = "COPY_ON_WRITE"; config.sourceClassName = sourceClassName; config.sourceOrderingField = "timestamp"; - config.schemaProviderClassName = FilebasedSchemaProvider.class.getName(); + if (setSchemaProvider) { + config.schemaProviderClassName = FilebasedSchemaProvider.class.getName(); + } config.enableHiveSync = enableHiveSync; return config; } @@ -117,7 +129,7 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer { } @Test //0 corresponds to fg - public void testMultiTableExecution() throws IOException { + public void testMultiTableExecutionWithKafkaSource() throws IOException { //create topics for each table String topicName1 = "topic" + testNum++; String topicName2 = "topic" + testNum; @@ -128,7 +140,7 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer { testUtils.sendMessages(topicName1, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 5, HoodieTestDataGenerator.TRIP_SCHEMA))); testUtils.sendMessages(topicName2, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA))); - HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1,dfsBasePath + "/config", JsonKafkaSource.class.getName(), false); + HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", JsonKafkaSource.class.getName(), false); HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc); List executionContexts = streamer.getTableExecutionContexts(); TypedProperties properties = executionContexts.get(1).getProperties(); @@ -160,4 +172,79 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer { TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10, targetBasePath2 + "/*/*.parquet", sqlContext); testNum++; } + + @Test + public void testMultiTableExecutionWithParquetSource() throws IOException { + // ingest test data to 2 parquet source paths + String parquetSourceRoot1 = dfsBasePath + "/parquetSrcPath1/"; + prepareParquetDFSFiles(10, parquetSourceRoot1); + String parquetSourceRoot2 = dfsBasePath + "/parquetSrcPath2/"; + prepareParquetDFSFiles(5, parquetSourceRoot2); + + // add only common props. later we can add per table props + String parquetPropsFile = populateCommonPropsAndWriteToFile(); + + HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(parquetPropsFile, dfsBasePath + "/config", ParquetDFSSource.class.getName(), false, + false, "multi_table_parquet"); + HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc); + + List executionContexts = streamer.getTableExecutionContexts(); + // fetch per parquet source props and add per table properties + ingestPerParquetSourceProps(executionContexts, Arrays.asList(new String[] {parquetSourceRoot1, parquetSourceRoot2})); + + String targetBasePath1 = executionContexts.get(0).getConfig().targetBasePath; + String targetBasePath2 = executionContexts.get(1).getConfig().targetBasePath; + + // sync and verify + syncAndVerify(streamer, targetBasePath1, targetBasePath2, 10, 5); + + int totalTable1Records = 10; + int totalTable2Records = 5; + // ingest multiple rounds and verify + for (int i = 0; i < 3; i++) { + int table1Records = 10 + RANDOM.nextInt(100); + int table2Records = 15 + RANDOM.nextInt(100); + prepareParquetDFSFiles(table1Records, parquetSourceRoot1, (i + 2) + ".parquet", false, null, null); + prepareParquetDFSFiles(table2Records, parquetSourceRoot2, (i + 2) + ".parquet", false, null, null); + totalTable1Records += table1Records; + totalTable2Records += table2Records; + // sync and verify + syncAndVerify(streamer, targetBasePath1, targetBasePath2, totalTable1Records, totalTable2Records); + } + } + + private String populateCommonPropsAndWriteToFile() throws IOException { + TypedProperties commonProps = new TypedProperties(); + populateCommonProps(commonProps); + UtilitiesTestBase.Helpers.savePropsToDFS(commonProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_PARQUET); + return PROPS_FILENAME_TEST_PARQUET; + } + + private TypedProperties getParquetProps(String parquetSourceRoot) { + TypedProperties props = new TypedProperties(); + props.setProperty("include", "base.properties"); + props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key"); + props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there"); + props.setProperty("hoodie.deltastreamer.source.dfs.root", parquetSourceRoot); + return props; + } + + private void ingestPerParquetSourceProps(List executionContexts, List parquetSourceRoots) { + int counter = 0; + for (String parquetSourceRoot : parquetSourceRoots) { + TypedProperties properties = executionContexts.get(counter).getProperties(); + TypedProperties parquetProps = getParquetProps(parquetSourceRoot); + parquetProps.forEach((k, v) -> { + properties.setProperty(k.toString(), v.toString()); + }); + executionContexts.get(counter).setProperties(properties); + counter++; + } + } + + private void syncAndVerify(HoodieMultiTableDeltaStreamer streamer, String targetBasePath1, String targetBasePath2, long table1ExpectedRecords, long table2ExpectedRecords) { + streamer.sync(); + TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(table1ExpectedRecords, targetBasePath1 + "/*/*.parquet", sqlContext); + TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(table2ExpectedRecords, targetBasePath2 + "/*/*.parquet", sqlContext); + } }