feat(executor-manager): 优化task任务命名

This commit is contained in:
2024-02-05 10:53:36 +08:00
parent d155779952
commit efcb841434
5 changed files with 24 additions and 59 deletions

View File

@@ -97,7 +97,7 @@ public class ExecutorTaskService {
configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("0m"));
configuration.set(TaskManagerOptions.JVM_METASPACE, MemorySize.parse("128m"));
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 10);
configuration.setString(YarnConfigOptions.APPLICATION_NAME, "Service_Task_" + name + "_" + taskId);
configuration.setString(YarnConfigOptions.APPLICATION_NAME, StrUtil.format("Service_Task {} #{}", name, taskId));
configuration.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, executorConfiguration.getHistoryServerArchiveDir());
configuration.setLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL, 10000);
@@ -124,7 +124,7 @@ public class ExecutorTaskService {
Boolean scanTarget
) throws Exception {
String taskId = taskId();
Configuration configuration = generateConfiguration(taskId, "scan");
Configuration configuration = generateConfiguration(taskId, "scan " + key);
MapBuilder<String, Object> builder = MapUtil.builder();
setEnvironment(configuration, "key", key);
@@ -163,7 +163,7 @@ public class ExecutorTaskService {
public String scanLatestOpTs(String hdfs) throws Exception {
String taskId = taskId();
Configuration configuration = generateConfiguration(taskId, "latest_op_ts");
Configuration configuration = generateConfiguration(taskId, StrUtil.format("latest_op_ts {}", hdfs));
configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("1024m"));
MapBuilder<String, Object> builder = MapUtil.builder();

View File

@@ -1,51 +0,0 @@
# Allows this configuration to be modified at runtime. The file will be checked every 30 seconds.
monitorInterval = 30

# This affects logging for both user code and Flink
rootLogger.level = INFO
rootLogger.appenderRef.file.ref = MainAppender

# Uncomment this if you want to _only_ change Flink's logging
logger.flink.name = org.apache.flink
logger.flink.level = INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name = org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
logger.shaded_zookeeper.name = org.apache.flink.shaded.zookeeper3
logger.shaded_zookeeper.level = INFO
logger.hudi.name = org.apache.hudi
logger.hudi.level = INFO

# Log all infos in the given file
# (Console appender alternative, kept for reference)
#appender.main.name = MainAppender
#appender.main.type = Console
#appender.main.layout.type = PatternLayout
#appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.main.name = MainAppender
appender.main.type = RollingFile
appender.main.append = true
appender.main.fileName = ${sys:log.file}
appender.main.filePattern = ${sys:log.file}.%i
appender.main.layout.type = PatternLayout
appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.main.policies.type = Policies
appender.main.policies.size.type = SizeBasedTriggeringPolicy
appender.main.policies.size.size = 100MB
appender.main.policies.startup.type = OnStartupTriggeringPolicy
appender.main.strategy.type = DefaultRolloverStrategy
appender.main.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF
logger.hadoop.configuration.name = org.apache.hadoop.conf.Configuration
logger.hadoop.configuration.level = ERROR