feat(info-query): 切换旧数据库操作到新服务

This commit is contained in:
2024-01-03 16:10:13 +08:00
parent 829813df38
commit 9aec08c43a
10 changed files with 348 additions and 28 deletions

View File

@@ -36,4 +36,13 @@ public class FlinkJobController {
/**
 * Returns the full {@link FlinkJob} record for the given job id,
 * as resolved by {@code flinkJobService}.
 *
 * @param flinkJobId id of the Flink job (query param {@code flink_job_id})
 * @return the matching job
 */
public FlinkJob detail(@RequestParam("flink_job_id") Long flinkJobId) {
    final FlinkJob job = flinkJobService.flinkJob(flinkJobId);
    return job;
}
/**
 * Persists a Flink job definition by delegating to {@code flinkJobService}.
 * <p>
 * NOTE(review): this endpoint mutates state but is exposed over GET
 * ({@code /flink_job/save}); consider POST once existing callers can migrate.
 *
 * @param id      job id (query param {@code id})
 * @param name    job name (query param {@code name})
 * @param runMode run mode of the job (query param {@code run_mode})
 */
@GetMapping("/flink_job/save")
public void saveFlinkJob(@RequestParam("id") Long id,
                         @RequestParam("name") String name,
                         @RequestParam("run_mode") String runMode) {
    flinkJobService.saveFlinkJob(id, name, runMode);
}
}

View File

@@ -30,4 +30,13 @@ public class SyncStateController {
/**
 * Fetches the sync-state record for one table of a Flink job.
 *
 * @param flinkJobId id of the owning Flink job (query param {@code flink_job_id})
 * @param alias      table alias within that job (query param {@code alias})
 * @return the state as resolved by {@code syncStateService}
 */
public SyncState syncState(@RequestParam("flink_job_id") Long flinkJobId,
                           @RequestParam("alias") String alias) {
    final SyncState state = syncStateService.syncState(flinkJobId, alias);
    return state;
}
/**
 * Records the YARN application id of the compaction job for one synced table
 * by delegating to {@code syncStateService.saveCompactionApplicationId}.
 * <p>
 * NOTE(review): mutating endpoint exposed over GET; consider POST once
 * callers can migrate.
 *
 * @param flinkJobId    id of the owning Flink job (query param {@code flink_job_id})
 * @param alias         table alias within that job (query param {@code alias})
 * @param applicationId YARN application id to record (query param {@code application_id})
 */
@GetMapping("/sync_state/save_compaction_id")
public void saveCompactionId(@RequestParam("flink_job_id") Long flinkJobId,
                             @RequestParam("alias") String alias,
                             @RequestParam("application_id") String applicationId) {
    syncStateService.saveCompactionApplicationId(flinkJobId, alias, applicationId);
}
}

View File

@@ -3,14 +3,12 @@ package com.lanyuanxiaoyao.service.info.controller;
import cn.hutool.core.util.ObjectUtil;
import com.eshore.odcp.hudi.connector.entity.TableMeta;
import com.lanyuanxiaoyao.service.configuration.entity.info.SimpleTableMeta;
import com.lanyuanxiaoyao.service.configuration.entity.info.TableMetaAdd;
import com.lanyuanxiaoyao.service.info.service.TableMetaService;
import org.eclipse.collections.api.list.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;
/**
* Table Meta
@@ -50,15 +48,25 @@ public class TableMetaController {
}
@GetMapping("/exists_table")
public Boolean existsTable(@RequestParam(value = "flink_job_id") Long flinkJobId, @RequestParam(value = "alias") String alias) {
public Boolean existsTable(@RequestParam("flink_job_id") Long flinkJobId, @RequestParam("alias") String alias) {
return tableMetaService.existsTable(flinkJobId, alias);
}
@GetMapping("/non_exists_table")
public Boolean nonExistsTable(@RequestParam(value = "flink_job_id") Long flinkJobId, @RequestParam(value = "alias") String alias) {
public Boolean nonExistsTable(@RequestParam("flink_job_id") Long flinkJobId, @RequestParam("alias") String alias) {
return !tableMetaService.existsTable(flinkJobId, alias);
}
/**
 * Reports whether a collected table is registered for the given HDFS path,
 * as decided by {@code tableMetaService.existsTableByHdfs}.
 *
 * @param hdfs target HDFS path (query param {@code hdfs})
 * @return {@code true} when such a table exists
 */
@GetMapping("/exists_table_by_hdfs")
public Boolean existsTableByHdfs(@RequestParam("hdfs") String hdfs) {
    final Boolean present = tableMetaService.existsTableByHdfs(hdfs);
    return present;
}
/**
 * Inverse of {@code existsTableByHdfs}: true when NO collected table is
 * registered for the given HDFS path.
 *
 * @param hdfs target HDFS path (query param {@code hdfs})
 * @return {@code true} when no such table exists
 */
@GetMapping("/non_exists_table_by_hdfs")
public Boolean nonExistsTableByHdfs(@RequestParam("hdfs") String hdfs) {
    // Unboxes the service result, matching the original's null behavior.
    final boolean exists = tableMetaService.existsTableByHdfs(hdfs);
    return !exists;
}
@GetMapping("/table_count")
public Long tableCount() {
return tableMetaService.tableCount();
@@ -88,4 +96,14 @@ public class TableMetaController {
// Total number of "hive focus" tables as counted by the service layer.
// NOTE(review): the request-mapping annotation for this handler sits above
// this diff hunk — confirm against the full file.
public Long hiveFocusCount() {
return tableMetaService.hiveFocusCount();
}
/**
 * Disables one collected table of a Flink job by delegating to
 * {@code tableMetaService.disableTable}.
 * <p>
 * NOTE(review): mutating endpoint exposed over GET; consider POST once
 * callers can migrate.
 *
 * @param flinkJobId id of the owning Flink job (query param {@code flink_job_id})
 * @param alias      table alias within that job (query param {@code alias})
 */
@GetMapping("/disable_table")
public void disableTable(@RequestParam("flink_job_id") Long flinkJobId,
                         @RequestParam("alias") String alias) {
    tableMetaService.disableTable(flinkJobId, alias);
}
/**
 * Registers a new collected-table definition supplied in the request body,
 * delegating persistence to {@code tableMetaService.saveTableMeta}.
 *
 * @param addon table-metadata payload to persist
 */
@PostMapping("/table_meta/save")
public void saveTableMeta(@RequestBody TableMetaAdd addon) {
    tableMetaService.saveTableMeta(addon);
}
}

View File

@@ -13,6 +13,10 @@ public class BaseService {
// SQL aggregate fragment used for row counts.
protected static final String COUNT = "count(*)";
// Status flag values stored in the status columns:
// "y" = enabled (filtered on by the read paths), "n" = disabled
// (written by soft-delete paths such as disableTable).
protected static final String STATUS_Y = "y";
protected static final String STATUS_N = "n";
/**
 * JDBC placeholder ("?") used when building precompiled SQL.
 */
protected static final String Q = "?";
protected static String column(Alias table, String column) {
return StrUtil.format("{}.{}", table.getAlias(), column);

View File

@@ -6,7 +6,6 @@ import cn.hutool.core.util.ObjectUtil;
import com.eshore.odcp.hudi.connector.entity.FlinkJob;
import com.eshore.odcp.hudi.connector.entity.TableMeta;
import com.eshore.odcp.hudi.connector.exception.FlinkJobNotFoundException;
import com.lanyuanxiaoyao.service.info.configuration.SQLLoggerProvider;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
import org.slf4j.Logger;
@@ -16,7 +15,6 @@ import org.springframework.cache.annotation.Cacheable;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionTemplate;
import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.TbAppFlinkJobConfig;
import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.TbAppYarnJobConfig;
@@ -88,4 +86,22 @@ public class FlinkJobService extends BaseService {
.getFirstOptional()
.orElseThrow(() -> new FlinkJobNotFoundException(flinkJobId));
}
/**
 * Inserts a new Flink job row into the TbAppFlinkJobConfig table.
 * The five columns listed in insertInto are bound positionally to the five
 * arguments on the last line — keep both lists in the same order.
 *
 * @param id      job id to insert
 * @param name    job name
 * @param runMode run mode string
 */
// NOTE(review): retrying a plain INSERT may fail with a duplicate-key error
// on the second attempt if the first one actually committed — confirm this
// retry policy is intended.
@Retryable(Throwable.class)
public void saveFlinkJob(Long id, String name, String runMode) {
mysqlJdbcTemplate.update(
SqlBuilder
.insertInto(
TbAppFlinkJobConfig._origin_,
TbAppFlinkJobConfig.ID_O,
TbAppFlinkJobConfig.NAME_O,
TbAppFlinkJobConfig.RUN_MODE_O,
TbAppFlinkJobConfig.STATUS_O,
TbAppFlinkJobConfig.COMMENT_O
)
.values()
.addValue(Q, Q, Q, Q, Q) // five placeholders, one per column above
.precompileSql(),
// NOTE(review): the literal 3 binds to COMMENT_O — an integer in a
// "comment" column looks like a mismatch; confirm the column semantics.
id, name, runMode, STATUS_Y, 3);
}
}

View File

@@ -88,4 +88,17 @@ public class SyncStateService extends BaseService {
)
).getFirstOptional().orElseThrow(SyncStateNotFoundException::new);
}
/**
 * Upserts the compaction job's YARN application id for one synced table.
 * The row id is the composite string "flinkJobId-alias"; on a duplicate key
 * only the COMPACTION_APPLICATION_ID column is overwritten (per the
 * onDuplicateKeyUpdateColumn clause) — assumes ID is the table's unique key,
 * TODO confirm.
 *
 * @param flinkJobId    id of the owning Flink job
 * @param alias         table alias within that job
 * @param applicationId YARN application id to record
 */
// NOTE(review): sibling write paths in this commit carry @Retryable; this
// upsert is idempotent and could safely carry it too — confirm and align
// (would need the Retryable import in this file).
public void saveCompactionApplicationId(Long flinkJobId, String alias, String applicationId) {
mysqlJdbcTemplate.update(
SqlBuilder
.insertInto(TbAppHudiSyncState._origin_, TbAppHudiSyncState.ID_O, TbAppHudiSyncState.COMPACTION_APPLICATION_ID_O)
.values()
.addValue(Q, Q)
.onDuplicateKeyUpdateColumn(TbAppHudiSyncState.COMPACTION_APPLICATION_ID_O)
.precompileSql(),
StrUtil.format("{}-{}", flinkJobId, alias),
applicationId
);
}
}

View File

@@ -13,7 +13,7 @@ import com.eshore.odcp.hudi.connector.exception.ConfigException;
import com.eshore.odcp.hudi.connector.exception.TableMetaNotFoundException;
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
import com.lanyuanxiaoyao.service.configuration.entity.info.SimpleTableMeta;
import com.lanyuanxiaoyao.service.info.configuration.SQLLoggerProvider;
import com.lanyuanxiaoyao.service.configuration.entity.info.TableMetaAdd;
import java.util.List;
import java.util.Locale;
import org.eclipse.collections.api.factory.Lists;
@@ -25,7 +25,6 @@ import org.springframework.cache.annotation.Cacheable;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionTemplate;
import static com.eshore.odcp.hudi.connector.SQLConstants.HudiCollectBuild.*;
import static com.eshore.odcp.hudi.connector.SQLConstants.IapDatahub.*;
@@ -574,10 +573,97 @@ public class TableMetaService extends BaseService {
.whereEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A))
.andEq(TbAppFlinkJobConfig.ID_A, flinkJobId)
.andEq(TbAppCollectTableInfo.ALIAS_A, alias)
.andEq(TbAppFlinkJobConfig.STATUS_A, "y")
.andEq(TbAppCollectTableInfo.STATUS_A, "y")
.andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y)
.andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
.build(),
Boolean.class
);
}
/**
 * Checks whether an enabled collected table (status "y" on both the table
 * row and its owning Flink job) is registered for the given HDFS path.
 * <p>
 * NOTE(review): results are cached under the same "exists-table" cache as
 * existsTable(flinkJobId, alias); evicting that cache also drops these
 * entries — confirm that coupling is intended.
 *
 * @param hdfs target HDFS path to look up
 * @return {@code true} when at least one matching enabled table exists
 */
@Cacheable(value = "exists-table", sync = true)
@Retryable(Throwable.class)
public Boolean existsTableByHdfs(String hdfs) {
    final String sql = SqlBuilder.select("count(*) > 0")
            .from(TbAppCollectTableInfo._alias_, TbAppFlinkJobConfig._alias_)
            .whereEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A))
            .andEq(TbAppCollectTableInfo.TGT_HDFS_PATH_A, hdfs)
            .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y)
            .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
            .build();
    return mysqlJdbcTemplate.queryForObject(sql, Boolean.class);
}
/**
 * Soft-deletes one collected table by setting its status column to
 * {@code STATUS_N} ("n"); the row itself is kept.
 *
 * @param flinkJobId owning Flink job id
 * @param alias      table alias within that job
 */
@Retryable(Throwable.class)
public void disableTable(Long flinkJobId, String alias) {
    // Idempotent UPDATE, so the retry annotation is safe here.
    final String sql = SqlBuilder.update(TbAppCollectTableInfo._origin_)
            .set(TbAppCollectTableInfo.STATUS_O, Q)
            .whereEq(TbAppCollectTableInfo.FLINK_JOB_ID_O, Q)
            .andEq(TbAppCollectTableInfo.ALIAS_O, Q)
            .precompileSql();
    mysqlJdbcTemplate.update(sql, STATUS_N, flinkJobId, alias);
}
/**
 * Inserts a new collected-table row (TbAppCollectTableInfo) from the given
 * payload. The 23 columns listed in insertInto are bound positionally to the
 * 23 arguments at the bottom — keep both lists in exactly the same order
 * when editing.
 *
 * @param addon table-metadata payload to persist
 */
// NOTE(review): retrying a plain INSERT may hit a duplicate-key error on the
// second attempt if the first one actually committed — confirm this retry
// policy is intended.
@Retryable(Throwable.class)
public void saveTableMeta(TableMetaAdd addon) {
mysqlJdbcTemplate.update(
SqlBuilder.insertInto(
TbAppCollectTableInfo._origin_,
TbAppCollectTableInfo.ID_O,
TbAppCollectTableInfo.ALIAS_O,
TbAppCollectTableInfo.FLINK_JOB_ID_O,
TbAppCollectTableInfo.HUDI_JOB_ID_O,
TbAppCollectTableInfo.SYNC_YARN_JOB_ID_O,
TbAppCollectTableInfo.COMPACTION_YARN_JOB_ID_O,
TbAppCollectTableInfo.CONFIG_ID_O,
TbAppCollectTableInfo.SRC_DB_O,
TbAppCollectTableInfo.SRC_TYPE_O,
TbAppCollectTableInfo.SRC_SCHEMA_O,
TbAppCollectTableInfo.SRC_TABLE_O,
TbAppCollectTableInfo.SRC_PULSAR_ADDR_O,
TbAppCollectTableInfo.SRC_TOPIC_O,
TbAppCollectTableInfo.TGT_DB_O,
TbAppCollectTableInfo.TGT_TABLE_O,
TbAppCollectTableInfo.TGT_HDFS_PATH_O,
TbAppCollectTableInfo.TGT_TABLE_TYPE_O,
TbAppCollectTableInfo.STATUS_O,
TbAppCollectTableInfo.FILTER_FIELD_O,
TbAppCollectTableInfo.FILTER_VALUES_O,
TbAppCollectTableInfo.FILTER_TYPE_O,
TbAppCollectTableInfo.BUCKET_NUMBER_O,
TbAppCollectTableInfo.PARTITION_FIELD_O
)
.values()
.addValue(Q, Q, Q, Q, Q, Q, Q, Q, Q, Q, Q, Q, Q, Q, Q, Q, Q, Q, Q, Q, Q, Q, Q) // 23 placeholders
.precompileSql(),
addon.getId(),
addon.getAlias(),
addon.getFlinkJobId(),
addon.getHudiJobId(),
addon.getSyncYarnJobId(),
addon.getCompactionYarnJobId(),
1L, // NOTE(review): CONFIG_ID hard-coded to 1 — confirm this is intended
addon.getSourceDatabase(),
addon.getSourceDatabaseType(),
addon.getSourceSchema(),
addon.getSourceTable(),
addon.getSourcePulsarAddress(),
addon.getSourceTopic(),
addon.getTargetSchema(),
addon.getTargetTable(),
addon.getTargetHdfs(),
"MERGE_ON_READ", // NOTE(review): table type hard-coded — confirm only MOR tables use this path
STATUS_Y, // new tables start enabled
addon.getFilterField(),
addon.getFilterValues(),
addon.getFilterType(),
addon.getBucketNumber(),
"CITY_ID" // NOTE(review): partition field hard-coded — confirm all tables partition by CITY_ID
);
}
}