diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java index dc150803e..bcd7b3b7d 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java @@ -51,6 +51,9 @@ import java.util.List; import java.util.Objects; import java.util.Set; +import static org.apache.hudi.utilities.schema.SchemaRegistryProvider.Config.SRC_SCHEMA_REGISTRY_URL_PROP; +import static org.apache.hudi.utilities.schema.SchemaRegistryProvider.Config.TARGET_SCHEMA_REGISTRY_URL_PROP; + /** * Wrapper over HoodieDeltaStreamer.java class. * Helps with ingesting incremental data into hoodie datasets for multiple tables. @@ -152,19 +155,38 @@ public class HoodieMultiTableDeltaStreamer { private void populateSchemaProviderProps(HoodieDeltaStreamer.Config cfg, TypedProperties typedProperties) { if (Objects.equals(cfg.schemaProviderClassName, SchemaRegistryProvider.class.getName())) { + populateSourceRegistryProp(typedProperties); + populateTargetRegistryProp(typedProperties); + } + } + + private void populateTargetRegistryProp(TypedProperties typedProperties) { + String schemaRegistryTargetUrl = typedProperties.getString(TARGET_SCHEMA_REGISTRY_URL_PROP, null); + if (StringUtils.isNullOrEmpty(schemaRegistryTargetUrl)) { String schemaRegistryBaseUrl = typedProperties.getString(Constants.SCHEMA_REGISTRY_BASE_URL_PROP); String schemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_URL_SUFFIX_PROP, null); - String sourceSchemaRegistrySuffix; String targetSchemaRegistrySuffix; if (StringUtils.isNullOrEmpty(schemaRegistrySuffix)) { - sourceSchemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_SOURCE_URL_SUFFIX); targetSchemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_TARGET_URL_SUFFIX); } else { targetSchemaRegistrySuffix = schemaRegistrySuffix; + } + typedProperties.setProperty(TARGET_SCHEMA_REGISTRY_URL_PROP, schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + targetSchemaRegistrySuffix); + } + } + + private void populateSourceRegistryProp(TypedProperties typedProperties) { + String schemaRegistrySourceUrl = typedProperties.getString(SRC_SCHEMA_REGISTRY_URL_PROP, null); + if (StringUtils.isNullOrEmpty(schemaRegistrySourceUrl)) { + String schemaRegistryBaseUrl = typedProperties.getString(Constants.SCHEMA_REGISTRY_BASE_URL_PROP); + String schemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_URL_SUFFIX_PROP, null); + String sourceSchemaRegistrySuffix; + if (StringUtils.isNullOrEmpty(schemaRegistrySuffix)) { + sourceSchemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_SOURCE_URL_SUFFIX); + } else { sourceSchemaRegistrySuffix = schemaRegistrySuffix; } - typedProperties.setProperty(Constants.SOURCE_SCHEMA_REGISTRY_URL_PROP, schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + sourceSchemaRegistrySuffix); - typedProperties.setProperty(Constants.TARGET_SCHEMA_REGISTRY_URL_PROP, schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + targetSchemaRegistrySuffix); + typedProperties.setProperty(SRC_SCHEMA_REGISTRY_URL_PROP, schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + sourceSchemaRegistrySuffix); } } @@ -397,8 +419,6 @@ public class HoodieMultiTableDeltaStreamer { public static class Constants { public static final String KAFKA_TOPIC_PROP = "hoodie.deltastreamer.source.kafka.topic"; - private static final String SOURCE_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.url"; - private static final String TARGET_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.targetUrl"; public static final String HIVE_SYNC_TABLE_PROP = "hoodie.datasource.hive_sync.table"; private static final String SCHEMA_REGISTRY_BASE_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.baseUrl"; private static final String SCHEMA_REGISTRY_URL_SUFFIX_PROP = "hoodie.deltastreamer.schemaprovider.registry.urlSuffix"; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java index 216369296..1046eac97 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java @@ -50,7 +50,7 @@ public class SchemaRegistryProvider extends SchemaProvider { public static class Config { public static final String SRC_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.url"; - private static final String TARGET_SCHEMA_REGISTRY_URL_PROP = + public static final String TARGET_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.targetUrl"; } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java index 8eb91d246..da5c6cc66 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java @@ -25,6 +25,7 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer; import org.apache.hudi.utilities.deltastreamer.TableExecutionContext; import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; +import org.apache.hudi.utilities.schema.SchemaRegistryProvider; import org.apache.hudi.utilities.sources.JsonKafkaSource; import org.apache.hudi.utilities.sources.ParquetDFSSource; import org.apache.hudi.utilities.sources.TestDataSource; @@ -49,12 +50,13 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa static class TestHelpers { - static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String configFolder, String sourceClassName, boolean enableHiveSync, boolean enableMetaSync) { - return getConfig(fileName, configFolder, sourceClassName, enableHiveSync, enableMetaSync, true, "multi_table_dataset"); + static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String configFolder, String sourceClassName, boolean enableHiveSync, boolean enableMetaSync, + Class clazz) { + return getConfig(fileName, configFolder, sourceClassName, enableHiveSync, enableMetaSync, true, "multi_table_dataset", clazz); } static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String configFolder, String sourceClassName, boolean enableHiveSync, boolean enableMetaSync, - boolean setSchemaProvider, String basePathPrefix) { + boolean setSchemaProvider, String basePathPrefix, Class clazz) { HoodieMultiTableDeltaStreamer.Config config = new HoodieMultiTableDeltaStreamer.Config(); config.configFolder = configFolder; config.targetTableName = "dummy_table"; @@ -64,7 +66,7 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa config.sourceClassName = sourceClassName; config.sourceOrderingField = "timestamp"; if (setSchemaProvider) { - config.schemaProviderClassName = FilebasedSchemaProvider.class.getName(); + config.schemaProviderClassName = clazz != null ? clazz.getName() : FilebasedSchemaProvider.class.getName(); } config.enableHiveSync = enableHiveSync; config.enableMetaSync = enableMetaSync; @@ -74,7 +76,7 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa @Test public void testInvalidHiveSyncProps() throws IOException { - HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true, true); + HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true, true, null); Exception e = assertThrows(HoodieException.class, () -> { new HoodieMultiTableDeltaStreamer(cfg, jsc); }, "Should fail when hive sync table not provided with enableHiveSync flag"); @@ -84,7 +86,7 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa @Test public void testInvalidPropsFilePath() throws IOException { - HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_FILE, dfsBasePath + "/config", TestDataSource.class.getName(), true, true); + HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_FILE, dfsBasePath + "/config", TestDataSource.class.getName(), true, true, null); Exception e = assertThrows(IllegalArgumentException.class, () -> { new HoodieMultiTableDeltaStreamer(cfg, jsc); }, "Should fail when invalid props file is provided"); @@ -94,7 +96,7 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa @Test public void testInvalidTableConfigFilePath() throws IOException { - HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_TABLE_CONFIG_FILE, dfsBasePath + "/config", TestDataSource.class.getName(), true, true); + HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_TABLE_CONFIG_FILE, dfsBasePath + "/config", TestDataSource.class.getName(), true, true, null); Exception e = assertThrows(IllegalArgumentException.class, () -> { new HoodieMultiTableDeltaStreamer(cfg, jsc); }, "Should fail when invalid table config props file path is provided"); @@ -104,7 +106,7 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa @Test public void testCustomConfigProps() throws IOException { - HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), false, false); + HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), false, false, SchemaRegistryProvider.class); HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc); TableExecutionContext executionContext = streamer.getTableExecutionContexts().get(1); assertEquals(2, streamer.getTableExecutionContexts().size()); @@ -114,13 +116,16 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa assertEquals("_row_key", executionContext.getProperties().getString(DataSourceWriteOptions.RECORDKEY_FIELD().key())); assertEquals(TestHoodieDeltaStreamer.TestGenerator.class.getName(), executionContext.getProperties().getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key())); assertEquals("uber_hive_dummy_table", executionContext.getProperties().getString(HoodieMultiTableDeltaStreamer.Constants.HIVE_SYNC_TABLE_PROP)); + assertEquals("http://localhost:8081/subjects/random-value/versions/latest", executionContext.getProperties().getString(SchemaRegistryProvider.Config.SRC_SCHEMA_REGISTRY_URL_PROP)); + assertEquals("http://localhost:8081/subjects/topic2-value/versions/latest", + streamer.getTableExecutionContexts().get(0).getProperties().getString(SchemaRegistryProvider.Config.SRC_SCHEMA_REGISTRY_URL_PROP)); } @Test @Disabled public void testInvalidIngestionProps() { Exception e = assertThrows(Exception.class, () -> { - HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true, true); + HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true, true, null); new HoodieMultiTableDeltaStreamer(cfg, jsc); }, "Creation of execution object should fail without kafka topic"); log.debug("Creation of execution object failed with error: " + e.getMessage(), e); @@ -139,7 +144,7 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa testUtils.sendMessages(topicName1, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 5, HoodieTestDataGenerator.TRIP_SCHEMA))); testUtils.sendMessages(topicName2, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA))); - HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", JsonKafkaSource.class.getName(), false, false); + HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", JsonKafkaSource.class.getName(), false, false, null); HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc); List executionContexts = streamer.getTableExecutionContexts(); TypedProperties properties = executionContexts.get(1).getProperties(); @@ -189,7 +194,7 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa String parquetPropsFile = populateCommonPropsAndWriteToFile(); HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(parquetPropsFile, dfsBasePath + "/config", ParquetDFSSource.class.getName(), false, false, - false, "multi_table_parquet"); + false, "multi_table_parquet", null); HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc); List executionContexts = streamer.getTableExecutionContexts(); @@ -219,7 +224,7 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa @Test public void testTableLevelProperties() throws IOException { - HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), false, false); + HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), false, false, null); HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc); List tableExecutionContexts = streamer.getTableExecutionContexts(); tableExecutionContexts.forEach(tableExecutionContext -> { diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/short_trip_uber_config.properties b/hudi-utilities/src/test/resources/delta-streamer-config/short_trip_uber_config.properties index 243afc90f..75d74d6bc 100644 --- a/hudi-utilities/src/test/resources/delta-streamer-config/short_trip_uber_config.properties +++ b/hudi-utilities/src/test/resources/delta-streamer-config/short_trip_uber_config.properties @@ -22,4 +22,6 @@ hoodie.deltastreamer.source.kafka.topic=topic2 hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.S hoodie.datasource.hive_sync.table=short_trip_uber_hive_dummy_table -hoodie.datasource.write.keygenerator.class=org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer$TestTableLevelGenerator \ No newline at end of file +hoodie.datasource.write.keygenerator.class=org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer$TestTableLevelGenerator +hoodie.deltastreamer.schemaprovider.registry.baseUrl=http://localhost:8081/subjects/ +hoodie.deltastreamer.schemaprovider.registry.urlSuffix=-value/versions/latest \ No newline at end of file diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/uber_config.properties b/hudi-utilities/src/test/resources/delta-streamer-config/uber_config.properties index 3d3501fec..f5b079265 100644 --- a/hudi-utilities/src/test/resources/delta-streamer-config/uber_config.properties +++ b/hudi-utilities/src/test/resources/delta-streamer-config/uber_config.properties @@ -22,4 +22,6 @@ hoodie.deltastreamer.source.kafka.topic=topic1 hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.S hoodie.datasource.hive_sync.database=uber_hive_db -hoodie.datasource.hive_sync.table=uber_hive_dummy_table \ No newline at end of file +hoodie.datasource.hive_sync.table=uber_hive_dummy_table +hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/random-value/versions/latest +hoodie.deltastreamer.schemaprovider.registry.targetUrl=http://localhost:8081/subjects/random-value/versions/latest \ No newline at end of file