refactor(executor-task): 优化pulsar扫描任务

调整 Pulsar source 并行度设置，优化 Pulsar 时间分段长度
This commit is contained in:
2024-01-22 09:43:57 +08:00
parent 99e636d55d
commit ff72583d5d
8 changed files with 155 additions and 60 deletions

View File

@@ -82,7 +82,7 @@ public class DataScanner {
StreamExecutionEnvironment environment = FlinkHelper.getBatchEnvironment();
DataStream<RecordView> source = null;
int totalParallelism = 20;
int totalParallelism = 30;
if (scanQueue) {
ArgumentsHelper.checkMetadata(taskContext, "pulsar");
String pulsarUrl = (String) metadata.get("pulsar");
@@ -90,8 +90,8 @@ public class DataScanner {
String pulsarTopic = (String) metadata.get("pulsar_topic");
logger.info("Scan queue topic: {} url: {}", pulsarTopic, pulsarUrl);
DataStream<RecordView> stream = environment
.fromSource(new ReadPulsarSource(taskContext, pulsarUrl, pulsarTopic, 50), WatermarkStrategy.noWatermarks(), "Read pulsar")
.setParallelism(50)
.fromSource(new ReadPulsarSource(taskContext, pulsarUrl, pulsarTopic), WatermarkStrategy.noWatermarks(), "Read pulsar")
.setParallelism(totalParallelism)
.disableChaining();
if (ObjectUtil.isNull(source)) {
source = stream;

View File

@@ -27,13 +27,13 @@ import org.slf4j.LoggerFactory;
*/
public class ReadPulsarSource implements Source<RecordView, ReadPulsarSplit, Collection<ReadPulsarSplit>>, ResultTypeQueryable<RecordView>, Serializable {
private static final Logger logger = LoggerFactory.getLogger(ReadPulsarSource.class);
private static final Long TASK_GAP = TimeUnit.MINUTES.toMillis(30);
private static final Long TASK_GAP = TimeUnit.MINUTES.toMillis(60);
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
.withLocale(Locale.CHINA)
.withZone(ZoneId.systemDefault());
private final Collection<ReadPulsarSplit> splits;
public ReadPulsarSource(TaskContext taskContext, String pulsarUrl, String pulsarTopic, Integer parallelism) throws PulsarClientException {
public ReadPulsarSource(TaskContext taskContext, String pulsarUrl, String pulsarTopic) throws PulsarClientException {
try (PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarUrl)
.build()) {
@@ -61,7 +61,7 @@ public class ReadPulsarSource implements Source<RecordView, ReadPulsarSplit, Col
));
startTimestamp += TASK_GAP;
}
logger.info("Gap: {}, Parallelism: {}, Splits: {}", TASK_GAP, parallelism, tasks.size());
logger.info("Gap: {}, Splits: {}", TASK_GAP, tasks.size());
for (ReadPulsarSplit split : tasks) {
logger.info("Read split: {} -> {}", covertTimestamp(split.getStartTimestamp()), covertTimestamp(split.getEndTimestamp()));
}

View File

@@ -20,6 +20,7 @@ import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.io.InputStatus;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.schema.StringSchema;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,10 +56,10 @@ public class ReadPulsarSourceReader implements SourceReader<RecordView, ReadPuls
}
}
private RecordView parsePulsarMessage(Message<byte[]> message) {
private RecordView parsePulsarMessage(Message<String> message) {
return new RecordView(
RecordView.Operation.QUEUE,
new String(message.getValue()),
message.getValue(),
FORMATTER.format(Instant.ofEpochMilli(message.getPublishTime())),
message.getMessageId().toString()
);
@@ -66,24 +67,15 @@ public class ReadPulsarSourceReader implements SourceReader<RecordView, ReadPuls
@Override
public InputStatus pollNext(ReaderOutput<RecordView> output) throws Exception {
logger.info("t{} Poll Next", readerContext.getIndexOfSubtask());
if (ObjectUtil.isNotNull(currentSplit)) {
logger.info("t{} Read split: {}", readerContext.getIndexOfSubtask(), currentSplit.getStartTimestamp());
try (PulsarClient client = PulsarClient.builder()
.serviceUrl(currentSplit.getPulsarUrl())
.build()) {
try (Consumer<byte[]> consumer = client.newConsumer()
try (Reader<String> reader = client.newReader(new StringSchema())
.topic(currentSplit.getPulsarTopic())
.batchReceivePolicy(
BatchReceivePolicy.builder()
.timeout(1, TimeUnit.SECONDS)
.maxNumMessages(0)
.maxNumBytes(0)
.build()
)
.receiverQueueSize(50000)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionMode(SubscriptionMode.NonDurable)
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionName(StrUtil.format(
"Task_Reader_{}_{}_{}_{}",
currentSplit.getTaskId(),
@@ -92,23 +84,17 @@ public class ReadPulsarSourceReader implements SourceReader<RecordView, ReadPuls
currentSplit.getEndTimestamp()
))
.startMessageIdInclusive()
.subscribe()) {
consumer.seek(currentSplit.getStartTimestamp());
Messages<byte[]> messages = consumer.batchReceive();
while (ObjectUtil.isNotNull(messages)) {
long currentTimestamp = 0;
for (Message<byte[]> message : messages) {
currentTimestamp = message.getPublishTime();
}
if (currentTimestamp > currentSplit.getEndTimestamp()) {
logger.info("t{} Break for {} -> {}, Queue rest: {}", readerContext.getIndexOfSubtask(), currentTimestamp, currentSplit.getEndTimestamp(), readQueue.size());
.startMessageId(MessageId.earliest)
.create()) {
reader.seek(currentSplit.getStartTimestamp());
Message<String> message = reader.readNext(10, TimeUnit.SECONDS);
while (ObjectUtil.isNotNull(message)) {
if (message.getPublishTime() > currentSplit.getEndTimestamp()) {
logger.info("t{} Break for {} -> {}, Queue rest: {}", readerContext.getIndexOfSubtask(), message.getPublishTime(), currentSplit.getEndTimestamp(), readQueue.size());
break;
}
for (Message<byte[]> message : messages) {
output.collect(parsePulsarMessage(message));
}
consumer.acknowledge(messages);
messages = consumer.batchReceive();
output.collect(parsePulsarMessage(message));
message = reader.readNext(10, TimeUnit.SECONDS);
}
}
}