perf(executor-task): 减少数据流转
通过设置相同的并行度,让数据读取和数据过滤合并到一个算子里,避免大表base文件扫描失败
This commit is contained in:
@@ -1,9 +1,48 @@
|
||||
package com.lanyuanxiaoyao.service.command.pro.commands;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import java.io.IOException;
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.LongAdder;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.model.BaseFile;
|
||||
import org.apache.hudi.common.model.HoodieBaseFile;
|
||||
import org.apache.hudi.common.model.HoodieFileFormat;
|
||||
import org.apache.hudi.common.model.HoodieFileGroup;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
|
||||
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
|
||||
import org.apache.hudi.common.util.BaseFileUtils;
|
||||
import org.apache.hudi.common.util.ClosableIterator;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.ParquetUtils;
|
||||
import org.apache.hudi.io.storage.HoodieParquetReader;
|
||||
import org.apache.hudi.org.apache.avro.Schema;
|
||||
import org.apache.hudi.org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.hudi.org.apache.avro.generic.IndexedRecord;
|
||||
import org.apache.parquet.avro.AvroParquetReader;
|
||||
import org.apache.parquet.avro.AvroReadSupport;
|
||||
import org.apache.parquet.example.data.Group;
|
||||
import org.apache.parquet.hadoop.ParquetReader;
|
||||
import org.apache.parquet.hadoop.example.GroupReadSupport;
|
||||
import org.apache.parquet.hadoop.util.HadoopInputFile;
|
||||
import org.eclipse.collections.api.factory.Lists;
|
||||
import org.eclipse.collections.api.list.MutableList;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.shell.standard.ShellComponent;
|
||||
import org.springframework.shell.standard.ShellMethod;
|
||||
import org.springframework.shell.standard.ShellOption;
|
||||
|
||||
/**
|
||||
* Hudi相关操作
|
||||
@@ -15,7 +54,111 @@ import org.springframework.shell.standard.ShellMethod;
|
||||
public class HudiCommand {
|
||||
private static final Logger logger = LoggerFactory.getLogger(HudiCommand.class);
|
||||
|
||||
@ShellMethod("Test")
|
||||
public void test() {
|
||||
public static void time(String name, Runnable runnable) {
|
||||
logger.info(name);
|
||||
long startTime = Instant.now().toEpochMilli();
|
||||
LongAdder counter = new LongAdder();
|
||||
runnable.run(counter);
|
||||
logger.info("Count: {}", counter.sum());
|
||||
logger.info("Cost: {}s", (Instant.now().toEpochMilli() - startTime) / 1000.0);
|
||||
}
|
||||
|
||||
public static void reader1(LongAdder counter, Configuration configuration, Path root) {
|
||||
try (HoodieParquetReader<IndexedRecord> reader = new HoodieParquetReader<>(configuration, root)) {
|
||||
try (ClosableIterator<IndexedRecord> recordIterator = reader.getRecordIterator()) {
|
||||
while (recordIterator.hasNext()) {
|
||||
recordIterator.next();
|
||||
counter.increment();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static void reader2(LongAdder counter, Configuration configuration, Path root) {
|
||||
GroupReadSupport readSupport = new GroupReadSupport();
|
||||
try (ParquetReader<Group> reader = AvroParquetReader.builder(readSupport, root).withConf(configuration).build()) {
|
||||
while (reader.read() != null) {
|
||||
counter.increment();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static void reader3(LongAdder counter, Configuration configuration, Path root) {
|
||||
try (ParquetReader<GenericRecord> reader = AvroParquetReader.genericRecordReader(HadoopInputFile.fromPath(root, configuration))) {
|
||||
while (reader.read() != null) {
|
||||
counter.increment();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@ShellMethod("Test filesystem view")
|
||||
public void testFilesystemView() throws IOException {
|
||||
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
|
||||
.setConf(new Configuration())
|
||||
.setBasePath("hdfs://b2/apps/datalake/hive/dws_acct_fs/external_table_hudi/dws_bill_b")
|
||||
.setLoadActiveTimelineOnLoad(false)
|
||||
.setConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().build())
|
||||
.setLayoutVersion(Option.of(TimelineLayoutVersion.CURR_LAYOUT_VERSION))
|
||||
.build();
|
||||
FileSystem fs = metaClient.getFs();
|
||||
String globPath = StrUtil.format("{}/{}/*", metaClient.getBasePathV2().toString(), "*/*/*");
|
||||
List<FileStatus> statuses = FSUtils.getGlobStatusExcludingMetaFolder(fs, new Path(globPath));
|
||||
HoodieTimeline timeline = metaClient.getActiveTimeline().getWriteTimeline();
|
||||
HoodieTableFileSystemView view = new HoodieTableFileSystemView(metaClient, timeline, statuses.toArray(new FileStatus[0]));
|
||||
System.out.println(
|
||||
view.fetchAllStoredFileGroups()
|
||||
.map(HoodieFileGroup::getLatestDataFile)
|
||||
.filter(Option::isPresent)
|
||||
.map(Option::get)
|
||||
.map(HoodieBaseFile::getBootstrapBaseFile)
|
||||
.map(Option::get)
|
||||
.map(BaseFile::getPath)
|
||||
.collect(Collectors.toList())
|
||||
);
|
||||
}
|
||||
|
||||
@ShellMethod("Test glob")
|
||||
public void testGlob(@ShellOption(help = "parquet file path") String path) throws IOException {
|
||||
FileSystem fileSystem = FileSystem.get(new Configuration());
|
||||
MutableList<FileStatus> results = Lists.mutable.empty();
|
||||
RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(new Path(path), true);
|
||||
while (iterator.hasNext()) {
|
||||
LocatedFileStatus status = iterator.next();
|
||||
if (fileSystem.exists(status.getPath())) {
|
||||
results.add(iterator.next());
|
||||
}
|
||||
}
|
||||
|
||||
List<FileStatus> statuses = FSUtils.getGlobStatusExcludingMetaFolder(FileSystem.get(new Configuration()), new Path(path, "*/*/*/*"));
|
||||
logger.info("Sum1: {} Sum2: {}", results.size(), statuses.size());
|
||||
}
|
||||
|
||||
@ShellMethod("Test read parquet file by column")
|
||||
public void testColumnReadParquet(@ShellOption(help = "parquet file path") String path) throws IOException {
|
||||
Configuration configuration = new Configuration();
|
||||
Path root = new Path(path);
|
||||
|
||||
time("reader 1", counter -> reader1(counter, configuration, root));
|
||||
time("reader 2", counter -> reader2(counter, configuration, root));
|
||||
|
||||
ParquetUtils baseFileUtils = (ParquetUtils) BaseFileUtils.getInstance(HoodieFileFormat.PARQUET);
|
||||
Schema schema = baseFileUtils.readAvroSchema(configuration, root);
|
||||
MutableList<Schema.Field> fields = Lists.mutable.ofAll(schema.getFields())
|
||||
.select(field -> field.name().equals("CODE"))
|
||||
.collect(field -> new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()))
|
||||
.toList();
|
||||
Schema readSchema = Schema.createRecord(schema.getName(), null, null, false, fields);
|
||||
AvroReadSupport.setRequestedProjection(configuration, readSchema);
|
||||
time("reader 3", counter -> reader3(counter, configuration, root));
|
||||
}
|
||||
|
||||
public interface Runnable {
|
||||
void run(LongAdder counter);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,56 @@
|
||||
package com.lanyuanxiaoyao.service.command.pro;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hudi.common.model.HoodieFileFormat;
|
||||
import org.apache.hudi.common.util.BaseFileUtils;
|
||||
import org.apache.hudi.common.util.ParquetUtils;
|
||||
import org.apache.hudi.org.apache.avro.Schema;
|
||||
import org.apache.parquet.avro.AvroReadSupport;
|
||||
import org.apache.parquet.hadoop.ParquetFileReader;
|
||||
import org.eclipse.collections.api.factory.Lists;
|
||||
import org.eclipse.collections.api.list.MutableList;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static com.lanyuanxiaoyao.service.command.pro.commands.HudiCommand.reader1;
|
||||
import static com.lanyuanxiaoyao.service.command.pro.commands.HudiCommand.reader2;
|
||||
import static com.lanyuanxiaoyao.service.command.pro.commands.HudiCommand.reader3;
|
||||
import static com.lanyuanxiaoyao.service.command.pro.commands.HudiCommand.time;
|
||||
|
||||
public class TestParquetReader {
|
||||
private static final Logger logger = LoggerFactory.getLogger(TestParquetReader.class);
|
||||
|
||||
public static void main(String[] args) {
|
||||
Configuration configuration = new Configuration();
|
||||
Path root = new Path("/Users/lanyuanxiaoyao/Downloads/00000007-ecf6-445e-a6ed-43805f3ef27a_5-10-0_20240511083819386.parquet");
|
||||
|
||||
time("reader 1", counter -> reader1(counter, configuration, root));
|
||||
time("reader 2", counter -> reader2(counter, configuration, root));
|
||||
ParquetUtils baseFileUtils = (ParquetUtils) BaseFileUtils.getInstance(HoodieFileFormat.PARQUET);
|
||||
Schema schema = baseFileUtils.readAvroSchema(configuration, root);
|
||||
MutableList<Schema.Field> fields = Lists.mutable.ofAll(schema.getFields())
|
||||
.select(field -> field.name().equals("CODE"))
|
||||
.collect(field -> new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()))
|
||||
.toList();
|
||||
Schema readSchema = Schema.createRecord(schema.getName(), null, null, false, fields);
|
||||
AvroReadSupport.setRequestedProjection(configuration, readSchema);
|
||||
time("reader 3", counter -> reader3(counter, configuration, root));
|
||||
/* ParquetUtils baseFileUtils = (ParquetUtils) BaseFileUtils.getInstance(HoodieFileFormat.PARQUET);
|
||||
Schema schema = baseFileUtils.readAvroSchema(configuration, root);
|
||||
logger.info("Schema: {}", schema.toString(true));
|
||||
MutableList<Schema.Field> fields = Lists.mutable.ofAll(schema.getFields())
|
||||
.select(field -> field.name().equals("CODE"))
|
||||
.collect(field -> new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()))
|
||||
.toList();
|
||||
Schema readSchema = Schema.createRecord(schema.getName(), null, null, false, fields);
|
||||
logger.info("Schema: {}", readSchema.toString(true));
|
||||
AvroReadSupport.setRequestedProjection(configuration, readSchema);
|
||||
try (ParquetReader<GenericRecord> reader = AvroParquetReader.genericRecordReader(HadoopInputFile.fromPath(root, configuration))) {
|
||||
GenericRecord record = reader.read();
|
||||
logger.info("{}", record);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
} */
|
||||
}
|
||||
}
|
||||
17
service-command-pro/src/test/resources/logback.xml
Normal file
17
service-command-pro/src/test/resources/logback.xml
Normal file
@@ -0,0 +1,17 @@
|
||||
<configuration>
|
||||
<conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter" />
|
||||
<conversionRule conversionWord="wex" converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter" />
|
||||
<conversionRule conversionWord="wEx" converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter" />
|
||||
|
||||
<appender name="Console" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %clr(%5p) %clr([%t]){magenta} %clr(%logger{40}){cyan} #@# %m%n%wEx</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<root level="ERROR">
|
||||
<appender-ref ref="Console"/>
|
||||
</root>
|
||||
|
||||
<logger name="com.lanyuanxiaoyao.service" level="INFO"/>
|
||||
</configuration>
|
||||
@@ -78,8 +78,7 @@ public class DataScanner {
|
||||
DataStream<String> stream = filterKeys.apply(
|
||||
environment
|
||||
.fromSource(new ReadPulsarSource(taskContext, pulsarUrl, pulsarTopic), WatermarkStrategy.noWatermarks(), "Read pulsar")
|
||||
.setParallelism(totalParallelism)
|
||||
.disableChaining(),
|
||||
.setParallelism(totalParallelism),
|
||||
totalParallelism
|
||||
);
|
||||
if (ObjectUtil.isNull(source)) {
|
||||
@@ -110,7 +109,8 @@ public class DataScanner {
|
||||
.flatMap(new ReadHudiFile(filterFields))
|
||||
.name("Read log file")
|
||||
.setParallelism(parallelism),
|
||||
parallelismPredict.apply(totalParallelism)
|
||||
// parallelismPredict.apply(totalParallelism)
|
||||
totalParallelism
|
||||
);
|
||||
if (ObjectUtil.isNull(source)) {
|
||||
source = stream;
|
||||
@@ -130,7 +130,8 @@ public class DataScanner {
|
||||
.flatMap(new ReadHudiFile(filterFields))
|
||||
.name("Read base file")
|
||||
.setParallelism(parallelism),
|
||||
parallelismPredict.apply(totalParallelism)
|
||||
// parallelismPredict.apply(totalParallelism)
|
||||
totalParallelism
|
||||
);
|
||||
if (ObjectUtil.isNull(source)) {
|
||||
source = stream;
|
||||
|
||||
Reference in New Issue
Block a user