diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/HoodiePolice.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/HoodiePolice.java index 8fbddab..17437d1 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/HoodiePolice.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/HoodiePolice.java @@ -9,18 +9,11 @@ import com.lanyuanxiaoyao.service.executor.task.functions.pulsar.ReadPulsarSourc import com.lanyuanxiaoyao.service.executor.task.helper.ArgumentsHelper; import com.lanyuanxiaoyao.service.executor.task.helper.FlinkHelper; import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.api.common.serialization.SimpleStringEncoder; -import org.apache.flink.core.fs.Path; +import org.apache.flink.api.common.functions.RichReduceFunction; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; -import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static com.lanyuanxiaoyao.service.common.Constants.GB; -import static com.lanyuanxiaoyao.service.common.Constants.MINUTE; - /** * Mini Hudi * @@ -55,6 +48,12 @@ public class HoodiePolice { .filter(ObjectUtil::isNotNull) .keyBy(Prisoner::getPartition) .keyBy(Prisoner::getKey) + .reduce(new RichReduceFunction() { + @Override + public Prisoner reduce(Prisoner value1, Prisoner value2) throws Exception { + return null; + } + }) .addSink(new PrisonerSink(taskContext)); environment.execute(); } diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/police/PrimaryKeyReduce.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/police/PrimaryKeyReduce.java new file mode 100644 index 0000000..05703c9 --- /dev/null +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/police/PrimaryKeyReduce.java @@ -0,0 +1,23 @@ +package com.lanyuanxiaoyao.service.executor.task.functions.police; + +import com.lanyuanxiaoyao.service.executor.task.entity.Prisoner; +import org.apache.flink.api.common.functions.RichReduceFunction; +import org.eclipse.collections.api.factory.Lists; +import org.eclipse.collections.api.multimap.MutableMultimap; + +/** + * @author lanyuanxiaoyao + */ +public class PrimaryKeyReduce extends RichReduceFunction { + private final MutableMultimap CACHE; + + public PrimaryKeyReduce() { + CACHE = Lists.mutable.empty() + .groupBy(Prisoner::getKey); + } + + @Override + public Prisoner reduce(Prisoner value1, Prisoner value2) throws Exception { + return null; + } +} diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/police/PrisonerSink.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/police/PrisonerSink.java index 23f531b..3f0a709 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/police/PrisonerSink.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/police/PrisonerSink.java @@ -2,13 +2,8 @@ package com.lanyuanxiaoyao.service.executor.task.functions.police; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; -import com.github.benmanes.caffeine.cache.Caffeine; -import com.github.benmanes.caffeine.cache.LoadingCache; -import com.github.benmanes.caffeine.cache.RemovalListener; import com.lanyuanxiaoyao.service.executor.core.TaskContext; import com.lanyuanxiaoyao.service.executor.task.entity.Prisoner; -import java.io.IOException; -import java.util.concurrent.TimeUnit; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; @@ -21,7 +16,6 @@ import org.apache.hadoop.fs.Path; public class PrisonerSink extends RichSinkFunction { private final TaskContext taskContext; private FileSystem fileSystem; - private LoadingCache outputStreamLoadingCache; public PrisonerSink(TaskContext taskContext) { this.taskContext = taskContext; @@ -31,33 +25,11 @@ public class PrisonerSink extends RichSinkFunction { public void open(org.apache.flink.configuration.Configuration parameters) throws Exception { super.open(parameters); this.fileSystem = FileSystem.get(new Configuration()); - this.outputStreamLoadingCache = Caffeine.newBuilder() - .expireAfterAccess(1, TimeUnit.MINUTES) - .evictionListener((RemovalListener) (key, value, cause) -> { - try { - if (value != null) { - value.close(); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - }) - .build(path -> { - Path p = new Path(path); - if (fileSystem.exists(p)) { - return fileSystem.append(p); - } else { - return fileSystem.create(p, true); - } - }); } @Override public void close() throws Exception { super.close(); - if (ObjectUtil.isNotNull(outputStreamLoadingCache)) { - outputStreamLoadingCache.invalidateAll(); - } if (ObjectUtil.isNotNull(fileSystem)) { fileSystem.close(); } @@ -65,8 +37,15 @@ public class PrisonerSink extends RichSinkFunction { @Override public void invoke(Prisoner value, Context context) throws Exception { - FSDataOutputStream outputStream = outputStreamLoadingCache.get(StrUtil.format("{}/{}/{}/{}/records", taskContext.getResultPath(), taskContext.getTaskId(), value.getPartition(), value.getKey())); - assert outputStream != null; - outputStream.writeBytes(value.toString()); + Path p = new Path(StrUtil.format("{}/{}/{}/{}/records", taskContext.getResultPath(), taskContext.getTaskId(), value.getPartition(), value.getKey())); + if (fileSystem.exists(p)) { + try (FSDataOutputStream outputStream = fileSystem.append(p)) { + outputStream.writeBytes(value.toString()); + } + } else { + try (FSDataOutputStream outputStream = fileSystem.create(p, true)) { + outputStream.writeBytes(value.toString()); + } + } } }