diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieMergedLogRecordScanner.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieMergedLogRecordScanner.java
index f23c0d874..d99babe7b 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieMergedLogRecordScanner.java
@@ -110,7 +110,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
     if (records.containsKey(key)) {
       // Merge and store the merged record. The HoodieRecordPayload implementation is free to decide what should be
       // done when a delete (empty payload) is encountered before or after an insert/update.
-      HoodieRecordPayload combinedValue = records.get(key).getData().preCombine(hoodieRecord.getData());
+      HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(records.get(key).getData());
       records.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue));
     } else {
       // Put the record as is
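Note on the preCombine change above: the record just read from the log is now the caller and the previously merged record is the argument, so with ordering-based payloads a tie on the ordering field resolves in favour of the newer record instead of the older one. A minimal sketch of that semantics, using a hypothetical simplified payload class rather than the real HoodieRecordPayload interface:

// Hypothetical, simplified stand-in for an ordering-based payload. Real
// HoodieRecordPayload implementations decide this for themselves; this only
// illustrates why the caller/argument order of preCombine matters.
final class SimplePayload {
  final String value;
  final long orderingVal;

  SimplePayload(String value, long orderingVal) {
    this.value = value;
    this.orderingVal = orderingVal;
  }

  SimplePayload preCombine(SimplePayload other) {
    // keep the caller unless the other payload has a strictly higher ordering value
    return other.orderingVal > this.orderingVal ? other : this;
  }

  public static void main(String[] args) {
    SimplePayload earlier = new SimplePayload("older log entry", 5L);
    SimplePayload later = new SimplePayload("newer log entry", 5L);

    // old call order: the existing record is the caller, so the older value wins a tie
    System.out.println(earlier.preCombine(later).value);  // older log entry

    // new call order: the record just read is the caller, so the newer value wins a tie
    System.out.println(later.preCombine(earlier).value);  // newer log entry
  }
}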
diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java
index 748c3ff22..6e323eca3 100644
--- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java
+++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java
@@ -21,6 +21,7 @@ package com.uber.hoodie.hadoop.realtime;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 import com.uber.hoodie.common.model.FileSlice;
+import com.uber.hoodie.common.model.HoodieLogFile;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
@@ -125,7 +126,7 @@ public class HoodieRealtimeInputFormat extends HoodieInputFormat implements Conf
       List<FileSplit> dataFileSplits = groupedInputSplits.get(fileSlice.getFileId());
       dataFileSplits.forEach(split -> {
         try {
-          List<String> logFilePaths = fileSlice.getLogFiles()
+          List<String> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
               .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList());
           // Get the maxCommit from the last delta or compaction or commit - when
           // bootstrapped from COW table
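The .sorted(HoodieLogFile.getLogFileComparator()) call above ensures the realtime split receives a file slice's log files in log-version order rather than in file-system listing order, since later versions can contain blocks that supersede or roll back earlier ones. A rough standalone illustration of ordering by version on made-up file names (a sketch only, not the real HoodieLogFile.getLogFileComparator() implementation):

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Illustration only: assumes hypothetical log file names of the form
// ".<fileId>_<baseCommit>.log.<version>_<writeToken>" and orders them by <version>.
public class LogFileOrderingSketch {

  private static int version(String name) {
    // take the digits between ".log." and the following "_"
    String tail = name.substring(name.indexOf(".log.") + ".log.".length());
    return Integer.parseInt(tail.substring(0, tail.indexOf('_')));
  }

  public static void main(String[] args) {
    List<String> listingOrder = Arrays.asList(
        ".fileid0_100.log.2_1-0-1",
        ".fileid0_100.log.1_1-0-1",
        ".fileid0_100.log.3_1-0-1");

    List<String> versionOrder = listingOrder.stream()
        .sorted(Comparator.comparingInt(LogFileOrderingSketch::version))
        .collect(Collectors.toList());

    // prints versions 1, 2, 3 in order
    versionOrder.forEach(System.out::println);
  }
}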
diff --git a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
index 6b64c4eaf..c9ffa802c 100644
--- a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
+++ b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
@@ -21,15 +21,22 @@ package com.uber.hoodie.hadoop.realtime;
 import static org.junit.Assert.assertTrue;
 
 import com.google.common.collect.Maps;
+import com.uber.hoodie.common.model.FileSlice;
 import com.uber.hoodie.common.model.HoodieLogFile;
 import com.uber.hoodie.common.model.HoodieTableType;
 import com.uber.hoodie.common.model.HoodieTestUtils;
+import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.table.log.HoodieLogFormat;
 import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
+import com.uber.hoodie.common.table.log.block.HoodieCommandBlock;
+import com.uber.hoodie.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum;
 import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
+import com.uber.hoodie.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
 import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.common.util.HoodieAvroUtils;
 import com.uber.hoodie.common.util.SchemaTestUtil;
+import com.uber.hoodie.common.util.collection.Pair;
+import com.uber.hoodie.exception.HoodieException;
 import com.uber.hoodie.hadoop.InputFormatTestUtil;
 import java.io.File;
 import java.io.IOException;
@@ -84,15 +91,38 @@ public class HoodieRealtimeRecordReaderTest {
 
   private HoodieLogFormat.Writer writeLogFile(File partitionDir, Schema schema, String fileId,
       String baseCommit, String newCommit, int numberOfRecords) throws InterruptedException, IOException {
-    return writeLogFile(partitionDir, schema, fileId, baseCommit, newCommit, numberOfRecords, 0);
+    return writeLogFile(partitionDir, schema, fileId, baseCommit, newCommit, numberOfRecords, 0, 0);
+  }
+
+  private HoodieLogFormat.Writer writeRollback(File partitionDir, Schema schema, String fileId,
+      String baseCommit, String newCommit, String rolledBackInstant, int logVersion)
+      throws InterruptedException, IOException {
+    HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
+        .onParentPath(new Path(partitionDir.getPath()))
+        .withFileId(fileId).overBaseCommit(baseCommit)
+        .withFs(fs)
+        .withLogVersion(logVersion)
+        .withLogWriteToken("1-0-1")
+        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
+    // generate metadata
+    Map<HeaderMetadataType, String> header = Maps.newHashMap();
+    header.put(HeaderMetadataType.INSTANT_TIME, newCommit);
+    header.put(HeaderMetadataType.TARGET_INSTANT_TIME, rolledBackInstant);
+    header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK
+        .ordinal()));
+    // if update belongs to an existing log file
+    writer = writer.appendBlock(new HoodieCommandBlock(header));
+    return writer;
   }
 
   private HoodieLogFormat.Writer writeLogFile(File partitionDir, Schema schema, String fileId,
-      String baseCommit, String newCommit, int numberOfRecords, int offset)
+      String baseCommit, String newCommit, int numberOfRecords, int offset, int logVersion)
       throws InterruptedException, IOException {
     HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
         .onParentPath(new Path(partitionDir.getPath()))
         .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId)
+        .withLogVersion(logVersion)
+        .withLogWriteToken("1-0-1")
         .overBaseCommit(baseCommit).withFs(fs).build();
     List<IndexedRecord> records = new ArrayList<>();
     for (int i = offset; i < offset + numberOfRecords; i++) {
@@ -122,58 +152,89 @@ public class HoodieRealtimeRecordReaderTest {
     Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
     HoodieTestUtils.initTableType(hadoopConf, basePath.getRoot().getAbsolutePath(),
         HoodieTableType.MERGE_ON_READ);
-    String commitTime = "100";
+    String baseInstant = "100";
     File partitionDir =
-        partitioned ? InputFormatTestUtil.prepareParquetDataset(basePath, schema, 1, 100, commitTime)
-            : InputFormatTestUtil.prepareNonPartitionedParquetDataset(basePath, schema, 1, 100, commitTime);
-    InputFormatTestUtil.commit(basePath, commitTime);
+        partitioned ? InputFormatTestUtil.prepareParquetDataset(basePath, schema, 1, 100, baseInstant)
+            : InputFormatTestUtil.prepareNonPartitionedParquetDataset(basePath, schema, 1, 100, baseInstant);
+    InputFormatTestUtil.commit(basePath, baseInstant);
 
     // Add the paths
     FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
 
-    // update files or generate new log file
-    String newCommitTime = "101";
-    HoodieLogFormat.Writer writer = writeLogFile(partitionDir, schema, "fileid0", commitTime,
-        newCommitTime, 100);
-    long size = writer.getCurrentSize();
-    writer.close();
-    assertTrue("block - size should be > 0", size > 0);
+    List<Pair<String, Integer>> logVersionsWithAction = new ArrayList<>();
+    logVersionsWithAction.add(Pair.of(HoodieTimeline.DELTA_COMMIT_ACTION, 1));
+    logVersionsWithAction.add(Pair.of(HoodieTimeline.DELTA_COMMIT_ACTION, 2));
+    // TODO: HUDI-154 Once Hive 2.x PR (PR-674) is merged, enable this change
+    // logVersionsWithAction.add(Pair.of(HoodieTimeline.ROLLBACK_ACTION, 3));
+    FileSlice fileSlice = new FileSlice(partitioned ? FSUtils.getRelativePartitionPath(new Path(
+        basePath.getRoot().getAbsolutePath()), new Path(partitionDir.getAbsolutePath())) : "default",
+        baseInstant, "fileid0");
+    logVersionsWithAction.stream().forEach(logVersionWithAction -> {
+      try {
+        // update files or generate new log file
+        int logVersion = logVersionWithAction.getRight();
+        String action = logVersionWithAction.getKey();
+        int baseInstantTs = Integer.parseInt(baseInstant);
+        String instantTime = String.valueOf(baseInstantTs + logVersion);
+        String latestInstant = action.equals(HoodieTimeline.ROLLBACK_ACTION)
+            ? String.valueOf(baseInstantTs + logVersion - 2) : instantTime;
 
-    //create a split with baseFile (parquet file written earlier) and new log file(s)
-    String logFilePath = writer.getLogFile().getPath().toString();
-    HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
-        new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + commitTime + ".parquet"), 0, 1,
-            jobConf), basePath.getRoot().getPath(), Arrays.asList(logFilePath), newCommitTime);
+        HoodieLogFormat.Writer writer = null;
+        if (action.equals(HoodieTimeline.ROLLBACK_ACTION)) {
+          writer = writeRollback(partitionDir, schema, "fileid0", baseInstant,
+              instantTime, String.valueOf(baseInstantTs + logVersion - 1), logVersion);
+        } else {
+          writer = writeLogFile(partitionDir, schema, "fileid0", baseInstant,
+              instantTime, 100, 0, logVersion);
+        }
+        long size = writer.getCurrentSize();
+        writer.close();
+        assertTrue("block - size should be > 0", size > 0);
 
-    //create a RecordReader to be used by HoodieRealtimeRecordReader
-    RecordReader<NullWritable, ArrayWritable> reader =
-        new MapredParquetInputFormat().getRecordReader(
-            new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null),
-            jobConf, null);
-    JobConf jobConf = new JobConf();
-    List<Schema.Field> fields = schema.getFields();
-    String names = fields.stream().map(f -> f.name().toString()).collect(Collectors.joining(","));
-    String postions = fields.stream().map(f -> String.valueOf(f.pos()))
-        .collect(Collectors.joining(","));
-    jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
-    jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, postions);
-    if (partitioned) {
-      jobConf.set("partition_columns", "datestr");
-    }
+        //create a split with baseFile (parquet file written earlier) and new log file(s)
+        fileSlice.addLogFile(writer.getLogFile());
+        HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
+            new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + baseInstant + ".parquet"), 0, 1,
+                jobConf), basePath.getRoot().getPath(),
+            fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).map(h -> h.getPath().toString())
+                .collect(Collectors.toList()), instantTime);
 
-    //validate record reader compaction
-    HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);
+        //create a RecordReader to be used by HoodieRealtimeRecordReader
+        RecordReader<NullWritable, ArrayWritable> reader =
+            new MapredParquetInputFormat().getRecordReader(
+                new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null),
+                jobConf, null);
+        JobConf jobConf = new JobConf();
+        List<Schema.Field> fields = schema.getFields();
+        String names = fields.stream().map(f -> f.name().toString()).collect(Collectors.joining(","));
+        String postions = fields.stream().map(f -> String.valueOf(f.pos()))
+            .collect(Collectors.joining(","));
+        jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
+        jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, postions);
+        if (partitioned) {
+          jobConf.set("partition_columns", "datestr");
+        }
+
+        //validate record reader compaction
+        HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);
+
+        //use reader to read base Parquet File and log file, merge in flight and return latest commit
+        //here all 100 records should be updated, see above
+        Void key = recordReader.createKey();
+        ArrayWritable value = recordReader.createValue();
+        while (recordReader.next(key, value)) {
+          Writable[] values = value.get();
+          //check if the record written is with latest commit, here "101"
+          Assert.assertEquals(latestInstant, values[0].toString());
+          key = recordReader.createKey();
+          value = recordReader.createValue();
+        }
+      } catch (Exception ioe) {
+        throw new HoodieException(ioe.getMessage(), ioe);
+      }
+    });
+
+    // Add Rollback last version to next log-file
 
-    //use reader to read base Parquet File and log file, merge in flight and return latest commit
-    //here all 100 records should be updated, see above
-    Void key = recordReader.createKey();
-    ArrayWritable value = recordReader.createValue();
-    while (recordReader.next(key, value)) {
-      Writable[] values = value.get();
-      //check if the record written is with latest commit, here "101"
-      Assert.assertEquals(values[0].toString(), newCommitTime);
-      key = recordReader.createKey();
-      value = recordReader.createValue();
-    }
   }
 
   @Test
@@ -195,7 +256,7 @@ public class HoodieRealtimeRecordReaderTest {
     // insert new records to log file
     String newCommitTime = "101";
     HoodieLogFormat.Writer writer = writeLogFile(partitionDir, schema, "fileid0", commitTime,
-        newCommitTime, numRecords, numRecords);
+        newCommitTime, numRecords, numRecords, 0);
     long size = writer.getCurrentSize();
     writer.close();
     assertTrue("block - size should be > 0", size > 0);
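For context on the writeRollback() helper and the currently commented-out ROLLBACK_ACTION case: the command block it appends carries the instant being rolled back, and the expectation is that a reader scanning the log in order discards the previous data block for that instant instead of merging it, which is why latestInstant falls back to an earlier instant for rollback versions. A conceptual sketch of that behaviour, not the actual AbstractHoodieLogRecordScanner logic:

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Conceptual sketch of how a ROLLBACK_PREVIOUS_BLOCK command is meant to behave when a
// reader scans blocks in log order: the previous data block for the rolled-back instant
// is dropped rather than merged. Illustrative only.
public class RollbackBlockSketch {

  static final class Block {
    final String type;          // "data" or "rollback"
    final String instantTime;   // instant the block was written at, or targets

    Block(String type, String instantTime) {
      this.type = type;
      this.instantTime = instantTime;
    }
  }

  static Deque<Block> scan(List<Block> blocksInLogOrder) {
    Deque<Block> valid = new ArrayDeque<>();
    for (Block block : blocksInLogOrder) {
      if ("rollback".equals(block.type)) {
        // drop the most recent block if it belongs to the rolled-back instant
        if (!valid.isEmpty() && valid.peekLast().instantTime.equals(block.instantTime)) {
          valid.removeLast();
        }
      } else {
        valid.addLast(block);
      }
    }
    return valid;
  }

  public static void main(String[] args) {
    // data@101, data@102, rollback targeting 102 -> only data@101 survives
    Deque<Block> remaining = scan(Arrays.asList(
        new Block("data", "101"),
        new Block("data", "102"),
        new Block("rollback", "102")));
    remaining.forEach(b -> System.out.println(b.type + "@" + b.instantTime));
  }
}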