From 24e4672dc33a79de2ddf1c6c51814054475ff7a0 Mon Sep 17 00:00:00 2001 From: v-zhangjc9 Date: Mon, 3 Jun 2024 18:35:33 +0800 Subject: [PATCH] =?UTF-8?q?feat(scheduler):=20=E4=BD=BF=E7=94=A8=E7=AE=80?= =?UTF-8?q?=E5=8C=96=E7=9A=84table=5Fmeta=E5=8A=A0=E9=80=9F=E6=9F=A5?= =?UTF-8?q?=E8=AF=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/common/utils/TableMetaHelper.java | 216 ++++++++++-------- .../entity/info/SimpleTableMeta.java | 30 ++- .../info/service/TableMetaService.java | 14 +- .../controller/ScheduleController.java | 4 +- .../FocusUnVersionUpdateScheduleJob.java | 2 +- .../compaction/OdsFocusScheduleJob.java | 2 +- .../scheduler/utils/ScheduleHelper.java | 44 ++-- 7 files changed, 181 insertions(+), 131 deletions(-) diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/TableMetaHelper.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/TableMetaHelper.java index b1bb8f0..dd0fc42 100644 --- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/TableMetaHelper.java +++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/utils/TableMetaHelper.java @@ -6,7 +6,13 @@ import com.lanyuanxiaoyao.service.common.entity.TableMeta; import com.lanyuanxiaoyao.service.common.exception.ConfigException; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.Optional; import java.util.stream.Collectors; /** @@ -17,7 +23,7 @@ import java.util.stream.Collectors; * @date 2021-12-01 */ public class TableMetaHelper { - //private static final AES AES = new AES(Mode.CBC, Padding.NoPadding, "6fa22c779ec14b98".getBytes(), "6fa22c779ec14b98".getBytes()); + // private static final AES AES = new AES(Mode.CBC, Padding.NoPadding, "6fa22c779ec14b98".getBytes(), "6fa22c779ec14b98".getBytes()); public static String tableMetaSql(String database) { return tableMetaSql(database, true, false); @@ -125,109 +131,109 @@ public class TableMetaHelper { public static String tableMetaSql(String database, Boolean filterByFlinkJobId, Boolean filterByAlias) { // language=MySQL return "select dst.ds_name,\n" + - " dst.schema_name,\n" + - " dst.table_name,\n" + - " dst.table_type,\n" + - " dstf.field_name,\n" + - " dstf.field_seq,\n" + - " dstf.field_type,\n" + - " dstf.primary_key,\n" + - " dstf.partition_key,\n" + - " dstf.length,\n" + - " tacti.tgt_db,\n" + - " tacti.tgt_table,\n" + - " tacti.tgt_table_type,\n" + - " tacti.tgt_hdfs_path,\n" + - " tajhc.write_tasks,\n" + - " tajhc.write_operation,\n" + - " tajhc.write_task_max_memory,\n" + - " tajhc.write_batch_size,\n" + - " tajhc.write_rate_limit,\n" + - " tacti.bucket_number,\n" + - " tajhc.compaction_strategy,\n" + - " tajhc.compaction_tasks,\n" + - " tajhc.compaction_delta_commits,\n" + - " tajhc.compaction_delta_seconds,\n" + - " tajhc.compaction_async_enabled,\n" + - " tajhc.compaction_max_memory,\n" + - " tajhc.configs,\n" + - " tacti.filter_field,\n" + - " tacti.filter_values,\n" + - " tacti.filter_type,\n" + - " tacti.src_topic,\n" + - " tacti.src_pulsar_addr,\n" + - " tayjc_sync.job_manager_memory as sync_job_manager_memory,\n" + - " tayjc_sync.task_manager_memory as sync_task_manager_memory,\n" + - " tayjc_compaction.job_manager_memory as compaction_job_manager_memory,\n" + - " tayjc_compaction.task_manager_memory as compaction_task_manger_momory,\n" + - " tacti.partition_field,\n" + - " tahss.message_id,\n" + - " tagc.metric_publish_url,\n" + - " tagc.metric_prometheus_url,\n" + - " tagc.metric_api_url,\n" + - " tagc.metric_publish_delay,\n" + - " tagc.metric_publish_period,\n" + - " tagc.metric_publish_timeout,\n" + - " tagc.metric_publish_batch,\n" + - " tafjc.id as job_id,\n" + - " tafjc.name as job_name,\n" + - " tagc.checkpoint_root_path,\n" + - " tajhc.source_tasks,\n" + - " tacti.alias,\n" + - " dst.connection,\n" + - " tacti.priority,\n" + - " dst.ds_type,\n" + - " tajhc.keep_file_version,\n" + - " tajhc.keep_commit_version,\n" + - " tacti.tags,\n" + - " tagc.zk_url,\n" + - " tacti.version,\n" + - " dstf.scale\n" + - "from `" + database + "`.tb_app_collect_table_info tacti\n" + - " left join `" + database + "`.tb_app_hudi_sync_state tahss\n" + - " on tahss.id = concat(tacti.flink_job_id, '-', tacti.alias),\n" + - " `" + database + "`.tb_app_flink_job_config tafjc,\n" + - " `" + database + "`.tb_app_hudi_job_config tajhc,\n" + - " `" + database + "`.tb_app_yarn_job_config tayjc_sync,\n" + - " `" + database + "`.tb_app_yarn_job_config tayjc_compaction,\n" + - " `" + database + "`.tb_app_global_config tagc,\n" + - " `" + database + "`.tb_app_hudi_compaction_schedule tahcs,\n" + - " `iap-datahub`.data_source_table_field dstf,\n" + - " (select ds.*, dst.table_id, dst.table_name, dst.table_type\n" + - " from `iap-datahub`.data_source_table dst,\n" + - " (select ds.ds_id, ds.ds_name, ds.ds_type, ds.schema_name, ds.connection\n" + - " from `iap-datahub`.data_source ds\n" + - " where ds.ds_role = 'src'\n" + - " and ds.ds_state = 'y'\n" + - " and ds.record_state = 'y') ds\n" + - " where dst.ds_id = ds.ds_id\n" + - " and dst.record_state = 'y') dst\n" + - "where dstf.table_id = dst.table_id\n" + - " and dstf.record_state = 'y'\n" + - " and dst.ds_type in ('udal', 'telepg')\n" + - " and dst.ds_name = tacti.src_db\n" + - " and dst.schema_name = tacti.src_schema\n" + - " and dst.table_name = tacti.src_table\n" + - " and tacti.flink_job_id = tafjc.id\n" + - " and tacti.hudi_job_id = tajhc.id\n" + - " and tacti.sync_yarn_job_id = tayjc_sync.id\n" + - " and tacti.compaction_yarn_job_id = tayjc_compaction.id\n" + - " and tacti.config_id = tagc.id\n" + - " and tacti.schedule_id = tahcs.id\n" + - (filterByFlinkJobId ? " and tafjc.id = ?\n" : "") + - (filterByAlias ? " and tacti.alias = ?\n" : "") + - " and tacti.status = 'y'\n" + - " and tafjc.status = 'y'\n" + - " and tajhc.status = 'y'\n" + - " and tayjc_sync.status = 'y'\n" + - " and tayjc_compaction.status = 'y'\n" + - "order by dstf.field_seq;"; + " dst.schema_name,\n" + + " dst.table_name,\n" + + " dst.table_type,\n" + + " dstf.field_name,\n" + + " dstf.field_seq,\n" + + " dstf.field_type,\n" + + " dstf.primary_key,\n" + + " dstf.partition_key,\n" + + " dstf.length,\n" + + " tacti.tgt_db,\n" + + " tacti.tgt_table,\n" + + " tacti.tgt_table_type,\n" + + " tacti.tgt_hdfs_path,\n" + + " tajhc.write_tasks,\n" + + " tajhc.write_operation,\n" + + " tajhc.write_task_max_memory,\n" + + " tajhc.write_batch_size,\n" + + " tajhc.write_rate_limit,\n" + + " tacti.bucket_number,\n" + + " tajhc.compaction_strategy,\n" + + " tajhc.compaction_tasks,\n" + + " tajhc.compaction_delta_commits,\n" + + " tajhc.compaction_delta_seconds,\n" + + " tajhc.compaction_async_enabled,\n" + + " tajhc.compaction_max_memory,\n" + + " tajhc.configs,\n" + + " tacti.filter_field,\n" + + " tacti.filter_values,\n" + + " tacti.filter_type,\n" + + " tacti.src_topic,\n" + + " tacti.src_pulsar_addr,\n" + + " tayjc_sync.job_manager_memory as sync_job_manager_memory,\n" + + " tayjc_sync.task_manager_memory as sync_task_manager_memory,\n" + + " tayjc_compaction.job_manager_memory as compaction_job_manager_memory,\n" + + " tayjc_compaction.task_manager_memory as compaction_task_manger_momory,\n" + + " tacti.partition_field,\n" + + " tahss.message_id,\n" + + " tagc.metric_publish_url,\n" + + " tagc.metric_prometheus_url,\n" + + " tagc.metric_api_url,\n" + + " tagc.metric_publish_delay,\n" + + " tagc.metric_publish_period,\n" + + " tagc.metric_publish_timeout,\n" + + " tagc.metric_publish_batch,\n" + + " tafjc.id as job_id,\n" + + " tafjc.name as job_name,\n" + + " tagc.checkpoint_root_path,\n" + + " tajhc.source_tasks,\n" + + " tacti.alias,\n" + + " dst.connection,\n" + + " tacti.priority,\n" + + " dst.ds_type,\n" + + " tajhc.keep_file_version,\n" + + " tajhc.keep_commit_version,\n" + + " tacti.tags,\n" + + " tagc.zk_url,\n" + + " tacti.version,\n" + + " dstf.scale\n" + + "from `" + database + "`.tb_app_collect_table_info tacti\n" + + " left join `" + database + "`.tb_app_hudi_sync_state tahss\n" + + " on tahss.id = concat(tacti.flink_job_id, '-', tacti.alias),\n" + + " `" + database + "`.tb_app_flink_job_config tafjc,\n" + + " `" + database + "`.tb_app_hudi_job_config tajhc,\n" + + " `" + database + "`.tb_app_yarn_job_config tayjc_sync,\n" + + " `" + database + "`.tb_app_yarn_job_config tayjc_compaction,\n" + + " `" + database + "`.tb_app_global_config tagc,\n" + + " `" + database + "`.tb_app_hudi_compaction_schedule tahcs,\n" + + " `iap-datahub`.data_source_table_field dstf,\n" + + " (select ds.*, dst.table_id, dst.table_name, dst.table_type\n" + + " from `iap-datahub`.data_source_table dst,\n" + + " (select ds.ds_id, ds.ds_name, ds.ds_type, ds.schema_name, ds.connection\n" + + " from `iap-datahub`.data_source ds\n" + + " where ds.ds_role = 'src'\n" + + " and ds.ds_state = 'y'\n" + + " and ds.record_state = 'y') ds\n" + + " where dst.ds_id = ds.ds_id\n" + + " and dst.record_state = 'y') dst\n" + + "where dstf.table_id = dst.table_id\n" + + " and dstf.record_state = 'y'\n" + + " and dst.ds_type in ('udal', 'telepg')\n" + + " and dst.ds_name = tacti.src_db\n" + + " and dst.schema_name = tacti.src_schema\n" + + " and dst.table_name = tacti.src_table\n" + + " and tacti.flink_job_id = tafjc.id\n" + + " and tacti.hudi_job_id = tajhc.id\n" + + " and tacti.sync_yarn_job_id = tayjc_sync.id\n" + + " and tacti.compaction_yarn_job_id = tayjc_compaction.id\n" + + " and tacti.config_id = tagc.id\n" + + " and tacti.schedule_id = tahcs.id\n" + + (filterByFlinkJobId ? " and tafjc.id = ?\n" : "") + + (filterByAlias ? " and tacti.alias = ?\n" : "") + + " and tacti.status = 'y'\n" + + " and tafjc.status = 'y'\n" + + " and tajhc.status = 'y'\n" + + " and tayjc_sync.status = 'y'\n" + + " and tayjc_compaction.status = 'y'\n" + + "order by dstf.field_seq;"; } public static List from(ResultSet rs) throws SQLException { List results = new ArrayList<>(); List metaList = new ArrayList<>(); - while (rs.next( )) { + while (rs.next()) { metaList.add( TableMeta.RowMeta.builder() .dsName(rs.getString(1)) @@ -571,6 +577,14 @@ public class TableMetaHelper { } public static boolean existsTag(TableMeta meta, String tag) { - return meta.getTags() != null && meta.getTags().contains(tag); + return existsTag(meta.getTags(), tag); + } + + public static boolean existsTag(String sourceTags, String tag) { + return existsTag(Arrays.asList(sourceTags.split(",")), tag); + } + + public static boolean existsTag(List sourceTags, String tag) { + return sourceTags != null && sourceTags.contains(tag); } } diff --git a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/info/SimpleTableMeta.java b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/info/SimpleTableMeta.java index 22a66b2..4b13a9b 100644 --- a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/info/SimpleTableMeta.java +++ b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/info/SimpleTableMeta.java @@ -12,22 +12,30 @@ public class SimpleTableMeta { private String alias; private String schema; private String table; + private Integer priority; + private String targetTableType; private String targetDb; private String targetTable; private String targetHdfs; + private String tags; + private Integer bucketNumber; public SimpleTableMeta() { } - public SimpleTableMeta(Long flinkJobId, String flinkJobName, String alias, String schema, String table, String targetDb, String targetTable, String targetHdfs) { + public SimpleTableMeta(Long flinkJobId, String flinkJobName, String alias, String schema, String table, Integer priority, String targetTableType, String targetDb, String targetTable, String targetHdfs, String tags, Integer bucketNumber) { this.flinkJobId = flinkJobId; this.flinkJobName = flinkJobName; this.alias = alias; this.schema = schema; this.table = table; + this.priority = priority; + this.targetTableType = targetTableType; this.targetDb = targetDb; this.targetTable = targetTable; this.targetHdfs = targetHdfs; + this.tags = tags; + this.bucketNumber = bucketNumber; } public Long getFlinkJobId() { @@ -50,6 +58,14 @@ public class SimpleTableMeta { return table; } + public Integer getPriority() { + return priority; + } + + public String getTargetTableType() { + return targetTableType; + } + public String getTargetDb() { return targetDb; } @@ -62,6 +78,14 @@ public class SimpleTableMeta { return targetHdfs; } + public String getTags() { + return tags; + } + + public Integer getBucketNumber() { + return bucketNumber; + } + @Override public String toString() { return "SimpleTableMeta{" + @@ -70,9 +94,13 @@ public class SimpleTableMeta { ", alias='" + alias + '\'' + ", schema='" + schema + '\'' + ", table='" + table + '\'' + + ", priority=" + priority + + ", targetTableType='" + targetTableType + '\'' + ", targetDb='" + targetDb + '\'' + ", targetTable='" + targetTable + '\'' + ", targetHdfs='" + targetHdfs + '\'' + + ", tags='" + tags + '\'' + + ", bucketNumber=" + bucketNumber + '}'; } } diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/TableMetaService.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/TableMetaService.java index be239f3..ed58847 100644 --- a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/TableMetaService.java +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/TableMetaService.java @@ -593,9 +593,13 @@ public class TableMetaService extends BaseService { TbAppCollectTableInfo.ALIAS_A, TbAppCollectTableInfo.SRC_SCHEMA_A, TbAppCollectTableInfo.SRC_TABLE_A, + TbAppCollectTableInfo.PRIORITY_A, + TbAppCollectTableInfo.TGT_TABLE_TYPE_A, TbAppCollectTableInfo.TGT_DB_A, TbAppCollectTableInfo.TGT_TABLE_A, - TbAppCollectTableInfo.TGT_HDFS_PATH_A + TbAppCollectTableInfo.TGT_HDFS_PATH_A, + TbAppCollectTableInfo.TAGS_A, + TbAppCollectTableInfo.BUCKET_NUMBER_A ), flinkJobId, alias @@ -606,9 +610,13 @@ public class TableMetaService extends BaseService { rs.getString(3), rs.getString(4), rs.getString(5), - rs.getString(6), + rs.getInt(6), rs.getString(7), - rs.getString(8) + rs.getString(8), + rs.getString(9), + rs.getString(10), + rs.getString(11), + rs.getInt(12) ) ) ); diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/controller/ScheduleController.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/controller/ScheduleController.java index d1ba14f..9c035cf 100644 --- a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/controller/ScheduleController.java +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/controller/ScheduleController.java @@ -138,7 +138,7 @@ public class ScheduleController { infoService, hudiService, mapper, - meta -> StrUtil.equals(meta.getAlias(), alias) && NumberUtil.equals(meta.getJob().getId(), flinkJobId), + meta -> StrUtil.equals(meta.getAlias(), alias) && NumberUtil.equals(meta.getFlinkJobId(), flinkJobId), "Schedule manually", metadata.toImmutable() ); @@ -155,7 +155,7 @@ public class ScheduleController { infoService, hudiService, mapper, - meta -> keys.contains(StrUtil.format("{}-{}", meta.getJob().getId(), meta.getAlias())), + meta -> keys.contains(StrUtil.format("{}-{}", meta.getFlinkJobId(), meta.getAlias())), "Schedule manually" ); } diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/FocusUnVersionUpdateScheduleJob.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/FocusUnVersionUpdateScheduleJob.java index a8df519..e4d5fbb 100644 --- a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/FocusUnVersionUpdateScheduleJob.java +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/FocusUnVersionUpdateScheduleJob.java @@ -48,7 +48,7 @@ public class FocusUnVersionUpdateScheduleJob extends BaseScheduleJob { hudiService, mapper, meta -> meta.getPriority() >= 10000 - && unUpdateVersionTableIds.contains(StrUtil.format("{}-{}", meta.getJob().getId(), meta.getAlias())), + && unUpdateVersionTableIds.contains(StrUtil.format("{}-{}", meta.getFlinkJobId(), meta.getAlias())), comment ); } diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/OdsFocusScheduleJob.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/OdsFocusScheduleJob.java index b43feae..c9d62d3 100644 --- a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/OdsFocusScheduleJob.java +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/quartz/compaction/OdsFocusScheduleJob.java @@ -46,7 +46,7 @@ public class OdsFocusScheduleJob extends BaseScheduleJob { infoService, hudiService, mapper, - meta -> TableMetaHelper.existsTag(meta, Constants.TAGS_ODS_FOCUS), + meta -> TableMetaHelper.existsTag(meta.getTags(), Constants.TAGS_ODS_FOCUS), comment ); } diff --git a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/utils/ScheduleHelper.java b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/utils/ScheduleHelper.java index b1f8d45..f611085 100644 --- a/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/utils/ScheduleHelper.java +++ b/service-scheduler/src/main/java/com/lanyuanxiaoyao/service/scheduler/utils/ScheduleHelper.java @@ -7,10 +7,10 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.github.loki4j.slf4j.marker.LabelMarker; import com.lanyuanxiaoyao.service.common.Constants; import com.lanyuanxiaoyao.service.common.entity.SyncState; -import com.lanyuanxiaoyao.service.common.entity.TableMeta; import com.lanyuanxiaoyao.service.common.entity.compaction.ScheduleJob; import com.lanyuanxiaoyao.service.common.utils.TableMetaHelper; import com.lanyuanxiaoyao.service.configuration.ExecutorProvider; +import com.lanyuanxiaoyao.service.configuration.entity.info.SimpleTableMeta; import com.lanyuanxiaoyao.service.configuration.entity.queue.QueueItem; import com.lanyuanxiaoyao.service.configuration.utils.QueueUtil; import com.lanyuanxiaoyao.service.forest.service.HudiService; @@ -42,7 +42,7 @@ public class ScheduleHelper { InfoService infoService, HudiService hudiService, ObjectMapper mapper, - Predicate predicate, + Predicate predicate, String comment ) { schedule(discoveryClient, infoService, hudiService, mapper, predicate, comment, Maps.immutable.empty()); @@ -53,57 +53,57 @@ public class ScheduleHelper { InfoService infoService, HudiService hudiService, ObjectMapper mapper, - Predicate predicate, + Predicate predicate, String comment, ImmutableMap metadata ) { String batchId = IdUtil.nanoId(10); - infoService.tableMetaList() + infoService.simpleTableMetas() // 只调度 MOR 表 - .select(meta -> StrUtil.equals(Constants.MOR, meta.getHudi().getTargetTableType())) + .select(meta -> StrUtil.equals(Constants.MOR, meta.getTargetTableType())) .asParallel(ExecutorProvider.EXECUTORS, 1) .select(predicate::test) // 没有 Hudi 表的过滤掉 .select(meta -> { try { - return hudiService.existsHudiTable(meta.getJob().getId(), meta.getAlias()); + return hudiService.existsHudiTable(meta.getFlinkJobId(), meta.getAlias()); } catch (Throwable throwable) { - logger.error(makeMarker(meta.getJob().getId(), meta.getAlias()), "Get hudi status failure", throwable); + logger.error(makeMarker(meta.getFlinkJobId(), meta.getAlias()), "Get hudi status failure", throwable); } return false; }) // 没有压缩计划的过滤掉 .select(meta -> { try { - return hudiService.existsCompactionPlan(meta.getJob().getId(), meta.getAlias()); + return hudiService.existsCompactionPlan(meta.getFlinkJobId(), meta.getAlias()); } catch (Throwable throwable) { - logger.error(makeMarker(meta.getJob().getId(), meta.getAlias()), "Get compaction status failure", throwable); + logger.error(makeMarker(meta.getFlinkJobId(), meta.getAlias()), "Get compaction status failure", throwable); } return false; }) // 拒绝不压缩标志的任务 - .reject(meta -> TableMetaHelper.existsTag(meta, Constants.TAGS_NO_COMPACT)) + .reject(meta -> TableMetaHelper.existsTag(meta.getTags(), Constants.TAGS_NO_COMPACT)) // 拒绝不调度压缩标志的任务 - .reject(meta -> TableMetaHelper.existsTag(meta, Constants.TAGS_NO_SCHEDULE_COMPACT)) + .reject(meta -> TableMetaHelper.existsTag(meta.getTags(), Constants.TAGS_NO_SCHEDULE_COMPACT)) .collect(meta -> { long compactionDuration = 0L; try { // 计算压缩耗时 - SyncState syncState = infoService.syncStateDetail(meta.getJob().getId(), meta.getAlias()); + SyncState syncState = infoService.syncStateDetail(meta.getFlinkJobId(), meta.getAlias()); if (ObjectUtil.isNotNull(syncState) && ObjectUtil.isNotNull(syncState.getCompactionFinishTime()) && ObjectUtil.isNotNull(syncState.getCompactionStartTime())) { compactionDuration = syncState.getCompactionFinishTime() - syncState.getCompactionStartTime(); } } catch (Throwable e) { - logger.error(makeMarker(meta.getJob().getId(), meta.getAlias()), "Get sync state failure for {} {}", meta.getJob().getId(), meta.getAlias()); + logger.error(makeMarker(meta.getFlinkJobId(), meta.getAlias()), "Get sync state failure for {} {}", meta.getFlinkJobId(), meta.getAlias()); } return new TableMetaWrapper(meta, compactionDuration); }) .toSortedList( Comparator // 比较 Bucket 数,数量大的在前面 - .comparing(TableMetaWrapper::getBucketIndexNumber, Comparator.reverseOrder()) + .comparing(TableMetaWrapper::getBucketNumber, Comparator.reverseOrder()) // 比较压缩耗时,压缩耗时长的在前面 .thenComparing(TableMetaWrapper::getCompactionDuration, Comparator.reverseOrder())) .collect(meta -> new QueueItem<>( @@ -131,28 +131,28 @@ public class ScheduleHelper { } private static final class TableMetaWrapper { - private final TableMeta tableMeta; + private final SimpleTableMeta meta; private final Long compactionDuration; - private TableMetaWrapper(TableMeta tableMeta, Long compactionDuration) { - this.tableMeta = tableMeta; + private TableMetaWrapper(SimpleTableMeta meta, Long compactionDuration) { + this.meta = meta; this.compactionDuration = compactionDuration; } public Long getFlinkJobId() { - return tableMeta.getJob().getId(); + return meta.getFlinkJobId(); } public String getAlias() { - return tableMeta.getAlias(); + return meta.getAlias(); } - public Integer getBucketIndexNumber() { - return tableMeta.getHudi().getBucketIndexNumber(); + public Integer getBucketNumber() { + return meta.getBucketNumber(); } public Integer getPriority() { - return tableMeta.getPriority(); + return meta.getPriority(); } public Long getCompactionDuration() {