1
0

[HUDI-3264]: made schema registry urls configurable with MTDS (#4779)

This commit is contained in:
Pratyaksh Sharma
2022-03-03 02:00:41 +05:30
committed by GitHub
parent 527bd34b1c
commit 907e60c252
5 changed files with 50 additions and 21 deletions

View File

@@ -51,6 +51,9 @@ import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
import static org.apache.hudi.utilities.schema.SchemaRegistryProvider.Config.SRC_SCHEMA_REGISTRY_URL_PROP;
import static org.apache.hudi.utilities.schema.SchemaRegistryProvider.Config.TARGET_SCHEMA_REGISTRY_URL_PROP;
/** /**
* Wrapper over HoodieDeltaStreamer.java class. * Wrapper over HoodieDeltaStreamer.java class.
* Helps with ingesting incremental data into hoodie datasets for multiple tables. * Helps with ingesting incremental data into hoodie datasets for multiple tables.
@@ -152,19 +155,38 @@ public class HoodieMultiTableDeltaStreamer {
private void populateSchemaProviderProps(HoodieDeltaStreamer.Config cfg, TypedProperties typedProperties) { private void populateSchemaProviderProps(HoodieDeltaStreamer.Config cfg, TypedProperties typedProperties) {
if (Objects.equals(cfg.schemaProviderClassName, SchemaRegistryProvider.class.getName())) { if (Objects.equals(cfg.schemaProviderClassName, SchemaRegistryProvider.class.getName())) {
populateSourceRegistryProp(typedProperties);
populateTargetRegistryProp(typedProperties);
}
}
private void populateTargetRegistryProp(TypedProperties typedProperties) {
String schemaRegistryTargetUrl = typedProperties.getString(TARGET_SCHEMA_REGISTRY_URL_PROP, null);
if (StringUtils.isNullOrEmpty(schemaRegistryTargetUrl)) {
String schemaRegistryBaseUrl = typedProperties.getString(Constants.SCHEMA_REGISTRY_BASE_URL_PROP); String schemaRegistryBaseUrl = typedProperties.getString(Constants.SCHEMA_REGISTRY_BASE_URL_PROP);
String schemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_URL_SUFFIX_PROP, null); String schemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_URL_SUFFIX_PROP, null);
String sourceSchemaRegistrySuffix;
String targetSchemaRegistrySuffix; String targetSchemaRegistrySuffix;
if (StringUtils.isNullOrEmpty(schemaRegistrySuffix)) { if (StringUtils.isNullOrEmpty(schemaRegistrySuffix)) {
sourceSchemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_SOURCE_URL_SUFFIX);
targetSchemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_TARGET_URL_SUFFIX); targetSchemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_TARGET_URL_SUFFIX);
} else { } else {
targetSchemaRegistrySuffix = schemaRegistrySuffix; targetSchemaRegistrySuffix = schemaRegistrySuffix;
}
typedProperties.setProperty(TARGET_SCHEMA_REGISTRY_URL_PROP, schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + targetSchemaRegistrySuffix);
}
}
private void populateSourceRegistryProp(TypedProperties typedProperties) {
String schemaRegistrySourceUrl = typedProperties.getString(SRC_SCHEMA_REGISTRY_URL_PROP, null);
if (StringUtils.isNullOrEmpty(schemaRegistrySourceUrl)) {
String schemaRegistryBaseUrl = typedProperties.getString(Constants.SCHEMA_REGISTRY_BASE_URL_PROP);
String schemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_URL_SUFFIX_PROP, null);
String sourceSchemaRegistrySuffix;
if (StringUtils.isNullOrEmpty(schemaRegistrySuffix)) {
sourceSchemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_SOURCE_URL_SUFFIX);
} else {
sourceSchemaRegistrySuffix = schemaRegistrySuffix; sourceSchemaRegistrySuffix = schemaRegistrySuffix;
} }
typedProperties.setProperty(Constants.SOURCE_SCHEMA_REGISTRY_URL_PROP, schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + sourceSchemaRegistrySuffix); typedProperties.setProperty(SRC_SCHEMA_REGISTRY_URL_PROP, schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + sourceSchemaRegistrySuffix);
typedProperties.setProperty(Constants.TARGET_SCHEMA_REGISTRY_URL_PROP, schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + targetSchemaRegistrySuffix);
} }
} }
@@ -397,8 +419,6 @@ public class HoodieMultiTableDeltaStreamer {
public static class Constants { public static class Constants {
public static final String KAFKA_TOPIC_PROP = "hoodie.deltastreamer.source.kafka.topic"; public static final String KAFKA_TOPIC_PROP = "hoodie.deltastreamer.source.kafka.topic";
private static final String SOURCE_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.url";
private static final String TARGET_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.targetUrl";
public static final String HIVE_SYNC_TABLE_PROP = "hoodie.datasource.hive_sync.table"; public static final String HIVE_SYNC_TABLE_PROP = "hoodie.datasource.hive_sync.table";
private static final String SCHEMA_REGISTRY_BASE_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.baseUrl"; private static final String SCHEMA_REGISTRY_BASE_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.baseUrl";
private static final String SCHEMA_REGISTRY_URL_SUFFIX_PROP = "hoodie.deltastreamer.schemaprovider.registry.urlSuffix"; private static final String SCHEMA_REGISTRY_URL_SUFFIX_PROP = "hoodie.deltastreamer.schemaprovider.registry.urlSuffix";

View File

@@ -50,7 +50,7 @@ public class SchemaRegistryProvider extends SchemaProvider {
public static class Config { public static class Config {
public static final String SRC_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.url"; public static final String SRC_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.url";
private static final String TARGET_SCHEMA_REGISTRY_URL_PROP = public static final String TARGET_SCHEMA_REGISTRY_URL_PROP =
"hoodie.deltastreamer.schemaprovider.registry.targetUrl"; "hoodie.deltastreamer.schemaprovider.registry.targetUrl";
} }

View File

@@ -25,6 +25,7 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer; import org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.TableExecutionContext; import org.apache.hudi.utilities.deltastreamer.TableExecutionContext;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
import org.apache.hudi.utilities.sources.JsonKafkaSource; import org.apache.hudi.utilities.sources.JsonKafkaSource;
import org.apache.hudi.utilities.sources.ParquetDFSSource; import org.apache.hudi.utilities.sources.ParquetDFSSource;
import org.apache.hudi.utilities.sources.TestDataSource; import org.apache.hudi.utilities.sources.TestDataSource;
@@ -49,12 +50,13 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa
static class TestHelpers { static class TestHelpers {
static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String configFolder, String sourceClassName, boolean enableHiveSync, boolean enableMetaSync) { static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String configFolder, String sourceClassName, boolean enableHiveSync, boolean enableMetaSync,
return getConfig(fileName, configFolder, sourceClassName, enableHiveSync, enableMetaSync, true, "multi_table_dataset"); Class<?> clazz) {
return getConfig(fileName, configFolder, sourceClassName, enableHiveSync, enableMetaSync, true, "multi_table_dataset", clazz);
} }
static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String configFolder, String sourceClassName, boolean enableHiveSync, boolean enableMetaSync, static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String configFolder, String sourceClassName, boolean enableHiveSync, boolean enableMetaSync,
boolean setSchemaProvider, String basePathPrefix) { boolean setSchemaProvider, String basePathPrefix, Class<?> clazz) {
HoodieMultiTableDeltaStreamer.Config config = new HoodieMultiTableDeltaStreamer.Config(); HoodieMultiTableDeltaStreamer.Config config = new HoodieMultiTableDeltaStreamer.Config();
config.configFolder = configFolder; config.configFolder = configFolder;
config.targetTableName = "dummy_table"; config.targetTableName = "dummy_table";
@@ -64,7 +66,7 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa
config.sourceClassName = sourceClassName; config.sourceClassName = sourceClassName;
config.sourceOrderingField = "timestamp"; config.sourceOrderingField = "timestamp";
if (setSchemaProvider) { if (setSchemaProvider) {
config.schemaProviderClassName = FilebasedSchemaProvider.class.getName(); config.schemaProviderClassName = clazz != null ? clazz.getName() : FilebasedSchemaProvider.class.getName();
} }
config.enableHiveSync = enableHiveSync; config.enableHiveSync = enableHiveSync;
config.enableMetaSync = enableMetaSync; config.enableMetaSync = enableMetaSync;
@@ -74,7 +76,7 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa
@Test @Test
public void testInvalidHiveSyncProps() throws IOException { public void testInvalidHiveSyncProps() throws IOException {
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true, true); HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true, true, null);
Exception e = assertThrows(HoodieException.class, () -> { Exception e = assertThrows(HoodieException.class, () -> {
new HoodieMultiTableDeltaStreamer(cfg, jsc); new HoodieMultiTableDeltaStreamer(cfg, jsc);
}, "Should fail when hive sync table not provided with enableHiveSync flag"); }, "Should fail when hive sync table not provided with enableHiveSync flag");
@@ -84,7 +86,7 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa
@Test @Test
public void testInvalidPropsFilePath() throws IOException { public void testInvalidPropsFilePath() throws IOException {
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_FILE, dfsBasePath + "/config", TestDataSource.class.getName(), true, true); HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_FILE, dfsBasePath + "/config", TestDataSource.class.getName(), true, true, null);
Exception e = assertThrows(IllegalArgumentException.class, () -> { Exception e = assertThrows(IllegalArgumentException.class, () -> {
new HoodieMultiTableDeltaStreamer(cfg, jsc); new HoodieMultiTableDeltaStreamer(cfg, jsc);
}, "Should fail when invalid props file is provided"); }, "Should fail when invalid props file is provided");
@@ -94,7 +96,7 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa
@Test @Test
public void testInvalidTableConfigFilePath() throws IOException { public void testInvalidTableConfigFilePath() throws IOException {
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_TABLE_CONFIG_FILE, dfsBasePath + "/config", TestDataSource.class.getName(), true, true); HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_TABLE_CONFIG_FILE, dfsBasePath + "/config", TestDataSource.class.getName(), true, true, null);
Exception e = assertThrows(IllegalArgumentException.class, () -> { Exception e = assertThrows(IllegalArgumentException.class, () -> {
new HoodieMultiTableDeltaStreamer(cfg, jsc); new HoodieMultiTableDeltaStreamer(cfg, jsc);
}, "Should fail when invalid table config props file path is provided"); }, "Should fail when invalid table config props file path is provided");
@@ -104,7 +106,7 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa
@Test @Test
public void testCustomConfigProps() throws IOException { public void testCustomConfigProps() throws IOException {
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), false, false); HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), false, false, SchemaRegistryProvider.class);
HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc); HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
TableExecutionContext executionContext = streamer.getTableExecutionContexts().get(1); TableExecutionContext executionContext = streamer.getTableExecutionContexts().get(1);
assertEquals(2, streamer.getTableExecutionContexts().size()); assertEquals(2, streamer.getTableExecutionContexts().size());
@@ -114,13 +116,16 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa
assertEquals("_row_key", executionContext.getProperties().getString(DataSourceWriteOptions.RECORDKEY_FIELD().key())); assertEquals("_row_key", executionContext.getProperties().getString(DataSourceWriteOptions.RECORDKEY_FIELD().key()));
assertEquals(TestHoodieDeltaStreamer.TestGenerator.class.getName(), executionContext.getProperties().getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key())); assertEquals(TestHoodieDeltaStreamer.TestGenerator.class.getName(), executionContext.getProperties().getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key()));
assertEquals("uber_hive_dummy_table", executionContext.getProperties().getString(HoodieMultiTableDeltaStreamer.Constants.HIVE_SYNC_TABLE_PROP)); assertEquals("uber_hive_dummy_table", executionContext.getProperties().getString(HoodieMultiTableDeltaStreamer.Constants.HIVE_SYNC_TABLE_PROP));
assertEquals("http://localhost:8081/subjects/random-value/versions/latest", executionContext.getProperties().getString(SchemaRegistryProvider.Config.SRC_SCHEMA_REGISTRY_URL_PROP));
assertEquals("http://localhost:8081/subjects/topic2-value/versions/latest",
streamer.getTableExecutionContexts().get(0).getProperties().getString(SchemaRegistryProvider.Config.SRC_SCHEMA_REGISTRY_URL_PROP));
} }
@Test @Test
@Disabled @Disabled
public void testInvalidIngestionProps() { public void testInvalidIngestionProps() {
Exception e = assertThrows(Exception.class, () -> { Exception e = assertThrows(Exception.class, () -> {
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true, true); HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true, true, null);
new HoodieMultiTableDeltaStreamer(cfg, jsc); new HoodieMultiTableDeltaStreamer(cfg, jsc);
}, "Creation of execution object should fail without kafka topic"); }, "Creation of execution object should fail without kafka topic");
log.debug("Creation of execution object failed with error: " + e.getMessage(), e); log.debug("Creation of execution object failed with error: " + e.getMessage(), e);
@@ -139,7 +144,7 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa
testUtils.sendMessages(topicName1, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 5, HoodieTestDataGenerator.TRIP_SCHEMA))); testUtils.sendMessages(topicName1, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 5, HoodieTestDataGenerator.TRIP_SCHEMA)));
testUtils.sendMessages(topicName2, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA))); testUtils.sendMessages(topicName2, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", JsonKafkaSource.class.getName(), false, false); HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", JsonKafkaSource.class.getName(), false, false, null);
HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc); HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
List<TableExecutionContext> executionContexts = streamer.getTableExecutionContexts(); List<TableExecutionContext> executionContexts = streamer.getTableExecutionContexts();
TypedProperties properties = executionContexts.get(1).getProperties(); TypedProperties properties = executionContexts.get(1).getProperties();
@@ -189,7 +194,7 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa
String parquetPropsFile = populateCommonPropsAndWriteToFile(); String parquetPropsFile = populateCommonPropsAndWriteToFile();
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(parquetPropsFile, dfsBasePath + "/config", ParquetDFSSource.class.getName(), false, false, HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(parquetPropsFile, dfsBasePath + "/config", ParquetDFSSource.class.getName(), false, false,
false, "multi_table_parquet"); false, "multi_table_parquet", null);
HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc); HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
List<TableExecutionContext> executionContexts = streamer.getTableExecutionContexts(); List<TableExecutionContext> executionContexts = streamer.getTableExecutionContexts();
@@ -219,7 +224,7 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa
@Test @Test
public void testTableLevelProperties() throws IOException { public void testTableLevelProperties() throws IOException {
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), false, false); HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), false, false, null);
HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc); HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
List<TableExecutionContext> tableExecutionContexts = streamer.getTableExecutionContexts(); List<TableExecutionContext> tableExecutionContexts = streamer.getTableExecutionContexts();
tableExecutionContexts.forEach(tableExecutionContext -> { tableExecutionContexts.forEach(tableExecutionContext -> {

View File

@@ -23,3 +23,5 @@ hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.S hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.S
hoodie.datasource.hive_sync.table=short_trip_uber_hive_dummy_table hoodie.datasource.hive_sync.table=short_trip_uber_hive_dummy_table
hoodie.datasource.write.keygenerator.class=org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer$TestTableLevelGenerator hoodie.datasource.write.keygenerator.class=org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer$TestTableLevelGenerator
hoodie.deltastreamer.schemaprovider.registry.baseUrl=http://localhost:8081/subjects/
hoodie.deltastreamer.schemaprovider.registry.urlSuffix=-value/versions/latest

View File

@@ -23,3 +23,5 @@ hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.S hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.S
hoodie.datasource.hive_sync.database=uber_hive_db hoodie.datasource.hive_sync.database=uber_hive_db
hoodie.datasource.hive_sync.table=uber_hive_dummy_table hoodie.datasource.hive_sync.table=uber_hive_dummy_table
hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/random-value/versions/latest
hoodie.deltastreamer.schemaprovider.registry.targetUrl=http://localhost:8081/subjects/random-value/versions/latest