diff --git a/.idea/httpRequests/http-requests-log.http b/.idea/httpRequests/http-requests-log.http
index 8840189..85479d6 100644
--- a/.idea/httpRequests/http-requests-log.http
+++ b/.idea/httpRequests/http-requests-log.http
@@ -1,3 +1,21 @@
+GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/service_scheduler/schedule/all
+Connection: Keep-Alive
+User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9)
+Cookie: JSESSIONID=048A49CB9FD03402D9AA27CD2726B892
+Accept-Encoding: br,deflate,gzip,x-gzip
+
+###
+
+GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/service_web/cloud/list
+Connection: Keep-Alive
+User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9)
+Cookie: JSESSIONID=360A7282DB9D80CB6448B7D777A775FB
+Accept-Encoding: br,deflate,gzip,x-gzip
+
+<> 2024-01-17T120430.200.json
+
+###
+
GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/hudi_api/api/sync_checkpoint_state?flink_job_id=1542097996099055616&alias=acct_acct_item_zs&message_id=861976:46933:-1&publish_time=1705373846898
Connection: Keep-Alive
User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9)
@@ -464,23 +482,3 @@ Accept-Encoding: br,deflate,gzip,x-gzip
###
-GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.122.116.142:14041/queue/poll/compaction-queue
-Connection: Keep-Alive
-User-Agent: Apache-HttpClient/4.5.13 (Java/17.0.5)
-Cookie: JSESSIONID=98CC709B9ED7F9CC70C5138E6350AB73
-Accept-Encoding: br,deflate,gzip,x-gzip
-
-<> 2023-05-07T174749.200.json
-
-###
-
-GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.122.116.142:14041/queue/poll/compaction-queue
-Connection: Keep-Alive
-User-Agent: Apache-HttpClient/4.5.13 (Java/17.0.5)
-Cookie: JSESSIONID=98CC709B9ED7F9CC70C5138E6350AB73
-Accept-Encoding: br,deflate,gzip,x-gzip
-
-<> 2023-05-07T174739.200.json
-
-###
-
diff --git a/service-executor/service-executor-manager/pom.xml b/service-executor/service-executor-manager/pom.xml
index e41090d..187ee82 100644
--- a/service-executor/service-executor-manager/pom.xml
+++ b/service-executor/service-executor-manager/pom.xml
@@ -27,6 +27,11 @@
service-configuration
1.0.0-SNAPSHOT
+
+ com.lanyuanxiaoyao
+ service-forest
+ 1.0.0-SNAPSHOT
+
org.springframework.boot
spring-boot-starter-logging
diff --git a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/TaskService.java b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/TaskService.java
index bca43d7..659793f 100644
--- a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/TaskService.java
+++ b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/TaskService.java
@@ -15,6 +15,8 @@ import java.io.IOException;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.configuration.*;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
@@ -176,9 +178,17 @@ public class TaskService {
}
}
}
+ Pattern dateRegex = Pattern.compile("(\\w+) (\\d{17}) (.+)");
return results
.reject(StrUtil::isBlank)
.collect(StrUtil::trim)
+ .sortThisBy(line -> {
+ Matcher matcher = dateRegex.matcher(line);
+ if (matcher.matches()) {
+ return Long.valueOf(matcher.group(2));
+ }
+ return 0L;
+ })
.toImmutable();
}
}
diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/DataScanner.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/DataScanner.java
index d1a84b7..83dc3b1 100644
--- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/DataScanner.java
+++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/DataScanner.java
@@ -82,7 +82,7 @@ public class DataScanner {
StreamExecutionEnvironment environment = FlinkHelper.getBatchEnvironment();
DataStream source = null;
- int totalParallelism = 20;
+ int totalParallelism = 30;
if (scanQueue) {
ArgumentsHelper.checkMetadata(taskContext, "pulsar");
String pulsarUrl = (String) metadata.get("pulsar");
@@ -90,8 +90,8 @@ public class DataScanner {
String pulsarTopic = (String) metadata.get("pulsar_topic");
logger.info("Scan queue topic: {} url: {}", pulsarTopic, pulsarUrl);
DataStream stream = environment
- .fromSource(new ReadPulsarSource(taskContext, pulsarUrl, pulsarTopic, 50), WatermarkStrategy.noWatermarks(), "Read pulsar")
- .setParallelism(50)
+ .fromSource(new ReadPulsarSource(taskContext, pulsarUrl, pulsarTopic), WatermarkStrategy.noWatermarks(), "Read pulsar")
+ .setParallelism(totalParallelism)
.disableChaining();
if (ObjectUtil.isNull(source)) {
source = stream;
diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSource.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSource.java
index 394b0ac..ecdd662 100644
--- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSource.java
+++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSource.java
@@ -27,13 +27,13 @@ import org.slf4j.LoggerFactory;
*/
public class ReadPulsarSource implements Source>, ResultTypeQueryable, Serializable {
private static final Logger logger = LoggerFactory.getLogger(ReadPulsarSource.class);
- private static final Long TASK_GAP = TimeUnit.MINUTES.toMillis(30);
+ private static final Long TASK_GAP = TimeUnit.MINUTES.toMillis(60);
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
.withLocale(Locale.CHINA)
.withZone(ZoneId.systemDefault());
private final Collection splits;
- public ReadPulsarSource(TaskContext taskContext, String pulsarUrl, String pulsarTopic, Integer parallelism) throws PulsarClientException {
+ public ReadPulsarSource(TaskContext taskContext, String pulsarUrl, String pulsarTopic) throws PulsarClientException {
try (PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarUrl)
.build()) {
@@ -61,7 +61,7 @@ public class ReadPulsarSource implements Source {}", covertTimestamp(split.getStartTimestamp()), covertTimestamp(split.getEndTimestamp()));
}
diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSourceReader.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSourceReader.java
index 7511c10..577d688 100644
--- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSourceReader.java
+++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSourceReader.java
@@ -20,6 +20,7 @@ import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.io.InputStatus;
import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.impl.schema.StringSchema;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,10 +56,10 @@ public class ReadPulsarSourceReader implements SourceReader message) {
+ private RecordView parsePulsarMessage(Message message) {
return new RecordView(
RecordView.Operation.QUEUE,
- new String(message.getValue()),
+ message.getValue(),
FORMATTER.format(Instant.ofEpochMilli(message.getPublishTime())),
message.getMessageId().toString()
);
@@ -66,24 +67,15 @@ public class ReadPulsarSourceReader implements SourceReader output) throws Exception {
+ logger.info("t{} Poll Next", readerContext.getIndexOfSubtask());
if (ObjectUtil.isNotNull(currentSplit)) {
logger.info("t{} Read split: {}", readerContext.getIndexOfSubtask(), currentSplit.getStartTimestamp());
try (PulsarClient client = PulsarClient.builder()
.serviceUrl(currentSplit.getPulsarUrl())
.build()) {
- try (Consumer consumer = client.newConsumer()
+ try (Reader reader = client.newReader(new StringSchema())
.topic(currentSplit.getPulsarTopic())
- .batchReceivePolicy(
- BatchReceivePolicy.builder()
- .timeout(1, TimeUnit.SECONDS)
- .maxNumMessages(0)
- .maxNumBytes(0)
- .build()
- )
.receiverQueueSize(50000)
- .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
- .subscriptionMode(SubscriptionMode.NonDurable)
- .subscriptionType(SubscriptionType.Exclusive)
.subscriptionName(StrUtil.format(
"Task_Reader_{}_{}_{}_{}",
currentSplit.getTaskId(),
@@ -92,23 +84,17 @@ public class ReadPulsarSourceReader implements SourceReader messages = consumer.batchReceive();
- while (ObjectUtil.isNotNull(messages)) {
- long currentTimestamp = 0;
- for (Message message : messages) {
- currentTimestamp = message.getPublishTime();
- }
- if (currentTimestamp > currentSplit.getEndTimestamp()) {
- logger.info("t{} Break for {} -> {}, Queue rest: {}", readerContext.getIndexOfSubtask(), currentTimestamp, currentSplit.getEndTimestamp(), readQueue.size());
+ .startMessageId(MessageId.earliest)
+ .create()) {
+ reader.seek(currentSplit.getStartTimestamp());
+ Message message = reader.readNext(10, TimeUnit.SECONDS);
+ while (ObjectUtil.isNotNull(message)) {
+ if (message.getPublishTime() > currentSplit.getEndTimestamp()) {
+ logger.info("t{} Break for {} -> {}, Queue rest: {}", readerContext.getIndexOfSubtask(), message.getPublishTime(), currentSplit.getEndTimestamp(), readQueue.size());
break;
}
- for (Message message : messages) {
- output.collect(parsePulsarMessage(message));
- }
- consumer.acknowledge(messages);
- messages = consumer.batchReceive();
+ output.collect(parsePulsarMessage(message));
+ message = reader.readNext(10, TimeUnit.SECONDS);
}
}
}
diff --git a/service-info-query/src/test/java/com/test/SqlBuilderTests.java b/service-info-query/src/test/java/com/test/SqlBuilderTests.java
index 10b14b8..25f3626 100644
--- a/service-info-query/src/test/java/com/test/SqlBuilderTests.java
+++ b/service-info-query/src/test/java/com/test/SqlBuilderTests.java
@@ -1,8 +1,10 @@
package com.test;
import club.kingon.sql.builder.SqlBuilder;
+import club.kingon.sql.builder.entry.Alias;
import club.kingon.sql.builder.entry.Column;
import cn.hutool.core.util.ObjectUtil;
+import cn.hutool.core.util.StrUtil;
import cn.hutool.db.sql.SqlUtil;
import com.eshore.odcp.hudi.connector.SQLConstants;
@@ -21,11 +23,105 @@ public class SqlBuilderTests {
String STATUS_Y = "y";
String STATUS_N = "n";
System.out.println(SqlUtil.formatSql(
- SqlBuilder
- .select(TbAppHudiSyncState.MESSAGE_ID_A)
- .from(TbAppHudiSyncState._alias_)
- .whereEq(TbAppHudiSyncState.ID_A, null)
- .precompileSql()
+ SqlBuilder.select(
+ SQLConstants.IapDatahub.DataSource.DS_NAME_A,
+ SQLConstants.IapDatahub.DataSource.SCHEMA_NAME_A,
+ SQLConstants.IapDatahub.DataSourceTable.TABLE_NAME_A,
+ SQLConstants.IapDatahub.DataSourceTable.TABLE_TYPE_A,
+ SQLConstants.IapDatahub.DataSourceTableField.FIELD_NAME_A,
+ SQLConstants.IapDatahub.DataSourceTableField.FIELD_SEQ_A,
+ SQLConstants.IapDatahub.DataSourceTableField.FIELD_TYPE_A,
+ SQLConstants.IapDatahub.DataSourceTableField.PRIMARY_KEY_A,
+ SQLConstants.IapDatahub.DataSourceTableField.PARTITION_KEY_A,
+ SQLConstants.IapDatahub.DataSourceTableField.LENGTH_A,
+ TbAppCollectTableInfo.TGT_DB_A,
+ TbAppCollectTableInfo.TGT_TABLE_A,
+ TbAppCollectTableInfo.TGT_TABLE_TYPE_A,
+ TbAppCollectTableInfo.TGT_HDFS_PATH_A,
+ TbAppHudiJobConfig.WRITE_TASKS_A,
+ TbAppHudiJobConfig.WRITE_OPERATION_A,
+ TbAppHudiJobConfig.WRITE_TASK_MAX_MEMORY_A,
+ TbAppHudiJobConfig.WRITE_BATCH_SIZE_A,
+ TbAppHudiJobConfig.WRITE_RATE_LIMIT_A,
+ TbAppCollectTableInfo.BUCKET_NUMBER_A,
+ TbAppHudiJobConfig.COMPACTION_STRATEGY_A,
+ TbAppHudiJobConfig.COMPACTION_TASKS_A,
+ TbAppHudiJobConfig.COMPACTION_DELTA_COMMITS_A,
+ TbAppHudiJobConfig.COMPACTION_DELTA_SECONDS_A,
+ TbAppHudiJobConfig.COMPACTION_ASYNC_ENABLED_A,
+ TbAppHudiJobConfig.COMPACTION_MAX_MEMORY_A,
+ TbAppHudiJobConfig.CONFIGS_A,
+ TbAppCollectTableInfo.FILTER_FIELD_A,
+ TbAppCollectTableInfo.FILTER_VALUES_A,
+ TbAppCollectTableInfo.FILTER_TYPE_A,
+ TbAppCollectTableInfo.SRC_TOPIC_A,
+ TbAppCollectTableInfo.SRC_PULSAR_ADDR_A,
+ Alias.of("tayjc_sync.job_manager_memory", "sync_job_manager_memory"),
+ Alias.of("tayjc_sync.task_manager_memory", "sync_task_manager_memory"),
+ Alias.of("tayjc_compaction.job_manager_memory", "compaction_job_manager_memory"),
+ Alias.of("tayjc_compaction.task_manager_memory", "compaction_task_manger_momory"),
+ TbAppCollectTableInfo.PARTITION_FIELD_A,
+ TbAppHudiSyncState.MESSAGE_ID_A,
+ TbAppGlobalConfig.METRIC_PUBLISH_URL_A,
+ TbAppGlobalConfig.METRIC_PROMETHEUS_URL_A,
+ TbAppGlobalConfig.METRIC_API_URL_A,
+ TbAppGlobalConfig.METRIC_PUBLISH_DELAY_A,
+ TbAppGlobalConfig.METRIC_PUBLISH_PERIOD_A,
+ TbAppGlobalConfig.METRIC_PUBLISH_TIMEOUT_A,
+ TbAppGlobalConfig.METRIC_PUBLISH_BATCH_A,
+ Alias.of(TbAppFlinkJobConfig.ID_A, "job_id"),
+ Alias.of(TbAppFlinkJobConfig.NAME_A, "job_name"),
+ TbAppGlobalConfig.CHECKPOINT_ROOT_PATH_A,
+ TbAppHudiJobConfig.SOURCE_TASKS_A,
+ TbAppCollectTableInfo.ALIAS_A,
+ SQLConstants.IapDatahub.DataSource.CONNECTION_A,
+ TbAppCollectTableInfo.PRIORITY_A,
+ SQLConstants.IapDatahub.DataSource.DS_TYPE_A,
+ TbAppHudiJobConfig.KEEP_FILE_VERSION_A,
+ TbAppHudiJobConfig.KEEP_COMMIT_VERSION_A,
+ TbAppCollectTableInfo.TAGS_A,
+ TbAppGlobalConfig.ZK_URL_A,
+ TbAppCollectTableInfo.VERSION_A,
+ SQLConstants.IapDatahub.DataSourceTableField.SCALE_A
+ )
+ .from(
+ SQLConstants.IapDatahub.DataSource._alias_,
+ SQLConstants.IapDatahub.DataSourceTable._alias_,
+ SQLConstants.IapDatahub.DataSourceTableField._alias_,
+ TbAppFlinkJobConfig._alias_,
+ TbAppHudiJobConfig._alias_,
+ Alias.of(TbAppYarnJobConfig._origin_, "tayjc_sync"),
+ Alias.of(TbAppYarnJobConfig._origin_, "tayjc_compaction"),
+ TbAppGlobalConfig._alias_,
+ TbAppCollectTableInfo._alias_,
+ TbAppHudiSyncState._alias_
+ )
+ .whereEq(SQLConstants.IapDatahub.DataSource.DS_ROLE_A, "src")
+ .andEq(SQLConstants.IapDatahub.DataSource.DS_STATE_A, STATUS_Y)
+ .andEq(SQLConstants.IapDatahub.DataSource.RECORD_STATE_A, STATUS_Y)
+ .andEq(SQLConstants.IapDatahub.DataSourceTable.DS_ID_A, Column.as(SQLConstants.IapDatahub.DataSource.DS_ID_A))
+ .andEq(SQLConstants.IapDatahub.DataSourceTable.RECORD_STATE_A, STATUS_Y)
+ .andEq(SQLConstants.IapDatahub.DataSourceTableField.TABLE_ID_A, Column.as(SQLConstants.IapDatahub.DataSourceTable.TABLE_ID_A))
+ .andEq(SQLConstants.IapDatahub.DataSourceTableField.RECORD_STATE_A, STATUS_Y)
+ .andIn(SQLConstants.IapDatahub.DataSource.DS_TYPE_A, "udal", "telepg")
+ .andEq(SQLConstants.IapDatahub.DataSource.DS_NAME_A, Column.as(TbAppCollectTableInfo.SRC_DB_A))
+ .andEq(SQLConstants.IapDatahub.DataSource.SCHEMA_NAME_A, Column.as(TbAppCollectTableInfo.SRC_SCHEMA_A))
+ .andEq(SQLConstants.IapDatahub.DataSourceTable.TABLE_NAME_A, Column.as(TbAppCollectTableInfo.SRC_TABLE_A))
+ .andEq(TbAppCollectTableInfo.FLINK_JOB_ID_A, Column.as(TbAppFlinkJobConfig.ID_A))
+ .andEq(TbAppCollectTableInfo.HUDI_JOB_ID_A, Column.as(TbAppHudiJobConfig.ID_A))
+ .andEq(TbAppCollectTableInfo.SYNC_YARN_JOB_ID_A, Column.as("tayjc_sync.id"))
+ .andEq(TbAppCollectTableInfo.COMPACTION_YARN_JOB_ID_A, Column.as("tayjc_compaction.id"))
+ .andEq(TbAppCollectTableInfo.CONFIG_ID_A, Column.as(TbAppGlobalConfig.ID_A))
+ .andEq(TbAppHudiSyncState.ID_A, Column.as(StrUtil.format("concat({}, '-', {})", TbAppFlinkJobConfig.ID_A, TbAppCollectTableInfo.ALIAS_A)))
+ .andEq(ObjectUtil.isNotNull(flinkJobId), TbAppFlinkJobConfig.ID_A, flinkJobId)
+ .andEq(StrUtil.isNotBlank(alias), TbAppCollectTableInfo.ALIAS_A, alias)
+ .andEq(TbAppCollectTableInfo.STATUS_A, STATUS_Y)
+ .andEq(TbAppFlinkJobConfig.STATUS_A, STATUS_Y)
+ .andEq(TbAppHudiJobConfig.STATUS_A, STATUS_Y)
+ .andEq("tayjc_sync.status", STATUS_Y)
+ .andEq("tayjc_compaction.status", STATUS_Y)
+ .orderBy(SQLConstants.IapDatahub.DataSourceTableField.FIELD_SEQ_A)
+ .build()
));
}
}
diff --git a/service-web/src/main/resources/static/components/common.js b/service-web/src/main/resources/static/components/common.js
index 7868e72..fd2fd2e 100644
--- a/service-web/src/main/resources/static/components/common.js
+++ b/service-web/src/main/resources/static/components/common.js
@@ -1315,7 +1315,7 @@ function tableMetaDialog() {
dialog: {
title: '队列详情',
actions: [],
- size: 'lg',
+ size: 'xl',
body: {
type: 'service',
api: {