refactor(cli): 优化配置
移除flink多余配置
This commit is contained in:
@@ -17,27 +17,9 @@ import org.springframework.stereotype.Component;
|
||||
public class ExecutorConfiguration {
|
||||
private static final Logger logger = LoggerFactory.getLogger(ExecutorConfiguration.class);
|
||||
|
||||
private String stagingDirectory;
|
||||
private String historyServerArchiveDir;
|
||||
private String taskJarPath;
|
||||
private String taskResultPath;
|
||||
|
||||
public String getStagingDirectory() {
|
||||
return stagingDirectory;
|
||||
}
|
||||
|
||||
public void setStagingDirectory(String stagingDirectory) {
|
||||
this.stagingDirectory = stagingDirectory;
|
||||
}
|
||||
|
||||
public String getHistoryServerArchiveDir() {
|
||||
return historyServerArchiveDir;
|
||||
}
|
||||
|
||||
public void setHistoryServerArchiveDir(String historyServerArchiveDir) {
|
||||
this.historyServerArchiveDir = historyServerArchiveDir;
|
||||
}
|
||||
|
||||
public String getTaskJarPath() {
|
||||
return taskJarPath;
|
||||
}
|
||||
@@ -57,9 +39,7 @@ public class ExecutorConfiguration {
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ExecutorConfiguration{" +
|
||||
"stagingDirectory='" + stagingDirectory + '\'' +
|
||||
", historyServerArchiveDir='" + historyServerArchiveDir + '\'' +
|
||||
", taskJarPath='" + taskJarPath + '\'' +
|
||||
"taskJarPath='" + taskJarPath + '\'' +
|
||||
", taskResultPath='" + taskResultPath + '\'' +
|
||||
'}';
|
||||
}
|
||||
|
||||
@@ -85,7 +85,6 @@ public class ExecutorTaskService {
|
||||
configuration.setString(AkkaOptions.TCP_TIMEOUT, "2 min");
|
||||
configuration.setBoolean(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
|
||||
configuration.setString(YarnConfigOptions.APPLICATION_ATTEMPTS, "4");
|
||||
configuration.setString(YarnConfigOptions.STAGING_DIRECTORY, executorConfiguration.getStagingDirectory());
|
||||
configuration.setString(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX + "MALLOC_ARENA_MAX", "1");
|
||||
configuration.setString(ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + "MALLOC_ARENA_MAX", "1");
|
||||
configuration.setInteger(RestOptions.PORT, 8081);
|
||||
@@ -98,14 +97,9 @@ public class ExecutorTaskService {
|
||||
configuration.set(TaskManagerOptions.JVM_METASPACE, MemorySize.parse("128m"));
|
||||
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 10);
|
||||
configuration.setString(YarnConfigOptions.APPLICATION_NAME, StrUtil.format("Service_Task {} #{}", name, taskId));
|
||||
configuration.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, executorConfiguration.getHistoryServerArchiveDir());
|
||||
configuration.setLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL, 10000);
|
||||
|
||||
setEnvironment(configuration, "task_id", taskId);
|
||||
|
||||
configuration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
|
||||
configuration.setString(YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT, "/app-logs");
|
||||
|
||||
configuration.set(PipelineOptions.JARS, new ArrayList<String>() {{
|
||||
add(executorConfiguration.getTaskJarPath());
|
||||
}});
|
||||
|
||||
Reference in New Issue
Block a user