refactor(executor-task): 优化pulsar扫描配置

This commit is contained in:
2024-01-19 17:37:50 +08:00
parent 9140a39bf1
commit 99e636d55d
6 changed files with 40 additions and 27 deletions

View File

@@ -171,6 +171,6 @@ public class DataScanner {
.sinkTo(FlinkHelper.createFileSink(taskContext)) .sinkTo(FlinkHelper.createFileSink(taskContext))
.setParallelism(10) .setParallelism(10)
.name("Output results"); .name("Output results");
environment.execute(); environment.execute(StrUtil.format("Search {}", key));
} }
} }

View File

@@ -82,6 +82,6 @@ public class RecordView implements Serializable, Comparable<RecordView> {
} }
public enum Operation { public enum Operation {
DELETE, UPSERT, ROLLBACK, RESULT, SOURCE DELETE, UPSERT, ROLLBACK, RESULT, SOURCE, QUEUE, TARGET
} }
} }

View File

@@ -9,13 +9,15 @@ import java.time.ZoneId;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.*; import org.apache.flink.api.connector.source.*;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.api.*;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.MutableList;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -25,7 +27,7 @@ import org.slf4j.LoggerFactory;
*/ */
public class ReadPulsarSource implements Source<RecordView, ReadPulsarSplit, Collection<ReadPulsarSplit>>, ResultTypeQueryable<RecordView>, Serializable { public class ReadPulsarSource implements Source<RecordView, ReadPulsarSplit, Collection<ReadPulsarSplit>>, ResultTypeQueryable<RecordView>, Serializable {
private static final Logger logger = LoggerFactory.getLogger(ReadPulsarSource.class); private static final Logger logger = LoggerFactory.getLogger(ReadPulsarSource.class);
private static final Long TASK_GAP = 6 * 60 * 60 * 1000L; private static final Long TASK_GAP = TimeUnit.MINUTES.toMillis(30);
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS") private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
.withLocale(Locale.CHINA) .withLocale(Locale.CHINA)
.withZone(ZoneId.systemDefault()); .withZone(ZoneId.systemDefault());
@@ -47,9 +49,7 @@ public class ReadPulsarSource implements Source<RecordView, ReadPulsarSplit, Col
Message<byte[]> message = consumer.receive(); Message<byte[]> message = consumer.receive();
long startTimestamp = message.getPublishTime(); long startTimestamp = message.getPublishTime();
long endTimestamp = Instant.now().toEpochMilli(); long endTimestamp = Instant.now().toEpochMilli();
long gap = Math.max((endTimestamp - startTimestamp) / (parallelism - 1), 1000 * 60 * 60); MutableList<ReadPulsarSplit> tasks = Lists.mutable.empty();
logger.info("Gap: {}, Parallelism: {}", gap, parallelism);
List<ReadPulsarSplit> tasks = new ArrayList<>();
while (startTimestamp < endTimestamp) { while (startTimestamp < endTimestamp) {
tasks.add(new ReadPulsarSplit( tasks.add(new ReadPulsarSplit(
taskContext.getTaskId(), taskContext.getTaskId(),
@@ -57,14 +57,15 @@ public class ReadPulsarSource implements Source<RecordView, ReadPulsarSplit, Col
pulsarTopic, pulsarTopic,
latestMessageId.toString(), latestMessageId.toString(),
startTimestamp, startTimestamp,
startTimestamp + gap Math.min(endTimestamp, startTimestamp + TASK_GAP)
)); ));
startTimestamp += gap; startTimestamp += TASK_GAP;
} }
splits = tasks; logger.info("Gap: {}, Parallelism: {}, Splits: {}", TASK_GAP, parallelism, tasks.size());
for (ReadPulsarSplit split : splits) { for (ReadPulsarSplit split : tasks) {
logger.info("Read split: {} -> {}", covertTimestamp(split.getStartTimestamp()), covertTimestamp(split.getEndTimestamp())); logger.info("Read split: {} -> {}", covertTimestamp(split.getStartTimestamp()), covertTimestamp(split.getEndTimestamp()));
} }
splits = new ArrayList<>(tasks.shuffleThis());
} }
} }
} }

View File

@@ -35,10 +35,10 @@ public class ReadPulsarSourceEnumerator implements SplitEnumerator<ReadPulsarSpl
public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
final ReadPulsarSplit split = readQueue.poll(); final ReadPulsarSplit split = readQueue.poll();
if (ObjectUtil.isNotNull(split)) { if (ObjectUtil.isNotNull(split)) {
logger.info("Assign split for {}, split: {}", subtaskId, split); logger.info("t{} Assign split for {}, Queue rest: {}", subtaskId, subtaskId, readQueue.size());
context.assignSplit(split, subtaskId); context.assignSplit(split, subtaskId);
} else { } else {
logger.info("No more split for {}", subtaskId); logger.info("t{} No more split for {}", subtaskId, subtaskId);
context.signalNoMoreSplits(subtaskId); context.signalNoMoreSplits(subtaskId);
} }
} }

View File

@@ -14,6 +14,7 @@ import java.util.Locale;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.api.connector.source.ReaderOutput; import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.connector.source.SourceReaderContext;
@@ -56,7 +57,7 @@ public class ReadPulsarSourceReader implements SourceReader<RecordView, ReadPuls
private RecordView parsePulsarMessage(Message<byte[]> message) { private RecordView parsePulsarMessage(Message<byte[]> message) {
return new RecordView( return new RecordView(
RecordView.Operation.SOURCE, RecordView.Operation.QUEUE,
new String(message.getValue()), new String(message.getValue()),
FORMATTER.format(Instant.ofEpochMilli(message.getPublishTime())), FORMATTER.format(Instant.ofEpochMilli(message.getPublishTime())),
message.getMessageId().toString() message.getMessageId().toString()
@@ -66,7 +67,7 @@ public class ReadPulsarSourceReader implements SourceReader<RecordView, ReadPuls
@Override @Override
public InputStatus pollNext(ReaderOutput<RecordView> output) throws Exception { public InputStatus pollNext(ReaderOutput<RecordView> output) throws Exception {
if (ObjectUtil.isNotNull(currentSplit)) { if (ObjectUtil.isNotNull(currentSplit)) {
logger.info("Read split: {}", currentSplit); logger.info("t{} Read split: {}", readerContext.getIndexOfSubtask(), currentSplit.getStartTimestamp());
try (PulsarClient client = PulsarClient.builder() try (PulsarClient client = PulsarClient.builder()
.serviceUrl(currentSplit.getPulsarUrl()) .serviceUrl(currentSplit.getPulsarUrl())
.build()) { .build()) {
@@ -83,7 +84,13 @@ public class ReadPulsarSourceReader implements SourceReader<RecordView, ReadPuls
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionMode(SubscriptionMode.NonDurable) .subscriptionMode(SubscriptionMode.NonDurable)
.subscriptionType(SubscriptionType.Exclusive) .subscriptionType(SubscriptionType.Exclusive)
.subscriptionName(StrUtil.format("Task_Reader_{}_{}", currentSplit.getTaskId(), readerContext.getIndexOfSubtask())) .subscriptionName(StrUtil.format(
"Task_Reader_{}_{}_{}_{}",
currentSplit.getTaskId(),
readerContext.getIndexOfSubtask(),
currentSplit.getStartTimestamp(),
currentSplit.getEndTimestamp()
))
.startMessageIdInclusive() .startMessageIdInclusive()
.subscribe()) { .subscribe()) {
consumer.seek(currentSplit.getStartTimestamp()); consumer.seek(currentSplit.getStartTimestamp());
@@ -92,24 +99,29 @@ public class ReadPulsarSourceReader implements SourceReader<RecordView, ReadPuls
long currentTimestamp = 0; long currentTimestamp = 0;
for (Message<byte[]> message : messages) { for (Message<byte[]> message : messages) {
currentTimestamp = message.getPublishTime(); currentTimestamp = message.getPublishTime();
}
if (currentTimestamp > currentSplit.getEndTimestamp()) {
logger.info("t{} Break for {} -> {}, Queue rest: {}", readerContext.getIndexOfSubtask(), currentTimestamp, currentSplit.getEndTimestamp(), readQueue.size());
break;
}
for (Message<byte[]> message : messages) {
output.collect(parsePulsarMessage(message)); output.collect(parsePulsarMessage(message));
} }
consumer.acknowledge(messages); consumer.acknowledge(messages);
if (currentTimestamp > currentSplit.getEndTimestamp()) {
logger.info("Break for {} -> {}", currentTimestamp, currentSplit.getEndTimestamp());
break;
}
messages = consumer.batchReceive(); messages = consumer.batchReceive();
} }
} }
} }
} }
return tryMoveToNextSplit();
}
private InputStatus tryMoveToNextSplit() { currentSplit = null;
if (ObjectUtil.isEmpty(readQueue) && !noMoreSplits) {
readerContext.sendSplitRequest();
logger.info("t{} Request new split", readerContext.getIndexOfSubtask());
}
currentSplit = readQueue.poll(); currentSplit = readQueue.poll();
logger.info("Current split: {}", currentSplit); logger.info("t{} Queue rest: {}, Current split: {}", readerContext.getIndexOfSubtask(), readQueue.size(), currentSplit);
if (ObjectUtil.isNotNull(currentSplit)) { if (ObjectUtil.isNotNull(currentSplit)) {
return InputStatus.MORE_AVAILABLE; return InputStatus.MORE_AVAILABLE;
} else if (noMoreSplits) { } else if (noMoreSplits) {
@@ -134,14 +146,14 @@ public class ReadPulsarSourceReader implements SourceReader<RecordView, ReadPuls
@Override @Override
public void addSplits(List<ReadPulsarSplit> splits) { public void addSplits(List<ReadPulsarSplit> splits) {
logger.info("Add splits: {}", splits); logger.info("t{} Add splits: {}", readerContext.getIndexOfSubtask(), splits.stream().map(ReadPulsarSplit::getStartTimestamp).collect(Collectors.toList()));
readQueue.addAll(splits); readQueue.addAll(splits);
availability.complete(null); availability.complete(null);
} }
@Override @Override
public void notifyNoMoreSplits() { public void notifyNoMoreSplits() {
logger.info("No more splits for {}", readerContext.getIndexOfSubtask()); logger.info("t{} No more splits for {}", readerContext.getIndexOfSubtask(), readerContext.getIndexOfSubtask());
noMoreSplits = true; noMoreSplits = true;
availability.complete(null); availability.complete(null);
} }

View File

@@ -3,7 +3,7 @@
<filter class="ch.qos.logback.classic.filter.ThresholdFilter"> <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level> <level>INFO</level>
</filter> </filter>
<url>http://132.126.207.125:33100/loki/api/v1/push</url> <url>http://132.126.207.126:33100/loki/api/v1/push</url>
<encoder> <encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [${HOSTNAME}] %-5level ${PID:- } --- [%t] %-40.40logger{39} #@# : %m%n</pattern> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [${HOSTNAME}] %-5level ${PID:- } --- [%t] %-40.40logger{39} #@# : %m%n</pattern>
</encoder> </encoder>