From 633db5512dacc1e215b116a4c06afc666e4b95ec Mon Sep 17 00:00:00 2001 From: v-zhangjc9 Date: Thu, 30 May 2024 15:09:54 +0800 Subject: [PATCH] =?UTF-8?q?refactor(executor-task):=20=E6=81=A2=E5=A4=8Dpu?= =?UTF-8?q?lsar=E8=AF=BB=E5=8F=96=E7=AD=96=E7=95=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .idea/httpRequests/http-requests-log.http | 380 ++++++++++-------- .../functions/pulsar/ReadPulsarSource.java | 9 +- .../pulsar/ReadPulsarSourceEnumerator.java | 15 +- .../pulsar/ReadPulsarSourceReader.java | 28 +- .../functions/pulsar/ReadPulsarSplit.java | 26 +- .../functions/pulsar/event/AddSplitEvent.java | 27 -- .../pulsar/event/FinishSplitEvent.java | 9 - .../executor/task/helper/TimeRangeHelper.java | 2 + .../service/executor/task/TimeRangeTest.java | 21 + test/test.http | 2 +- 10 files changed, 276 insertions(+), 243 deletions(-) delete mode 100644 service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/event/AddSplitEvent.java delete mode 100644 service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/event/FinishSplitEvent.java create mode 100644 service-executor/service-executor-task/src/test/java/com/lanyuanxiaoyao/service/executor/task/TimeRangeTest.java diff --git a/.idea/httpRequests/http-requests-log.http b/.idea/httpRequests/http-requests-log.http index 7578e79..f97b603 100644 --- a/.idea/httpRequests/http-requests-log.http +++ b/.idea/httpRequests/http-requests-log.http @@ -1,3 +1,213 @@ +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:33535/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_sz/acct_item_755&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=12F049741038C7209A22DF1B31F9FFD6 +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-05-30T150503.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:33535/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_sz/acct_item_755&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=12F049741038C7209A22DF1B31F9FFD6 +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-05-30T145617.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:33535/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_sz/acct_item_755&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=55CFED44314E337BB8C3BBB2F0992FEB +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-05-30T142948.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:33535/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_sz/acct_item_755&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=55CFED44314E337BB8C3BBB2F0992FEB +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-05-30T104355.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:33535/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_sz/acct_item_755&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=A8AABEBD11DED7A2C0182E3B5157D6E8 +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-05-30T102423.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:33535/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_sz/acct_item_755&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=A8AABEBD11DED7A2C0182E3B5157D6E8 +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-05-29T185454.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:33535/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_sz/acct_item_755&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=6C26FD8B968B60DBAB0A9CE98225F2F3 +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-05-29T183751.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:33535/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_sz/acct_item_755&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=6C26FD8B968B60DBAB0A9CE98225F2F3 +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-05-29T174644.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:33535/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_sz/acct_item_755&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=6C26FD8B968B60DBAB0A9CE98225F2F3 +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-05-29T174414.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:33535/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_sz/acct_item_755&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=3E817DF91AA257B68B5B86F426B29BB7 +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-05-29T172634.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:33535/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_sz/acct_item_755&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=3E817DF91AA257B68B5B86F426B29BB7 +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-05-29T165136.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:33535/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_sz/acct_item_755&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=3E817DF91AA257B68B5B86F426B29BB7 +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-05-29T164417.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:33535/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_sz/acct_item_755&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=3E817DF91AA257B68B5B86F426B29BB7 +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-05-29T162750.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:33535/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_dg/acct_item_760&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=3E817DF91AA257B68B5B86F426B29BB7 +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-05-29T162011.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:33535/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_dg/acct_item_760&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=A0F89DE4D132FAFAE119965445039F4A +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-05-29T160815.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:33535/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_dg/acct_item_760&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=A0F89DE4D132FAFAE119965445039F4A +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-05-29T095011.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:33535/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_dg/acct_item_760&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=A0F89DE4D132FAFAE119965445039F4A +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-05-29T093555.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:33535/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_dg/acct_item_760&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=A0F89DE4D132FAFAE119965445039F4A +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-05-29T093240.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:33535/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_dg/acct_item_760&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=A0F89DE4D132FAFAE119965445039F4A +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-05-29T093118.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:33535/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_dg/acct_item_760&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=A0F89DE4D132FAFAE119965445039F4A +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-05-29T092602.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:33535/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_dg/acct_item_760&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=08E78CE8926806AAB5D110D0FE9B05F7 +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-05-29T085636.200.txt + +### + GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:33535/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_dg/acct_item_760&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID Connection: Keep-Alive User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) @@ -308,173 +518,3 @@ Accept-Encoding: br,deflate,gzip,x-gzip ### -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/service-exporter/exporter/un_running_flink_job -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -<> 2024-02-04T151808.200.json - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -### - diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSource.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSource.java index 8c3a8dc..359c540 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSource.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSource.java @@ -1,5 +1,6 @@ package com.lanyuanxiaoyao.service.executor.task.functions.pulsar; +import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.StrUtil; import com.lanyuanxiaoyao.service.executor.core.TaskContext; import com.lanyuanxiaoyao.service.executor.task.entity.RecordView; @@ -53,11 +54,13 @@ public class ReadPulsarSource implements Source tasks = TimeRangeHelper.range(startTimestamp, endTimestamp, TASK_GAP) .collect(range -> new ReadPulsarSplit( taskContext.getTaskId(), + IdUtil.nanoId(10), pulsarUrl, pulsarTopic, latestMessageId.toString(), range.getStart(), - range.getEnd() + range.getEnd(), + TASK_GAP )); logger.info("Gap: {}, Splits: {}", TASK_GAP, tasks.size()); for (ReadPulsarSplit split : tasks) { @@ -78,12 +81,12 @@ public class ReadPulsarSource implements Source createReader(SourceReaderContext readerContext) throws PulsarClientException { + public SourceReader createReader(SourceReaderContext readerContext) { return new ReadPulsarSourceReader(readerContext); } @Override - public SplitEnumerator> createEnumerator(SplitEnumeratorContext enumContext) throws Exception { + public SplitEnumerator> createEnumerator(SplitEnumeratorContext enumContext) { return new ReadPulsarSourceEnumerator(enumContext, splits); } diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSourceEnumerator.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSourceEnumerator.java index ee68971..5360ca4 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSourceEnumerator.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSourceEnumerator.java @@ -7,11 +7,7 @@ import java.util.ArrayDeque; import java.util.Collection; import java.util.List; import java.util.Queue; -import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; - -import com.lanyuanxiaoyao.service.executor.task.functions.pulsar.event.FinishSplitEvent; -import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.slf4j.Logger; @@ -25,30 +21,21 @@ public class ReadPulsarSourceEnumerator implements SplitEnumerator context; private final Queue readQueue; - private final AtomicInteger success = new AtomicInteger(0); public ReadPulsarSourceEnumerator(SplitEnumeratorContext context, Collection splits) { this.context = context; this.readQueue = new ArrayDeque<>(splits); - this.success.set(splits.size()); } @Override public void start() { } - @Override - public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { - if (sourceEvent instanceof FinishSplitEvent) { - logger.info("{}", success.decrementAndGet()); - } - } - @Override public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { final ReadPulsarSplit split = readQueue.poll(); if (ObjectUtil.isNotNull(split)) { - logger.info("t{} Assign split for {}, Queue rest: {}, Success: {}", subtaskId, subtaskId, readQueue.size(), success.get()); + logger.info("t{} Assign split for {}, Queue rest: {}", subtaskId, subtaskId, readQueue.size()); context.assignSplit(split, subtaskId); } else { logger.info("t{} No more split for {}", subtaskId, subtaskId); diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSourceReader.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSourceReader.java index 6182fdf..25dd5c1 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSourceReader.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSourceReader.java @@ -15,15 +15,18 @@ import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; - -import com.lanyuanxiaoyao.service.executor.task.functions.pulsar.event.FinishSplitEvent; import org.apache.flink.api.connector.source.ReaderOutput; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.core.io.InputStatus; -import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.schema.StringSchema; -import org.apache.pulsar.client.internal.DefaultImplementation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,20 +45,12 @@ public class ReadPulsarSourceReader implements SourceReader message) { @@ -71,7 +66,7 @@ public class ReadPulsarSourceReader implements SourceReader output) throws Exception { logger.info("t{} Poll Next", readerContext.getIndexOfSubtask()); if (ObjectUtil.isNotNull(currentSplit)) { - logger.info("t{} Read split: {}", readerContext.getIndexOfSubtask(), currentSplit.getStartTimestamp()); + logger.info("t{} Read split: {}", readerContext.getIndexOfSubtask(), currentSplit.getSplitId()); long startTimestamp = currentSplit.getStartTimestamp(); long endTimestamp = currentSplit.getEndTimestamp(); try (PulsarClient client = PulsarClient.builder() @@ -106,7 +101,6 @@ public class ReadPulsarSourceReader implements SourceReader splits) { - logger.info("t{} Add splits: {}", readerContext.getIndexOfSubtask(), splits.stream().map(ReadPulsarSplit::getStartTimestamp).collect(Collectors.toList())); + logger.info("t{} Receive add splits: {}", readerContext.getIndexOfSubtask(), splits.stream().map(ReadPulsarSplit::getSplitId).collect(Collectors.toList())); readQueue.addAll(splits); availability.complete(null); } @Override public void notifyNoMoreSplits() { - logger.info("t{} No more splits for {}", readerContext.getIndexOfSubtask(), readerContext.getIndexOfSubtask()); + logger.info("t{} Receive no more splits", readerContext.getIndexOfSubtask()); noMoreSplits = true; availability.complete(null); } diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSplit.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSplit.java index 765ac02..b7337b3 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSplit.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/ReadPulsarSplit.java @@ -9,22 +9,26 @@ import org.apache.flink.api.connector.source.SourceSplit; */ public class ReadPulsarSplit implements SourceSplit, Serializable { private String taskId; + private String splitId; private String pulsarUrl; private String pulsarTopic; private String latestMessageId; private Long startTimestamp; private Long endTimestamp; + private Long gap; public ReadPulsarSplit() { } - public ReadPulsarSplit(String taskId, String pulsarUrl, String pulsarTopic, String latestMessageId, Long startTimestamp, Long endTimestamp) { + public ReadPulsarSplit(String taskId, String splitId, String pulsarUrl, String pulsarTopic, String latestMessageId, Long startTimestamp, Long endTimestamp, Long gap) { this.taskId = taskId; + this.splitId = splitId; this.pulsarUrl = pulsarUrl; this.pulsarTopic = pulsarTopic; this.latestMessageId = latestMessageId; this.startTimestamp = startTimestamp; this.endTimestamp = endTimestamp; + this.gap = gap; } public String getTaskId() { @@ -35,6 +39,14 @@ public class ReadPulsarSplit implements SourceSplit, Serializable { this.taskId = taskId; } + public String getSplitId() { + return splitId; + } + + public void setSplitId(String splitId) { + this.splitId = splitId; + } + public String getPulsarUrl() { return pulsarUrl; } @@ -75,20 +87,30 @@ public class ReadPulsarSplit implements SourceSplit, Serializable { this.endTimestamp = endTimestamp; } + public Long getGap() { + return gap; + } + + public void setGap(Long gap) { + this.gap = gap; + } + @Override public String splitId() { - return taskId + pulsarUrl + pulsarTopic + startTimestamp + endTimestamp + latestMessageId; + return taskId + splitId + pulsarUrl + pulsarTopic + startTimestamp + endTimestamp + latestMessageId; } @Override public String toString() { return "ReadPulsarSplit{" + "taskId='" + taskId + '\'' + + ", splitId='" + splitId + '\'' + ", pulsarUrl='" + pulsarUrl + '\'' + ", pulsarTopic='" + pulsarTopic + '\'' + ", latestMessageId='" + latestMessageId + '\'' + ", startTimestamp=" + startTimestamp + ", endTimestamp=" + endTimestamp + + ", gap=" + gap + '}'; } } diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/event/AddSplitEvent.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/event/AddSplitEvent.java deleted file mode 100644 index 392cb11..0000000 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/event/AddSplitEvent.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.lanyuanxiaoyao.service.executor.task.functions.pulsar.event; - -import com.lanyuanxiaoyao.service.executor.task.functions.pulsar.ReadPulsarSplit; -import org.apache.flink.api.connector.source.SourceEvent; -import org.eclipse.collections.api.list.ImmutableList; - -/** - * @author lanyuanxiaoyao - */ -public class AddSplitEvent implements SourceEvent { - private final ImmutableList splits; - - public AddSplitEvent(ImmutableList splits) { - this.splits = splits; - } - - public ImmutableList getSplits() { - return splits; - } - - @Override - public String toString() { - return "AddSplitEvent{" + - "splits=" + splits + - '}'; - } -} diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/event/FinishSplitEvent.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/event/FinishSplitEvent.java deleted file mode 100644 index 4351faf..0000000 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/pulsar/event/FinishSplitEvent.java +++ /dev/null @@ -1,9 +0,0 @@ -package com.lanyuanxiaoyao.service.executor.task.functions.pulsar.event; - -import org.apache.flink.api.connector.source.SourceEvent; - -/** - * @author lanyuanxiaoyao - */ -public class FinishSplitEvent implements SourceEvent { -} diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/TimeRangeHelper.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/TimeRangeHelper.java index 92261a2..c549635 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/TimeRangeHelper.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/TimeRangeHelper.java @@ -1,5 +1,6 @@ package com.lanyuanxiaoyao.service.executor.task.helper; +import java.util.concurrent.TimeUnit; import org.eclipse.collections.api.factory.Lists; import org.eclipse.collections.api.list.ImmutableList; import org.eclipse.collections.api.list.MutableList; @@ -27,6 +28,7 @@ public class TimeRangeHelper { } public static ImmutableList range(long start, long end, long gap) { + gap = Math.max(TimeUnit.MINUTES.toMillis(1), gap); MutableList ranges = Lists.mutable.empty(); while (start <= end) { ranges.add(new TimeRange(start, Math.min(end, start + gap))); diff --git a/service-executor/service-executor-task/src/test/java/com/lanyuanxiaoyao/service/executor/task/TimeRangeTest.java b/service-executor/service-executor-task/src/test/java/com/lanyuanxiaoyao/service/executor/task/TimeRangeTest.java new file mode 100644 index 0000000..765a6e4 --- /dev/null +++ b/service-executor/service-executor-task/src/test/java/com/lanyuanxiaoyao/service/executor/task/TimeRangeTest.java @@ -0,0 +1,21 @@ +package com.lanyuanxiaoyao.service.executor.task; + +import com.lanyuanxiaoyao.service.executor.task.helper.TimeRangeHelper; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.Locale; +import java.util.concurrent.TimeUnit; + +/** + * @author lanyuanxiaoyao + */ +public class TimeRangeTest { + private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS") + .withLocale(Locale.CHINA) + .withZone(ZoneId.systemDefault()); + public static void main(String[] args) { + TimeRangeHelper.range(1716912000000L, Instant.now().toEpochMilli(), TimeUnit.MINUTES.toMillis(30)) + .forEach(range -> System.out.printf("%s - %s\n", FORMATTER.format(Instant.ofEpochMilli(range.getStart())), FORMATTER.format(Instant.ofEpochMilli(range.getEnd())))); + } +} diff --git a/test/test.http b/test/test.http index b8ab961..0db7832 100644 --- a/test/test.http +++ b/test/test.http @@ -132,5 +132,5 @@ hoodie.table.checksum=989688289 ### Test police GET http://{{username}}:{{password}}@b12s10.hdp.dc:33535/task/law_enforcement? pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650& - pulsar_topic=persistent://odcp/acct_dg/acct_item_760&start_time=1716858000000&end_time=1716861600000& + pulsar_topic=persistent://odcp/acct_sz/acct_item_755&start_time=1716858000000&end_time=1716861600000& primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID \ No newline at end of file