diff --git a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/TaskService.java b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/TaskService.java index 5677437..455716e 100644 --- a/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/TaskService.java +++ b/service-executor/service-executor-manager/src/main/java/com/lanyuanxiaoyao/service/executor/manager/service/TaskService.java @@ -59,6 +59,11 @@ public class TaskService { return IdUtil.nanoId(8); } + private void setEnvironment(Configuration configuration, String key, String value) { + configuration.setString(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX + key, value); + configuration.setString(ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + key, value); + } + private Configuration generateConfiguration(String taskId, String name) { Configuration configuration = new Configuration(); configuration.setBoolean(JobManagerOptions.JVM_DIRECT_MEMORY_LIMIT_ENABLED, true); @@ -92,6 +97,8 @@ public class TaskService { configuration.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, executorConfiguration.getHistoryServerArchiveDir()); configuration.setLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL, 10000); + setEnvironment(configuration, "task_id", taskId); + configuration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); configuration.setString(YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT, "/app-logs"); @@ -103,8 +110,11 @@ public class TaskService { public String scanAvro(String hdfs, String key, Boolean scanLog, Boolean scanData, Boolean scanSource, Boolean scanTarget) throws Exception { String taskId = taskId(); + Configuration configuration = generateConfiguration(taskId, "scan"); + setEnvironment(configuration, "hdfs", hdfs); + setEnvironment(configuration, "key", key); ApplicationId applicationId = Runner.run( - generateConfiguration(taskId, "scan"), + configuration, "com.lanyuanxiaoyao.service.executor.task.DataScanner", new String[]{ TaskConstants.TASK_CONTEXT_OPTION,