From ea14d687da0e20a4673efa1326e5332978305269 Mon Sep 17 00:00:00 2001 From: Nick Young <72905543+NickYoungPeng@users.noreply.github.com> Date: Sat, 1 May 2021 10:02:00 +0800 Subject: [PATCH] [HUDI-1852] Add SCHEMA_REGISTRY_SOURCE_URL_SUFFIX and SCHEMA_REGISTRY_TARGET_URL_SUFFIX property (#2884) --- .../HoodieMultiTableDeltaStreamer.java | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java index 8e557f1b9..513331dc6 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java @@ -153,9 +153,18 @@ public class HoodieMultiTableDeltaStreamer { private void populateSchemaProviderProps(HoodieDeltaStreamer.Config cfg, TypedProperties typedProperties) { if (Objects.equals(cfg.schemaProviderClassName, SchemaRegistryProvider.class.getName())) { String schemaRegistryBaseUrl = typedProperties.getString(Constants.SCHEMA_REGISTRY_BASE_URL_PROP); - String schemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_URL_SUFFIX_PROP); - typedProperties.setProperty(Constants.SOURCE_SCHEMA_REGISTRY_URL_PROP, schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + schemaRegistrySuffix); - typedProperties.setProperty(Constants.TARGET_SCHEMA_REGISTRY_URL_PROP, schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + schemaRegistrySuffix); + String schemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_URL_SUFFIX_PROP, null); + String sourceSchemaRegistrySuffix; + String targetSchemaRegistrySuffix; + if (StringUtils.isNullOrEmpty(schemaRegistrySuffix)) { + sourceSchemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_SOURCE_URL_SUFFIX); + targetSchemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_TARGET_URL_SUFFIX); + } else { + targetSchemaRegistrySuffix = schemaRegistrySuffix; + sourceSchemaRegistrySuffix = schemaRegistrySuffix; + } + typedProperties.setProperty(Constants.SOURCE_SCHEMA_REGISTRY_URL_PROP, schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + sourceSchemaRegistrySuffix); + typedProperties.setProperty(Constants.TARGET_SCHEMA_REGISTRY_URL_PROP, schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + targetSchemaRegistrySuffix); } } @@ -378,6 +387,8 @@ public class HoodieMultiTableDeltaStreamer { public static final String HIVE_SYNC_TABLE_PROP = "hoodie.datasource.hive_sync.table"; private static final String SCHEMA_REGISTRY_BASE_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.baseUrl"; private static final String SCHEMA_REGISTRY_URL_SUFFIX_PROP = "hoodie.deltastreamer.schemaprovider.registry.urlSuffix"; + private static final String SCHEMA_REGISTRY_SOURCE_URL_SUFFIX = "hoodie.deltastreamer.schemaprovider.registry.sourceUrlSuffix"; + private static final String SCHEMA_REGISTRY_TARGET_URL_SUFFIX = "hoodie.deltastreamer.schemaprovider.registry.targetUrlSuffix"; private static final String TABLES_TO_BE_INGESTED_PROP = "hoodie.deltastreamer.ingestion.tablesToBeIngested"; private static final String INGESTION_PREFIX = "hoodie.deltastreamer.ingestion."; private static final String INGESTION_CONFIG_SUFFIX = ".configFile";