diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/DataScanner.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/DataScanner.java index 987abd3..d1a84b7 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/DataScanner.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/DataScanner.java @@ -171,6 +171,6 @@ public class DataScanner { .sinkTo(FlinkHelper.createFileSink(taskContext)) .setParallelism(10) .name("Output results"); - environment.execute(); + environment.execute(StrUtil.format("Search {}", key)); } } diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/entity/RecordView.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/entity/RecordView.java index d5553fd..82e3bad 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/entity/RecordView.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/entity/RecordView.java @@ -82,6 +82,6 @@ public class RecordView implements Serializable, Comparable { } public enum Operation { - DELETE, UPSERT, ROLLBACK, RESULT, SOURCE + DELETE, UPSERT, ROLLBACK, RESULT, SOURCE, QUEUE, TARGET } } diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSource.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSource.java index b228ccd..394b0ac 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSource.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSource.java @@ -9,13 +9,15 @@ import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Collection; -import java.util.List; import java.util.Locale; +import java.util.concurrent.TimeUnit; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.*; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.pulsar.client.api.*; +import org.eclipse.collections.api.factory.Lists; +import org.eclipse.collections.api.list.MutableList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,7 +27,7 @@ import org.slf4j.LoggerFactory; */ public class ReadPulsarSource implements Source>, ResultTypeQueryable, Serializable { private static final Logger logger = LoggerFactory.getLogger(ReadPulsarSource.class); - private static final Long TASK_GAP = 6 * 60 * 60 * 1000L; + private static final Long TASK_GAP = TimeUnit.MINUTES.toMillis(30); private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS") .withLocale(Locale.CHINA) .withZone(ZoneId.systemDefault()); @@ -47,9 +49,7 @@ public class ReadPulsarSource implements Source message = consumer.receive(); long startTimestamp = message.getPublishTime(); long endTimestamp = Instant.now().toEpochMilli(); - long gap = Math.max((endTimestamp - startTimestamp) / (parallelism - 1), 1000 * 60 * 60); - logger.info("Gap: {}, Parallelism: {}", gap, parallelism); - List tasks = new ArrayList<>(); + MutableList tasks = Lists.mutable.empty(); while (startTimestamp < endTimestamp) { tasks.add(new ReadPulsarSplit( taskContext.getTaskId(), @@ -57,14 +57,15 @@ public class ReadPulsarSource implements Source {}", covertTimestamp(split.getStartTimestamp()), covertTimestamp(split.getEndTimestamp())); } + splits = new ArrayList<>(tasks.shuffleThis()); } } } diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSourceEnumerator.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSourceEnumerator.java index 63b958f..5360ca4 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSourceEnumerator.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSourceEnumerator.java @@ -35,10 +35,10 @@ public class ReadPulsarSourceEnumerator implements SplitEnumerator message) { return new RecordView( - RecordView.Operation.SOURCE, + RecordView.Operation.QUEUE, new String(message.getValue()), FORMATTER.format(Instant.ofEpochMilli(message.getPublishTime())), message.getMessageId().toString() @@ -66,7 +67,7 @@ public class ReadPulsarSourceReader implements SourceReader output) throws Exception { if (ObjectUtil.isNotNull(currentSplit)) { - logger.info("Read split: {}", currentSplit); + logger.info("t{} Read split: {}", readerContext.getIndexOfSubtask(), currentSplit.getStartTimestamp()); try (PulsarClient client = PulsarClient.builder() .serviceUrl(currentSplit.getPulsarUrl()) .build()) { @@ -83,7 +84,13 @@ public class ReadPulsarSourceReader implements SourceReader message : messages) { currentTimestamp = message.getPublishTime(); + } + if (currentTimestamp > currentSplit.getEndTimestamp()) { + logger.info("t{} Break for {} -> {}, Queue rest: {}", readerContext.getIndexOfSubtask(), currentTimestamp, currentSplit.getEndTimestamp(), readQueue.size()); + break; + } + for (Message message : messages) { output.collect(parsePulsarMessage(message)); } consumer.acknowledge(messages); - if (currentTimestamp > currentSplit.getEndTimestamp()) { - logger.info("Break for {} -> {}", currentTimestamp, currentSplit.getEndTimestamp()); - break; - } messages = consumer.batchReceive(); } } } } - return tryMoveToNextSplit(); - } - private InputStatus tryMoveToNextSplit() { + currentSplit = null; + if (ObjectUtil.isEmpty(readQueue) && !noMoreSplits) { + readerContext.sendSplitRequest(); + logger.info("t{} Request new split", readerContext.getIndexOfSubtask()); + } + currentSplit = readQueue.poll(); - logger.info("Current split: {}", currentSplit); + logger.info("t{} Queue rest: {}, Current split: {}", readerContext.getIndexOfSubtask(), readQueue.size(), currentSplit); if (ObjectUtil.isNotNull(currentSplit)) { return InputStatus.MORE_AVAILABLE; } else if (noMoreSplits) { @@ -134,14 +146,14 @@ public class ReadPulsarSourceReader implements SourceReader splits) { - logger.info("Add splits: {}", splits); + logger.info("t{} Add splits: {}", readerContext.getIndexOfSubtask(), splits.stream().map(ReadPulsarSplit::getStartTimestamp).collect(Collectors.toList())); readQueue.addAll(splits); availability.complete(null); } @Override public void notifyNoMoreSplits() { - logger.info("No more splits for {}", readerContext.getIndexOfSubtask()); + logger.info("t{} No more splits for {}", readerContext.getIndexOfSubtask(), readerContext.getIndexOfSubtask()); noMoreSplits = true; availability.complete(null); } diff --git a/service-executor/service-executor-task/src/main/resources/logback.xml b/service-executor/service-executor-task/src/main/resources/logback.xml index eb023b6..2c343b0 100644 --- a/service-executor/service-executor-task/src/main/resources/logback.xml +++ b/service-executor/service-executor-task/src/main/resources/logback.xml @@ -3,7 +3,7 @@ INFO - http://132.126.207.125:33100/loki/api/v1/push + http://132.126.207.126:33100/loki/api/v1/push %d{yyyy-MM-dd HH:mm:ss.SSS} [${HOSTNAME}] %-5level ${PID:- } --- [%t] %-40.40logger{39} #@# : %m%n