refactor(info-query): 转换旧逻辑到新逻辑

This commit is contained in:
2023-12-27 18:17:38 +08:00
parent 75e01891c3
commit c2cdfac269
2 changed files with 71 additions and 109 deletions

View File

@@ -11,12 +11,14 @@ import cn.hutool.core.util.StrUtil;
import com.eshore.odcp.hudi.connector.entity.FlinkJob;
import com.eshore.odcp.hudi.connector.entity.SyncState;
import com.eshore.odcp.hudi.connector.entity.TableMeta;
import com.eshore.odcp.hudi.connector.exception.SyncStateNotFoundException;
import com.eshore.odcp.hudi.connector.utils.database.DatabaseService;
import com.lanyuanxiaoyao.service.configuration.entity.PageResponse;
import com.lanyuanxiaoyao.service.configuration.entity.info.*;
import com.lanyuanxiaoyao.service.info.configuration.SQLLoggerProvider;
import java.sql.Timestamp;
import java.util.List;
import java.util.function.Function;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
import org.eclipse.collections.api.map.ImmutableMap;
@@ -78,7 +80,55 @@ public class InfoService {
@Cacheable(value = "sync-state", sync = true, key = "#flinkJobId.toString()+#alias")
@Retryable(Throwable.class)
public SyncState syncState(Long flinkJobId, String alias) {
return databaseService.getSyncState(flinkJobId, alias);
return Lists.immutable.ofAll(
mysqlJdbcTemplate.query(
SqlBuilder.select(
TbAppFlinkJobConfig.ID_A,
TbAppCollectTableInfo.ALIAS_A,
TbAppHudiSyncState.MESSAGE_ID_A,
TbAppHudiSyncState.SOURCE_START_TIME_A,
TbAppHudiSyncState.SOURCE_CHECKPOINT_TIME_A,
TbAppHudiSyncState.SOURCE_PUBLISH_TIME_A,
TbAppHudiSyncState.SOURCE_OP_TIME_A,
TbAppHudiSyncState.COMPACTION_START_TIME_A,
TbAppHudiSyncState.COMPACTION_FINISH_TIME_A,
TbAppHudiSyncState.COMPACTION_APPLICATION_ID_A,
TbAppHudiSyncState.COMPACTION_STATUS_A,
TbAppHudiSyncState.COMPACTION_STATUS_TIME_A,
TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_A
)
.from(
TbAppFlinkJobConfig._alias_,
TbAppCollectTableInfo._alias_,
TbAppHudiSyncState._alias_
)
.whereEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A))
.andEq(TbAppHudiSyncState.ID_A, Column.as(StrUtil.format("concat({}, '-', {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A)))
.andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y)
.andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
.andEq(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId)
.andEq(StrUtil.isNotBlank(alias), TbAppCollectTableInfo.ALIAS_A, alias)
.build(),
(rs, row) -> {
Function<Timestamp, Long> dateConvertor = timestamp -> timestamp == null ? 0 : timestamp.getTime();
return SyncState.builder()
.flinkJobId(rs.getLong(1))
.alias(rs.getString(2))
.messageId(rs.getString(3))
.sourceStartTime(dateConvertor.apply(rs.getTimestamp(4)))
.sourceCheckpointTime(dateConvertor.apply(rs.getTimestamp(5)))
.sourcePublishTime(dateConvertor.apply(rs.getTimestamp(6)))
.sourceOperationTime(dateConvertor.apply(rs.getTimestamp(7)))
.compactionStartTime(dateConvertor.apply(rs.getTimestamp(8)))
.compactionFinishTime(dateConvertor.apply(rs.getTimestamp(9)))
.compactionApplicationId(rs.getString(10))
.compactionStatus(rs.getString(11))
.compactionStatusTime(dateConvertor.apply(rs.getTimestamp(12)))
.compactionLatestOperationTime(dateConvertor.apply(rs.getTimestamp(13)))
.build();
}
)
).getFirstOptional().orElseThrow(SyncStateNotFoundException::new);
}
private SqlBuilder generateJobIdAndAliasCriteria(

View File

@@ -14,123 +14,35 @@ import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.*;
*/
public class SqlBuilderTests {
public static void main(String[] args) {
/*System.out.println(SqlFormatter.format(
Long flinkJobId = 100086L;
String alias = "hello_world";
System.out.println(SqlUtil.formatSql(
SqlBuilder.select(
DataSource.DS_NAME_A,
DataSource.SCHEMA_NAME_A,
DataSourceTable.TABLE_NAME_A,
DataSourceTable.TABLE_TYPE_A,
DataSourceTableField.FIELD_NAME_A,
DataSourceTableField.FIELD_SEQ_A,
DataSourceTableField.FIELD_TYPE_A,
DataSourceTableField.PRIMARY_KEY_A,
DataSourceTableField.PARTITION_KEY_A,
DataSourceTableField.LENGTH_A,
TbAppCollectTableInfo.TGT_DB_A,
TbAppCollectTableInfo.TGT_TABLE_A,
TbAppCollectTableInfo.TGT_TABLE_TYPE_A,
TbAppCollectTableInfo.TGT_HDFS_PATH_A,
TbAppHudiJobConfig.WRITE_TASKS_A,
TbAppHudiJobConfig.WRITE_OPERATION_A,
TbAppHudiJobConfig.WRITE_TASK_MAX_MEMORY_A,
TbAppHudiJobConfig.WRITE_BATCH_SIZE_A,
TbAppHudiJobConfig.WRITE_RATE_LIMIT_A,
TbAppCollectTableInfo.BUCKET_NUMBER_A,
TbAppHudiJobConfig.COMPACTION_STRATEGY_A,
TbAppHudiJobConfig.COMPACTION_TASKS_A,
TbAppHudiJobConfig.COMPACTION_DELTA_COMMITS_A,
TbAppHudiJobConfig.COMPACTION_DELTA_SECONDS_A,
TbAppHudiJobConfig.COMPACTION_ASYNC_ENABLED_A,
TbAppHudiJobConfig.COMPACTION_MAX_MEMORY_A,
TbAppHudiJobConfig.CONFIGS_A,
TbAppCollectTableInfo.FILTER_FIELD_A,
TbAppCollectTableInfo.FILTER_VALUES_A,
TbAppCollectTableInfo.FILTER_TYPE_A,
TbAppCollectTableInfo.SRC_TOPIC_A,
TbAppCollectTableInfo.SRC_PULSAR_ADDR_A,
Alias.of("sync_config." + TbAppYarnJobConfig.JOB_MANAGER_MEMORY_O, "sync_job_manager_memory"),
Alias.of("sync_config." + TbAppYarnJobConfig.TASK_MANAGER_MEMORY_O, "sync_task_manager_memory"),
Alias.of("compaction_config." + TbAppYarnJobConfig.JOB_MANAGER_MEMORY_O, "compaction_job_manager_memory"),
Alias.of("compaction_config." + TbAppYarnJobConfig.TASK_MANAGER_MEMORY_O, "compaction_task_manger_momory"),
TbAppCollectTableInfo.PARTITION_FIELD_A,
TbAppHudiSyncState.MESSAGE_ID_A,
TbAppGlobalConfig.METRIC_PUBLISH_URL_A,
TbAppGlobalConfig.METRIC_PROMETHEUS_URL_A,
TbAppGlobalConfig.METRIC_API_URL_A,
TbAppGlobalConfig.METRIC_PUBLISH_DELAY_A,
TbAppGlobalConfig.METRIC_PUBLISH_PERIOD_A,
TbAppGlobalConfig.METRIC_PUBLISH_TIMEOUT_A,
TbAppGlobalConfig.METRIC_PUBLISH_BATCH_A,
Alias.of(TbAppFlinkJobConfig.ID_A, "job_id"),
Alias.of(TbAppFlinkJobConfig.NAME_A, "job_name"),
TbAppGlobalConfig.CHECKPOINT_ROOT_PATH_A,
TbAppHudiJobConfig.SOURCE_TASKS_A,
TbAppFlinkJobConfig.ID_A,
TbAppCollectTableInfo.ALIAS_A,
DataSource.CONNECTION_A,
TbAppCollectTableInfo.PRIORITY_A,
DataSource.DS_TYPE_A,
TbAppHudiJobConfig.KEEP_FILE_VERSION_A,
TbAppHudiJobConfig.KEEP_COMMIT_VERSION_A,
TbAppCollectTableInfo.TAGS_A,
TbAppGlobalConfig.ZK_URL_A
TbAppHudiSyncState.MESSAGE_ID_A,
TbAppHudiSyncState.SOURCE_START_TIME_A,
TbAppHudiSyncState.SOURCE_CHECKPOINT_TIME_A,
TbAppHudiSyncState.SOURCE_PUBLISH_TIME_A,
TbAppHudiSyncState.SOURCE_OP_TIME_A,
TbAppHudiSyncState.COMPACTION_START_TIME_A,
TbAppHudiSyncState.COMPACTION_FINISH_TIME_A,
TbAppHudiSyncState.COMPACTION_APPLICATION_ID_A,
TbAppHudiSyncState.COMPACTION_STATUS_A,
TbAppHudiSyncState.COMPACTION_STATUS_TIME_A,
TbAppHudiSyncState.COMPACTION_LATEST_OP_TS_A
)
.from(
DataSource._alias_,
DataSourceTable._alias_,
DataSourceTableField._alias_,
TbAppFlinkJobConfig._alias_,
TbAppHudiJobConfig._alias_,
Alias.of(TbAppYarnJobConfig._origin_, "sync_config"),
Alias.of(TbAppYarnJobConfig._origin_, "compaction_config"),
TbAppGlobalConfig._alias_,
TbAppCollectTableInfo._alias_
TbAppCollectTableInfo._alias_,
TbAppHudiSyncState._alias_
)
.leftJoin(TbAppHudiSyncState._alias_)
.onEq(TbAppHudiSyncState.ID_A, Column.as(StrUtil.format("concat({}, '-', {})", TbAppCollectTableInfo.FLINK_JOB_ID_A, TbAppCollectTableInfo.ALIAS_A)))
.whereEq(DataSource.DS_ROLE_A, "src")
.andEq(DataSource.DS_STATE_A, "y")
.andEq(DataSource.RECORD_STATE_A, "y")
.andEq(DataSourceTable.DS_ID_A, Column.as(DataSource.DS_ID_A))
.andEq(DataSourceTable.RECORD_STATE_A, "y")
.andEq(DataSourceTableField.TABLE_ID_A, Column.as(DataSourceTable.TABLE_ID_A))
.andEq(DataSourceTableField.RECORD_STATE_A, "y")
.andIn(DataSource.DS_TYPE_A, "udal", "telepg")
.andEq(DataSource.DS_NAME_A, Column.as(TbAppCollectTableInfo.SRC_DB_A))
.andEq(DataSource.SCHEMA_NAME_A, Column.as(TbAppCollectTableInfo.SRC_SCHEMA_A))
.andEq(DataSourceTable.TABLE_NAME_A, Column.as(TbAppCollectTableInfo.SRC_TABLE_A))
.andEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A))
.andEq(TbAppCollectTableInfo.HUDI_JOB_ID_A, Column.as(TbAppHudiJobConfig.ID_A))
.andEq(TbAppCollectTableInfo.SYNC_YARN_JOB_ID_A, Column.as("sync_config." + TbAppYarnJobConfig.ID_O))
.andEq(TbAppCollectTableInfo.COMPACTION_YARN_JOB_ID_A, Column.as("compaction_config." + TbAppYarnJobConfig.ID_O))
.andEq(TbAppCollectTableInfo.CONFIG_ID_A, Column.as(TbAppGlobalConfig.ID_A))
.andEq(TbAppFlinkJobConfig.ID_A, 1542097984132706304L)
.andEq(TbAppCollectTableInfo.ALIAS_A, "crm_cfguse_channel")
.andEq(TbAppCollectTableInfo.STATUS_A, "y")
.andEq(TbAppFlinkJobConfig.STATUS_A, "y")
.andEq(TbAppHudiJobConfig.STATUS_A, "y")
.andEq("sync_config." + TbAppYarnJobConfig.STATUS_O, "y")
.andEq("compaction_config." + TbAppYarnJobConfig.STATUS_O, "y")
.orderBy(DataSourceTableField.FIELD_SEQ_A)
.build()
));*/
System.out.println(SqlUtil.formatSql(
SqlBuilder.select("*")
.from(TbAppFlinkJobConfig._alias_, TbAppCollectTableInfo._alias_, TbAppCollectTableVersion._alias_)
.whereEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableInfo.FLINK_JOB_ID_A))
.andEq(TbAppHudiSyncState.ID_A, Column.as(StrUtil.format("concat({}, '-', {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A)))
.andEq(TbAppFlinkJobConfig.STATUS_A, "y")
.andEq(TbAppCollectTableInfo.STATUS_A, "y")
.andEq(TbAppFlinkJobConfig.ID_A, Column.as(TbAppCollectTableVersion.FLINK_JOB_ID_A))
.andEq(TbAppCollectTableInfo.ALIAS_A, Column.as(TbAppCollectTableVersion.ALIAS_A))
.andEq(TbAppCollectTableInfo.PRIORITY_A, 10000)
.andEq(TbAppCollectTableInfo.SCHEDULE_ID_A, false)
.andEq(TbAppCollectTableVersion.VERSION_A, "2018")
/*.join(TbAppCollectTableVersion._alias_)
.onEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppCollectTableVersion.FLINK_JOB_ID_A))
.andEq(TbAppCollectTableInfo.ALIAS_A, Column.as(TbAppCollectTableVersion.ALIAS_A))
.whereLt(TbAppCollectTableInfo.PRIORITY_A, 10000)
.andEq(TbAppCollectTableInfo.SCHEDULE_ID_A, false)
.andEq(TbAppCollectTableVersion.VERSION_A, "2018")
.andEq(TbAppCollectTableInfo.STATUS_A, "y")*/
.andEq(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId)
.andEq(StrUtil.isNotBlank(alias), TbAppCollectTableInfo.ALIAS_A, alias)
.build()
));
}