feat(all): flink job增加tags属性

flink job级别增加标签属性,用于区分调用测试包和非测试包
This commit is contained in:
v-zhangjc9
2024-07-30 16:42:02 +08:00
parent b0c5d04476
commit a3472340b5
23 changed files with 332 additions and 219 deletions

View File

@@ -9,6 +9,7 @@ import com.lanyuanxiaoyao.service.common.entity.FlinkJob;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import com.lanyuanxiaoyao.service.common.exception.CheckpointRootPathNotFoundException;
import com.lanyuanxiaoyao.service.common.utils.NameHelper;
import com.lanyuanxiaoyao.service.common.utils.TagsHelper;
import com.lanyuanxiaoyao.service.configuration.HudiServiceProperties;
import com.lanyuanxiaoyao.service.executor.Runner;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
@@ -23,7 +24,18 @@ import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;
import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.configuration.*;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.HeartbeatManagerOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
@@ -137,9 +149,13 @@ public class ExecutorService {
);
}
private String getLatestExecutorJarPath() throws IOException {
private String getLatestExecutorJarPath(FlinkJob flinkJob) throws IOException {
try (FileSystem fileSystem = HadoopUtil.createFileSystem(HadoopUtil.createConfiguration(hadoopConfiguration))) {
Path root = new Path(hudiConfiguration.getAppHdfsPath());
if (TagsHelper.existsTag(flinkJob, Constants.TAGS_USE_TEST_JAR)) {
logger.warn("Use test jar for {}", flinkJob.getId());
root = new Path(hudiConfiguration.getAppTestHdfsPath());
}
return Lists.immutable.of(fileSystem.listStatus(root))
.select(FileStatus::isFile)
.collect(FileStatus::getPath)
@@ -263,7 +279,7 @@ public class ExecutorService {
// configuration.setLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL, 10000);
// 业务jar包
String executorJarPath = getLatestExecutorJarPath();
String executorJarPath = getLatestExecutorJarPath(flinkJob);
logger.info("Executor jar path: {}", executorJarPath);
Long executorJarVersion = getLatestExecutorJarVersion(executorJarPath);
configuration.set(PipelineOptions.JARS, new ArrayList<String>() {{
@@ -314,7 +330,7 @@ public class ExecutorService {
configuration.setString(YarnConfigOptions.APPLICATION_NAME, NameHelper.compactionJobName(flinkJob.getId(), tableMeta.getAlias()));
String executorJarPath = getLatestExecutorJarPath();
String executorJarPath = getLatestExecutorJarPath(flinkJob);
logger.info("Executor jar path: {}", executorJarPath);
Long executorJarVersion = getLatestExecutorJarVersion(executorJarPath);
configuration.set(PipelineOptions.JARS, new ArrayList<String>() {{

View File

@@ -10,7 +10,7 @@ import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import com.lanyuanxiaoyao.service.common.entity.compaction.ScheduleJob;
import com.lanyuanxiaoyao.service.common.utils.LogHelper;
import com.lanyuanxiaoyao.service.common.utils.NameHelper;
import com.lanyuanxiaoyao.service.common.utils.TableMetaHelper;
import com.lanyuanxiaoyao.service.common.utils.TagsHelper;
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiCompactionPlan;
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant;
import com.lanyuanxiaoyao.service.configuration.entity.queue.QueueItem;
@@ -30,7 +30,6 @@ import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.PreDestroy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -218,7 +217,7 @@ public class CompactionService {
TableMeta meta = infoService.tableMetaDetail(flinkJobId, alias);
compactionJobGetTableInfoCost.record(Duration.between(getTableInfoStartTime, Instant.now()));
if (TableMetaHelper.existsTag(meta, Constants.TAGS_NO_COMPACT)) {
if (TagsHelper.existsTag(meta, Constants.TAGS_NO_COMPACT)) {
logger.warn("[{}] [{}] Table tags no compact", flinkJob.getId(), meta.getAlias());
return;
}

View File

@@ -18,6 +18,7 @@ public class HudiConfiguration {
private static final Logger logger = LoggerFactory.getLogger(HudiConfiguration.class);
private String appHdfsPath;
private String appTestHdfsPath;
private String victoriaPushUrl;
private String lokiPushUrl;
@@ -34,6 +35,14 @@ public class HudiConfiguration {
this.appHdfsPath = appHdfsPath;
}
/**
 * Returns the HDFS root path holding the test executor jars — used instead of
 * {@code appHdfsPath} when a Flink job carries the use-test-jar tag.
 */
public String getAppTestHdfsPath() {
    return this.appTestHdfsPath;
}
/**
 * Sets the HDFS root path for test executor jars.
 *
 * @param path HDFS directory selected when a job is tagged to use the test jar
 */
public void setAppTestHdfsPath(String path) {
    this.appTestHdfsPath = path;
}
/**
 * Returns the metrics push URL.
 * NOTE(review): name suggests a VictoriaMetrics endpoint — confirm against the
 * component that pushes to it.
 */
public String getVictoriaPushUrl() {
    return this.victoriaPushUrl;
}
@@ -54,6 +63,7 @@ public class HudiConfiguration {
public String toString() {
return "HudiConfiguration{" +
"appHdfsPath='" + appHdfsPath + '\'' +
", appTestHdfsPath='" + appTestHdfsPath + '\'' +
", victoriaPushUrl='" + victoriaPushUrl + '\'' +
", lokiPushUrl='" + lokiPushUrl + '\'' +
'}';