diff --git a/pom.xml b/pom.xml
index 07e11e8..cfe0cab 100644
--- a/pom.xml
+++ b/pom.xml
@@ -177,6 +177,11 @@
progressbar
0.9.3
+        <dependency>
+            <groupId>de.vandermeer</groupId>
+            <artifactId>asciitable</artifactId>
+            <version>0.3.2</version>
+        </dependency>
diff --git a/service-executor/service-executor-manager/pom.xml b/service-executor/service-executor-manager/pom.xml
index 5e0b6c6..aa49dc1 100644
--- a/service-executor/service-executor-manager/pom.xml
+++ b/service-executor/service-executor-manager/pom.xml
@@ -38,6 +38,10 @@
+        <dependency>
+            <groupId>de.vandermeer</groupId>
+            <artifactId>asciitable</artifactId>
+        </dependency>
com.sun.jersey
jersey-client
diff --git a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/controller/ExecutorTaskController.java b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/controller/ExecutorTaskController.java
index ce9165b..d1d0031 100644
--- a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/controller/ExecutorTaskController.java
+++ b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/controller/ExecutorTaskController.java
@@ -52,12 +52,12 @@ public class ExecutorTaskController {
return executorTaskService.scanAvro(key, hdfs, pulsar, pulsarTopic, scanSource, scanQueue, scanLog, scanBase, scanTarget, filterFields);
}
- @GetMapping("latest_op_ts")
- public String latestOpTs(@RequestParam("hdfs") String hdfs) throws Exception {
+ @GetMapping("table_summary")
+ public String tableSummary(@RequestParam("hdfs") String hdfs) throws Exception {
if (StrUtil.isBlank(hdfs)) {
throw new RuntimeException("Hdfs path cannot be empty");
}
- return executorTaskService.scanLatestOpTs(hdfs);
+ return executorTaskService.tableSummary(hdfs);
}
@GetMapping("results")
diff --git a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/ExecutorTaskService.java b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/ExecutorTaskService.java
index a0a6741..37f0066 100644
--- a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/ExecutorTaskService.java
+++ b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/ExecutorTaskService.java
@@ -209,9 +209,9 @@ public class ExecutorTaskService {
return applicationId.toString();
}
- public String scanLatestOpTs(String hdfs) throws Exception {
+ public String tableSummary(String hdfs) throws Exception {
String taskId = taskId();
- Configuration configuration = generateConfiguration(taskId, StrUtil.format("latest_op_ts {}", hdfs));
+ Configuration configuration = generateConfiguration(taskId, StrUtil.format("summary {}", hdfs));
configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("1024m"));
MapBuilder builder = MapUtil.builder();
@@ -219,7 +219,7 @@ public class ExecutorTaskService {
ApplicationId applicationId = Runner.run(
configuration,
- "com.lanyuanxiaoyao.service.executor.task.LatestOperationTimeScan",
+ "com.lanyuanxiaoyao.service.executor.task.TableSummary",
new String[]{
TaskConstants.TASK_CONTEXT_OPTION,
mapper.writeValueAsString(
diff --git a/service-executor/service-executor-task/pom.xml b/service-executor/service-executor-task/pom.xml
index 8b12f51..da512f2 100644
--- a/service-executor/service-executor-task/pom.xml
+++ b/service-executor/service-executor-task/pom.xml
@@ -77,6 +77,11 @@
pulsar-client
provided
+        <dependency>
+            <groupId>de.vandermeer</groupId>
+            <artifactId>asciitable</artifactId>
+            <scope>provided</scope>
+        </dependency>
diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/LatestOperationTimeScan.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/LatestOperationTimeScan.java
deleted file mode 100644
index 76f4b4f..0000000
--- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/LatestOperationTimeScan.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package com.lanyuanxiaoyao.service.executor.task;
-
-import cn.hutool.core.util.RandomUtil;
-import cn.hutool.core.util.StrUtil;
-import com.lanyuanxiaoyao.service.common.Constants;
-import com.lanyuanxiaoyao.service.executor.core.TaskContext;
-import com.lanyuanxiaoyao.service.executor.task.functions.ReadHudiFile;
-import com.lanyuanxiaoyao.service.executor.task.helper.ArgumentsHelper;
-import com.lanyuanxiaoyao.service.executor.task.helper.FlinkHelper;
-import com.lanyuanxiaoyao.service.executor.task.helper.HdfsHelper;
-import org.apache.flink.api.common.functions.RichReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.util.CloseableIterator;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.eclipse.collections.api.list.ImmutableList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * 查找base文件最后的opts
- *
- * @author lanyuanxiaoyao
- * @date 2024-01-22
- */
-public class LatestOperationTimeScan {
- private static final Logger logger = LoggerFactory.getLogger(LatestOperationTimeScan.class);
-
- public static void main(String[] args) throws Exception {
- TaskContext taskContext = ArgumentsHelper.getContext(args);
- logger.info("Context: {}", taskContext);
-
- ArgumentsHelper.checkMetadata(taskContext, "hdfs");
- String hdfs = (String) taskContext.getMetadata().get("hdfs");
-
- FileSystem fileSystem = FileSystem.get(new Configuration());
- HdfsHelper.checkHdfsPath(fileSystem, hdfs);
- ImmutableList basePaths = HdfsHelper.latestBasePaths(HdfsHelper.hdfsPaths(fileSystem, hdfs));
-
- int parallelism = HdfsHelper.baseScanParallelismPredict(basePaths);
-
- StreamExecutionEnvironment environment = FlinkHelper.getBatchEnvironment();
- environment.setParallelism(parallelism);
- String maxValue = "0";
- try (CloseableIterator iterator = environment
- .fromCollection(basePaths.toList())
- .name("Read base paths")
- .flatMap(new ReadHudiFile())
- .name("Read hudi file")
- .setParallelism(parallelism)
- .map(view -> (String) view.getAttributes().getOrDefault(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, "0"))
- .map(time -> new Tuple2<>(RandomUtil.randomInt(parallelism), time), TypeInformation.of(new TypeHint>() {
- }))
- .keyBy(tuple -> tuple.f0)
- .reduce(new RichReduceFunction>() {
- @Override
- public Tuple2 reduce(Tuple2 value1, Tuple2 value2) throws Exception {
- return value1.f1.compareTo(value2.f1) > 0 ? value1 : value2;
- }
- })
- .map(tuple -> tuple.f1)
- /*.sinkTo(FlinkHelper.createFileSink(taskContext))*/
- .executeAndCollect("Find latest opts")) {
- while (iterator.hasNext()) {
- String item = iterator.next();
- if (item.compareTo(maxValue) > 0) {
- maxValue = item;
- }
- }
- }
- HdfsHelper.createResult(fileSystem, taskContext, StrUtil.trim(maxValue));
- }
-}
diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/SQLExecutor.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/SQLExecutor.java
new file mode 100644
index 0000000..554addf
--- /dev/null
+++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/SQLExecutor.java
@@ -0,0 +1,151 @@
+package com.lanyuanxiaoyao.service.executor.task;
+
+import cn.hutool.core.util.StrUtil;
+import de.vandermeer.asciitable.AsciiTable;
+import de.vandermeer.asciitable.CWC_LongestLine;
+import de.vandermeer.asciithemes.TA_GridThemes;
+import java.util.function.Function;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.org.apache.avro.Schema;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.HoodiePipeline;
+import org.eclipse.collections.api.factory.Lists;
+import org.eclipse.collections.api.factory.Maps;
+import org.eclipse.collections.api.list.ImmutableList;
+import org.eclipse.collections.api.map.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Queries a Hudi table's data by executing Flink SQL.
+ *
+ * @author lanyuanxiaoyao
+ * @date 2024-05-17
+ */
+public class SQLExecutor {
+ private static final Logger logger = LoggerFactory.getLogger(SQLExecutor.class);
+
+ // Streaming read: builds a HoodiePipeline source from the table's Avro schema and applies the given SQL transform.
+ protected static CloseableIterator executeStream(HoodieTableMetaClient metaClient, Function SQL) throws Exception {
+ return executeStream(metaClient, 10, SQL);
+ }
+
+ protected static CloseableIterator executeStream(HoodieTableMetaClient metaClient, Integer parallelism, Function SQL) throws Exception {
+ HoodieTableConfig tableConfig = metaClient.getTableConfig();
+ Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+
+ StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(executionEnvironment);
+ HoodiePipeline.Builder builder = HoodiePipeline.builder(tableConfig.getTableName());
+ for (Schema.Field field : schema.getFields()) {
+ DataType type = AvroSchemaConverter.convertToDataType(field.schema());
+ builder.column(StrUtil.format("{} {}", field.name(), type.getLogicalType()));
+ }
+ tableConfig.getPartitionFields().ifPresent(builder::partition);
+ tableConfig.getRecordKeyFields().ifPresent(builder::pk);
+ builder.option("connector", "hudi");
+ builder.option(FlinkOptions.PATH, metaClient.getBasePathV2());
+ builder.option(FlinkOptions.READ_TASKS, parallelism);
+
+ Table table = tableEnvironment.fromDataStream(builder.source(executionEnvironment));
+ return SQL.apply(table).collect();
+ }
+
+ // Batch read: registers the Hudi table via DDL, then executes the SQL produced for the table name.
+ protected static CloseableIterator executeBatch(HoodieTableMetaClient metaClient, Function SQL) throws Exception {
+ return executeBatch(metaClient, 10, SQL);
+ }
+
+ protected static CloseableIterator executeBatch(HoodieTableMetaClient metaClient, Integer parallelism, Function SQL) throws Exception {
+ HoodieTableConfig tableConfig = metaClient.getTableConfig();
+ Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+
+ TableEnvironment tableEnvironment = TableEnvironment.create(
+ EnvironmentSettings.newInstance()
+ .inBatchMode()
+ .useBlinkPlanner()
+ .build()
+ );
+ tableEnvironment.executeSql(createHoodieTableDDL(
+ tableConfig.getTableName(),
+ Lists.immutable.ofAll(schema.getFields())
+ .collect(field -> {
+ DataType type = AvroSchemaConverter.convertToDataType(field.schema());
+ return StrUtil.format("{} {}", field.name(), type.getLogicalType());
+ }),
+ // Argument order is (primaryKeys, partitionKeys): record keys are the Hudi primary keys, partition fields follow — matches the streaming path's pk/partition mapping above.
+ tableConfig.getRecordKeyFields().map(Lists.immutable::of).orElse(Lists.immutable.empty()),
+ tableConfig.getPartitionFields().map(Lists.immutable::of).orElse(Lists.immutable.empty()),
+ Maps.immutable.of(
+ FlinkOptions.PATH.key(), metaClient.getBasePathV2().toString(),
+ FlinkOptions.READ_TASKS.key(), parallelism.toString()
+ )
+ ));
+ return tableEnvironment.executeSql(SQL.apply(tableConfig.getTableName())).collect();
+ }
+
+ protected static String generateResult(CloseableIterator iterator, ImmutableList fields) {
+ AsciiTable table = new AsciiTable();
+ table.addRule();
+ table.addRow(fields.toArray());
+ table.addRule();
+ iterator.forEachRemaining(row -> table.addRow(fields.collect(row::getField).toArray()));
+ table.addRule();
+ table.getRenderer().setCWC(new CWC_LongestLine());
+ table.getContext().setGridTheme(TA_GridThemes.NONE);
+ table.setPaddingLeftRight(1);
+ return table.render();
+ }
+
+ private static String createHoodieTableDDL(
+ String tableName,
+ ImmutableList fields,
+ ImmutableList primaryKeys,
+ ImmutableList partitionKeys,
+ ImmutableMap options
+ ) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("create table if not exists ")
+ .append(tableName)
+ .append("(\n");
+ for (String field : fields) {
+ builder.append(" ")
+ .append(field)
+ .append(",\n");
+ }
+ builder.append(" PRIMARY KEY(")
+ .append(primaryKeys.makeString(","))
+ .append(") NOT ENFORCED\n")
+ .append(")\n");
+ if (!partitionKeys.isEmpty()) {
+ String partitions = partitionKeys
+ .collect(key -> StrUtil.format("`{}`", key))
+ .makeString(",");
+ builder.append("PARTITIONED BY (")
+ .append(partitions)
+ .append(")\n");
+ }
+ builder.append("with ('connector' = 'hudi'");
+ options.forEachKeyValue((k, v) -> builder
+ .append(",\n")
+ .append(" '")
+ .append(k)
+ .append("' = '")
+ .append(v)
+ .append("'"));
+ builder.append("\n)");
+ return builder.toString();
+ }
+}
diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/TableSummary.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/TableSummary.java
new file mode 100644
index 0000000..ac85cc9
--- /dev/null
+++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/TableSummary.java
@@ -0,0 +1,46 @@
+package com.lanyuanxiaoyao.service.executor.task;
+
+import cn.hutool.core.util.StrUtil;
+import com.lanyuanxiaoyao.service.common.Constants;
+import com.lanyuanxiaoyao.service.executor.core.TaskContext;
+import com.lanyuanxiaoyao.service.executor.task.helper.ArgumentsHelper;
+import com.lanyuanxiaoyao.service.executor.task.helper.HdfsHelper;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.eclipse.collections.api.factory.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Summarizes a table: total row count and latest operation timestamp.
+ *
+ * @author lanyuanxiaoyao
+ * @date 2024-05-17
+ */
+public class TableSummary extends SQLExecutor {
+ private static final Logger logger = LoggerFactory.getLogger(TableSummary.class);
+
+ public static void main(String[] args) throws Exception {
+ TaskContext taskContext = ArgumentsHelper.getContext(args);
+ logger.info("Context: {}", taskContext);
+
+ ArgumentsHelper.checkMetadata(taskContext, "hdfs");
+ String hdfs = (String) taskContext.getMetadata().get("hdfs");
+
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+ .setConf(new Configuration())
+ .setBasePath(hdfs)
+ .build();
+ int parallelism = HdfsHelper.baseScanParallelismPredict(HdfsHelper.basePaths(HdfsHelper.hdfsPaths(metaClient.getRawFs(), hdfs)));
+ try (CloseableIterator iterator = executeBatch(
+ metaClient,
+ parallelism,
+ tableName -> StrUtil.format("select count({}) as `count`, max({}) as latest_op_ts from {}", Constants.UNION_KEY_NAME, Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, tableName)
+ )) {
+ HdfsHelper.createResult(FileSystem.get(metaClient.getHadoopConf()), taskContext, generateResult(iterator, Lists.immutable.of("count", "latest_op_ts")));
+ }
+ }
+}
diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/HdfsHelper.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/HdfsHelper.java
index 0e88eed..8d982eb 100644
--- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/HdfsHelper.java
+++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/HdfsHelper.java
@@ -110,7 +110,8 @@
public static void createResult(FileSystem fileSystem, TaskContext context, String result) throws IOException {
Path resultPath = new Path(context.getResultPath() + "/" + context.getTaskId() + "/task-result");
try (FSDataOutputStream outputStream = fileSystem.create(resultPath)) {
- outputStream.writeUTF(result);
+ // Write raw UTF-8: writeUTF prepends a 2-byte length and caps at 64KB; writeBytes drops the high byte of every char, corrupting non-Latin-1 text.
+ outputStream.write(result.getBytes(java.nio.charset.StandardCharsets.UTF_8));
}
}
}
diff --git a/service-executor/service-executor-task/src/test/java/com/lanyuanxiaoyao/service/executor/task/ConsoleTableTest.java b/service-executor/service-executor-task/src/test/java/com/lanyuanxiaoyao/service/executor/task/ConsoleTableTest.java
new file mode 100644
index 0000000..fe776db
--- /dev/null
+++ b/service-executor/service-executor-task/src/test/java/com/lanyuanxiaoyao/service/executor/task/ConsoleTableTest.java
@@ -0,0 +1,26 @@
+package com.lanyuanxiaoyao.service.executor.task;
+
+import de.vandermeer.asciitable.AsciiTable;
+import de.vandermeer.asciitable.CWC_LongestLine;
+import de.vandermeer.asciithemes.TA_GridThemes;
+
+/**
+ * Manual smoke check for ascii-table rendering (run via main, not a JUnit test).
+ * @author
+ * @date 2024-05-17
+ */
+public class ConsoleTableTest {
+ public static void main(String[] args) {
+ AsciiTable table = new AsciiTable();
+ table.addRule();
+ table.addRow("Count", "Latest Opts");
+ table.addRule();
+ table.addRow("20657434", "2024-07-07 10:50:54");
+ table.addRow("20657434", "2024-07-07 10:50:54");
+ table.addRule();
+ table.getRenderer().setCWC(new CWC_LongestLine());
+ table.getContext().setGridTheme(TA_GridThemes.FULL);
+ table.setPaddingLeftRight(1);
+ System.out.println(table.render());
+ }
+}
diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/TaskService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/TaskService.java
index 50ad930..5146913 100644
--- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/TaskService.java
+++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/TaskService.java
@@ -25,8 +25,9 @@ public interface TaskService {
@Query("filter_fields") String filterFields
);
- @Get(value = "/task/latest_op_ts", readTimeout = 2 * 60 * 1000)
- String latestOpTs(@Query("hdfs") String hdfs);
+ /** Fetches the table summary (row count and latest op ts) for the given HDFS table path. */
+ @Get(value = "/task/table_summary", readTimeout = 2 * 60 * 1000)
+ String tableSummary(@Query("hdfs") String hdfs);
@Get("/task/results")
ImmutableList results(@Query("task_id") String taskId);
diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TaskController.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TaskController.java
index 5309170..ee2aa0c 100644
--- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TaskController.java
+++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TaskController.java
@@ -67,13 +67,13 @@ public class TaskController {
return AmisResponse.responseSuccess();
}
- @GetMapping("latest_op_ts")
- public AmisResponse