1
0

[HUDI-1852] Add SCHEMA_REGISTRY_SOURCE_URL_SUFFIX and SCHEMA_REGISTRY_TARGET_URL_SUFFIX property (#2884)

This commit is contained in:
Nick Young
2021-05-01 10:02:00 +08:00
committed by GitHub
parent 3418a92de8
commit ea14d687da

View File

@@ -153,9 +153,18 @@ public class HoodieMultiTableDeltaStreamer {
private void populateSchemaProviderProps(HoodieDeltaStreamer.Config cfg, TypedProperties typedProperties) { private void populateSchemaProviderProps(HoodieDeltaStreamer.Config cfg, TypedProperties typedProperties) {
if (Objects.equals(cfg.schemaProviderClassName, SchemaRegistryProvider.class.getName())) { if (Objects.equals(cfg.schemaProviderClassName, SchemaRegistryProvider.class.getName())) {
String schemaRegistryBaseUrl = typedProperties.getString(Constants.SCHEMA_REGISTRY_BASE_URL_PROP); String schemaRegistryBaseUrl = typedProperties.getString(Constants.SCHEMA_REGISTRY_BASE_URL_PROP);
String schemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_URL_SUFFIX_PROP); String schemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_URL_SUFFIX_PROP, null);
typedProperties.setProperty(Constants.SOURCE_SCHEMA_REGISTRY_URL_PROP, schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + schemaRegistrySuffix); String sourceSchemaRegistrySuffix;
typedProperties.setProperty(Constants.TARGET_SCHEMA_REGISTRY_URL_PROP, schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + schemaRegistrySuffix); String targetSchemaRegistrySuffix;
if (StringUtils.isNullOrEmpty(schemaRegistrySuffix)) {
sourceSchemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_SOURCE_URL_SUFFIX);
targetSchemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_TARGET_URL_SUFFIX);
} else {
targetSchemaRegistrySuffix = schemaRegistrySuffix;
sourceSchemaRegistrySuffix = schemaRegistrySuffix;
}
typedProperties.setProperty(Constants.SOURCE_SCHEMA_REGISTRY_URL_PROP, schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + sourceSchemaRegistrySuffix);
typedProperties.setProperty(Constants.TARGET_SCHEMA_REGISTRY_URL_PROP, schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + targetSchemaRegistrySuffix);
} }
} }
@@ -378,6 +387,8 @@ public class HoodieMultiTableDeltaStreamer {
public static final String HIVE_SYNC_TABLE_PROP = "hoodie.datasource.hive_sync.table"; public static final String HIVE_SYNC_TABLE_PROP = "hoodie.datasource.hive_sync.table";
private static final String SCHEMA_REGISTRY_BASE_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.baseUrl"; private static final String SCHEMA_REGISTRY_BASE_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.baseUrl";
private static final String SCHEMA_REGISTRY_URL_SUFFIX_PROP = "hoodie.deltastreamer.schemaprovider.registry.urlSuffix"; private static final String SCHEMA_REGISTRY_URL_SUFFIX_PROP = "hoodie.deltastreamer.schemaprovider.registry.urlSuffix";
private static final String SCHEMA_REGISTRY_SOURCE_URL_SUFFIX = "hoodie.deltastreamer.schemaprovider.registry.sourceUrlSuffix";
private static final String SCHEMA_REGISTRY_TARGET_URL_SUFFIX = "hoodie.deltastreamer.schemaprovider.registry.targetUrlSuffix";
private static final String TABLES_TO_BE_INGESTED_PROP = "hoodie.deltastreamer.ingestion.tablesToBeIngested"; private static final String TABLES_TO_BE_INGESTED_PROP = "hoodie.deltastreamer.ingestion.tablesToBeIngested";
private static final String INGESTION_PREFIX = "hoodie.deltastreamer.ingestion."; private static final String INGESTION_PREFIX = "hoodie.deltastreamer.ingestion.";
private static final String INGESTION_CONFIG_SUFFIX = ".configFile"; private static final String INGESTION_CONFIG_SUFFIX = ".configFile";