diff --git a/.idea/httpRequests/http-requests-log.http b/.idea/httpRequests/http-requests-log.http index b6f5c80..457b7af 100644 --- a/.idea/httpRequests/http-requests-log.http +++ b/.idea/httpRequests/http-requests-log.http @@ -1,3 +1,83 @@ +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s25.hdp.dc:36781/queue/clear/compaction-queue-pre +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) +Cookie: JSESSIONID=BFE3689E5BBBBFD448B87BFEE0D23E83 +Accept-Encoding: br,deflate,gzip,x-gzip + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s25.hdp.dc:36781/queue/clear/compaction-queue-pre +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) +Cookie: JSESSIONID=BFE3689E5BBBBFD448B87BFEE0D23E83 +Accept-Encoding: br,deflate,gzip,x-gzip + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s25.hdp.dc:36781/queue/clear/compaction-queue-pre +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) +Cookie: JSESSIONID=BFE3689E5BBBBFD448B87BFEE0D23E83 +Accept-Encoding: br,deflate,gzip,x-gzip + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s25.hdp.dc:36781/queue/clear/compaction-queue-pre +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) +Cookie: JSESSIONID=BFE3689E5BBBBFD448B87BFEE0D23E83 +Accept-Encoding: br,deflate,gzip,x-gzip + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/service_scheduler/schedule/all +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) +Cookie: JSESSIONID=B7901EEEC95E299E2F94FB74E3A979F5 +Accept-Encoding: br,deflate,gzip,x-gzip + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s25.hdp.dc:36781/queue/clear/compaction-queue-pre +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) +Cookie: JSESSIONID=BFE3689E5BBBBFD448B87BFEE0D23E83 +Accept-Encoding: br,deflate,gzip,x-gzip + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s25.hdp.dc:36781/queue/clear/compaction-queue-pre +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) +Cookie: JSESSIONID=BFE3689E5BBBBFD448B87BFEE0D23E83 +Accept-Encoding: br,deflate,gzip,x-gzip + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s25.hdp.dc:36781/queue/clear/compaction-queue-pre +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) +Cookie: JSESSIONID=BFE3689E5BBBBFD448B87BFEE0D23E83 +Accept-Encoding: br,deflate,gzip,x-gzip + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s25.hdp.dc:36781/queue/clear/compaction-queue-pre +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) +Cookie: JSESSIONID=BFE3689E5BBBBFD448B87BFEE0D23E83 +Accept-Encoding: br,deflate,gzip,x-gzip + +### + +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s25.hdp.dc:36781/queue/clear/compaction-queue-pre +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) +Cookie: JSESSIONID=880932A3A6DBB5983CB032B005EA2B3E +Accept-Encoding: br,deflate,gzip,x-gzip + +### + GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/service_scheduler/schedule/all Connection: Keep-Alive User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) @@ -382,102 +462,3 @@ Accept-Encoding: br,deflate,gzip,x-gzip ### -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/service_scheduler/schedule/schedule_times -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=3FB05EC585838B1A8DC11D0425515571 -Accept-Encoding: br,deflate,gzip,x-gzip - -<> 2024-01-15T143301.200.json - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/service_scheduler/schedule/schedule_times -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=3FB05EC585838B1A8DC11D0425515571 -Accept-Encoding: br,deflate,gzip,x-gzip - -<> 2024-01-15T143253.503.json - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/service_scheduler/schedule/schedule_times -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=3FB05EC585838B1A8DC11D0425515571 -Accept-Encoding: br,deflate,gzip,x-gzip - -<> 2024-01-15T143252.503.json - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/service_scheduler/schedule/schedule_times -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=7602F2E53C5FE9BD0182453EBE62D056 -Accept-Encoding: br,deflate,gzip,x-gzip - -<> 2024-01-15T142946.200.json - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/service_scheduler/schedule/schedule_times -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=7602F2E53C5FE9BD0182453EBE62D056 -Accept-Encoding: br,deflate,gzip,x-gzip - -<> 2024-01-15T142928.503.json - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/service_scheduler/schedule/schedule_times -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=7602F2E53C5FE9BD0182453EBE62D056 -Accept-Encoding: br,deflate,gzip,x-gzip - -<> 2024-01-15T142657.200.json - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/service_scheduler/schedule/schedule_times -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Accept-Encoding: br,deflate,gzip,x-gzip - -<> 2024-01-15T142605.200.json - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.122.116.152:37496/zookeeper/get_data?path=/hudi/lock/running/sync/sync_lock_1542097983881048064 -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.13 (Java/17.0.5) -Cookie: JSESSIONID=9B2804B24676C18ABB793E669D789275 -Accept-Encoding: br,deflate,gzip,x-gzip - -<> 2023-05-14T000645.200.txt - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.122.116.152:37496/zookeeper/exists_path?path=/hudi/lock/running/sync/sync_lock_1542097983881048064 -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.13 (Java/17.0.5) -Cookie: JSESSIONID=9B2804B24676C18ABB793E669D789275 -Accept-Encoding: br,deflate,gzip,x-gzip - -<> 2023-05-14T000637.200.json - -### - -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.122.116.152:37496/zookeeper/exists_path?path=/hudi -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.13 (Java/17.0.5) -Cookie: JSESSIONID=62C1CD6C50E4C9B2B521DA398F17A0AD -Accept-Encoding: br,deflate,gzip,x-gzip - -<> 2023-05-14T000603.200.json - -### - diff --git a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/controller/ExecutorTaskController.java b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/controller/ExecutorTaskController.java index 3ce8b0d..9a21e27 100644 --- a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/controller/ExecutorTaskController.java +++ b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/controller/ExecutorTaskController.java @@ -51,6 +51,14 @@ public class ExecutorTaskController { return executorTaskService.scanAvro(key, hdfs, pulsar, pulsarTopic, scanSource, scanQueue, scanLog, scanBase, scanTarget); } + @GetMapping("latest_op_ts") + public String latestOpTs(@RequestParam("hdfs") String hdfs) throws Exception { + if (StrUtil.isBlank(hdfs)) { + throw new RuntimeException("Hdfs path cannot be empty"); + } + return executorTaskService.scanLatestOpTs(hdfs); + } + @GetMapping("results") public ImmutableList results( @RequestParam("task_id") String taskId, diff --git a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/ExecutorTaskService.java b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/ExecutorTaskService.java index 4611a9f..8528412 100644 --- a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/ExecutorTaskService.java +++ b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/ExecutorTaskService.java @@ -161,6 +161,31 @@ public class ExecutorTaskService { return applicationId.toString(); } + public String scanLatestOpTs(String hdfs) throws Exception { + String taskId = taskId(); + Configuration configuration = generateConfiguration(taskId, "latest_op_ts"); + configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("1024m")); + MapBuilder builder = MapUtil.builder(); + + builder.put("hdfs", hdfs); + + ApplicationId applicationId = Runner.run( + configuration, + "com.lanyuanxiaoyao.service.executor.task.LatestOperationTimeScan", + new String[]{ + TaskConstants.TASK_CONTEXT_OPTION, + mapper.writeValueAsString( + new TaskContext( + taskId, + executorConfiguration.getTaskResultPath(), + Maps.mutable.ofMap(builder.build()) + ) + ) + } + ); + return applicationId.toString(); + } + @Cacheable(value = "results", sync = true) @Retryable(Throwable.class) public ImmutableList taskResult(String taskId, Integer limit) throws IOException { diff --git a/service-executor/service-executor-manager/src/main/resources/logback-spring.xml b/service-executor/service-executor-manager/src/main/resources/logback-spring.xml index f272f36..6d106b6 100644 --- a/service-executor/service-executor-manager/src/main/resources/logback-spring.xml +++ b/service-executor/service-executor-manager/src/main/resources/logback-spring.xml @@ -41,9 +41,6 @@ - - - diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/DataScanner.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/DataScanner.java index 83dc3b1..561ded0 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/DataScanner.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/DataScanner.java @@ -2,25 +2,19 @@ package com.lanyuanxiaoyao.service.executor.task; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; -import com.lanyuanxiaoyao.service.configuration.ExecutorProvider; import com.lanyuanxiaoyao.service.executor.core.TaskContext; import com.lanyuanxiaoyao.service.executor.task.entity.RecordView; import com.lanyuanxiaoyao.service.executor.task.functions.ReadHudiFile; import com.lanyuanxiaoyao.service.executor.task.functions.pulsar.ReadPulsarSource; import com.lanyuanxiaoyao.service.executor.task.helper.ArgumentsHelper; import com.lanyuanxiaoyao.service.executor.task.helper.FlinkHelper; -import java.io.IOException; +import com.lanyuanxiaoyao.service.executor.task.helper.HdfsHelper; import java.util.Map; -import java.util.Optional; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hudi.common.fs.FSUtils; -import org.eclipse.collections.api.factory.Lists; import org.eclipse.collections.api.list.ImmutableList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,36 +28,6 @@ import org.slf4j.LoggerFactory; public class DataScanner { private static final Logger logger = LoggerFactory.getLogger(DataScanner.class); - private static ImmutableList parsePaths(FileSystem fileSystem, ImmutableList paths) { - return paths - .asParallel(ExecutorProvider.EXECUTORS, 1) - .reject(path -> { - try { - return FSUtils.getFileSize(fileSystem, path) < 1; - } catch (IOException e) { - logger.error("Get file size error", e); - } - return true; - }) - .groupBy(FSUtils::getFileIdFromFilePath) - .multiValuesView() - .collect(pathList -> pathList - .toSortedListBy(path -> { - String commitTime = FSUtils.getCommitTime(path.getName()); - try { - return Long.valueOf(commitTime); - } catch (Throwable throwable) { - return 0L; - } - }) - .getLastOptional()) - .select(Optional::isPresent) - .collect(Optional::get) - .collect(Path::toString) - .toList() - .toImmutable(); - } - public static void main(String[] args) throws Exception { TaskContext taskContext = ArgumentsHelper.getContext(args); logger.info("Context: {}", taskContext); @@ -102,30 +66,13 @@ public class DataScanner { if (scanLog || scanBase) { ArgumentsHelper.checkMetadata(taskContext, "hdfs"); String hdfs = (String) metadata.get("hdfs"); - Configuration configuration = new Configuration(); - FileSystem fileSystem = FileSystem.get(configuration); - if (!fileSystem.exists(new Path(hdfs))) { - throw new RuntimeException(StrUtil.format("HDFS {} is not exists", hdfs)); - } + FileSystem fileSystem = FileSystem.get(new Configuration()); + HdfsHelper.checkHdfsPath(fileSystem, hdfs); - ImmutableList paths = Lists.immutable.of(fileSystem.listStatus(new Path(hdfs))) - .reject(status -> StrUtil.equals(".hoodie", status.getPath().getName())) - .flatCollect(status -> { - try { - if (status.isDirectory()) { - return Lists.immutable.of(fileSystem.listStatus(status.getPath())); - } else { - return Lists.immutable.of(status); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - }) - .collect(FileStatus::getPath); if (scanLog) { logger.info("Scan log hdfs: {}", hdfs); - ImmutableList logPaths = paths.select(FSUtils::isLogFile).collect(Path::toString); - int parallelism = Math.max(1, Math.min(logPaths.size() / 20, 100)); + ImmutableList logPaths = HdfsHelper.logPaths(fileSystem, hdfs); + int parallelism = HdfsHelper.logScanParallelismPredict(logPaths); totalParallelism = Math.max(totalParallelism, parallelism); DataStream stream = environment .fromCollection(logPaths.toList()) @@ -141,11 +88,11 @@ public class DataScanner { } if (scanBase) { logger.info("Scan base hdfs: {}", hdfs); - ImmutableList dataPaths = parsePaths(fileSystem, paths.select(FSUtils::isBaseFile)); - int parallelism = Math.max(1, Math.min(dataPaths.size() / 2, 500)); + ImmutableList basePaths = HdfsHelper.latestBasePaths(fileSystem, hdfs); + int parallelism = HdfsHelper.baseScanParallelismPredict(basePaths); totalParallelism = Math.max(totalParallelism, parallelism); DataStream stream = environment - .fromCollection(dataPaths.toList()) + .fromCollection(basePaths.toList()) .name("Read base paths") .flatMap(new ReadHudiFile()) .name("Read hudi file") diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/LatestOperationTimeScan.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/LatestOperationTimeScan.java new file mode 100644 index 0000000..70f2b69 --- /dev/null +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/LatestOperationTimeScan.java @@ -0,0 +1,76 @@ +package com.lanyuanxiaoyao.service.executor.task; + +import cn.hutool.core.util.RandomUtil; +import cn.hutool.core.util.StrUtil; +import com.eshore.odcp.hudi.connector.Constants; +import com.lanyuanxiaoyao.service.executor.core.TaskContext; +import com.lanyuanxiaoyao.service.executor.task.functions.ReadHudiFile; +import com.lanyuanxiaoyao.service.executor.task.helper.ArgumentsHelper; +import com.lanyuanxiaoyao.service.executor.task.helper.FlinkHelper; +import com.lanyuanxiaoyao.service.executor.task.helper.HdfsHelper; +import org.apache.flink.api.common.functions.RichReduceFunction; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.CloseableIterator; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.eclipse.collections.api.list.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * 查找base文件最后的opts + * + * @author lanyuanxiaoyao + * @date 2024-01-22 + */ +public class LatestOperationTimeScan { + private static final Logger logger = LoggerFactory.getLogger(LatestOperationTimeScan.class); + + public static void main(String[] args) throws Exception { + TaskContext taskContext = ArgumentsHelper.getContext(args); + logger.info("Context: {}", taskContext); + + ArgumentsHelper.checkMetadata(taskContext, "hdfs"); + String hdfs = (String) taskContext.getMetadata().get("hdfs"); + + FileSystem fileSystem = FileSystem.get(new Configuration()); + HdfsHelper.checkHdfsPath(fileSystem, hdfs); + ImmutableList basePaths = HdfsHelper.latestBasePaths(fileSystem, hdfs); + + int parallelism = HdfsHelper.baseScanParallelismPredict(basePaths); + + StreamExecutionEnvironment environment = FlinkHelper.getBatchEnvironment(); + environment.setParallelism(parallelism); + String maxValue = "0"; + try (CloseableIterator iterator = environment + .fromCollection(basePaths.toList()) + .name("Read base paths") + .flatMap(new ReadHudiFile()) + .name("Read hudi file") + .setParallelism(parallelism) + .map(view -> (String) view.getAttributes().getOrDefault(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, "0")) + .map(time -> new Tuple2<>(RandomUtil.randomInt(parallelism), time), TypeInformation.of(new TypeHint>() { + })) + .keyBy(tuple -> tuple.f0) + .reduce(new RichReduceFunction>() { + @Override + public Tuple2 reduce(Tuple2 value1, Tuple2 value2) throws Exception { + return value1.f1.compareTo(value2.f1) > 0 ? value1 : value2; + } + }) + .map(tuple -> tuple.f1) + /*.sinkTo(FlinkHelper.createFileSink(taskContext))*/ + .executeAndCollect("Find latest opts")) { + while (iterator.hasNext()) { + String item = iterator.next(); + if (item.compareTo(maxValue) > 0) { + maxValue = item; + } + } + } + HdfsHelper.createResult(fileSystem, taskContext, StrUtil.trim(maxValue)); + } +} diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/ReadHudiFile.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/ReadHudiFile.java index 642062b..223332e 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/ReadHudiFile.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/ReadHudiFile.java @@ -1,6 +1,7 @@ package com.lanyuanxiaoyao.service.executor.task.functions; import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; import com.eshore.odcp.hudi.connector.Constants; import com.lanyuanxiaoyao.service.executor.task.entity.RecordView; import java.io.IOException; @@ -60,7 +61,9 @@ public class ReadHudiFile implements FlatMapFunction { String data = builder.toString(); RecordView recordView = new RecordView(operation, data, timestamp, source); - recordView.getAttributes().put(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, latestOpTs); + if (StrUtil.isNotBlank(latestOpTs)) { + recordView.getAttributes().put(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, latestOpTs); + } return recordView; } diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/FlinkHelper.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/FlinkHelper.java index f81713c..e74dd6a 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/FlinkHelper.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/FlinkHelper.java @@ -28,7 +28,7 @@ public class FlinkHelper { public static StreamExecutionEnvironment getBatchEnvironment() { StreamExecutionEnvironment environment = getSteamEnvironment(); - environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); + environment.setRuntimeMode(RuntimeExecutionMode.BATCH); return environment; } diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/HdfsHelper.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/HdfsHelper.java new file mode 100644 index 0000000..0df8e1c --- /dev/null +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/HdfsHelper.java @@ -0,0 +1,131 @@ +package com.lanyuanxiaoyao.service.executor.task.helper; + +import cn.hutool.core.collection.IterUtil; +import cn.hutool.core.util.StrUtil; +import com.lanyuanxiaoyao.service.configuration.ExecutorProvider; +import com.lanyuanxiaoyao.service.executor.core.TaskContext; +import java.io.IOException; +import java.util.Optional; +import java.util.function.Predicate; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.fs.FSUtils; +import org.eclipse.collections.api.factory.Lists; +import org.eclipse.collections.api.list.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * HDFS工具 + * + * @author lanyuanxiaoyao + * @date 2024-01-22 + */ +public class HdfsHelper { + private static final Logger logger = LoggerFactory.getLogger(HdfsHelper.class); + + public static void checkHdfsPath(FileSystem fileSystem, String path) throws IOException { + checkHdfsPath(fileSystem, new Path(path)); + } + + public static void checkHdfsPath(FileSystem fileSystem, Path path) throws IOException { + if (!fileSystem.exists(path)) { + throw new RuntimeException(StrUtil.format("HDFS {} is not exists", path.toString())); + } + } + + public static Integer logScanParallelismPredict(Iterable list) { + return logScanParallelismPredict(IterUtil.size(list)); + } + + public static Integer logScanParallelismPredict(Integer pathNum) { + return Math.max(1, Math.min(pathNum / 20, 100)); + } + + public static Integer baseScanParallelismPredict(Iterable list) { + return baseScanParallelismPredict(IterUtil.size(list)); + } + + public static Integer baseScanParallelismPredict(Integer pathNum) { + return Math.max(1, Math.min(pathNum / 2, 500)); + } + + public static ImmutableList latestBasePaths(FileSystem fileSystem, String root) throws IOException { + return latestBasePaths(fileSystem, new Path(root)); + } + + public static ImmutableList latestBasePaths(FileSystem fileSystem, Path root) throws IOException { + return basePaths(fileSystem, root) + .asParallel(ExecutorProvider.EXECUTORS, 1) + .collect(Path::new) + .reject(path -> { + try { + return FSUtils.getFileSize(fileSystem, path) < 1; + } catch (IOException e) { + logger.error("Get file size error", e); + } + return true; + }) + .groupBy(FSUtils::getFileIdFromFilePath) + .multiValuesView() + .collect(pathList -> pathList + .toSortedListBy(path -> { + String commitTime = FSUtils.getCommitTime(path.getName()); + try { + return Long.valueOf(commitTime); + } catch (Throwable throwable) { + return 0L; + } + }) + .getLastOptional()) + .select(Optional::isPresent) + .collect(Optional::get) + .collect(Path::toString) + .toList() + .toImmutable(); + } + + public static ImmutableList basePaths(FileSystem fileSystem, String root) throws IOException { + return basePaths(fileSystem, new Path(root)); + } + + public static ImmutableList basePaths(FileSystem fileSystem, Path root) throws IOException { + return hdfsPaths(fileSystem, root, FSUtils::isBaseFile); + } + + public static ImmutableList logPaths(FileSystem fileSystem, String root) throws IOException { + return logPaths(fileSystem, new Path(root)); + } + + public static ImmutableList logPaths(FileSystem fileSystem, Path root) throws IOException { + return hdfsPaths(fileSystem, root, FSUtils::isLogFile); + } + + public static ImmutableList hdfsPaths(FileSystem fileSystem, Path root, Predicate check) throws IOException { + return Lists.immutable.of(fileSystem.listStatus(root)) + .reject(status -> StrUtil.equals(".hoodie", status.getPath().getName())) + .flatCollect(status -> { + try { + if (status.isDirectory()) { + return Lists.immutable.of(fileSystem.listStatus(status.getPath())); + } else { + return Lists.immutable.of(status); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .collect(FileStatus::getPath) + .select(check::test) + .collect(Path::toString); + } + + public static void createResult(FileSystem fileSystem, TaskContext context, String result) throws IOException { + Path resultPath = new Path(context.getResultPath() + "/" + context.getTaskId() + "/task-result"); + try(FSDataOutputStream outputStream = fileSystem.create(resultPath)) { + outputStream.writeUTF(result); + } + } +} diff --git a/service-executor/service-executor-task/src/test/java/com/lanyuanxiaoyao/service/executor/task/HashcodeTest.java b/service-executor/service-executor-task/src/test/java/com/lanyuanxiaoyao/service/executor/task/HashcodeTest.java new file mode 100644 index 0000000..35d868b --- /dev/null +++ b/service-executor/service-executor-task/src/test/java/com/lanyuanxiaoyao/service/executor/task/HashcodeTest.java @@ -0,0 +1,11 @@ +package com.lanyuanxiaoyao.service.executor.task; + +/** + * @author lanyuanxiaoyao + * @date 2024-01-30 + */ +public class HashcodeTest { + public static void main(String[] args) { + System.out.println("hello world".hashCode() % 180); + } +} diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/TaskService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/TaskService.java index cb81080..5d8dd03 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/TaskService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/TaskService.java @@ -24,6 +24,9 @@ public interface TaskService { @Query("scan_base") Boolean scanBase ); + @Get(value = "/task/latest_op_ts", readTimeout = 2 * 60 * 1000) + String latestOpTs(@Query("hdfs") String hdfs); + @Get("/task/results") ImmutableList results(@Query("task_id") String taskId); diff --git a/service-web/src/main/resources/static/components/task-tab.js b/service-web/src/main/resources/static/components/task-tab.js index f656509..e6313ae 100644 --- a/service-web/src/main/resources/static/components/task-tab.js +++ b/service-web/src/main/resources/static/components/task-tab.js @@ -84,6 +84,35 @@ function taskTab() { } ] }, + { + type: 'form', + title: '检索最后操作时间', + actions: [ + { + type: 'submit', + label: '提交任务', + actionType: 'ajax', + api: { + method: 'get', + url: '${base}/task/latest_op_ts', + data: { + hdfs: '${hdfs|default:undefined}', + } + } + }, + ], + body: [ + { + type: 'input-text', + name: 'hdfs', + label: 'HDFS路经', + required: true, + clearable: true, + description: '输入表HDFS路径', + autoComplete: '${base}/table/all_hdfs?key=$term', + }, + ] + }, { type: 'crud', api: { diff --git a/test/test.http b/test/test.http index 19aed9b..efcc68f 100644 --- a/test/test.http +++ b/test/test.http @@ -43,7 +43,7 @@ Content-Type: application/json ] ### 清空队列 -GET http://{{username}}:{{password}}@b12s25.hdp.dc:26625/queue/clear/compaction-queue-pre +GET http://{{username}}:{{password}}@b12s25.hdp.dc:36781/queue/clear/compaction-queue-pre ### Info GET http://{{username}}:{{password}}@132.122.116.146:18166/info/compaction_metrics?flink_job_id=1542097996099055616&alias=acct_acct_item_fs&filter_completes=true