fix(executor-task): 修复schema读取不到导致失败

This commit is contained in:
2024-01-17 16:26:23 +08:00
parent 43a1bcdb4b
commit 26f9d4ac60
2 changed files with 30 additions and 3 deletions

View File

@@ -91,9 +91,7 @@ public class ReadHudiFile implements FlatMapFunction<String, RecordView> {
}
private void readLogFile(FileSystem readerFilesystem, Path logFilePath, Collector<RecordView> out) throws IOException {
MessageType messageType = TableSchemaResolver.readSchemaFromLogFile(readerFilesystem, logFilePath);
Schema schema = new AvroSchemaConverter().convert(Objects.requireNonNull(messageType));
try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(readerFilesystem, new HoodieLogFile(logFilePath), schema)) {
try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(readerFilesystem, new HoodieLogFile(logFilePath), null)) {
while (reader.hasNext()) {
HoodieLogBlock block = reader.next();
Map<HoodieLogBlock.HeaderMetadataType, String> logBlockHeader = block.getLogBlockHeader();

View File

@@ -0,0 +1,29 @@
package com.lanyuanxiaoyao.service.executor.task;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
/**
* 解析message type
*
* @author lanyuanxiaoyao
* @date 2024-01-17
*/
public class ParseMessageType {
public static void main(String[] args) throws IOException {
FileSystem fileSystem = FileSystem.get(new Configuration());
// MessageType messageType = TableSchemaResolver.readSchemaFromLogFile(fileSystem, new Path("/Users/lanyuanxiaoyao/Downloads/00000665-925e-49c9-ba3b-067064f50e76_20240115220324540.log.1_62-200-0"));
// System.out.println(messageType);
try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(fileSystem, new HoodieLogFile(new Path("/Users/lanyuanxiaoyao/Downloads/00000665-925e-49c9-ba3b-067064f50e76_20240115220324540.log.1_62-200-0")), null)) {
while (reader.hasNext()) {
HoodieLogBlock block = reader.next();
System.out.println(block.getBlockType());
}
}
}
}