feat(executor-manager): 增加表数据查询10条采样任务

This commit is contained in:
v-zhangjc9
2024-06-03 16:17:41 +08:00
parent 8aba2475be
commit 6f2fce4359
6 changed files with 121 additions and 4 deletions

View File

@@ -53,13 +53,21 @@ public class ExecutorTaskController {
}
@GetMapping("table_summary")
public String latestOpTs(@RequestParam("hdfs") String hdfs) throws Exception {
public String tableSummary(@RequestParam("hdfs") String hdfs) throws Exception {
if (StrUtil.isBlank(hdfs)) {
throw new RuntimeException("Hdfs path cannot be empty");
}
return executorTaskService.tableSummary(hdfs);
}
@GetMapping("table_sampling")
public String tableSampling(@RequestParam("hdfs") String hdfs) throws Exception {
if (StrUtil.isBlank(hdfs)) {
throw new RuntimeException("Hdfs path cannot be empty");
}
return executorTaskService.tableSampling(hdfs);
}
@GetMapping("law_enforcement")
public String lawEnforcement(
@RequestParam("pulsar_url") String pulsarUrl,

View File

@@ -236,6 +236,31 @@ public class ExecutorTaskService {
return applicationId.toString();
}
/**
 * Submits a Flink application that samples records of the Hudi table located
 * at the given HDFS path (runs {@code TableSampling} via {@code Runner.run}).
 *
 * @param hdfs base path of the Hudi table on HDFS
 * @return the id of the submitted application as a string
 * @throws Exception if the task context cannot be serialized or submission fails
 */
public String tableSampling(String hdfs) throws Exception {
    String taskId = taskId();

    // Job-level Flink configuration; the job name embeds the sampled path.
    Configuration flinkConfig = generateConfiguration(taskId, StrUtil.format("sampling {}", hdfs));
    flinkConfig.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("1024m"));

    // The task reads its single "hdfs" metadata entry out of the serialized TaskContext.
    String contextJson = mapper.writeValueAsString(
            new TaskContext(
                    taskId,
                    executorConfiguration.getTaskResultPath(),
                    Maps.mutable.ofMap(MapUtil.<String, Object>builder().put("hdfs", hdfs).build())));

    ApplicationId appId = Runner.run(
            flinkConfig,
            "com.lanyuanxiaoyao.service.executor.task.TableSampling",
            new String[]{TaskConstants.TASK_CONTEXT_OPTION, contextJson});
    return appId.toString();
}
public String lawEnforcement(
String pulsarUrl,
String pulsarTopic,

View File

@@ -0,0 +1,57 @@
package com.lanyuanxiaoyao.service.executor.task;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.executor.core.TaskContext;
import com.lanyuanxiaoyao.service.executor.task.helper.ArgumentsHelper;
import com.lanyuanxiaoyao.service.executor.task.helper.HdfsHelper;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.org.apache.avro.Schema;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.lanyuanxiaoyao.service.executor.task.SQLExecutor.executeBatch;
import static com.lanyuanxiaoyao.service.executor.task.SQLExecutor.generateResult;
/**
* 表采样
*
* @author lanyuanxiaoyao
*/
/**
 * 表采样 — Flink batch task that reads the 10 most recent records (ordered by
 * {@code Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME}, presumably the
 * latest-operation timestamp column — confirm) from a Hudi table and writes
 * the result to HDFS via {@code HdfsHelper.createResult}.
 *
 * @author lanyuanxiaoyao
 */
public class TableSampling {

    private static final Logger logger = LoggerFactory.getLogger(TableSampling.class);

    public static void main(String[] args) throws Exception {
        TaskContext taskContext = ArgumentsHelper.getContext(args);
        logger.info("Context: {}", taskContext);

        // The "hdfs" metadata entry is mandatory: it is the Hudi table base path.
        ArgumentsHelper.checkMetadata(taskContext, "hdfs");
        String hdfs = (String) taskContext.getMetadata().get("hdfs");

        Configuration configuration = new Configuration();
        configuration.setStrings(FlinkOptions.PATH.key(), hdfs);
        configuration.setInt(FlinkOptions.READ_TASKS.key(), 50);

        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(configuration)
                .setBasePath(hdfs)
                .build();
        HoodieTableConfig tableConfig = metaClient.getTableConfig();

        // Fail fast when the table carries no creation schema. IllegalStateException
        // replaces the original raw `new Exception(...)` (never throw bare Exception).
        Schema schema = tableConfig.getTableCreateSchema()
                .orElseThrow(() -> new IllegalStateException("Cannot parse schema from " + hdfs));
        ImmutableList<String> fields = Lists.immutable.ofAll(schema.getFields()).collect(Schema.Field::name);

        // Select all declared fields, newest first, capped at 10 rows.
        try (CloseableIterator<Row> iterator = executeBatch(
                metaClient,
                tableName -> StrUtil.format("select {} from `{}` order by {} desc limit 10",
                        fields.makeString(", "), tableName, Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME))) {
            HdfsHelper.createResult(FileSystem.get(metaClient.getHadoopConf()), taskContext, generateResult(iterator, fields));
        }
    }
}

View File

@@ -28,6 +28,9 @@ public interface TaskService {
@Get(value = "/task/table_summary", readTimeout = 2 * 60 * 1000)
String tableSummary(@Query("hdfs") String hdfs);
@Get(value = "/task/table_sampling", readTimeout = 2 * 60 * 1000)
String tableSampling(@Query("hdfs") String hdfs);
@Get("/task/results")
ImmutableList<String> results(@Query("task_id") String taskId);

View File

@@ -79,6 +79,18 @@ public class TaskController {
return AmisResponse.responseSuccess();
}
@GetMapping("table_sampling")
public AmisResponse<Object> tableSampling(@RequestParam("hdfs") String hdfs) {
if (StrUtil.isBlank(hdfs)) {
throw new RuntimeException("Hdfs cannot be blank");
}
ExecutorProvider.EXECUTORS.submit(() -> {
String applicationId = taskService.tableSampling(hdfs);
logger.info("Task: {}", applicationId);
});
return AmisResponse.responseSuccess();
}
@GetMapping("results")
public AmisMapResponse results(@RequestParam("task_id") String taskId) {
return AmisResponse.responseMapData()

View File

@@ -95,11 +95,11 @@ function taskTab() {
},
{
type: 'form',
title: '综合查询 (总数、最后操作时间)',
title: '综合查询',
actions: [
{
type: 'submit',
label: '提交任务',
type: 'action',
label: '总数&最后操作时间',
actionType: 'ajax',
api: {
method: 'get',
@@ -109,6 +109,18 @@ function taskTab() {
}
}
},
{
type: 'action',
label: '最后10条记录',
actionType: 'ajax',
api: {
method: 'get',
url: '${base}/task/table_sampling',
data: {
hdfs: '${hdfs|default:undefined}',
}
}
},
],
body: [
{