1
0

[HUDI-2397] Add --enable-sync parameter (#3608)

* add meta-sync config

* update test

* keep enableMetaSync same with enableHiveSync

* Switch check logic to use `enableMetaSync`
This commit is contained in:
K.I. (Dennis) Jung
2021-09-13 15:34:49 +09:00
committed by GitHub
parent 280f66e0f8
commit c79017cb74
2 changed files with 23 additions and 13 deletions

View File

@@ -128,8 +128,8 @@ public class HoodieMultiTableDeltaStreamer {
Helpers.deepCopyConfigs(config, cfg);
String overriddenTargetBasePath = tableProperties.getString(Constants.TARGET_BASE_PATH_PROP, "");
cfg.targetBasePath = StringUtils.isNullOrEmpty(overriddenTargetBasePath) ? targetBasePath : overriddenTargetBasePath;
if (cfg.enableHiveSync && StringUtils.isNullOrEmpty(tableProperties.getString(DataSourceWriteOptions.HIVE_TABLE().key(), ""))) {
throw new HoodieException("Hive sync table field not provided!");
if (cfg.enableMetaSync && StringUtils.isNullOrEmpty(tableProperties.getString(DataSourceWriteOptions.HIVE_TABLE().key(), ""))) {
throw new HoodieException("Meta sync table field not provided!");
}
populateSchemaProviderProps(cfg, tableProperties);
executionContext = new TableExecutionContext();
@@ -180,6 +180,7 @@ public class HoodieMultiTableDeltaStreamer {
static void deepCopyConfigs(Config globalConfig, HoodieDeltaStreamer.Config tableConfig) {
tableConfig.enableHiveSync = globalConfig.enableHiveSync;
tableConfig.enableMetaSync = globalConfig.enableMetaSync;
tableConfig.schemaProviderClassName = globalConfig.schemaProviderClassName;
tableConfig.sourceOrderingField = globalConfig.sourceOrderingField;
tableConfig.sourceClassName = globalConfig.sourceClassName;
@@ -207,6 +208,11 @@ public class HoodieMultiTableDeltaStreamer {
public static void main(String[] args) throws IOException {
final Config config = new Config();
if (config.enableHiveSync) {
logger.warn("--enable-hive-sync will be deprecated in a future release; please use --enable-sync instead for Hive syncing");
}
JCommander cmd = new JCommander(config, null, args);
if (config.help || args.length == 0) {
cmd.usage();
@@ -292,6 +298,9 @@ public class HoodieMultiTableDeltaStreamer {
@Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing to hive")
public Boolean enableHiveSync = false;
@Parameter(names = {"--enable-sync"}, description = "Enable syncing meta")
public Boolean enableMetaSync = false;
@Parameter(names = {"--max-pending-compactions"},
description = "Maximum number of outstanding inflight/requested compactions. Delta Sync will not happen unless"
+ "outstanding compactions is less than this number")

View File

@@ -49,11 +49,11 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamerBa
static class TestHelpers {
static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String configFolder, String sourceClassName, boolean enableHiveSync) {
return getConfig(fileName, configFolder, sourceClassName, enableHiveSync, true, "multi_table_dataset");
static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String configFolder, String sourceClassName, boolean enableHiveSync, boolean enableMetaSync) {
return getConfig(fileName, configFolder, sourceClassName, enableHiveSync, enableMetaSync, true, "multi_table_dataset");
}
static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String configFolder, String sourceClassName, boolean enableHiveSync,
static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String configFolder, String sourceClassName, boolean enableHiveSync, boolean enableMetaSync,
boolean setSchemaProvider, String basePathPrefix) {
HoodieMultiTableDeltaStreamer.Config config = new HoodieMultiTableDeltaStreamer.Config();
config.configFolder = configFolder;
@@ -67,13 +67,14 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamerBa
config.schemaProviderClassName = FilebasedSchemaProvider.class.getName();
}
config.enableHiveSync = enableHiveSync;
config.enableMetaSync = enableMetaSync;
return config;
}
}
@Test
public void testInvalidHiveSyncProps() throws IOException {
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true);
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true, true);
Exception e = assertThrows(HoodieException.class, () -> {
new HoodieMultiTableDeltaStreamer(cfg, jsc);
}, "Should fail when hive sync table not provided with enableHiveSync flag");
@@ -83,7 +84,7 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamerBa
@Test
public void testInvalidPropsFilePath() throws IOException {
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_FILE, dfsBasePath + "/config", TestDataSource.class.getName(), true);
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_FILE, dfsBasePath + "/config", TestDataSource.class.getName(), true, true);
Exception e = assertThrows(IllegalArgumentException.class, () -> {
new HoodieMultiTableDeltaStreamer(cfg, jsc);
}, "Should fail when invalid props file is provided");
@@ -93,7 +94,7 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamerBa
@Test
public void testInvalidTableConfigFilePath() throws IOException {
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_TABLE_CONFIG_FILE, dfsBasePath + "/config", TestDataSource.class.getName(), true);
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_TABLE_CONFIG_FILE, dfsBasePath + "/config", TestDataSource.class.getName(), true, true);
Exception e = assertThrows(IllegalArgumentException.class, () -> {
new HoodieMultiTableDeltaStreamer(cfg, jsc);
}, "Should fail when invalid table config props file path is provided");
@@ -103,7 +104,7 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamerBa
@Test
public void testCustomConfigProps() throws IOException {
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), false);
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), false, false);
HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
TableExecutionContext executionContext = streamer.getTableExecutionContexts().get(1);
assertEquals(2, streamer.getTableExecutionContexts().size());
@@ -119,7 +120,7 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamerBa
@Disabled
public void testInvalidIngestionProps() {
Exception e = assertThrows(Exception.class, () -> {
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true);
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true, true);
new HoodieMultiTableDeltaStreamer(cfg, jsc);
}, "Creation of execution object should fail without kafka topic");
log.debug("Creation of execution object failed with error: " + e.getMessage(), e);
@@ -138,7 +139,7 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamerBa
testUtils.sendMessages(topicName1, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 5, HoodieTestDataGenerator.TRIP_SCHEMA)));
testUtils.sendMessages(topicName2, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", JsonKafkaSource.class.getName(), false);
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", JsonKafkaSource.class.getName(), false, false);
HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
List<TableExecutionContext> executionContexts = streamer.getTableExecutionContexts();
TypedProperties properties = executionContexts.get(1).getProperties();
@@ -187,7 +188,7 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamerBa
// add only common props. later we can add per table props
String parquetPropsFile = populateCommonPropsAndWriteToFile();
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(parquetPropsFile, dfsBasePath + "/config", ParquetDFSSource.class.getName(), false,
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(parquetPropsFile, dfsBasePath + "/config", ParquetDFSSource.class.getName(), false, false,
false, "multi_table_parquet");
HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
@@ -218,7 +219,7 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamerBa
@Test
public void testTableLevelProperties() throws IOException {
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), false);
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), false, false);
HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
List<TableExecutionContext> tableExecutionContexts = streamer.getTableExecutionContexts();
tableExecutionContexts.forEach(tableExecutionContext -> {