[HUDI-4107] Added --sync-tool-classes config option in HoodieMultiTableDeltaStreamer (#5597)
* Added the --sync-tool-classes config option to the multi-table delta streamer.
* Added a test case asserting that syncClientToolClassNames is propagated to the delta streamer's table execution context.
This commit is contained in:
committed by
GitHub
parent
918c4f4e0b
commit
795a99ba73
@@ -691,7 +691,7 @@ public class DeltaSync implements Serializable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void syncMeta(HoodieDeltaStreamerMetrics metrics) {
|
private void syncMeta(HoodieDeltaStreamerMetrics metrics) {
|
||||||
Set<String> syncClientToolClasses = new HashSet<>(Arrays.asList(cfg.syncClientToolClass.split(",")));
|
Set<String> syncClientToolClasses = new HashSet<>(Arrays.asList(cfg.syncClientToolClassNames.split(",")));
|
||||||
// for backward compatibility
|
// for backward compatibility
|
||||||
if (cfg.enableHiveSync) {
|
if (cfg.enableHiveSync) {
|
||||||
cfg.enableMetaSync = true;
|
cfg.enableMetaSync = true;
|
||||||
|
|||||||
@@ -306,7 +306,7 @@ public class HoodieDeltaStreamer implements Serializable {
|
|||||||
public Boolean enableMetaSync = false;
|
public Boolean enableMetaSync = false;
|
||||||
|
|
||||||
@Parameter(names = {"--sync-tool-classes"}, description = "Meta sync client tool, using comma to separate multi tools")
|
@Parameter(names = {"--sync-tool-classes"}, description = "Meta sync client tool, using comma to separate multi tools")
|
||||||
public String syncClientToolClass = HiveSyncTool.class.getName();
|
public String syncClientToolClassNames = HiveSyncTool.class.getName();
|
||||||
|
|
||||||
@Parameter(names = {"--max-pending-compactions"},
|
@Parameter(names = {"--max-pending-compactions"},
|
||||||
description = "Maximum number of outstanding inflight/requested compactions. Delta Sync will not happen unless"
|
description = "Maximum number of outstanding inflight/requested compactions. Delta Sync will not happen unless"
|
||||||
@@ -442,6 +442,8 @@ public class HoodieDeltaStreamer implements Serializable {
|
|||||||
&& operation == config.operation
|
&& operation == config.operation
|
||||||
&& Objects.equals(filterDupes, config.filterDupes)
|
&& Objects.equals(filterDupes, config.filterDupes)
|
||||||
&& Objects.equals(enableHiveSync, config.enableHiveSync)
|
&& Objects.equals(enableHiveSync, config.enableHiveSync)
|
||||||
|
&& Objects.equals(enableMetaSync, config.enableMetaSync)
|
||||||
|
&& Objects.equals(syncClientToolClassNames, config.syncClientToolClassNames)
|
||||||
&& Objects.equals(maxPendingCompactions, config.maxPendingCompactions)
|
&& Objects.equals(maxPendingCompactions, config.maxPendingCompactions)
|
||||||
&& Objects.equals(maxPendingClustering, config.maxPendingClustering)
|
&& Objects.equals(maxPendingClustering, config.maxPendingClustering)
|
||||||
&& Objects.equals(continuousMode, config.continuousMode)
|
&& Objects.equals(continuousMode, config.continuousMode)
|
||||||
@@ -466,8 +468,8 @@ public class HoodieDeltaStreamer implements Serializable {
|
|||||||
baseFileFormat, propsFilePath, configs, sourceClassName,
|
baseFileFormat, propsFilePath, configs, sourceClassName,
|
||||||
sourceOrderingField, payloadClassName, schemaProviderClassName,
|
sourceOrderingField, payloadClassName, schemaProviderClassName,
|
||||||
transformerClassNames, sourceLimit, operation, filterDupes,
|
transformerClassNames, sourceLimit, operation, filterDupes,
|
||||||
enableHiveSync, maxPendingCompactions, maxPendingClustering, continuousMode,
|
enableHiveSync, enableMetaSync, syncClientToolClassNames, maxPendingCompactions, maxPendingClustering,
|
||||||
minSyncIntervalSeconds, sparkMaster, commitOnErrors,
|
continuousMode, minSyncIntervalSeconds, sparkMaster, commitOnErrors,
|
||||||
deltaSyncSchedulingWeight, compactSchedulingWeight, clusterSchedulingWeight, deltaSyncSchedulingMinShare,
|
deltaSyncSchedulingWeight, compactSchedulingWeight, clusterSchedulingWeight, deltaSyncSchedulingMinShare,
|
||||||
compactSchedulingMinShare, clusterSchedulingMinShare, forceDisableCompaction, checkpoint,
|
compactSchedulingMinShare, clusterSchedulingMinShare, forceDisableCompaction, checkpoint,
|
||||||
initialCheckpointProvider, help);
|
initialCheckpointProvider, help);
|
||||||
@@ -491,6 +493,8 @@ public class HoodieDeltaStreamer implements Serializable {
|
|||||||
+ ", operation=" + operation
|
+ ", operation=" + operation
|
||||||
+ ", filterDupes=" + filterDupes
|
+ ", filterDupes=" + filterDupes
|
||||||
+ ", enableHiveSync=" + enableHiveSync
|
+ ", enableHiveSync=" + enableHiveSync
|
||||||
|
+ ", enableMetaSync=" + enableMetaSync
|
||||||
|
+ ", syncClientToolClassNames=" + syncClientToolClassNames
|
||||||
+ ", maxPendingCompactions=" + maxPendingCompactions
|
+ ", maxPendingCompactions=" + maxPendingCompactions
|
||||||
+ ", maxPendingClustering=" + maxPendingClustering
|
+ ", maxPendingClustering=" + maxPendingClustering
|
||||||
+ ", continuousMode=" + continuousMode
|
+ ", continuousMode=" + continuousMode
|
||||||
|
|||||||
@@ -27,6 +27,7 @@ import org.apache.hudi.common.util.Option;
|
|||||||
import org.apache.hudi.common.util.StringUtils;
|
import org.apache.hudi.common.util.StringUtils;
|
||||||
import org.apache.hudi.common.config.TypedProperties;
|
import org.apache.hudi.common.config.TypedProperties;
|
||||||
import org.apache.hudi.common.util.ValidationUtils;
|
import org.apache.hudi.common.util.ValidationUtils;
|
||||||
|
import org.apache.hudi.hive.HiveSyncTool;
|
||||||
import org.apache.hudi.sync.common.HoodieSyncConfig;
|
import org.apache.hudi.sync.common.HoodieSyncConfig;
|
||||||
import org.apache.hudi.utilities.IdentitySplitter;
|
import org.apache.hudi.utilities.IdentitySplitter;
|
||||||
import org.apache.hudi.exception.HoodieException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
@@ -203,6 +204,7 @@ public class HoodieMultiTableDeltaStreamer {
|
|||||||
static void deepCopyConfigs(Config globalConfig, HoodieDeltaStreamer.Config tableConfig) {
|
static void deepCopyConfigs(Config globalConfig, HoodieDeltaStreamer.Config tableConfig) {
|
||||||
tableConfig.enableHiveSync = globalConfig.enableHiveSync;
|
tableConfig.enableHiveSync = globalConfig.enableHiveSync;
|
||||||
tableConfig.enableMetaSync = globalConfig.enableMetaSync;
|
tableConfig.enableMetaSync = globalConfig.enableMetaSync;
|
||||||
|
tableConfig.syncClientToolClassNames = globalConfig.syncClientToolClassNames;
|
||||||
tableConfig.schemaProviderClassName = globalConfig.schemaProviderClassName;
|
tableConfig.schemaProviderClassName = globalConfig.schemaProviderClassName;
|
||||||
tableConfig.sourceOrderingField = globalConfig.sourceOrderingField;
|
tableConfig.sourceOrderingField = globalConfig.sourceOrderingField;
|
||||||
tableConfig.sourceClassName = globalConfig.sourceClassName;
|
tableConfig.sourceClassName = globalConfig.sourceClassName;
|
||||||
@@ -325,6 +327,9 @@ public class HoodieMultiTableDeltaStreamer {
|
|||||||
@Parameter(names = {"--enable-sync"}, description = "Enable syncing meta")
|
@Parameter(names = {"--enable-sync"}, description = "Enable syncing meta")
|
||||||
public Boolean enableMetaSync = false;
|
public Boolean enableMetaSync = false;
|
||||||
|
|
||||||
|
@Parameter(names = {"--sync-tool-classes"}, description = "Meta sync client tool, using comma to separate multi tools")
|
||||||
|
public String syncClientToolClassNames = HiveSyncTool.class.getName();
|
||||||
|
|
||||||
@Parameter(names = {"--max-pending-compactions"},
|
@Parameter(names = {"--max-pending-compactions"},
|
||||||
description = "Maximum number of outstanding inflight/requested compactions. Delta Sync will not happen unless"
|
description = "Maximum number of outstanding inflight/requested compactions. Delta Sync will not happen unless"
|
||||||
+ "outstanding compactions is less than this number")
|
+ "outstanding compactions is less than this number")
|
||||||
|
|||||||
@@ -72,10 +72,19 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa
|
|||||||
}
|
}
|
||||||
config.enableHiveSync = enableHiveSync;
|
config.enableHiveSync = enableHiveSync;
|
||||||
config.enableMetaSync = enableMetaSync;
|
config.enableMetaSync = enableMetaSync;
|
||||||
|
config.syncClientToolClassNames = "com.example.DummySyncTool1,com.example.DummySyncTool2";
|
||||||
return config;
|
return config;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMetaSyncConfig() throws IOException {
|
||||||
|
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true, true, null);
|
||||||
|
HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
|
||||||
|
TableExecutionContext executionContext = streamer.getTableExecutionContexts().get(1);
|
||||||
|
assertEquals("com.example.DummySyncTool1,com.example.DummySyncTool2", executionContext.getConfig().syncClientToolClassNames);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testInvalidHiveSyncProps() throws IOException {
|
public void testInvalidHiveSyncProps() throws IOException {
|
||||||
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true, true, null);
|
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true, true, null);
|
||||||
|
|||||||
Reference in New Issue
Block a user