diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java index 5866276b9..fb107444d 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java @@ -19,11 +19,13 @@ package com.uber.hoodie.io; import com.beust.jcommander.internal.Maps; import com.clearspring.analytics.util.Lists; import com.uber.hoodie.WriteStatus; +import com.uber.hoodie.common.model.FileSlice; import com.uber.hoodie.common.model.HoodieDeltaWriteStat; import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordLocation; import com.uber.hoodie.common.model.HoodieRecordPayload; +import com.uber.hoodie.common.table.TableFileSystemView; import com.uber.hoodie.common.table.log.HoodieLogFormat; import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer; import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock; @@ -58,6 +60,7 @@ public class HoodieAppendHandle extends HoodieIOH private static Logger logger = LogManager.getLogger(HoodieMergeHandle.class); private static AtomicLong recordIndex = new AtomicLong(1); + private TableFileSystemView.RealtimeView fileSystemView; private final WriteStatus writeStatus; private final String fileId; private String partitionPath; @@ -77,6 +80,7 @@ public class HoodieAppendHandle extends HoodieIOH writeStatus.setStat(new HoodieDeltaWriteStat()); this.writeStatus = writeStatus; this.fileId = fileId; + this.fileSystemView = hoodieTable.getRTFileSystemView(); init(recordItr); } @@ -87,11 +91,11 @@ public class HoodieAppendHandle extends HoodieIOH // extract some information from the first record if (partitionPath == null) { partitionPath = record.getPartitionPath(); + FileSlice fileSlice = fileSystemView.getLatestFileSlices(record.getPartitionPath()) + .filter(fileSlice1 -> fileSlice1.getDataFile().get().getFileId().equals(fileId)) + .findFirst().get(); // HACK(vc) This also assumes a base file. It will break, if appending without one. - String latestValidFilePath = - fileSystemView.getLatestDataFiles(record.getPartitionPath()) - .filter(dataFile -> dataFile.getFileId().equals(fileId)) - .findFirst().get().getFileName(); + String latestValidFilePath = fileSlice.getDataFile().get().getFileName(); String baseCommitTime = FSUtils.getCommitTime(latestValidFilePath); writeStatus.getStat().setPrevCommit(baseCommitTime); writeStatus.setFileId(fileId); @@ -101,7 +105,9 @@ public class HoodieAppendHandle extends HoodieIOH try { this.writer = HoodieLogFormat.newWriterBuilder() .onParentPath(new Path(hoodieTable.getMetaClient().getBasePath(), partitionPath)) - .withFileId(fileId).overBaseCommit(baseCommitTime) + .withFileId(fileId).overBaseCommit(baseCommitTime).withLogVersion(fileSlice.getLogFiles() + .max(HoodieLogFile.getLogVersionComparator()::compare) + .map(logFile -> logFile.getLogVersion()).orElse(HoodieLogFile.LOGFILE_BASE_VERSION)) .withFs(fs).withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); this.currentLogFile = writer.getLogFile(); ((HoodieDeltaWriteStat) writeStatus.getStat()) diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java index 332c9a2d3..a02df5213 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java @@ -40,7 +40,6 @@ public abstract class HoodieIOHandle { protected final FileSystem fs; protected final HoodieTable hoodieTable; protected HoodieTimeline hoodieTimeline; - protected TableFileSystemView.ReadOptimizedView fileSystemView; protected final Schema schema; public HoodieIOHandle(HoodieWriteConfig config, String commitTime, @@ -50,7 +49,6 @@ public abstract class HoodieIOHandle { this.fs = hoodieTable.getMetaClient().getFs(); this.hoodieTable = hoodieTable; this.hoodieTimeline = hoodieTable.getCompletedCommitTimeline(); - this.fileSystemView = hoodieTable.getROFileSystemView(); this.schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema())); } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java index b61b9d9e8..e6ce31468 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java @@ -22,6 +22,7 @@ import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordLocation; import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.model.HoodieWriteStat; +import com.uber.hoodie.common.table.TableFileSystemView; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.ReflectionUtils; import com.uber.hoodie.config.HoodieWriteConfig; @@ -48,6 +49,7 @@ public class HoodieMergeHandle extends HoodieIOHa private WriteStatus writeStatus; private HashMap> keyToNewRecords; private HoodieStorageWriter storageWriter; + private TableFileSystemView.ReadOptimizedView fileSystemView; private Path newFilePath; private Path oldFilePath; private long recordsWritten = 0; @@ -60,6 +62,7 @@ public class HoodieMergeHandle extends HoodieIOHa Iterator> recordItr, String fileId) { super(config, commitTime, hoodieTable); + this.fileSystemView = hoodieTable.getROFileSystemView(); init(fileId, recordItr); } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java index fda6b5a26..501e452c4 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java @@ -27,6 +27,7 @@ import com.uber.hoodie.common.model.HoodieTableType; import com.uber.hoodie.common.model.HoodieWriteStat; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; +import com.uber.hoodie.common.table.TableFileSystemView; import com.uber.hoodie.common.table.log.HoodieCompactedLogRecordScanner; import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; @@ -78,11 +79,11 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), config.shouldAssumeDatePartitioning()); + TableFileSystemView.RealtimeView fileSystemView = hoodieTable.getRTFileSystemView(); log.info("Compaction looking for files to compact in " + partitionPaths + " partitions"); List operations = jsc.parallelize(partitionPaths, partitionPaths.size()) - .flatMap((FlatMapFunction) partitionPath -> hoodieTable - .getRTFileSystemView() + .flatMap((FlatMapFunction) partitionPath -> fileSystemView .getLatestFileSlices(partitionPath) .map(s -> new CompactionOperation(s.getDataFile().get(), partitionPath, s.getLogFiles().collect(Collectors.toList()), config)) diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieLogFile.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieLogFile.java index 4e09f5f33..c4a527914 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieLogFile.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieLogFile.java @@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Path; public class HoodieLogFile implements Serializable { public static final String DELTA_EXTENSION = ".log"; + public static final Integer LOGFILE_BASE_VERSION = 1; private final Path path; private Optional fileStatus; diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java index 5417f01c9..f0173f077 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java @@ -284,7 +284,7 @@ public class FSUtils { Optional currentVersion = getLatestLogVersion(fs, partitionPath, fileId, logFileExtension, baseCommitTime); // handle potential overflow - return (currentVersion.isPresent()) ? currentVersion.get() : 1; + return (currentVersion.isPresent()) ? currentVersion.get() : HoodieLogFile.LOGFILE_BASE_VERSION; } /** @@ -295,7 +295,7 @@ public class FSUtils { Optional currentVersion = getLatestLogVersion(fs, partitionPath, fileId, logFileExtension, baseCommitTime); // handle potential overflow - return (currentVersion.isPresent()) ? currentVersion.get() + 1 : 1; + return (currentVersion.isPresent()) ? currentVersion.get() + 1 : HoodieLogFile.LOGFILE_BASE_VERSION; } public static int getDefaultBufferSize(final FileSystem fs) {