From 26f9d4ac6066fac6045108415c3f0d0f6a164a09 Mon Sep 17 00:00:00 2001 From: lanyuanxiaoyao Date: Wed, 17 Jan 2024 16:26:23 +0800 Subject: [PATCH] =?UTF-8?q?fix(executor-task):=20=E4=BF=AE=E5=A4=8Dschema?= =?UTF-8?q?=E8=AF=BB=E5=8F=96=E4=B8=8D=E5=88=B0=E5=AF=BC=E8=87=B4=E5=A4=B1?= =?UTF-8?q?=E8=B4=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../executor/task/functions/ReadHudiFile.java | 4 +-- .../executor/task/ParseMessageType.java | 29 +++++++++++++++++++ 2 files changed, 30 insertions(+), 3 deletions(-) create mode 100644 service-executor/service-executor-task/src/test/java/com/lanyuanxiaoyao/service/executor/task/ParseMessageType.java diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/ReadHudiFile.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/ReadHudiFile.java index 042e891..642062b 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/ReadHudiFile.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/ReadHudiFile.java @@ -91,9 +91,7 @@ public class ReadHudiFile implements FlatMapFunction { } private void readLogFile(FileSystem readerFilesystem, Path logFilePath, Collector out) throws IOException { - MessageType messageType = TableSchemaResolver.readSchemaFromLogFile(readerFilesystem, logFilePath); - Schema schema = new AvroSchemaConverter().convert(Objects.requireNonNull(messageType)); - try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(readerFilesystem, new HoodieLogFile(logFilePath), schema)) { + try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(readerFilesystem, new HoodieLogFile(logFilePath), null)) { while (reader.hasNext()) { HoodieLogBlock block = reader.next(); Map logBlockHeader = block.getLogBlockHeader(); diff --git a/service-executor/service-executor-task/src/test/java/com/lanyuanxiaoyao/service/executor/task/ParseMessageType.java b/service-executor/service-executor-task/src/test/java/com/lanyuanxiaoyao/service/executor/task/ParseMessageType.java new file mode 100644 index 0000000..7bbf39b --- /dev/null +++ b/service-executor/service-executor-task/src/test/java/com/lanyuanxiaoyao/service/executor/task/ParseMessageType.java @@ -0,0 +1,29 @@ +package com.lanyuanxiaoyao.service.executor.task; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.table.log.HoodieLogFormat; +import org.apache.hudi.common.table.log.block.HoodieLogBlock; + +/** + * 解析message type + * + * @author lanyuanxiaoyao + * @date 2024-01-17 + */ +public class ParseMessageType { + public static void main(String[] args) throws IOException { + FileSystem fileSystem = FileSystem.get(new Configuration()); + // MessageType messageType = TableSchemaResolver.readSchemaFromLogFile(fileSystem, new Path("/Users/lanyuanxiaoyao/Downloads/00000665-925e-49c9-ba3b-067064f50e76_20240115220324540.log.1_62-200-0")); + // System.out.println(messageType); + try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(fileSystem, new HoodieLogFile(new Path("/Users/lanyuanxiaoyao/Downloads/00000665-925e-49c9-ba3b-067064f50e76_20240115220324540.log.1_62-200-0")), null)) { + while (reader.hasNext()) { + HoodieLogBlock block = reader.next(); + System.out.println(block.getBlockType()); + } + } + } +}