1
0

Fixing source schema and writer schema distinction in payloads

This commit is contained in:
Nishith Agarwal
2019-03-22 16:27:51 -07:00
committed by Balaji Varadarajan
parent 395806fc68
commit 3d9041e216
10 changed files with 114 additions and 30 deletions

View File

@@ -95,7 +95,7 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload> extend
BoundedInMemoryExecutor<HoodieRecord<T>,
HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> bufferedIteratorExecutor = null;
try {
final Schema schema = HoodieIOHandle.createHoodieWriteSchema(hoodieConfig);
final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
bufferedIteratorExecutor =
new SparkBoundedInMemoryExecutor<>(hoodieConfig, inputItr,
getInsertHandler(), getTransformFunction(schema));

View File

@@ -149,9 +149,10 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
private Optional<IndexedRecord> getIndexedRecord(HoodieRecord<T> hoodieRecord) {
Optional recordMetadata = hoodieRecord.getData().getMetadata();
try {
Optional<IndexedRecord> avroRecord = hoodieRecord.getData().getInsertValue(schema);
Optional<IndexedRecord> avroRecord = hoodieRecord.getData().getInsertValue(originalSchema);
if (avroRecord.isPresent()) {
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
avroRecord = Optional.of(rewriteRecord((GenericRecord) avroRecord.get()));
String seqId = HoodieRecord.generateSequenceId(commitTime, TaskContext.getPartitionId(),
recordIndex.getAndIncrement());
HoodieAvroUtils
@@ -183,8 +184,8 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
return Optional.empty();
}
// TODO (NA) - Perform a schema check of current input record with the last schema on log file
// to make sure we don't append records with older (shorter) schema than already appended
// TODO (NA) - Perform a writerSchema check of current input record with the last writerSchema on log file
// to make sure we don't append records with older (shorter) writerSchema than already appended
public void doAppend() {
while (recordItr.hasNext()) {
HoodieRecord record = recordItr.next();
@@ -199,7 +200,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
private void doAppend(Map<HoodieLogBlock.HeaderMetadataType, String> header) {
try {
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, commitTime);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writerSchema.toString());
if (recordList.size() > 0) {
writer = writer.appendBlock(new HoodieAvroDataBlock(recordList, header));
recordList.clear();
@@ -304,4 +305,4 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
}
}
}
}

View File

@@ -32,6 +32,7 @@ import com.uber.hoodie.table.HoodieTable;
import java.io.IOException;
import java.util.Iterator;
import java.util.Optional;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
@@ -49,6 +50,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
private long insertRecordsWritten = 0;
private long recordsDeleted = 0;
private Iterator<HoodieRecord<T>> recordIterator;
private boolean useWriterSchema;
public HoodieCreateHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
String partitionPath, String fileId) {
@@ -68,7 +70,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
partitionMetadata.trySave(TaskContext.getPartitionId());
this.storageWriter = HoodieStorageWriterFactory
.getStorageWriter(commitTime, getStorageWriterPath(), hoodieTable, config, schema);
.getStorageWriter(commitTime, getStorageWriterPath(), hoodieTable, config, writerSchema);
} catch (IOException e) {
throw new HoodieInsertException(
"Failed to initialize HoodieStorageWriter for path " + getStorageWriterPath(), e);
@@ -80,6 +82,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordIterator) {
this(config, commitTime, hoodieTable, partitionPath, fileId);
this.recordIterator = recordIterator;
this.useWriterSchema = true;
}
@Override
@@ -94,7 +97,9 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
Optional recordMetadata = record.getData().getMetadata();
try {
if (avroRecord.isPresent()) {
storageWriter.writeAvroWithMetadata(avroRecord.get(), record);
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) avroRecord.get());
storageWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record);
// update the new location of record, so we know where to find it next
record.setNewLocation(new HoodieRecordLocation(commitTime, writeStatus.getFileId()));
recordsWritten++;
@@ -122,7 +127,11 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
try {
while (recordIterator.hasNext()) {
HoodieRecord<T> record = recordIterator.next();
write(record, record.getData().getInsertValue(schema));
if (useWriterSchema) {
write(record, record.getData().getInsertValue(writerSchema));
} else {
write(record, record.getData().getInsertValue(originalSchema));
}
}
} catch (IOException io) {
throw new HoodieInsertException(
@@ -170,4 +179,4 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
// Use tempPath for storage writer if possible
return (this.tempPath == null) ? this.path : this.tempPath;
}
}
}

View File

@@ -31,6 +31,7 @@ import com.uber.hoodie.table.HoodieTable;
import java.io.IOException;
import java.util.Optional;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -45,7 +46,8 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
protected final HoodieWriteConfig config;
protected final FileSystem fs;
protected final HoodieTable<T> hoodieTable;
protected final Schema schema;
protected final Schema originalSchema;
protected final Schema writerSchema;
protected HoodieTimeline hoodieTimeline;
protected HoodieTimer timer;
protected final WriteStatus writeStatus;
@@ -56,7 +58,8 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
this.fs = hoodieTable.getMetaClient().getFs();
this.hoodieTable = hoodieTable;
this.hoodieTimeline = hoodieTable.getCompletedCommitsTimeline();
this.schema = createHoodieWriteSchema(config);
this.originalSchema = new Schema.Parser().parse(config.getSchema());
this.writerSchema = createHoodieWriteSchema(originalSchema);
this.timer = new HoodieTimer().startTimer();
this.writeStatus = ReflectionUtils.loadClass(config.getWriteStatusClassName());
}
@@ -83,8 +86,8 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
}
}
public static Schema createHoodieWriteSchema(HoodieWriteConfig config) {
return HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
public static Schema createHoodieWriteSchema(Schema originalSchema) {
return HoodieAvroUtils.addMetadataFields(originalSchema);
}
public Path makeNewPath(String partitionPath, int taskPartitionId, String fileName) {
@@ -107,8 +110,8 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
taskAttemptId));
}
public Schema getSchema() {
return schema;
public Schema getWriterSchema() {
return writerSchema;
}
/**
@@ -142,7 +145,16 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
}
}
/**
* Rewrite the GenericRecord with the Schema containing the Hoodie Metadata fields
* @param record
* @return
*/
protected GenericRecord rewriteRecord(GenericRecord record) {
return HoodieAvroUtils.rewriteRecord(record, writerSchema);
}
public abstract WriteStatus close();
public abstract WriteStatus getWriteStatus();
}
}

View File

@@ -64,6 +64,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
private long recordsDeleted = 0;
private long updatedRecordsWritten = 0;
private long insertRecordsWritten = 0;
private boolean useWriterSchema;
public HoodieMergeHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
Iterator<HoodieRecord<T>> recordItr, String fileId) {
@@ -80,6 +81,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
super(config, commitTime, hoodieTable);
this.fileSystemView = hoodieTable.getROFileSystemView();
this.keyToNewRecords = keyToNewRecords;
this.useWriterSchema = true;
init(fileId, keyToNewRecords.get(keyToNewRecords.keySet().stream().findFirst().get())
.getPartitionPath(), dataFileToBeMerged);
}
@@ -125,7 +127,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
writeStatus.getStat().setPaths(new Path(config.getBasePath()), newFilePath, tempPath);
// Create the writer for writing the new version file
storageWriter = HoodieStorageWriterFactory
.getStorageWriter(commitTime, getStorageWriterPath(), hoodieTable, config, schema);
.getStorageWriter(commitTime, getStorageWriterPath(), hoodieTable, config, writerSchema);
} catch (IOException io) {
logger.error("Error in update task at commit " + commitTime, io);
writeStatus.setGlobalError(io);
@@ -143,7 +145,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
// Load the new records in a map
logger.info("MaxMemoryPerPartitionMerge => " + config.getMaxMemoryPerPartitionMerge());
this.keyToNewRecords = new ExternalSpillableMap<>(config.getMaxMemoryPerPartitionMerge(),
config.getSpillableMapBasePath(), new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema));
config.getSpillableMapBasePath(), new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(originalSchema));
} catch (IOException io) {
throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io);
}
@@ -177,7 +179,9 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
Optional recordMetadata = hoodieRecord.getData().getMetadata();
try {
if (indexedRecord.isPresent()) {
storageWriter.writeAvroWithMetadata(indexedRecord.get(), hoodieRecord);
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) indexedRecord.get());
storageWriter.writeAvroWithMetadata(recordWithMetadataInSchema, hoodieRecord);
recordsWritten++;
} else {
recordsDeleted++;
@@ -209,7 +213,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
HoodieRecord<T> hoodieRecord = new HoodieRecord<>(keyToNewRecords.get(key));
try {
Optional<IndexedRecord> combinedAvroRecord = hoodieRecord.getData()
.combineAndGetUpdateValue(oldRecord, schema);
.combineAndGetUpdateValue(oldRecord, useWriterSchema ? writerSchema : originalSchema);
if (writeUpdateRecord(hoodieRecord, combinedAvroRecord)) {
/* ONLY WHEN
* 1) we have an update for this key AND
@@ -235,7 +239,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
storageWriter.writeAvro(key, oldRecord);
} catch (ClassCastException e) {
logger.error("Schema mismatch when rewriting old record " + oldRecord + " from file "
+ getOldFilePath() + " to file " + getStorageWriterPath() + " with schema " + schema
+ getOldFilePath() + " to file " + getStorageWriterPath() + " with writerSchema " + writerSchema
.toString(true));
throw new HoodieUpsertException(errMsg, e);
} catch (IOException e) {
@@ -254,7 +258,11 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
for (String key : keyToNewRecords.keySet()) {
if (!writtenRecordKeys.contains(key)) {
HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key);
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(schema));
if (useWriterSchema) {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writerSchema));
} else {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(originalSchema));
}
insertRecordsWritten++;
}
}
@@ -293,4 +301,4 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
public WriteStatus getWriteStatus() {
return writeStatus;
}
}
}

View File

@@ -199,7 +199,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
throw new HoodieUpsertException(
"Error in finding the old file path at commit " + commitTime + " for fileId: " + fileId);
} else {
AvroReadSupport.setAvroReadSchema(getHadoopConf(), upsertHandle.getSchema());
AvroReadSupport.setAvroReadSchema(getHadoopConf(), upsertHandle.getWriterSchema());
BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
try (ParquetReader<IndexedRecord> reader = AvroParquetReader.<IndexedRecord>builder(upsertHandle.getOldFilePath())
.withConf(getHadoopConf()).build()) {

View File

@@ -17,6 +17,7 @@
package com.uber.hoodie.common;
import com.uber.hoodie.avro.model.HoodieCompactionPlan;
import com.uber.hoodie.common.model.HoodieAvroPayload;
import com.uber.hoodie.common.model.HoodieCommitMetadata;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodiePartitionMetadata;
@@ -105,6 +106,14 @@ public class HoodieTestDataGenerator {
return new TestRawTripPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA);
}
/**
* Generates a new avro record of the above schema format, retaining the key if optionally provided.
*/
public static HoodieAvroPayload generateAvroPayload(HoodieKey key, String commitTime) throws IOException {
GenericRecord rec = generateGenericRecord(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0.0);
return new HoodieAvroPayload(Optional.of(rec));
}
public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName,
double timestamp) {
GenericRecord rec = new GenericData.Record(avroSchema);
@@ -207,6 +216,33 @@ public class HoodieTestDataGenerator {
return copy;
}
public List<HoodieRecord> generateInsertsWithHoodieAvroPayload(String commitTime, int limit) throws
IOException {
List<HoodieRecord> inserts = new ArrayList<>();
for (int i = 0; i < limit; i++) {
String partitionPath = partitionPaths[rand.nextInt(partitionPaths.length)];
HoodieKey key = new HoodieKey(UUID.randomUUID().toString(), partitionPath);
HoodieRecord record = new HoodieRecord(key, generateAvroPayload(key, commitTime));
inserts.add(record);
KeyPartition kp = new KeyPartition();
kp.key = key;
kp.partitionPath = partitionPath;
existingKeysList.add(kp);
}
return inserts;
}
public List<HoodieRecord> generateUpdatesWithHoodieAvroPayload(String commitTime, List<HoodieRecord> baseRecords)
throws IOException {
List<HoodieRecord> updates = new ArrayList<>();
for (HoodieRecord baseRecord : baseRecords) {
HoodieRecord record = new HoodieRecord(baseRecord.getKey(), generateAvroPayload(baseRecord.getKey(), commitTime));
updates.add(record);
}
return updates;
}
public List<HoodieRecord> generateDeletes(String commitTime, Integer n) throws IOException {
List<HoodieRecord> inserts = generateInserts(commitTime, n);
return generateDeletesFromExistingRecords(inserts);

View File

@@ -482,6 +482,26 @@ public class TestCopyOnWriteTable {
assertEquals("First insert bucket should have weight 0.5", 200.0 / 2400, insertBuckets.get(0).weight, 0.01);
}
@Test
public void testInsertUpsertWithHoodieAvroPayload() throws Exception {
HoodieWriteConfig config = makeHoodieClientConfigBuilder().withStorageConfig(
HoodieStorageConfig.newBuilder().limitFileSize(1000 * 1024).build()).build();
HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
String commitTime = "000";
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
// Perform inserts of 100 records to test CreateHandle and BufferedExecutor
List<HoodieRecord> inserts = dataGenerator.generateInsertsWithHoodieAvroPayload(commitTime, 100);
Iterator<List<WriteStatus>> ws = table.handleInsert(commitTime, inserts.iterator());
WriteStatus writeStatus = ws.next().get(0);
String fileId = writeStatus.getFileId();
metadata.getFs().create(new Path(basePath + "/.hoodie/000.commit")).close();
table = new HoodieCopyOnWriteTable(config, jsc);
// Perform update of 100 records to test MergeHandle and BufferedExecutor
table.handleUpdate("001", fileId,
dataGenerator.generateUpdatesWithHoodieAvroPayload(commitTime, writeStatus.getWrittenRecords()).iterator());
}
@After
public void cleanup() {
if (basePath != null) {

View File

@@ -62,7 +62,6 @@ public class HoodieAvroPayload implements HoodieRecordPayload<HoodieAvroPayload>
if (recordBytes.length == 0) {
return Optional.empty();
}
Optional<GenericRecord> record = Optional.of(HoodieAvroUtils.bytesToAvro(recordBytes, schema));
return record.map(r -> HoodieAvroUtils.rewriteRecord(r, schema));
return Optional.of(HoodieAvroUtils.bytesToAvro(recordBytes, schema));
}
}

View File

@@ -66,7 +66,6 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload implements
@Override
public Optional<IndexedRecord> getInsertValue(Schema schema) throws IOException {
return Optional.of(HoodieAvroUtils.rewriteRecord(HoodieAvroUtils.bytesToAvro(recordBytes, Schema.parse(schemaStr)),
schema));
return Optional.of(HoodieAvroUtils.bytesToAvro(recordBytes, schema));
}
}