diff --git a/.idea/httpRequests/http-requests-log.http b/.idea/httpRequests/http-requests-log.http index f97b603..bcab1dc 100644 --- a/.idea/httpRequests/http-requests-log.http +++ b/.idea/httpRequests/http-requests-log.http @@ -1,3 +1,103 @@ +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:31719/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_sz/acct_item_755&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=F957FA9C224455994EAC8E1880C973EC +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-06-05T162824.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:31719/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_sz/acct_item_755&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=F957FA9C224455994EAC8E1880C973EC +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-06-05T161759.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:31719/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_sz/acct_item_755&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=87BC302EE6B883F529C227E8450A4153 +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-06-05T155607.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:31719/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_sz/acct_item_755&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=87BC302EE6B883F529C227E8450A4153 +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-06-05T141449.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:31719/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_sz/acct_item_755&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=5B43FB1FE3B1826286434211887CF8F3 +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-06-05T140729.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:31719/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_sz/acct_item_755&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=5B43FB1FE3B1826286434211887CF8F3 +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-06-05T112612.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:31719/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_sz/acct_item_755&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=5B43FB1FE3B1826286434211887CF8F3 +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-06-05T111740.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:31719/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_sz/acct_item_755&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=5B43FB1FE3B1826286434211887CF8F3 +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-06-05T111548.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:31719/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_sz/acct_item_755&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=D47906EE784653B11896CB69773F0979 +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-06-05T110457.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:31719/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_sz/acct_item_755&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=D47906EE784653B11896CB69773F0979 +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-06-05T094747.200.txt + +### + GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:33535/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_sz/acct_item_755&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID Connection: Keep-Alive User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) @@ -418,103 +518,3 @@ Hello world ### -POST http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s8.hdp.dc:34469/hdfs/write?root=hdfs://b2/apps/datalake/test/test.txt -Content-Length: 11 -Content-Type: */*; charset=UTF-8 -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) -Cookie: JSESSIONID=21482112F88BCF63D4FE4F5D2A6681FF -Accept-Encoding: br,deflate,gzip,x-gzip - -Hello world - -### - -POST http://b12s8.hdp.dc:34469/hdfs/write?root=hdfs://b2/apps/datalake/test/test.txt -Content-Length: 11 -Content-Type: */*; charset=UTF-8 -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) -Accept-Encoding: br,deflate,gzip,x-gzip - -Hello world - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:16695/hdfs/list?root=hdfs://b2/apps/datalake/hive/dws_test/external_table_hudi/dws_ord_prod_inst_attr -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) -Cookie: JSESSIONID=C23877E9843F4E9C87FC2787EC5EA701 -Accept-Encoding: br,deflate,gzip,x-gzip - -<> 2024-04-26T172547.200.json - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@http:/b12s8.hdp.dc:33681/hdfs/list?root=hdfs://b2/apps/datalake/hive/dws_test/external_table_hudi/dws_ord_prod_inst_attr -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) -Accept-Encoding: br,deflate,gzip,x-gzip - -<> 2024-04-26T172511.503.html - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:30943/hdfs/list?root=hdfs://b2/apps/datalake/hive/dws_test/external_table_hudi/dws_ord_prod_inst_attr/.hoodie -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) -Cookie: JSESSIONID=C23877E9843F4E9C87FC2787EC5EA701 -Accept-Encoding: br,deflate,gzip,x-gzip - -<> 2024-04-26T162856.200.json - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:30943/hdfs/list?root=hdfs://b2/apps/datalake/hive/dws_test/external_table_hudi/dws_ord_prod_inst_attr -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) -Accept-Encoding: br,deflate,gzip,x-gzip - -<> 2024-04-26T162825.200.json - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s15.hdp.dc:21685/pulsar/backlog?name=main&topic=persistent://odcp/grid/grid_serv_staff&subscription=Hudi_Sync_Pulsar_Reader_1552408245762723840_grid_grid_serv_staff_b_20230425 -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) -Accept-Encoding: br,deflate,gzip,x-gzip - -<> 2024-03-05T111533.200.json - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/service-exporter/exporter/un_running_flink_job -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=E12F3C1D9B6AE9937CA57DE2EE8656C7 -Accept-Encoding: br,deflate,gzip,x-gzip - -<> 2024-02-04T153541.200.json - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/service-exporter/exporter/un_running_flink_job -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=606B213C2F4A113AC3CCC2A0614BB558 -Accept-Encoding: br,deflate,gzip,x-gzip - -<> 2024-02-04T152955.200.json - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/service-exporter/exporter/un_running_flink_job -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=5569F2E918CF60BB8B439404BCD2255A -Accept-Encoding: br,deflate,gzip,x-gzip - -<> 2024-02-04T152612.200.json - -### - diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/HoodiePolice.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/HoodiePolice.java index 17437d1..dacd9f7 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/HoodiePolice.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/HoodiePolice.java @@ -48,12 +48,6 @@ public class HoodiePolice { .filter(ObjectUtil::isNotNull) .keyBy(Prisoner::getPartition) .keyBy(Prisoner::getKey) - .reduce(new RichReduceFunction() { - @Override - public Prisoner reduce(Prisoner value1, Prisoner value2) throws Exception { - return null; - } - }) .addSink(new PrisonerSink(taskContext)); environment.execute(); } diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSource.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSource.java index 359c540..851247a 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSource.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSource.java @@ -60,7 +60,7 @@ public class ReadPulsarSource implements Source context; private final Queue readQueue; + private final Set runningTasks = Sets.mutable.empty().asSynchronized(); public ReadPulsarSourceEnumerator(SplitEnumeratorContext context, Collection splits) { this.context = context; @@ -31,15 +39,40 @@ public class ReadPulsarSourceEnumerator implements SplitEnumerator handleSourceEvent(info.getSubtaskId(), null)); + } else if (sourceEvent instanceof EndEvent) { + EndEvent event = (EndEvent) sourceEvent; + logger.info("t{} End: {}", subtaskId, event.getSplit()); + if (ObjectUtil.isNotNull(event.getSplit())) { + runningTasks.remove(event.getSplit().getSplitId()); + } + logger.info("t{} Queue: {} running tasks: {}", subtaskId, readQueue.size(), runningTasks); + if (ObjectUtil.isEmpty(readQueue) && ObjectUtil.isEmpty(runningTasks)) { + logger.info("t{} No more", subtaskId); + context.registeredReaders() + .values() + .forEach(info -> context.signalNoMoreSplits(info.getSubtaskId())); + } + } + } + @Override public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { final ReadPulsarSplit split = readQueue.poll(); if (ObjectUtil.isNotNull(split)) { logger.info("t{} Assign split for {}, Queue rest: {}", subtaskId, subtaskId, readQueue.size()); context.assignSplit(split, subtaskId); - } else { - logger.info("t{} No more split for {}", subtaskId, subtaskId); - context.signalNoMoreSplits(subtaskId); + runningTasks.add(split.getSplitId()); } } diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSourceReader.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSourceReader.java index 25dd5c1..c1dcbfd 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSourceReader.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSourceReader.java @@ -1,9 +1,14 @@ package com.lanyuanxiaoyao.service.executor.task.functions.pulsar; import cn.hutool.core.collection.ListUtil; +import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.lanyuanxiaoyao.service.executor.task.entity.RecordView; +import com.lanyuanxiaoyao.service.executor.task.functions.pulsar.event.AddEvent; +import com.lanyuanxiaoyao.service.executor.task.functions.pulsar.event.EndEvent; +import com.lanyuanxiaoyao.service.executor.task.functions.pulsar.event.StartEvent; +import com.lanyuanxiaoyao.service.executor.task.helper.TimeRangeHelper; import java.io.Serializable; import java.time.Instant; import java.time.ZoneId; @@ -19,14 +24,12 @@ import org.apache.flink.api.connector.source.ReaderOutput; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.core.io.InputStatus; -import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Reader; -import org.apache.pulsar.client.api.SubscriptionInitialPosition; -import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.schema.StringSchema; +import org.eclipse.collections.api.list.ImmutableList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,6 +54,9 @@ public class ReadPulsarSourceReader implements SourceReader message) { @@ -65,6 +71,7 @@ public class ReadPulsarSourceReader implements SourceReader output) throws Exception { logger.info("t{} Poll Next", readerContext.getIndexOfSubtask()); + readerContext.sendSourceEventToCoordinator(new StartEvent()); if (ObjectUtil.isNotNull(currentSplit)) { logger.info("t{} Read split: {}", readerContext.getIndexOfSubtask(), currentSplit.getSplitId()); long startTimestamp = currentSplit.getStartTimestamp(); @@ -86,6 +93,7 @@ public class ReadPulsarSourceReader implements SourceReader message = reader.readNext(10, TimeUnit.SECONDS); while (ObjectUtil.isNotNull(message)) { long publishTime = message.getPublishTime(); @@ -93,11 +101,32 @@ public class ReadPulsarSourceReader implements SourceReader {}, Queue rest: {}", readerContext.getIndexOfSubtask(), message.getPublishTime(), currentSplit.getEndTimestamp(), readQueue.size()); break; } + if (++count > 500000) { + ImmutableList range = TimeRangeHelper.range(publishTime, endTimestamp, currentSplit.getGap() / 5); + if (ObjectUtil.isNotEmpty(range)) { + readerContext.sendSourceEventToCoordinator(new AddEvent( + range.collect(r -> new ReadPulsarSplit( + currentSplit.getTaskId(), + IdUtil.nanoId(10), + currentSplit.getPulsarUrl(), + currentSplit.getPulsarTopic(), + currentSplit.getLatestMessageId(), + r.getStart(), + r.getEnd(), + r.getGap() + )).toList() + )); + break; + } else { + count = 0; + } + } output.collect(parsePulsarMessage(message)); message = reader.readNext(10, TimeUnit.SECONDS); } } } + readerContext.sendSourceEventToCoordinator(new EndEvent(currentSplit)); } currentSplit = null; diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/event/AddEvent.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/event/AddEvent.java new file mode 100644 index 0000000..3f1b42a --- /dev/null +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/event/AddEvent.java @@ -0,0 +1,28 @@ +package com.lanyuanxiaoyao.service.executor.task.functions.pulsar.event; + +import com.lanyuanxiaoyao.service.executor.task.functions.pulsar.ReadPulsarSplit; +import java.util.List; +import org.apache.flink.api.connector.source.SourceEvent; +import org.eclipse.collections.api.list.ImmutableList; + +/** + * @author lanyuanxiaoyao + */ +public class AddEvent implements SourceEvent { + private final List splits; + + public AddEvent(List splits) { + this.splits = splits; + } + + public List getSplits() { + return splits; + } + + @Override + public String toString() { + return "AddSplitEvent{" + + "splits=" + splits + + '}'; + } +} diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/event/EndEvent.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/event/EndEvent.java new file mode 100644 index 0000000..49d2c45 --- /dev/null +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/event/EndEvent.java @@ -0,0 +1,26 @@ +package com.lanyuanxiaoyao.service.executor.task.functions.pulsar.event; + +import com.lanyuanxiaoyao.service.executor.task.functions.pulsar.ReadPulsarSplit; +import org.apache.flink.api.connector.source.SourceEvent; + +/** + * @author lanyuanxiaoyao + */ +public class EndEvent implements SourceEvent { + private final ReadPulsarSplit split; + + public EndEvent(ReadPulsarSplit split) { + this.split = split; + } + + public ReadPulsarSplit getSplit() { + return split; + } + + @Override + public String toString() { + return "EndEvent{" + + "split=" + split + + '}'; + } +} diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/event/StartEvent.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/event/StartEvent.java new file mode 100644 index 0000000..be0735c --- /dev/null +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/event/StartEvent.java @@ -0,0 +1,9 @@ +package com.lanyuanxiaoyao.service.executor.task.functions.pulsar.event; + +import org.apache.flink.api.connector.source.SourceEvent; + +/** + * @author lanyuanxiaoyao + */ +public class StartEvent implements SourceEvent { +} diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/TimeRangeHelper.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/TimeRangeHelper.java index c549635..064e73b 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/TimeRangeHelper.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/TimeRangeHelper.java @@ -12,10 +12,12 @@ public class TimeRangeHelper { public static final class TimeRange { private final long start; private final long end; + private final long gap; - public TimeRange(long start, long end) { + public TimeRange(long start, long end, long gap) { this.start = start; this.end = end; + this.gap = gap; } public long getStart() { @@ -25,13 +27,19 @@ public class TimeRangeHelper { public long getEnd() { return end; } + + public long getGap() { + return gap; + } } public static ImmutableList range(long start, long end, long gap) { - gap = Math.max(TimeUnit.MINUTES.toMillis(1), gap); + if (gap < TimeUnit.MINUTES.toMillis(5)) { + return Lists.immutable.empty(); + } MutableList ranges = Lists.mutable.empty(); while (start <= end) { - ranges.add(new TimeRange(start, Math.min(end, start + gap))); + ranges.add(new TimeRange(start, Math.min(end, start + gap), gap)); start += gap; } return ranges.toImmutable(); diff --git a/test/test.http b/test/test.http index 0db7832..486b02c 100644 --- a/test/test.http +++ b/test/test.http @@ -130,7 +130,7 @@ hoodie.datasource.write.hive_style_partitioning=false hoodie.table.checksum=989688289 ### Test police -GET http://{{username}}:{{password}}@b12s10.hdp.dc:33535/task/law_enforcement? +GET http://{{username}}:{{password}}@b12s10.hdp.dc:31719/task/law_enforcement? pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650& pulsar_topic=persistent://odcp/acct_sz/acct_item_755&start_time=1716858000000&end_time=1716861600000& primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID \ No newline at end of file