BUGFIX - Use Guava Optional (which is Serializable) in CompactionOperation wcached to avoid NoSerializableException
This commit is contained in:
committed by
vinoth chandar
parent
ea23c9b7a0
commit
989afddd54
@@ -16,6 +16,7 @@
|
|||||||
|
|
||||||
package com.uber.hoodie.common.model;
|
package com.uber.hoodie.common.model;
|
||||||
|
|
||||||
|
import com.google.common.base.Optional;
|
||||||
import com.uber.hoodie.avro.model.HoodieCompactionOperation;
|
import com.uber.hoodie.avro.model.HoodieCompactionOperation;
|
||||||
import com.uber.hoodie.common.util.FSUtils;
|
import com.uber.hoodie.common.util.FSUtils;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
@@ -23,7 +24,6 @@ import java.util.ArrayList;
|
|||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -34,6 +34,7 @@ import java.util.stream.Collectors;
|
|||||||
public class CompactionOperation implements Serializable {
|
public class CompactionOperation implements Serializable {
|
||||||
|
|
||||||
private String baseInstantTime;
|
private String baseInstantTime;
|
||||||
|
// Using Guava Optional as it is serializable
|
||||||
private Optional<String> dataFileCommitTime;
|
private Optional<String> dataFileCommitTime;
|
||||||
private List<String> deltaFilePaths;
|
private List<String> deltaFilePaths;
|
||||||
private Optional<String> dataFilePath;
|
private Optional<String> dataFilePath;
|
||||||
@@ -46,7 +47,7 @@ public class CompactionOperation implements Serializable {
|
|||||||
public CompactionOperation() {
|
public CompactionOperation() {
|
||||||
}
|
}
|
||||||
|
|
||||||
public CompactionOperation(Optional<HoodieDataFile> dataFile, String partitionPath,
|
public CompactionOperation(java.util.Optional<HoodieDataFile> dataFile, String partitionPath,
|
||||||
List<HoodieLogFile> logFiles, Map<String, Double> metrics) {
|
List<HoodieLogFile> logFiles, Map<String, Double> metrics) {
|
||||||
if (dataFile.isPresent()) {
|
if (dataFile.isPresent()) {
|
||||||
this.baseInstantTime = dataFile.get().getCommitTime();
|
this.baseInstantTime = dataFile.get().getCommitTime();
|
||||||
@@ -55,10 +56,10 @@ public class CompactionOperation implements Serializable {
|
|||||||
this.dataFileCommitTime = Optional.of(dataFile.get().getCommitTime());
|
this.dataFileCommitTime = Optional.of(dataFile.get().getCommitTime());
|
||||||
} else {
|
} else {
|
||||||
assert logFiles.size() > 0;
|
assert logFiles.size() > 0;
|
||||||
this.dataFilePath = Optional.empty();
|
this.dataFilePath = Optional.absent();
|
||||||
this.baseInstantTime = FSUtils.getBaseCommitTimeFromLogPath(logFiles.get(0).getPath());
|
this.baseInstantTime = FSUtils.getBaseCommitTimeFromLogPath(logFiles.get(0).getPath());
|
||||||
this.fileId = FSUtils.getFileIdFromLogPath(logFiles.get(0).getPath());
|
this.fileId = FSUtils.getFileIdFromLogPath(logFiles.get(0).getPath());
|
||||||
this.dataFileCommitTime = Optional.empty();
|
this.dataFileCommitTime = Optional.absent();
|
||||||
}
|
}
|
||||||
|
|
||||||
this.partitionPath = partitionPath;
|
this.partitionPath = partitionPath;
|
||||||
@@ -103,7 +104,7 @@ public class CompactionOperation implements Serializable {
|
|||||||
public static CompactionOperation convertFromAvroRecordInstance(HoodieCompactionOperation operation) {
|
public static CompactionOperation convertFromAvroRecordInstance(HoodieCompactionOperation operation) {
|
||||||
CompactionOperation op = new CompactionOperation();
|
CompactionOperation op = new CompactionOperation();
|
||||||
op.baseInstantTime = operation.getBaseInstantTime();
|
op.baseInstantTime = operation.getBaseInstantTime();
|
||||||
op.dataFilePath = Optional.ofNullable(operation.getDataFilePath());
|
op.dataFilePath = Optional.fromNullable(operation.getDataFilePath());
|
||||||
op.deltaFilePaths = new ArrayList<>(operation.getDeltaFilePaths());
|
op.deltaFilePaths = new ArrayList<>(operation.getDeltaFilePaths());
|
||||||
op.fileId = operation.getFileId();
|
op.fileId = operation.getFileId();
|
||||||
op.metrics = operation.getMetrics() == null ? new HashMap<>() : new HashMap<>(operation.getMetrics());
|
op.metrics = operation.getMetrics() == null ? new HashMap<>() : new HashMap<>(operation.getMetrics());
|
||||||
|
|||||||
Reference in New Issue
Block a user