diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSource.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSource.java index 5a04146..8c3a8dc 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSource.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSource.java @@ -11,13 +11,14 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Locale; import java.util.concurrent.TimeUnit; + +import com.lanyuanxiaoyao.service.executor.task.helper.TimeRangeHelper; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.*; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.pulsar.client.api.*; -import org.eclipse.collections.api.factory.Lists; -import org.eclipse.collections.api.list.MutableList; +import org.eclipse.collections.api.list.ImmutableList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,23 +50,20 @@ public class ReadPulsarSource implements Source message = consumer.receive(); long startTimestamp = message.getPublishTime(); long endTimestamp = Instant.now().toEpochMilli(); - MutableList tasks = Lists.mutable.empty(); - while (startTimestamp < endTimestamp) { - tasks.add(new ReadPulsarSplit( - taskContext.getTaskId(), - pulsarUrl, - pulsarTopic, - latestMessageId.toString(), - startTimestamp, - Math.min(endTimestamp, startTimestamp + TASK_GAP) - )); - startTimestamp += TASK_GAP; - } + ImmutableList tasks = TimeRangeHelper.range(startTimestamp, endTimestamp, TASK_GAP) + .collect(range -> new ReadPulsarSplit( + taskContext.getTaskId(), + pulsarUrl, + pulsarTopic, + latestMessageId.toString(), + range.getStart(), + range.getEnd() + )); logger.info("Gap: {}, Splits: {}", TASK_GAP, tasks.size()); for (ReadPulsarSplit split : tasks) { logger.info("Read split: {} -> {}", covertTimestamp(split.getStartTimestamp()), covertTimestamp(split.getEndTimestamp())); } - splits = new ArrayList<>(tasks.shuffleThis()); + splits = new ArrayList<>(tasks.toList().shuffleThis()); } } } diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSourceEnumerator.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSourceEnumerator.java index 5360ca4..ee68971 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSourceEnumerator.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSourceEnumerator.java @@ -7,7 +7,11 @@ import java.util.ArrayDeque; import java.util.Collection; import java.util.List; import java.util.Queue; +import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; + +import com.lanyuanxiaoyao.service.executor.task.functions.pulsar.event.FinishSplitEvent; +import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.slf4j.Logger; @@ -21,21 +25,30 @@ public class ReadPulsarSourceEnumerator implements SplitEnumerator context; private final Queue readQueue; + private final AtomicInteger success = new AtomicInteger(0); public ReadPulsarSourceEnumerator(SplitEnumeratorContext context, Collection splits) { this.context = context; this.readQueue = new ArrayDeque<>(splits); + this.success.set(splits.size()); } @Override public void start() { } + @Override + public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { + if (sourceEvent instanceof FinishSplitEvent) { + logger.info("{}", success.decrementAndGet()); + } + } + @Override public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { final ReadPulsarSplit split = readQueue.poll(); if (ObjectUtil.isNotNull(split)) { - logger.info("t{} Assign split for {}, Queue rest: {}", subtaskId, subtaskId, readQueue.size()); + logger.info("t{} Assign split for {}, Queue rest: {}, Success: {}", subtaskId, subtaskId, readQueue.size(), success.get()); context.assignSplit(split, subtaskId); } else { logger.info("t{} No more split for {}", subtaskId, subtaskId); diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSourceReader.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSourceReader.java index 577d688..6182fdf 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSourceReader.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSourceReader.java @@ -15,6 +15,8 @@ import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; + +import com.lanyuanxiaoyao.service.executor.task.functions.pulsar.event.FinishSplitEvent; import org.apache.flink.api.connector.source.ReaderOutput; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; @@ -70,6 +72,8 @@ public class ReadPulsarSourceReader implements SourceReader message = reader.readNext(10, TimeUnit.SECONDS); while (ObjectUtil.isNotNull(message)) { - if (message.getPublishTime() > currentSplit.getEndTimestamp()) { + long publishTime = message.getPublishTime(); + if (publishTime > endTimestamp) { logger.info("t{} Break for {} -> {}, Queue rest: {}", readerContext.getIndexOfSubtask(), message.getPublishTime(), currentSplit.getEndTimestamp(), readQueue.size()); break; } @@ -101,6 +106,7 @@ public class ReadPulsarSourceReader implements SourceReader splits; + + public AddSplitEvent(ImmutableList splits) { + this.splits = splits; + } + + public ImmutableList getSplits() { + return splits; + } + + @Override + public String toString() { + return "AddSplitEvent{" + + "splits=" + splits + + '}'; + } +} diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/event/FinishSplitEvent.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/event/FinishSplitEvent.java new file mode 100644 index 0000000..4351faf --- /dev/null +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/event/FinishSplitEvent.java @@ -0,0 +1,9 @@ +package com.lanyuanxiaoyao.service.executor.task.functions.pulsar.event; + +import org.apache.flink.api.connector.source.SourceEvent; + +/** + * @author lanyuanxiaoyao + */ +public class FinishSplitEvent implements SourceEvent { +} diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/TimeRangeHelper.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/TimeRangeHelper.java new file mode 100644 index 0000000..92261a2 --- /dev/null +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/TimeRangeHelper.java @@ -0,0 +1,37 @@ +package com.lanyuanxiaoyao.service.executor.task.helper; + +import org.eclipse.collections.api.factory.Lists; +import org.eclipse.collections.api.list.ImmutableList; +import org.eclipse.collections.api.list.MutableList; + +/** + * @author lanyuanxiaoyao + */ +public class TimeRangeHelper { + public static final class TimeRange { + private final long start; + private final long end; + + public TimeRange(long start, long end) { + this.start = start; + this.end = end; + } + + public long getStart() { + return start; + } + + public long getEnd() { + return end; + } + } + + public static ImmutableList range(long start, long end, long gap) { + MutableList ranges = Lists.mutable.empty(); + while (start <= end) { + ranges.add(new TimeRange(start, Math.min(end, start + gap))); + start += gap; + } + return ranges.toImmutable(); + } +}