feat(command): 增加升降级hoodie.properties配置文件的操作

This commit is contained in:
v-zhangjc9
2024-05-10 09:32:23 +08:00
parent dad624037b
commit d457b5d2f6

View File

@@ -0,0 +1,80 @@
package com.lanyuanxiaoyao.service.command.commands;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
import com.lanyuanxiaoyao.service.forest.service.HudiService;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.MutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.shell.standard.ShellComponent;
import org.springframework.shell.standard.ShellMethod;
/**
* 工具
*
* @author lanyuanxiaoyao
* @date 2024-05-08
*/
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@ShellComponent("额外操作")
public class ToolCommand {
private static final Logger logger = LoggerFactory.getLogger(ToolCommand.class);
private final InfoService infoService;
private final HudiService hudiService;
public ToolCommand(InfoService infoService, HudiService hudiService) {
this.infoService = infoService;
this.hudiService = hudiService;
}
@ShellMethod("升级hoodie.properties")
public void upgradeHoodieProperties() {
MutableList<String> results = Lists.mutable.empty();
infoService.tableMetaList()
.asParallel(ExecutorProvider.EXECUTORS, 10)
.forEach(meta -> {
String hdfs = meta.getHudi().getTargetHdfsPath();
if (hudiService.existsPath(hdfs)) {
String propertiesPath = StrUtil.format("{}/.hoodie/hoodie.properties", hdfs);
String content = hudiService.read(propertiesPath);
if (content.contains("org.apache.hudi.common.model.OverwriteWithLatestAvroPayload") && content.contains("org.apache.hudi.keygen.SimpleKeyGenerator")) {
content = content.replace("org.apache.hudi.common.model.OverwriteWithLatestAvroPayload", "org.apache.hudi.common.model.TraceOverwriteWithLatestAvroPayload");
content = content.replace("org.apache.hudi.keygen.SimpleKeyGenerator", "org.apache.hudi.keygen.DefaultPartitionNameKeyGenerator");
hudiService.write(propertiesPath, content, true);
logger.info("{} has rewrote", hdfs);
results.add(StrUtil.format("{} {}", meta.getJob().getId(), meta.getAlias()));
}
} else {
logger.warn("{} not found", hdfs);
}
});
logger.info("Result:\n{}", results.makeString("\n"));
}
@ShellMethod("降级hoodie.properties")
public void downgradeHoodieProperties() {
MutableList<String> results = Lists.mutable.empty();
infoService.tableMetaList()
.asParallel(ExecutorProvider.EXECUTORS, 10)
.forEach(meta -> {
String hdfs = meta.getHudi().getTargetHdfsPath();
if (hudiService.existsPath(hdfs)) {
String propertiesPath = StrUtil.format("{}/.hoodie/hoodie.properties", hdfs);
String content = hudiService.read(propertiesPath);
if (content.contains("org.apache.hudi.common.model.TraceOverwriteWithLatestAvroPayload") && content.contains("org.apache.hudi.keygen.DefaultPartitionNameKeyGenerator")) {
content = content.replace("org.apache.hudi.common.model.TraceOverwriteWithLatestAvroPayload", "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload");
content = content.replace("org.apache.hudi.keygen.DefaultPartitionNameKeyGenerator", "org.apache.hudi.keygen.SimpleKeyGenerator");
hudiService.write(propertiesPath, content, true);
logger.info("{} has rewrote", hdfs);
results.add(StrUtil.format("{} {}", meta.getJob().getId(), meta.getAlias()));
}
} else {
logger.warn("{} not found", hdfs);
}
});
logger.info("Result:\n{}", results.makeString("\n"));
}
}