diff --git a/bin/build-executor-manager.sh b/bin/build-executor-manager.sh index 9f9f689..8d4d232 100755 --- a/bin/build-executor-manager.sh +++ b/bin/build-executor-manager.sh @@ -1,4 +1,4 @@ #!/bin/bash mvn -pl service-dependencies,service-configuration,service-forest,service-executor,service-executor/service-executor-core clean deploy -D skipTests -P local -s ~/.m2/settings-development.xml -mvn -pl service-executor/service-executor-manager clean package -D skipTests -s ~/.m2/settings-development.xml -P b2e1 +mvn -pl service-executor/service-executor-manager clean package -D skipTests -s ~/.m2/settings-development.xml -P b2b12 ytp-transfer2 /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-executor/service-executor-manager/target/service-executor-manager-1.0.0-SNAPSHOT.jar \ No newline at end of file diff --git a/pom.xml b/pom.xml index 114b17a..47cd8cf 100644 --- a/pom.xml +++ b/pom.xml @@ -39,6 +39,7 @@ 2.11 0.12.0-eshore-SNAPSHOT 1.13.3 + 1.13.3 diff --git a/service-cli/service-cli-runner/src/main/resources/application-b12.yml b/service-cli/service-cli-runner/src/main/resources/application-b12.yml index c80afe6..acc5f16 100644 --- a/service-cli/service-cli-runner/src/main/resources/application-b12.yml +++ b/service-cli/service-cli-runner/src/main/resources/application-b12.yml @@ -14,6 +14,11 @@ deploy: app-hdfs-path: hdfs://b2/apps/datalake/jars/app-b12 archive-hdfs-path: hdfs://b2/apps/datalake/flink/completed-jobs-hudi victoria-push-url: http://132.126.207.125:35710/api/v1/import/prometheus + executor: + staging-path: hdfs://b2/apps/datalake/yarn + archive-hdfs-path: hdfs://b2/apps/flink/completed-jobs/ + task-jar-path: hdfs://b2/apps/datalake/jars/service/service-executor-task-1.0.0-SNAPSHOT.jar + task-result-path: hdfs://b2/apps/datalake/task-results security: authority: ENC(GXKnbq1LS11U2HaONspvH+D/TkIx13aWTaokdkzaF7HSvq6Z0Rv1+JUWFnYopVXu) username: ENC(moIO5mO39V1Z+RDwROK9JXY4GfM8ZjDgM6Si7wRZ1MPVjbhTpmLz3lz28rAiw7c2LeCmizfJzHkEXIwGlB280g==) diff --git a/service-cli/service-cli-runner/src/main/resources/application-b5.yml b/service-cli/service-cli-runner/src/main/resources/application-b5.yml index 01c3813..7fa1060 100644 --- a/service-cli/service-cli-runner/src/main/resources/application-b5.yml +++ b/service-cli/service-cli-runner/src/main/resources/application-b5.yml @@ -14,6 +14,11 @@ deploy: app-hdfs-path: hdfs://b2/apps/datalake/jars/app archive-hdfs-path: hdfs://b2/apps/datalake/flink/completed-jobs-hudi victoria-push-url: http://132.122.116.142:35710/api/v1/import/prometheus + executor: + staging-path: hdfs://b2/apps/datalake/yarn + archive-hdfs-path: hdfs://b2/apps/flink/completed-jobs/ + task-jar-path: hdfs://b2/apps/datalake/jars/service/service-executor-task-1.0.0-SNAPSHOT.jar + task-result-path: hdfs://b2/apps/datalake/task-results security: authority: ENC(GXKnbq1LS11U2HaONspvH+D/TkIx13aWTaokdkzaF7HSvq6Z0Rv1+JUWFnYopVXu) username: ENC(moIO5mO39V1Z+RDwROK9JXY4GfM8ZjDgM6Si7wRZ1MPVjbhTpmLz3lz28rAiw7c2LeCmizfJzHkEXIwGlB280g==) diff --git a/service-cli/service-cli-runner/src/main/resources/application-t5.yml b/service-cli/service-cli-runner/src/main/resources/application-t5.yml index 36c1a6d..d3312e9 100644 --- a/service-cli/service-cli-runner/src/main/resources/application-t5.yml +++ b/service-cli/service-cli-runner/src/main/resources/application-t5.yml @@ -14,6 +14,11 @@ deploy: app-hdfs-path: jfs://ns1/apps/datalake/hudi/jars archive-hdfs-path: jfs://ns1/apps/datalake/hudi/archive victoria-push-url: http://132.121.126.84:35710/api/v1/import/prometheus + executor: + staging-path: hdfs://b2/apps/datalake/yarn + archive-hdfs-path: hdfs://b2/apps/flink/completed-jobs/ + task-jar-path: hdfs://b2/apps/datalake/jars/service/service-executor-task-1.0.0-SNAPSHOT.jar + task-result-path: hdfs://b2/apps/datalake/task-results security: authority: ENC(GXKnbq1LS11U2HaONspvH+D/TkIx13aWTaokdkzaF7HSvq6Z0Rv1+JUWFnYopVXu) username: ENC(moIO5mO39V1Z+RDwROK9JXY4GfM8ZjDgM6Si7wRZ1MPVjbhTpmLz3lz28rAiw7c2LeCmizfJzHkEXIwGlB280g==) diff --git a/service-cli/service-cli-runner/src/main/resources/application.yml b/service-cli/service-cli-runner/src/main/resources/application.yml index 484e637..a98a303 100644 --- a/service-cli/service-cli-runner/src/main/resources/application.yml +++ b/service-cli/service-cli-runner/src/main/resources/application.yml @@ -135,3 +135,11 @@ deploy: - name: service-executor-manager source-jar: service-executor-manager-1.0.0-SNAPSHOT.jar replicas: 1 + environments: + connector_hadoop_kerberos-principal: ${deploy.runtime.user}/$\{hostname}.hdp.dc@ECLD.COM + connector_hadoop_kerberos-keytab-path: ${deploy.runtime.kerberos-keytab-path} + arguments: + executor_staging-directory: ${deploy.runtime.executor.staging-path} + executor_history-server-archive-dir: ${deploy.runtime.executor.archive-hdfs-path} + executor_task-jar-path: ${deploy.runtime.executor.task-jar-path} + executor_task-result-path: ${deploy.runtime.executor.task-result-path} diff --git a/service-executor/service-executor-core/src/main/java/com/lanyuanxiaoyao/service/executor/core/TaskConstants.java b/service-executor/service-executor-core/src/main/java/com/lanyuanxiaoyao/service/executor/core/TaskConstants.java index adf8c9c..b1a88dc 100644 --- a/service-executor/service-executor-core/src/main/java/com/lanyuanxiaoyao/service/executor/core/TaskConstants.java +++ b/service-executor/service-executor-core/src/main/java/com/lanyuanxiaoyao/service/executor/core/TaskConstants.java @@ -7,6 +7,10 @@ package com.lanyuanxiaoyao.service.executor.core; * @date 2023-12-04 */ public interface TaskConstants { + String TASK_ID = "task-id"; + String TASK_ID_OPTION = "-" + TASK_ID; + String TASK_RESULT_PATH = "task-result-path"; + String TASK_RESULT_PATH_OPTION = "-" + TASK_RESULT_PATH; String TASK_CONTEXT = "task-context"; String TASK_CONTEXT_OPTION = "-" + TASK_CONTEXT; } diff --git a/service-executor/service-executor-core/src/main/java/com/lanyuanxiaoyao/service/executor/core/TaskContext.java b/service-executor/service-executor-core/src/main/java/com/lanyuanxiaoyao/service/executor/core/TaskContext.java index 6a6be07..1283e1e 100644 --- a/service-executor/service-executor-core/src/main/java/com/lanyuanxiaoyao/service/executor/core/TaskContext.java +++ b/service-executor/service-executor-core/src/main/java/com/lanyuanxiaoyao/service/executor/core/TaskContext.java @@ -1,6 +1,7 @@ package com.lanyuanxiaoyao.service.executor.core; import java.io.Serializable; +import java.util.HashMap; import java.util.Map; /** @@ -10,15 +11,31 @@ import java.util.Map; * @date 2023-12-04 */ public class TaskContext implements Serializable { + private String taskId; + private String resultPath; private Map metadata; public TaskContext() { } - public TaskContext(Map metadata) { + public TaskContext(String taskId, String resultPath) { + this(taskId, resultPath, new HashMap<>()); + } + + public TaskContext(String taskId, String resultPath, Map metadata) { + this.taskId = taskId; + this.resultPath = resultPath; this.metadata = metadata; } + public String getTaskId() { + return taskId; + } + + public String getResultPath() { + return resultPath; + } + public Map getMetadata() { return metadata; } @@ -26,7 +43,9 @@ public class TaskContext implements Serializable { @Override public String toString() { return "TaskContext{" + - "metadata=" + metadata + - '}'; + "taskId='" + taskId + '\'' + + ", resultPath='" + resultPath + '\'' + + ", metadata=" + metadata + + '}'; } } diff --git a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/ExecutorManagerApplication.java b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/ExecutorManagerApplication.java index de1918e..231c959 100644 --- a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/ExecutorManagerApplication.java +++ b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/ExecutorManagerApplication.java @@ -1,7 +1,12 @@ package com.lanyuanxiaoyao.service.executor.manager; +import cn.hutool.core.util.IdUtil; import com.eshore.odcp.hudi.connector.utils.executor.Runner; +import com.fasterxml.jackson.databind.ObjectMapper; import com.lanyuanxiaoyao.service.executor.core.TaskConstants; +import com.lanyuanxiaoyao.service.executor.core.TaskContext; +import com.lanyuanxiaoyao.service.executor.manager.configuration.ExecutorConfiguration; +import com.lanyuanxiaoyao.service.executor.manager.configuration.HadoopConfiguration; import com.ulisesbocchio.jasyptspringboot.annotation.EnableEncryptableProperties; import java.time.Duration; import java.util.ArrayList; @@ -11,6 +16,7 @@ import org.apache.flink.yarn.configuration.YarnConfigOptions; import org.apache.flink.yarn.configuration.YarnDeploymentTarget; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.eclipse.collections.api.factory.Maps; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; @@ -19,6 +25,7 @@ import org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration; import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; +import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder; import org.springframework.retry.annotation.EnableRetry; import static com.eshore.odcp.hudi.connector.Constants.HALF_HOUR; @@ -40,12 +47,24 @@ import static com.eshore.odcp.hudi.connector.Constants.MINUTE; @EnableEncryptableProperties @EnableRetry public class ExecutorManagerApplication implements ApplicationRunner { + private final HadoopConfiguration hadoopConfiguration; + private final ExecutorConfiguration executorConfiguration; + private final ObjectMapper mapper; + + public ExecutorManagerApplication(HadoopConfiguration hadoopConfiguration, ExecutorConfiguration executorConfiguration, Jackson2ObjectMapperBuilder builder) { + this.hadoopConfiguration = hadoopConfiguration; + this.executorConfiguration = executorConfiguration; + this.mapper = builder.build(); + } + public static void main(String[] args) { SpringApplication.run(ExecutorManagerApplication.class, args); } @Override public void run(ApplicationArguments args) throws Exception { + String taskId = IdUtil.nanoId(8); + Configuration configuration = new Configuration(); configuration.setBoolean(JobManagerOptions.JVM_DIRECT_MEMORY_LIMIT_ENABLED, true); configuration.setString(AkkaOptions.ASK_TIMEOUT, "10 min"); @@ -54,15 +73,15 @@ public class ExecutorManagerApplication implements ApplicationRunner { configuration.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(30)); // Kerberos认证 configuration.setBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE, true); - configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, "/etc/security/keytabs/datalake.app.keytab"); - configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, "datalake/b5s119.hdp.dc@ECLD.COM"); + configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, hadoopConfiguration.getKerberosKeytabPath()); + configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, hadoopConfiguration.getKerberosPrincipal()); configuration.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, MINUTE); configuration.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, HALF_HOUR); configuration.setString(AkkaOptions.ASK_TIMEOUT, "1 min"); configuration.setString(AkkaOptions.TCP_TIMEOUT, "2 min"); configuration.setBoolean(CoreOptions.CHECK_LEAKED_CLASSLOADER, false); configuration.setString(YarnConfigOptions.APPLICATION_ATTEMPTS, "4"); - configuration.setString(YarnConfigOptions.STAGING_DIRECTORY, "hdfs://b2/apps/datalake/yarn"); + configuration.setString(YarnConfigOptions.STAGING_DIRECTORY, executorConfiguration.getStagingDirectory()); configuration.setString(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX + "MALLOC_ARENA_MAX", "1"); configuration.setString(ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + "MALLOC_ARENA_MAX", "1"); configuration.setInteger(RestOptions.PORT, 8081); @@ -74,22 +93,28 @@ public class ExecutorManagerApplication implements ApplicationRunner { configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("0m")); configuration.set(TaskManagerOptions.JVM_METASPACE, MemorySize.parse("128m")); configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 10); - configuration.setString(YarnConfigOptions.APPLICATION_NAME, "HudiService_faee2e95-660d-4b1c-9cec-13473b3cd5b7"); - configuration.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, "hdfs://b2/apps/flink/completed-jobs/"); + configuration.setString(YarnConfigOptions.APPLICATION_NAME, "Service_Task_" + taskId); + configuration.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, executorConfiguration.getHistoryServerArchiveDir()); configuration.setLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL, 10000); configuration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); configuration.setString(YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT, "/app-logs"); configuration.set(PipelineOptions.JARS, new ArrayList() {{ - add("hdfs://b2/apps/datalake/jars/service/service-executor-task.jar"); + add(executorConfiguration.getTaskJarPath()); }}); ApplicationId applicationId = Runner.run( configuration, - "com.lanyuanxiaoyao.service.executor.task.Hello", + "com.lanyuanxiaoyao.service.executor.task.AvroScanner", new String[]{ TaskConstants.TASK_CONTEXT_OPTION, - "{\"metadata\": {\"id\":\"faee2e95-660d-4b1c-9cec-13473b3cd5b7\"}}" + mapper.writeValueAsString( + new TaskContext( + taskId, + executorConfiguration.getTaskResultPath(), + Maps.mutable.of("key", "123456", "hdfs", "hdfs://b2/apps/datalake/hive/dws_test/external_table_hudi/dws_ord_prod_inst_attr") + ) + ) } ); System.out.println(applicationId); diff --git a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/configuration/ExecutorConfiguration.java b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/configuration/ExecutorConfiguration.java new file mode 100644 index 0000000..31e65c0 --- /dev/null +++ b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/configuration/ExecutorConfiguration.java @@ -0,0 +1,66 @@ +package com.lanyuanxiaoyao.service.executor.manager.configuration; + +import javax.annotation.PostConstruct; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +/** + * yarn 配置 + * + * @author ZhangJiacheng + * @date 2022-03-30 + */ +@ConfigurationProperties("executor") +@Component +public class ExecutorConfiguration { + private static final Logger logger = LoggerFactory.getLogger(ExecutorConfiguration.class); + + private String stagingDirectory; + private String historyServerArchiveDir; + private String taskJarPath; + private String taskResultPath; + + public String getStagingDirectory() { + return stagingDirectory; + } + + public void setStagingDirectory(String stagingDirectory) { + this.stagingDirectory = stagingDirectory; + } + + public String getHistoryServerArchiveDir() { + return historyServerArchiveDir; + } + + public void setHistoryServerArchiveDir(String historyServerArchiveDir) { + this.historyServerArchiveDir = historyServerArchiveDir; + } + + public String getTaskJarPath() { + return taskJarPath; + } + + public void setTaskJarPath(String taskJarPath) { + this.taskJarPath = taskJarPath; + } + + public String getTaskResultPath() { + return taskResultPath; + } + + public void setTaskResultPath(String taskResultPath) { + this.taskResultPath = taskResultPath; + } + + @Override + public String toString() { + return "ExecutorConfiguration{" + + "stagingDirectory='" + stagingDirectory + '\'' + + ", historyServerArchiveDir='" + historyServerArchiveDir + '\'' + + ", taskJarPath='" + taskJarPath + '\'' + + ", taskResultPath='" + taskResultPath + '\'' + + '}'; + } +} diff --git a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/configuration/HadoopConfiguration.java b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/configuration/HadoopConfiguration.java new file mode 100644 index 0000000..fc20303 --- /dev/null +++ b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/configuration/HadoopConfiguration.java @@ -0,0 +1,51 @@ +package com.lanyuanxiaoyao.service.executor.manager.configuration; + +import javax.annotation.PostConstruct; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +/** + * yarn 配置 + * + * @author ZhangJiacheng + * @date 2022-03-30 + */ +@ConfigurationProperties("connector.hadoop") +@Component +public class HadoopConfiguration { + private static final Logger logger = LoggerFactory.getLogger(HadoopConfiguration.class); + + private String kerberosPrincipal; + private String kerberosKeytabPath; + + @PostConstruct + private void init() { + logger.info("Configuration initial: {}", this); + } + + public String getKerberosPrincipal() { + return kerberosPrincipal; + } + + public void setKerberosPrincipal(String kerberosPrincipal) { + this.kerberosPrincipal = kerberosPrincipal; + } + + public String getKerberosKeytabPath() { + return kerberosKeytabPath; + } + + public void setKerberosKeytabPath(String kerberosKeytabPath) { + this.kerberosKeytabPath = kerberosKeytabPath; + } + + @Override + public String toString() { + return "HadoopConfiguration{" + + "kerberosPrincipal='" + kerberosPrincipal + '\'' + + ", kerberosKeytabPath='" + kerberosKeytabPath + '\'' + + '}'; + } +} diff --git a/service-executor/service-executor-manager/src/main/resources/application.yml b/service-executor/service-executor-manager/src/main/resources/application.yml index bd00df3..4518441 100644 --- a/service-executor/service-executor-manager/src/main/resources/application.yml +++ b/service-executor/service-executor-manager/src/main/resources/application.yml @@ -2,4 +2,9 @@ spring: application: name: service-executor-manager profiles: - include: random-port,common,discovery,metrics \ No newline at end of file + include: random-port,common,discovery,metrics +executor: + staging-directory: hdfs://b2/apps/datalake/yarn + history-server-archive-dir: hdfs://b2/apps/flink/completed-jobs/ + task-jar-path: hdfs://b2/apps/datalake/jars/service/service-executor-task-1.0.0-SNAPSHOT.jar + task-result-path: hdfs://b2/apps/datalake/task-results \ No newline at end of file diff --git a/service-executor/service-executor-task/pom.xml b/service-executor/service-executor-task/pom.xml index a2c946c..fda909c 100644 --- a/service-executor/service-executor-task/pom.xml +++ b/service-executor/service-executor-task/pom.xml @@ -65,6 +65,17 @@ 30.1.1-jre-15.0 provided + + org.apache.hadoop + hadoop-client + 3.1.2 + + + org.eclipse.collections + eclipse-collections-api + 10.4.0 + provided + diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/AvroScanner.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/AvroScanner.java new file mode 100644 index 0000000..132a6b0 --- /dev/null +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/AvroScanner.java @@ -0,0 +1,56 @@ +package com.lanyuanxiaoyao.service.executor.task; + +import cn.hutool.core.util.StrUtil; +import com.lanyuanxiaoyao.service.executor.core.TaskContext; +import com.lanyuanxiaoyao.service.executor.task.entity.RecordView; +import com.lanyuanxiaoyao.service.executor.task.functions.ReadLogFile; +import com.lanyuanxiaoyao.service.executor.task.helper.ArgumentsHelper; +import com.lanyuanxiaoyao.service.executor.task.helper.FlinkHelper; +import java.util.Map; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.eclipse.collections.api.factory.Lists; +import org.eclipse.collections.api.list.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * avro 日志扫描 + * + * @author lanyuanxiaoyao + * @date 2024-01-08 + */ +public class AvroScanner { + private static final Logger logger = LoggerFactory.getLogger(AvroScanner.class); + + public static void main(String[] args) throws Exception { + TaskContext taskContext = ArgumentsHelper.getContext(args); + logger.info("Context: {}", taskContext); + + Map metadata = taskContext.getMetadata(); + ArgumentsHelper.checkMetadata(taskContext, "hdfs"); + String hdfs = (String) metadata.get("hdfs"); + ArgumentsHelper.checkMetadata(taskContext, "key"); + String key = (String) metadata.get("key"); + + Configuration configuration = new Configuration(); + FileSystem fileSystem = FileSystem.get(configuration); + if (!fileSystem.exists(new Path(hdfs))) { + throw new RuntimeException(StrUtil.format("HDFS {} is not exists", hdfs)); + } + + ImmutableList paths = Lists.immutable.of(fileSystem.listStatus(new Path(hdfs))) + .reject(status -> StrUtil.equals(".hoodie", status.getPath().getName())) + .collect(status -> status.getPath().toString()); + + StreamExecutionEnvironment environment = FlinkHelper.getBatchEnvironment(); + environment.setParallelism(20); + FlinkHelper.getAllLogFilePaths(environment.fromCollection(paths.toList())) + .flatMap(new ReadLogFile()) + .map(RecordView::toString) + .sinkTo(FlinkHelper.createFileSink(taskContext)); + environment.execute(StrUtil.format("Search {} in {}", key, hdfs)); + } +} diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/Hello.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/Hello.java deleted file mode 100644 index 4423698..0000000 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/Hello.java +++ /dev/null @@ -1,50 +0,0 @@ -package com.lanyuanxiaoyao.service.executor.task; - -import com.lanyuanxiaoyao.service.executor.core.TaskContext; -import com.lanyuanxiaoyao.service.executor.task.helper.ArgumentsHelper; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Hello world - * - * @author lanyuanxiaoyao - * @date 2023-12-04 - */ -public class Hello { - private static final Logger logger = LoggerFactory.getLogger(Hello.class); - - public static void main(String[] args) throws Exception { - TaskContext taskContext = ArgumentsHelper.getContext(args); - logger.info("Context: {}", taskContext); - - StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); - - environment - .addSource(new SourceFunction() { - @Override - public void run(SourceContext context) { - for (int index = 0; index < 10; index++) { - context.collect(index); - } - } - - @Override - public void cancel() { - } - }) - .map(value -> "Index: " + value) - .addSink(new SinkFunction() { - @Override - public void invoke(String value, Context context) throws Exception { - logger.info("Value: {}", value); - } - }); - - - environment.execute("Service task: Hello"); - } -} diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/entity/RecordView.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/entity/RecordView.java new file mode 100644 index 0000000..f3dc475 --- /dev/null +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/entity/RecordView.java @@ -0,0 +1,65 @@ +package com.lanyuanxiaoyao.service.executor.task.entity; + +import cn.hutool.core.util.StrUtil; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +/** + * 记录 + * + * @author lanyuanxiaoyao + * @date 2024-01-09 + */ +public class RecordView implements Serializable, Comparable { + private final Operation operation; + private final String data; + private final String timestamp; + private final String file; + private final Map attributes; + + public RecordView(Operation operation, String data, String timestamp, String file) { + this.operation = operation; + this.data = data; + this.timestamp = timestamp; + this.file = file; + this.attributes = new HashMap<>(); + } + + public Operation getOperation() { + return operation; + } + + public String getData() { + return data; + } + + public String getTimestamp() { + return timestamp; + } + + public String getFile() { + return file; + } + + public Map getAttributes() { + return attributes; + } + + @Override + public String toString() { + return StrUtil.format("\n{} {} {}\n{}", operation, timestamp, file, data); + } + + @Override + public int compareTo(RecordView o) { + if (o != null) { + return this.timestamp.compareTo(o.timestamp); + } + return 0; + } + + public enum Operation { + DELETE, UPSERT, ROLLBACK, RESULT, SOURCE + } +} diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/ReadLogFile.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/ReadLogFile.java new file mode 100644 index 0000000..fdfd518 --- /dev/null +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/ReadLogFile.java @@ -0,0 +1,112 @@ +package com.lanyuanxiaoyao.service.executor.task.functions; + +import cn.hutool.core.util.ObjectUtil; +import com.eshore.odcp.hudi.connector.Constants; +import com.lanyuanxiaoyao.service.executor.task.entity.RecordView; +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.util.Collector; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.log.HoodieLogFormat; +import org.apache.hudi.common.table.log.block.*; +import org.apache.hudi.common.util.ClosableIterator; +import org.apache.hudi.org.apache.avro.Schema; +import org.apache.hudi.org.apache.avro.generic.IndexedRecord; +import org.apache.hudi.org.apache.avro.util.Utf8; +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.schema.MessageType; + +/** + * 读取log文件 + * + * @author lanyuanxiaoyao + * @date 2024-01-09 + */ +public class ReadLogFile implements FlatMapFunction { + private RecordView parseData(String source, IndexedRecord record) { + Schema schema = record.getSchema(); + StringBuilder builder = new StringBuilder(); + for (Schema.Field field : schema.getFields()) { + builder.append(field.name()) + .append("=") + .append(record.get(field.pos())) + .append(" "); + } + String timestamp = null; + Schema.Field commitTimeField = schema.getField(HoodieRecord.COMMIT_TIME_METADATA_FIELD); + if (ObjectUtil.isNotNull(commitTimeField)) { + timestamp = ((Utf8) record.get(commitTimeField.pos())).toString(); + } + String latestOpTs = null; + Schema.Field latestOpTsField = schema.getField(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME); + if (ObjectUtil.isNotNull(latestOpTsField)) { + latestOpTs = ((Utf8) record.get(latestOpTsField.pos())).toString(); + } + + String data = builder.toString(); + RecordView recordView = new RecordView(RecordView.Operation.UPSERT, data, timestamp, source); + recordView.getAttributes().put(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, latestOpTs); + + return recordView; + } + + @Override + public void flatMap(String logFilePath, Collector out) throws IOException { + Configuration readerConfiguration = new Configuration(); + FileSystem readerFilesystem = FileSystem.get(readerConfiguration); + MessageType messageType = TableSchemaResolver.readSchemaFromLogFile(readerFilesystem, new Path(logFilePath)); + Schema schema = new AvroSchemaConverter().convert(Objects.requireNonNull(messageType)); + try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(readerFilesystem, new HoodieLogFile(new Path(logFilePath)), schema)) { + while (reader.hasNext()) { + HoodieLogBlock block = reader.next(); + Map logBlockHeader = block.getLogBlockHeader(); + String instant = logBlockHeader.getOrDefault(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, null); + switch (block.getBlockType()) { + case AVRO_DATA_BLOCK: + HoodieAvroDataBlock avroDataBlock = (HoodieAvroDataBlock) block; + try (ClosableIterator avroDataBlockRecordIterator = avroDataBlock.getRecordIterator()) { + while (avroDataBlockRecordIterator.hasNext()) { + RecordView recordView = parseData(logFilePath, avroDataBlockRecordIterator.next()); + out.collect(recordView); + } + } + break; + case PARQUET_DATA_BLOCK: + HoodieParquetDataBlock parquetDataBlock = (HoodieParquetDataBlock) block; + try (ClosableIterator parquetDataBlockRecordIterator = parquetDataBlock.getRecordIterator()) { + while (parquetDataBlockRecordIterator.hasNext()) { + RecordView recordView = parseData(logFilePath, parquetDataBlockRecordIterator.next()); + out.collect(recordView); + } + } + break; + case CORRUPT_BLOCK: + break; + case DELETE_BLOCK: + HoodieDeleteBlock deleteBlock = (HoodieDeleteBlock) block; + String keys = Arrays.stream(deleteBlock.getRecordsToDelete()) + .map(deleteRecord -> deleteRecord.getHoodieKey().toString()) + .collect(Collectors.joining(" ")); + out.collect(new RecordView(RecordView.Operation.DELETE, keys, instant, logFilePath)); + break; + case COMMAND_BLOCK: + HoodieCommandBlock commandBlock = (HoodieCommandBlock) block; + Map header = commandBlock.getLogBlockHeader(); + out.collect(new RecordView(RecordView.Operation.ROLLBACK, header.get(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME), instant, logFilePath)); + break; + default: + break; + } + } + } + } +} diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/ArgumentsHelper.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/ArgumentsHelper.java index 6505ea9..3c5527c 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/ArgumentsHelper.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/ArgumentsHelper.java @@ -1,8 +1,10 @@ package com.lanyuanxiaoyao.service.executor.task.helper; +import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.lanyuanxiaoyao.service.executor.core.TaskConstants; import com.lanyuanxiaoyao.service.executor.core.TaskContext; +import java.util.Map; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; @@ -20,4 +22,14 @@ public class ArgumentsHelper { } return JacksonHelper.getMapper().readValue(argsTool.get(TaskConstants.TASK_CONTEXT), TaskContext.class); } + + public static void checkMetadata(TaskContext context, String key) { + Map metadata = context.getMetadata(); + if (ObjectUtil.isEmpty(metadata)) { + throw new RuntimeException("Metadata is empty"); + } + if (!metadata.containsKey(key)) { + throw new RuntimeException(key + " argument is not found"); + } + } } diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/FlinkHelper.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/FlinkHelper.java new file mode 100644 index 0000000..532a642 --- /dev/null +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/helper/FlinkHelper.java @@ -0,0 +1,73 @@ +package com.lanyuanxiaoyao.service.executor.task.helper; + +import com.lanyuanxiaoyao.service.executor.core.TaskContext; +import java.util.Arrays; +import java.util.Iterator; +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.serialization.SimpleStringEncoder; +import org.apache.flink.api.java.functions.FlatMapIterator; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hudi.common.fs.FSUtils; + +/** + * @author lanyuanxiaoyao + * @date 2024-01-08 + */ +public class FlinkHelper { + public static StreamExecutionEnvironment getSteamEnvironment() { + return StreamExecutionEnvironment.getExecutionEnvironment(); + } + + public static StreamExecutionEnvironment getBatchEnvironment() { + StreamExecutionEnvironment environment = getSteamEnvironment(); + environment.setRuntimeMode(RuntimeExecutionMode.BATCH); + return environment; + } + + public static FileSink createFileSink(TaskContext context) { + return createFileSink(context.getTaskId(), context.getResultPath()); + } + + public static FileSink createFileSink(String taskId, String resultPath) { + return FileSink + .forRowFormat(new Path(resultPath + "/" + taskId), new SimpleStringEncoder<>("UTF-8")) + .withBucketAssigner(new BasePathBucketAssigner<>()) + .withOutputFileConfig(new OutputFileConfig("task", "")) + .build(); + } + + public static DataStream getAllFilePaths(DataStream source) { + return source + .map(path -> { + Configuration configuration = new Configuration(); + FileSystem fileSystem = FileSystem.get(configuration); + FileStatus[] statuses = fileSystem.listStatus(new org.apache.hadoop.fs.Path(path)); + String[] results = new String[statuses.length]; + for (int index = 0; index < statuses.length; index++) { + results[index] = statuses[index].getPath().toString(); + } + return results; + }) + .name("Read files") + .flatMap(new FlatMapIterator() { + @Override + public Iterator flatMap(String[] strings) { + return Arrays.asList(strings).iterator(); + } + }); + } + + public static DataStream getAllLogFilePaths(DataStream source) { + return getAllFilePaths(source) + .filter(FSUtils::isLogFile) + .name("Filter log files"); + } +}