diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/ReadHudiFile.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/ReadHudiFile.java index 3dd6fd4..7f2867c 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/ReadHudiFile.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/functions/ReadHudiFile.java @@ -7,7 +7,6 @@ import com.lanyuanxiaoyao.service.executor.task.entity.RecordView; import java.io.IOException; import java.util.Arrays; import java.util.Map; -import java.util.Objects; import java.util.stream.Collectors; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.util.Collector; @@ -17,16 +16,17 @@ import org.apache.hadoop.fs.Path; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.log.HoodieLogFormat; -import org.apache.hudi.common.table.log.block.*; +import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; +import org.apache.hudi.common.table.log.block.HoodieCommandBlock; +import org.apache.hudi.common.table.log.block.HoodieDeleteBlock; +import org.apache.hudi.common.table.log.block.HoodieLogBlock; +import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock; import org.apache.hudi.common.util.ClosableIterator; import org.apache.hudi.io.storage.HoodieParquetReader; import org.apache.hudi.org.apache.avro.Schema; import org.apache.hudi.org.apache.avro.generic.IndexedRecord; import org.apache.hudi.org.apache.avro.util.Utf8; -import org.apache.parquet.avro.AvroSchemaConverter; -import org.apache.parquet.schema.MessageType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,9 
+43,15 @@ public class ReadHudiFile implements FlatMapFunction { Schema schema = record.getSchema(); StringBuilder builder = new StringBuilder(); for (Schema.Field field : schema.getFields()) { + Object data = record.get(field.pos()); builder.append(field.name()) + .append("(") + .append(field.schema()) + .append("/") + .append(ObjectUtil.isNull(data) ? "null" : data.getClass().getName()) + .append(")") .append("=") - .append(record.get(field.pos())) + .append(data) .append(" "); } String timestamp = null; @@ -83,14 +89,14 @@ public class ReadHudiFile implements FlatMapFunction { } private void readDataFile(FileSystem readerFilesystem, Path dataFilePath, Collector out) throws IOException { - HoodieParquetReader reader = new HoodieParquetReader<>(readerFilesystem.getConf(), dataFilePath); - ClosableIterator recordIterator = reader.getRecordIterator(); - while (recordIterator.hasNext()) { - RecordView recordView = parseData(dataFilePath.toString(), RecordView.Operation.RESULT, recordIterator.next()); - out.collect(recordView); + try(HoodieParquetReader reader = new HoodieParquetReader<>(readerFilesystem.getConf(), dataFilePath)) { + try(ClosableIterator recordIterator = reader.getRecordIterator()) { + while (recordIterator.hasNext()) { + RecordView recordView = parseData(dataFilePath.toString(), RecordView.Operation.RESULT, recordIterator.next()); + out.collect(recordView); + } + } } - recordIterator.close(); - reader.close(); } private void readLogFile(FileSystem readerFilesystem, Path logFilePath, Collector out) throws IOException {