[HUDI-1742] Improve table level config priority for HoodieMultiTableDeltaStreamer (#2744)
This commit is contained in:
@@ -118,7 +118,9 @@ public class HoodieMultiTableDeltaStreamer {
|
||||
checkIfTableConfigFileExists(configFolder, fs, configFilePath);
|
||||
TypedProperties tableProperties = UtilHelpers.readConfig(fs, new Path(configFilePath), new ArrayList<>()).getConfig();
|
||||
properties.forEach((k, v) -> {
|
||||
tableProperties.setProperty(k.toString(), v.toString());
|
||||
if (tableProperties.get(k) == null) {
|
||||
tableProperties.setProperty(k.toString(), v.toString());
|
||||
}
|
||||
});
|
||||
final HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
|
||||
//copy all the values from config to cfg
|
||||
|
||||
@@ -1631,6 +1631,13 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
}
|
||||
}
|
||||
|
||||
public static class TestTableLevelGenerator extends SimpleKeyGenerator {
|
||||
|
||||
public TestTableLevelGenerator(TypedProperties props) {
|
||||
super(props);
|
||||
}
|
||||
}
|
||||
|
||||
public static class DummyAvroPayload extends OverwriteWithLatestAvroPayload {
|
||||
|
||||
public DummyAvroPayload(GenericRecord gr, Comparable orderingVal) {
|
||||
|
||||
@@ -213,6 +213,24 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTableLevelProperties() throws IOException {
|
||||
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), false);
|
||||
HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
|
||||
List<TableExecutionContext> tableExecutionContexts = streamer.getTableExecutionContexts();
|
||||
tableExecutionContexts.forEach(tableExecutionContext -> {
|
||||
switch (tableExecutionContext.getTableName()) {
|
||||
case "dummy_table_short_trip":
|
||||
String tableLevelKeyGeneratorClass = tableExecutionContext.getProperties().getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY());
|
||||
assertEquals(TestHoodieDeltaStreamer.TestTableLevelGenerator.class.getName(), tableLevelKeyGeneratorClass);
|
||||
break;
|
||||
default:
|
||||
String defaultKeyGeneratorClass = tableExecutionContext.getProperties().getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY());
|
||||
assertEquals(TestHoodieDeltaStreamer.TestGenerator.class.getName(), defaultKeyGeneratorClass);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private String populateCommonPropsAndWriteToFile() throws IOException {
|
||||
TypedProperties commonProps = new TypedProperties();
|
||||
populateCommonProps(commonProps);
|
||||
|
||||
@@ -21,4 +21,5 @@ hoodie.datasource.write.partitionpath.field=created_at
|
||||
hoodie.deltastreamer.source.kafka.topic=topic2
|
||||
hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
|
||||
hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.S
|
||||
hoodie.datasource.hive_sync.table=short_trip_uber_hive_dummy_table
|
||||
hoodie.datasource.hive_sync.table=short_trip_uber_hive_dummy_table
|
||||
hoodie.datasource.write.keygenerator.class=org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer$TestTableLevelGenerator
|
||||
Reference in New Issue
Block a user