feat(executor): 完成任务提交执行的验证
This commit is contained in:
@@ -0,0 +1,56 @@
|
||||
package com.lanyuanxiaoyao.service.executor.task;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.lanyuanxiaoyao.service.executor.core.TaskContext;
|
||||
import com.lanyuanxiaoyao.service.executor.task.entity.RecordView;
|
||||
import com.lanyuanxiaoyao.service.executor.task.functions.ReadLogFile;
|
||||
import com.lanyuanxiaoyao.service.executor.task.helper.ArgumentsHelper;
|
||||
import com.lanyuanxiaoyao.service.executor.task.helper.FlinkHelper;
|
||||
import java.util.Map;
|
||||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.eclipse.collections.api.factory.Lists;
|
||||
import org.eclipse.collections.api.list.ImmutableList;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* avro 日志扫描
|
||||
*
|
||||
* @author lanyuanxiaoyao
|
||||
* @date 2024-01-08
|
||||
*/
|
||||
public class AvroScanner {
|
||||
private static final Logger logger = LoggerFactory.getLogger(AvroScanner.class);
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
TaskContext taskContext = ArgumentsHelper.getContext(args);
|
||||
logger.info("Context: {}", taskContext);
|
||||
|
||||
Map<String, Object> metadata = taskContext.getMetadata();
|
||||
ArgumentsHelper.checkMetadata(taskContext, "hdfs");
|
||||
String hdfs = (String) metadata.get("hdfs");
|
||||
ArgumentsHelper.checkMetadata(taskContext, "key");
|
||||
String key = (String) metadata.get("key");
|
||||
|
||||
Configuration configuration = new Configuration();
|
||||
FileSystem fileSystem = FileSystem.get(configuration);
|
||||
if (!fileSystem.exists(new Path(hdfs))) {
|
||||
throw new RuntimeException(StrUtil.format("HDFS {} is not exists", hdfs));
|
||||
}
|
||||
|
||||
ImmutableList<String> paths = Lists.immutable.of(fileSystem.listStatus(new Path(hdfs)))
|
||||
.reject(status -> StrUtil.equals(".hoodie", status.getPath().getName()))
|
||||
.collect(status -> status.getPath().toString());
|
||||
|
||||
StreamExecutionEnvironment environment = FlinkHelper.getBatchEnvironment();
|
||||
environment.setParallelism(20);
|
||||
FlinkHelper.getAllLogFilePaths(environment.fromCollection(paths.toList()))
|
||||
.flatMap(new ReadLogFile())
|
||||
.map(RecordView::toString)
|
||||
.sinkTo(FlinkHelper.createFileSink(taskContext));
|
||||
environment.execute(StrUtil.format("Search {} in {}", key, hdfs));
|
||||
}
|
||||
}
|
||||
@@ -1,50 +0,0 @@
|
||||
package com.lanyuanxiaoyao.service.executor.task;
|
||||
|
||||
import com.lanyuanxiaoyao.service.executor.core.TaskContext;
|
||||
import com.lanyuanxiaoyao.service.executor.task.helper.ArgumentsHelper;
|
||||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
|
||||
import org.apache.flink.streaming.api.functions.source.SourceFunction;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Hello world
|
||||
*
|
||||
* @author lanyuanxiaoyao
|
||||
* @date 2023-12-04
|
||||
*/
|
||||
public class Hello {
|
||||
private static final Logger logger = LoggerFactory.getLogger(Hello.class);
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
TaskContext taskContext = ArgumentsHelper.getContext(args);
|
||||
logger.info("Context: {}", taskContext);
|
||||
|
||||
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
|
||||
|
||||
environment
|
||||
.addSource(new SourceFunction<Integer>() {
|
||||
@Override
|
||||
public void run(SourceContext<Integer> context) {
|
||||
for (int index = 0; index < 10; index++) {
|
||||
context.collect(index);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel() {
|
||||
}
|
||||
})
|
||||
.map(value -> "Index: " + value)
|
||||
.addSink(new SinkFunction<String>() {
|
||||
@Override
|
||||
public void invoke(String value, Context context) throws Exception {
|
||||
logger.info("Value: {}", value);
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
environment.execute("Service task: Hello");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,65 @@
|
||||
package com.lanyuanxiaoyao.service.executor.task.entity;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import java.io.Serializable;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 记录
|
||||
*
|
||||
* @author lanyuanxiaoyao
|
||||
* @date 2024-01-09
|
||||
*/
|
||||
public class RecordView implements Serializable, Comparable<RecordView> {
|
||||
private final Operation operation;
|
||||
private final String data;
|
||||
private final String timestamp;
|
||||
private final String file;
|
||||
private final Map<String, Object> attributes;
|
||||
|
||||
public RecordView(Operation operation, String data, String timestamp, String file) {
|
||||
this.operation = operation;
|
||||
this.data = data;
|
||||
this.timestamp = timestamp;
|
||||
this.file = file;
|
||||
this.attributes = new HashMap<>();
|
||||
}
|
||||
|
||||
public Operation getOperation() {
|
||||
return operation;
|
||||
}
|
||||
|
||||
public String getData() {
|
||||
return data;
|
||||
}
|
||||
|
||||
public String getTimestamp() {
|
||||
return timestamp;
|
||||
}
|
||||
|
||||
public String getFile() {
|
||||
return file;
|
||||
}
|
||||
|
||||
public Map<String, Object> getAttributes() {
|
||||
return attributes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return StrUtil.format("\n{} {} {}\n{}", operation, timestamp, file, data);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(RecordView o) {
|
||||
if (o != null) {
|
||||
return this.timestamp.compareTo(o.timestamp);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
public enum Operation {
|
||||
DELETE, UPSERT, ROLLBACK, RESULT, SOURCE
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,112 @@
|
||||
package com.lanyuanxiaoyao.service.executor.task.functions;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import com.eshore.odcp.hudi.connector.Constants;
|
||||
import com.lanyuanxiaoyao.service.executor.task.entity.RecordView;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.flink.api.common.functions.FlatMapFunction;
|
||||
import org.apache.flink.util.Collector;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hudi.common.model.HoodieLogFile;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.table.TableSchemaResolver;
|
||||
import org.apache.hudi.common.table.log.HoodieLogFormat;
|
||||
import org.apache.hudi.common.table.log.block.*;
|
||||
import org.apache.hudi.common.util.ClosableIterator;
|
||||
import org.apache.hudi.org.apache.avro.Schema;
|
||||
import org.apache.hudi.org.apache.avro.generic.IndexedRecord;
|
||||
import org.apache.hudi.org.apache.avro.util.Utf8;
|
||||
import org.apache.parquet.avro.AvroSchemaConverter;
|
||||
import org.apache.parquet.schema.MessageType;
|
||||
|
||||
/**
|
||||
* 读取log文件
|
||||
*
|
||||
* @author lanyuanxiaoyao
|
||||
* @date 2024-01-09
|
||||
*/
|
||||
public class ReadLogFile implements FlatMapFunction<String, RecordView> {

    /**
     * Renders an Avro record as "name=value " pairs and wraps it in an
     * UPSERT {@link RecordView} tagged with the source file path. The Hudi
     * commit-time metadata field, when present in the schema, becomes the
     * view's timestamp; the latest-operation timestamp (when present) is
     * attached as an extra attribute.
     */
    private RecordView parseData(String source, IndexedRecord record) {
        Schema schema = record.getSchema();
        StringBuilder builder = new StringBuilder();
        // Flatten every field into a "name=value " string.
        for (Schema.Field field : schema.getFields()) {
            builder.append(field.name())
                    .append("=")
                    .append(record.get(field.pos()))
                    .append(" ");
        }
        // Stays null when the schema has no commit-time metadata field.
        String timestamp = null;
        Schema.Field commitTimeField = schema.getField(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
        if (ObjectUtil.isNotNull(commitTimeField)) {
            // assumes the field value is an Avro Utf8 — TODO confirm; a
            // non-Utf8 value would throw ClassCastException here.
            timestamp = ((Utf8) record.get(commitTimeField.pos())).toString();
        }
        String latestOpTs = null;
        Schema.Field latestOpTsField = schema.getField(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME);
        if (ObjectUtil.isNotNull(latestOpTsField)) {
            // Same Utf8 assumption as above.
            latestOpTs = ((Utf8) record.get(latestOpTsField.pos())).toString();
        }

        String data = builder.toString();
        RecordView recordView = new RecordView(RecordView.Operation.UPSERT, data, timestamp, source);
        recordView.getAttributes().put(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, latestOpTs);

        return recordView;
    }

    /**
     * Reads the Hudi log file at {@code logFilePath} block by block and
     * emits one {@link RecordView} per data record, delete-key set, or
     * rollback command.
     *
     * @param logFilePath path of a Hudi log file on the default file system
     * @param out         collector receiving the parsed record views
     * @throws IOException if the file system or log reader fails
     */
    @Override
    public void flatMap(String logFilePath, Collector<RecordView> out) throws IOException {
        Configuration readerConfiguration = new Configuration();
        FileSystem readerFilesystem = FileSystem.get(readerConfiguration);
        // Recover the writer schema from the log file itself, then convert
        // the Parquet MessageType back into an Avro schema for the reader.
        MessageType messageType = TableSchemaResolver.readSchemaFromLogFile(readerFilesystem, new Path(logFilePath));
        Schema schema = new AvroSchemaConverter().convert(Objects.requireNonNull(messageType));
        try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(readerFilesystem, new HoodieLogFile(new Path(logFilePath)), schema)) {
            while (reader.hasNext()) {
                HoodieLogBlock block = reader.next();
                Map<HoodieLogBlock.HeaderMetadataType, String> logBlockHeader = block.getLogBlockHeader();
                // Instant time of the block; may be absent (null).
                String instant = logBlockHeader.getOrDefault(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, null);
                switch (block.getBlockType()) {
                    case AVRO_DATA_BLOCK:
                        // Data block: emit one UPSERT view per record.
                        HoodieAvroDataBlock avroDataBlock = (HoodieAvroDataBlock) block;
                        try (ClosableIterator<IndexedRecord> avroDataBlockRecordIterator = avroDataBlock.getRecordIterator()) {
                            while (avroDataBlockRecordIterator.hasNext()) {
                                RecordView recordView = parseData(logFilePath, avroDataBlockRecordIterator.next());
                                out.collect(recordView);
                            }
                        }
                        break;
                    case PARQUET_DATA_BLOCK:
                        // Same handling as Avro blocks, different block class.
                        HoodieParquetDataBlock parquetDataBlock = (HoodieParquetDataBlock) block;
                        try (ClosableIterator<IndexedRecord> parquetDataBlockRecordIterator = parquetDataBlock.getRecordIterator()) {
                            while (parquetDataBlockRecordIterator.hasNext()) {
                                RecordView recordView = parseData(logFilePath, parquetDataBlockRecordIterator.next());
                                out.collect(recordView);
                            }
                        }
                        break;
                    case CORRUPT_BLOCK:
                        // Corrupt blocks are skipped silently (best effort).
                        break;
                    case DELETE_BLOCK:
                        // Emit a single DELETE view whose data is the list of
                        // deleted Hoodie keys joined by spaces.
                        HoodieDeleteBlock deleteBlock = (HoodieDeleteBlock) block;
                        String keys = Arrays.stream(deleteBlock.getRecordsToDelete())
                                .map(deleteRecord -> deleteRecord.getHoodieKey().toString())
                                .collect(Collectors.joining(" "));
                        out.collect(new RecordView(RecordView.Operation.DELETE, keys, instant, logFilePath));
                        break;
                    case COMMAND_BLOCK:
                        // Command (rollback) block: emit a ROLLBACK view whose
                        // data is the instant being rolled back.
                        HoodieCommandBlock commandBlock = (HoodieCommandBlock) block;
                        Map<HoodieLogBlock.HeaderMetadataType, String> header = commandBlock.getLogBlockHeader();
                        out.collect(new RecordView(RecordView.Operation.ROLLBACK, header.get(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME), instant, logFilePath));
                        break;
                    default:
                        // Unknown/unhandled block types are ignored.
                        break;
                }
            }
        }
    }
}
|
||||
@@ -1,8 +1,10 @@
|
||||
package com.lanyuanxiaoyao.service.executor.task.helper;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.lanyuanxiaoyao.service.executor.core.TaskConstants;
|
||||
import com.lanyuanxiaoyao.service.executor.core.TaskContext;
|
||||
import java.util.Map;
|
||||
import org.apache.flink.api.java.utils.ParameterTool;
|
||||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
|
||||
|
||||
@@ -20,4 +22,14 @@ public class ArgumentsHelper {
|
||||
}
|
||||
return JacksonHelper.getMapper().readValue(argsTool.get(TaskConstants.TASK_CONTEXT), TaskContext.class);
|
||||
}
|
||||
|
||||
public static void checkMetadata(TaskContext context, String key) {
|
||||
Map<String, Object> metadata = context.getMetadata();
|
||||
if (ObjectUtil.isEmpty(metadata)) {
|
||||
throw new RuntimeException("Metadata is empty");
|
||||
}
|
||||
if (!metadata.containsKey(key)) {
|
||||
throw new RuntimeException(key + " argument is not found");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,73 @@
|
||||
package com.lanyuanxiaoyao.service.executor.task.helper;
|
||||
|
||||
import com.lanyuanxiaoyao.service.executor.core.TaskContext;
|
||||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
import org.apache.flink.api.common.RuntimeExecutionMode;
|
||||
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
|
||||
import org.apache.flink.api.java.functions.FlatMapIterator;
|
||||
import org.apache.flink.connector.file.sink.FileSink;
|
||||
import org.apache.flink.core.fs.Path;
|
||||
import org.apache.flink.streaming.api.datastream.DataStream;
|
||||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
|
||||
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
|
||||
/**
|
||||
* @author lanyuanxiaoyao
|
||||
* @date 2024-01-08
|
||||
*/
|
||||
public class FlinkHelper {
|
||||
public static StreamExecutionEnvironment getSteamEnvironment() {
|
||||
return StreamExecutionEnvironment.getExecutionEnvironment();
|
||||
}
|
||||
|
||||
public static StreamExecutionEnvironment getBatchEnvironment() {
|
||||
StreamExecutionEnvironment environment = getSteamEnvironment();
|
||||
environment.setRuntimeMode(RuntimeExecutionMode.BATCH);
|
||||
return environment;
|
||||
}
|
||||
|
||||
public static FileSink<String> createFileSink(TaskContext context) {
|
||||
return createFileSink(context.getTaskId(), context.getResultPath());
|
||||
}
|
||||
|
||||
public static FileSink<String> createFileSink(String taskId, String resultPath) {
|
||||
return FileSink
|
||||
.<String>forRowFormat(new Path(resultPath + "/" + taskId), new SimpleStringEncoder<>("UTF-8"))
|
||||
.withBucketAssigner(new BasePathBucketAssigner<>())
|
||||
.withOutputFileConfig(new OutputFileConfig("task", ""))
|
||||
.build();
|
||||
}
|
||||
|
||||
public static DataStream<String> getAllFilePaths(DataStream<String> source) {
|
||||
return source
|
||||
.map(path -> {
|
||||
Configuration configuration = new Configuration();
|
||||
FileSystem fileSystem = FileSystem.get(configuration);
|
||||
FileStatus[] statuses = fileSystem.listStatus(new org.apache.hadoop.fs.Path(path));
|
||||
String[] results = new String[statuses.length];
|
||||
for (int index = 0; index < statuses.length; index++) {
|
||||
results[index] = statuses[index].getPath().toString();
|
||||
}
|
||||
return results;
|
||||
})
|
||||
.name("Read files")
|
||||
.flatMap(new FlatMapIterator<String[], String>() {
|
||||
@Override
|
||||
public Iterator<String> flatMap(String[] strings) {
|
||||
return Arrays.asList(strings).iterator();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public static DataStream<String> getAllLogFilePaths(DataStream<String> source) {
|
||||
return getAllFilePaths(source)
|
||||
.filter(FSUtils::isLogFile)
|
||||
.name("Filter log files");
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user