1
0

Reducing list status calls from listing logfile versions, some associated refactoring

This commit is contained in:
Nishith Agarwal
2018-01-23 15:10:43 -08:00
committed by vinoth chandar
parent 937ae322ba
commit e10100fe32
6 changed files with 20 additions and 11 deletions

View File

@@ -19,11 +19,13 @@ package com.uber.hoodie.io;
import com.beust.jcommander.internal.Maps; import com.beust.jcommander.internal.Maps;
import com.clearspring.analytics.util.Lists; import com.clearspring.analytics.util.Lists;
import com.uber.hoodie.WriteStatus; import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.FileSlice;
import com.uber.hoodie.common.model.HoodieDeltaWriteStat; import com.uber.hoodie.common.model.HoodieDeltaWriteStat;
import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordLocation; import com.uber.hoodie.common.model.HoodieRecordLocation;
import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.table.TableFileSystemView;
import com.uber.hoodie.common.table.log.HoodieLogFormat; import com.uber.hoodie.common.table.log.HoodieLogFormat;
import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer; import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer;
import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock; import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
@@ -58,6 +60,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
private static Logger logger = LogManager.getLogger(HoodieMergeHandle.class); private static Logger logger = LogManager.getLogger(HoodieMergeHandle.class);
private static AtomicLong recordIndex = new AtomicLong(1); private static AtomicLong recordIndex = new AtomicLong(1);
private TableFileSystemView.RealtimeView fileSystemView;
private final WriteStatus writeStatus; private final WriteStatus writeStatus;
private final String fileId; private final String fileId;
private String partitionPath; private String partitionPath;
@@ -77,6 +80,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
writeStatus.setStat(new HoodieDeltaWriteStat()); writeStatus.setStat(new HoodieDeltaWriteStat());
this.writeStatus = writeStatus; this.writeStatus = writeStatus;
this.fileId = fileId; this.fileId = fileId;
this.fileSystemView = hoodieTable.getRTFileSystemView();
init(recordItr); init(recordItr);
} }
@@ -87,11 +91,11 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
// extract some information from the first record // extract some information from the first record
if (partitionPath == null) { if (partitionPath == null) {
partitionPath = record.getPartitionPath(); partitionPath = record.getPartitionPath();
FileSlice fileSlice = fileSystemView.getLatestFileSlices(record.getPartitionPath())
.filter(fileSlice1 -> fileSlice1.getDataFile().get().getFileId().equals(fileId))
.findFirst().get();
// HACK(vc) This also assumes a base file. It will break, if appending without one. // HACK(vc) This also assumes a base file. It will break, if appending without one.
String latestValidFilePath = String latestValidFilePath = fileSlice.getDataFile().get().getFileName();
fileSystemView.getLatestDataFiles(record.getPartitionPath())
.filter(dataFile -> dataFile.getFileId().equals(fileId))
.findFirst().get().getFileName();
String baseCommitTime = FSUtils.getCommitTime(latestValidFilePath); String baseCommitTime = FSUtils.getCommitTime(latestValidFilePath);
writeStatus.getStat().setPrevCommit(baseCommitTime); writeStatus.getStat().setPrevCommit(baseCommitTime);
writeStatus.setFileId(fileId); writeStatus.setFileId(fileId);
@@ -101,7 +105,9 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
try { try {
this.writer = HoodieLogFormat.newWriterBuilder() this.writer = HoodieLogFormat.newWriterBuilder()
.onParentPath(new Path(hoodieTable.getMetaClient().getBasePath(), partitionPath)) .onParentPath(new Path(hoodieTable.getMetaClient().getBasePath(), partitionPath))
.withFileId(fileId).overBaseCommit(baseCommitTime) .withFileId(fileId).overBaseCommit(baseCommitTime).withLogVersion(fileSlice.getLogFiles()
.max(HoodieLogFile.getLogVersionComparator()::compare)
.map(logFile -> logFile.getLogVersion()).orElse(HoodieLogFile.LOGFILE_BASE_VERSION))
.withFs(fs).withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); .withFs(fs).withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
this.currentLogFile = writer.getLogFile(); this.currentLogFile = writer.getLogFile();
((HoodieDeltaWriteStat) writeStatus.getStat()) ((HoodieDeltaWriteStat) writeStatus.getStat())

View File

@@ -40,7 +40,6 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
protected final FileSystem fs; protected final FileSystem fs;
protected final HoodieTable<T> hoodieTable; protected final HoodieTable<T> hoodieTable;
protected HoodieTimeline hoodieTimeline; protected HoodieTimeline hoodieTimeline;
protected TableFileSystemView.ReadOptimizedView fileSystemView;
protected final Schema schema; protected final Schema schema;
public HoodieIOHandle(HoodieWriteConfig config, String commitTime, public HoodieIOHandle(HoodieWriteConfig config, String commitTime,
@@ -50,7 +49,6 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
this.fs = hoodieTable.getMetaClient().getFs(); this.fs = hoodieTable.getMetaClient().getFs();
this.hoodieTable = hoodieTable; this.hoodieTable = hoodieTable;
this.hoodieTimeline = hoodieTable.getCompletedCommitTimeline(); this.hoodieTimeline = hoodieTable.getCompletedCommitTimeline();
this.fileSystemView = hoodieTable.getROFileSystemView();
this.schema = this.schema =
HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema())); HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
} }

View File

@@ -22,6 +22,7 @@ import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordLocation; import com.uber.hoodie.common.model.HoodieRecordLocation;
import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.model.HoodieWriteStat; import com.uber.hoodie.common.model.HoodieWriteStat;
import com.uber.hoodie.common.table.TableFileSystemView;
import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.ReflectionUtils; import com.uber.hoodie.common.util.ReflectionUtils;
import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.config.HoodieWriteConfig;
@@ -48,6 +49,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
private WriteStatus writeStatus; private WriteStatus writeStatus;
private HashMap<String, HoodieRecord<T>> keyToNewRecords; private HashMap<String, HoodieRecord<T>> keyToNewRecords;
private HoodieStorageWriter<IndexedRecord> storageWriter; private HoodieStorageWriter<IndexedRecord> storageWriter;
private TableFileSystemView.ReadOptimizedView fileSystemView;
private Path newFilePath; private Path newFilePath;
private Path oldFilePath; private Path oldFilePath;
private long recordsWritten = 0; private long recordsWritten = 0;
@@ -60,6 +62,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
Iterator<HoodieRecord<T>> recordItr, Iterator<HoodieRecord<T>> recordItr,
String fileId) { String fileId) {
super(config, commitTime, hoodieTable); super(config, commitTime, hoodieTable);
this.fileSystemView = hoodieTable.getROFileSystemView();
init(fileId, recordItr); init(fileId, recordItr);
} }

View File

@@ -27,6 +27,7 @@ import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.model.HoodieWriteStat; import com.uber.hoodie.common.model.HoodieWriteStat;
import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.TableFileSystemView;
import com.uber.hoodie.common.table.log.HoodieCompactedLogRecordScanner; import com.uber.hoodie.common.table.log.HoodieCompactedLogRecordScanner;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.table.timeline.HoodieInstant;
@@ -78,11 +79,11 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
config.shouldAssumeDatePartitioning()); config.shouldAssumeDatePartitioning());
TableFileSystemView.RealtimeView fileSystemView = hoodieTable.getRTFileSystemView();
log.info("Compaction looking for files to compact in " + partitionPaths + " partitions"); log.info("Compaction looking for files to compact in " + partitionPaths + " partitions");
List<CompactionOperation> operations = List<CompactionOperation> operations =
jsc.parallelize(partitionPaths, partitionPaths.size()) jsc.parallelize(partitionPaths, partitionPaths.size())
.flatMap((FlatMapFunction<String, CompactionOperation>) partitionPath -> hoodieTable .flatMap((FlatMapFunction<String, CompactionOperation>) partitionPath -> fileSystemView
.getRTFileSystemView()
.getLatestFileSlices(partitionPath) .getLatestFileSlices(partitionPath)
.map(s -> new CompactionOperation(s.getDataFile().get(), .map(s -> new CompactionOperation(s.getDataFile().get(),
partitionPath, s.getLogFiles().collect(Collectors.toList()), config)) partitionPath, s.getLogFiles().collect(Collectors.toList()), config))

View File

@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Path;
public class HoodieLogFile implements Serializable { public class HoodieLogFile implements Serializable {
public static final String DELTA_EXTENSION = ".log"; public static final String DELTA_EXTENSION = ".log";
public static final Integer LOGFILE_BASE_VERSION = 1;
private final Path path; private final Path path;
private Optional<FileStatus> fileStatus; private Optional<FileStatus> fileStatus;

View File

@@ -284,7 +284,7 @@ public class FSUtils {
Optional<Integer> currentVersion = Optional<Integer> currentVersion =
getLatestLogVersion(fs, partitionPath, fileId, logFileExtension, baseCommitTime); getLatestLogVersion(fs, partitionPath, fileId, logFileExtension, baseCommitTime);
// handle potential overflow // handle potential overflow
return (currentVersion.isPresent()) ? currentVersion.get() : 1; return (currentVersion.isPresent()) ? currentVersion.get() : HoodieLogFile.LOGFILE_BASE_VERSION;
} }
/** /**
@@ -295,7 +295,7 @@ public class FSUtils {
Optional<Integer> currentVersion = Optional<Integer> currentVersion =
getLatestLogVersion(fs, partitionPath, fileId, logFileExtension, baseCommitTime); getLatestLogVersion(fs, partitionPath, fileId, logFileExtension, baseCommitTime);
// handle potential overflow // handle potential overflow
return (currentVersion.isPresent()) ? currentVersion.get() + 1 : 1; return (currentVersion.isPresent()) ? currentVersion.get() + 1 : HoodieLogFile.LOGFILE_BASE_VERSION;
} }
public static int getDefaultBufferSize(final FileSystem fs) { public static int getDefaultBufferSize(final FileSystem fs) {