feat(executor-task): 增加查询指定hudi表base文件最新的timestamp

根据LATEST_OP_TS来判断比较timestamp先后,排序后取最后的
This commit is contained in:
2024-01-30 12:31:57 +08:00
parent 4b2585984c
commit cd3b340270
13 changed files with 377 additions and 166 deletions

View File

@@ -2,25 +2,19 @@ package com.lanyuanxiaoyao.service.executor.task;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
import com.lanyuanxiaoyao.service.executor.core.TaskContext;
import com.lanyuanxiaoyao.service.executor.task.entity.RecordView;
import com.lanyuanxiaoyao.service.executor.task.functions.ReadHudiFile;
import com.lanyuanxiaoyao.service.executor.task.functions.pulsar.ReadPulsarSource;
import com.lanyuanxiaoyao.service.executor.task.helper.ArgumentsHelper;
import com.lanyuanxiaoyao.service.executor.task.helper.FlinkHelper;
import java.io.IOException;
import com.lanyuanxiaoyao.service.executor.task.helper.HdfsHelper;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,36 +28,6 @@ import org.slf4j.LoggerFactory;
public class DataScanner {
private static final Logger logger = LoggerFactory.getLogger(DataScanner.class);
/**
 * Reduces a set of Hudi base-file paths to one path per file group: the
 * version with the newest commit time encoded in its file name.
 *
 * @param fileSystem file system used to check file sizes
 * @param paths      candidate base-file paths (may contain several versions per file group)
 * @return immutable list of path strings, one latest base file per file group
 */
private static ImmutableList<String> parsePaths(FileSystem fileSystem, ImmutableList<Path> paths) {
return paths
// Run the pipeline in parallel on the shared executor pool (batch size 1).
.asParallel(ExecutorProvider.EXECUTORS, 1)
// Drop empty files; if the size check throws, conservatively reject the path too.
.reject(path -> {
try {
return FSUtils.getFileSize(fileSystem, path) < 1;
} catch (IOException e) {
logger.error("Get file size error", e);
}
return true;
})
// Group all versions of the same Hudi file group together by file id.
.groupBy(FSUtils::getFileIdFromFilePath)
.multiValuesView()
// Sort each group by commit time parsed from the file name; the last entry is the newest.
.collect(pathList -> pathList
.toSortedListBy(path -> {
String commitTime = FSUtils.getCommitTime(path.getName());
try {
return Long.valueOf(commitTime);
} catch (Throwable throwable) {
// Unparseable commit times sort first, so they are never picked as "latest".
return 0L;
}
})
.getLastOptional())
// A group emptied by the size filter yields an absent Optional; discard those.
.select(Optional::isPresent)
.collect(Optional::get)
.collect(Path::toString)
.toList()
.toImmutable();
}
public static void main(String[] args) throws Exception {
TaskContext taskContext = ArgumentsHelper.getContext(args);
logger.info("Context: {}", taskContext);
@@ -102,30 +66,13 @@ public class DataScanner {
if (scanLog || scanBase) {
ArgumentsHelper.checkMetadata(taskContext, "hdfs");
String hdfs = (String) metadata.get("hdfs");
Configuration configuration = new Configuration();
FileSystem fileSystem = FileSystem.get(configuration);
if (!fileSystem.exists(new Path(hdfs))) {
throw new RuntimeException(StrUtil.format("HDFS {} is not exists", hdfs));
}
FileSystem fileSystem = FileSystem.get(new Configuration());
HdfsHelper.checkHdfsPath(fileSystem, hdfs);
ImmutableList<Path> paths = Lists.immutable.of(fileSystem.listStatus(new Path(hdfs)))
.reject(status -> StrUtil.equals(".hoodie", status.getPath().getName()))
.flatCollect(status -> {
try {
if (status.isDirectory()) {
return Lists.immutable.of(fileSystem.listStatus(status.getPath()));
} else {
return Lists.immutable.of(status);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.collect(FileStatus::getPath);
if (scanLog) {
logger.info("Scan log hdfs: {}", hdfs);
ImmutableList<String> logPaths = paths.select(FSUtils::isLogFile).collect(Path::toString);
int parallelism = Math.max(1, Math.min(logPaths.size() / 20, 100));
ImmutableList<String> logPaths = HdfsHelper.logPaths(fileSystem, hdfs);
int parallelism = HdfsHelper.logScanParallelismPredict(logPaths);
totalParallelism = Math.max(totalParallelism, parallelism);
DataStream<RecordView> stream = environment
.fromCollection(logPaths.toList())
@@ -141,11 +88,11 @@ public class DataScanner {
}
if (scanBase) {
logger.info("Scan base hdfs: {}", hdfs);
ImmutableList<String> dataPaths = parsePaths(fileSystem, paths.select(FSUtils::isBaseFile));
int parallelism = Math.max(1, Math.min(dataPaths.size() / 2, 500));
ImmutableList<String> basePaths = HdfsHelper.latestBasePaths(fileSystem, hdfs);
int parallelism = HdfsHelper.baseScanParallelismPredict(basePaths);
totalParallelism = Math.max(totalParallelism, parallelism);
DataStream<RecordView> stream = environment
.fromCollection(dataPaths.toList())
.fromCollection(basePaths.toList())
.name("Read base paths")
.flatMap(new ReadHudiFile())
.name("Read hudi file")

View File

@@ -0,0 +1,76 @@
package com.lanyuanxiaoyao.service.executor.task;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.core.util.StrUtil;
import com.eshore.odcp.hudi.connector.Constants;
import com.lanyuanxiaoyao.service.executor.core.TaskContext;
import com.lanyuanxiaoyao.service.executor.task.functions.ReadHudiFile;
import com.lanyuanxiaoyao.service.executor.task.helper.ArgumentsHelper;
import com.lanyuanxiaoyao.service.executor.task.helper.FlinkHelper;
import com.lanyuanxiaoyao.service.executor.task.helper.HdfsHelper;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.CloseableIterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.eclipse.collections.api.list.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 查找base文件最后的opts
*
* @author lanyuanxiaoyao
* @date 2024-01-22
*/
/**
 * Scans the latest base files of a Hudi table and finds the greatest
 * operation timestamp (LATEST_OP_TS) across all records, then writes it
 * to the task's result file on HDFS.
 *
 * @author lanyuanxiaoyao
 * @date 2024-01-22
 */
public class LatestOperationTimeScan {
private static final Logger logger = LoggerFactory.getLogger(LatestOperationTimeScan.class);
public static void main(String[] args) throws Exception {
TaskContext taskContext = ArgumentsHelper.getContext(args);
logger.info("Context: {}", taskContext);
// The "hdfs" metadata entry is the Hudi table root path; fail fast if missing.
ArgumentsHelper.checkMetadata(taskContext, "hdfs");
String hdfs = (String) taskContext.getMetadata().get("hdfs");
FileSystem fileSystem = FileSystem.get(new Configuration());
HdfsHelper.checkHdfsPath(fileSystem, hdfs);
// Only the newest base file per file group needs to be read.
ImmutableList<String> basePaths = HdfsHelper.latestBasePaths(fileSystem, hdfs);
int parallelism = HdfsHelper.baseScanParallelismPredict(basePaths);
StreamExecutionEnvironment environment = FlinkHelper.getBatchEnvironment();
environment.setParallelism(parallelism);
String maxValue = "0";
try (CloseableIterator<String> iterator = environment
.fromCollection(basePaths.toList())
.name("Read base paths")
.flatMap(new ReadHudiFile())
.name("Read hudi file")
.setParallelism(parallelism)
// Records without a LATEST_OP_TS attribute default to "0" so they never win the max.
.map(view -> (String) view.getAttributes().getOrDefault(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, "0"))
// Attach a random key in [0, parallelism) purely to spread the reduce across subtasks.
.map(time -> new Tuple2<>(RandomUtil.randomInt(parallelism), time), TypeInformation.of(new TypeHint<Tuple2<Integer, String>>() {
}))
.keyBy(tuple -> tuple.f0)
// Per-key max by lexicographic string comparison.
// NOTE(review): assumes timestamps are fixed-width strings so lexicographic
// order matches chronological order — confirm the LATEST_OP_TS format.
.reduce(new RichReduceFunction<Tuple2<Integer, String>>() {
@Override
public Tuple2<Integer, String> reduce(Tuple2<Integer, String> value1, Tuple2<Integer, String> value2) throws Exception {
return value1.f1.compareTo(value2.f1) > 0 ? value1 : value2;
}
})
.map(tuple -> tuple.f1)
/*.sinkTo(FlinkHelper.createFileSink(taskContext))*/
.executeAndCollect("Find latest opts")) {
// Fold the per-key maxima down to the single global maximum on the driver.
while (iterator.hasNext()) {
String item = iterator.next();
if (item.compareTo(maxValue) > 0) {
maxValue = item;
}
}
}
// Persist the result where downstream consumers expect the task output.
HdfsHelper.createResult(fileSystem, taskContext, StrUtil.trim(maxValue));
}
}

View File

@@ -1,6 +1,7 @@
package com.lanyuanxiaoyao.service.executor.task.functions;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.eshore.odcp.hudi.connector.Constants;
import com.lanyuanxiaoyao.service.executor.task.entity.RecordView;
import java.io.IOException;
@@ -60,7 +61,9 @@ public class ReadHudiFile implements FlatMapFunction<String, RecordView> {
String data = builder.toString();
RecordView recordView = new RecordView(operation, data, timestamp, source);
recordView.getAttributes().put(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, latestOpTs);
if (StrUtil.isNotBlank(latestOpTs)) {
recordView.getAttributes().put(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, latestOpTs);
}
return recordView;
}

View File

@@ -28,7 +28,7 @@ public class FlinkHelper {
/**
 * Creates a stream execution environment configured for bounded (batch) execution.
 *
 * @return a new environment with {@link RuntimeExecutionMode#BATCH} set
 */
public static StreamExecutionEnvironment getBatchEnvironment() {
    StreamExecutionEnvironment environment = getSteamEnvironment();
    // The previous setRuntimeMode(AUTOMATIC) call was dead code: it was
    // immediately overridden by BATCH below, so it has been removed.
    environment.setRuntimeMode(RuntimeExecutionMode.BATCH);
    return environment;
}

View File

@@ -0,0 +1,131 @@
package com.lanyuanxiaoyao.service.executor.task.helper;
import cn.hutool.core.collection.IterUtil;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
import com.lanyuanxiaoyao.service.executor.core.TaskContext;
import java.io.IOException;
import java.util.Optional;
import java.util.function.Predicate;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* HDFS工具
*
* @author lanyuanxiaoyao
* @date 2024-01-22
*/
public class HdfsHelper {
private static final Logger logger = LoggerFactory.getLogger(HdfsHelper.class);
public static void checkHdfsPath(FileSystem fileSystem, String path) throws IOException {
checkHdfsPath(fileSystem, new Path(path));
}
public static void checkHdfsPath(FileSystem fileSystem, Path path) throws IOException {
if (!fileSystem.exists(path)) {
throw new RuntimeException(StrUtil.format("HDFS {} is not exists", path.toString()));
}
}
public static Integer logScanParallelismPredict(Iterable<?> list) {
return logScanParallelismPredict(IterUtil.size(list));
}
public static Integer logScanParallelismPredict(Integer pathNum) {
return Math.max(1, Math.min(pathNum / 20, 100));
}
public static Integer baseScanParallelismPredict(Iterable<?> list) {
return baseScanParallelismPredict(IterUtil.size(list));
}
public static Integer baseScanParallelismPredict(Integer pathNum) {
return Math.max(1, Math.min(pathNum / 2, 500));
}
public static ImmutableList<String> latestBasePaths(FileSystem fileSystem, String root) throws IOException {
return latestBasePaths(fileSystem, new Path(root));
}
public static ImmutableList<String> latestBasePaths(FileSystem fileSystem, Path root) throws IOException {
return basePaths(fileSystem, root)
.asParallel(ExecutorProvider.EXECUTORS, 1)
.collect(Path::new)
.reject(path -> {
try {
return FSUtils.getFileSize(fileSystem, path) < 1;
} catch (IOException e) {
logger.error("Get file size error", e);
}
return true;
})
.groupBy(FSUtils::getFileIdFromFilePath)
.multiValuesView()
.collect(pathList -> pathList
.toSortedListBy(path -> {
String commitTime = FSUtils.getCommitTime(path.getName());
try {
return Long.valueOf(commitTime);
} catch (Throwable throwable) {
return 0L;
}
})
.getLastOptional())
.select(Optional::isPresent)
.collect(Optional::get)
.collect(Path::toString)
.toList()
.toImmutable();
}
public static ImmutableList<String> basePaths(FileSystem fileSystem, String root) throws IOException {
return basePaths(fileSystem, new Path(root));
}
public static ImmutableList<String> basePaths(FileSystem fileSystem, Path root) throws IOException {
return hdfsPaths(fileSystem, root, FSUtils::isBaseFile);
}
public static ImmutableList<String> logPaths(FileSystem fileSystem, String root) throws IOException {
return logPaths(fileSystem, new Path(root));
}
public static ImmutableList<String> logPaths(FileSystem fileSystem, Path root) throws IOException {
return hdfsPaths(fileSystem, root, FSUtils::isLogFile);
}
public static ImmutableList<String> hdfsPaths(FileSystem fileSystem, Path root, Predicate<Path> check) throws IOException {
return Lists.immutable.of(fileSystem.listStatus(root))
.reject(status -> StrUtil.equals(".hoodie", status.getPath().getName()))
.flatCollect(status -> {
try {
if (status.isDirectory()) {
return Lists.immutable.of(fileSystem.listStatus(status.getPath()));
} else {
return Lists.immutable.of(status);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.collect(FileStatus::getPath)
.select(check::test)
.collect(Path::toString);
}
public static void createResult(FileSystem fileSystem, TaskContext context, String result) throws IOException {
Path resultPath = new Path(context.getResultPath() + "/" + context.getTaskId() + "/task-result");
try(FSDataOutputStream outputStream = fileSystem.create(resultPath)) {
outputStream.writeUTF(result);
}
}
}