feat(executor-task): 增加一个mini hudi的功能

从pulsar到hdfs落盘
This commit is contained in:
v-zhangjc9
2024-05-28 18:16:09 +08:00
parent 095347cd1a
commit deae4fd294
11 changed files with 507 additions and 95 deletions

View File

@@ -82,6 +82,11 @@
<artifactId>asciitable</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>

View File

@@ -0,0 +1,61 @@
package com.lanyuanxiaoyao.service.executor.task;
import cn.hutool.core.util.ObjectUtil;
import com.lanyuanxiaoyao.service.executor.core.TaskContext;
import com.lanyuanxiaoyao.service.executor.task.entity.Prisoner;
import com.lanyuanxiaoyao.service.executor.task.functions.police.PrisonerSink;
import com.lanyuanxiaoyao.service.executor.task.functions.police.PulsarMessage2Prisoner;
import com.lanyuanxiaoyao.service.executor.task.functions.pulsar.ReadPulsarSource;
import com.lanyuanxiaoyao.service.executor.task.helper.ArgumentsHelper;
import com.lanyuanxiaoyao.service.executor.task.helper.FlinkHelper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.lanyuanxiaoyao.service.common.Constants.GB;
import static com.lanyuanxiaoyao.service.common.Constants.MINUTE;
/**
* Mini Hudi
*
* @author lanyuanxiaoyao
*/
public class HoodiePolice {
private static final Logger logger = LoggerFactory.getLogger(HoodiePolice.class);
public static void main(String[] args) throws Exception {
TaskContext taskContext = ArgumentsHelper.getContext(args);
logger.info("Context: {}", taskContext);
ArgumentsHelper.checkMetadata(taskContext, "pulsar_url");
String pulsarUrl = (String) taskContext.getMetadata().get("pulsar_url");
ArgumentsHelper.checkMetadata(taskContext, "pulsar_topic");
String pulsarTopic = (String) taskContext.getMetadata().get("pulsar_topic");
ArgumentsHelper.checkMetadata(taskContext, "start_time");
Long startTime = (Long) taskContext.getMetadata().get("start_time");
ArgumentsHelper.checkMetadata(taskContext, "end_time");
Long endTime = (Long) taskContext.getMetadata().get("end_time");
ArgumentsHelper.checkMetadata(taskContext, "primary_keys");
String[] primaryKeys = ((String) taskContext.getMetadata().get("primary_keys")).split(",");
ArgumentsHelper.checkMetadata(taskContext, "partition_keys");
String[] partitionKeys = ((String) taskContext.getMetadata().get("partition_keys")).split(",");
StreamExecutionEnvironment environment = FlinkHelper.getBatchEnvironment();
environment.setParallelism(30);
environment
.fromSource(new ReadPulsarSource(taskContext, pulsarUrl, pulsarTopic), WatermarkStrategy.noWatermarks(), "Read pulsar")
.disableChaining()
.map(new PulsarMessage2Prisoner(startTime, endTime, primaryKeys, partitionKeys))
.filter(ObjectUtil::isNotNull)
.keyBy(Prisoner::getPartition)
.keyBy(Prisoner::getKey)
.addSink(new PrisonerSink(taskContext));
environment.execute();
}
}

View File

@@ -0,0 +1,75 @@
package com.lanyuanxiaoyao.service.executor.task.entity;
import java.io.Serializable;
/**
* @author lanyuanxiaoyao
*/
public class Prisoner implements Serializable {
private final String messageId;
private final String type;
private final String key;
private final String partition;
private final Long timestamp;
private final Boolean error;
private final String errorMessage;
private Prisoner(String messageId, String type, String key, String partition, Long timestamp, Boolean error, String errorMessage) {
this.messageId = messageId;
this.type = type;
this.key = key;
this.partition = partition;
this.timestamp = timestamp;
this.error = error;
this.errorMessage = errorMessage;
}
public Prisoner(String messageId, String type, String key, String partition, Long timestamp) {
this(messageId, type, key, partition, timestamp, false, null);
}
public Prisoner(String messageId, String errorMessage) {
this(messageId, null, "error", "error", null, true, errorMessage);
}
public String getMessageId() {
return messageId;
}
public String getType() {
return type;
}
public String getKey() {
return key;
}
public String getPartition() {
return partition;
}
public Long getTimestamp() {
return timestamp;
}
public Boolean getError() {
return error;
}
public String getErrorMessage() {
return errorMessage;
}
@Override
public String toString() {
return "Prisoner{" +
"messageId='" + messageId + '\'' +
", type='" + type + '\'' +
", key='" + key + '\'' +
", partition='" + partition + '\'' +
", timestamp=" + timestamp +
", error=" + error +
", errorMessage='" + errorMessage + '\'' +
'}';
}
}

View File

@@ -0,0 +1,72 @@
package com.lanyuanxiaoyao.service.executor.task.functions.police;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.lanyuanxiaoyao.service.executor.core.TaskContext;
import com.lanyuanxiaoyao.service.executor.task.entity.Prisoner;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
* @author lanyuanxiaoyao
*/
public class PrisonerSink extends RichSinkFunction<Prisoner> {
private final TaskContext taskContext;
private FileSystem fileSystem;
private LoadingCache<String, FSDataOutputStream> outputStreamLoadingCache;
public PrisonerSink(TaskContext taskContext) {
this.taskContext = taskContext;
}
@Override
public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
super.open(parameters);
this.fileSystem = FileSystem.get(new Configuration());
this.outputStreamLoadingCache = Caffeine.newBuilder()
.expireAfterAccess(1, TimeUnit.MINUTES)
.evictionListener((RemovalListener<String, FSDataOutputStream>) (key, value, cause) -> {
try {
if (value != null) {
value.close();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.build(path -> {
Path p = new Path(path);
if (fileSystem.exists(p)) {
return fileSystem.append(p);
} else {
return fileSystem.create(p, true);
}
});
}
@Override
public void close() throws Exception {
super.close();
if (ObjectUtil.isNotNull(outputStreamLoadingCache)) {
outputStreamLoadingCache.invalidateAll();
}
if (ObjectUtil.isNotNull(fileSystem)) {
fileSystem.close();
}
}
@Override
public void invoke(Prisoner value, Context context) throws Exception {
FSDataOutputStream outputStream = outputStreamLoadingCache.get(StrUtil.format("{}/{}/{}/{}/records", taskContext.getResultPath(), taskContext.getTaskId(), value.getPartition(), value.getKey()));
assert outputStream != null;
outputStream.writeBytes(value.toString());
}
}

View File

@@ -0,0 +1,83 @@
package com.lanyuanxiaoyao.service.executor.task.functions.police;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.common.entity.Record;
import com.lanyuanxiaoyao.service.executor.core.TaskContext;
import com.lanyuanxiaoyao.service.executor.task.entity.Prisoner;
import com.lanyuanxiaoyao.service.executor.task.entity.RecordView;
import com.lanyuanxiaoyao.service.executor.task.helper.JacksonHelper;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.eclipse.collections.api.factory.Lists;
/**
* @author lanyuanxiaoyao
*/
public class PulsarMessage2Prisoner extends RichMapFunction<RecordView, Prisoner> {
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private static final Pattern OPTS_PATTERN = Pattern.compile("^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}$");
private final ObjectMapper mapper = JacksonHelper.getMapper();
private final long startTime;
private final long endTime;
private final String[] primaryKeys;
private final String[] partitionKeys;
public PulsarMessage2Prisoner(long startTime, long endTime, String[] primaryKeys, String[] partitionKeys) {
this.startTime = startTime;
this.endTime = endTime;
this.primaryKeys = primaryKeys;
this.partitionKeys = partitionKeys;
}
@Override
public Prisoner map(RecordView value) {
Record record;
try {
record = mapper.readValue(value.getData(), Record.class);
} catch (Throwable e) {
return new Prisoner(value.getFile(), StrUtil.format("{}: {}", e.getMessage(), value.getData()));
}
Record.Statement statement = record.getStatement();
if (ObjectUtil.isNull(statement) || StrUtil.isBlank(statement.getOpTs())) {
return new Prisoner(value.getFile(), StrUtil.format("Invalid statement: {}", value.getData()));
}
long timestamp;
try {
if (OPTS_PATTERN.matcher(statement.getOpTs()).matches()) {
timestamp = LocalDateTime.parse(statement.getOpTs(), FORMATTER).toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
} else {
throw new Exception("opts not match " + OPTS_PATTERN.pattern());
}
} catch (Throwable e) {
return new Prisoner(value.getFile(), StrUtil.format("Invalid timestamp: {}", value.getData()));
}
if (timestamp < startTime || timestamp > endTime) {
return null;
}
Map<String, Object> fields;
if (StrUtil.equalsAny(statement.getOpType(), Constants.INSERT, Constants.UPDATE)) {
fields = record.getStatement().getAfter();
} else if (StrUtil.equals(statement.getOpType(), Constants.DELETE)) {
fields = record.getStatement().getBefore();
} else {
return new Prisoner(value.getFile(), StrUtil.format("Invalid opType: {}", value.getData()));
}
String primaryKey = Lists.immutable.of(primaryKeys).select(fields::containsKey).collect(fields::get).makeString(",");
String partitionKey = Lists.immutable.of(partitionKeys).select(fields::containsKey).collect(fields::get).makeString(",");
if (StrUtil.isBlank(primaryKey)) {
return new Prisoner(value.getFile(), StrUtil.format("Invalid primaryKey: {}", value.getData()));
}
if (StrUtil.isBlank(partitionKey)) {
return new Prisoner(value.getFile(), StrUtil.format("Invalid partitionKey: {}", value.getData()));
}
return new Prisoner(value.getFile(), record.getStatement().getOpType(), primaryKey, partitionKey, timestamp);
}
}