diff --git a/hoodie-client/src/main/java/com/uber/hoodie/WriteStatus.java b/hoodie-client/src/main/java/com/uber/hoodie/WriteStatus.java
index 928979c24..302a0fcd2 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/WriteStatus.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/WriteStatus.java
@@ -24,6 +24,8 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 
 /**
  * Status of a write operation.
@@ -47,16 +49,34 @@ public class WriteStatus implements Serializable {
   private long totalRecords = 0;
   private long totalErrorRecords = 0;
 
-  public void markSuccess(HoodieRecord record) {
-    writtenRecords.add(record);
-    totalRecords++;
+  /**
+   * Mark write as success, optionally using the given parameters to calculate some aggregate
+   * metrics. This method is not meant to cache the passed arguments, since WriteStatus objects
+   * are collected in the Spark driver.
+   *
+   * @param record deflated {@code HoodieRecord} containing information that uniquely identifies it.
+   * @param optionalRecordMetadata optional metadata related to the data contained in the {@link HoodieRecord} before deflation.
+   */
+  public void markSuccess(HoodieRecord record,
+      Optional<Map<String, String>> optionalRecordMetadata) {
+    writtenRecords.add(record);
+    totalRecords++;
   }
 
-  public void markFailure(HoodieRecord record, Throwable t) {
-    failedRecords.add(record);
-    errors.put(record.getKey(), t);
-    totalRecords++;
-    totalErrorRecords++;
+  /**
+   * Mark write as failed, optionally using the given parameters to calculate some aggregate
+   * metrics. This method is not meant to cache the passed arguments, since WriteStatus objects
+   * are collected in the Spark driver.
+   *
+   * @param record deflated {@code HoodieRecord} containing information that uniquely identifies it.
+   * @param optionalRecordMetadata optional metadata related to the data contained in the {@link HoodieRecord} before deflation.
+   */
+  public void markFailure(HoodieRecord record, Throwable t,
+      Optional<Map<String, String>> optionalRecordMetadata) {
+    failedRecords.add(record);
+    errors.put(record.getKey(), t);
+    totalRecords++;
+    totalErrorRecords++;
   }
 
   public String getFileId() {
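These two hooks make WriteStatus an extension point: because the statuses are collected back in the Spark driver, a subclass can fold the per-record metadata into whatever aggregate it needs. As a minimal sketch (the class name and metric are illustrative, not part of this change), a subclass could tally successful writes per partition path:

    import com.uber.hoodie.WriteStatus;
    import com.uber.hoodie.common.model.HoodieRecord;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Optional;

    // Illustrative subclass: counts successful writes per partition path.
    // It must stay Serializable and keep a no-arg constructor, since Hoodie
    // instantiates the configured class reflectively (see the config change below).
    public class PartitionCountingWriteStatus extends WriteStatus {

      private final Map<String, Long> successCountByPartition = new HashMap<>();

      @Override
      public void markSuccess(HoodieRecord record, Optional<Map<String, String>> optionalRecordMetadata) {
        super.markSuccess(record, optionalRecordMetadata);
        // The record is deflated by the time this is called, but its key (and any
        // metadata captured before deflation) remain available for aggregation.
        successCountByPartition.merge(record.getKey().getPartitionPath(), 1L, Long::sum);
      }

      public Map<String, Long> getSuccessCountByPartition() {
        return successCountByPartition;
      }
    }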
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
index 26704dcaa..f3081e21f 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
@@ -18,6 +18,7 @@
 package com.uber.hoodie.config;
 
 import com.google.common.base.Preconditions;
+import com.uber.hoodie.WriteStatus;
 import com.uber.hoodie.common.model.HoodieCleaningPolicy;
 import com.uber.hoodie.common.util.ReflectionUtils;
 import com.uber.hoodie.index.HoodieIndex;
@@ -54,6 +55,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   private static final String DEFAULT_HOODIE_AUTO_COMMIT = "true";
   private static final String HOODIE_ASSUME_DATE_PARTITIONING_PROP = "hoodie.assume.date.partitioning";
   private static final String DEFAULT_ASSUME_DATE_PARTITIONING = "false";
+  private static final String HOODIE_WRITE_STATUS_CLASS_PROP = "hoodie.writestatus.class";
+  private static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName();
 
   private HoodieWriteConfig(Properties props) {
     super(props);
@@ -106,6 +109,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return StorageLevel.fromString(props.getProperty(WRITE_STATUS_STORAGE_LEVEL));
   }
 
+  public String getWriteStatusClassName() {
+    return props.getProperty(HOODIE_WRITE_STATUS_CLASS_PROP);
+  }
+
   /**
    * compaction properties
    **/
@@ -363,6 +370,11 @@
     return this;
   }
 
+  public Builder withWriteStatusClass(Class<? extends WriteStatus> writeStatusClass) {
+    props.setProperty(HOODIE_WRITE_STATUS_CLASS_PROP, writeStatusClass.getName());
+    return this;
+  }
+
   public HoodieWriteConfig build() {
     HoodieWriteConfig config = new HoodieWriteConfig(props);
     // Check for mandatory properties
@@ -383,6 +395,8 @@
         HOODIE_AUTO_COMMIT_PROP, DEFAULT_HOODIE_AUTO_COMMIT);
     setDefaultOnCondition(props, !props.containsKey(HOODIE_ASSUME_DATE_PARTITIONING_PROP),
         HOODIE_ASSUME_DATE_PARTITIONING_PROP, DEFAULT_ASSUME_DATE_PARTITIONING);
+    setDefaultOnCondition(props, !props.containsKey(HOODIE_WRITE_STATUS_CLASS_PROP),
+        HOODIE_WRITE_STATUS_CLASS_PROP, DEFAULT_HOODIE_WRITE_STATUS_CLASS);
 
     // Make sure the props is propagated
     setDefaultOnCondition(props, !isIndexConfigSet,
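Opting in is then a single builder call; the same choice can also be made by setting the hoodie.writestatus.class property directly. A sketch, assuming a builder otherwise configured the usual way:

    // Sketch: wire in a custom WriteStatus implementation. Because the handles
    // load this class via ReflectionUtils.loadClass(...), it should expose a
    // public no-arg constructor.
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath(basePath) // basePath: assumed to be defined by the caller
        .withWriteStatusClass(PartitionCountingWriteStatus.class)
        .build();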
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
index 914ff6f48..3fdcd77f0 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
@@ -29,13 +29,13 @@ import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
 import com.uber.hoodie.common.table.log.block.HoodieDeleteBlock;
 import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.common.util.HoodieAvroUtils;
+import com.uber.hoodie.common.util.ReflectionUtils;
 import com.uber.hoodie.config.HoodieWriteConfig;
 import com.uber.hoodie.exception.HoodieAppendException;
 import com.uber.hoodie.exception.HoodieUpsertException;
 import com.uber.hoodie.table.HoodieTable;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -45,10 +45,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
 
 /**
  * IO Operation to append data onto an existing file.
@@ -74,7 +72,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOHandle<T> {
       String fileId, Iterator<HoodieRecord<T>> recordItr) {
     super(config, commitTime, hoodieTable);
-    WriteStatus writeStatus = new WriteStatus();
+    WriteStatus writeStatus = ReflectionUtils.loadClass(config.getWriteStatusClassName());
     writeStatus.setStat(new HoodieDeltaWriteStat());
     this.writeStatus = writeStatus;
     this.fileId = fileId;
@@ -128,6 +126,7 @@
   }
 
   private Optional<IndexedRecord> getIndexedRecord(HoodieRecord<T> hoodieRecord) {
+    Optional<Map<String, String>> recordMetadata = hoodieRecord.getData().getMetadata();
     try {
       Optional<IndexedRecord> avroRecord = hoodieRecord.getData().getInsertValue(schema);
@@ -145,11 +144,11 @@
       }
       hoodieRecord.deflate();
-      writeStatus.markSuccess(hoodieRecord);
+      writeStatus.markSuccess(hoodieRecord, recordMetadata);
       return avroRecord;
     } catch (Exception e) {
       logger.error("Error writing record " + hoodieRecord, e);
-      writeStatus.markFailure(hoodieRecord, e);
+      writeStatus.markFailure(hoodieRecord, e, recordMetadata);
     }
     return Optional.empty();
   }
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java
index a5ecad7cb..c9680c8f2 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java
@@ -23,6 +23,7 @@ import com.uber.hoodie.common.model.HoodieRecordLocation;
 import com.uber.hoodie.common.model.HoodieRecordPayload;
 import com.uber.hoodie.common.model.HoodieWriteStat;
 import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.ReflectionUtils;
 import com.uber.hoodie.config.HoodieWriteConfig;
 import com.uber.hoodie.exception.HoodieInsertException;
 import com.uber.hoodie.io.storage.HoodieStorageWriter;
@@ -50,7 +51,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOHandle<T> {
   public HoodieCreateHandle(HoodieWriteConfig config, String commitTime,
       HoodieTable<T> hoodieTable, String partitionPath) {
     super(config, commitTime, hoodieTable);
-    this.status = new WriteStatus();
+    this.status = ReflectionUtils.loadClass(config.getWriteStatusClassName());
     status.setFileId(UUID.randomUUID().toString());
     status.setPartitionPath(partitionPath);
@@ -89,6 +90,7 @@
    * @param record
    */
   public void write(HoodieRecord record) {
+    Optional<Map<String, String>> recordMetadata = record.getData().getMetadata();
     try {
       Optional<IndexedRecord> avroRecord = record.getData().getInsertValue(schema);
@@ -100,13 +102,12 @@
       } else {
         recordsDeleted++;
       }
-      record.deflate();
-      status.markSuccess(record);
+      status.markSuccess(record, recordMetadata);
     } catch (Throwable t) {
       // Not throwing exception from here, since we don't want to fail the entire job
       // for a single record
-      status.markFailure(record, t);
+      status.markFailure(record, t, recordMetadata);
       logger.error("Error writing record " + record, t);
     }
   }
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java
index effcff0e8..262da60ca 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java
@@ -17,6 +17,7 @@
 package com.uber.hoodie.io;
 
 import com.uber.hoodie.common.model.HoodiePartitionMetadata;
+import com.uber.hoodie.common.util.ReflectionUtils;
 import com.uber.hoodie.config.HoodieWriteConfig;
 import com.uber.hoodie.WriteStatus;
 import com.uber.hoodie.common.model.HoodieRecord;
@@ -66,7 +67,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHandle<T> {
    * Load the new incoming records in a map, and extract the old file path.
    */
   private void init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
-    WriteStatus writeStatus = new WriteStatus();
+    WriteStatus writeStatus = ReflectionUtils.loadClass(config.getWriteStatusClassName());
     writeStatus.setStat(new HoodieWriteStat());
     this.writeStatus = writeStatus;
     this.keyToNewRecords = new HashMap<>();
@@ -129,6 +130,7 @@
 
   private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, Optional<IndexedRecord> indexedRecord) {
+    Optional<Map<String, String>> recordMetadata = hoodieRecord.getData().getMetadata();
     try {
       if (indexedRecord.isPresent()) {
         storageWriter.writeAvroWithMetadata(indexedRecord.get(), hoodieRecord);
@@ -139,11 +141,11 @@
       }
       hoodieRecord.deflate();
-      writeStatus.markSuccess(hoodieRecord);
+      writeStatus.markSuccess(hoodieRecord, recordMetadata);
       return true;
     } catch (Exception e) {
       logger.error("Error writing record " + hoodieRecord, e);
-      writeStatus.markFailure(hoodieRecord, e);
+      writeStatus.markFailure(hoodieRecord, e, recordMetadata);
     }
     return false;
   }
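All three handles follow the same ordering, and the ordering matters: the payload's metadata is captured up front because deflate() subsequently drops the payload data, keeping the WriteStatus objects that travel back to the driver lightweight. Schematically (not a literal excerpt from any one handle):

    // 1. Capture metadata while the payload is still attached to the record.
    Optional<Map<String, String>> recordMetadata = hoodieRecord.getData().getMetadata();
    // 2. ... perform the actual write ...
    hoodieRecord.deflate(); // payload data is released here
    // 3. The pre-captured metadata is still available for aggregation.
    writeStatus.markSuccess(hoodieRecord, recordMetadata);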
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestMergeOnReadTable.java b/hoodie-client/src/test/java/com/uber/hoodie/TestMergeOnReadTable.java
index 1c86bed08..831ec626e 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestMergeOnReadTable.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestMergeOnReadTable.java
@@ -19,6 +19,7 @@ package com.uber.hoodie;
 import com.uber.hoodie.common.HoodieClientTestUtils;
 import com.uber.hoodie.common.HoodieMergeOnReadTestUtils;
 import com.uber.hoodie.common.HoodieTestDataGenerator;
+import com.uber.hoodie.common.TestRawTripPayload.MetadataMergeWriteStatus;
 import com.uber.hoodie.common.minicluster.HdfsTestService;
 import com.uber.hoodie.common.model.HoodieDataFile;
 import com.uber.hoodie.common.model.HoodieKey;
@@ -224,7 +225,26 @@ public class TestMergeOnReadTable {
     assertEquals("Must contain 200 records", 200, readClient.readSince("000").count());
   }
 
+  // Check if record level metadata is aggregated properly at the end of write.
+  @Test
+  public void testMetadataAggregateFromWriteStatus() throws Exception {
+    HoodieWriteConfig cfg = getConfigBuilder().withWriteStatusClass(MetadataMergeWriteStatus.class).build();
+    HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
+
+    String newCommitTime = "001";
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+    List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
+    assertNoWriteErrors(statuses);
+    Map<String, String> allWriteStatusMergedMetadataMap =
+        MetadataMergeWriteStatus.mergeMetadataForWriteStatuses(statuses);
+    assertTrue(allWriteStatusMergedMetadataMap.containsKey("InputRecordCount_1506582000"));
+    // For metadata key InputRecordCount_1506582000, value is 2 for each record. So the sum should be 2 * records.size().
+    assertEquals(String.valueOf(2 * records.size()),
+        allWriteStatusMergedMetadataMap.get("InputRecordCount_1506582000"));
+  }
+
   @Test
   public void testSimpleInsertAndDelete() throws Exception {
     HoodieWriteConfig cfg = getConfig();
     HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/common/TestRawTripPayload.java b/hoodie-client/src/test/java/com/uber/hoodie/common/TestRawTripPayload.java
index a672cb31b..95a47d904 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/common/TestRawTripPayload.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/common/TestRawTripPayload.java
@@ -18,9 +18,14 @@
 package com.uber.hoodie.common;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.uber.hoodie.WriteStatus;
 import com.uber.hoodie.avro.MercifulJsonConverter;
+import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieRecordPayload;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.commons.io.IOUtils;
@@ -91,6 +96,15 @@ public class TestRawTripPayload implements HoodieRecordPayload<TestRawTripPayload> {
+  @Override
+  public Optional<Map<String, String>> getMetadata() {
+    // Let's assume we want to count the number of input row change events
+    // that are processed. Let the time-bucket for this row change event be 1506582000.
+    Map<String, String> metadataMap = new HashMap<>();
+    metadataMap.put("InputRecordCount_1506582000", "2");
+    return Optional.of(metadataMap);
+  }
+
   public String getRowKey() {
     return rowKey;
   }
@@ -120,4 +134,58 @@ public class TestRawTripPayload implements HoodieRecordPayload<TestRawTripPayload> {
+
+  public static class MetadataMergeWriteStatus extends WriteStatus {
+
+    private Map<String, String> mergedMetadataMap = new HashMap<>();
+
+    @Override
+    public void markSuccess(HoodieRecord record, Optional<Map<String, String>> recordMetadata) {
+      super.markSuccess(record, recordMetadata);
+      if (recordMetadata.isPresent()) {
+        mergeMetadataMaps(recordMetadata.get(), mergedMetadataMap);
+      }
+    }
+
+    @Override
+    public void markFailure(HoodieRecord record, Throwable t,
+        Optional<Map<String, String>> recordMetadata) {
+      super.markFailure(record, t, recordMetadata);
+      if (recordMetadata.isPresent()) {
+        mergeMetadataMaps(recordMetadata.get(), mergedMetadataMap);
+      }
+    }
+
+    public static Map<String, String> mergeMetadataForWriteStatuses(List<WriteStatus> writeStatuses) {
+      Map<String, String> allWriteStatusMergedMetadataMap = new HashMap<>();
+      for (WriteStatus writeStatus : writeStatuses) {
+        MetadataMergeWriteStatus.mergeMetadataMaps(
+            ((MetadataMergeWriteStatus) writeStatus).getMergedMetadataMap(),
+            allWriteStatusMergedMetadataMap);
+      }
+      return allWriteStatusMergedMetadataMap;
+    }
+
+    private static void mergeMetadataMaps(Map<String, String> mergeFromMap, Map<String, String> mergeToMap) {
+      for (Entry<String, String> entry : mergeFromMap.entrySet()) {
+        String key = entry.getKey();
+        if (!mergeToMap.containsKey(key)) {
+          mergeToMap.put(key, "0");
+        }
+        mergeToMap.put(key, addStrsAsInt(entry.getValue(), mergeToMap.get(key)));
+      }
+    }
+
+    private Map<String, String> getMergedMetadataMap() {
+      return mergedMetadataMap;
+    }
+
+    private static String addStrsAsInt(String a, String b) {
+      return String.valueOf(Integer.parseInt(a) + Integer.parseInt(b));
+    }
+  }
 }
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java b/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java
index f01c4c801..020166d5a 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java
@@ -16,6 +16,7 @@
 package com.uber.hoodie.table;
 
+import com.uber.hoodie.common.TestRawTripPayload.MetadataMergeWriteStatus;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.config.HoodieWriteConfig;
@@ -34,13 +35,13 @@ import com.uber.hoodie.common.util.ParquetUtils;
 import com.uber.hoodie.config.HoodieCompactionConfig;
 import com.uber.hoodie.io.HoodieCreateHandle;
 import com.uber.hoodie.config.HoodieStorageConfig;
+import java.util.Map;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.avro.AvroReadSupport;
 import org.apache.parquet.hadoop.ParquetReader;
-import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.junit.After;
 import org.junit.Before;
@@ -238,7 +239,41 @@ public class TestCopyOnWriteTable {
     return records;
   }
 
-  @Test public void testInsertWithPartialFailures() throws Exception {
+  // Check if record level metadata is aggregated properly at the end of write.
+  @Test
+  public void testMetadataAggregateFromWriteStatus() throws Exception {
+    // Prepare the AvroParquetIO
+    HoodieWriteConfig config = makeHoodieClientConfigBuilder().withWriteStatusClass(MetadataMergeWriteStatus.class).build();
+    String firstCommitTime = HoodieTestUtils.makeNewCommitTime();
+    HoodieTableMetaClient metadata = new HoodieTableMetaClient(FSUtils.getFs(), basePath);
+
+    HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, metadata);
+
+    // Get some records that belong to the same partition (2016/01/31)
+    String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
+    String recordStr2 = "{\"_row_key\":\"8eb5b87b-1feu-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
+    String recordStr3 = "{\"_row_key\":\"8eb5b87c-1fej-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}";
+
+    List<HoodieRecord> records = new ArrayList<>();
+    TestRawTripPayload rowChange1 = new TestRawTripPayload(recordStr1);
+    records.add(new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1));
+    TestRawTripPayload rowChange2 = new TestRawTripPayload(recordStr2);
+    records.add(new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2));
+    TestRawTripPayload rowChange3 = new TestRawTripPayload(recordStr3);
+    records.add(new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3));
+
+    // Insert new records
+    List<WriteStatus> writeStatuses = HoodieClientTestUtils
+        .collectStatuses(table.handleInsert(firstCommitTime, records.iterator()));
+    Map<String, String> allWriteStatusMergedMetadataMap = MetadataMergeWriteStatus
+        .mergeMetadataForWriteStatuses(writeStatuses);
+    assertTrue(allWriteStatusMergedMetadataMap.containsKey("InputRecordCount_1506582000"));
+    // For metadata key InputRecordCount_1506582000, value is 2 for each record. So the sum should be 2 * 3.
+    assertEquals("6", allWriteStatusMergedMetadataMap.get("InputRecordCount_1506582000"));
+  }
+
+  @Test
+  public void testInsertWithPartialFailures() throws Exception {
     HoodieWriteConfig config = makeHoodieClientConfig();
     String commitTime = HoodieTestUtils.makeNewCommitTime();
     FileSystem fs = FSUtils.getFs();
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRecordPayload.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRecordPayload.java
index 1e0494dc8..c2ca79343 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRecordPayload.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRecordPayload.java
@@ -16,6 +16,7 @@
 package com.uber.hoodie.common.model;
 
+import java.util.Map;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
@@ -56,4 +57,13 @@ public interface HoodieRecordPayload<T extends HoodieRecordPayload> extends Serializable {
    * Return EMPTY to skip writing this record.
    */
   Optional<IndexedRecord> getInsertValue(Schema schema) throws IOException;
+
+  /**
+   * This method can be used to extract metadata from the payload. The metadata is passed to
+   * {@code WriteStatus.markSuccess()} and {@code WriteStatus.markFailure()} so that aggregate
+   * metrics can be computed in the context of a write success or failure.
+   */
+  default Optional<Map<String, String>> getMetadata() {
+    return Optional.empty();
+  }
 }
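Because the default returns Optional.empty(), existing payloads are unaffected; a payload opts in by overriding the method, as TestRawTripPayload does above. For a non-test flavor, a payload might expose a counter keyed by an ingestion time-bucket (a hypothetical override; timeBucket is an assumed field on the payload, not part of this change):

    @Override
    public Optional<Map<String, String>> getMetadata() {
      Map<String, String> metadata = new HashMap<>();
      // One input row-change event, bucketed by the (hypothetical) ingestion time-bucket,
      // which a WriteStatus subclass like MetadataMergeWriteStatus can then sum.
      metadata.put("InputRecordCount_" + timeBucket, "1");
      return Optional.of(metadata);
    }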