feat(executor-task): 尝试优化pulsar的读取
This commit is contained in:
@@ -11,13 +11,14 @@ import java.util.ArrayList;
|
|||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import com.lanyuanxiaoyao.service.executor.task.helper.TimeRangeHelper;
|
||||||
import org.apache.flink.api.common.typeinfo.TypeInformation;
|
import org.apache.flink.api.common.typeinfo.TypeInformation;
|
||||||
import org.apache.flink.api.connector.source.*;
|
import org.apache.flink.api.connector.source.*;
|
||||||
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
|
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
|
||||||
import org.apache.flink.core.io.SimpleVersionedSerializer;
|
import org.apache.flink.core.io.SimpleVersionedSerializer;
|
||||||
import org.apache.pulsar.client.api.*;
|
import org.apache.pulsar.client.api.*;
|
||||||
import org.eclipse.collections.api.factory.Lists;
|
import org.eclipse.collections.api.list.ImmutableList;
|
||||||
import org.eclipse.collections.api.list.MutableList;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@@ -49,23 +50,20 @@ public class ReadPulsarSource implements Source<RecordView, ReadPulsarSplit, Col
|
|||||||
Message<byte[]> message = consumer.receive();
|
Message<byte[]> message = consumer.receive();
|
||||||
long startTimestamp = message.getPublishTime();
|
long startTimestamp = message.getPublishTime();
|
||||||
long endTimestamp = Instant.now().toEpochMilli();
|
long endTimestamp = Instant.now().toEpochMilli();
|
||||||
MutableList<ReadPulsarSplit> tasks = Lists.mutable.empty();
|
ImmutableList<ReadPulsarSplit> tasks = TimeRangeHelper.range(startTimestamp, endTimestamp, TASK_GAP)
|
||||||
while (startTimestamp < endTimestamp) {
|
.collect(range -> new ReadPulsarSplit(
|
||||||
tasks.add(new ReadPulsarSplit(
|
|
||||||
taskContext.getTaskId(),
|
taskContext.getTaskId(),
|
||||||
pulsarUrl,
|
pulsarUrl,
|
||||||
pulsarTopic,
|
pulsarTopic,
|
||||||
latestMessageId.toString(),
|
latestMessageId.toString(),
|
||||||
startTimestamp,
|
range.getStart(),
|
||||||
Math.min(endTimestamp, startTimestamp + TASK_GAP)
|
range.getEnd()
|
||||||
));
|
));
|
||||||
startTimestamp += TASK_GAP;
|
|
||||||
}
|
|
||||||
logger.info("Gap: {}, Splits: {}", TASK_GAP, tasks.size());
|
logger.info("Gap: {}, Splits: {}", TASK_GAP, tasks.size());
|
||||||
for (ReadPulsarSplit split : tasks) {
|
for (ReadPulsarSplit split : tasks) {
|
||||||
logger.info("Read split: {} -> {}", covertTimestamp(split.getStartTimestamp()), covertTimestamp(split.getEndTimestamp()));
|
logger.info("Read split: {} -> {}", covertTimestamp(split.getStartTimestamp()), covertTimestamp(split.getEndTimestamp()));
|
||||||
}
|
}
|
||||||
splits = new ArrayList<>(tasks.shuffleThis());
|
splits = new ArrayList<>(tasks.toList().shuffleThis());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,7 +7,11 @@ import java.util.ArrayDeque;
|
|||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
|
||||||
|
import com.lanyuanxiaoyao.service.executor.task.functions.pulsar.event.FinishSplitEvent;
|
||||||
|
import org.apache.flink.api.connector.source.SourceEvent;
|
||||||
import org.apache.flink.api.connector.source.SplitEnumerator;
|
import org.apache.flink.api.connector.source.SplitEnumerator;
|
||||||
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
|
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@@ -21,21 +25,30 @@ public class ReadPulsarSourceEnumerator implements SplitEnumerator<ReadPulsarSpl
|
|||||||
private static final Logger logger = LoggerFactory.getLogger(ReadPulsarSourceEnumerator.class);
|
private static final Logger logger = LoggerFactory.getLogger(ReadPulsarSourceEnumerator.class);
|
||||||
private final SplitEnumeratorContext<ReadPulsarSplit> context;
|
private final SplitEnumeratorContext<ReadPulsarSplit> context;
|
||||||
private final Queue<ReadPulsarSplit> readQueue;
|
private final Queue<ReadPulsarSplit> readQueue;
|
||||||
|
private final AtomicInteger success = new AtomicInteger(0);
|
||||||
|
|
||||||
public ReadPulsarSourceEnumerator(SplitEnumeratorContext<ReadPulsarSplit> context, Collection<ReadPulsarSplit> splits) {
|
public ReadPulsarSourceEnumerator(SplitEnumeratorContext<ReadPulsarSplit> context, Collection<ReadPulsarSplit> splits) {
|
||||||
this.context = context;
|
this.context = context;
|
||||||
this.readQueue = new ArrayDeque<>(splits);
|
this.readQueue = new ArrayDeque<>(splits);
|
||||||
|
this.success.set(splits.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void start() {
|
public void start() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
|
||||||
|
if (sourceEvent instanceof FinishSplitEvent) {
|
||||||
|
logger.info("{}", success.decrementAndGet());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
|
public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
|
||||||
final ReadPulsarSplit split = readQueue.poll();
|
final ReadPulsarSplit split = readQueue.poll();
|
||||||
if (ObjectUtil.isNotNull(split)) {
|
if (ObjectUtil.isNotNull(split)) {
|
||||||
logger.info("t{} Assign split for {}, Queue rest: {}", subtaskId, subtaskId, readQueue.size());
|
logger.info("t{} Assign split for {}, Queue rest: {}, Success: {}", subtaskId, subtaskId, readQueue.size(), success.get());
|
||||||
context.assignSplit(split, subtaskId);
|
context.assignSplit(split, subtaskId);
|
||||||
} else {
|
} else {
|
||||||
logger.info("t{} No more split for {}", subtaskId, subtaskId);
|
logger.info("t{} No more split for {}", subtaskId, subtaskId);
|
||||||
|
|||||||
@@ -15,6 +15,8 @@ import java.util.Queue;
|
|||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import com.lanyuanxiaoyao.service.executor.task.functions.pulsar.event.FinishSplitEvent;
|
||||||
import org.apache.flink.api.connector.source.ReaderOutput;
|
import org.apache.flink.api.connector.source.ReaderOutput;
|
||||||
import org.apache.flink.api.connector.source.SourceReader;
|
import org.apache.flink.api.connector.source.SourceReader;
|
||||||
import org.apache.flink.api.connector.source.SourceReaderContext;
|
import org.apache.flink.api.connector.source.SourceReaderContext;
|
||||||
@@ -70,6 +72,8 @@ public class ReadPulsarSourceReader implements SourceReader<RecordView, ReadPuls
|
|||||||
logger.info("t{} Poll Next", readerContext.getIndexOfSubtask());
|
logger.info("t{} Poll Next", readerContext.getIndexOfSubtask());
|
||||||
if (ObjectUtil.isNotNull(currentSplit)) {
|
if (ObjectUtil.isNotNull(currentSplit)) {
|
||||||
logger.info("t{} Read split: {}", readerContext.getIndexOfSubtask(), currentSplit.getStartTimestamp());
|
logger.info("t{} Read split: {}", readerContext.getIndexOfSubtask(), currentSplit.getStartTimestamp());
|
||||||
|
long startTimestamp = currentSplit.getStartTimestamp();
|
||||||
|
long endTimestamp = currentSplit.getEndTimestamp();
|
||||||
try (PulsarClient client = PulsarClient.builder()
|
try (PulsarClient client = PulsarClient.builder()
|
||||||
.serviceUrl(currentSplit.getPulsarUrl())
|
.serviceUrl(currentSplit.getPulsarUrl())
|
||||||
.build()) {
|
.build()) {
|
||||||
@@ -80,16 +84,17 @@ public class ReadPulsarSourceReader implements SourceReader<RecordView, ReadPuls
|
|||||||
"Task_Reader_{}_{}_{}_{}",
|
"Task_Reader_{}_{}_{}_{}",
|
||||||
currentSplit.getTaskId(),
|
currentSplit.getTaskId(),
|
||||||
readerContext.getIndexOfSubtask(),
|
readerContext.getIndexOfSubtask(),
|
||||||
currentSplit.getStartTimestamp(),
|
startTimestamp,
|
||||||
currentSplit.getEndTimestamp()
|
endTimestamp
|
||||||
))
|
))
|
||||||
.startMessageIdInclusive()
|
.startMessageIdInclusive()
|
||||||
.startMessageId(MessageId.earliest)
|
.startMessageId(MessageId.earliest)
|
||||||
.create()) {
|
.create()) {
|
||||||
reader.seek(currentSplit.getStartTimestamp());
|
reader.seek(startTimestamp);
|
||||||
Message<String> message = reader.readNext(10, TimeUnit.SECONDS);
|
Message<String> message = reader.readNext(10, TimeUnit.SECONDS);
|
||||||
while (ObjectUtil.isNotNull(message)) {
|
while (ObjectUtil.isNotNull(message)) {
|
||||||
if (message.getPublishTime() > currentSplit.getEndTimestamp()) {
|
long publishTime = message.getPublishTime();
|
||||||
|
if (publishTime > endTimestamp) {
|
||||||
logger.info("t{} Break for {} -> {}, Queue rest: {}", readerContext.getIndexOfSubtask(), message.getPublishTime(), currentSplit.getEndTimestamp(), readQueue.size());
|
logger.info("t{} Break for {} -> {}, Queue rest: {}", readerContext.getIndexOfSubtask(), message.getPublishTime(), currentSplit.getEndTimestamp(), readQueue.size());
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@@ -101,6 +106,7 @@ public class ReadPulsarSourceReader implements SourceReader<RecordView, ReadPuls
|
|||||||
}
|
}
|
||||||
|
|
||||||
currentSplit = null;
|
currentSplit = null;
|
||||||
|
readerContext.sendSourceEventToCoordinator(new FinishSplitEvent());
|
||||||
if (ObjectUtil.isEmpty(readQueue) && !noMoreSplits) {
|
if (ObjectUtil.isEmpty(readQueue) && !noMoreSplits) {
|
||||||
readerContext.sendSplitRequest();
|
readerContext.sendSplitRequest();
|
||||||
logger.info("t{} Request new split", readerContext.getIndexOfSubtask());
|
logger.info("t{} Request new split", readerContext.getIndexOfSubtask());
|
||||||
|
|||||||
@@ -0,0 +1,27 @@
|
|||||||
|
package com.lanyuanxiaoyao.service.executor.task.functions.pulsar.event;
|
||||||
|
|
||||||
|
import com.lanyuanxiaoyao.service.executor.task.functions.pulsar.ReadPulsarSplit;
|
||||||
|
import org.apache.flink.api.connector.source.SourceEvent;
|
||||||
|
import org.eclipse.collections.api.list.ImmutableList;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author lanyuanxiaoyao
|
||||||
|
*/
|
||||||
|
public class AddSplitEvent implements SourceEvent {
|
||||||
|
private final ImmutableList<ReadPulsarSplit> splits;
|
||||||
|
|
||||||
|
public AddSplitEvent(ImmutableList<ReadPulsarSplit> splits) {
|
||||||
|
this.splits = splits;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ImmutableList<ReadPulsarSplit> getSplits() {
|
||||||
|
return splits;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "AddSplitEvent{" +
|
||||||
|
"splits=" + splits +
|
||||||
|
'}';
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,9 @@
|
|||||||
|
package com.lanyuanxiaoyao.service.executor.task.functions.pulsar.event;
|
||||||
|
|
||||||
|
import org.apache.flink.api.connector.source.SourceEvent;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author lanyuanxiaoyao
|
||||||
|
*/
|
||||||
|
public class FinishSplitEvent implements SourceEvent {
|
||||||
|
}
|
||||||
@@ -0,0 +1,37 @@
|
|||||||
|
package com.lanyuanxiaoyao.service.executor.task.helper;
|
||||||
|
|
||||||
|
import org.eclipse.collections.api.factory.Lists;
|
||||||
|
import org.eclipse.collections.api.list.ImmutableList;
|
||||||
|
import org.eclipse.collections.api.list.MutableList;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author lanyuanxiaoyao
|
||||||
|
*/
|
||||||
|
public class TimeRangeHelper {
|
||||||
|
public static final class TimeRange {
|
||||||
|
private final long start;
|
||||||
|
private final long end;
|
||||||
|
|
||||||
|
public TimeRange(long start, long end) {
|
||||||
|
this.start = start;
|
||||||
|
this.end = end;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getStart() {
|
||||||
|
return start;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getEnd() {
|
||||||
|
return end;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ImmutableList<TimeRange> range(long start, long end, long gap) {
|
||||||
|
MutableList<TimeRange> ranges = Lists.mutable.empty();
|
||||||
|
while (start <= end) {
|
||||||
|
ranges.add(new TimeRange(start, Math.min(end, start + gap)));
|
||||||
|
start += gap;
|
||||||
|
}
|
||||||
|
return ranges.toImmutable();
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user