feat(executor-task): 增加Hudi数据校验任务

This commit is contained in:
v-zhangjc9
2024-06-03 10:13:00 +08:00
parent 633db5512d
commit 7a21159ac6
3 changed files with 40 additions and 39 deletions

View File

@@ -9,18 +9,11 @@ import com.lanyuanxiaoyao.service.executor.task.functions.pulsar.ReadPulsarSourc
import com.lanyuanxiaoyao.service.executor.task.helper.ArgumentsHelper;
import com.lanyuanxiaoyao.service.executor.task.helper.FlinkHelper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.lanyuanxiaoyao.service.common.Constants.GB;
import static com.lanyuanxiaoyao.service.common.Constants.MINUTE;
/**
* Mini Hudi
*
@@ -55,6 +48,12 @@ public class HoodiePolice {
.filter(ObjectUtil::isNotNull)
.keyBy(Prisoner::getPartition)
.keyBy(Prisoner::getKey)
.reduce(new RichReduceFunction<Prisoner>() {
@Override
public Prisoner reduce(Prisoner value1, Prisoner value2) throws Exception {
return null;
}
})
.addSink(new PrisonerSink(taskContext));
environment.execute();
}

View File

@@ -0,0 +1,23 @@
package com.lanyuanxiaoyao.service.executor.task.functions.police;
import com.lanyuanxiaoyao.service.executor.task.entity.Prisoner;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.multimap.MutableMultimap;
/**
* @author lanyuanxiaoyao
*/
public class PrimaryKeyReduce extends RichReduceFunction<Prisoner> {
private final MutableMultimap<String, Prisoner> CACHE;
public PrimaryKeyReduce() {
CACHE = Lists.mutable.<Prisoner>empty()
.groupBy(Prisoner::getKey);
}
@Override
public Prisoner reduce(Prisoner value1, Prisoner value2) throws Exception {
return null;
}
}

View File

@@ -2,13 +2,8 @@ package com.lanyuanxiaoyao.service.executor.task.functions.police;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.lanyuanxiaoyao.service.executor.core.TaskContext;
import com.lanyuanxiaoyao.service.executor.task.entity.Prisoner;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -21,7 +16,6 @@ import org.apache.hadoop.fs.Path;
public class PrisonerSink extends RichSinkFunction<Prisoner> {
private final TaskContext taskContext;
private FileSystem fileSystem;
private LoadingCache<String, FSDataOutputStream> outputStreamLoadingCache;
public PrisonerSink(TaskContext taskContext) {
this.taskContext = taskContext;
@@ -31,33 +25,11 @@ public class PrisonerSink extends RichSinkFunction<Prisoner> {
public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
super.open(parameters);
this.fileSystem = FileSystem.get(new Configuration());
this.outputStreamLoadingCache = Caffeine.newBuilder()
.expireAfterAccess(1, TimeUnit.MINUTES)
.evictionListener((RemovalListener<String, FSDataOutputStream>) (key, value, cause) -> {
try {
if (value != null) {
value.close();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.build(path -> {
Path p = new Path(path);
if (fileSystem.exists(p)) {
return fileSystem.append(p);
} else {
return fileSystem.create(p, true);
}
});
}
@Override
public void close() throws Exception {
super.close();
if (ObjectUtil.isNotNull(outputStreamLoadingCache)) {
outputStreamLoadingCache.invalidateAll();
}
if (ObjectUtil.isNotNull(fileSystem)) {
fileSystem.close();
}
@@ -65,8 +37,15 @@ public class PrisonerSink extends RichSinkFunction<Prisoner> {
@Override
public void invoke(Prisoner value, Context context) throws Exception {
FSDataOutputStream outputStream = outputStreamLoadingCache.get(StrUtil.format("{}/{}/{}/{}/records", taskContext.getResultPath(), taskContext.getTaskId(), value.getPartition(), value.getKey()));
assert outputStream != null;
outputStream.writeBytes(value.toString());
Path p = new Path(StrUtil.format("{}/{}/{}/{}/records", taskContext.getResultPath(), taskContext.getTaskId(), value.getPartition(), value.getKey()));
if (fileSystem.exists(p)) {
try (FSDataOutputStream outputStream = fileSystem.append(p)) {
outputStream.writeBytes(value.toString());
}
} else {
try (FSDataOutputStream outputStream = fileSystem.create(p, true)) {
outputStream.writeBytes(value.toString());
}
}
}
}