diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java index fbc2b2bf5..2bd5d8668 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java @@ -17,6 +17,7 @@ package com.uber.hoodie.io; import com.uber.hoodie.WriteStatus; +import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodiePartitionMetadata; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordLocation; @@ -71,30 +72,33 @@ public class HoodieMergeHandle extends HoodieIOHa Iterator> recordItr, String fileId) { super(config, commitTime, hoodieTable); this.fileSystemView = hoodieTable.getROFileSystemView(); - init(fileId, init(fileId, recordItr)); + String partitionPath = init(fileId, recordItr); + init(fileId, partitionPath, + fileSystemView.getLatestDataFiles(partitionPath) + .filter(dataFile -> dataFile.getFileId().equals(fileId)).findFirst()); } public HoodieMergeHandle(HoodieWriteConfig config, String commitTime, HoodieTable hoodieTable, - Map> keyToNewRecords, String fileId) { + Map> keyToNewRecords, String fileId, Optional dataFileToBeMerged) { super(config, commitTime, hoodieTable); this.fileSystemView = hoodieTable.getROFileSystemView(); this.keyToNewRecords = keyToNewRecords; init(fileId, keyToNewRecords.get(keyToNewRecords.keySet().stream().findFirst().get()) - .getPartitionPath()); + .getPartitionPath(), dataFileToBeMerged); } /** * Extract old file path, initialize StorageWriter and WriteStatus */ - private void init(String fileId, String partitionPath) { + private void init(String fileId, String partitionPath, Optional dataFileToBeMerged) { this.writtenRecordKeys = new HashSet<>(); WriteStatus writeStatus = ReflectionUtils.loadClass(config.getWriteStatusClassName()); writeStatus.setStat(new HoodieWriteStat()); 
this.writeStatus = writeStatus; try { - String latestValidFilePath = fileSystemView.getLatestDataFiles(partitionPath) - .filter(dataFile -> dataFile.getFileId().equals(fileId)).findFirst().get().getFileName(); + //TODO: dataFileToBeMerged may be absent; handle that case instead of calling get() unconditionally. Will be fixed by Nishith's changes to support inserts into log-files + String latestValidFilePath = dataFileToBeMerged.get().getFileName(); writeStatus.getStat().setPrevCommit(FSUtils.getCommitTime(latestValidFilePath)); HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, commitTime, diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java index ef97abff8..3659437c6 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java @@ -124,13 +124,17 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { return Lists.newArrayList(); } + Optional oldDataFileOpt = hoodieCopyOnWriteTable.getROFileSystemView() + .getLatestDataFilesOn(operation.getPartitionPath(), operation.getBaseInstantTime()) + .filter(df -> df.getFileId().equals(operation.getFileId())).findFirst(); + // Compacting is very similar to applying updates to existing file Iterator> result; // If the dataFile is present, there is a base parquet file present, perform updates else perform inserts into a // new base parquet file. 
if (operation.getDataFilePath().isPresent()) { result = hoodieCopyOnWriteTable - .handleUpdate(commitTime, operation.getFileId(), scanner.getRecords()); + .handleUpdate(commitTime, operation.getFileId(), scanner.getRecords(), oldDataFileOpt); } else { result = hoodieCopyOnWriteTable .handleInsert(commitTime, operation.getPartitionPath(), operation.getFileId(), scanner.iterator()); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java index de9d84000..fa180dba4 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java @@ -173,26 +173,26 @@ public class HoodieCopyOnWriteTable extends Hoodi throw new HoodieNotSupportedException("Compaction is not supported from a CopyOnWrite table"); } - public Iterator> handleUpdate(String commitTime, String fileLoc, + public Iterator> handleUpdate(String commitTime, String fileId, Iterator> recordItr) throws IOException { // these are updates - HoodieMergeHandle upsertHandle = getUpdateHandle(commitTime, fileLoc, recordItr); - return handleUpdateInternal(upsertHandle, commitTime, fileLoc); + HoodieMergeHandle upsertHandle = getUpdateHandle(commitTime, fileId, recordItr); + return handleUpdateInternal(upsertHandle, commitTime, fileId); } - public Iterator> handleUpdate(String commitTime, String fileLoc, - Map> keyToNewRecords) throws IOException { + public Iterator> handleUpdate(String commitTime, String fileId, + Map> keyToNewRecords, Optional dataFileOpt) throws IOException { // these are updates - HoodieMergeHandle upsertHandle = getUpdateHandle(commitTime, fileLoc, keyToNewRecords); - return handleUpdateInternal(upsertHandle, commitTime, fileLoc); + HoodieMergeHandle upsertHandle = getUpdateHandle(commitTime, fileId, keyToNewRecords, dataFileOpt); + return handleUpdateInternal(upsertHandle, 
commitTime, fileId); } protected Iterator> handleUpdateInternal(HoodieMergeHandle upsertHandle, - String commitTime, String fileLoc) + String commitTime, String fileId) throws IOException { if (upsertHandle.getOldFilePath() == null) { throw new HoodieUpsertException( - "Error in finding the old file path at commit " + commitTime + " at fileLoc: " + fileLoc); + "Error in finding the old file path at commit " + commitTime + " for fileId: " + fileId); } else { AvroReadSupport.setAvroReadSchema(getHadoopConf(), upsertHandle.getSchema()); ParquetReader reader = AvroParquetReader.builder(upsertHandle.getOldFilePath()) @@ -222,14 +222,14 @@ public class HoodieCopyOnWriteTable extends Hoodi .iterator(); } - protected HoodieMergeHandle getUpdateHandle(String commitTime, String fileLoc, + protected HoodieMergeHandle getUpdateHandle(String commitTime, String fileId, Iterator> recordItr) { - return new HoodieMergeHandle<>(config, commitTime, this, recordItr, fileLoc); + return new HoodieMergeHandle<>(config, commitTime, this, recordItr, fileId); } - protected HoodieMergeHandle getUpdateHandle(String commitTime, String fileLoc, - Map> keyToNewRecords) { - return new HoodieMergeHandle<>(config, commitTime, this, keyToNewRecords, fileLoc); + protected HoodieMergeHandle getUpdateHandle(String commitTime, String fileId, + Map> keyToNewRecords, Optional dataFileToBeMerged) { + return new HoodieMergeHandle<>(config, commitTime, this, keyToNewRecords, fileId, dataFileToBeMerged); } public Iterator> handleInsert(String commitTime, diff --git a/hoodie-client/src/test/resources/log4j-surefire.properties b/hoodie-client/src/test/resources/log4j-surefire.properties index 23ded09c3..daf8d28c1 100644 --- a/hoodie-client/src/test/resources/log4j-surefire.properties +++ b/hoodie-client/src/test/resources/log4j-surefire.properties @@ -14,8 +14,12 @@ # limitations under the License. 
# log4j.rootLogger=WARN, A1 -log4j.category.com.uber=WARN -log4j.category.org.apache.parquet.hadoop=ERROR +log4j.category.com.uber=INFO +log4j.category.com.uber.hoodie.common.util=WARN +log4j.category.com.uber.hoodie.io=WARN +log4j.category.com.uber.hoodie.common=WARN +log4j.category.com.uber.hoodie.common.table.log=WARN +log4j.category.org.apache.parquet.hadoop=WARN # A1 is set to be a ConsoleAppender. log4j.appender.A1=org.apache.log4j.ConsoleAppender # A1 uses PatternLayout. diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java index 790ba770e..7adc9a164 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java @@ -174,6 +174,8 @@ public interface HoodieTimeline extends Serializable { /** * Helper methods to compare instants **/ + BiPredicate EQUAL = + (commit1, commit2) -> commit1.compareTo(commit2) == 0; BiPredicate GREATER_OR_EQUAL = (commit1, commit2) -> commit1.compareTo(commit2) >= 0; BiPredicate GREATER = (commit1, commit2) -> commit1.compareTo(commit2) > 0; diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java index 637f6e874..d0f2d87e0 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java @@ -51,6 +51,12 @@ public interface TableFileSystemView { Stream getLatestDataFilesBeforeOrOn(String partitionPath, String maxCommitTime); + /** + * Stream all the latest version data files in the given partition, with the precondition that + * the commit time of each file matches the passed-in instant time. 
+ */ + Stream getLatestDataFilesOn(String partitionPath, String instantTime); + /** * Stream all the latest data files pass */ diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java index 8aa585728..edcaa8261 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java @@ -256,6 +256,22 @@ public class HoodieTableFileSystemView implements TableFileSystemView, .map(Optional::get); } + @Override + public Stream getLatestDataFilesOn(String partitionPath, String instantTime) { + return getAllFileGroups(partitionPath) + .map(fileGroup -> { + return fileGroup.getAllDataFiles() + .filter(dataFile -> + HoodieTimeline.compareTimestamps(dataFile.getCommitTime(), + instantTime, + HoodieTimeline.EQUAL)) + .filter(df -> !isDataFileDueToPendingCompaction(df)) + .findFirst(); + }) + .filter(Optional::isPresent) + .map(Optional::get); + } + @Override public Stream getAllDataFiles(String partitionPath) { return getAllFileGroups(partitionPath) diff --git a/hoodie-common/src/test/resources/log4j-surefire.properties b/hoodie-common/src/test/resources/log4j-surefire.properties index 23ded09c3..ea3e93545 100644 --- a/hoodie-common/src/test/resources/log4j-surefire.properties +++ b/hoodie-common/src/test/resources/log4j-surefire.properties @@ -14,8 +14,10 @@ # limitations under the License. # log4j.rootLogger=WARN, A1 -log4j.category.com.uber=WARN -log4j.category.org.apache.parquet.hadoop=ERROR +log4j.category.com.uber=INFO +log4j.category.com.uber.hoodie.table.log=WARN +log4j.category.com.uber.hoodie.common.util=WARN +log4j.category.org.apache.parquet.hadoop=WARN # A1 is set to be a ConsoleAppender. log4j.appender.A1=org.apache.log4j.ConsoleAppender # A1 uses PatternLayout. 
diff --git a/hoodie-hadoop-mr/src/test/resources/log4j-surefire.properties b/hoodie-hadoop-mr/src/test/resources/log4j-surefire.properties index 23ded09c3..3613e7d12 100644 --- a/hoodie-hadoop-mr/src/test/resources/log4j-surefire.properties +++ b/hoodie-hadoop-mr/src/test/resources/log4j-surefire.properties @@ -14,8 +14,8 @@ # limitations under the License. # log4j.rootLogger=WARN, A1 -log4j.category.com.uber=WARN -log4j.category.org.apache.parquet.hadoop=ERROR +log4j.category.com.uber=INFO +log4j.category.org.apache.parquet.hadoop=WARN # A1 is set to be a ConsoleAppender. log4j.appender.A1=org.apache.log4j.ConsoleAppender # A1 uses PatternLayout. diff --git a/hoodie-hive/src/test/resources/log4j-surefire.properties b/hoodie-hive/src/test/resources/log4j-surefire.properties index 1d7678513..8027f04d8 100644 --- a/hoodie-hive/src/test/resources/log4j-surefire.properties +++ b/hoodie-hive/src/test/resources/log4j-surefire.properties @@ -14,9 +14,9 @@ # limitations under the License. # log4j.rootLogger=WARN, A1 -log4j.category.com.uber=WARN -log4j.category.org.apache.parquet.hadoop=ERROR -log4j.category.parquet.hadoop=ERROR +log4j.category.com.uber=INFO +log4j.category.org.apache.parquet.hadoop=WARN +log4j.category.parquet.hadoop=WARN # A1 is set to be a ConsoleAppender. log4j.appender.A1=org.apache.log4j.ConsoleAppender # A1 uses PatternLayout. diff --git a/hoodie-spark/src/test/resources/log4j-surefire.properties b/hoodie-spark/src/test/resources/log4j-surefire.properties index 23ded09c3..daf8d28c1 100644 --- a/hoodie-spark/src/test/resources/log4j-surefire.properties +++ b/hoodie-spark/src/test/resources/log4j-surefire.properties @@ -14,8 +14,12 @@ # limitations under the License. 
# log4j.rootLogger=WARN, A1 -log4j.category.com.uber=WARN -log4j.category.org.apache.parquet.hadoop=ERROR +log4j.category.com.uber=INFO +log4j.category.com.uber.hoodie.common.util=WARN +log4j.category.com.uber.hoodie.io=WARN +log4j.category.com.uber.hoodie.common=WARN +log4j.category.com.uber.hoodie.common.table.log=WARN +log4j.category.org.apache.parquet.hadoop=WARN # A1 is set to be a ConsoleAppender. log4j.appender.A1=org.apache.log4j.ConsoleAppender # A1 uses PatternLayout.