1
0

Ensure Compaction Operation compacts the data file as defined in the workload

This commit is contained in:
Balaji Varadarajan
2018-05-26 14:08:29 -07:00
committed by vinoth chandar
parent 2f8ce93030
commit 2e12c86d01
11 changed files with 74 additions and 32 deletions

View File

@@ -17,6 +17,7 @@
package com.uber.hoodie.io; package com.uber.hoodie.io;
import com.uber.hoodie.WriteStatus; import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodiePartitionMetadata; import com.uber.hoodie.common.model.HoodiePartitionMetadata;
import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordLocation; import com.uber.hoodie.common.model.HoodieRecordLocation;
@@ -71,30 +72,33 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
Iterator<HoodieRecord<T>> recordItr, String fileId) { Iterator<HoodieRecord<T>> recordItr, String fileId) {
super(config, commitTime, hoodieTable); super(config, commitTime, hoodieTable);
this.fileSystemView = hoodieTable.getROFileSystemView(); this.fileSystemView = hoodieTable.getROFileSystemView();
init(fileId, init(fileId, recordItr)); String partitionPath = init(fileId, recordItr);
init(fileId, partitionPath,
fileSystemView.getLatestDataFiles(partitionPath)
.filter(dataFile -> dataFile.getFileId().equals(fileId)).findFirst());
} }
public HoodieMergeHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable, public HoodieMergeHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
Map<String, HoodieRecord<T>> keyToNewRecords, String fileId) { Map<String, HoodieRecord<T>> keyToNewRecords, String fileId, Optional<HoodieDataFile> dataFileToBeMerged) {
super(config, commitTime, hoodieTable); super(config, commitTime, hoodieTable);
this.fileSystemView = hoodieTable.getROFileSystemView(); this.fileSystemView = hoodieTable.getROFileSystemView();
this.keyToNewRecords = keyToNewRecords; this.keyToNewRecords = keyToNewRecords;
init(fileId, keyToNewRecords.get(keyToNewRecords.keySet().stream().findFirst().get()) init(fileId, keyToNewRecords.get(keyToNewRecords.keySet().stream().findFirst().get())
.getPartitionPath()); .getPartitionPath(), dataFileToBeMerged);
} }
/** /**
* Extract old file path, initialize StorageWriter and WriteStatus * Extract old file path, initialize StorageWriter and WriteStatus
*/ */
private void init(String fileId, String partitionPath) { private void init(String fileId, String partitionPath, Optional<HoodieDataFile> dataFileToBeMerged) {
this.writtenRecordKeys = new HashSet<>(); this.writtenRecordKeys = new HashSet<>();
WriteStatus writeStatus = ReflectionUtils.loadClass(config.getWriteStatusClassName()); WriteStatus writeStatus = ReflectionUtils.loadClass(config.getWriteStatusClassName());
writeStatus.setStat(new HoodieWriteStat()); writeStatus.setStat(new HoodieWriteStat());
this.writeStatus = writeStatus; this.writeStatus = writeStatus;
try { try {
String latestValidFilePath = fileSystemView.getLatestDataFiles(partitionPath) //TODO: dataFileToBeMerged must be optional. Will be fixed by Nishith's changes to support insert to log-files
.filter(dataFile -> dataFile.getFileId().equals(fileId)).findFirst().get().getFileName(); String latestValidFilePath = dataFileToBeMerged.get().getFileName();
writeStatus.getStat().setPrevCommit(FSUtils.getCommitTime(latestValidFilePath)); writeStatus.getStat().setPrevCommit(FSUtils.getCommitTime(latestValidFilePath));
HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, commitTime, HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, commitTime,

View File

@@ -124,13 +124,17 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
return Lists.<WriteStatus>newArrayList(); return Lists.<WriteStatus>newArrayList();
} }
Optional<HoodieDataFile> oldDataFileOpt = hoodieCopyOnWriteTable.getROFileSystemView()
.getLatestDataFilesOn(operation.getPartitionPath(), operation.getBaseInstantTime())
.filter(df -> df.getFileId().equals(operation.getFileId())).findFirst();
// Compacting is very similar to applying updates to existing file // Compacting is very similar to applying updates to existing file
Iterator<List<WriteStatus>> result; Iterator<List<WriteStatus>> result;
// If the dataFile is present, there is a base parquet file present, perform updates else perform inserts into a // If the dataFile is present, there is a base parquet file present, perform updates else perform inserts into a
// new base parquet file. // new base parquet file.
if (operation.getDataFilePath().isPresent()) { if (operation.getDataFilePath().isPresent()) {
result = hoodieCopyOnWriteTable result = hoodieCopyOnWriteTable
.handleUpdate(commitTime, operation.getFileId(), scanner.getRecords()); .handleUpdate(commitTime, operation.getFileId(), scanner.getRecords(), oldDataFileOpt);
} else { } else {
result = hoodieCopyOnWriteTable result = hoodieCopyOnWriteTable
.handleInsert(commitTime, operation.getPartitionPath(), operation.getFileId(), scanner.iterator()); .handleInsert(commitTime, operation.getPartitionPath(), operation.getFileId(), scanner.iterator());

View File

@@ -173,26 +173,26 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
throw new HoodieNotSupportedException("Compaction is not supported from a CopyOnWrite table"); throw new HoodieNotSupportedException("Compaction is not supported from a CopyOnWrite table");
} }
public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileLoc, public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileId,
Iterator<HoodieRecord<T>> recordItr) throws IOException { Iterator<HoodieRecord<T>> recordItr) throws IOException {
// these are updates // these are updates
HoodieMergeHandle upsertHandle = getUpdateHandle(commitTime, fileLoc, recordItr); HoodieMergeHandle upsertHandle = getUpdateHandle(commitTime, fileId, recordItr);
return handleUpdateInternal(upsertHandle, commitTime, fileLoc); return handleUpdateInternal(upsertHandle, commitTime, fileId);
} }
public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileLoc, public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileId,
Map<String, HoodieRecord<T>> keyToNewRecords) throws IOException { Map<String, HoodieRecord<T>> keyToNewRecords, Optional<HoodieDataFile> dataFileOpt) throws IOException {
// these are updates // these are updates
HoodieMergeHandle upsertHandle = getUpdateHandle(commitTime, fileLoc, keyToNewRecords); HoodieMergeHandle upsertHandle = getUpdateHandle(commitTime, fileId, keyToNewRecords, dataFileOpt);
return handleUpdateInternal(upsertHandle, commitTime, fileLoc); return handleUpdateInternal(upsertHandle, commitTime, fileId);
} }
protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle upsertHandle, protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle upsertHandle,
String commitTime, String fileLoc) String commitTime, String fileId)
throws IOException { throws IOException {
if (upsertHandle.getOldFilePath() == null) { if (upsertHandle.getOldFilePath() == null) {
throw new HoodieUpsertException( throw new HoodieUpsertException(
"Error in finding the old file path at commit " + commitTime + " at fileLoc: " + fileLoc); "Error in finding the old file path at commit " + commitTime + " for fileId: " + fileId);
} else { } else {
AvroReadSupport.setAvroReadSchema(getHadoopConf(), upsertHandle.getSchema()); AvroReadSupport.setAvroReadSchema(getHadoopConf(), upsertHandle.getSchema());
ParquetReader<IndexedRecord> reader = AvroParquetReader.builder(upsertHandle.getOldFilePath()) ParquetReader<IndexedRecord> reader = AvroParquetReader.builder(upsertHandle.getOldFilePath())
@@ -222,14 +222,14 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
.iterator(); .iterator();
} }
protected HoodieMergeHandle getUpdateHandle(String commitTime, String fileLoc, protected HoodieMergeHandle getUpdateHandle(String commitTime, String fileId,
Iterator<HoodieRecord<T>> recordItr) { Iterator<HoodieRecord<T>> recordItr) {
return new HoodieMergeHandle<>(config, commitTime, this, recordItr, fileLoc); return new HoodieMergeHandle<>(config, commitTime, this, recordItr, fileId);
} }
protected HoodieMergeHandle getUpdateHandle(String commitTime, String fileLoc, protected HoodieMergeHandle getUpdateHandle(String commitTime, String fileId,
Map<String, HoodieRecord<T>> keyToNewRecords) { Map<String, HoodieRecord<T>> keyToNewRecords, Optional<HoodieDataFile> dataFileToBeMerged) {
return new HoodieMergeHandle<>(config, commitTime, this, keyToNewRecords, fileLoc); return new HoodieMergeHandle<>(config, commitTime, this, keyToNewRecords, fileId, dataFileToBeMerged);
} }
public Iterator<List<WriteStatus>> handleInsert(String commitTime, public Iterator<List<WriteStatus>> handleInsert(String commitTime,

View File

@@ -14,8 +14,12 @@
# limitations under the License. # limitations under the License.
# #
log4j.rootLogger=WARN, A1 log4j.rootLogger=WARN, A1
log4j.category.com.uber=WARN log4j.category.com.uber=INFO
log4j.category.org.apache.parquet.hadoop=ERROR log4j.category.com.uber.hoodie.common.utils=WARN
log4j.category.com.uber.hoodie.io=WARN
log4j.category.com.uber.hoodie.common=WARN
log4j.category.com.uber.hoodie.table.log=WARN
log4j.category.org.apache.parquet.hadoop=WARN
# A1 is set to be a ConsoleAppender. # A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender log4j.appender.A1=org.apache.log4j.ConsoleAppender
# A1 uses PatternLayout. # A1 uses PatternLayout.

View File

@@ -174,6 +174,8 @@ public interface HoodieTimeline extends Serializable {
/** /**
* Helper methods to compare instants * Helper methods to compare instants
**/ **/
BiPredicate<String, String> EQUAL =
(commit1, commit2) -> commit1.compareTo(commit2) == 0;
BiPredicate<String, String> GREATER_OR_EQUAL = BiPredicate<String, String> GREATER_OR_EQUAL =
(commit1, commit2) -> commit1.compareTo(commit2) >= 0; (commit1, commit2) -> commit1.compareTo(commit2) >= 0;
BiPredicate<String, String> GREATER = (commit1, commit2) -> commit1.compareTo(commit2) > 0; BiPredicate<String, String> GREATER = (commit1, commit2) -> commit1.compareTo(commit2) > 0;

View File

@@ -51,6 +51,12 @@ public interface TableFileSystemView {
Stream<HoodieDataFile> getLatestDataFilesBeforeOrOn(String partitionPath, Stream<HoodieDataFile> getLatestDataFilesBeforeOrOn(String partitionPath,
String maxCommitTime); String maxCommitTime);
/**
* Stream all the latest version data files in the given partition with precondition that
* instant time of file matches passed in instant time.
*/
Stream<HoodieDataFile> getLatestDataFilesOn(String partitionPath, String instantTime);
/** /**
* Stream all the latest data files pass * Stream all the latest data files pass
*/ */

View File

@@ -256,6 +256,22 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
.map(Optional::get); .map(Optional::get);
} }
@Override
public Stream<HoodieDataFile> getLatestDataFilesOn(String partitionPath, String instantTime) {
return getAllFileGroups(partitionPath)
.map(fileGroup -> {
return fileGroup.getAllDataFiles()
.filter(dataFile ->
HoodieTimeline.compareTimestamps(dataFile.getCommitTime(),
instantTime,
HoodieTimeline.EQUAL))
.filter(df -> !isDataFileDueToPendingCompaction(df))
.findFirst();
})
.filter(Optional::isPresent)
.map(Optional::get);
}
@Override @Override
public Stream<HoodieDataFile> getAllDataFiles(String partitionPath) { public Stream<HoodieDataFile> getAllDataFiles(String partitionPath) {
return getAllFileGroups(partitionPath) return getAllFileGroups(partitionPath)

View File

@@ -14,8 +14,10 @@
# limitations under the License. # limitations under the License.
# #
log4j.rootLogger=WARN, A1 log4j.rootLogger=WARN, A1
log4j.category.com.uber=WARN log4j.category.com.uber=INFO
log4j.category.org.apache.parquet.hadoop=ERROR log4j.category.com.uber.hoodie.table.log=WARN
log4j.category.com.uber.hoodie.common.util=WARN
log4j.category.org.apache.parquet.hadoop=WARN
# A1 is set to be a ConsoleAppender. # A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender log4j.appender.A1=org.apache.log4j.ConsoleAppender
# A1 uses PatternLayout. # A1 uses PatternLayout.

View File

@@ -14,8 +14,8 @@
# limitations under the License. # limitations under the License.
# #
log4j.rootLogger=WARN, A1 log4j.rootLogger=WARN, A1
log4j.category.com.uber=WARN log4j.category.com.uber=INFO
log4j.category.org.apache.parquet.hadoop=ERROR log4j.category.org.apache.parquet.hadoop=WARN
# A1 is set to be a ConsoleAppender. # A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender log4j.appender.A1=org.apache.log4j.ConsoleAppender
# A1 uses PatternLayout. # A1 uses PatternLayout.

View File

@@ -14,9 +14,9 @@
# limitations under the License. # limitations under the License.
# #
log4j.rootLogger=WARN, A1 log4j.rootLogger=WARN, A1
log4j.category.com.uber=WARN log4j.category.com.uber=INFO
log4j.category.org.apache.parquet.hadoop=ERROR log4j.category.org.apache.parquet.hadoop=WARN
log4j.category.parquet.hadoop=ERROR log4j.category.parquet.hadoop=WARN
# A1 is set to be a ConsoleAppender. # A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender log4j.appender.A1=org.apache.log4j.ConsoleAppender
# A1 uses PatternLayout. # A1 uses PatternLayout.

View File

@@ -14,8 +14,12 @@
# limitations under the License. # limitations under the License.
# #
log4j.rootLogger=WARN, A1 log4j.rootLogger=WARN, A1
log4j.category.com.uber=WARN log4j.category.com.uber=INFO
log4j.category.org.apache.parquet.hadoop=ERROR log4j.category.com.uber.hoodie.common.utils=WARN
log4j.category.com.uber.hoodie.io=WARN
log4j.category.com.uber.hoodie.common=WARN
log4j.category.com.uber.hoodie.table.log=WARN
log4j.category.org.apache.parquet.hadoop=WARN
# A1 is set to be a ConsoleAppender. # A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender log4j.appender.A1=org.apache.log4j.ConsoleAppender
# A1 uses PatternLayout. # A1 uses PatternLayout.