1
0

1. Use HoodieLogFormat to archive commits and other actions 2. Introduced avro schema for commits and compactions and an avro wrapper schema

This commit is contained in:
Nishith Agarwal
2017-06-20 23:50:23 -07:00
committed by vinoth chandar
parent 616c9a68c3
commit 19c22b231e
17 changed files with 546 additions and 3854 deletions

View File

@@ -16,28 +16,31 @@
package com.uber.hoodie.io;
import com.google.common.collect.Lists;
import com.uber.hoodie.avro.model.HoodieArchivedMetaEntry;
import com.uber.hoodie.common.HoodieTestDataGenerator;
import com.uber.hoodie.common.model.HoodieArchivedLogFile;
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.model.HoodieTestUtils;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.log.HoodieLogFormat;
import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.common.HoodieTestDataGenerator;
import com.uber.hoodie.common.model.HoodieCommitMetadata;
import com.uber.hoodie.common.model.HoodieTestUtils;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.fs.Path;
import org.junit.Before;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
@@ -66,6 +69,75 @@ public class TestHoodieCommitArchiveLog {
assertTrue(result);
}
@Test
public void testArchiveDatasetWithArchival() throws IOException {
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 4).build())
.forTable("test-trip-table").build();
HoodieTestUtils.init(basePath);
HoodieTestDataGenerator.createCommitFile(basePath, "100");
HoodieTestDataGenerator.createCommitFile(basePath, "101");
HoodieTestDataGenerator.createCommitFile(basePath, "102");
HoodieTestDataGenerator.createCommitFile(basePath, "103");
HoodieTestDataGenerator.createCommitFile(basePath, "104");
HoodieTestDataGenerator.createCommitFile(basePath, "105");
HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, basePath);
HoodieTimeline timeline =
metadata.getActiveTimeline().getCommitsAndCompactionsTimeline().filterCompletedInstants();
assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants());
HoodieTestUtils.createCleanFiles(basePath, "100");
HoodieTestUtils.createCleanFiles(basePath, "101");
HoodieTestUtils.createCleanFiles(basePath, "102");
HoodieTestUtils.createCleanFiles(basePath, "103");
HoodieTestUtils.createCleanFiles(basePath, "104");
HoodieTestUtils.createCleanFiles(basePath, "105");
//reload the timeline and get all the commmits before archive
timeline = metadata.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
List<HoodieInstant> originalCommits = timeline.getInstants().collect(Collectors.toList());
assertEquals("Loaded 6 commits and the count should match", 12, timeline.countInstants());
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, fs);
assertTrue(archiveLog.archiveIfRequired());
//reload the timeline and remove the remaining commits
timeline = metadata.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
originalCommits.removeAll(timeline.getInstants().collect(Collectors.toList()));
//read the file
HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(FSUtils.getFs(),
new HoodieLogFile(new Path(basePath + "/.hoodie/.commits_.archive.1")), HoodieArchivedMetaEntry.getClassSchema());
int archivedRecordsCount = 0;
List<IndexedRecord> readRecords = new ArrayList<>();
//read the avro blocks and validate the number of records written in each avro block
while(reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
List<IndexedRecord> records = blk.getRecords();
readRecords.addAll(records);
assertEquals("Archived and read records for each block are same", 8, records.size());
archivedRecordsCount += records.size();
}
assertEquals("Total archived records and total read records are the same count", 8, archivedRecordsCount);
//make sure the archived commits are the same as the (originalcommits - commitsleft)
List<String> readCommits = readRecords.stream().map(r -> (GenericRecord)r).map(r -> {
return r.get("commitTime").toString();
}).collect(Collectors.toList());
Collections.sort(readCommits);
assertEquals(
"Read commits map should match the originalCommits - commitsLoadedFromArchival",
originalCommits.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()),
readCommits);
}
@Test
public void testArchiveDatasetWithNoArchival() throws IOException {
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
@@ -91,58 +163,6 @@ public class TestHoodieCommitArchiveLog {
timeline.countInstants());
}
@Test
public void testArchiveDatasetWithArchival() throws IOException {
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.forTable("test-trip-table").withCompactionConfig(
HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build();
HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, basePath);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, fs);
HoodieTestDataGenerator.createCommitFile(basePath, "100");
HoodieTestDataGenerator.createCommitFile(basePath, "101");
HoodieTestDataGenerator.createCommitFile(basePath, "102");
HoodieTestDataGenerator.createCommitFile(basePath, "103");
HoodieTestDataGenerator.createCommitFile(basePath, "104");
HoodieTestDataGenerator.createCommitFile(basePath, "105");
HoodieTimeline timeline =
metadata.getActiveTimeline().getCommitsAndCompactionsTimeline().filterCompletedInstants();
List<HoodieInstant> originalCommits = timeline.getInstants().collect(Collectors.toList());
assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants());
boolean result = archiveLog.archiveIfRequired();
assertTrue(result);
timeline =
metadata.getActiveTimeline().reload().getCommitsAndCompactionsTimeline().filterCompletedInstants();
assertEquals(
"Should archive commits when maxCommitsToKeep is 5 and now the commits length should be minCommitsToKeep which is 2",
2, timeline.countInstants());
assertEquals("Archive should not archive the last 2 commits",
Lists.newArrayList("104", "105"),
timeline.getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList()));
// Remove all the commits from the original commits, make it ready to be checked against the read map
timeline.getInstants().forEach(originalCommits::remove);
// Read back the commits to make sure
SequenceFile.Reader reader = new SequenceFile.Reader(fs.getConf(),
SequenceFile.Reader.file(archiveLog.getArchiveFilePath()));
Text key = new Text();
Text val = new Text();
SortedMap<String, HoodieCommitMetadata> readCommits = new TreeMap<>();
while (reader.next(key, val)) {
HoodieCommitMetadata meta = HoodieCommitMetadata.fromJsonString(val.toString());
readCommits.put(key.toString(), meta);
}
assertEquals(
"Read commits map should match the originalCommits - commitsLoadedAfterArchival",
originalCommits.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()),
new ArrayList<>(readCommits.keySet()));
reader.close();
}
@Test
public void testArchiveCommitSafety() throws IOException {
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)