fix(sync): 修复GlobalConfiguration参数不全
This commit is contained in:
@@ -95,7 +95,7 @@ public class Compactor {
|
||||
config.compactionPlanInstant = selectedInstants;
|
||||
}
|
||||
|
||||
GlobalConfiguration globalConfiguration = new GlobalConfiguration(cluster, applicationId, tableMeta);
|
||||
GlobalConfiguration globalConfiguration = new GlobalConfiguration(signature, cluster, applicationId, tableMeta);
|
||||
Configuration configuration = SyncUtils.getCompactionFlinkConfiguration(
|
||||
globalConfiguration,
|
||||
new Configuration(),
|
||||
|
||||
@@ -104,7 +104,7 @@ public class Synchronizer {
|
||||
switch (flinkJob.getRunMode()) {
|
||||
case ALL_IN_ONE:
|
||||
for (TableMeta tableMeta : tableMetaList) {
|
||||
GlobalConfiguration globalConfiguration = new GlobalConfiguration(cluster, applicationId, tableMeta);
|
||||
GlobalConfiguration globalConfiguration = new GlobalConfiguration(signature, cluster, applicationId, tableMeta);
|
||||
createFlinkJob(environment, globalConfiguration, flinkJob, tableMeta);
|
||||
publishSyncStart(globalConfiguration, flinkJob, tableMeta);
|
||||
}
|
||||
@@ -112,29 +112,29 @@ public class Synchronizer {
|
||||
break;
|
||||
case ONE_IN_ONE:
|
||||
for (TableMeta tableMeta : tableMetaList) {
|
||||
GlobalConfiguration globalConfiguration = new GlobalConfiguration(cluster, applicationId, tableMeta);
|
||||
GlobalConfiguration globalConfiguration = new GlobalConfiguration(signature, cluster, applicationId, tableMeta);
|
||||
createFlinkJob(environment, globalConfiguration, flinkJob, tableMeta);
|
||||
publishSyncStart(globalConfiguration, flinkJob, tableMeta);
|
||||
environment.execute(NameHelper.syncFlinkName(flinkJob.getId(), flinkJob.getName(), tableMeta.getAlias()));
|
||||
}
|
||||
break;
|
||||
case ALL_IN_ONE_BY_TABLE:
|
||||
scheduleOneInOneRegistryByField(environment, cluster, applicationId, flinkJob, tableMetaList, TableMeta::getTable);
|
||||
scheduleOneInOneRegistryByField(environment, signature, cluster, applicationId, flinkJob, tableMetaList, TableMeta::getTable);
|
||||
break;
|
||||
case ALL_IN_ONE_BY_SCHEMA:
|
||||
scheduleOneInOneRegistryByField(environment, cluster, applicationId, flinkJob, tableMetaList, TableMeta::getSchema);
|
||||
scheduleOneInOneRegistryByField(environment, signature, cluster, applicationId, flinkJob, tableMetaList, TableMeta::getSchema);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Unsupported run mode: " + flinkJob.getRunMode());
|
||||
}
|
||||
}
|
||||
|
||||
private static void scheduleOneInOneRegistryByField(StreamExecutionEnvironment environment, String cluster, String applicationId, FlinkJob flinkJob, List<TableMeta> tableMetaList, Function<TableMeta, String> field) throws Exception {
|
||||
private static void scheduleOneInOneRegistryByField(StreamExecutionEnvironment environment, String signature, String cluster, String applicationId, FlinkJob flinkJob, List<TableMeta> tableMetaList, Function<TableMeta, String> field) throws Exception {
|
||||
Map<String, List<TableMeta>> map = tableMetaList.stream()
|
||||
.collect(Collectors.groupingBy(field));
|
||||
for (Map.Entry<String, List<TableMeta>> entry : map.entrySet()) {
|
||||
for (TableMeta tableMeta : entry.getValue()) {
|
||||
GlobalConfiguration globalConfiguration = new GlobalConfiguration(cluster, applicationId, tableMeta);
|
||||
GlobalConfiguration globalConfiguration = new GlobalConfiguration(signature, cluster, applicationId, tableMeta);
|
||||
createFlinkJob(environment, globalConfiguration, flinkJob, tableMeta);
|
||||
publishSyncStart(globalConfiguration, flinkJob, tableMeta);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user