Add another metric (numInserts) to HoodieWriteStat to determine whether inserts were converted to updates; add a test for it
This commit is contained in:
committed by
vinoth chandar
parent
989afddd54
commit
88274b8261
@@ -48,6 +48,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
|
|||||||
private final Path path;
|
private final Path path;
|
||||||
private Path tempPath = null;
|
private Path tempPath = null;
|
||||||
private long recordsWritten = 0;
|
private long recordsWritten = 0;
|
||||||
|
private long insertRecordsWritten = 0;
|
||||||
private long recordsDeleted = 0;
|
private long recordsDeleted = 0;
|
||||||
private Iterator<HoodieRecord<T>> recordIterator;
|
private Iterator<HoodieRecord<T>> recordIterator;
|
||||||
|
|
||||||
@@ -100,6 +101,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
|
|||||||
// update the new location of record, so we know where to find it next
|
// update the new location of record, so we know where to find it next
|
||||||
record.setNewLocation(new HoodieRecordLocation(commitTime, status.getFileId()));
|
record.setNewLocation(new HoodieRecordLocation(commitTime, status.getFileId()));
|
||||||
recordsWritten++;
|
recordsWritten++;
|
||||||
|
insertRecordsWritten++;
|
||||||
} else {
|
} else {
|
||||||
recordsDeleted++;
|
recordsDeleted++;
|
||||||
}
|
}
|
||||||
@@ -149,6 +151,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
|
|||||||
HoodieWriteStat stat = new HoodieWriteStat();
|
HoodieWriteStat stat = new HoodieWriteStat();
|
||||||
stat.setNumWrites(recordsWritten);
|
stat.setNumWrites(recordsWritten);
|
||||||
stat.setNumDeletes(recordsDeleted);
|
stat.setNumDeletes(recordsDeleted);
|
||||||
|
stat.setNumInserts(insertRecordsWritten);
|
||||||
stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
|
stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
|
||||||
stat.setFileId(status.getFileId());
|
stat.setFileId(status.getFileId());
|
||||||
stat.setPaths(new Path(config.getBasePath()), path, tempPath);
|
stat.setPaths(new Path(config.getBasePath()), path, tempPath);
|
||||||
|
|||||||
@@ -67,6 +67,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
|
|||||||
private long recordsWritten = 0;
|
private long recordsWritten = 0;
|
||||||
private long recordsDeleted = 0;
|
private long recordsDeleted = 0;
|
||||||
private long updatedRecordsWritten = 0;
|
private long updatedRecordsWritten = 0;
|
||||||
|
private long insertRecordsWritten = 0;
|
||||||
|
|
||||||
public HoodieMergeHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
|
public HoodieMergeHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
|
||||||
Iterator<HoodieRecord<T>> recordItr, String fileId) {
|
Iterator<HoodieRecord<T>> recordItr, String fileId) {
|
||||||
@@ -173,14 +174,19 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
|
|||||||
return partitionPath;
|
return partitionPath;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord,
|
private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, Optional<IndexedRecord> indexedRecord) {
|
||||||
Optional<IndexedRecord> indexedRecord) {
|
if (indexedRecord.isPresent()) {
|
||||||
|
updatedRecordsWritten++;
|
||||||
|
}
|
||||||
|
return writeRecord(hoodieRecord, indexedRecord);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean writeRecord(HoodieRecord<T> hoodieRecord, Optional<IndexedRecord> indexedRecord) {
|
||||||
Optional recordMetadata = hoodieRecord.getData().getMetadata();
|
Optional recordMetadata = hoodieRecord.getData().getMetadata();
|
||||||
try {
|
try {
|
||||||
if (indexedRecord.isPresent()) {
|
if (indexedRecord.isPresent()) {
|
||||||
storageWriter.writeAvroWithMetadata(indexedRecord.get(), hoodieRecord);
|
storageWriter.writeAvroWithMetadata(indexedRecord.get(), hoodieRecord);
|
||||||
recordsWritten++;
|
recordsWritten++;
|
||||||
updatedRecordsWritten++;
|
|
||||||
} else {
|
} else {
|
||||||
recordsDeleted++;
|
recordsDeleted++;
|
||||||
}
|
}
|
||||||
@@ -256,7 +262,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
|
|||||||
String key = pendingRecordsItr.next();
|
String key = pendingRecordsItr.next();
|
||||||
if (!writtenRecordKeys.contains(key)) {
|
if (!writtenRecordKeys.contains(key)) {
|
||||||
HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key);
|
HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key);
|
||||||
writeUpdateRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(schema));
|
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(schema));
|
||||||
|
insertRecordsWritten++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
keyToNewRecords.clear();
|
keyToNewRecords.clear();
|
||||||
@@ -270,6 +277,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
|
|||||||
writeStatus.getStat().setNumWrites(recordsWritten);
|
writeStatus.getStat().setNumWrites(recordsWritten);
|
||||||
writeStatus.getStat().setNumDeletes(recordsDeleted);
|
writeStatus.getStat().setNumDeletes(recordsDeleted);
|
||||||
writeStatus.getStat().setNumUpdateWrites(updatedRecordsWritten);
|
writeStatus.getStat().setNumUpdateWrites(updatedRecordsWritten);
|
||||||
|
writeStatus.getStat().setNumInserts(insertRecordsWritten);
|
||||||
writeStatus.getStat().setTotalWriteErrors(writeStatus.getFailedRecords().size());
|
writeStatus.getStat().setTotalWriteErrors(writeStatus.getFailedRecords().size());
|
||||||
RuntimeStats runtimeStats = new RuntimeStats();
|
RuntimeStats runtimeStats = new RuntimeStats();
|
||||||
runtimeStats.setTotalUpsertTime(timer.endTimer());
|
runtimeStats.setTotalUpsertTime(timer.endTimer());
|
||||||
|
|||||||
@@ -26,6 +26,7 @@ import com.uber.hoodie.common.HoodieClientTestUtils;
|
|||||||
import com.uber.hoodie.common.HoodieTestDataGenerator;
|
import com.uber.hoodie.common.HoodieTestDataGenerator;
|
||||||
import com.uber.hoodie.common.model.HoodieRecord;
|
import com.uber.hoodie.common.model.HoodieRecord;
|
||||||
import com.uber.hoodie.common.model.HoodieTestUtils;
|
import com.uber.hoodie.common.model.HoodieTestUtils;
|
||||||
|
import com.uber.hoodie.common.model.HoodieWriteStat;
|
||||||
import com.uber.hoodie.common.table.HoodieTableMetaClient;
|
import com.uber.hoodie.common.table.HoodieTableMetaClient;
|
||||||
import com.uber.hoodie.common.table.HoodieTimeline;
|
import com.uber.hoodie.common.table.HoodieTimeline;
|
||||||
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
|
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
|
||||||
@@ -35,6 +36,7 @@ import com.uber.hoodie.config.HoodieIndexConfig;
|
|||||||
import com.uber.hoodie.config.HoodieStorageConfig;
|
import com.uber.hoodie.config.HoodieStorageConfig;
|
||||||
import com.uber.hoodie.config.HoodieWriteConfig;
|
import com.uber.hoodie.config.HoodieWriteConfig;
|
||||||
import com.uber.hoodie.index.HoodieIndex;
|
import com.uber.hoodie.index.HoodieIndex;
|
||||||
|
import com.uber.hoodie.table.HoodieTable;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
@@ -46,12 +48,13 @@ import org.apache.spark.sql.Dataset;
|
|||||||
import org.apache.spark.sql.Row;
|
import org.apache.spark.sql.Row;
|
||||||
import org.apache.spark.sql.SQLContext;
|
import org.apache.spark.sql.SQLContext;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.rules.TemporaryFolder;
|
import org.junit.rules.TemporaryFolder;
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public class TestHoodieMergeHandleDuplicateRecords {
|
public class TestHoodieMergeHandle {
|
||||||
|
|
||||||
protected transient JavaSparkContext jsc = null;
|
protected transient JavaSparkContext jsc = null;
|
||||||
protected transient SQLContext sqlContext;
|
protected transient SQLContext sqlContext;
|
||||||
@@ -62,7 +65,7 @@ public class TestHoodieMergeHandleDuplicateRecords {
|
|||||||
@Before
|
@Before
|
||||||
public void init() throws IOException {
|
public void init() throws IOException {
|
||||||
// Initialize a local spark env
|
// Initialize a local spark env
|
||||||
jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestHoodieMergeHandleDuplicateRecords"));
|
jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestHoodieMergeHandle"));
|
||||||
|
|
||||||
//SQLContext stuff
|
//SQLContext stuff
|
||||||
sqlContext = new SQLContext(jsc);
|
sqlContext = new SQLContext(jsc);
|
||||||
@@ -241,6 +244,82 @@ public class TestHoodieMergeHandleDuplicateRecords {
|
|||||||
assertEquals(21, record2Count);
|
assertEquals(21, record2Count);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHoodieMergeHandleWriteStatMetrics() throws Exception {
|
||||||
|
// insert 100 records
|
||||||
|
HoodieWriteConfig config = getConfigBuilder().build();
|
||||||
|
HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
|
||||||
|
String newCommitTime = "100";
|
||||||
|
writeClient.startCommitWithTime(newCommitTime);
|
||||||
|
|
||||||
|
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
|
||||||
|
JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
|
||||||
|
List<WriteStatus> statuses = writeClient.insert(recordsRDD, newCommitTime).collect();
|
||||||
|
|
||||||
|
// All records should be inserts into new parquet
|
||||||
|
Assert.assertTrue(statuses.stream()
|
||||||
|
.filter(status -> status.getStat().getPrevCommit() != HoodieWriteStat.NULL_COMMIT).count() > 0);
|
||||||
|
// Num writes should be equal to the number of records inserted
|
||||||
|
Assert.assertEquals((long) statuses.stream()
|
||||||
|
.map(status -> status.getStat().getNumWrites()).reduce((a,b) -> a + b).get(), 100);
|
||||||
|
// Num update writes should be equal to the number of records updated
|
||||||
|
Assert.assertEquals((long) statuses.stream()
|
||||||
|
.map(status -> status.getStat().getNumUpdateWrites()).reduce((a,b) -> a + b).get(), 0);
|
||||||
|
// Num inserts should be equal to the number of insert records, or inserts converted to updates as part of small file
|
||||||
|
// handling
|
||||||
|
Assert.assertEquals((long) statuses.stream()
|
||||||
|
.map(status -> status.getStat().getNumInserts()).reduce((a,b) -> a + b).get(), 100);
|
||||||
|
|
||||||
|
// Update all the 100 records
|
||||||
|
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
|
||||||
|
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
|
||||||
|
|
||||||
|
newCommitTime = "101";
|
||||||
|
writeClient.startCommitWithTime(newCommitTime);
|
||||||
|
|
||||||
|
List<HoodieRecord> updatedRecords = dataGen.generateUpdates(newCommitTime, records);
|
||||||
|
JavaRDD<HoodieRecord> updatedRecordsRDD = jsc.parallelize(updatedRecords, 1);
|
||||||
|
statuses = writeClient.upsert(updatedRecordsRDD, newCommitTime).collect();
|
||||||
|
|
||||||
|
// All records should be upserts into existing parquet
|
||||||
|
Assert.assertEquals(statuses.stream()
|
||||||
|
.filter(status -> status.getStat().getPrevCommit() == HoodieWriteStat.NULL_COMMIT).count(), 0);
|
||||||
|
// Num writes should be equal to the number of records written (here, the 100 updated records)
|
||||||
|
Assert.assertEquals((long) statuses.stream()
|
||||||
|
.map(status -> status.getStat().getNumWrites()).reduce((a,b) -> a + b).get(), 100);
|
||||||
|
// Num update writes should be equal to the number of records updated
|
||||||
|
Assert.assertEquals((long) statuses.stream()
|
||||||
|
.map(status -> status.getStat().getNumUpdateWrites()).reduce((a,b) -> a + b).get(), 100);
|
||||||
|
// Num inserts should be equal to the number of insert records, or inserts converted to updates as part of small file
|
||||||
|
// handling
|
||||||
|
Assert.assertEquals((long) statuses.stream()
|
||||||
|
.map(status -> status.getStat().getNumInserts()).reduce((a,b) -> a + b).get(), 0);
|
||||||
|
|
||||||
|
|
||||||
|
newCommitTime = "102";
|
||||||
|
writeClient.startCommitWithTime(newCommitTime);
|
||||||
|
|
||||||
|
List<HoodieRecord> allRecords = dataGen.generateInserts(newCommitTime, 100);
|
||||||
|
allRecords.addAll(updatedRecords);
|
||||||
|
JavaRDD<HoodieRecord> allRecordsRDD = jsc.parallelize(allRecords, 1);
|
||||||
|
statuses = writeClient.upsert(allRecordsRDD, newCommitTime).collect();
|
||||||
|
|
||||||
|
// All records should be upserts into existing parquet (inserts are converted to updates by small file handling)
|
||||||
|
Assert.assertEquals((long) statuses.stream()
|
||||||
|
.filter(status -> status.getStat().getPrevCommit() == HoodieWriteStat.NULL_COMMIT).count(), 0);
|
||||||
|
// Num writes should be equal to the total number of records written
|
||||||
|
Assert.assertEquals((long) statuses.stream()
|
||||||
|
.map(status -> status.getStat().getNumWrites()).reduce((a,b) -> a + b).get(), 200);
|
||||||
|
// Num update writes should be equal to the number of records updated (excluding inserts converted to updates)
|
||||||
|
Assert.assertEquals((long) statuses.stream()
|
||||||
|
.map(status -> status.getStat().getNumUpdateWrites()).reduce((a,b) -> a + b).get(), 100);
|
||||||
|
// Num inserts should be equal to the number of insert records, or inserts converted to updates as part of small file
|
||||||
|
// handling
|
||||||
|
Assert.assertEquals((long) statuses.stream()
|
||||||
|
.map(status -> status.getStat().getNumInserts()).reduce((a,b) -> a + b).get(), 100);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
private Dataset<Row> getRecords() {
|
private Dataset<Row> getRecords() {
|
||||||
// Check the entire dataset has 8 records still
|
// Check the entire dataset has 8 records still
|
||||||
String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
|
String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
|
||||||
@@ -827,8 +827,8 @@ public class TestMergeOnReadTable {
|
|||||||
writeClient.commit(newCommitTime, statuses);
|
writeClient.commit(newCommitTime, statuses);
|
||||||
|
|
||||||
// rollback a successful commit
|
// rollback a successful commit
|
||||||
// Sleep for small interval to force a new rollback start time.
|
// Sleep for small interval (at least 1 second) to force a new rollback start time.
|
||||||
Thread.sleep(5);
|
Thread.sleep(1000);
|
||||||
writeClient.rollback(newCommitTime);
|
writeClient.rollback(newCommitTime);
|
||||||
final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
|
final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
|
||||||
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
|
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
|
||||||
|
|||||||
@@ -61,6 +61,11 @@
|
|||||||
"name":"totalUpdatedRecordsCompacted",
|
"name":"totalUpdatedRecordsCompacted",
|
||||||
"type":["null","long"],
|
"type":["null","long"],
|
||||||
"default" : null
|
"default" : null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name":"numInserts",
|
||||||
|
"type":["null","long"],
|
||||||
|
"default" : null
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -61,6 +61,11 @@ public class HoodieWriteStat implements Serializable {
|
|||||||
*/
|
*/
|
||||||
private long numUpdateWrites;
|
private long numUpdateWrites;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Total number of insert records, including inserts converted to updates (for small file handling)
|
||||||
|
*/
|
||||||
|
private long numInserts;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Total size of file written
|
* Total size of file written
|
||||||
*/
|
*/
|
||||||
@@ -160,6 +165,10 @@ public class HoodieWriteStat implements Serializable {
|
|||||||
this.numUpdateWrites = numUpdateWrites;
|
this.numUpdateWrites = numUpdateWrites;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setNumInserts(long numInserts) {
|
||||||
|
this.numInserts = numInserts;
|
||||||
|
}
|
||||||
|
|
||||||
public long getTotalWriteBytes() {
|
public long getTotalWriteBytes() {
|
||||||
return totalWriteBytes;
|
return totalWriteBytes;
|
||||||
}
|
}
|
||||||
@@ -192,6 +201,10 @@ public class HoodieWriteStat implements Serializable {
|
|||||||
return numUpdateWrites;
|
return numUpdateWrites;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getNumInserts() {
|
||||||
|
return numInserts;
|
||||||
|
}
|
||||||
|
|
||||||
public String getFileId() {
|
public String getFileId() {
|
||||||
return fileId;
|
return fileId;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user