feat(scheduler): 使用简化的table_meta加速查询
This commit is contained in:
@@ -138,7 +138,7 @@ public class ScheduleController {
|
||||
infoService,
|
||||
hudiService,
|
||||
mapper,
|
||||
meta -> StrUtil.equals(meta.getAlias(), alias) && NumberUtil.equals(meta.getJob().getId(), flinkJobId),
|
||||
meta -> StrUtil.equals(meta.getAlias(), alias) && NumberUtil.equals(meta.getFlinkJobId(), flinkJobId),
|
||||
"Schedule manually",
|
||||
metadata.toImmutable()
|
||||
);
|
||||
@@ -155,7 +155,7 @@ public class ScheduleController {
|
||||
infoService,
|
||||
hudiService,
|
||||
mapper,
|
||||
meta -> keys.contains(StrUtil.format("{}-{}", meta.getJob().getId(), meta.getAlias())),
|
||||
meta -> keys.contains(StrUtil.format("{}-{}", meta.getFlinkJobId(), meta.getAlias())),
|
||||
"Schedule manually"
|
||||
);
|
||||
}
|
||||
|
||||
@@ -48,7 +48,7 @@ public class FocusUnVersionUpdateScheduleJob extends BaseScheduleJob {
|
||||
hudiService,
|
||||
mapper,
|
||||
meta -> meta.getPriority() >= 10000
|
||||
&& unUpdateVersionTableIds.contains(StrUtil.format("{}-{}", meta.getJob().getId(), meta.getAlias())),
|
||||
&& unUpdateVersionTableIds.contains(StrUtil.format("{}-{}", meta.getFlinkJobId(), meta.getAlias())),
|
||||
comment
|
||||
);
|
||||
}
|
||||
|
||||
@@ -46,7 +46,7 @@ public class OdsFocusScheduleJob extends BaseScheduleJob {
|
||||
infoService,
|
||||
hudiService,
|
||||
mapper,
|
||||
meta -> TableMetaHelper.existsTag(meta, Constants.TAGS_ODS_FOCUS),
|
||||
meta -> TableMetaHelper.existsTag(meta.getTags(), Constants.TAGS_ODS_FOCUS),
|
||||
comment
|
||||
);
|
||||
}
|
||||
|
||||
@@ -7,10 +7,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.github.loki4j.slf4j.marker.LabelMarker;
|
||||
import com.lanyuanxiaoyao.service.common.Constants;
|
||||
import com.lanyuanxiaoyao.service.common.entity.SyncState;
|
||||
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
|
||||
import com.lanyuanxiaoyao.service.common.entity.compaction.ScheduleJob;
|
||||
import com.lanyuanxiaoyao.service.common.utils.TableMetaHelper;
|
||||
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
|
||||
import com.lanyuanxiaoyao.service.configuration.entity.info.SimpleTableMeta;
|
||||
import com.lanyuanxiaoyao.service.configuration.entity.queue.QueueItem;
|
||||
import com.lanyuanxiaoyao.service.configuration.utils.QueueUtil;
|
||||
import com.lanyuanxiaoyao.service.forest.service.HudiService;
|
||||
@@ -42,7 +42,7 @@ public class ScheduleHelper {
|
||||
InfoService infoService,
|
||||
HudiService hudiService,
|
||||
ObjectMapper mapper,
|
||||
Predicate<TableMeta> predicate,
|
||||
Predicate<SimpleTableMeta> predicate,
|
||||
String comment
|
||||
) {
|
||||
schedule(discoveryClient, infoService, hudiService, mapper, predicate, comment, Maps.immutable.empty());
|
||||
@@ -53,57 +53,57 @@ public class ScheduleHelper {
|
||||
InfoService infoService,
|
||||
HudiService hudiService,
|
||||
ObjectMapper mapper,
|
||||
Predicate<TableMeta> predicate,
|
||||
Predicate<SimpleTableMeta> predicate,
|
||||
String comment,
|
||||
ImmutableMap<String, String> metadata
|
||||
) {
|
||||
String batchId = IdUtil.nanoId(10);
|
||||
infoService.tableMetaList()
|
||||
infoService.simpleTableMetas()
|
||||
// 只调度 MOR 表
|
||||
.select(meta -> StrUtil.equals(Constants.MOR, meta.getHudi().getTargetTableType()))
|
||||
.select(meta -> StrUtil.equals(Constants.MOR, meta.getTargetTableType()))
|
||||
.asParallel(ExecutorProvider.EXECUTORS, 1)
|
||||
.select(predicate::test)
|
||||
// 没有 Hudi 表的过滤掉
|
||||
.select(meta -> {
|
||||
try {
|
||||
return hudiService.existsHudiTable(meta.getJob().getId(), meta.getAlias());
|
||||
return hudiService.existsHudiTable(meta.getFlinkJobId(), meta.getAlias());
|
||||
} catch (Throwable throwable) {
|
||||
logger.error(makeMarker(meta.getJob().getId(), meta.getAlias()), "Get hudi status failure", throwable);
|
||||
logger.error(makeMarker(meta.getFlinkJobId(), meta.getAlias()), "Get hudi status failure", throwable);
|
||||
}
|
||||
return false;
|
||||
})
|
||||
// 没有压缩计划的过滤掉
|
||||
.select(meta -> {
|
||||
try {
|
||||
return hudiService.existsCompactionPlan(meta.getJob().getId(), meta.getAlias());
|
||||
return hudiService.existsCompactionPlan(meta.getFlinkJobId(), meta.getAlias());
|
||||
} catch (Throwable throwable) {
|
||||
logger.error(makeMarker(meta.getJob().getId(), meta.getAlias()), "Get compaction status failure", throwable);
|
||||
logger.error(makeMarker(meta.getFlinkJobId(), meta.getAlias()), "Get compaction status failure", throwable);
|
||||
}
|
||||
return false;
|
||||
})
|
||||
// 拒绝不压缩标志的任务
|
||||
.reject(meta -> TableMetaHelper.existsTag(meta, Constants.TAGS_NO_COMPACT))
|
||||
.reject(meta -> TableMetaHelper.existsTag(meta.getTags(), Constants.TAGS_NO_COMPACT))
|
||||
// 拒绝不调度压缩标志的任务
|
||||
.reject(meta -> TableMetaHelper.existsTag(meta, Constants.TAGS_NO_SCHEDULE_COMPACT))
|
||||
.reject(meta -> TableMetaHelper.existsTag(meta.getTags(), Constants.TAGS_NO_SCHEDULE_COMPACT))
|
||||
.collect(meta -> {
|
||||
long compactionDuration = 0L;
|
||||
try {
|
||||
// 计算压缩耗时
|
||||
SyncState syncState = infoService.syncStateDetail(meta.getJob().getId(), meta.getAlias());
|
||||
SyncState syncState = infoService.syncStateDetail(meta.getFlinkJobId(), meta.getAlias());
|
||||
if (ObjectUtil.isNotNull(syncState)
|
||||
&& ObjectUtil.isNotNull(syncState.getCompactionFinishTime())
|
||||
&& ObjectUtil.isNotNull(syncState.getCompactionStartTime())) {
|
||||
compactionDuration = syncState.getCompactionFinishTime() - syncState.getCompactionStartTime();
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
logger.error(makeMarker(meta.getJob().getId(), meta.getAlias()), "Get sync state failure for {} {}", meta.getJob().getId(), meta.getAlias());
|
||||
logger.error(makeMarker(meta.getFlinkJobId(), meta.getAlias()), "Get sync state failure for {} {}", meta.getFlinkJobId(), meta.getAlias());
|
||||
}
|
||||
return new TableMetaWrapper(meta, compactionDuration);
|
||||
})
|
||||
.toSortedList(
|
||||
Comparator
|
||||
// 比较 Bucket 数,数量大的在前面
|
||||
.comparing(TableMetaWrapper::getBucketIndexNumber, Comparator.reverseOrder())
|
||||
.comparing(TableMetaWrapper::getBucketNumber, Comparator.reverseOrder())
|
||||
// 比较压缩耗时,压缩耗时长的在前面
|
||||
.thenComparing(TableMetaWrapper::getCompactionDuration, Comparator.reverseOrder()))
|
||||
.collect(meta -> new QueueItem<>(
|
||||
@@ -131,28 +131,28 @@ public class ScheduleHelper {
|
||||
}
|
||||
|
||||
private static final class TableMetaWrapper {
|
||||
private final TableMeta tableMeta;
|
||||
private final SimpleTableMeta meta;
|
||||
private final Long compactionDuration;
|
||||
|
||||
private TableMetaWrapper(TableMeta tableMeta, Long compactionDuration) {
|
||||
this.tableMeta = tableMeta;
|
||||
private TableMetaWrapper(SimpleTableMeta meta, Long compactionDuration) {
|
||||
this.meta = meta;
|
||||
this.compactionDuration = compactionDuration;
|
||||
}
|
||||
|
||||
public Long getFlinkJobId() {
|
||||
return tableMeta.getJob().getId();
|
||||
return meta.getFlinkJobId();
|
||||
}
|
||||
|
||||
public String getAlias() {
|
||||
return tableMeta.getAlias();
|
||||
return meta.getAlias();
|
||||
}
|
||||
|
||||
public Integer getBucketIndexNumber() {
|
||||
return tableMeta.getHudi().getBucketIndexNumber();
|
||||
public Integer getBucketNumber() {
|
||||
return meta.getBucketNumber();
|
||||
}
|
||||
|
||||
public Integer getPriority() {
|
||||
return tableMeta.getPriority();
|
||||
return meta.getPriority();
|
||||
}
|
||||
|
||||
public Long getCompactionDuration() {
|
||||
|
||||
Reference in New Issue
Block a user