1
0

[MINOR] Inline the partition path logic into the builder (#5310)

This commit is contained in:
Danny Chan
2022-04-13 19:24:39 +08:00
committed by GitHub
parent 43de2b4702
commit 0281725c6b
8 changed files with 38 additions and 88 deletions

View File

@@ -18,6 +18,12 @@
package org.apache.hudi.hadoop.realtime;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.fs.FSUtils;
@@ -29,27 +35,15 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
implements RecordReader<NullWritable, ArrayWritable> {
@@ -83,11 +77,10 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
// NOTE: HoodieMergedLogRecordScanner will not return records for an in-flight commit,
// but it can return records for completed commits > the commit we are trying to read (if using
// the readCommit() API)
List<String> logPaths = split.getDeltaLogPaths();
HoodieMergedLogRecordScanner.Builder logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
return HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(FSUtils.getFs(split.getPath().toString(), jobConf))
.withBasePath(split.getBasePath())
.withLogFilePaths(logPaths)
.withLogFilePaths(split.getDeltaLogPaths())
.withReaderSchema(usesCustomPayload ? getWriterSchema() : getReaderSchema())
.withLatestInstantTime(split.getMaxCommitTime())
.withMaxMemorySizeInBytes(HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf))
@@ -97,12 +90,8 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
.withSpillableMapBasePath(jobConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
.withDiskMapType(jobConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue()))
.withBitCaskDiskMapCompressionEnabled(jobConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()));
if (!isNullOrEmpty(logPaths)) {
logRecordScannerBuilder
.withPartition(getRelativePartitionPath(new Path(split.getBasePath()), new Path(logPaths.get(0)).getParent()));
}
return logRecordScannerBuilder.build();
HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
.build();
}
private Option<GenericRecord> buildGenericRecordwithCustomPayload(HoodieRecord record) throws IOException {