
Ensure log files are consistently ordered when scanning

Balaji Varadarajan
2019-06-11 19:06:06 -07:00
committed by n3nash
parent b791473a6d
commit 1c943ab230
3 changed files with 111 additions and 49 deletions
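
The fix has two parts, visible in the diffs below: HoodieRealtimeInputFormat now sorts each file slice's log files with HoodieLogFile.getLogFileComparator() before handing their paths to the realtime split, and HoodieMergedLogRecordScanner flips the preCombine call so the record seen later in the scan drives the merge. A minimal sketch of the sorting pattern, using only the calls that appear in the diffs (the helper class name is hypothetical and assumes getLogFiles() returns a Stream, as the diff shows):

import com.uber.hoodie.common.model.FileSlice;
import com.uber.hoodie.common.model.HoodieLogFile;
import java.util.List;
import java.util.stream.Collectors;

// Illustration only: produce a slice's log file paths in the deterministic order
// this commit enforces when building a realtime split.
class SortedLogPathsSketch {
  static List<String> sortedLogPaths(FileSlice fileSlice) {
    return fileSlice.getLogFiles()
        .sorted(HoodieLogFile.getLogFileComparator())
        .map(logFile -> logFile.getPath().toString())
        .collect(Collectors.toList());
  }
}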

HoodieMergedLogRecordScanner.java

@@ -110,7 +110,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
     if (records.containsKey(key)) {
       // Merge and store the merged record. The HoodieRecordPayload implementation is free to decide what should be
       // done when a delete (empty payload) is encountered before or after an insert/update.
-      HoodieRecordPayload combinedValue = records.get(key).getData().preCombine(hoodieRecord.getData());
+      HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(records.get(key).getData());
       records.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue));
     } else {
       // Put the record as is
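
The argument flip above changes which side drives the merge: with log files now scanned in a consistent order, the record read later (hoodieRecord) calls preCombine against the value already stored for the key. A standalone, hypothetical sketch of that contract (not code from this commit); here, for illustration, preCombine keeps the payload with the larger ordering value, and the caller wins ties:

// Hypothetical sketch mirroring HoodieRecordPayload#preCombine(T another);
// the class and fields are illustrative only.
class SketchPayload {
  final long orderingVal;
  final String value;

  SketchPayload(long orderingVal, String value) {
    this.orderingVal = orderingVal;
    this.value = value;
  }

  // Return the payload that should survive the merge. After this commit, "this" is the
  // record encountered later in the now-deterministic log scan, so it wins ties.
  SketchPayload preCombine(SketchPayload another) {
    return another.orderingVal > this.orderingVal ? another : this;
  }
}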

HoodieRealtimeInputFormat.java

@@ -21,6 +21,7 @@ package com.uber.hoodie.hadoop.realtime;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 import com.uber.hoodie.common.model.FileSlice;
+import com.uber.hoodie.common.model.HoodieLogFile;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
@@ -125,7 +126,7 @@ public class HoodieRealtimeInputFormat extends HoodieInputFormat implements Conf
           List<FileSplit> dataFileSplits = groupedInputSplits.get(fileSlice.getFileId());
           dataFileSplits.forEach(split -> {
             try {
-              List<String> logFilePaths = fileSlice.getLogFiles()
+              List<String> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
                   .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList());
               // Get the maxCommit from the last delta or compaction or commit - when
               // bootstrapped from COW table

HoodieRealtimeRecordReaderTest.java

@@ -21,15 +21,22 @@ package com.uber.hoodie.hadoop.realtime;
 import static org.junit.Assert.assertTrue;
 import com.google.common.collect.Maps;
+import com.uber.hoodie.common.model.FileSlice;
 import com.uber.hoodie.common.model.HoodieLogFile;
 import com.uber.hoodie.common.model.HoodieTableType;
 import com.uber.hoodie.common.model.HoodieTestUtils;
+import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.table.log.HoodieLogFormat;
 import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
+import com.uber.hoodie.common.table.log.block.HoodieCommandBlock;
+import com.uber.hoodie.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum;
 import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
+import com.uber.hoodie.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
 import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.common.util.HoodieAvroUtils;
 import com.uber.hoodie.common.util.SchemaTestUtil;
+import com.uber.hoodie.common.util.collection.Pair;
+import com.uber.hoodie.exception.HoodieException;
 import com.uber.hoodie.hadoop.InputFormatTestUtil;
 import java.io.File;
 import java.io.IOException;
@@ -84,15 +91,38 @@ public class HoodieRealtimeRecordReaderTest {
   private HoodieLogFormat.Writer writeLogFile(File partitionDir, Schema schema, String fileId,
       String baseCommit, String newCommit, int numberOfRecords)
       throws InterruptedException, IOException {
-    return writeLogFile(partitionDir, schema, fileId, baseCommit, newCommit, numberOfRecords, 0);
+    return writeLogFile(partitionDir, schema, fileId, baseCommit, newCommit, numberOfRecords, 0, 0);
+  }
+
+  private HoodieLogFormat.Writer writeRollback(File partitionDir, Schema schema, String fileId,
+      String baseCommit, String newCommit, String rolledBackInstant, int logVersion)
+      throws InterruptedException, IOException {
+    HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
+        .onParentPath(new Path(partitionDir.getPath()))
+        .withFileId(fileId).overBaseCommit(baseCommit)
+        .withFs(fs)
+        .withLogVersion(logVersion)
+        .withLogWriteToken("1-0-1")
+        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
+    // generate metadata
+    Map<HeaderMetadataType, String> header = Maps.newHashMap();
+    header.put(HeaderMetadataType.INSTANT_TIME, newCommit);
+    header.put(HeaderMetadataType.TARGET_INSTANT_TIME, rolledBackInstant);
+    header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK
+        .ordinal()));
+    // if update belongs to an existing log file
+    writer = writer.appendBlock(new HoodieCommandBlock(header));
+    return writer;
   }
 
   private HoodieLogFormat.Writer writeLogFile(File partitionDir, Schema schema, String fileId,
-      String baseCommit, String newCommit, int numberOfRecords, int offset)
+      String baseCommit, String newCommit, int numberOfRecords, int offset, int logVersion)
       throws InterruptedException, IOException {
     HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
         .onParentPath(new Path(partitionDir.getPath()))
         .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId)
+        .withLogVersion(logVersion)
+        .withLogWriteToken("1-0-1")
         .overBaseCommit(baseCommit).withFs(fs).build();
     List<IndexedRecord> records = new ArrayList<>();
     for (int i = offset; i < offset + numberOfRecords; i++) {
@@ -122,58 +152,89 @@ public class HoodieRealtimeRecordReaderTest {
     Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
     HoodieTestUtils.initTableType(hadoopConf, basePath.getRoot().getAbsolutePath(),
         HoodieTableType.MERGE_ON_READ);
-    String commitTime = "100";
+    String baseInstant = "100";
     File partitionDir =
-        partitioned ? InputFormatTestUtil.prepareParquetDataset(basePath, schema, 1, 100, commitTime)
-            : InputFormatTestUtil.prepareNonPartitionedParquetDataset(basePath, schema, 1, 100, commitTime);
-    InputFormatTestUtil.commit(basePath, commitTime);
+        partitioned ? InputFormatTestUtil.prepareParquetDataset(basePath, schema, 1, 100, baseInstant)
+            : InputFormatTestUtil.prepareNonPartitionedParquetDataset(basePath, schema, 1, 100, baseInstant);
+    InputFormatTestUtil.commit(basePath, baseInstant);
     // Add the paths
     FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
 
-    // update files or generate new log file
-    String newCommitTime = "101";
-    HoodieLogFormat.Writer writer = writeLogFile(partitionDir, schema, "fileid0", commitTime,
-        newCommitTime, 100);
-    long size = writer.getCurrentSize();
-    writer.close();
-    assertTrue("block - size should be > 0", size > 0);
-
-    //create a split with baseFile (parquet file written earlier) and new log file(s)
-    String logFilePath = writer.getLogFile().getPath().toString();
-    HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
-        new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + commitTime + ".parquet"), 0, 1,
-            jobConf), basePath.getRoot().getPath(), Arrays.asList(logFilePath), newCommitTime);
-
-    //create a RecordReader to be used by HoodieRealtimeRecordReader
-    RecordReader<Void, ArrayWritable> reader =
-        new MapredParquetInputFormat().getRecordReader(
-            new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null),
-            jobConf, null);
-    JobConf jobConf = new JobConf();
-    List<Schema.Field> fields = schema.getFields();
-    String names = fields.stream().map(f -> f.name().toString()).collect(Collectors.joining(","));
-    String postions = fields.stream().map(f -> String.valueOf(f.pos()))
-        .collect(Collectors.joining(","));
-    jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
-    jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, postions);
-    if (partitioned) {
-      jobConf.set("partition_columns", "datestr");
-    }
-
-    //validate record reader compaction
-    HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);
-
-    //use reader to read base Parquet File and log file, merge in flight and return latest commit
-    //here all 100 records should be updated, see above
-    Void key = recordReader.createKey();
-    ArrayWritable value = recordReader.createValue();
-    while (recordReader.next(key, value)) {
-      Writable[] values = value.get();
-      //check if the record written is with latest commit, here "101"
-      Assert.assertEquals(values[0].toString(), newCommitTime);
-      key = recordReader.createKey();
-      value = recordReader.createValue();
-    }
+    List<Pair<String, Integer>> logVersionsWithAction = new ArrayList<>();
+    logVersionsWithAction.add(Pair.of(HoodieTimeline.DELTA_COMMIT_ACTION, 1));
+    logVersionsWithAction.add(Pair.of(HoodieTimeline.DELTA_COMMIT_ACTION, 2));
+    // TODO: HUDI-154 Once Hive 2.x PR (PR-674) is merged, enable this change
+    // logVersionsWithAction.add(Pair.of(HoodieTimeline.ROLLBACK_ACTION, 3));
+    FileSlice fileSlice = new FileSlice(partitioned ? FSUtils.getRelativePartitionPath(new Path(
+        basePath.getRoot().getAbsolutePath()), new Path(partitionDir.getAbsolutePath())) : "default",
+        baseInstant, "fileid0");
+    logVersionsWithAction.stream().forEach(logVersionWithAction -> {
+      try {
+        // update files or generate new log file
+        int logVersion = logVersionWithAction.getRight();
+        String action = logVersionWithAction.getKey();
+        int baseInstantTs = Integer.parseInt(baseInstant);
+        String instantTime = String.valueOf(baseInstantTs + logVersion);
+        String latestInstant = action.equals(HoodieTimeline.ROLLBACK_ACTION)
+            ? String.valueOf(baseInstantTs + logVersion - 2) : instantTime;
+
+        HoodieLogFormat.Writer writer = null;
+        if (action.equals(HoodieTimeline.ROLLBACK_ACTION)) {
+          writer = writeRollback(partitionDir, schema, "fileid0", baseInstant,
+              instantTime, String.valueOf(baseInstantTs + logVersion - 1), logVersion);
+        } else {
+          writer = writeLogFile(partitionDir, schema, "fileid0", baseInstant,
+              instantTime, 100, 0, logVersion);
+        }
+        long size = writer.getCurrentSize();
+        writer.close();
+        assertTrue("block - size should be > 0", size > 0);
+
+        //create a split with baseFile (parquet file written earlier) and new log file(s)
+        fileSlice.addLogFile(writer.getLogFile());
+        HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
+            new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + baseInstant + ".parquet"), 0, 1,
+                jobConf), basePath.getRoot().getPath(),
+            fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).map(h -> h.getPath().toString())
+                .collect(Collectors.toList()), instantTime);
+
+        //create a RecordReader to be used by HoodieRealtimeRecordReader
+        RecordReader<Void, ArrayWritable> reader =
+            new MapredParquetInputFormat().getRecordReader(
+                new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null),
+                jobConf, null);
+        JobConf jobConf = new JobConf();
+        List<Schema.Field> fields = schema.getFields();
+        String names = fields.stream().map(f -> f.name().toString()).collect(Collectors.joining(","));
+        String postions = fields.stream().map(f -> String.valueOf(f.pos()))
+            .collect(Collectors.joining(","));
+        jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
+        jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, postions);
+        if (partitioned) {
+          jobConf.set("partition_columns", "datestr");
+        }
+
+        //validate record reader compaction
+        HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);
+
+        //use reader to read base Parquet File and log file, merge in flight and return latest commit
+        //here all 100 records should be updated, see above
+        Void key = recordReader.createKey();
+        ArrayWritable value = recordReader.createValue();
+        while (recordReader.next(key, value)) {
+          Writable[] values = value.get();
+          //check if the record written is with latest commit, here "101"
+          Assert.assertEquals(latestInstant, values[0].toString());
+          key = recordReader.createKey();
+          value = recordReader.createValue();
+        }
+      } catch (Exception ioe) {
+        throw new HoodieException(ioe.getMessage(), ioe);
+      }
+    });
+
+    // Add Rollback last version to next log-file
   }
   @Test
@@ -195,7 +256,7 @@ public class HoodieRealtimeRecordReaderTest {
     // insert new records to log file
     String newCommitTime = "101";
     HoodieLogFormat.Writer writer = writeLogFile(partitionDir, schema, "fileid0", commitTime,
-        newCommitTime, numRecords, numRecords);
+        newCommitTime, numRecords, numRecords, 0);
     long size = writer.getCurrentSize();
     writer.close();
     assertTrue("block - size should be > 0", size > 0);