diff --git a/hoodie-client/src/main/java/com/uber/hoodie/func/CopyOnWriteLazyInsertIterable.java b/hoodie-client/src/main/java/com/uber/hoodie/func/CopyOnWriteLazyInsertIterable.java index 657c7869d..30b353bf9 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/func/CopyOnWriteLazyInsertIterable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/func/CopyOnWriteLazyInsertIterable.java @@ -95,7 +95,7 @@ public class CopyOnWriteLazyInsertIterable extend BoundedInMemoryExecutor, HoodieInsertValueGenResult, List> bufferedIteratorExecutor = null; try { - final Schema schema = HoodieIOHandle.createHoodieWriteSchema(hoodieConfig); + final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema()); bufferedIteratorExecutor = new SparkBoundedInMemoryExecutor<>(hoodieConfig, inputItr, getInsertHandler(), getTransformFunction(schema)); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java index 69e36ee6b..7cd22676f 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java @@ -149,9 +149,10 @@ public class HoodieAppendHandle extends HoodieIOH private Optional getIndexedRecord(HoodieRecord hoodieRecord) { Optional recordMetadata = hoodieRecord.getData().getMetadata(); try { - Optional avroRecord = hoodieRecord.getData().getInsertValue(schema); - + Optional avroRecord = hoodieRecord.getData().getInsertValue(originalSchema); if (avroRecord.isPresent()) { + // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema + avroRecord = Optional.of(rewriteRecord((GenericRecord) avroRecord.get())); String seqId = HoodieRecord.generateSequenceId(commitTime, TaskContext.getPartitionId(), recordIndex.getAndIncrement()); HoodieAvroUtils @@ -183,8 +184,8 @@ public class HoodieAppendHandle extends HoodieIOH return Optional.empty(); } - // TODO (NA) - 
Perform a schema check of current input record with the last schema on log file - // to make sure we don't append records with older (shorter) schema than already appended + // TODO (NA) - Perform a schema check of the current input record against the last writerSchema on log file + // to make sure we don't append records with an older (shorter) schema than already appended public void doAppend() { while (recordItr.hasNext()) { HoodieRecord record = recordItr.next(); @@ -199,7 +200,7 @@ public class HoodieAppendHandle extends HoodieIOH private void doAppend(Map header) { try { header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, commitTime); - header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString()); + header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writerSchema.toString()); if (recordList.size() > 0) { writer = writer.appendBlock(new HoodieAvroDataBlock(recordList, header)); recordList.clear(); @@ -304,4 +305,4 @@ public class HoodieAppendHandle extends HoodieIOH } } -} +} \ No newline at end of file diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java index c56d30c7e..c677ad388 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java @@ -32,6 +32,7 @@ import com.uber.hoodie.table.HoodieTable; import java.io.IOException; import java.util.Iterator; import java.util.Optional; +import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; @@ -49,6 +50,7 @@ public class HoodieCreateHandle extends HoodieIOH private long insertRecordsWritten = 0; private long recordsDeleted = 0; private Iterator> recordIterator; + private boolean useWriterSchema; public HoodieCreateHandle(HoodieWriteConfig config, String commitTime, HoodieTable hoodieTable, 
String partitionPath, String fileId) { @@ -68,7 +70,7 @@ public class HoodieCreateHandle extends HoodieIOH new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath)); partitionMetadata.trySave(TaskContext.getPartitionId()); this.storageWriter = HoodieStorageWriterFactory - .getStorageWriter(commitTime, getStorageWriterPath(), hoodieTable, config, schema); + .getStorageWriter(commitTime, getStorageWriterPath(), hoodieTable, config, writerSchema); } catch (IOException e) { throw new HoodieInsertException( "Failed to initialize HoodieStorageWriter for path " + getStorageWriterPath(), e); @@ -80,6 +82,7 @@ public class HoodieCreateHandle extends HoodieIOH String partitionPath, String fileId, Iterator> recordIterator) { this(config, commitTime, hoodieTable, partitionPath, fileId); this.recordIterator = recordIterator; + this.useWriterSchema = true; } @Override @@ -94,7 +97,9 @@ public class HoodieCreateHandle extends HoodieIOH Optional recordMetadata = record.getData().getMetadata(); try { if (avroRecord.isPresent()) { - storageWriter.writeAvroWithMetadata(avroRecord.get(), record); + // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema + IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) avroRecord.get()); + storageWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record); // update the new location of record, so we know where to find it next record.setNewLocation(new HoodieRecordLocation(commitTime, writeStatus.getFileId())); recordsWritten++; @@ -122,7 +127,11 @@ public class HoodieCreateHandle extends HoodieIOH try { while (recordIterator.hasNext()) { HoodieRecord record = recordIterator.next(); - write(record, record.getData().getInsertValue(schema)); + if (useWriterSchema) { + write(record, record.getData().getInsertValue(writerSchema)); + } else { + write(record, record.getData().getInsertValue(originalSchema)); + } } } catch (IOException io) { throw new 
HoodieInsertException( @@ -170,4 +179,4 @@ public class HoodieCreateHandle extends HoodieIOH // Use tempPath for storage writer if possible return (this.tempPath == null) ? this.path : this.tempPath; } -} +} \ No newline at end of file diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java index af8574fd2..b4a03d180 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java @@ -31,6 +31,7 @@ import com.uber.hoodie.table.HoodieTable; import java.io.IOException; import java.util.Optional; import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -45,7 +46,8 @@ public abstract class HoodieIOHandle { protected final HoodieWriteConfig config; protected final FileSystem fs; protected final HoodieTable hoodieTable; - protected final Schema schema; + protected final Schema originalSchema; + protected final Schema writerSchema; protected HoodieTimeline hoodieTimeline; protected HoodieTimer timer; protected final WriteStatus writeStatus; @@ -56,7 +58,8 @@ public abstract class HoodieIOHandle { this.fs = hoodieTable.getMetaClient().getFs(); this.hoodieTable = hoodieTable; this.hoodieTimeline = hoodieTable.getCompletedCommitsTimeline(); - this.schema = createHoodieWriteSchema(config); + this.originalSchema = new Schema.Parser().parse(config.getSchema()); + this.writerSchema = createHoodieWriteSchema(originalSchema); this.timer = new HoodieTimer().startTimer(); this.writeStatus = ReflectionUtils.loadClass(config.getWriteStatusClassName()); } @@ -83,8 +86,8 @@ public abstract class HoodieIOHandle { } } - public static Schema createHoodieWriteSchema(HoodieWriteConfig config) { - return HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(config.getSchema())); + public static Schema createHoodieWriteSchema(Schema originalSchema) { + return HoodieAvroUtils.addMetadataFields(originalSchema); } public Path makeNewPath(String partitionPath, int taskPartitionId, String fileName) { @@ -107,8 +110,8 @@ public abstract class HoodieIOHandle { taskAttemptId)); } - public Schema getSchema() { - return schema; + public Schema getWriterSchema() { + return writerSchema; } /** @@ -142,7 +145,16 @@ public abstract class HoodieIOHandle { } } + /** + * Rewrites the given GenericRecord into the writer schema, i.e. the schema containing the Hoodie metadata fields + * @param record the record to rewrite + * @return the result of HoodieAvroUtils.rewriteRecord for the record and the writer schema + */ + protected GenericRecord rewriteRecord(GenericRecord record) { + return HoodieAvroUtils.rewriteRecord(record, writerSchema); + } + public abstract WriteStatus close(); public abstract WriteStatus getWriteStatus(); -} +} \ No newline at end of file diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java index 1b12931ee..1ad8b0f51 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java @@ -64,6 +64,7 @@ public class HoodieMergeHandle extends HoodieIOHa private long recordsDeleted = 0; private long updatedRecordsWritten = 0; private long insertRecordsWritten = 0; + private boolean useWriterSchema; public HoodieMergeHandle(HoodieWriteConfig config, String commitTime, HoodieTable hoodieTable, Iterator> recordItr, String fileId) { @@ -80,6 +81,7 @@ public class HoodieMergeHandle extends HoodieIOHa super(config, commitTime, hoodieTable); this.fileSystemView = hoodieTable.getROFileSystemView(); this.keyToNewRecords = keyToNewRecords; + this.useWriterSchema = true; init(fileId, keyToNewRecords.get(keyToNewRecords.keySet().stream().findFirst().get()) .getPartitionPath(), dataFileToBeMerged); } @@ -125,7 +127,7 @@ public class HoodieMergeHandle 
extends HoodieIOHa writeStatus.getStat().setPaths(new Path(config.getBasePath()), newFilePath, tempPath); // Create the writer for writing the new version file storageWriter = HoodieStorageWriterFactory - .getStorageWriter(commitTime, getStorageWriterPath(), hoodieTable, config, schema); + .getStorageWriter(commitTime, getStorageWriterPath(), hoodieTable, config, writerSchema); } catch (IOException io) { logger.error("Error in update task at commit " + commitTime, io); writeStatus.setGlobalError(io); @@ -143,7 +145,7 @@ public class HoodieMergeHandle extends HoodieIOHa // Load the new records in a map logger.info("MaxMemoryPerPartitionMerge => " + config.getMaxMemoryPerPartitionMerge()); this.keyToNewRecords = new ExternalSpillableMap<>(config.getMaxMemoryPerPartitionMerge(), - config.getSpillableMapBasePath(), new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); + config.getSpillableMapBasePath(), new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(originalSchema)); } catch (IOException io) { throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io); } @@ -177,7 +179,9 @@ public class HoodieMergeHandle extends HoodieIOHa Optional recordMetadata = hoodieRecord.getData().getMetadata(); try { if (indexedRecord.isPresent()) { - storageWriter.writeAvroWithMetadata(indexedRecord.get(), hoodieRecord); + // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema + IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) indexedRecord.get()); + storageWriter.writeAvroWithMetadata(recordWithMetadataInSchema, hoodieRecord); recordsWritten++; } else { recordsDeleted++; @@ -209,7 +213,7 @@ public class HoodieMergeHandle extends HoodieIOHa HoodieRecord hoodieRecord = new HoodieRecord<>(keyToNewRecords.get(key)); try { Optional combinedAvroRecord = hoodieRecord.getData() - .combineAndGetUpdateValue(oldRecord, schema); + .combineAndGetUpdateValue(oldRecord, useWriterSchema ? 
writerSchema : originalSchema); if (writeUpdateRecord(hoodieRecord, combinedAvroRecord)) { /* ONLY WHEN * 1) we have an update for this key AND @@ -235,7 +239,7 @@ public class HoodieMergeHandle extends HoodieIOHa storageWriter.writeAvro(key, oldRecord); } catch (ClassCastException e) { logger.error("Schema mismatch when rewriting old record " + oldRecord + " from file " - + getOldFilePath() + " to file " + getStorageWriterPath() + " with schema " + schema + + getOldFilePath() + " to file " + getStorageWriterPath() + " with writerSchema " + writerSchema .toString(true)); throw new HoodieUpsertException(errMsg, e); } catch (IOException e) { @@ -254,7 +258,11 @@ public class HoodieMergeHandle extends HoodieIOHa for (String key : keyToNewRecords.keySet()) { if (!writtenRecordKeys.contains(key)) { HoodieRecord hoodieRecord = keyToNewRecords.get(key); - writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(schema)); + if (useWriterSchema) { + writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writerSchema)); + } else { + writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(originalSchema)); + } insertRecordsWritten++; } } @@ -293,4 +301,4 @@ public class HoodieMergeHandle extends HoodieIOHa public WriteStatus getWriteStatus() { return writeStatus; } -} +} \ No newline at end of file diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java index ea7eceb98..a1d973183 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java @@ -199,7 +199,7 @@ public class HoodieCopyOnWriteTable extends Hoodi throw new HoodieUpsertException( "Error in finding the old file path at commit " + commitTime + " for fileId: " + fileId); } else { - AvroReadSupport.setAvroReadSchema(getHadoopConf(), upsertHandle.getSchema()); + 
AvroReadSupport.setAvroReadSchema(getHadoopConf(), upsertHandle.getWriterSchema()); BoundedInMemoryExecutor wrapper = null; try (ParquetReader reader = AvroParquetReader.builder(upsertHandle.getOldFilePath()) .withConf(getHadoopConf()).build()) { diff --git a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java index d1713c5e4..ca776fb8a 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java @@ -17,6 +17,7 @@ package com.uber.hoodie.common; import com.uber.hoodie.avro.model.HoodieCompactionPlan; +import com.uber.hoodie.common.model.HoodieAvroPayload; import com.uber.hoodie.common.model.HoodieCommitMetadata; import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodiePartitionMetadata; @@ -105,6 +106,14 @@ public class HoodieTestDataGenerator { return new TestRawTripPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA); } + /** + * Generates a new avro record of the above schema format, retaining the key if optionally provided. 
+ */ + public static HoodieAvroPayload generateAvroPayload(HoodieKey key, String commitTime) throws IOException { + GenericRecord rec = generateGenericRecord(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0.0); + return new HoodieAvroPayload(Optional.of(rec)); + } + public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName, double timestamp) { GenericRecord rec = new GenericData.Record(avroSchema); @@ -207,6 +216,33 @@ public class HoodieTestDataGenerator { return copy; } + public List generateInsertsWithHoodieAvroPayload(String commitTime, int limit) throws + IOException { + List inserts = new ArrayList<>(); + for (int i = 0; i < limit; i++) { + String partitionPath = partitionPaths[rand.nextInt(partitionPaths.length)]; + HoodieKey key = new HoodieKey(UUID.randomUUID().toString(), partitionPath); + HoodieRecord record = new HoodieRecord(key, generateAvroPayload(key, commitTime)); + inserts.add(record); + + KeyPartition kp = new KeyPartition(); + kp.key = key; + kp.partitionPath = partitionPath; + existingKeysList.add(kp); + } + return inserts; + } + + public List generateUpdatesWithHoodieAvroPayload(String commitTime, List baseRecords) + throws IOException { + List updates = new ArrayList<>(); + for (HoodieRecord baseRecord : baseRecords) { + HoodieRecord record = new HoodieRecord(baseRecord.getKey(), generateAvroPayload(baseRecord.getKey(), commitTime)); + updates.add(record); + } + return updates; + } + public List generateDeletes(String commitTime, Integer n) throws IOException { List inserts = generateInserts(commitTime, n); return generateDeletesFromExistingRecords(inserts); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java b/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java index 2536c47eb..3db10a493 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java +++ 
b/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java @@ -482,6 +482,26 @@ public class TestCopyOnWriteTable { assertEquals("First insert bucket should have weight 0.5", 200.0 / 2400, insertBuckets.get(0).weight, 0.01); } + @Test + public void testInsertUpsertWithHoodieAvroPayload() throws Exception { + HoodieWriteConfig config = makeHoodieClientConfigBuilder().withStorageConfig( + HoodieStorageConfig.newBuilder().limitFileSize(1000 * 1024).build()).build(); + HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc); + String commitTime = "000"; + HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); + // Perform inserts of 100 records to test CreateHandle and BufferedExecutor + List inserts = dataGenerator.generateInsertsWithHoodieAvroPayload(commitTime, 100); + Iterator> ws = table.handleInsert(commitTime, inserts.iterator()); + WriteStatus writeStatus = ws.next().get(0); + String fileId = writeStatus.getFileId(); + metadata.getFs().create(new Path(basePath + "/.hoodie/000.commit")).close(); + table = new HoodieCopyOnWriteTable(config, jsc); + // Perform update of 100 records to test MergeHandle and BufferedExecutor + table.handleUpdate("001", fileId, + dataGenerator.generateUpdatesWithHoodieAvroPayload(commitTime, writeStatus.getWrittenRecords()).iterator()); + } + @After public void cleanup() { if (basePath != null) { diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieAvroPayload.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieAvroPayload.java index 692079014..0eb39a3ed 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieAvroPayload.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieAvroPayload.java @@ -62,7 +62,6 @@ public class HoodieAvroPayload implements HoodieRecordPayload if (recordBytes.length == 0) { return 
Optional.empty(); } - Optional record = Optional.of(HoodieAvroUtils.bytesToAvro(recordBytes, schema)); - return record.map(r -> HoodieAvroUtils.rewriteRecord(r, schema)); + return Optional.of(HoodieAvroUtils.bytesToAvro(recordBytes, schema)); } } diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/OverwriteWithLatestAvroPayload.java b/hoodie-spark/src/main/java/com/uber/hoodie/OverwriteWithLatestAvroPayload.java index 0b454f1ef..8dac77b4e 100644 --- a/hoodie-spark/src/main/java/com/uber/hoodie/OverwriteWithLatestAvroPayload.java +++ b/hoodie-spark/src/main/java/com/uber/hoodie/OverwriteWithLatestAvroPayload.java @@ -66,7 +66,6 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload implements @Override public Optional getInsertValue(Schema schema) throws IOException { - return Optional.of(HoodieAvroUtils.rewriteRecord(HoodieAvroUtils.bytesToAvro(recordBytes, Schema.parse(schemaStr)), - schema)); + return Optional.of(HoodieAvroUtils.bytesToAvro(recordBytes, schema)); } }