1
0

capture record metadata before deflating for record counting

This commit is contained in:
Kaushik Devarajaiah
2017-08-22 14:48:04 -07:00
committed by vinoth chandar
parent f2980052cd
commit c98ee057fc
9 changed files with 192 additions and 23 deletions

View File

@@ -16,6 +16,7 @@
package com.uber.hoodie.table;
import com.uber.hoodie.common.TestRawTripPayload.MetadataMergeWriteStatus;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.config.HoodieWriteConfig;
@@ -34,13 +35,13 @@ import com.uber.hoodie.common.util.ParquetUtils;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.io.HoodieCreateHandle;
import com.uber.hoodie.config.HoodieStorageConfig;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.After;
import org.junit.Before;
@@ -238,7 +239,41 @@ public class TestCopyOnWriteTable {
return records;
}
@Test public void testInsertWithPartialFailures() throws Exception {
// Check if record level metadata is aggregated properly at the end of write.
@Test
public void testMetadataAggregateFromWriteStatus() throws Exception {
// Prepare the AvroParquetIO
HoodieWriteConfig config = makeHoodieClientConfigBuilder().withWriteStatusClass(MetadataMergeWriteStatus.class).build();
String firstCommitTime = HoodieTestUtils.makeNewCommitTime();
HoodieTableMetaClient metadata = new HoodieTableMetaClient(FSUtils.getFs(), basePath);
HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, metadata);
// Get some records belong to the same partition (2016/01/31)
String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
String recordStr2 = "{\"_row_key\":\"8eb5b87b-1feu-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
String recordStr3 = "{\"_row_key\":\"8eb5b87c-1fej-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}";
List<HoodieRecord> records = new ArrayList<>();
TestRawTripPayload rowChange1 = new TestRawTripPayload(recordStr1);
records.add(new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1));
TestRawTripPayload rowChange2 = new TestRawTripPayload(recordStr2);
records.add(new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2));
TestRawTripPayload rowChange3 = new TestRawTripPayload(recordStr3);
records.add(new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3));
// Insert new records
List<WriteStatus> writeStatuses = HoodieClientTestUtils
.collectStatuses(table.handleInsert(firstCommitTime, records.iterator()));
Map<String, String> allWriteStatusMergedMetadataMap = MetadataMergeWriteStatus
.mergeMetadataForWriteStatuses(writeStatuses);
assertTrue(allWriteStatusMergedMetadataMap.containsKey("InputRecordCount_1506582000"));
// For metadata key InputRecordCount_1506582000, value is 2 for each record. So sum of this should be 2 * 3
assertEquals("6", allWriteStatusMergedMetadataMap.get("InputRecordCount_1506582000"));
}
@Test
public void testInsertWithPartialFailures() throws Exception {
HoodieWriteConfig config = makeHoodieClientConfig();
String commitTime = HoodieTestUtils.makeNewCommitTime();
FileSystem fs = FSUtils.getFs();