From 8a5e6f69ac88cb2b0828e5e842a1b3019651435f Mon Sep 17 00:00:00 2001 From: lanyuanxiaoyao Date: Tue, 2 Jan 2024 09:55:22 +0800 Subject: [PATCH] =?UTF-8?q?fix(web):=20=E7=A7=BB=E9=99=A4eureka=E8=B7=B3?= =?UTF-8?q?=E8=BD=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/info/service/InfoService.java | 417 +++++++++++++++++- .../test/java/com/test/SqlBuilderTests.java | 13 +- web/components/cloud-tab.js | 14 - 3 files changed, 413 insertions(+), 31 deletions(-) diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java index b9d9a9c..80395d0 100644 --- a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/InfoService.java @@ -5,20 +5,29 @@ import club.kingon.sql.builder.SqlBuilder; import club.kingon.sql.builder.WhereSqlBuilder; import club.kingon.sql.builder.entry.Alias; import club.kingon.sql.builder.entry.Column; +import cn.hutool.core.collection.IterUtil; import cn.hutool.core.lang.Pair; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; +import cn.hutool.json.JSONObject; +import cn.hutool.json.JSONUtil; +import com.eshore.odcp.hudi.connector.SQLConstants.IapDatahub.DataSource; +import com.eshore.odcp.hudi.connector.SQLConstants.IapDatahub.DataSourceTable; +import com.eshore.odcp.hudi.connector.SQLConstants.IapDatahub.DataSourceTableField; import com.eshore.odcp.hudi.connector.entity.FlinkJob; import com.eshore.odcp.hudi.connector.entity.SyncState; import com.eshore.odcp.hudi.connector.entity.TableMeta; +import com.eshore.odcp.hudi.connector.exception.ConfigException; import com.eshore.odcp.hudi.connector.exception.FlinkJobNotFoundException; import com.eshore.odcp.hudi.connector.exception.SyncStateNotFoundException; -import com.eshore.odcp.hudi.connector.utils.database.DatabaseService; +import com.eshore.odcp.hudi.connector.exception.TableMetaNotFoundException; +import com.lanyuanxiaoyao.service.configuration.ExecutorProvider; import com.lanyuanxiaoyao.service.configuration.entity.PageResponse; import com.lanyuanxiaoyao.service.configuration.entity.info.*; import com.lanyuanxiaoyao.service.info.configuration.SQLLoggerProvider; import java.sql.Timestamp; import java.util.List; +import java.util.Locale; import java.util.function.Function; import org.eclipse.collections.api.factory.Lists; import org.eclipse.collections.api.list.ImmutableList; @@ -46,13 +55,11 @@ public class InfoService { private static final String COUNT = "count(*)"; private static final String STATUS_Y = "y"; private static final String STATUS_N = "n"; - private final DatabaseService databaseService; private final JdbcTemplate mysqlJdbcTemplate; private final TransactionTemplate mysqlTransactionTemplate; private final SQLLoggerProvider.SQLLogger sqlLogger; - public InfoService(DatabaseService databaseService, JdbcTemplate mysqlJdbcTemplate, TransactionTemplate mysqlTransactionTemplate, SQLLoggerProvider.SQLLogger sqlLogger) { - this.databaseService = databaseService; + public InfoService(JdbcTemplate mysqlJdbcTemplate, TransactionTemplate mysqlTransactionTemplate, SQLLoggerProvider.SQLLogger sqlLogger) { this.mysqlJdbcTemplate = mysqlJdbcTemplate; this.mysqlTransactionTemplate = mysqlTransactionTemplate; this.sqlLogger = sqlLogger; @@ -78,6 +85,19 @@ public class InfoService { return StrUtil.format("{}.{}", table.getAlias(), column); } + private static void checkMoreThanOne(String fieldName, Iterable iterable) throws ConfigException { + ConfigException.check(fieldName + " cannot be more than 1", () -> IterUtil.size(iterable) > 1); + } + + private static void checkEmpty(String fieldName, Iterable iterable) throws ConfigException { + ConfigException.check(fieldName + " cannot be empty", () -> IterUtil.isEmpty(iterable)); + } + + private static void checkEmptyOrMoreThanOne(String fieldName, Iterable iterable) throws ConfigException { + checkEmpty(fieldName, iterable); + checkMoreThanOne(fieldName, iterable); + } + @Cacheable(value = "sync-state", sync = true, key = "#flinkJobId.toString()+#alias") @Retryable(Throwable.class) public SyncState syncState(Long flinkJobId, String alias) { @@ -262,10 +282,10 @@ public class InfoService { @Retryable(Throwable.class) public ImmutableList jobAndMetas() { return flinkJobs() - .collect(job -> new JobAndMetas(job, databaseService.findTableMeta(job.getId()))); + .collect(job -> new JobAndMetas(job, tableMetaList(job.getId()))); } - private ImmutableList flinkJobs(Long flinkJobId) { + private ImmutableList flinkJobList(Long flinkJobId) { return Lists.immutable.ofAll( mysqlJdbcTemplate.query( SqlBuilder.select( @@ -306,33 +326,406 @@ public class InfoService { @Cacheable(value = "flink-jobs", sync = true) @Retryable(Throwable.class) public ImmutableList flinkJobs() { - return flinkJobs(null); + return flinkJobList(null); } @Cacheable(value = "flink-jobs", sync = true, key = "#flinkJobId") @Retryable(Throwable.class) public FlinkJob flinkJob(Long flinkJobId) { - return flinkJobs(flinkJobId) + return flinkJobList(flinkJobId) .getFirstOptional() - .orElseThrow(FlinkJobNotFoundException::new); + .orElseThrow(() -> new FlinkJobNotFoundException(flinkJobId)); + } + + private ImmutableList tableMetaList(Long flinkJobId) { + return tableMetaList(flinkJobId, null); + } + + private ImmutableList tableMetaList(Long flinkJobId, String aliasText) { + return Lists.immutable.ofAll( + mysqlJdbcTemplate.query( + SqlBuilder.select( + DataSource.DS_NAME_A, + DataSource.SCHEMA_NAME_A, + DataSourceTable.TABLE_NAME_A, + DataSourceTable.TABLE_TYPE_A, + DataSourceTableField.FIELD_NAME_A, + DataSourceTableField.FIELD_SEQ_A, + DataSourceTableField.FIELD_TYPE_A, + DataSourceTableField.PRIMARY_KEY_A, + DataSourceTableField.PARTITION_KEY_A, + DataSourceTableField.LENGTH_A, + TbAppCollectTableInfo.TGT_DB_A, + TbAppCollectTableInfo.TGT_TABLE_A, + TbAppCollectTableInfo.TGT_TABLE_TYPE_A, + TbAppCollectTableInfo.TGT_HDFS_PATH_A, + TbAppHudiJobConfig.WRITE_TASKS_A, + TbAppHudiJobConfig.WRITE_OPERATION_A, + TbAppHudiJobConfig.WRITE_TASK_MAX_MEMORY_A, + TbAppHudiJobConfig.WRITE_BATCH_SIZE_A, + TbAppHudiJobConfig.WRITE_RATE_LIMIT_A, + TbAppCollectTableInfo.BUCKET_NUMBER_A, + TbAppHudiJobConfig.COMPACTION_STRATEGY_A, + TbAppHudiJobConfig.COMPACTION_TASKS_A, + TbAppHudiJobConfig.COMPACTION_DELTA_COMMITS_A, + TbAppHudiJobConfig.COMPACTION_DELTA_SECONDS_A, + TbAppHudiJobConfig.COMPACTION_ASYNC_ENABLED_A, + TbAppHudiJobConfig.COMPACTION_MAX_MEMORY_A, + TbAppHudiJobConfig.CONFIGS_A, + TbAppCollectTableInfo.FILTER_FIELD_A, + TbAppCollectTableInfo.FILTER_VALUES_A, + TbAppCollectTableInfo.FILTER_TYPE_A, + TbAppCollectTableInfo.SRC_TOPIC_A, + TbAppCollectTableInfo.SRC_PULSAR_ADDR_A, + Alias.of("tayjc_sync.job_manager_memory", "sync_job_manager_memory"), + Alias.of("tayjc_sync.task_manager_memory", "sync_task_manager_memory"), + Alias.of("tayjc_compaction.job_manager_memory", "compaction_job_manager_memory"), + Alias.of("tayjc_compaction.task_manager_memory", "compaction_task_manger_momory"), + TbAppCollectTableInfo.PARTITION_FIELD_A, + TbAppHudiSyncState.MESSAGE_ID_A, + TbAppGlobalConfig.METRIC_PUBLISH_URL_A, + TbAppGlobalConfig.METRIC_PROMETHEUS_URL_A, + TbAppGlobalConfig.METRIC_API_URL_A, + TbAppGlobalConfig.METRIC_PUBLISH_DELAY_A, + TbAppGlobalConfig.METRIC_PUBLISH_PERIOD_A, + TbAppGlobalConfig.METRIC_PUBLISH_TIMEOUT_A, + TbAppGlobalConfig.METRIC_PUBLISH_BATCH_A, + Alias.of(TbAppFlinkJobConfig.ID_A, "job_id"), + Alias.of(TbAppFlinkJobConfig.NAME_A, "job_name"), + TbAppGlobalConfig.CHECKPOINT_ROOT_PATH_A, + TbAppHudiJobConfig.SOURCE_TASKS_A, + TbAppCollectTableInfo.ALIAS_A, + DataSource.CONNECTION_A, + TbAppCollectTableInfo.PRIORITY_A, + DataSource.DS_TYPE_A, + TbAppHudiJobConfig.KEEP_FILE_VERSION_A, + TbAppHudiJobConfig.KEEP_COMMIT_VERSION_A, + TbAppCollectTableInfo.TAGS_A, + TbAppGlobalConfig.ZK_URL_A, + TbAppCollectTableInfo.VERSION_A, + DataSourceTableField.SCALE_A + ) + .from( + DataSource._alias_, + DataSourceTable._alias_, + DataSourceTableField._alias_, + TbAppFlinkJobConfig._alias_, + TbAppHudiJobConfig._alias_, + Alias.of(TbAppYarnJobConfig._origin_, "tayjc_sync"), + Alias.of(TbAppYarnJobConfig._origin_, "tayjc_compaction"), + TbAppGlobalConfig._alias_, + TbAppCollectTableInfo._alias_, + TbAppHudiSyncState._alias_ + ) + .whereEq(DataSource.DS_ROLE_A, "src") + .andEq(DataSource.DS_STATE_A, STATUS_Y) + .andEq(DataSource.RECORD_STATE_A, STATUS_Y) + .andEq(DataSourceTable.DS_ID_A, Column.as(DataSource.DS_ID_A)) + .andEq(DataSourceTable.RECORD_STATE_A, STATUS_Y) + .andEq(DataSourceTableField.TABLE_ID_A, Column.as(DataSourceTable.TABLE_ID_A)) + .andEq(DataSourceTableField.RECORD_STATE_A, STATUS_Y) + .andIn(DataSource.DS_TYPE_A, "udal", "telepg") + .andEq(DataSource.DS_NAME_A, Column.as(TbAppCollectTableInfo.SRC_DB_A)) + .andEq(DataSource.SCHEMA_NAME_A, Column.as(TbAppCollectTableInfo.SRC_SCHEMA_A)) + .andEq(DataSourceTable.TABLE_NAME_A, Column.as(TbAppCollectTableInfo.SRC_TABLE_A)) + .andEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A)) + .andEq(TbAppCollectTableInfo.HUDI_JOB_ID_A, Column.as(TbAppHudiJobConfig.ID_A)) + .andEq(TbAppCollectTableInfo.SYNC_YARN_JOB_ID_A, Column.as("tayjc_sync.id")) + .andEq(TbAppCollectTableInfo.COMPACTION_YARN_JOB_ID_A, Column.as("tayjc_compaction.id")) + .andEq(TbAppCollectTableInfo.CONFIG_ID_A, Column.as(TbAppGlobalConfig.ID_A)) + .andEq(TbAppHudiSyncState.ID_A, Column.as(StrUtil.format("concat({}, '-', {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A))) + .andEq(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId) + .andEq(StrUtil.isNotBlank(aliasText), TbAppCollectTableInfo.ALIAS_A, aliasText) + .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y) + .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y) + .andEq(TbAppHudiJobConfig.STATUS_A, STATUS_Y) + .andEq("tayjc_sync.status", STATUS_Y) + .andEq("tayjc_compaction.status", STATUS_Y) + .orderBy(DataSourceTableField.FIELD_SEQ_A) + .build(), + (rs, row) -> TableMeta.RowMeta.builder() + .dsName(rs.getString(1)) + .schemaName(rs.getString(2)) + .tableName(rs.getString(3)) + .tableType(rs.getString(4)) + .fieldName(rs.getString(5)) + .fieldSeq(rs.getInt(6)) + .fieldType(rs.getString(7)) + .primaryKey(rs.getString(8)) + .partitionKey(rs.getString(9)) + .length(rs.getLong(10)) + .tgtDb(rs.getString(11)) + .tgtTable(rs.getString(12)) + .tgtTableType(rs.getString(13)) + .tgtHdfsPath(rs.getString(14)) + .writeTasks(rs.getInt(15)) + .writeOperation(rs.getString(16)) + .writeTaskMaxMemory(rs.getInt(17)) + .writeBatchSize(rs.getInt(18)) + .writeRateLimit(rs.getInt(19)) + .bucketIndexNumber(rs.getInt(20)) + .compactionStrategy(rs.getString(21)) + .compactionTasks(rs.getInt(22)) + .compactionDeltaCommits(rs.getInt(23)) + .compactionDeltaSeconds(rs.getInt(24)) + .compactionAsyncEnabled(rs.getString(25)) + .compactionMaxMemory(rs.getInt(26)) + .configs(rs.getString(27)) + .filterField(rs.getString(28)) + .filterValues(rs.getString(29)) + .filterType(rs.getString(30)) + .topic(rs.getString(31)) + .pulsarAddress(rs.getString(32)) + .syncJobManagerMemory(rs.getInt(33)) + .syncTaskManagerMemory(rs.getInt(34)) + .compactionJobManagerMemory(rs.getInt(35)) + .compactionTaskManagerMemory(rs.getInt(36)) + .partitionField(rs.getString(37)) + .messageId(rs.getString(38)) + .metricPublishUrl(rs.getString(39)) + .metricPrometheusUrl(rs.getString(40)) + .metricApiUrl(rs.getString(41)) + .metricPublishDelay(rs.getInt(42)) + .metricPublishPeriod(rs.getInt(43)) + .metricPublishTimeout(rs.getInt(44)) + .metricPublishBatch(rs.getInt(45)) + .jobId(rs.getLong(46)) + .jobName(rs.getString(47)) + .checkpointRootPath(rs.getString(48)) + .sourceTasks(rs.getInt(49)) + .alias(rs.getString(50)) + .connection(rs.getString(51)) + .priority(rs.getInt(52)) + .sourceType(rs.getString(53)) + .keepFileVersion(rs.getInt(54)) + .keepCommitVersion(rs.getInt(55)) + .tags(rs.getString(56)) + .zookeeperUrl(rs.getString(57)) + .version(rs.getInt(58)) + .scala(rs.getInt(59)) + .build() + ) + ) + .asParallel(ExecutorProvider.EXECUTORS, 5) + .groupBy(TableMeta.RowMeta::getAlias) + .multiValuesView() + .collect(aliasRowMetas -> { + try { + ImmutableList rows = aliasRowMetas + .toSortedListBy(TableMeta.RowMeta::getFieldSeq) + .toImmutable(); + + ImmutableList aliasList = rows.collect(TableMeta.RowMeta::getAlias).distinct(); + checkEmptyOrMoreThanOne("alias", aliasList); + String alias = aliasList.get(0); + + ImmutableList sourceTypeList = rows.collect(TableMeta.RowMeta::getSourceType).distinct(); + checkEmptyOrMoreThanOne("source_type", sourceTypeList); + String sourceTypeText = sourceTypeList.get(0).toUpperCase(); + TableMeta.SourceType sourceType; + try { + sourceType = TableMeta.SourceType.valueOf(sourceTypeText); + } catch (IllegalArgumentException e) { + throw new Exception("Cannot parse source type " + sourceTypeText); + } + + ImmutableList dsNames = rows.collect(TableMeta.RowMeta::getDsName).distinct(); + checkEmptyOrMoreThanOne("ds_name", dsNames); + String dataSource = dsNames.get(0); + + ImmutableList schemaNames = rows.collect(TableMeta.RowMeta::getSchemaName).distinct(); + checkEmptyOrMoreThanOne("schema_name", schemaNames); + String schema = schemaNames.get(0); + + ImmutableList tableNames = rows.collect(TableMeta.RowMeta::getTableName).distinct(); + // 每次只能获取 1 张表的元信息 + checkEmptyOrMoreThanOne("table_name", tableNames); + String table = tableNames.get(0); + + ImmutableList tableTypes = rows.collect(TableMeta.RowMeta::getTableType).distinct(); + checkEmptyOrMoreThanOne("table_type", tableTypes); + String type = tableTypes.get(0); + + ImmutableList filterFields = rows.collect(TableMeta.RowMeta::getFilterField).distinct(); + checkEmptyOrMoreThanOne("filter_field", filterFields); + String filterField = filterFields.get(0); + + ImmutableList filterValueList = rows.collect(TableMeta.RowMeta::getFilterValues).distinct(); + checkEmptyOrMoreThanOne("filter_values", filterValueList); + String filterValuesText = filterValueList.get(0); + ImmutableList filterValues = StrUtil.isBlank(filterValuesText) + ? Lists.immutable.empty() + : Lists.immutable.of(filterValuesText.split(",")); + ImmutableList filterTypes = rows.collect(TableMeta.RowMeta::getFilterType).distinct(); + checkEmptyOrMoreThanOne("filter_field", filterFields); + TableMeta.FilterType filterType; + try { + filterType = TableMeta.FilterType.valueOf(filterTypes.get(0)); + } catch (IllegalArgumentException e) { + filterType = TableMeta.FilterType.NONE; + } + + ImmutableList topics = rows.collect(TableMeta.RowMeta::getTopic).distinct(); + checkEmptyOrMoreThanOne("topic", topics); + String topic = topics.get(0); + + ImmutableList pulsarAddresses = rows.collect(TableMeta.RowMeta::getPulsarAddress).distinct(); + checkEmptyOrMoreThanOne("pulsar address", pulsarAddresses); + String pulsarAddress = pulsarAddresses.get(0); + + ImmutableList priorities = rows.collect(TableMeta.RowMeta::getPriority).distinct(); + checkEmptyOrMoreThanOne("priority", priorities); + Integer priority = priorities.get(0); + + ImmutableList tagTexts = rows.collect(TableMeta.RowMeta::getTags).distinct(); + checkEmptyOrMoreThanOne("tags", tagTexts); + String tagText = ObjectUtil.isNull(tagTexts.get(0)) ? "" : tagTexts.get(0); + ImmutableList tags = Lists.immutable.of(tagText.split(",")); + + ImmutableList versions = rows.collect(TableMeta.RowMeta::getVersion).distinct(); + checkEmptyOrMoreThanOne("version", versions); + Integer version = versions.get(0); + + // 获取 Hudi 配置, 因为查出来同一张表的配置都相同, 所以直接取第一条即可 + TableMeta.RowMeta example = rows.get(0); + TableMeta.HudiMeta hudiMeta = TableMeta.HudiMeta.builder() + .targetDataSource(example.getTgtDb()) + .targetTable(example.getTgtTable()) + .targetTableType(example.getTgtTableType()) + .targetHdfsPath(example.getTgtHdfsPath()) + .sourceTasks(example.getSourceTasks()) + .writeTasks(example.getWriteTasks()) + .writeOperation(example.getWriteOperation()) + .writeTaskMaxMemory(example.getWriteTaskMaxMemory()) + .writeBatchSize(example.getWriteBatchSize()) + .writeRateLimit(example.getWriteRateLimit()) + .bucketIndexNumber(example.getBucketIndexNumber()) + .compactionStrategy(example.getCompactionStrategy()) + .compactionTasks(example.getCompactionTasks()) + .compactionDeltaCommits(example.getCompactionDeltaCommits()) + .compactionDeltaSeconds(example.getCompactionDeltaSeconds()) + .compactionAsyncEnabled(example.getCompactionAsyncEnabled()) + .compactionMaxMemory(example.getCompactionMaxMemory()) + .configs(example.getConfigs()) + .keepFileVersion(example.getKeepFileVersion()) + .keepCommitVersion(example.getKeepCommitVersion()) + .build(); + TableMeta.YarnMeta syncYarnMeta = TableMeta.YarnMeta.builder() + .jobManagerMemory(example.getSyncJobManagerMemory()) + .taskManagerMemory(example.getSyncTaskManagerMemory()) + .build(); + TableMeta.YarnMeta compactionYarnMeta = TableMeta.YarnMeta.builder() + .jobManagerMemory(example.getCompactionJobManagerMemory()) + .taskManagerMemory(example.getCompactionTaskManagerMemory()) + .build(); + TableMeta.ConfigMeta configMeta = TableMeta.ConfigMeta.builder() + .messageId(example.getMessageId()) + .metricPublishUrl(example.getMetricPublishUrl()) + .metricPrometheusUrl(example.getMetricPrometheusUrl()) + .metricApiUrl(example.getMetricApiUrl()) + .metricPublishDelay(example.getMetricPublishDelay()) + .metricPublishPeriod(example.getMetricPublishPeriod()) + .metricPublishTimeout(example.getMetricPublishTimeout()) + .metricPublishBatch(example.getMetricPublishBatch()) + .checkpointRootPath(example.getCheckpointRootPath()) + .zookeeperUrl(example.getZookeeperUrl()) + .build(); + TableMeta.JobMeta jobMeta = TableMeta.JobMeta.builder() + .id(example.getJobId()) + .name(example.getJobName()) + .build(); + + TableMeta.ConnectionMeta connectionMeta = null; + String connectionText = example.getConnection(); + if (StrUtil.isNotBlank(connectionText)) { + JSONObject connectionObj = JSONUtil.parseObj(connectionText); + connectionMeta = TableMeta.ConnectionMeta.builder() + .url(connectionObj.getStr("jdbc_url")) + .user(connectionObj.getStr("jdbc_user")) + .password(connectionObj.getStr("jdbc_password")) + .driver(connectionObj.getStr("jdbc_driver")) + .build(); + } + + ImmutableList partitionFields = rows.collect(TableMeta.RowMeta::getPartitionField).distinct(); + checkEmptyOrMoreThanOne("partition_field", filterFields); + String partitionField = partitionFields.get(0); + + List primaryKeys = Lists.mutable.empty(), + partitionKeys = Lists.mutable.empty(), + fieldMetaList = Lists.mutable.empty(); + for (TableMeta.RowMeta rowMeta : rows) { + boolean isPrimaryKey = StrUtil.equals(STATUS_Y, rowMeta.getPrimaryKey()); + boolean isPartitionKey = StrUtil.equals(STATUS_Y, rowMeta.getPartitionKey()); + TableMeta.FieldMeta fieldMeta = TableMeta.FieldMeta.builder() + .name(rowMeta.getFieldName().toUpperCase(Locale.ROOT)) + .sequence(rowMeta.getFieldSeq()) + .type(rowMeta.getFieldType()) + .isPrimaryKey(isPrimaryKey) + .partitionKey(isPartitionKey) + .length(rowMeta.getLength()) + .scala(rowMeta.getScala()) + .build(); + if (isPrimaryKey) { + primaryKeys.add(fieldMeta); + } + if (isPartitionKey) { + partitionKeys.add(fieldMeta); + } + fieldMetaList.add(fieldMeta); + } + return TableMeta.builder() + .alias(alias) + .source(dataSource) + .schema(schema) + .table(table) + .type(type) + .primaryKeys(primaryKeys) + .partitionKeys(partitionKeys) + .hudi(hudiMeta) + .fields(fieldMetaList) + .filterField(filterField) + .filterValues(filterValues.toList()) + .filterType(filterType) + .topic(topic) + .pulsarAddress(pulsarAddress) + .syncYarn(syncYarnMeta) + .compactionYarn(compactionYarnMeta) + .partitionField(partitionField) + .config(configMeta) + .job(jobMeta) + .connection(connectionMeta) + .priority(priority) + .sourceType(sourceType) + .tags(tags.toList()) + .version(version) + .build(); + } catch (Throwable throwable) { + throw new RuntimeException(throwable); + } + }) + .toList() + .toImmutable(); } @Cacheable(value = "table-metas", sync = true) @Retryable(Throwable.class) public ImmutableList tableMetas() { - return flinkJobs().flatCollect(job -> databaseService.findTableMeta(job.getId())); + return flinkJobs().flatCollect(job -> tableMetaList(job.getId())); } @Cacheable(value = "table-metas", sync = true, key = "#flinkJobId") @Retryable(Throwable.class) public ImmutableList tableMetas(Long flinkJobId) { - return databaseService.findTableMeta(flinkJobId); + return tableMetaList(flinkJobId); } @Cacheable(value = "table-metas", sync = true, key = "#flinkJobId.toString()+#alias") @Retryable(Throwable.class) public TableMeta tableMeta(Long flinkJobId, String alias) { - return databaseService.getTableMeta(flinkJobId, alias); + return tableMetaList(flinkJobId, alias) + .getFirstOptional() + .orElseThrow(TableMetaNotFoundException::new); } @Cacheable("un-updated-version-table") diff --git a/service-info-query/src/test/java/com/test/SqlBuilderTests.java b/service-info-query/src/test/java/com/test/SqlBuilderTests.java index 8307130..9690f49 100644 --- a/service-info-query/src/test/java/com/test/SqlBuilderTests.java +++ b/service-info-query/src/test/java/com/test/SqlBuilderTests.java @@ -3,10 +3,10 @@ package com.test; import club.kingon.sql.builder.SqlBuilder; import club.kingon.sql.builder.entry.Column; import cn.hutool.core.util.ObjectUtil; -import cn.hutool.core.util.StrUtil; import cn.hutool.db.sql.SqlUtil; -import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.*; +import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.TbAppFlinkJobConfig; +import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.TbAppYarnJobConfig; /** * @author lanyuanxiaoyao @@ -16,6 +16,8 @@ public class SqlBuilderTests { public static void main(String[] args) { Long flinkJobId = 100086L; String alias = "hello_world"; + String STATUS_Y = "y"; + String STATUS_N = "n"; System.out.println(SqlUtil.formatSql( SqlBuilder.select( TbAppFlinkJobConfig.ID_A, @@ -24,9 +26,10 @@ public class SqlBuilderTests { TbAppYarnJobConfig.JOB_MANAGER_MEMORY_A, TbAppYarnJobConfig.TASK_MANAGER_MEMORY_A ) - .from(TbAppFlinkJobConfig._alias_, TbAppYarnJobConfig._alias_) - .whereEq(TbAppFlinkJobConfig.ONE_IN_ONE_YARN_JOB_ID_A, Column.as(TbAppYarnJobConfig.ID_A)) - .andEq(TbAppFlinkJobConfig.STATUS_A, "y") + .from(TbAppFlinkJobConfig._alias_) + .leftJoin(TbAppYarnJobConfig._alias_) + .onEq(TbAppFlinkJobConfig.ONE_IN_ONE_YARN_JOB_ID_A, Column.as(TbAppYarnJobConfig.ID_A)) + .whereEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y) .andEq(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId) .build() )); diff --git a/web/components/cloud-tab.js b/web/components/cloud-tab.js index 30d3142..80a6db9 100644 --- a/web/components/cloud-tab.js +++ b/web/components/cloud-tab.js @@ -40,20 +40,6 @@ function cloudTab() { return { title: 'Cloud', tab: [ - { - type: 'wrapper', - size: 'none', - body: [ - { - type: 'action', - label: 'Eureka', - actionType: 'url', - url: 'http://132.122.116.142:35670', - blank: true, - } - ], - }, - {type: 'divider'}, /*{ type: 'service', silentPolling: true,