diff --git a/service-command-pro/src/main/java/com/lanyuanxiaoyao/service/command/pro/commands/HudiCommand.java b/service-command-pro/src/main/java/com/lanyuanxiaoyao/service/command/pro/commands/HudiCommand.java index 93f02e9..ed8a2c0 100644 --- a/service-command-pro/src/main/java/com/lanyuanxiaoyao/service/command/pro/commands/HudiCommand.java +++ b/service-command-pro/src/main/java/com/lanyuanxiaoyao/service/command/pro/commands/HudiCommand.java @@ -1,9 +1,48 @@ package com.lanyuanxiaoyao.service.command.pro.commands; +import cn.hutool.core.util.StrUtil; +import java.io.IOException; +import java.time.Instant; +import java.util.List; +import java.util.concurrent.atomic.LongAdder; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hudi.common.fs.ConsistencyGuardConfig; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.BaseFile; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieFileGroup; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.BaseFileUtils; +import org.apache.hudi.common.util.ClosableIterator; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ParquetUtils; +import org.apache.hudi.io.storage.HoodieParquetReader; +import org.apache.hudi.org.apache.avro.Schema; +import org.apache.hudi.org.apache.avro.generic.GenericRecord; +import org.apache.hudi.org.apache.avro.generic.IndexedRecord; 
+import org.apache.parquet.avro.AvroParquetReader; +import org.apache.parquet.avro.AvroReadSupport; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.eclipse.collections.api.factory.Lists; +import org.eclipse.collections.api.list.MutableList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.shell.standard.ShellComponent; import org.springframework.shell.standard.ShellMethod; +import org.springframework.shell.standard.ShellOption; /** * Hudi相关操作 @@ -15,7 +54,111 @@ import org.springframework.shell.standard.ShellMethod; public class HudiCommand { private static final Logger logger = LoggerFactory.getLogger(HudiCommand.class); - @ShellMethod("Test") - public void test() { + public static void time(String name, Runnable runnable) { + logger.info(name); + long startTime = Instant.now().toEpochMilli(); + LongAdder counter = new LongAdder(); + runnable.run(counter); + logger.info("Count: {}", counter.sum()); + logger.info("Cost: {}s", (Instant.now().toEpochMilli() - startTime) / 1000.0); + } + + public static void reader1(LongAdder counter, Configuration configuration, Path root) { + try (HoodieParquetReader reader = new HoodieParquetReader<>(configuration, root)) { + try (ClosableIterator recordIterator = reader.getRecordIterator()) { + while (recordIterator.hasNext()) { + recordIterator.next(); + counter.increment(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + public static void reader2(LongAdder counter, Configuration configuration, Path root) { + GroupReadSupport readSupport = new GroupReadSupport(); + try (ParquetReader reader = AvroParquetReader.builder(readSupport, root).withConf(configuration).build()) { + while (reader.read() != null) { + counter.increment(); + } + } catch (IOException e) { + throw new 
RuntimeException(e); + } + } + + public static void reader3(LongAdder counter, Configuration configuration, Path root) { + try (ParquetReader reader = AvroParquetReader.genericRecordReader(HadoopInputFile.fromPath(root, configuration))) { + while (reader.read() != null) { + counter.increment(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @ShellMethod("Test filesystem view") + public void testFilesystemView() throws IOException { + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() + .setConf(new Configuration()) + .setBasePath("hdfs://b2/apps/datalake/hive/dws_acct_fs/external_table_hudi/dws_bill_b") + .setLoadActiveTimelineOnLoad(false) + .setConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().build()) + .setLayoutVersion(Option.of(TimelineLayoutVersion.CURR_LAYOUT_VERSION)) + .build(); + FileSystem fs = metaClient.getFs(); + String globPath = StrUtil.format("{}/{}/*", metaClient.getBasePathV2().toString(), "*/*/*"); + List statuses = FSUtils.getGlobStatusExcludingMetaFolder(fs, new Path(globPath)); + HoodieTimeline timeline = metaClient.getActiveTimeline().getWriteTimeline(); + HoodieTableFileSystemView view = new HoodieTableFileSystemView(metaClient, timeline, statuses.toArray(new FileStatus[0])); + System.out.println( + view.fetchAllStoredFileGroups() + .map(HoodieFileGroup::getLatestDataFile) + .filter(Option::isPresent) + .map(Option::get) + .map(HoodieBaseFile::getBootstrapBaseFile) + .filter(Option::isPresent).map(Option::get) + .map(BaseFile::getPath) + .collect(Collectors.toList()) + ); + } + + @ShellMethod("Test glob") + public void testGlob(@ShellOption(help = "parquet file path") String path) throws IOException { + FileSystem fileSystem = FileSystem.get(new Configuration()); + MutableList results = Lists.mutable.empty(); + RemoteIterator iterator = fileSystem.listFiles(new Path(path), true); + while (iterator.hasNext()) { + LocatedFileStatus status = iterator.next(); + if (fileSystem.exists(status.getPath())) { + 
results.add(status); + } + } + + List statuses = FSUtils.getGlobStatusExcludingMetaFolder(FileSystem.get(new Configuration()), new Path(path, "*/*/*/*")); + logger.info("Sum1: {} Sum2: {}", results.size(), statuses.size()); + } + + @ShellMethod("Test read parquet file by column") + public void testColumnReadParquet(@ShellOption(help = "parquet file path") String path) throws IOException { + Configuration configuration = new Configuration(); + Path root = new Path(path); + + time("reader 1", counter -> reader1(counter, configuration, root)); + time("reader 2", counter -> reader2(counter, configuration, root)); + + ParquetUtils baseFileUtils = (ParquetUtils) BaseFileUtils.getInstance(HoodieFileFormat.PARQUET); + Schema schema = baseFileUtils.readAvroSchema(configuration, root); + MutableList fields = Lists.mutable.ofAll(schema.getFields()) + .select(field -> field.name().equals("CODE")) + .collect(field -> new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal())) + .toList(); + Schema readSchema = Schema.createRecord(schema.getName(), null, null, false, fields); + AvroReadSupport.setRequestedProjection(configuration, readSchema); + time("reader 3", counter -> reader3(counter, configuration, root)); + } + + public interface Runnable { + void run(LongAdder counter); } } diff --git a/service-command-pro/src/test/java/com/lanyuanxiaoyao/service/command/pro/TestParquetReader.java b/service-command-pro/src/test/java/com/lanyuanxiaoyao/service/command/pro/TestParquetReader.java new file mode 100644 index 0000000..c930f60 --- /dev/null +++ b/service-command-pro/src/test/java/com/lanyuanxiaoyao/service/command/pro/TestParquetReader.java @@ -0,0 +1,56 @@ +package com.lanyuanxiaoyao.service.command.pro; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.util.BaseFileUtils; +import org.apache.hudi.common.util.ParquetUtils; +import 
org.apache.hudi.org.apache.avro.Schema; +import org.apache.parquet.avro.AvroReadSupport; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.eclipse.collections.api.factory.Lists; +import org.eclipse.collections.api.list.MutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static com.lanyuanxiaoyao.service.command.pro.commands.HudiCommand.reader1; +import static com.lanyuanxiaoyao.service.command.pro.commands.HudiCommand.reader2; +import static com.lanyuanxiaoyao.service.command.pro.commands.HudiCommand.reader3; +import static com.lanyuanxiaoyao.service.command.pro.commands.HudiCommand.time; + +public class TestParquetReader { + private static final Logger logger = LoggerFactory.getLogger(TestParquetReader.class); + + public static void main(String[] args) { + Configuration configuration = new Configuration(); + Path root = new Path("/Users/lanyuanxiaoyao/Downloads/00000007-ecf6-445e-a6ed-43805f3ef27a_5-10-0_20240511083819386.parquet"); + + time("reader 1", counter -> reader1(counter, configuration, root)); + time("reader 2", counter -> reader2(counter, configuration, root)); + ParquetUtils baseFileUtils = (ParquetUtils) BaseFileUtils.getInstance(HoodieFileFormat.PARQUET); + Schema schema = baseFileUtils.readAvroSchema(configuration, root); + MutableList fields = Lists.mutable.ofAll(schema.getFields()) + .select(field -> field.name().equals("CODE")) + .collect(field -> new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal())) + .toList(); + Schema readSchema = Schema.createRecord(schema.getName(), null, null, false, fields); + AvroReadSupport.setRequestedProjection(configuration, readSchema); + time("reader 3", counter -> reader3(counter, configuration, root)); + /* ParquetUtils baseFileUtils = (ParquetUtils) BaseFileUtils.getInstance(HoodieFileFormat.PARQUET); + Schema schema = baseFileUtils.readAvroSchema(configuration, root); + logger.info("Schema: {}", schema.toString(true)); + MutableList fields 
= Lists.mutable.ofAll(schema.getFields()) + .select(field -> field.name().equals("CODE")) + .collect(field -> new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal())) + .toList(); + Schema readSchema = Schema.createRecord(schema.getName(), null, null, false, fields); + logger.info("Schema: {}", readSchema.toString(true)); + AvroReadSupport.setRequestedProjection(configuration, readSchema); + try (ParquetReader reader = AvroParquetReader.genericRecordReader(HadoopInputFile.fromPath(root, configuration))) { + GenericRecord record = reader.read(); + logger.info("{}", record); + } catch (IOException e) { + throw new RuntimeException(e); + } */ + } +} diff --git a/service-command-pro/src/test/resources/logback.xml b/service-command-pro/src/test/resources/logback.xml new file mode 100644 index 0000000..cf7d603 --- /dev/null +++ b/service-command-pro/src/test/resources/logback.xml @@ -0,0 +1,17 @@ + + + + + + + + %d{yyyy-MM-dd HH:mm:ss.SSS} %clr(%5p) %clr([%t]){magenta} %clr(%logger{40}){cyan} #@# %m%n%wEx + + + + + + + + + \ No newline at end of file diff --git a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/DataScanner.java b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/DataScanner.java index 6a498d5..e7ff338 100644 --- a/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/DataScanner.java +++ b/service-executor/service-executor-task/src/main/java/com/lanyuanxiaoyao/service/executor/task/DataScanner.java @@ -78,8 +78,7 @@ public class DataScanner { DataStream stream = filterKeys.apply( environment .fromSource(new ReadPulsarSource(taskContext, pulsarUrl, pulsarTopic), WatermarkStrategy.noWatermarks(), "Read pulsar") - .setParallelism(totalParallelism) - .disableChaining(), + .setParallelism(totalParallelism), totalParallelism ); if (ObjectUtil.isNull(source)) { @@ -110,7 +109,8 @@ public class DataScanner { 
.flatMap(new ReadHudiFile(filterFields)) .name("Read log file") .setParallelism(parallelism), - parallelismPredict.apply(totalParallelism) + // parallelismPredict.apply(totalParallelism) + totalParallelism ); if (ObjectUtil.isNull(source)) { source = stream; @@ -130,7 +130,8 @@ public class DataScanner { .flatMap(new ReadHudiFile(filterFields)) .name("Read base file") .setParallelism(parallelism), - parallelismPredict.apply(totalParallelism) + // parallelismPredict.apply(totalParallelism) + totalParallelism ); if (ObjectUtil.isNull(source)) { source = stream;