diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/Compactor.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/Compactor.java index bd3a092..28dc799 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/Compactor.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/Compactor.java @@ -95,7 +95,7 @@ public class Compactor { config.compactionPlanInstant = selectedInstants; } - GlobalConfiguration globalConfiguration = new GlobalConfiguration(cluster, applicationId, tableMeta); + GlobalConfiguration globalConfiguration = new GlobalConfiguration(signature, cluster, applicationId, tableMeta); Configuration configuration = SyncUtils.getCompactionFlinkConfiguration( globalConfiguration, new Configuration(), diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/Synchronizer.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/Synchronizer.java index 713ae8f..a661980 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/Synchronizer.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/Synchronizer.java @@ -104,7 +104,7 @@ public class Synchronizer { switch (flinkJob.getRunMode()) { case ALL_IN_ONE: for (TableMeta tableMeta : tableMetaList) { - GlobalConfiguration globalConfiguration = new GlobalConfiguration(cluster, applicationId, tableMeta); + GlobalConfiguration globalConfiguration = new GlobalConfiguration(signature, cluster, applicationId, tableMeta); createFlinkJob(environment, globalConfiguration, flinkJob, tableMeta); publishSyncStart(globalConfiguration, flinkJob, tableMeta); } @@ -112,29 +112,29 @@ public class Synchronizer { break; case ONE_IN_ONE: for (TableMeta tableMeta : tableMetaList) { - GlobalConfiguration globalConfiguration = new GlobalConfiguration(cluster, applicationId, tableMeta); + GlobalConfiguration globalConfiguration = new GlobalConfiguration(signature, cluster, applicationId, tableMeta); createFlinkJob(environment, globalConfiguration, flinkJob, tableMeta); publishSyncStart(globalConfiguration, flinkJob, tableMeta); environment.execute(NameHelper.syncFlinkName(flinkJob.getId(), flinkJob.getName(), tableMeta.getAlias())); } break; case ALL_IN_ONE_BY_TABLE: - scheduleOneInOneRegistryByField(environment, cluster, applicationId, flinkJob, tableMetaList, TableMeta::getTable); + scheduleOneInOneRegistryByField(environment, signature, cluster, applicationId, flinkJob, tableMetaList, TableMeta::getTable); break; case ALL_IN_ONE_BY_SCHEMA: - scheduleOneInOneRegistryByField(environment, cluster, applicationId, flinkJob, tableMetaList, TableMeta::getSchema); + scheduleOneInOneRegistryByField(environment, signature, cluster, applicationId, flinkJob, tableMetaList, TableMeta::getSchema); break; default: throw new IllegalArgumentException("Unsupported run mode: " + flinkJob.getRunMode()); } } - private static void scheduleOneInOneRegistryByField(StreamExecutionEnvironment environment, String cluster, String applicationId, FlinkJob flinkJob, List tableMetaList, Function field) throws Exception { + private static void scheduleOneInOneRegistryByField(StreamExecutionEnvironment environment, String signature, String cluster, String applicationId, FlinkJob flinkJob, List tableMetaList, Function field) throws Exception { Map> map = tableMetaList.stream() .collect(Collectors.groupingBy(field)); for (Map.Entry> entry : map.entrySet()) { for (TableMeta tableMeta : entry.getValue()) { - GlobalConfiguration globalConfiguration = new GlobalConfiguration(cluster, applicationId, tableMeta); + GlobalConfiguration globalConfiguration = new GlobalConfiguration(signature, cluster, applicationId, tableMeta); createFlinkJob(environment, globalConfiguration, flinkJob, tableMeta); publishSyncStart(globalConfiguration, flinkJob, tableMeta); }