diff --git a/.idea/httpRequests/http-requests-log.http b/.idea/httpRequests/http-requests-log.http index c20f2a4..7578e79 100644 --- a/.idea/httpRequests/http-requests-log.http +++ b/.idea/httpRequests/http-requests-log.http @@ -1,3 +1,113 @@ +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:33535/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_dg/acct_item_760&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=08E78CE8926806AAB5D110D0FE9B05F7 +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-05-28T170822.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:33535/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_dg/acct_item_760&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=08E78CE8926806AAB5D110D0FE9B05F7 +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-05-28T170142.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:33535/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_dg/acct_item_760&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=08E78CE8926806AAB5D110D0FE9B05F7 +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-05-28T165248.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:33535/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_dg/acct_item_760&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=08E78CE8926806AAB5D110D0FE9B05F7 +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-05-28T165207.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:33535/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_dg/acct_item_760&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=08E78CE8926806AAB5D110D0FE9B05F7 +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-05-28T165114.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:33535/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_dg/acct_item_760&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=08E78CE8926806AAB5D110D0FE9B05F7 +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-05-28T164901.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:33535/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_dg/acct_item_760&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=08E78CE8926806AAB5D110D0FE9B05F7 +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-05-28T164758.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:33535/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_dg/acct_item_760&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=08E78CE8926806AAB5D110D0FE9B05F7 +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-05-28T164303.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:33535/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_dg/acct_item_760&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=08E78CE8926806AAB5D110D0FE9B05F7 +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-05-28T164220.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:33535/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_dg/acct_item_760&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=08E78CE8926806AAB5D110D0FE9B05F7 +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-05-28T164107.200.txt + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:33535/task/law_enforcement?pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650&pulsar_topic=persistent://odcp/acct_dg/acct_item_760&start_time=1716858000000&end_time=1716861600000&primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=C5D2666661F27F68E53223FE5B74AF35 +Accept-Encoding: br,deflate,gzip,x-gzip + +<> 2024-05-28T163410.200.txt + +### + GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre Connection: Keep-Alive User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) @@ -368,91 +478,3 @@ Accept-Encoding: br,deflate,gzip,x-gzip ### -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -### - diff --git a/service-command-pro/src/main/java/com/lanyuanxiaoyao/service/command/pro/commands/HudiCommand.java b/service-command-pro/src/main/java/com/lanyuanxiaoyao/service/command/pro/commands/HudiCommand.java index b6c8f97..ff0eb72 100644 --- a/service-command-pro/src/main/java/com/lanyuanxiaoyao/service/command/pro/commands/HudiCommand.java +++ b/service-command-pro/src/main/java/com/lanyuanxiaoyao/service/command/pro/commands/HudiCommand.java @@ -3,6 +3,8 @@ package com.lanyuanxiaoyao.service.command.pro.commands; import cn.hutool.core.util.StrUtil; import com.lanyuanxiaoyao.service.common.entity.TableMeta; import com.lanyuanxiaoyao.service.configuration.ExecutorProvider; +import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiCompactionPlan; +import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant; import com.lanyuanxiaoyao.service.forest.service.HudiService; import com.lanyuanxiaoyao.service.forest.service.InfoService; import java.io.IOException; @@ -23,6 +25,7 @@ import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieFileGroup; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; @@ -41,6 +44,7 @@ import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.example.GroupReadSupport; import org.apache.parquet.hadoop.util.HadoopInputFile; import org.eclipse.collections.api.factory.Lists; +import org.eclipse.collections.api.list.ImmutableList; import org.eclipse.collections.api.list.MutableList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -206,6 +210,22 @@ public class HudiCommand { hudiService.timelineHdfsAllActive(hdfs).forEach(instant -> logger.info(instant.toString())); } + @ShellMethod("Check compaction") + public void checkCompaction(@ShellOption(help = "root hdfs path") String hdfs) { + logger.info("{}", hudiService.timelineHdfsAllActive(hdfs)); + ImmutableList finishCompactionInstants = hudiService.timelineHdfsAllActive(hdfs) + .select(instant -> StrUtil.equals(instant.getAction(), HoodieTimeline.COMPACTION_ACTION)) + .select(instant -> StrUtil.equals(instant.getState(), HoodieInstant.State.COMPLETED.name())); + for (HudiInstant instant : finishCompactionInstants) { + logger.info("Compaction: {}", instant); + HudiCompactionPlan compactionPlan = hudiService.readCompactionPlanHdfs(hdfs, instant.getTimestamp()); + for (HudiCompactionPlan.Operation operation : compactionPlan.getOperations()) { + logger.info("Base: {}", operation.getDataFilePath()); + operation.getDeltaFilePaths().forEach(path -> logger.info(" Delta: {}", path)); + } + } + } + public interface Runnable { void run(LongAdder counter); } diff --git a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/controller/ExecutorTaskController.java b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/controller/ExecutorTaskController.java index d1d0031..c2f7dd2 100644 --- a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/controller/ExecutorTaskController.java +++ b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/controller/ExecutorTaskController.java @@ -60,6 +60,30 @@ public class ExecutorTaskController { return executorTaskService.tableSummary(hdfs); } + @GetMapping("law_enforcement") + public String lawEnforcement( + @RequestParam("pulsar_url") String pulsarUrl, + @RequestParam("pulsar_topic") String pulsarTopic, + @RequestParam("start_time") Long startTime, + @RequestParam("end_time") Long endTime, + @RequestParam("primary_keys") String primaryKeys, + @RequestParam("partition_keys") String partitionKeys + ) throws Exception { + if (StrUtil.isBlank(pulsarUrl)) { + throw new RuntimeException("pulsar_url cannot be empty"); + } + if (StrUtil.isBlank(pulsarTopic)) { + throw new RuntimeException("pulsar_topic cannot be empty"); + } + if (StrUtil.isBlank(primaryKeys)) { + throw new RuntimeException("primary_keys cannot be empty"); + } + if (StrUtil.isBlank(partitionKeys)) { + throw new RuntimeException("partition_keys cannot be empty"); + } + return executorTaskService.lawEnforcement(pulsarUrl, pulsarTopic, startTime, endTime, primaryKeys, partitionKeys); + } + @GetMapping("results") public ImmutableList results( @RequestParam("task_id") String taskId, diff --git a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/ExecutorTaskService.java b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/ExecutorTaskService.java index 37faa33..87b009a 100644 --- a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/ExecutorTaskService.java +++ b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/ExecutorTaskService.java @@ -236,6 +236,43 @@ public class ExecutorTaskService { return applicationId.toString(); } + public String lawEnforcement( + String pulsarUrl, + String pulsarTopic, + Long startTime, + Long endTime, + String primaryKeys, + String partitionKeys + ) throws Exception { + String taskId = taskId(); + Configuration configuration = generateConfiguration(taskId, StrUtil.format("law_enforcement")); + configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("1024m")); + MapBuilder builder = MapUtil.builder(); + + builder.put("pulsar_url", pulsarUrl); + builder.put("pulsar_topic", pulsarTopic); + builder.put("start_time", startTime); + builder.put("end_time", endTime); + builder.put("primary_keys", primaryKeys); + builder.put("partition_keys", partitionKeys); + + ApplicationId applicationId = Runner.run( + configuration, + "com.lanyuanxiaoyao.service.executor.task.HoodiePolice", + new String[]{ + TaskConstants.TASK_CONTEXT_OPTION, + mapper.writeValueAsString( + new TaskContext( + taskId, + executorConfiguration.getTaskResultPath(), + Maps.mutable.ofMap(builder.build()) + ) + ) + } + ); + return applicationId.toString(); + } + @Cacheable(value = "results", sync = true) @Retryable(Throwable.class) public ImmutableList taskResult(String taskId, Integer limit) throws IOException { diff --git a/service-executor/service-executor-task/pom.xml b/service-executor/service-executor-task/pom.xml index da512f2..a55195d 100644 --- a/service-executor/service-executor-task/pom.xml +++ b/service-executor/service-executor-task/pom.xml @@ -82,6 +82,11 @@ asciitable provided + + com.github.ben-manes.caffeine + caffeine + provided + diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/HoodiePolice.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/HoodiePolice.java new file mode 100644 index 0000000..8fbddab --- /dev/null +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/HoodiePolice.java @@ -0,0 +1,61 @@ +package com.lanyuanxiaoyao.service.executor.task; + +import cn.hutool.core.util.ObjectUtil; +import com.lanyuanxiaoyao.service.executor.core.TaskContext; +import com.lanyuanxiaoyao.service.executor.task.entity.Prisoner; +import com.lanyuanxiaoyao.service.executor.task.functions.police.PrisonerSink; +import com.lanyuanxiaoyao.service.executor.task.functions.police.PulsarMessage2Prisoner; +import com.lanyuanxiaoyao.service.executor.task.functions.pulsar.ReadPulsarSource; +import com.lanyuanxiaoyao.service.executor.task.helper.ArgumentsHelper; +import com.lanyuanxiaoyao.service.executor.task.helper.FlinkHelper; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.serialization.SimpleStringEncoder; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static com.lanyuanxiaoyao.service.common.Constants.GB; +import static com.lanyuanxiaoyao.service.common.Constants.MINUTE; + +/** + * Mini Hudi + * + * @author lanyuanxiaoyao + */ +public class HoodiePolice { + private static final Logger logger = LoggerFactory.getLogger(HoodiePolice.class); + + public static void main(String[] args) throws Exception { + TaskContext taskContext = ArgumentsHelper.getContext(args); + logger.info("Context: {}", taskContext); + + ArgumentsHelper.checkMetadata(taskContext, "pulsar_url"); + String pulsarUrl = (String) taskContext.getMetadata().get("pulsar_url"); + ArgumentsHelper.checkMetadata(taskContext, "pulsar_topic"); + String pulsarTopic = (String) taskContext.getMetadata().get("pulsar_topic"); + ArgumentsHelper.checkMetadata(taskContext, "start_time"); + Long startTime = (Long) taskContext.getMetadata().get("start_time"); + ArgumentsHelper.checkMetadata(taskContext, "end_time"); + Long endTime = (Long) taskContext.getMetadata().get("end_time"); + ArgumentsHelper.checkMetadata(taskContext, "primary_keys"); + String[] primaryKeys = ((String) taskContext.getMetadata().get("primary_keys")).split(","); + ArgumentsHelper.checkMetadata(taskContext, "partition_keys"); + String[] partitionKeys = ((String) taskContext.getMetadata().get("partition_keys")).split(","); + + StreamExecutionEnvironment environment = FlinkHelper.getBatchEnvironment(); + environment.setParallelism(30); + environment + .fromSource(new ReadPulsarSource(taskContext, pulsarUrl, pulsarTopic), WatermarkStrategy.noWatermarks(), "Read pulsar") + .disableChaining() + .map(new PulsarMessage2Prisoner(startTime, endTime, primaryKeys, partitionKeys)) + .filter(ObjectUtil::isNotNull) + .keyBy(Prisoner::getPartition) + .keyBy(Prisoner::getKey) + .addSink(new PrisonerSink(taskContext)); + environment.execute(); + } +} diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/entity/Prisoner.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/entity/Prisoner.java new file mode 100644 index 0000000..dcf4ae4 --- /dev/null +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/entity/Prisoner.java @@ -0,0 +1,75 @@ +package com.lanyuanxiaoyao.service.executor.task.entity; + +import java.io.Serializable; + +/** + * @author lanyuanxiaoyao + */ +public class Prisoner implements Serializable { + private final String messageId; + private final String type; + private final String key; + private final String partition; + private final Long timestamp; + private final Boolean error; + private final String errorMessage; + + private Prisoner(String messageId, String type, String key, String partition, Long timestamp, Boolean error, String errorMessage) { + this.messageId = messageId; + this.type = type; + this.key = key; + this.partition = partition; + this.timestamp = timestamp; + this.error = error; + this.errorMessage = errorMessage; + } + + public Prisoner(String messageId, String type, String key, String partition, Long timestamp) { + this(messageId, type, key, partition, timestamp, false, null); + } + + public Prisoner(String messageId, String errorMessage) { + this(messageId, null, "error", "error", null, true, errorMessage); + } + + public String getMessageId() { + return messageId; + } + + public String getType() { + return type; + } + + public String getKey() { + return key; + } + + public String getPartition() { + return partition; + } + + public Long getTimestamp() { + return timestamp; + } + + public Boolean getError() { + return error; + } + + public String getErrorMessage() { + return errorMessage; + } + + @Override + public String toString() { + return "Prisoner{" + + "messageId='" + messageId + '\'' + + ", type='" + type + '\'' + + ", key='" + key + '\'' + + ", partition='" + partition + '\'' + + ", timestamp=" + timestamp + + ", error=" + error + + ", errorMessage='" + errorMessage + '\'' + + '}'; + } +} diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/police/PrisonerSink.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/police/PrisonerSink.java new file mode 100644 index 0000000..23f531b --- /dev/null +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/police/PrisonerSink.java @@ -0,0 +1,72 @@ +package com.lanyuanxiaoyao.service.executor.task.functions.police; + +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.RemovalListener; +import com.lanyuanxiaoyao.service.executor.core.TaskContext; +import com.lanyuanxiaoyao.service.executor.task.entity.Prisoner; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +/** + * @author lanyuanxiaoyao + */ +public class PrisonerSink extends RichSinkFunction { + private final TaskContext taskContext; + private FileSystem fileSystem; + private LoadingCache outputStreamLoadingCache; + + public PrisonerSink(TaskContext taskContext) { + this.taskContext = taskContext; + } + + @Override + public void open(org.apache.flink.configuration.Configuration parameters) throws Exception { + super.open(parameters); + this.fileSystem = FileSystem.get(new Configuration()); + this.outputStreamLoadingCache = Caffeine.newBuilder() + .expireAfterAccess(1, TimeUnit.MINUTES) + .evictionListener((RemovalListener) (key, value, cause) -> { + try { + if (value != null) { + value.close(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .build(path -> { + Path p = new Path(path); + if (fileSystem.exists(p)) { + return fileSystem.append(p); + } else { + return fileSystem.create(p, true); + } + }); + } + + @Override + public void close() throws Exception { + super.close(); + if (ObjectUtil.isNotNull(outputStreamLoadingCache)) { + outputStreamLoadingCache.invalidateAll(); + } + if (ObjectUtil.isNotNull(fileSystem)) { + fileSystem.close(); + } + } + + @Override + public void invoke(Prisoner value, Context context) throws Exception { + FSDataOutputStream outputStream = outputStreamLoadingCache.get(StrUtil.format("{}/{}/{}/{}/records", taskContext.getResultPath(), taskContext.getTaskId(), value.getPartition(), value.getKey())); + assert outputStream != null; + outputStream.writeBytes(value.toString()); + } +} diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/police/PulsarMessage2Prisoner.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/police/PulsarMessage2Prisoner.java new file mode 100644 index 0000000..e6437c7 --- /dev/null +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/police/PulsarMessage2Prisoner.java @@ -0,0 +1,83 @@ +package com.lanyuanxiaoyao.service.executor.task.functions.police; + +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; +import com.lanyuanxiaoyao.service.common.Constants; +import com.lanyuanxiaoyao.service.common.entity.Record; +import com.lanyuanxiaoyao.service.executor.core.TaskContext; +import com.lanyuanxiaoyao.service.executor.task.entity.Prisoner; +import com.lanyuanxiaoyao.service.executor.task.entity.RecordView; +import com.lanyuanxiaoyao.service.executor.task.helper.JacksonHelper; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.Map; +import java.util.regex.Pattern; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.eclipse.collections.api.factory.Lists; + +/** + * @author lanyuanxiaoyao + */ +public class PulsarMessage2Prisoner extends RichMapFunction { + private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + private static final Pattern OPTS_PATTERN = Pattern.compile("^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}$"); + private final ObjectMapper mapper = JacksonHelper.getMapper(); + + private final long startTime; + private final long endTime; + private final String[] primaryKeys; + private final String[] partitionKeys; + + public PulsarMessage2Prisoner(long startTime, long endTime, String[] primaryKeys, String[] partitionKeys) { + this.startTime = startTime; + this.endTime = endTime; + this.primaryKeys = primaryKeys; + this.partitionKeys = partitionKeys; + } + + @Override + public Prisoner map(RecordView value) { + Record record; + try { + record = mapper.readValue(value.getData(), Record.class); + } catch (Throwable e) { + return new Prisoner(value.getFile(), StrUtil.format("{}: {}", e.getMessage(), value.getData())); + } + Record.Statement statement = record.getStatement(); + if (ObjectUtil.isNull(statement) || StrUtil.isBlank(statement.getOpTs())) { + return new Prisoner(value.getFile(), StrUtil.format("Invalid statement: {}", value.getData())); + } + long timestamp; + try { + if (OPTS_PATTERN.matcher(statement.getOpTs()).matches()) { + timestamp = LocalDateTime.parse(statement.getOpTs(), FORMATTER).toInstant(ZoneOffset.ofHours(8)).toEpochMilli(); + } else { + throw new Exception("opts not match " + OPTS_PATTERN.pattern()); + } + } catch (Throwable e) { + return new Prisoner(value.getFile(), StrUtil.format("Invalid timestamp: {}", value.getData())); + } + if (timestamp < startTime || timestamp > endTime) { + return null; + } + Map fields; + if (StrUtil.equalsAny(statement.getOpType(), Constants.INSERT, Constants.UPDATE)) { + fields = record.getStatement().getAfter(); + } else if (StrUtil.equals(statement.getOpType(), Constants.DELETE)) { + fields = record.getStatement().getBefore(); + } else { + return new Prisoner(value.getFile(), StrUtil.format("Invalid opType: {}", value.getData())); + } + String primaryKey = Lists.immutable.of(primaryKeys).select(fields::containsKey).collect(fields::get).makeString(","); + String partitionKey = Lists.immutable.of(partitionKeys).select(fields::containsKey).collect(fields::get).makeString(","); + if (StrUtil.isBlank(primaryKey)) { + return new Prisoner(value.getFile(), StrUtil.format("Invalid primaryKey: {}", value.getData())); + } + if (StrUtil.isBlank(partitionKey)) { + return new Prisoner(value.getFile(), StrUtil.format("Invalid partitionKey: {}", value.getData())); + } + return new Prisoner(value.getFile(), record.getStatement().getOpType(), primaryKey, partitionKey, timestamp); + } +} diff --git a/test/test.http b/test/test.http index f062ee5..b8ab961 100644 --- a/test/test.http +++ b/test/test.http @@ -47,10 +47,12 @@ Content-Type: application/json GET {{queue-url}}/queue/clear?name=compaction-queue-pre ### Info -GET http://{{username}}:{{password}}@132.122.116.146:18166/info/compaction_metrics?flink_job_id=1542097996099055616&alias=acct_acct_item_fs&filter_completes=true +GET http://{{username}}:{{password}}@132.122.116.146:18166/info/compaction_metrics?flink_job_id=1542097996099055616& + alias=acct_acct_item_fs&filter_completes=true ### Info -GET http://{{username}}:{{password}}@132.122.116.150:27510/table/list_compaction_metrics?search_flink_job_id=1542097996099055616&search_alias=acct_acct_item_fs +GET http://{{username}}:{{password}}@132.122.116.150:27510/table/list_compaction_metrics? + search_flink_job_id=1542097996099055616&search_alias=acct_acct_item_fs ### Query Scheduler GET {{scheduler-url}}/schedule/all @@ -86,7 +88,8 @@ GET {{web-url}}/overview/sync_running_status GET {{scheduler-url}}/schedule/schedule_times ### Get message id -GET {{api-url}}/api/sync_checkpoint_state?flink_job_id=1542097996099055616&alias=acct_acct_item_zs&message_id=861976:46933:-1&publish_time=1705373846898 +GET {{api-url}}/api/sync_checkpoint_state?flink_job_id=1542097996099055616&alias=acct_acct_item_zs& + message_id=861976:46933:-1&publish_time=1705373846898 ### Test GET {{web-url}}/test @@ -95,13 +98,17 @@ GET {{web-url}}/test GET {{exporter-url}}/exporter/un_running_flink_job ### Pulsar backlog -GET http://{{username}}:{{password}}@b12s15.hdp.dc:21685/pulsar/backlog?name=main&topic=persistent://odcp/grid/grid_serv_staff&subscription=Hudi_Sync_Pulsar_Reader_1552408245762723840_grid_grid_serv_staff_b_20230425 +GET http://{{username}}:{{password}}@b12s15.hdp.dc:21685/pulsar/backlog?name=main& + topic=persistent://odcp/grid/grid_serv_staff& + subscription=Hudi_Sync_Pulsar_Reader_1552408245762723840_grid_grid_serv_staff_b_20230425 ### Test HDFS list -GET http://{{username}}:{{password}}@b12s10.hdp.dc:16695/hdfs/list?root=hdfs://b2/apps/datalake/hive/dws_test/external_table_hudi/dws_ord_prod_inst_attr +GET http://{{username}}:{{password}}@b12s10.hdp.dc:16695/hdfs/list? + root=hdfs://b2/apps/datalake/hive/dws_test/external_table_hudi/dws_ord_prod_inst_attr ### Test HDFS write -POST http://{{username}}:{{password}}@b12s8.hdp.dc:15391/hdfs/write?root=hdfs://b2/apps/datalake/test/test.txt&overwrite=true +POST http://{{username}}:{{password}}@b12s8.hdp.dc:15391/hdfs/write?root=hdfs://b2/apps/datalake/test/test.txt& + overwrite=true Content-Type: text/plain \#Properties saved on 2023-12-26T09:18:39.583Z @@ -121,3 +128,9 @@ hoodie.table.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator hoodie.table.timeline.timezone=LOCAL hoodie.datasource.write.hive_style_partitioning=false hoodie.table.checksum=989688289 + +### Test police +GET http://{{username}}:{{password}}@b12s10.hdp.dc:33535/task/law_enforcement? + pulsar_url=pulsar://132.122.115.158:16650,132.122.115.159:16650,132.122.115.160:16650,132.122.115.161:16650,132.122.115.167:16650,132.122.115.168:16650& + pulsar_topic=persistent://odcp/acct_dg/acct_item_760&start_time=1716858000000&end_time=1716861600000& + primary_keys=ACCT_ITEM_ID&partition_keys=ACCT_ID \ No newline at end of file diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessage2RecordFunction.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessage2RecordFunction.java index c58a191..8881261 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessage2RecordFunction.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/functions/PulsarMessage2RecordFunction.java @@ -35,7 +35,7 @@ public class PulsarMessage2RecordFunction extends RichMapFunction latestOperationTime = new AtomicReference<>(""); - private final static DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); private static final Pattern OPTS_PATTERN = Pattern.compile("^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}$"); private final GlobalConfiguration globalConfiguration; private final FlinkJob flinkJob;