fix(web): 移除eureka跳转
This commit is contained in:
@@ -5,20 +5,29 @@ import club.kingon.sql.builder.SqlBuilder;
|
||||
import club.kingon.sql.builder.WhereSqlBuilder;
|
||||
import club.kingon.sql.builder.entry.Alias;
|
||||
import club.kingon.sql.builder.entry.Column;
|
||||
import cn.hutool.core.collection.IterUtil;
|
||||
import cn.hutool.core.lang.Pair;
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.json.JSONObject;
|
||||
import cn.hutool.json.JSONUtil;
|
||||
import com.eshore.odcp.hudi.connector.SQLConstants.IapDatahub.DataSource;
|
||||
import com.eshore.odcp.hudi.connector.SQLConstants.IapDatahub.DataSourceTable;
|
||||
import com.eshore.odcp.hudi.connector.SQLConstants.IapDatahub.DataSourceTableField;
|
||||
import com.eshore.odcp.hudi.connector.entity.FlinkJob;
|
||||
import com.eshore.odcp.hudi.connector.entity.SyncState;
|
||||
import com.eshore.odcp.hudi.connector.entity.TableMeta;
|
||||
import com.eshore.odcp.hudi.connector.exception.ConfigException;
|
||||
import com.eshore.odcp.hudi.connector.exception.FlinkJobNotFoundException;
|
||||
import com.eshore.odcp.hudi.connector.exception.SyncStateNotFoundException;
|
||||
import com.eshore.odcp.hudi.connector.utils.database.DatabaseService;
|
||||
import com.eshore.odcp.hudi.connector.exception.TableMetaNotFoundException;
|
||||
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
|
||||
import com.lanyuanxiaoyao.service.configuration.entity.PageResponse;
|
||||
import com.lanyuanxiaoyao.service.configuration.entity.info.*;
|
||||
import com.lanyuanxiaoyao.service.info.configuration.SQLLoggerProvider;
|
||||
import java.sql.Timestamp;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.function.Function;
|
||||
import org.eclipse.collections.api.factory.Lists;
|
||||
import org.eclipse.collections.api.list.ImmutableList;
|
||||
@@ -46,13 +55,11 @@ public class InfoService {
|
||||
private static final String COUNT = "count(*)";
|
||||
private static final String STATUS_Y = "y";
|
||||
private static final String STATUS_N = "n";
|
||||
private final DatabaseService databaseService;
|
||||
private final JdbcTemplate mysqlJdbcTemplate;
|
||||
private final TransactionTemplate mysqlTransactionTemplate;
|
||||
private final SQLLoggerProvider.SQLLogger sqlLogger;
|
||||
|
||||
public InfoService(DatabaseService databaseService, JdbcTemplate mysqlJdbcTemplate, TransactionTemplate mysqlTransactionTemplate, SQLLoggerProvider.SQLLogger sqlLogger) {
|
||||
this.databaseService = databaseService;
|
||||
public InfoService(JdbcTemplate mysqlJdbcTemplate, TransactionTemplate mysqlTransactionTemplate, SQLLoggerProvider.SQLLogger sqlLogger) {
|
||||
this.mysqlJdbcTemplate = mysqlJdbcTemplate;
|
||||
this.mysqlTransactionTemplate = mysqlTransactionTemplate;
|
||||
this.sqlLogger = sqlLogger;
|
||||
@@ -78,6 +85,19 @@ public class InfoService {
|
||||
return StrUtil.format("{}.{}", table.getAlias(), column);
|
||||
}
|
||||
|
||||
private static void checkMoreThanOne(String fieldName, Iterable<?> iterable) throws ConfigException {
|
||||
ConfigException.check(fieldName + " cannot be more than 1", () -> IterUtil.size(iterable) > 1);
|
||||
}
|
||||
|
||||
private static void checkEmpty(String fieldName, Iterable<?> iterable) throws ConfigException {
|
||||
ConfigException.check(fieldName + " cannot be empty", () -> IterUtil.isEmpty(iterable));
|
||||
}
|
||||
|
||||
private static void checkEmptyOrMoreThanOne(String fieldName, Iterable<?> iterable) throws ConfigException {
|
||||
checkEmpty(fieldName, iterable);
|
||||
checkMoreThanOne(fieldName, iterable);
|
||||
}
|
||||
|
||||
@Cacheable(value = "sync-state", sync = true, key = "#flinkJobId.toString()+#alias")
|
||||
@Retryable(Throwable.class)
|
||||
public SyncState syncState(Long flinkJobId, String alias) {
|
||||
@@ -262,10 +282,10 @@ public class InfoService {
|
||||
@Retryable(Throwable.class)
|
||||
public ImmutableList<JobAndMetas> jobAndMetas() {
|
||||
return flinkJobs()
|
||||
.collect(job -> new JobAndMetas(job, databaseService.findTableMeta(job.getId())));
|
||||
.collect(job -> new JobAndMetas(job, tableMetaList(job.getId())));
|
||||
}
|
||||
|
||||
private ImmutableList<FlinkJob> flinkJobs(Long flinkJobId) {
|
||||
private ImmutableList<FlinkJob> flinkJobList(Long flinkJobId) {
|
||||
return Lists.immutable.ofAll(
|
||||
mysqlJdbcTemplate.query(
|
||||
SqlBuilder.select(
|
||||
@@ -306,33 +326,406 @@ public class InfoService {
|
||||
@Cacheable(value = "flink-jobs", sync = true)
|
||||
@Retryable(Throwable.class)
|
||||
public ImmutableList<FlinkJob> flinkJobs() {
|
||||
return flinkJobs(null);
|
||||
return flinkJobList(null);
|
||||
}
|
||||
|
||||
@Cacheable(value = "flink-jobs", sync = true, key = "#flinkJobId")
|
||||
@Retryable(Throwable.class)
|
||||
public FlinkJob flinkJob(Long flinkJobId) {
|
||||
return flinkJobs(flinkJobId)
|
||||
return flinkJobList(flinkJobId)
|
||||
.getFirstOptional()
|
||||
.orElseThrow(FlinkJobNotFoundException::new);
|
||||
.orElseThrow(() -> new FlinkJobNotFoundException(flinkJobId));
|
||||
}
|
||||
|
||||
private ImmutableList<TableMeta> tableMetaList(Long flinkJobId) {
|
||||
return tableMetaList(flinkJobId, null);
|
||||
}
|
||||
|
||||
private ImmutableList<TableMeta> tableMetaList(Long flinkJobId, String aliasText) {
|
||||
return Lists.immutable.ofAll(
|
||||
mysqlJdbcTemplate.query(
|
||||
SqlBuilder.select(
|
||||
DataSource.DS_NAME_A,
|
||||
DataSource.SCHEMA_NAME_A,
|
||||
DataSourceTable.TABLE_NAME_A,
|
||||
DataSourceTable.TABLE_TYPE_A,
|
||||
DataSourceTableField.FIELD_NAME_A,
|
||||
DataSourceTableField.FIELD_SEQ_A,
|
||||
DataSourceTableField.FIELD_TYPE_A,
|
||||
DataSourceTableField.PRIMARY_KEY_A,
|
||||
DataSourceTableField.PARTITION_KEY_A,
|
||||
DataSourceTableField.LENGTH_A,
|
||||
TbAppCollectTableInfo.TGT_DB_A,
|
||||
TbAppCollectTableInfo.TGT_TABLE_A,
|
||||
TbAppCollectTableInfo.TGT_TABLE_TYPE_A,
|
||||
TbAppCollectTableInfo.TGT_HDFS_PATH_A,
|
||||
TbAppHudiJobConfig.WRITE_TASKS_A,
|
||||
TbAppHudiJobConfig.WRITE_OPERATION_A,
|
||||
TbAppHudiJobConfig.WRITE_TASK_MAX_MEMORY_A,
|
||||
TbAppHudiJobConfig.WRITE_BATCH_SIZE_A,
|
||||
TbAppHudiJobConfig.WRITE_RATE_LIMIT_A,
|
||||
TbAppCollectTableInfo.BUCKET_NUMBER_A,
|
||||
TbAppHudiJobConfig.COMPACTION_STRATEGY_A,
|
||||
TbAppHudiJobConfig.COMPACTION_TASKS_A,
|
||||
TbAppHudiJobConfig.COMPACTION_DELTA_COMMITS_A,
|
||||
TbAppHudiJobConfig.COMPACTION_DELTA_SECONDS_A,
|
||||
TbAppHudiJobConfig.COMPACTION_ASYNC_ENABLED_A,
|
||||
TbAppHudiJobConfig.COMPACTION_MAX_MEMORY_A,
|
||||
TbAppHudiJobConfig.CONFIGS_A,
|
||||
TbAppCollectTableInfo.FILTER_FIELD_A,
|
||||
TbAppCollectTableInfo.FILTER_VALUES_A,
|
||||
TbAppCollectTableInfo.FILTER_TYPE_A,
|
||||
TbAppCollectTableInfo.SRC_TOPIC_A,
|
||||
TbAppCollectTableInfo.SRC_PULSAR_ADDR_A,
|
||||
Alias.of("tayjc_sync.job_manager_memory", "sync_job_manager_memory"),
|
||||
Alias.of("tayjc_sync.task_manager_memory", "sync_task_manager_memory"),
|
||||
Alias.of("tayjc_compaction.job_manager_memory", "compaction_job_manager_memory"),
|
||||
Alias.of("tayjc_compaction.task_manager_memory", "compaction_task_manger_momory"),
|
||||
TbAppCollectTableInfo.PARTITION_FIELD_A,
|
||||
TbAppHudiSyncState.MESSAGE_ID_A,
|
||||
TbAppGlobalConfig.METRIC_PUBLISH_URL_A,
|
||||
TbAppGlobalConfig.METRIC_PROMETHEUS_URL_A,
|
||||
TbAppGlobalConfig.METRIC_API_URL_A,
|
||||
TbAppGlobalConfig.METRIC_PUBLISH_DELAY_A,
|
||||
TbAppGlobalConfig.METRIC_PUBLISH_PERIOD_A,
|
||||
TbAppGlobalConfig.METRIC_PUBLISH_TIMEOUT_A,
|
||||
TbAppGlobalConfig.METRIC_PUBLISH_BATCH_A,
|
||||
Alias.of(TbAppFlinkJobConfig.ID_A, "job_id"),
|
||||
Alias.of(TbAppFlinkJobConfig.NAME_A, "job_name"),
|
||||
TbAppGlobalConfig.CHECKPOINT_ROOT_PATH_A,
|
||||
TbAppHudiJobConfig.SOURCE_TASKS_A,
|
||||
TbAppCollectTableInfo.ALIAS_A,
|
||||
DataSource.CONNECTION_A,
|
||||
TbAppCollectTableInfo.PRIORITY_A,
|
||||
DataSource.DS_TYPE_A,
|
||||
TbAppHudiJobConfig.KEEP_FILE_VERSION_A,
|
||||
TbAppHudiJobConfig.KEEP_COMMIT_VERSION_A,
|
||||
TbAppCollectTableInfo.TAGS_A,
|
||||
TbAppGlobalConfig.ZK_URL_A,
|
||||
TbAppCollectTableInfo.VERSION_A,
|
||||
DataSourceTableField.SCALE_A
|
||||
)
|
||||
.from(
|
||||
DataSource._alias_,
|
||||
DataSourceTable._alias_,
|
||||
DataSourceTableField._alias_,
|
||||
TbAppFlinkJobConfig._alias_,
|
||||
TbAppHudiJobConfig._alias_,
|
||||
Alias.of(TbAppYarnJobConfig._origin_, "tayjc_sync"),
|
||||
Alias.of(TbAppYarnJobConfig._origin_, "tayjc_compaction"),
|
||||
TbAppGlobalConfig._alias_,
|
||||
TbAppCollectTableInfo._alias_,
|
||||
TbAppHudiSyncState._alias_
|
||||
)
|
||||
.whereEq(DataSource.DS_ROLE_A, "src")
|
||||
.andEq(DataSource.DS_STATE_A, STATUS_Y)
|
||||
.andEq(DataSource.RECORD_STATE_A, STATUS_Y)
|
||||
.andEq(DataSourceTable.DS_ID_A, Column.as(DataSource.DS_ID_A))
|
||||
.andEq(DataSourceTable.RECORD_STATE_A, STATUS_Y)
|
||||
.andEq(DataSourceTableField.TABLE_ID_A, Column.as(DataSourceTable.TABLE_ID_A))
|
||||
.andEq(DataSourceTableField.RECORD_STATE_A, STATUS_Y)
|
||||
.andIn(DataSource.DS_TYPE_A, "udal", "telepg")
|
||||
.andEq(DataSource.DS_NAME_A, Column.as(TbAppCollectTableInfo.SRC_DB_A))
|
||||
.andEq(DataSource.SCHEMA_NAME_A, Column.as(TbAppCollectTableInfo.SRC_SCHEMA_A))
|
||||
.andEq(DataSourceTable.TABLE_NAME_A, Column.as(TbAppCollectTableInfo.SRC_TABLE_A))
|
||||
.andEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A))
|
||||
.andEq(TbAppCollectTableInfo.HUDI_JOB_ID_A, Column.as(TbAppHudiJobConfig.ID_A))
|
||||
.andEq(TbAppCollectTableInfo.SYNC_YARN_JOB_ID_A, Column.as("tayjc_sync.id"))
|
||||
.andEq(TbAppCollectTableInfo.COMPACTION_YARN_JOB_ID_A, Column.as("tayjc_compaction.id"))
|
||||
.andEq(TbAppCollectTableInfo.CONFIG_ID_A, Column.as(TbAppGlobalConfig.ID_A))
|
||||
.andEq(TbAppHudiSyncState.ID_A, Column.as(StrUtil.format("concat({}, '-', {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A)))
|
||||
.andEq(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId)
|
||||
.andEq(StrUtil.isNotBlank(aliasText), TbAppCollectTableInfo.ALIAS_A, aliasText)
|
||||
.andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
|
||||
.andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y)
|
||||
.andEq(TbAppHudiJobConfig.STATUS_A, STATUS_Y)
|
||||
.andEq("tayjc_sync.status", STATUS_Y)
|
||||
.andEq("tayjc_compaction.status", STATUS_Y)
|
||||
.orderBy(DataSourceTableField.FIELD_SEQ_A)
|
||||
.build(),
|
||||
(rs, row) -> TableMeta.RowMeta.builder()
|
||||
.dsName(rs.getString(1))
|
||||
.schemaName(rs.getString(2))
|
||||
.tableName(rs.getString(3))
|
||||
.tableType(rs.getString(4))
|
||||
.fieldName(rs.getString(5))
|
||||
.fieldSeq(rs.getInt(6))
|
||||
.fieldType(rs.getString(7))
|
||||
.primaryKey(rs.getString(8))
|
||||
.partitionKey(rs.getString(9))
|
||||
.length(rs.getLong(10))
|
||||
.tgtDb(rs.getString(11))
|
||||
.tgtTable(rs.getString(12))
|
||||
.tgtTableType(rs.getString(13))
|
||||
.tgtHdfsPath(rs.getString(14))
|
||||
.writeTasks(rs.getInt(15))
|
||||
.writeOperation(rs.getString(16))
|
||||
.writeTaskMaxMemory(rs.getInt(17))
|
||||
.writeBatchSize(rs.getInt(18))
|
||||
.writeRateLimit(rs.getInt(19))
|
||||
.bucketIndexNumber(rs.getInt(20))
|
||||
.compactionStrategy(rs.getString(21))
|
||||
.compactionTasks(rs.getInt(22))
|
||||
.compactionDeltaCommits(rs.getInt(23))
|
||||
.compactionDeltaSeconds(rs.getInt(24))
|
||||
.compactionAsyncEnabled(rs.getString(25))
|
||||
.compactionMaxMemory(rs.getInt(26))
|
||||
.configs(rs.getString(27))
|
||||
.filterField(rs.getString(28))
|
||||
.filterValues(rs.getString(29))
|
||||
.filterType(rs.getString(30))
|
||||
.topic(rs.getString(31))
|
||||
.pulsarAddress(rs.getString(32))
|
||||
.syncJobManagerMemory(rs.getInt(33))
|
||||
.syncTaskManagerMemory(rs.getInt(34))
|
||||
.compactionJobManagerMemory(rs.getInt(35))
|
||||
.compactionTaskManagerMemory(rs.getInt(36))
|
||||
.partitionField(rs.getString(37))
|
||||
.messageId(rs.getString(38))
|
||||
.metricPublishUrl(rs.getString(39))
|
||||
.metricPrometheusUrl(rs.getString(40))
|
||||
.metricApiUrl(rs.getString(41))
|
||||
.metricPublishDelay(rs.getInt(42))
|
||||
.metricPublishPeriod(rs.getInt(43))
|
||||
.metricPublishTimeout(rs.getInt(44))
|
||||
.metricPublishBatch(rs.getInt(45))
|
||||
.jobId(rs.getLong(46))
|
||||
.jobName(rs.getString(47))
|
||||
.checkpointRootPath(rs.getString(48))
|
||||
.sourceTasks(rs.getInt(49))
|
||||
.alias(rs.getString(50))
|
||||
.connection(rs.getString(51))
|
||||
.priority(rs.getInt(52))
|
||||
.sourceType(rs.getString(53))
|
||||
.keepFileVersion(rs.getInt(54))
|
||||
.keepCommitVersion(rs.getInt(55))
|
||||
.tags(rs.getString(56))
|
||||
.zookeeperUrl(rs.getString(57))
|
||||
.version(rs.getInt(58))
|
||||
.scala(rs.getInt(59))
|
||||
.build()
|
||||
)
|
||||
)
|
||||
.asParallel(ExecutorProvider.EXECUTORS, 5)
|
||||
.groupBy(TableMeta.RowMeta::getAlias)
|
||||
.multiValuesView()
|
||||
.collect(aliasRowMetas -> {
|
||||
try {
|
||||
ImmutableList<TableMeta.RowMeta> rows = aliasRowMetas
|
||||
.toSortedListBy(TableMeta.RowMeta::getFieldSeq)
|
||||
.toImmutable();
|
||||
|
||||
ImmutableList<String> aliasList = rows.collect(TableMeta.RowMeta::getAlias).distinct();
|
||||
checkEmptyOrMoreThanOne("alias", aliasList);
|
||||
String alias = aliasList.get(0);
|
||||
|
||||
ImmutableList<String> sourceTypeList = rows.collect(TableMeta.RowMeta::getSourceType).distinct();
|
||||
checkEmptyOrMoreThanOne("source_type", sourceTypeList);
|
||||
String sourceTypeText = sourceTypeList.get(0).toUpperCase();
|
||||
TableMeta.SourceType sourceType;
|
||||
try {
|
||||
sourceType = TableMeta.SourceType.valueOf(sourceTypeText);
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new Exception("Cannot parse source type " + sourceTypeText);
|
||||
}
|
||||
|
||||
ImmutableList<String> dsNames = rows.collect(TableMeta.RowMeta::getDsName).distinct();
|
||||
checkEmptyOrMoreThanOne("ds_name", dsNames);
|
||||
String dataSource = dsNames.get(0);
|
||||
|
||||
ImmutableList<String> schemaNames = rows.collect(TableMeta.RowMeta::getSchemaName).distinct();
|
||||
checkEmptyOrMoreThanOne("schema_name", schemaNames);
|
||||
String schema = schemaNames.get(0);
|
||||
|
||||
ImmutableList<String> tableNames = rows.collect(TableMeta.RowMeta::getTableName).distinct();
|
||||
// 每次只能获取 1 张表的元信息
|
||||
checkEmptyOrMoreThanOne("table_name", tableNames);
|
||||
String table = tableNames.get(0);
|
||||
|
||||
ImmutableList<String> tableTypes = rows.collect(TableMeta.RowMeta::getTableType).distinct();
|
||||
checkEmptyOrMoreThanOne("table_type", tableTypes);
|
||||
String type = tableTypes.get(0);
|
||||
|
||||
ImmutableList<String> filterFields = rows.collect(TableMeta.RowMeta::getFilterField).distinct();
|
||||
checkEmptyOrMoreThanOne("filter_field", filterFields);
|
||||
String filterField = filterFields.get(0);
|
||||
|
||||
ImmutableList<String> filterValueList = rows.collect(TableMeta.RowMeta::getFilterValues).distinct();
|
||||
checkEmptyOrMoreThanOne("filter_values", filterValueList);
|
||||
String filterValuesText = filterValueList.get(0);
|
||||
ImmutableList<String> filterValues = StrUtil.isBlank(filterValuesText)
|
||||
? Lists.immutable.empty()
|
||||
: Lists.immutable.of(filterValuesText.split(","));
|
||||
ImmutableList<String> filterTypes = rows.collect(TableMeta.RowMeta::getFilterType).distinct();
|
||||
checkEmptyOrMoreThanOne("filter_field", filterFields);
|
||||
TableMeta.FilterType filterType;
|
||||
try {
|
||||
filterType = TableMeta.FilterType.valueOf(filterTypes.get(0));
|
||||
} catch (IllegalArgumentException e) {
|
||||
filterType = TableMeta.FilterType.NONE;
|
||||
}
|
||||
|
||||
ImmutableList<String> topics = rows.collect(TableMeta.RowMeta::getTopic).distinct();
|
||||
checkEmptyOrMoreThanOne("topic", topics);
|
||||
String topic = topics.get(0);
|
||||
|
||||
ImmutableList<String> pulsarAddresses = rows.collect(TableMeta.RowMeta::getPulsarAddress).distinct();
|
||||
checkEmptyOrMoreThanOne("pulsar address", pulsarAddresses);
|
||||
String pulsarAddress = pulsarAddresses.get(0);
|
||||
|
||||
ImmutableList<Integer> priorities = rows.collect(TableMeta.RowMeta::getPriority).distinct();
|
||||
checkEmptyOrMoreThanOne("priority", priorities);
|
||||
Integer priority = priorities.get(0);
|
||||
|
||||
ImmutableList<String> tagTexts = rows.collect(TableMeta.RowMeta::getTags).distinct();
|
||||
checkEmptyOrMoreThanOne("tags", tagTexts);
|
||||
String tagText = ObjectUtil.isNull(tagTexts.get(0)) ? "" : tagTexts.get(0);
|
||||
ImmutableList<String> tags = Lists.immutable.of(tagText.split(","));
|
||||
|
||||
ImmutableList<Integer> versions = rows.collect(TableMeta.RowMeta::getVersion).distinct();
|
||||
checkEmptyOrMoreThanOne("version", versions);
|
||||
Integer version = versions.get(0);
|
||||
|
||||
// 获取 Hudi 配置, 因为查出来同一张表的配置都相同, 所以直接取第一条即可
|
||||
TableMeta.RowMeta example = rows.get(0);
|
||||
TableMeta.HudiMeta hudiMeta = TableMeta.HudiMeta.builder()
|
||||
.targetDataSource(example.getTgtDb())
|
||||
.targetTable(example.getTgtTable())
|
||||
.targetTableType(example.getTgtTableType())
|
||||
.targetHdfsPath(example.getTgtHdfsPath())
|
||||
.sourceTasks(example.getSourceTasks())
|
||||
.writeTasks(example.getWriteTasks())
|
||||
.writeOperation(example.getWriteOperation())
|
||||
.writeTaskMaxMemory(example.getWriteTaskMaxMemory())
|
||||
.writeBatchSize(example.getWriteBatchSize())
|
||||
.writeRateLimit(example.getWriteRateLimit())
|
||||
.bucketIndexNumber(example.getBucketIndexNumber())
|
||||
.compactionStrategy(example.getCompactionStrategy())
|
||||
.compactionTasks(example.getCompactionTasks())
|
||||
.compactionDeltaCommits(example.getCompactionDeltaCommits())
|
||||
.compactionDeltaSeconds(example.getCompactionDeltaSeconds())
|
||||
.compactionAsyncEnabled(example.getCompactionAsyncEnabled())
|
||||
.compactionMaxMemory(example.getCompactionMaxMemory())
|
||||
.configs(example.getConfigs())
|
||||
.keepFileVersion(example.getKeepFileVersion())
|
||||
.keepCommitVersion(example.getKeepCommitVersion())
|
||||
.build();
|
||||
TableMeta.YarnMeta syncYarnMeta = TableMeta.YarnMeta.builder()
|
||||
.jobManagerMemory(example.getSyncJobManagerMemory())
|
||||
.taskManagerMemory(example.getSyncTaskManagerMemory())
|
||||
.build();
|
||||
TableMeta.YarnMeta compactionYarnMeta = TableMeta.YarnMeta.builder()
|
||||
.jobManagerMemory(example.getCompactionJobManagerMemory())
|
||||
.taskManagerMemory(example.getCompactionTaskManagerMemory())
|
||||
.build();
|
||||
TableMeta.ConfigMeta configMeta = TableMeta.ConfigMeta.builder()
|
||||
.messageId(example.getMessageId())
|
||||
.metricPublishUrl(example.getMetricPublishUrl())
|
||||
.metricPrometheusUrl(example.getMetricPrometheusUrl())
|
||||
.metricApiUrl(example.getMetricApiUrl())
|
||||
.metricPublishDelay(example.getMetricPublishDelay())
|
||||
.metricPublishPeriod(example.getMetricPublishPeriod())
|
||||
.metricPublishTimeout(example.getMetricPublishTimeout())
|
||||
.metricPublishBatch(example.getMetricPublishBatch())
|
||||
.checkpointRootPath(example.getCheckpointRootPath())
|
||||
.zookeeperUrl(example.getZookeeperUrl())
|
||||
.build();
|
||||
TableMeta.JobMeta jobMeta = TableMeta.JobMeta.builder()
|
||||
.id(example.getJobId())
|
||||
.name(example.getJobName())
|
||||
.build();
|
||||
|
||||
TableMeta.ConnectionMeta connectionMeta = null;
|
||||
String connectionText = example.getConnection();
|
||||
if (StrUtil.isNotBlank(connectionText)) {
|
||||
JSONObject connectionObj = JSONUtil.parseObj(connectionText);
|
||||
connectionMeta = TableMeta.ConnectionMeta.builder()
|
||||
.url(connectionObj.getStr("jdbc_url"))
|
||||
.user(connectionObj.getStr("jdbc_user"))
|
||||
.password(connectionObj.getStr("jdbc_password"))
|
||||
.driver(connectionObj.getStr("jdbc_driver"))
|
||||
.build();
|
||||
}
|
||||
|
||||
ImmutableList<String> partitionFields = rows.collect(TableMeta.RowMeta::getPartitionField).distinct();
|
||||
checkEmptyOrMoreThanOne("partition_field", filterFields);
|
||||
String partitionField = partitionFields.get(0);
|
||||
|
||||
List<TableMeta.FieldMeta> primaryKeys = Lists.mutable.empty(),
|
||||
partitionKeys = Lists.mutable.empty(),
|
||||
fieldMetaList = Lists.mutable.empty();
|
||||
for (TableMeta.RowMeta rowMeta : rows) {
|
||||
boolean isPrimaryKey = StrUtil.equals(STATUS_Y, rowMeta.getPrimaryKey());
|
||||
boolean isPartitionKey = StrUtil.equals(STATUS_Y, rowMeta.getPartitionKey());
|
||||
TableMeta.FieldMeta fieldMeta = TableMeta.FieldMeta.builder()
|
||||
.name(rowMeta.getFieldName().toUpperCase(Locale.ROOT))
|
||||
.sequence(rowMeta.getFieldSeq())
|
||||
.type(rowMeta.getFieldType())
|
||||
.isPrimaryKey(isPrimaryKey)
|
||||
.partitionKey(isPartitionKey)
|
||||
.length(rowMeta.getLength())
|
||||
.scala(rowMeta.getScala())
|
||||
.build();
|
||||
if (isPrimaryKey) {
|
||||
primaryKeys.add(fieldMeta);
|
||||
}
|
||||
if (isPartitionKey) {
|
||||
partitionKeys.add(fieldMeta);
|
||||
}
|
||||
fieldMetaList.add(fieldMeta);
|
||||
}
|
||||
return TableMeta.builder()
|
||||
.alias(alias)
|
||||
.source(dataSource)
|
||||
.schema(schema)
|
||||
.table(table)
|
||||
.type(type)
|
||||
.primaryKeys(primaryKeys)
|
||||
.partitionKeys(partitionKeys)
|
||||
.hudi(hudiMeta)
|
||||
.fields(fieldMetaList)
|
||||
.filterField(filterField)
|
||||
.filterValues(filterValues.toList())
|
||||
.filterType(filterType)
|
||||
.topic(topic)
|
||||
.pulsarAddress(pulsarAddress)
|
||||
.syncYarn(syncYarnMeta)
|
||||
.compactionYarn(compactionYarnMeta)
|
||||
.partitionField(partitionField)
|
||||
.config(configMeta)
|
||||
.job(jobMeta)
|
||||
.connection(connectionMeta)
|
||||
.priority(priority)
|
||||
.sourceType(sourceType)
|
||||
.tags(tags.toList())
|
||||
.version(version)
|
||||
.build();
|
||||
} catch (Throwable throwable) {
|
||||
throw new RuntimeException(throwable);
|
||||
}
|
||||
})
|
||||
.toList()
|
||||
.toImmutable();
|
||||
}
|
||||
|
||||
@Cacheable(value = "table-metas", sync = true)
|
||||
@Retryable(Throwable.class)
|
||||
public ImmutableList<TableMeta> tableMetas() {
|
||||
return flinkJobs().flatCollect(job -> databaseService.findTableMeta(job.getId()));
|
||||
return flinkJobs().flatCollect(job -> tableMetaList(job.getId()));
|
||||
}
|
||||
|
||||
@Cacheable(value = "table-metas", sync = true, key = "#flinkJobId")
|
||||
@Retryable(Throwable.class)
|
||||
public ImmutableList<TableMeta> tableMetas(Long flinkJobId) {
|
||||
return databaseService.findTableMeta(flinkJobId);
|
||||
return tableMetaList(flinkJobId);
|
||||
}
|
||||
|
||||
@Cacheable(value = "table-metas", sync = true, key = "#flinkJobId.toString()+#alias")
|
||||
@Retryable(Throwable.class)
|
||||
public TableMeta tableMeta(Long flinkJobId, String alias) {
|
||||
return databaseService.getTableMeta(flinkJobId, alias);
|
||||
return tableMetaList(flinkJobId, alias)
|
||||
.getFirstOptional()
|
||||
.orElseThrow(TableMetaNotFoundException::new);
|
||||
}
|
||||
|
||||
@Cacheable("un-updated-version-table")
|
||||
|
||||
@@ -3,10 +3,10 @@ package com.test;
|
||||
import club.kingon.sql.builder.SqlBuilder;
|
||||
import club.kingon.sql.builder.entry.Column;
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.db.sql.SqlUtil;
|
||||
|
||||
import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.*;
|
||||
import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.TbAppFlinkJobConfig;
|
||||
import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.TbAppYarnJobConfig;
|
||||
|
||||
/**
|
||||
* @author lanyuanxiaoyao
|
||||
@@ -16,6 +16,8 @@ public class SqlBuilderTests {
|
||||
public static void main(String[] args) {
|
||||
Long flinkJobId = 100086L;
|
||||
String alias = "hello_world";
|
||||
String STATUS_Y = "y";
|
||||
String STATUS_N = "n";
|
||||
System.out.println(SqlUtil.formatSql(
|
||||
SqlBuilder.select(
|
||||
TbAppFlinkJobConfig.ID_A,
|
||||
@@ -24,9 +26,10 @@ public class SqlBuilderTests {
|
||||
TbAppYarnJobConfig.JOB_MANAGER_MEMORY_A,
|
||||
TbAppYarnJobConfig.TASK_MANAGER_MEMORY_A
|
||||
)
|
||||
.from(TbAppFlinkJobConfig._alias_, TbAppYarnJobConfig._alias_)
|
||||
.whereEq(TbAppFlinkJobConfig.ONE_IN_ONE_YARN_JOB_ID_A, Column.as(TbAppYarnJobConfig.ID_A))
|
||||
.andEq(TbAppFlinkJobConfig.STATUS_A, "y")
|
||||
.from(TbAppFlinkJobConfig._alias_)
|
||||
.leftJoin(TbAppYarnJobConfig._alias_)
|
||||
.onEq(TbAppFlinkJobConfig.ONE_IN_ONE_YARN_JOB_ID_A, Column.as(TbAppYarnJobConfig.ID_A))
|
||||
.whereEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y)
|
||||
.andEq(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId)
|
||||
.build()
|
||||
));
|
||||
|
||||
Reference in New Issue
Block a user