feat(executor-task): 完成动态调整的pulsar扫描

对于pulsar在时间段上资源分配不均,通过pulsar reader中重新分配split解决,保证整个读取的流程中都能尽可能利用多线程的优势
This commit is contained in:
v-zhangjc9
2024-06-05 17:37:04 +08:00
parent 4c528bca61
commit 0b4cec3955
10 changed files with 244 additions and 117 deletions

View File

@@ -48,12 +48,6 @@ public class HoodiePolice {
.filter(ObjectUtil::isNotNull)
.keyBy(Prisoner::getPartition)
.keyBy(Prisoner::getKey)
.reduce(new RichReduceFunction<Prisoner>() {
@Override
public Prisoner reduce(Prisoner value1, Prisoner value2) throws Exception {
return null;
}
})
.addSink(new PrisonerSink(taskContext));
environment.execute();
}

View File

@@ -60,7 +60,7 @@ public class ReadPulsarSource implements Source<RecordView, ReadPulsarSplit, Col
latestMessageId.toString(),
range.getStart(),
range.getEnd(),
TASK_GAP
range.getGap()
));
logger.info("Gap: {}, Splits: {}", TASK_GAP, tasks.size());
for (ReadPulsarSplit split : tasks) {

View File

@@ -1,15 +1,22 @@
package com.lanyuanxiaoyao.service.executor.task.functions.pulsar;
import cn.hutool.core.util.ObjectUtil;
import com.lanyuanxiaoyao.service.executor.task.functions.pulsar.event.AddEvent;
import com.lanyuanxiaoyao.service.executor.task.functions.pulsar.event.EndEvent;
import com.lanyuanxiaoyao.service.executor.task.functions.pulsar.event.StartEvent;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.factory.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -21,6 +28,7 @@ public class ReadPulsarSourceEnumerator implements SplitEnumerator<ReadPulsarSpl
private static final Logger logger = LoggerFactory.getLogger(ReadPulsarSourceEnumerator.class);
private final SplitEnumeratorContext<ReadPulsarSplit> context;
private final Queue<ReadPulsarSplit> readQueue;
private final Set<String> runningTasks = Sets.mutable.<String>empty().asSynchronized();
public ReadPulsarSourceEnumerator(SplitEnumeratorContext<ReadPulsarSplit> context, Collection<ReadPulsarSplit> splits) {
this.context = context;
@@ -31,15 +39,40 @@ public class ReadPulsarSourceEnumerator implements SplitEnumerator<ReadPulsarSpl
public void start() {
}
@Override
public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
if (sourceEvent instanceof AddEvent) {
AddEvent event = (AddEvent) sourceEvent;
logger.info("t{} Add: {}", subtaskId, event.getSplits());
readQueue.addAll(event.getSplits());
logger.info("t{} ReSend task, running tasks: {}", subtaskId, runningTasks);
context.registeredReaders()
.values()
.forEach(info -> handleSourceEvent(info.getSubtaskId(), null));
} else if (sourceEvent instanceof EndEvent) {
EndEvent event = (EndEvent) sourceEvent;
logger.info("t{} End: {}", subtaskId, event.getSplit());
if (ObjectUtil.isNotNull(event.getSplit())) {
runningTasks.remove(event.getSplit().getSplitId());
}
logger.info("t{} Queue: {} running tasks: {}", subtaskId, readQueue.size(), runningTasks);
if (ObjectUtil.isEmpty(readQueue) && ObjectUtil.isEmpty(runningTasks)) {
logger.info("t{} No more", subtaskId);
context.registeredReaders()
.values()
.forEach(info -> context.signalNoMoreSplits(info.getSubtaskId()));
}
}
}
@Override
public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
final ReadPulsarSplit split = readQueue.poll();
if (ObjectUtil.isNotNull(split)) {
logger.info("t{} Assign split for {}, Queue rest: {}", subtaskId, subtaskId, readQueue.size());
context.assignSplit(split, subtaskId);
} else {
logger.info("t{} No more split for {}", subtaskId, subtaskId);
context.signalNoMoreSplits(subtaskId);
runningTasks.add(split.getSplitId());
}
}

View File

@@ -1,9 +1,14 @@
package com.lanyuanxiaoyao.service.executor.task.functions.pulsar;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.executor.task.entity.RecordView;
import com.lanyuanxiaoyao.service.executor.task.functions.pulsar.event.AddEvent;
import com.lanyuanxiaoyao.service.executor.task.functions.pulsar.event.EndEvent;
import com.lanyuanxiaoyao.service.executor.task.functions.pulsar.event.StartEvent;
import com.lanyuanxiaoyao.service.executor.task.helper.TimeRangeHelper;
import java.io.Serializable;
import java.time.Instant;
import java.time.ZoneId;
@@ -19,14 +24,12 @@ import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.io.InputStatus;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.schema.StringSchema;
import org.eclipse.collections.api.list.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,6 +54,9 @@ public class ReadPulsarSourceReader implements SourceReader<RecordView, ReadPuls
@Override
public void start() {
if (readQueue.isEmpty()) {
readerContext.sendSplitRequest();
}
}
private RecordView parsePulsarMessage(Message<String> message) {
@@ -65,6 +71,7 @@ public class ReadPulsarSourceReader implements SourceReader<RecordView, ReadPuls
@Override
public InputStatus pollNext(ReaderOutput<RecordView> output) throws Exception {
logger.info("t{} Poll Next", readerContext.getIndexOfSubtask());
readerContext.sendSourceEventToCoordinator(new StartEvent());
if (ObjectUtil.isNotNull(currentSplit)) {
logger.info("t{} Read split: {}", readerContext.getIndexOfSubtask(), currentSplit.getSplitId());
long startTimestamp = currentSplit.getStartTimestamp();
@@ -86,6 +93,7 @@ public class ReadPulsarSourceReader implements SourceReader<RecordView, ReadPuls
.startMessageId(MessageId.earliest)
.create()) {
reader.seek(startTimestamp);
long count = 0;
Message<String> message = reader.readNext(10, TimeUnit.SECONDS);
while (ObjectUtil.isNotNull(message)) {
long publishTime = message.getPublishTime();
@@ -93,11 +101,32 @@ public class ReadPulsarSourceReader implements SourceReader<RecordView, ReadPuls
logger.info("t{} Break for {} -> {}, Queue rest: {}", readerContext.getIndexOfSubtask(), message.getPublishTime(), currentSplit.getEndTimestamp(), readQueue.size());
break;
}
if (++count > 500000) {
ImmutableList<TimeRangeHelper.TimeRange> range = TimeRangeHelper.range(publishTime, endTimestamp, currentSplit.getGap() / 5);
if (ObjectUtil.isNotEmpty(range)) {
readerContext.sendSourceEventToCoordinator(new AddEvent(
range.collect(r -> new ReadPulsarSplit(
currentSplit.getTaskId(),
IdUtil.nanoId(10),
currentSplit.getPulsarUrl(),
currentSplit.getPulsarTopic(),
currentSplit.getLatestMessageId(),
r.getStart(),
r.getEnd(),
r.getGap()
)).toList()
));
break;
} else {
count = 0;
}
}
output.collect(parsePulsarMessage(message));
message = reader.readNext(10, TimeUnit.SECONDS);
}
}
}
readerContext.sendSourceEventToCoordinator(new EndEvent(currentSplit));
}
currentSplit = null;

View File

@@ -0,0 +1,28 @@
package com.lanyuanxiaoyao.service.executor.task.functions.pulsar.event;
import com.lanyuanxiaoyao.service.executor.task.functions.pulsar.ReadPulsarSplit;
import java.util.List;
import org.apache.flink.api.connector.source.SourceEvent;
import org.eclipse.collections.api.list.ImmutableList;
/**
* @author lanyuanxiaoyao
*/
public class AddEvent implements SourceEvent {
private final List<ReadPulsarSplit> splits;
public AddEvent(List<ReadPulsarSplit> splits) {
this.splits = splits;
}
public List<ReadPulsarSplit> getSplits() {
return splits;
}
@Override
public String toString() {
return "AddSplitEvent{" +
"splits=" + splits +
'}';
}
}

View File

@@ -0,0 +1,26 @@
package com.lanyuanxiaoyao.service.executor.task.functions.pulsar.event;
import com.lanyuanxiaoyao.service.executor.task.functions.pulsar.ReadPulsarSplit;
import org.apache.flink.api.connector.source.SourceEvent;
/**
* @author lanyuanxiaoyao
*/
public class EndEvent implements SourceEvent {
private final ReadPulsarSplit split;
public EndEvent(ReadPulsarSplit split) {
this.split = split;
}
public ReadPulsarSplit getSplit() {
return split;
}
@Override
public String toString() {
return "EndEvent{" +
"split=" + split +
'}';
}
}

View File

@@ -0,0 +1,9 @@
package com.lanyuanxiaoyao.service.executor.task.functions.pulsar.event;
import org.apache.flink.api.connector.source.SourceEvent;
/**
* @author lanyuanxiaoyao
*/
public class StartEvent implements SourceEvent {
}

View File

@@ -12,10 +12,12 @@ public class TimeRangeHelper {
public static final class TimeRange {
private final long start;
private final long end;
private final long gap;
public TimeRange(long start, long end) {
public TimeRange(long start, long end, long gap) {
this.start = start;
this.end = end;
this.gap = gap;
}
public long getStart() {
@@ -25,13 +27,19 @@ public class TimeRangeHelper {
public long getEnd() {
return end;
}
public long getGap() {
return gap;
}
}
public static ImmutableList<TimeRange> range(long start, long end, long gap) {
gap = Math.max(TimeUnit.MINUTES.toMillis(1), gap);
if (gap < TimeUnit.MINUTES.toMillis(5)) {
return Lists.immutable.empty();
}
MutableList<TimeRange> ranges = Lists.mutable.empty();
while (start <= end) {
ranges.add(new TimeRange(start, Math.min(end, start + gap)));
ranges.add(new TimeRange(start, Math.min(end, start + gap), gap));
start += gap;
}
return ranges.toImmutable();