diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/ReadHudiFile.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/ReadHudiFile.java index 042e891..642062b 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/ReadHudiFile.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/ReadHudiFile.java @@ -91,9 +91,7 @@ public class ReadHudiFile implements FlatMapFunction { } private void readLogFile(FileSystem readerFilesystem, Path logFilePath, Collector out) throws IOException { - MessageType messageType = TableSchemaResolver.readSchemaFromLogFile(readerFilesystem, logFilePath); - Schema schema = new AvroSchemaConverter().convert(Objects.requireNonNull(messageType)); - try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(readerFilesystem, new HoodieLogFile(logFilePath), schema)) { + try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(readerFilesystem, new HoodieLogFile(logFilePath), null)) { while (reader.hasNext()) { HoodieLogBlock block = reader.next(); Map logBlockHeader = block.getLogBlockHeader(); diff --git a/service-executor/service-executor-task/src/test/java/com/lanyuanxiaoyao/service/executor/task/ParseMessageType.java b/service-executor/service-executor-task/src/test/java/com/lanyuanxiaoyao/service/executor/task/ParseMessageType.java new file mode 100644 index 0000000..7bbf39b --- /dev/null +++ b/service-executor/service-executor-task/src/test/java/com/lanyuanxiaoyao/service/executor/task/ParseMessageType.java @@ -0,0 +1,29 @@ +package com.lanyuanxiaoyao.service.executor.task; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.table.log.HoodieLogFormat; +import org.apache.hudi.common.table.log.block.HoodieLogBlock; + +/** + * 解析message type + * + * @author lanyuanxiaoyao + * @date 2024-01-17 + */ +public class ParseMessageType { + public static void main(String[] args) throws IOException { + FileSystem fileSystem = FileSystem.get(new Configuration()); + // MessageType messageType = TableSchemaResolver.readSchemaFromLogFile(fileSystem, new Path("/Users/lanyuanxiaoyao/Downloads/00000665-925e-49c9-ba3b-067064f50e76_20240115220324540.log.1_62-200-0")); + // System.out.println(messageType); + try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(fileSystem, new HoodieLogFile(new Path("/Users/lanyuanxiaoyao/Downloads/00000665-925e-49c9-ba3b-067064f50e76_20240115220324540.log.1_62-200-0")), null)) { + while (reader.hasNext()) { + HoodieLogBlock block = reader.next(); + System.out.println(block.getBlockType()); + } + } + } +}