refactor(executor-task): use simple try-with-resources (twr) syntax

v-zhangjc9
2024-04-29 14:33:35 +08:00
parent c0b48ba2dd
commit 0fa0a396ef


@@ -7,7 +7,6 @@ import com.lanyuanxiaoyao.service.executor.task.entity.RecordView;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Map;
-import java.util.Objects;
 import java.util.stream.Collectors;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.util.Collector;
@@ -17,16 +16,17 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
-import org.apache.hudi.common.table.log.block.*;
+import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
+import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock;
 import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.io.storage.HoodieParquetReader;
 import org.apache.hudi.org.apache.avro.Schema;
 import org.apache.hudi.org.apache.avro.generic.IndexedRecord;
 import org.apache.hudi.org.apache.avro.util.Utf8;
-import org.apache.parquet.avro.AvroSchemaConverter;
-import org.apache.parquet.schema.MessageType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,9 +43,15 @@ public class ReadHudiFile implements FlatMapFunction<String, RecordView> {
         Schema schema = record.getSchema();
         StringBuilder builder = new StringBuilder();
         for (Schema.Field field : schema.getFields()) {
+            Object data = record.get(field.pos());
             builder.append(field.name())
+                    .append("(")
+                    .append(schema)
+                    .append("/")
+                    .append(ObjectUtil.isNull(data) ? "null" : data.getClass().getName())
+                    .append(")")
                     .append("=")
-                    .append(record.get(field.pos()))
+                    .append(data)
                     .append(" ");
         }
         String timestamp = null;
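
With this hunk, each field in the debug string also records the schema and the runtime class of its value, in the shape name(schema/valueClass)=value. A minimal sketch of the resulting format follows; the field name and value are invented, a plain null check stands in for Hutool's ObjectUtil.isNull, and the record-level schema is appended per field exactly as the diff does:

public class DebugEntrySketch {
    public static void main(String[] args) {
        // Hypothetical illustration of one entry in the enriched debug string:
        // <fieldName>(<schema>/<valueClass>)=<value>
        Object data = 42L;                           // invented field value
        String fieldName = "ts";                     // invented field name
        String schemaJson = "{\"type\":\"record\"}"; // record schema, abbreviated
        String entry = fieldName
                + "(" + schemaJson
                + "/" + (data == null ? "null" : data.getClass().getName())
                + ")=" + data + " ";
        System.out.println(entry); // ts({"type":"record"}/java.lang.Long)=42
    }
}
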
@@ -83,14 +89,14 @@ public class ReadHudiFile implements FlatMapFunction<String, RecordView> {
     }
 
     private void readDataFile(FileSystem readerFilesystem, Path dataFilePath, Collector<RecordView> out) throws IOException {
-        HoodieParquetReader<IndexedRecord> reader = new HoodieParquetReader<>(readerFilesystem.getConf(), dataFilePath);
-        ClosableIterator<IndexedRecord> recordIterator = reader.getRecordIterator();
-        while (recordIterator.hasNext()) {
-            RecordView recordView = parseData(dataFilePath.toString(), RecordView.Operation.RESULT, recordIterator.next());
-            out.collect(recordView);
+        try (HoodieParquetReader<IndexedRecord> reader = new HoodieParquetReader<>(readerFilesystem.getConf(), dataFilePath)) {
+            try (ClosableIterator<IndexedRecord> recordIterator = reader.getRecordIterator()) {
+                while (recordIterator.hasNext()) {
+                    RecordView recordView = parseData(dataFilePath.toString(), RecordView.Operation.RESULT, recordIterator.next());
+                    out.collect(recordView);
+                }
+            }
         }
-        recordIterator.close();
-        reader.close();
     }
 
     private void readLogFile(FileSystem readerFilesystem, Path logFilePath, Collector<RecordView> out) throws IOException {
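
For reference, try-with-resources closes every declared resource automatically, innermost first, even when the loop body throws, which the removed trailing close() calls could not guarantee. Below is a minimal self-contained sketch of the pattern; DummyReader and DummyIterator are hypothetical stand-ins for HoodieParquetReader and ClosableIterator, and the only requirement on them is that they implement AutoCloseable:

public class TwrSketch {
    // Hypothetical stand-in for ClosableIterator<IndexedRecord>.
    static class DummyIterator implements AutoCloseable {
        private int remaining = 3;
        boolean hasNext() { return remaining > 0; }
        int next() { return remaining--; }
        @Override
        public void close() { System.out.println("iterator closed"); }
    }

    // Hypothetical stand-in for HoodieParquetReader<IndexedRecord>.
    static class DummyReader implements AutoCloseable {
        DummyIterator getRecordIterator() { return new DummyIterator(); }
        @Override
        public void close() { System.out.println("reader closed"); }
    }

    public static void main(String[] args) {
        // Both resources are closed automatically and in reverse order
        // (iterator first, then reader), even if the loop body throws.
        try (DummyReader reader = new DummyReader();
             DummyIterator recordIterator = reader.getRecordIterator()) {
            while (recordIterator.hasNext()) {
                System.out.println("record " + recordIterator.next());
            }
        }
    }
}

Both resources can also be declared in a single try header, as in this sketch, which reads a little flatter than the nested form in the diff while closing in the same order.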