[MINOR] Removing code which is duplicated from the base class HoodieWriteHandle. (#1399)
This commit is contained in:
@@ -92,65 +92,11 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
|
|||||||
return HoodieAvroUtils.addMetadataFields(originalSchema);
|
return HoodieAvroUtils.addMetadataFields(originalSchema);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public Path makeNewPath(String partitionPath) {
|
|
||||||
Path path = FSUtils.getPartitionPath(config.getBasePath(), partitionPath);
|
|
||||||
try {
|
|
||||||
fs.mkdirs(path); // create a new partition as needed.
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new HoodieIOException("Failed to make dir " + path, e);
|
|
||||||
}
|
|
||||||
|
|
||||||
return new Path(path.toString(), FSUtils.makeDataFileName(instantTime, writeToken, fileId));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Schema getWriterSchema() {
|
public Schema getWriterSchema() {
|
||||||
return writerSchema;
|
return writerSchema;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Determines whether we can accept the incoming records, into the current file. Depending on
|
|
||||||
* <p>
|
|
||||||
* - Whether it belongs to the same partitionPath as existing records - Whether the current file written bytes lt max
|
|
||||||
* file size
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public boolean canWrite(HoodieRecord record) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Perform the actual writing of the given record into the backing file.
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public void write(HoodieRecord record, Option<IndexedRecord> insertValue) {
|
|
||||||
// NO_OP
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Perform the actual writing of the given record into the backing file.
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public void write(HoodieRecord record, Option<IndexedRecord> avroRecord, Option<Exception> exception) {
|
|
||||||
Option recordMetadata = record.getData().getMetadata();
|
|
||||||
if (exception.isPresent() && exception.get() instanceof Throwable) {
|
|
||||||
// Not throwing exception from here, since we don't want to fail the entire job for a single record
|
|
||||||
writeStatus.markFailure(record, exception.get(), recordMetadata);
|
|
||||||
LOG.error("Error writing record " + record, exception.get());
|
|
||||||
} else {
|
|
||||||
write(record, avroRecord);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Rewrite the GenericRecord with the Schema containing the Hoodie Metadata fields.
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
protected GenericRecord rewriteRecord(GenericRecord record) {
|
|
||||||
return HoodieAvroUtils.rewriteRecord(record, writerSchema);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Extract old file path, initialize StorageWriter and WriteStatus.
|
* Extract old file path, initialize StorageWriter and WriteStatus.
|
||||||
*/
|
*/
|
||||||
|
|||||||
Reference in New Issue
Block a user