Fixing behavior of Merge/CreateHandle for invalid/wrong schema records
committed by vinoth chandar
parent 994d42d307
commit 7985eb72b5
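
In short: getInsertValue(schema) used to run inside the transformer lambda and rethrow as a HoodieException on a bad record, failing the whole Spark task; this change captures the exception per record in a HoodieInsertValueGenResult so the handle can record the failure in its WriteStatus instead. Below is a minimal, self-contained sketch of that producer-side capture pattern; the names (GenResult, convert) are illustrative stand-ins, not the Hudi API.

    import java.util.Optional;
    import java.util.function.Function;

    // Sketch: capture the per-record conversion error instead of letting it
    // escape the transform function that runs on the producer/queueing thread.
    final class GenResult<R, V> {
      final R record;
      Optional<V> value = Optional.empty();
      Optional<Exception> error = Optional.empty();

      GenResult(R record, Function<R, V> convert) {
        this.record = record;
        try {
          this.value = Optional.ofNullable(convert.apply(record));
        } catch (Exception e) {
          // Held for the consumer, which marks the record as failed
          // in its WriteStatus rather than failing the job.
          this.error = Optional.of(e);
        }
      }

      public static void main(String[] args) {
        Function<String, Integer> parse = Integer::parseInt; // stand-in for getInsertValue(schema)
        GenResult<String, Integer> ok = new GenResult<>("42", parse);
        GenResult<String, Integer> bad = new GenResult<>("not-a-number", parse);
        System.out.println(ok.value.get());        // 42
        System.out.println(bad.error.isPresent()); // true: captured, not thrown
      }
    }
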
@@ -26,7 +26,6 @@ import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.io.HoodieCreateHandle;
import com.uber.hoodie.io.HoodieIOHandle;
import com.uber.hoodie.table.HoodieTable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
@@ -38,7 +37,6 @@ import java.util.function.Function;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.spark.TaskContext;
import scala.Tuple2;

/**
 * Lazy Iterable, that writes a stream of HoodieRecords sorted by the partitionPath, into new
@@ -61,20 +59,30 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload> extend
    this.hoodieTable = hoodieTable;
  }

  // Used for caching HoodieRecord along with insertValue. We need this to offload computation work to buffering thread.
  static class HoodieInsertValueGenResult<T extends HoodieRecord> {
    public T record;
    public Optional<IndexedRecord> insertValue;
    // It caches the exception seen while fetching insert value.
    public Optional<Exception> exception = Optional.empty();

    public HoodieInsertValueGenResult(T record, Schema schema) {
      this.record = record;
      try {
        this.insertValue = record.getData().getInsertValue(schema);
      } catch (Exception e) {
        this.exception = Optional.of(e);
      }
    }
  }

  /**
   * Transformer function to help transform a HoodieRecord. This transformer is used by BufferedIterator to offload some
   * expensive operations of transformation to the reader thread.
   */
  static <T extends HoodieRecordPayload> Function<HoodieRecord<T>,
      Tuple2<HoodieRecord<T>, Optional<IndexedRecord>>> getTransformFunction(Schema schema) {
    return hoodieRecord -> {
      try {
        return new Tuple2<HoodieRecord<T>, Optional<IndexedRecord>>(hoodieRecord,
            hoodieRecord.getData().getInsertValue(schema));
      } catch (IOException e) {
        throw new HoodieException(e);
      }
    };
      HoodieInsertValueGenResult<HoodieRecord>> getTransformFunction(Schema schema) {
    return hoodieRecord -> new HoodieInsertValueGenResult(hoodieRecord, schema);
  }

  @Override
@@ -85,7 +93,7 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload> extend
  protected List<WriteStatus> computeNext() {
    // Executor service used for launching writer thread.
    BoundedInMemoryExecutor<HoodieRecord<T>,
        Tuple2<HoodieRecord<T>, Optional<IndexedRecord>>, List<WriteStatus>> bufferedIteratorExecutor = null;
        HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> bufferedIteratorExecutor = null;
    try {
      final Schema schema = HoodieIOHandle.createHoodieWriteSchema(hoodieConfig);
      bufferedIteratorExecutor =
@@ -117,14 +125,14 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload> extend
   * writes to one or more create-handles
   */
  protected class CopyOnWriteInsertHandler extends
      BoundedInMemoryQueueConsumer<Tuple2<HoodieRecord<T>, Optional<IndexedRecord>>, List<WriteStatus>> {
      BoundedInMemoryQueueConsumer<HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> {

    protected final List<WriteStatus> statuses = new ArrayList<>();
    protected HoodieIOHandle handle;

    @Override
    protected void consumeOneRecord(Tuple2<HoodieRecord<T>, Optional<IndexedRecord>> payload) {
      final HoodieRecord insertPayload = payload._1();
    protected void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) {
      final HoodieRecord insertPayload = payload.record;
      // clean up any partial failures
      if (!partitionsCleaned.contains(insertPayload.getPartitionPath())) {
        // This insert task could fail multiple times, but Spark will faithfully retry with
@@ -141,16 +149,16 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload> extend
            .randomUUID().toString());
      }

      if (handle.canWrite(payload._1())) {
      if (handle.canWrite(payload.record)) {
        // write the payload, if the handle has capacity
        handle.write(insertPayload, payload._2());
        handle.write(insertPayload, payload.insertValue, payload.exception);
      } else {
        // handle is full.
        statuses.add(handle.close());
        // Need to handle the rejected payload & open new handle
        handle = new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, insertPayload.getPartitionPath(), UUID
            .randomUUID().toString());
        handle.write(insertPayload, payload.insertValue, payload.exception); // we should be able to write 1 payload.
      }
    }

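The consumer keeps the same rotation shape as before, just reading fields from the result object instead of a Tuple2: try the current handle, and when it is out of capacity, close it, collect its WriteStatus, and open a fresh handle for the rejected record. A condensed, runnable sketch of that loop under stand-in types (Handle and BoundedHandle are illustrative, not the Hudi classes):

    import java.util.ArrayList;
    import java.util.List;

    // Sketch of the consume-one-record flow: write into the current handle
    // if it has capacity, otherwise rotate to a new handle and retry once.
    final class HandleRotation {
      interface Handle {
        boolean canWrite(String record);
        void write(String record);
        String close(); // returns a summary, standing in for WriteStatus
      }

      static final class BoundedHandle implements Handle {
        private final int capacity;
        private int written;
        BoundedHandle(int capacity) { this.capacity = capacity; }
        public boolean canWrite(String record) { return written < capacity; }
        public void write(String record) { written++; }
        public String close() { return "status(written=" + written + ")"; }
      }

      public static void main(String[] args) {
        List<String> statuses = new ArrayList<>();
        Handle handle = new BoundedHandle(2);
        for (String record : new String[] {"a", "b", "c"}) {
          if (handle.canWrite(record)) {
            handle.write(record);
          } else {
            statuses.add(handle.close());  // handle is full: collect its status
            handle = new BoundedHandle(2); // open a new handle for the rejected record
            handle.write(record);          // we should be able to write 1 record
          }
        }
        statuses.add(handle.close());
        System.out.println(statuses);      // [status(written=2), status(written=1)]
      }
    }
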
@@ -25,9 +25,6 @@ import com.uber.hoodie.table.HoodieTable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import org.apache.avro.generic.IndexedRecord;
import scala.Tuple2;

/**
 * Lazy Iterable, that writes a stream of HoodieRecords sorted by the partitionPath, into new
@@ -49,8 +46,8 @@ public class MergeOnReadLazyInsertIterable<T extends HoodieRecordPayload> extend
  protected class MergeOnReadInsertHandler extends CopyOnWriteInsertHandler {

    @Override
    protected void consumeOneRecord(Tuple2<HoodieRecord<T>, Optional<IndexedRecord>> payload) {
      final HoodieRecord insertPayload = payload._1();
    protected void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) {
      final HoodieRecord insertPayload = payload.record;
      List<WriteStatus> statuses = new ArrayList<>();
      // lazily initialize the handle, for the first time
      if (handle == null) {
@@ -58,14 +55,14 @@ public class MergeOnReadLazyInsertIterable<T extends HoodieRecordPayload> extend
      }
      if (handle.canWrite(insertPayload)) {
        // write the payload, if the handle has capacity
        handle.write(insertPayload, payload._2);
        handle.write(insertPayload, payload.insertValue, payload.exception);
      } else {
        // handle is full.
        handle.close();
        statuses.add(handle.getWriteStatus());
        // Need to handle the rejected payload & open new handle
        handle = new HoodieAppendHandle(hoodieConfig, commitTime, hoodieTable);
        handle.write(insertPayload, payload.insertValue, payload.exception); // we should be able to write 1 payload.
      }
    }
  }

@@ -33,7 +33,6 @@ import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
import com.uber.hoodie.common.table.log.block.HoodieDeleteBlock;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.common.util.ReflectionUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieAppendException;
import com.uber.hoodie.exception.HoodieUpsertException;
@@ -63,7 +62,6 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
  private static Logger logger = LogManager.getLogger(HoodieAppendHandle.class);
  // This acts as the sequenceID for records written
  private static AtomicLong recordIndex = new AtomicLong(1);
  private final WriteStatus writeStatus;
  private final String fileId;
  // Buffer for holding records in memory before they are flushed to disk
  private List<IndexedRecord> recordList = new ArrayList<>();
@@ -98,9 +96,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
  public HoodieAppendHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
      String fileId, Iterator<HoodieRecord<T>> recordItr) {
    super(config, commitTime, hoodieTable);
    WriteStatus writeStatus = ReflectionUtils.loadClass(config.getWriteStatusClassName());
    writeStatus.setStat(new HoodieDeltaWriteStat());
    this.writeStatus = writeStatus;
    this.fileId = fileId;
    this.fileSystemView = hoodieTable.getRTFileSystemView();
    this.recordItr = recordItr;

@@ -24,7 +24,6 @@ import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.model.HoodieWriteStat;
import com.uber.hoodie.common.model.HoodieWriteStat.RuntimeStats;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.ReflectionUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieInsertException;
import com.uber.hoodie.io.storage.HoodieStorageWriter;
@@ -43,7 +42,6 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH

  private static Logger logger = LogManager.getLogger(HoodieCreateHandle.class);

  private final WriteStatus status;
  private final HoodieStorageWriter<IndexedRecord> storageWriter;
  private final Path path;
  private Path tempPath = null;
@@ -55,14 +53,13 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
  public HoodieCreateHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
      String partitionPath, String fileId) {
    super(config, commitTime, hoodieTable);
    this.status = ReflectionUtils.loadClass(config.getWriteStatusClassName());
    status.setFileId(fileId);
    status.setPartitionPath(partitionPath);
    writeStatus.setFileId(fileId);
    writeStatus.setPartitionPath(partitionPath);

    final int sparkPartitionId = TaskContext.getPartitionId();
    this.path = makeNewPath(partitionPath, sparkPartitionId, status.getFileId());
    this.path = makeNewPath(partitionPath, sparkPartitionId, writeStatus.getFileId());
    if (config.shouldUseTempFolderForCopyOnWriteForCreate()) {
      this.tempPath = makeTempPath(partitionPath, sparkPartitionId, status.getFileId(),
      this.tempPath = makeTempPath(partitionPath, sparkPartitionId, writeStatus.getFileId(),
          TaskContext.get().stageId(), TaskContext.get().taskAttemptId());
    }

@@ -87,7 +84,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH

  @Override
  public boolean canWrite(HoodieRecord record) {
    return storageWriter.canWrite() && record.getPartitionPath().equals(status.getPartitionPath());
    return storageWriter.canWrite() && record.getPartitionPath().equals(writeStatus.getPartitionPath());
  }

  /**
@@ -99,13 +96,13 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
      if (avroRecord.isPresent()) {
        storageWriter.writeAvroWithMetadata(avroRecord.get(), record);
        // update the new location of record, so we know where to find it next
        record.setNewLocation(new HoodieRecordLocation(commitTime, status.getFileId()));
        record.setNewLocation(new HoodieRecordLocation(commitTime, writeStatus.getFileId()));
        recordsWritten++;
        insertRecordsWritten++;
      } else {
        recordsDeleted++;
      }
      status.markSuccess(record, recordMetadata);
      writeStatus.markSuccess(record, recordMetadata);
      // deflate record payload after recording success. This will help users access payload as a
      // part of marking
      // record successful.
@@ -113,7 +110,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
    } catch (Throwable t) {
      // Not throwing exception from here, since we don't want to fail the entire job
      // for a single record
      status.markFailure(record, t, recordMetadata);
      writeStatus.markFailure(record, t, recordMetadata);
      logger.error("Error writing record " + record, t);
    }
  }
@@ -135,7 +132,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH

  @Override
  public WriteStatus getWriteStatus() {
    return status;
    return writeStatus;
  }

  /**
@@ -143,27 +140,27 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
   */
  @Override
  public WriteStatus close() {
    logger.info("Closing the file " + status.getFileId() + " as we are done with all the records "
    logger.info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records "
        + recordsWritten);
    try {
      storageWriter.close();

      HoodieWriteStat stat = new HoodieWriteStat();
      stat.setPartitionPath(status.getPartitionPath());
      stat.setPartitionPath(writeStatus.getPartitionPath());
      stat.setNumWrites(recordsWritten);
      stat.setNumDeletes(recordsDeleted);
      stat.setNumInserts(insertRecordsWritten);
      stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
      stat.setFileId(status.getFileId());
      stat.setFileId(writeStatus.getFileId());
      stat.setPaths(new Path(config.getBasePath()), path, tempPath);
      stat.setTotalWriteBytes(FSUtils.getFileSize(fs, getStorageWriterPath()));
      stat.setTotalWriteErrors(status.getFailedRecords().size());
      stat.setTotalWriteErrors(writeStatus.getFailedRecords().size());
      RuntimeStats runtimeStats = new RuntimeStats();
      runtimeStats.setTotalCreateTime(timer.endTimer());
      stat.setRuntimeStats(runtimeStats);
      status.setStat(stat);
      writeStatus.setStat(stat);

      return status;
      return writeStatus;
    } catch (IOException e) {
      throw new HoodieInsertException("Failed to close the Insert Handle for path " + path, e);
    }

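The mechanical status-to-writeStatus rename above reflects that HoodieCreateHandle no longer builds its own WriteStatus: the base class constructs it (reflectively, from config.getWriteStatusClassName(), per the HoodieIOHandle hunk below) and the subclass only configures the inherited object. A small sketch of that hoisting, using stub types in place of the Hudi classes:

    // Sketch: the WriteStatus moves from each concrete handle into the
    // abstract base, so all subclasses configure and report through one object.
    final class SharedStatusSketch {
      static class WriteStatusStub { // stand-in for Hudi's WriteStatus
        String fileId;
        String partitionPath;
      }

      abstract static class BaseHandle {
        // Hudi instantiates this reflectively via config.getWriteStatusClassName();
        // a direct construction keeps the sketch self-contained.
        protected final WriteStatusStub writeStatus = new WriteStatusStub();
      }

      static final class CreateHandle extends BaseHandle {
        CreateHandle(String fileId, String partitionPath) {
          // The subclass only fills in the inherited status; it no longer owns it.
          writeStatus.fileId = fileId;
          writeStatus.partitionPath = partitionPath;
        }
      }

      public static void main(String[] args) {
        CreateHandle h = new CreateHandle("file-1", "2017/11/01");
        System.out.println(h.writeStatus.fileId + " @ " + h.writeStatus.partitionPath);
      }
    }
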
@@ -24,6 +24,7 @@ import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.common.util.HoodieTimer;
import com.uber.hoodie.common.util.ReflectionUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.table.HoodieTable;
@@ -47,6 +48,7 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
  protected final Schema schema;
  protected HoodieTimeline hoodieTimeline;
  protected HoodieTimer timer;
  protected final WriteStatus writeStatus;

  public HoodieIOHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable) {
    this.commitTime = commitTime;
@@ -56,6 +58,7 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
    this.hoodieTimeline = hoodieTable.getCompletedCommitsTimeline();
    this.schema = createHoodieWriteSchema(config);
    this.timer = new HoodieTimer().startTimer();
    this.writeStatus = ReflectionUtils.loadClass(config.getWriteStatusClassName());
  }

  /**
@@ -125,6 +128,20 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
    // NO_OP
  }

  /**
   * Perform the actual writing of the given record into the backing file.
   */
  public void write(HoodieRecord record, Optional<IndexedRecord> avroRecord, Optional<Exception> exception) {
    Optional recordMetadata = record.getData().getMetadata();
    if (exception.isPresent() && exception.get() instanceof Throwable) {
      // Not throwing exception from here, since we don't want to fail the entire job for a single record
      writeStatus.markFailure(record, exception.get(), recordMetadata);
      logger.error("Error writing record " + record, exception.get());
    } else {
      write(record, avroRecord);
    }
  }

  public abstract WriteStatus close();

  public abstract WriteStatus getWriteStatus();

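The new three-argument write above is what ties the pieces together: when the producer captured an exception for a record, the handle marks that record failed in the shared writeStatus and logs it, rather than rethrowing and failing the job for a single record. A simplified, runnable sketch of that dispatch, with stand-in types instead of HoodieRecord/IndexedRecord:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Optional;

    // Sketch of HoodieIOHandle.write(record, avroRecord, exception): route a
    // captured per-record failure into the status object; otherwise write normally.
    final class FailureRoutingSketch {
      static final List<String> failed = new ArrayList<>();
      static final List<Integer> written = new ArrayList<>();

      static void write(String record, Optional<Integer> value, Optional<Exception> exception) {
        if (exception.isPresent()) {
          // Not thrown further: one bad record should not fail the whole job.
          failed.add(record + " -> " + exception.get().getMessage());
        } else {
          value.ifPresent(written::add); // an empty value means a delete in Hudi's write path
        }
      }

      public static void main(String[] args) {
        write("good", Optional.of(42), Optional.empty());
        write("bad", Optional.empty(), Optional.of(new RuntimeException("wrong schema")));
        System.out.println("written=" + written + " failed=" + failed);
        // written=[42] failed=[bad -> wrong schema]
      }
    }
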
@@ -28,7 +28,6 @@ import com.uber.hoodie.common.table.TableFileSystemView;
import com.uber.hoodie.common.util.DefaultSizeEstimator;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.HoodieRecordSizeEstimator;
import com.uber.hoodie.common.util.ReflectionUtils;
import com.uber.hoodie.common.util.collection.ExternalSpillableMap;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieIOException;
@@ -54,7 +53,6 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa

  private static Logger logger = LogManager.getLogger(HoodieMergeHandle.class);

  private WriteStatus writeStatus;
  private Map<String, HoodieRecord<T>> keyToNewRecords;
  private Set<String> writtenRecordKeys;
  private HoodieStorageWriter<IndexedRecord> storageWriter;
@@ -91,10 +89,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
   */
  private void init(String fileId, String partitionPath, Optional<HoodieDataFile> dataFileToBeMerged) {
    this.writtenRecordKeys = new HashSet<>();

    WriteStatus writeStatus = ReflectionUtils.loadClass(config.getWriteStatusClassName());
    writeStatus.setStat(new HoodieWriteStat());
    this.writeStatus = writeStatus;
    try {
      //TODO: dataFileToBeMerged must be optional. Will be fixed by Nishith's changes to support insert to log-files
      String latestValidFilePath = dataFileToBeMerged.get().getFileName();