From d63ceb023e9959560c1989890843d684fa990cf8 Mon Sep 17 00:00:00 2001 From: v-zhangjc9 Date: Tue, 19 Mar 2024 15:47:06 +0800 Subject: [PATCH] =?UTF-8?q?fix(sync):=20=E4=BF=AE=E5=A4=8DGlobalConfigurat?= =?UTF-8?q?ion=E5=8F=82=E6=95=B0=E4=B8=8D=E5=85=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/lanyuanxiaoyao/service/sync/Compactor.java | 2 +- .../lanyuanxiaoyao/service/sync/Synchronizer.java | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/Compactor.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/Compactor.java index bd3a092..28dc799 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/Compactor.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/Compactor.java @@ -95,7 +95,7 @@ public class Compactor { config.compactionPlanInstant = selectedInstants; } - GlobalConfiguration globalConfiguration = new GlobalConfiguration(cluster, applicationId, tableMeta); + GlobalConfiguration globalConfiguration = new GlobalConfiguration(signature, cluster, applicationId, tableMeta); Configuration configuration = SyncUtils.getCompactionFlinkConfiguration( globalConfiguration, new Configuration(), diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/Synchronizer.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/Synchronizer.java index 713ae8f..a661980 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/Synchronizer.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/Synchronizer.java @@ -104,7 +104,7 @@ public class Synchronizer { switch (flinkJob.getRunMode()) { case ALL_IN_ONE: for (TableMeta tableMeta : tableMetaList) { - GlobalConfiguration globalConfiguration = new GlobalConfiguration(cluster, applicationId, tableMeta); + GlobalConfiguration globalConfiguration = new GlobalConfiguration(signature, cluster, applicationId, tableMeta); createFlinkJob(environment, globalConfiguration, flinkJob, tableMeta); publishSyncStart(globalConfiguration, flinkJob, tableMeta); } @@ -112,29 +112,29 @@ public class Synchronizer { break; case ONE_IN_ONE: for (TableMeta tableMeta : tableMetaList) { - GlobalConfiguration globalConfiguration = new GlobalConfiguration(cluster, applicationId, tableMeta); + GlobalConfiguration globalConfiguration = new GlobalConfiguration(signature, cluster, applicationId, tableMeta); createFlinkJob(environment, globalConfiguration, flinkJob, tableMeta); publishSyncStart(globalConfiguration, flinkJob, tableMeta); environment.execute(NameHelper.syncFlinkName(flinkJob.getId(), flinkJob.getName(), tableMeta.getAlias())); } break; case ALL_IN_ONE_BY_TABLE: - scheduleOneInOneRegistryByField(environment, cluster, applicationId, flinkJob, tableMetaList, TableMeta::getTable); + scheduleOneInOneRegistryByField(environment, signature, cluster, applicationId, flinkJob, tableMetaList, TableMeta::getTable); break; case ALL_IN_ONE_BY_SCHEMA: - scheduleOneInOneRegistryByField(environment, cluster, applicationId, flinkJob, tableMetaList, TableMeta::getSchema); + scheduleOneInOneRegistryByField(environment, signature, cluster, applicationId, flinkJob, tableMetaList, TableMeta::getSchema); break; default: throw new IllegalArgumentException("Unsupported run mode: " + flinkJob.getRunMode()); } } - private static void scheduleOneInOneRegistryByField(StreamExecutionEnvironment environment, String cluster, String applicationId, FlinkJob flinkJob, List tableMetaList, Function field) throws Exception { + private static void scheduleOneInOneRegistryByField(StreamExecutionEnvironment environment, String signature, String cluster, String applicationId, FlinkJob flinkJob, List tableMetaList, Function field) throws Exception { Map> map = tableMetaList.stream() .collect(Collectors.groupingBy(field)); for (Map.Entry> entry : map.entrySet()) { for (TableMeta tableMeta : entry.getValue()) { - GlobalConfiguration globalConfiguration = new GlobalConfiguration(cluster, applicationId, tableMeta); + GlobalConfiguration globalConfiguration = new GlobalConfiguration(signature, cluster, applicationId, tableMeta); createFlinkJob(environment, globalConfiguration, flinkJob, tableMeta); publishSyncStart(globalConfiguration, flinkJob, tableMeta); }