diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java index a0b232105..0781faf36 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java @@ -48,6 +48,7 @@ public class HoodieCreateHandle extends HoodieIOH private final Path path; private Path tempPath = null; private long recordsWritten = 0; + private long insertRecordsWritten = 0; private long recordsDeleted = 0; private Iterator> recordIterator; @@ -100,6 +101,7 @@ public class HoodieCreateHandle extends HoodieIOH // update the new location of record, so we know where to find it next record.setNewLocation(new HoodieRecordLocation(commitTime, status.getFileId())); recordsWritten++; + insertRecordsWritten++; } else { recordsDeleted++; } @@ -149,6 +151,7 @@ public class HoodieCreateHandle extends HoodieIOH HoodieWriteStat stat = new HoodieWriteStat(); stat.setNumWrites(recordsWritten); stat.setNumDeletes(recordsDeleted); + stat.setNumInserts(insertRecordsWritten); stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT); stat.setFileId(status.getFileId()); stat.setPaths(new Path(config.getBasePath()), path, tempPath); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java index 2bd5d8668..f11fce57c 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java @@ -67,6 +67,7 @@ public class HoodieMergeHandle extends HoodieIOHa private long recordsWritten = 0; private long recordsDeleted = 0; private long updatedRecordsWritten = 0; + private long insertRecordsWritten = 0; public HoodieMergeHandle(HoodieWriteConfig config, String commitTime, HoodieTable hoodieTable, Iterator> recordItr, String fileId) { @@ -173,14 +174,19 @@ 
public class HoodieMergeHandle extends HoodieIOHa return partitionPath; } - private boolean writeUpdateRecord(HoodieRecord hoodieRecord, - Optional indexedRecord) { + private boolean writeUpdateRecord(HoodieRecord hoodieRecord, Optional indexedRecord) { + if (indexedRecord.isPresent()) { + updatedRecordsWritten++; + } + return writeRecord(hoodieRecord, indexedRecord); + } + + private boolean writeRecord(HoodieRecord hoodieRecord, Optional indexedRecord) { Optional recordMetadata = hoodieRecord.getData().getMetadata(); try { if (indexedRecord.isPresent()) { storageWriter.writeAvroWithMetadata(indexedRecord.get(), hoodieRecord); recordsWritten++; - updatedRecordsWritten++; } else { recordsDeleted++; } @@ -256,7 +262,8 @@ public class HoodieMergeHandle extends HoodieIOHa String key = pendingRecordsItr.next(); if (!writtenRecordKeys.contains(key)) { HoodieRecord hoodieRecord = keyToNewRecords.get(key); - writeUpdateRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(schema)); + writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(schema)); + insertRecordsWritten++; } } keyToNewRecords.clear(); @@ -270,6 +277,7 @@ public class HoodieMergeHandle extends HoodieIOHa writeStatus.getStat().setNumWrites(recordsWritten); writeStatus.getStat().setNumDeletes(recordsDeleted); writeStatus.getStat().setNumUpdateWrites(updatedRecordsWritten); + writeStatus.getStat().setNumInserts(insertRecordsWritten); writeStatus.getStat().setTotalWriteErrors(writeStatus.getFailedRecords().size()); RuntimeStats runtimeStats = new RuntimeStats(); runtimeStats.setTotalUpsertTime(timer.endTimer()); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieMergeHandleDuplicateRecords.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieMergeHandle.java similarity index 72% rename from hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieMergeHandleDuplicateRecords.java rename to hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieMergeHandle.java index 
e5523f8be..d28750108 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieMergeHandleDuplicateRecords.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieMergeHandle.java @@ -26,6 +26,7 @@ import com.uber.hoodie.common.HoodieClientTestUtils; import com.uber.hoodie.common.HoodieTestDataGenerator; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieTestUtils; +import com.uber.hoodie.common.model.HoodieWriteStat; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; @@ -35,6 +36,7 @@ import com.uber.hoodie.config.HoodieIndexConfig; import com.uber.hoodie.config.HoodieStorageConfig; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.index.HoodieIndex; +import com.uber.hoodie.table.HoodieTable; import java.io.File; import java.io.IOException; import java.util.ArrayList; @@ -46,12 +48,13 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.rules.TemporaryFolder; @SuppressWarnings("unchecked") -public class TestHoodieMergeHandleDuplicateRecords { +public class TestHoodieMergeHandle { protected transient JavaSparkContext jsc = null; protected transient SQLContext sqlContext; @@ -62,7 +65,7 @@ public class TestHoodieMergeHandleDuplicateRecords { @Before public void init() throws IOException { // Initialize a local spark env - jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestHoodieMergeHandleDuplicateRecords")); + jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestHoodieMergeHandle")); //SQLContext stuff sqlContext = new SQLContext(jsc); @@ -241,6 +244,82 @@ public class TestHoodieMergeHandleDuplicateRecords { assertEquals(21, record2Count); 
} + @Test + public void testHoodieMergeHandleWriteStatMetrics() throws Exception { + // insert 100 records + HoodieWriteConfig config = getConfigBuilder().build(); + HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config); + String newCommitTime = "100"; + writeClient.startCommitWithTime(newCommitTime); + + List records = dataGen.generateInserts(newCommitTime, 100); + JavaRDD recordsRDD = jsc.parallelize(records, 1); + List statuses = writeClient.insert(recordsRDD, newCommitTime).collect(); + + // All records should be inserts into new parquet + Assert.assertTrue(statuses.stream() + .filter(status -> status.getStat().getPrevCommit() != HoodieWriteStat.NULL_COMMIT).count() > 0); + // Num writes should be equal to the number of records inserted + Assert.assertEquals((long) statuses.stream() + .map(status -> status.getStat().getNumWrites()).reduce((a,b) -> a + b).get(), 100); + // Num update writes should be equal to the number of records updated + Assert.assertEquals((long) statuses.stream() + .map(status -> status.getStat().getNumUpdateWrites()).reduce((a,b) -> a + b).get(), 0); + // Num inserts should be equal to the number of records inserted in this + // commit (all 100 are fresh inserts; none were converted to updates) + Assert.assertEquals((long) statuses.stream() + .map(status -> status.getStat().getNumInserts()).reduce((a,b) -> a + b).get(), 100); + + // Update all the 100 records + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); + + newCommitTime = "101"; + writeClient.startCommitWithTime(newCommitTime); + + List updatedRecords = dataGen.generateUpdates(newCommitTime, records); + JavaRDD updatedRecordsRDD = jsc.parallelize(updatedRecords, 1); + statuses = writeClient.upsert(updatedRecordsRDD, newCommitTime).collect(); + + // All records should be upserts into existing parquet + Assert.assertEquals(statuses.stream() + .filter(status ->
status.getStat().getPrevCommit() == HoodieWriteStat.NULL_COMMIT).count(), 0); + // Num writes should be equal to the number of records updated + Assert.assertEquals((long) statuses.stream() + .map(status -> status.getStat().getNumWrites()).reduce((a,b) -> a + b).get(), 100); + // Num update writes should be equal to the number of records updated + Assert.assertEquals((long) statuses.stream() + .map(status -> status.getStat().getNumUpdateWrites()).reduce((a,b) -> a + b).get(), 100); + // Num inserts should be equal to the number of records inserted in this + // commit (all incoming records were updates, so zero) + Assert.assertEquals((long) statuses.stream() + .map(status -> status.getStat().getNumInserts()).reduce((a,b) -> a + b).get(), 0); + + + newCommitTime = "102"; + writeClient.startCommitWithTime(newCommitTime); + + List allRecords = dataGen.generateInserts(newCommitTime, 100); + allRecords.addAll(updatedRecords); + JavaRDD allRecordsRDD = jsc.parallelize(allRecords, 1); + statuses = writeClient.upsert(allRecordsRDD, newCommitTime).collect(); + + // All records should be upserts into existing parquet (with inserts as updates small file handled) + Assert.assertEquals((long) statuses.stream() + .filter(status -> status.getStat().getPrevCommit() == HoodieWriteStat.NULL_COMMIT).count(), 0); + // Num writes should be equal to the total number of records written + Assert.assertEquals((long) statuses.stream() + .map(status -> status.getStat().getNumWrites()).reduce((a,b) -> a + b).get(), 200); + // Num update writes should be equal to the number of records updated (including inserts converted as updates) + Assert.assertEquals((long) statuses.stream() + .map(status -> status.getStat().getNumUpdateWrites()).reduce((a,b) -> a + b).get(), 100); + // Num inserts should be equal to the number of insert records converted to updates as part of small file + // handling + Assert.assertEquals((long) statuses.stream() + .map(status ->
status.getStat().getNumInserts()).reduce((a,b) -> a + b).get(), 100); + + } + private Dataset getRecords() { // Check the entire dataset has 8 records still String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length]; diff --git a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java index fbcdac258..f5c22c442 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java @@ -827,8 +827,8 @@ public class TestMergeOnReadTable { writeClient.commit(newCommitTime, statuses); // rollback a successful commit - // Sleep for small interval to force a new rollback start time. - Thread.sleep(5); + // Sleep for small interval (at least 1 second) to force a new rollback start time. + Thread.sleep(1000); writeClient.rollback(newCommitTime); final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); diff --git a/hoodie-common/src/main/avro/HoodieCommitMetadata.avsc b/hoodie-common/src/main/avro/HoodieCommitMetadata.avsc index 3d41765ff..0429cc7d3 100644 --- a/hoodie-common/src/main/avro/HoodieCommitMetadata.avsc +++ b/hoodie-common/src/main/avro/HoodieCommitMetadata.avsc @@ -61,6 +61,11 @@ "name":"totalUpdatedRecordsCompacted", "type":["null","long"], "default" : null + }, + { + "name":"numInserts", + "type":["null","long"], + "default" : null } ] } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieWriteStat.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieWriteStat.java index e03e84e3a..752368481 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieWriteStat.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieWriteStat.java @@ -61,6 +61,11 @@ public 
class HoodieWriteStat implements Serializable { */ private long numUpdateWrites; + /** + * Total number of insert records (including records converted to updates as part of small file handling) + */ + private long numInserts; + /** * Total size of file written */ @@ -160,6 +165,10 @@ public class HoodieWriteStat implements Serializable { this.numUpdateWrites = numUpdateWrites; } + public void setNumInserts(long numInserts) { + this.numInserts = numInserts; + } + public long getTotalWriteBytes() { return totalWriteBytes; } @@ -192,6 +201,10 @@ public class HoodieWriteStat implements Serializable { return numUpdateWrites; } + public long getNumInserts() { + return numInserts; + } + public String getFileId() { return fileId; }