diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieMergeOnReadTestUtils.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieMergeOnReadTestUtils.java
index 02b93d15c..24430fbc0 100644
--- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieMergeOnReadTestUtils.java
+++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieMergeOnReadTestUtils.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.HoodieTestUtils;
+import org.apache.hudi.hadoop.HoodieParquetInputFormat;
 import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
 
 import org.apache.avro.Schema;
@@ -51,29 +52,36 @@ public class HoodieMergeOnReadTestUtils {
 
   public static List<GenericRecord> getRecordsUsingInputFormat(List<String> inputPaths, String basePath, Configuration conf) {
     JobConf jobConf = new JobConf(conf);
+    return getRecordsUsingInputFormat(inputPaths, basePath, jobConf, new HoodieParquetRealtimeInputFormat());
+  }
+
+  public static List<GenericRecord> getRecordsUsingInputFormat(List<String> inputPaths,
+                                                               String basePath,
+                                                               JobConf jobConf,
+                                                               HoodieParquetInputFormat inputFormat) {
     Schema schema = HoodieAvroUtils.addMetadataFields(
         new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA));
-    HoodieParquetRealtimeInputFormat inputFormat = new HoodieParquetRealtimeInputFormat();
     setPropsForInputFormat(inputFormat, jobConf, schema, basePath);
     return inputPaths.stream().map(path -> {
       setInputPath(jobConf, path);
       List<GenericRecord> records = new ArrayList<>();
       try {
         List<InputSplit> splits = Arrays.asList(inputFormat.getSplits(jobConf, 1));
-        RecordReader recordReader = inputFormat.getRecordReader(splits.get(0), jobConf, null);
-        Void key = (Void) recordReader.createKey();
-        ArrayWritable writable = (ArrayWritable) recordReader.createValue();
-        while (recordReader.next(key, writable)) {
-          GenericRecordBuilder newRecord = new GenericRecordBuilder(schema);
-          // writable returns an array with [field1, field2, _hoodie_commit_time,
-          // _hoodie_commit_seqno]
-          Writable[] values = writable.get();
-          final int[] fieldIndex = {0};
-          assert schema.getFields().size() <= values.length;
-          schema.getFields().forEach(field -> {
-            newRecord.set(field, values[fieldIndex[0]++]);
-          });
-          records.add(newRecord.build());
+        for (InputSplit split : splits) {
+          RecordReader recordReader = inputFormat.getRecordReader(split, jobConf, null);
+          Void key = (Void) recordReader.createKey();
+          ArrayWritable writable = (ArrayWritable) recordReader.createValue();
+          while (recordReader.next(key, writable)) {
+            GenericRecordBuilder newRecord = new GenericRecordBuilder(schema);
+            // writable returns an array with [field1, field2, _hoodie_commit_time,
+            // _hoodie_commit_seqno]
+            Writable[] values = writable.get();
+            assert schema.getFields().size() <= values.length;
+            schema.getFields().forEach(field -> {
+              newRecord.set(field, values[field.pos()]);
+            });
+            records.add(newRecord.build());
+          }
         }
       } catch (IOException ie) {
         ie.printStackTrace();
@@ -85,7 +93,7 @@ public class HoodieMergeOnReadTestUtils {
     }).orElse(new ArrayList());
   }
 
-  private static void setPropsForInputFormat(HoodieParquetRealtimeInputFormat inputFormat, JobConf jobConf,
+  private static void setPropsForInputFormat(HoodieParquetInputFormat inputFormat, JobConf jobConf,
       Schema schema, String basePath) {
     List<Schema.Field> fields = schema.getFields();
     String names = fields.stream().map(f -> f.name().toString()).collect(Collectors.joining(","));
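Usage note: the second overload above lets a test drive the same read through either view. A minimal sketch, assuming a hypothetical caller (`InputFormatReadSketch` is not part of this patch) with `basePath`, `partitionPath`, and per-view `JobConf`s prepared as in the MOR tests later in this patch:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hudi.common.HoodieMergeOnReadTestUtils;
    import org.apache.hudi.hadoop.HoodieParquetInputFormat;
    import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;

    class InputFormatReadSketch { // hypothetical helper, not part of this patch
      static void readBothViews(String basePath, String partitionPath, JobConf roJobConf, JobConf rtJobConf) {
        // Realtime (RT) view: merges base parquet files with log files.
        List<GenericRecord> rtRecords = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
            Arrays.asList(basePath + "/" + partitionPath), basePath, rtJobConf, new HoodieParquetRealtimeInputFormat());
        // Read-optimized (RO) view: sees only the base parquet files.
        List<GenericRecord> roRecords = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
            Arrays.asList(basePath + "/" + partitionPath), basePath, roJobConf, new HoodieParquetInputFormat());
      }
    }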
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
index 11e94e474..7fd1be5e0 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
@@ -18,6 +18,7 @@ package org.apache.hudi.table;
 
+import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.common.HoodieClientTestUtils;
@@ -29,9 +30,9 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieTestUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ParquetUtils;
@@ -39,11 +40,17 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.hadoop.HoodieHiveUtil;
+import org.apache.hudi.hadoop.HoodieParquetInputFormat;
 import org.apache.hudi.io.HoodieCreateHandle;
 import org.apache.hudi.table.HoodieCopyOnWriteTable.UpsertPartitioner;
 
 import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.avro.AvroReadSupport;
@@ -64,7 +71,6 @@ import java.util.UUID;
 
 import scala.Tuple2;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -129,6 +135,8 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
     // Prepare the AvroParquetIO
     HoodieWriteConfig config = makeHoodieClientConfig();
     String firstCommitTime = HoodieTestUtils.makeNewCommitTime();
+    HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
+    writeClient.startCommitWithTime(firstCommitTime);
     metaClient = HoodieTableMetaClient.reload(metaClient);
 
     String partitionPath = "/2016/01/31";
@@ -154,30 +162,17 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
 
     // Insert new records
     final HoodieCopyOnWriteTable cowTable = table;
-    jsc.parallelize(Arrays.asList(1)).map(x -> {
-      return cowTable.handleInsert(firstCommitTime, FSUtils.createNewFileIdPfx(), records.iterator());
-    }).map(x -> HoodieClientTestUtils.collectStatuses(x)).collect();
+    writeClient.insert(jsc.parallelize(records, 1), firstCommitTime);
 
-    // We should have a parquet file generated (TODO: better control # files after we revise
-    // AvroParquetIO)
-    File parquetFile = null;
-    for (File file : new File(this.basePath + partitionPath).listFiles()) {
-      if (file.getName().endsWith(".parquet")) {
-        parquetFile = file;
-        break;
-      }
-    }
-    assertTrue(parquetFile != null);
+    FileStatus[] allFiles = getIncrementalFiles(partitionPath, "0", -1);
+    assertEquals(1, allFiles.length);
 
     // Read out the bloom filter and make sure it can answer whether a record exists or not
-    Path parquetFilePath = new Path(parquetFile.getAbsolutePath());
+    Path parquetFilePath = allFiles[0].getPath();
     BloomFilter filter = ParquetUtils.readBloomFilterFromParquetMetadata(jsc.hadoopConfiguration(), parquetFilePath);
     for (HoodieRecord record : records) {
       assertTrue(filter.mightContain(record.getRecordKey()));
     }
-    // Create a commit file
-    new File(this.basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
-        + FSUtils.getCommitTime(parquetFile.getName()) + ".commit").createNewFile();
 
     // Read the parquet file, check the record content
     List<GenericRecord> fileRecords = ParquetUtils.readAvroRecords(jsc.hadoopConfiguration(), parquetFilePath);
@@ -194,9 +189,6 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
     TestRawTripPayload updateRowChanges1 = new TestRawTripPayload(updateRecordStr1);
     HoodieRecord updatedRecord1 = new HoodieRecord(
         new HoodieKey(updateRowChanges1.getRowKey(), updateRowChanges1.getPartitionPath()), updateRowChanges1);
-    updatedRecord1.unseal();
-    updatedRecord1.setCurrentLocation(new HoodieRecordLocation(null, FSUtils.getFileId(parquetFile.getName())));
-    updatedRecord1.seal();
 
     TestRawTripPayload rowChange4 = new TestRawTripPayload(recordStr4);
     HoodieRecord insertedRecord1 =
@@ -207,27 +199,16 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
     Thread.sleep(1000);
     String newCommitTime = HoodieTestUtils.makeNewCommitTime();
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    final HoodieCopyOnWriteTable newTable = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, jsc);
-    List<WriteStatus> statuses = jsc.parallelize(Arrays.asList(1)).map(x -> {
-      return newTable.handleUpdate(newCommitTime, updatedRecord1.getPartitionPath(),
-          updatedRecord1.getCurrentLocation().getFileId(), updatedRecords.iterator());
-    }).flatMap(x -> HoodieClientTestUtils.collectStatuses(x).iterator()).collect();
+    writeClient.startCommitWithTime(newCommitTime);
+    List<WriteStatus> statuses = writeClient.upsert(jsc.parallelize(updatedRecords), newCommitTime).collect();
+
+    allFiles = getIncrementalFiles(partitionPath, firstCommitTime, -1);
+    assertEquals(1, allFiles.length);
+    // verify new incremental file group is same as the previous one
+    assertEquals(FSUtils.getFileId(parquetFilePath.getName()), FSUtils.getFileId(allFiles[0].getPath().getName()));
 
-    // Check the updated file
-    File updatedParquetFile = null;
-    for (File file : new File(basePath + "/2016/01/31").listFiles()) {
-      if (file.getName().endsWith(".parquet")) {
-        if (FSUtils.getFileId(file.getName()).equals(FSUtils.getFileId(parquetFile.getName()))
-            && HoodieTimeline.compareTimestamps(FSUtils.getCommitTime(file.getName()),
-                FSUtils.getCommitTime(parquetFile.getName()), HoodieTimeline.GREATER)) {
-          updatedParquetFile = file;
-          break;
-        }
-      }
-    }
-    assertNotNull(updatedParquetFile);
     // Check whether the record has been updated
-    Path updatedParquetFilePath = new Path(updatedParquetFile.getAbsolutePath());
+    Path updatedParquetFilePath = allFiles[0].getPath();
     BloomFilter updatedFilter =
         ParquetUtils.readBloomFilterFromParquetMetadata(jsc.hadoopConfiguration(), updatedParquetFilePath);
     for (HoodieRecord record : records) {
@@ -254,6 +235,32 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
     assertEquals(4, writeStatus.getStat().getNumWrites()); // 3 rewritten records + 1 new record
   }
 
+  private FileStatus[] getIncrementalFiles(String partitionPath, String startCommitTime, int numCommitsToPull)
+      throws Exception {
+    // initialize parquet input format
+    HoodieParquetInputFormat hoodieInputFormat = new HoodieParquetInputFormat();
+    JobConf jobConf = new JobConf(jsc.hadoopConfiguration());
+    hoodieInputFormat.setConf(jobConf);
+    HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.COPY_ON_WRITE);
+    setupIncremental(jobConf, startCommitTime, numCommitsToPull);
+    FileInputFormat.setInputPaths(jobConf, basePath + partitionPath);
+    return hoodieInputFormat.listStatus(jobConf);
+  }
+
+  private void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull) {
+    String modePropertyName =
+        String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+    jobConf.set(modePropertyName, HoodieHiveUtil.INCREMENTAL_SCAN_MODE);
+
+    String startCommitTimestampName =
+        String.format(HoodieHiveUtil.HOODIE_START_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+    jobConf.set(startCommitTimestampName, startCommit);
+
+    String maxCommitPulls =
+        String.format(HoodieHiveUtil.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+    jobConf.setInt(maxCommitPulls, numberOfCommitsToPull);
+  }
+
   private List<HoodieRecord> newHoodieRecords(int n, String time) throws Exception {
     List<HoodieRecord> records = new ArrayList<>();
     for (int i = 0; i < n; i++) {
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
index 2c2722c02..8a726c7d1 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
@@ -18,6 +18,8 @@ package org.apache.hudi.table;
 
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hudi.client.HoodieReadClient;
 import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.client.WriteStatus;
@@ -50,6 +52,9 @@ import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.hadoop.HoodieHiveUtil;
+import org.apache.hudi.hadoop.HoodieParquetInputFormat;
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.HoodieIndex.IndexType;
 
@@ -70,6 +75,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -80,6 +86,12 @@ import static org.junit.Assert.assertTrue;
 
 public class TestMergeOnReadTable extends HoodieClientTestHarness {
 
+  private HoodieParquetInputFormat roInputFormat;
+  private JobConf roJobConf;
+
+  private HoodieParquetRealtimeInputFormat rtInputFormat;
+  private JobConf rtJobConf;
+
   @Before
   public void init() throws IOException {
     initDFS();
@@ -89,6 +101,15 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
     dfs.mkdirs(new Path(basePath));
     HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.MERGE_ON_READ);
     initTestDataGenerator();
+
+    // initialize parquet input format
+    roInputFormat = new HoodieParquetInputFormat();
+    roJobConf = new JobConf(jsc.hadoopConfiguration());
+    roInputFormat.setConf(roJobConf);
+
+    rtInputFormat = new HoodieParquetRealtimeInputFormat();
+    rtJobConf = new JobConf(jsc.hadoopConfiguration());
+    rtInputFormat.setConf(rtJobConf);
   }
 
   @After
@@ -114,63 +135,23 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       client.startCommitWithTime(newCommitTime);
 
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
-      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
-
-      List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
-      assertNoWriteErrors(statuses);
-
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
-      HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, jsc);
-
-      Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
-      assertTrue(deltaCommit.isPresent());
-      Assert.assertEquals("Delta commit should be 001", "001", deltaCommit.get().getTimestamp());
-
-      Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
-      assertFalse(commit.isPresent());
-
-      FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
-      BaseFileOnlyView roView =
-          new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
-      Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
-      assertTrue(!dataFilesToRead.findAny().isPresent());
-
-      roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
-      dataFilesToRead = roView.getLatestBaseFiles();
-      assertTrue("should list the parquet files we wrote in the delta commit",
-          dataFilesToRead.findAny().isPresent());
+      insertAndGetFilePaths(records, client, cfg, newCommitTime);
 
       /**
        * Write 2 (updates)
        */
       newCommitTime = "004";
       client.startCommitWithTime(newCommitTime);
-
       records = dataGen.generateUpdates(newCommitTime, 100);
-      Map<HoodieKey, HoodieRecord> recordsMap = new HashMap<>();
-      for (HoodieRecord rec : records) {
-        if (!recordsMap.containsKey(rec.getKey())) {
-          recordsMap.put(rec.getKey(), rec);
-        }
-      }
-
-      statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-      // Verify there are no errors
-      assertNoWriteErrors(statuses);
-
-      metaClient = HoodieTableMetaClient.reload(metaClient);
-      deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
-      assertTrue(deltaCommit.isPresent());
-      assertEquals("Latest Delta commit should be 004", "004", deltaCommit.get().getTimestamp());
-
-      commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
-      assertFalse(commit.isPresent());
+      updateAndGetFilePaths(records, client, cfg, newCommitTime);
 
       String compactionCommitTime = client.scheduleCompaction(Option.empty()).get().toString();
       client.compact(compactionCommitTime);
 
-      allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
-      roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
-      dataFilesToRead = roView.getLatestBaseFiles();
+      FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
+      HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, jsc);
+      HoodieTableFileSystemView roView =
+          new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+      Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
       assertTrue(dataFilesToRead.findAny().isPresent());
 
       // verify that there is a commit
@@ -186,6 +167,101 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
     }
   }
 
+  // Test that incremental reads do not go past the compaction instant for RO views.
+  // For RT views, incremental reads can go past compaction.
+  @Test
+  public void testIncrementalReadsWithCompaction() throws Exception {
+    String partitionPath = "2020/02/20"; // use only one partition for this test
+    dataGen = new HoodieTestDataGenerator(new String[] { partitionPath });
+    HoodieWriteConfig cfg = getConfig(true);
+    try (HoodieWriteClient client = getWriteClient(cfg);) {
+
+      /**
+       * Write 1 (only inserts)
+       */
+      String commitTime1 = "001";
+      client.startCommitWithTime(commitTime1);
+
+      List<HoodieRecord> records001 = dataGen.generateInserts(commitTime1, 200);
+      insertAndGetFilePaths(records001, client, cfg, commitTime1);
+
+      // verify only one parquet file shows up with commit time 001
+      FileStatus[] incrementalROFiles = getROIncrementalFiles(partitionPath, true);
+      validateIncrementalFiles(partitionPath, 1, incrementalROFiles, roInputFormat,
+          roJobConf, 200, commitTime1);
+      Path firstFilePath = incrementalROFiles[0].getPath();
+
+      FileStatus[] incrementalRTFiles = getRTIncrementalFiles(partitionPath);
+      validateIncrementalFiles(partitionPath, 1, incrementalRTFiles, rtInputFormat,
+          rtJobConf, 200, commitTime1);
+      assertEquals(firstFilePath, incrementalRTFiles[0].getPath());
+
+      /**
+       * Write 2 (updates)
+       */
+      String updateTime = "004";
+      client.startCommitWithTime(updateTime);
+      List<HoodieRecord> records004 = dataGen.generateUpdates(updateTime, 100);
+      updateAndGetFilePaths(records004, client, cfg, updateTime);
+
+      // verify RO incremental reads - only one parquet file shows up because updates go into log files
+      incrementalROFiles = getROIncrementalFiles(partitionPath, false);
+      validateIncrementalFiles(partitionPath, 1, incrementalROFiles, roInputFormat,
+          roJobConf, 200, commitTime1);
+      assertEquals(firstFilePath, incrementalROFiles[0].getPath());
+
+      // verify RT incremental reads include the updates as well
+      incrementalRTFiles = getRTIncrementalFiles(partitionPath);
+      validateIncrementalFiles(partitionPath, 1, incrementalRTFiles, rtInputFormat,
+          rtJobConf, 200, commitTime1, updateTime);
+
+      // request compaction, but do not perform compaction
+      String compactionCommitTime = "005";
+      client.scheduleCompactionAtInstant("005", Option.empty());
+
+      // verify RO incremental reads - only one parquet file shows up because updates go into log files
+      incrementalROFiles = getROIncrementalFiles(partitionPath, true);
+      validateIncrementalFiles(partitionPath, 1, incrementalROFiles, roInputFormat,
+          roJobConf, 200, commitTime1);
+
+      // verify RT incremental reads include the updates as well
+      incrementalRTFiles = getRTIncrementalFiles(partitionPath);
+      validateIncrementalFiles(partitionPath, 1, incrementalRTFiles, rtInputFormat,
+          rtJobConf, 200, commitTime1, updateTime);
+
+      // write 3 - more inserts
+      String insertsTime = "006";
+      List<HoodieRecord> records006 = dataGen.generateInserts(insertsTime, 200);
+      client.startCommitWithTime(insertsTime);
+      insertAndGetFilePaths(records006, client, cfg, insertsTime);
+
+      incrementalROFiles = getROIncrementalFiles(partitionPath, true);
+      assertEquals(firstFilePath, incrementalROFiles[0].getPath());
+      // verify 006 does not show up in RO mode because of pending compaction
+      validateIncrementalFiles(partitionPath, 1, incrementalROFiles, roInputFormat,
+          roJobConf, 200, commitTime1);
+
+      // verify that if stopAtCompaction is disabled, inserts from "insertsTime" show up
+      incrementalROFiles = getROIncrementalFiles(partitionPath, false);
+      validateIncrementalFiles(partitionPath, 2, incrementalROFiles, roInputFormat,
+          roJobConf, 400, commitTime1, insertsTime);
+
+      // verify 006 shows up in RT views
+      incrementalRTFiles = getRTIncrementalFiles(partitionPath);
+      validateIncrementalFiles(partitionPath, 2, incrementalRTFiles, rtInputFormat,
+          rtJobConf, 400, commitTime1, updateTime, insertsTime);
+
+      // perform the scheduled compaction
+      client.compact(compactionCommitTime);
+
+      incrementalROFiles = getROIncrementalFiles(partitionPath, "002", -1, true);
+      assertTrue(incrementalROFiles.length == 2);
+      // verify 006 shows up now that the compaction is complete
+      validateIncrementalFiles(partitionPath, 2, incrementalROFiles, roInputFormat,
+          roJobConf, 400, commitTime1, compactionCommitTime, insertsTime);
+    }
+  }
+
   // Check if record level metadata is aggregated properly at the end of write.
   @Test
   public void testMetadataAggregateFromWriteStatus() throws Exception {
@@ -1309,4 +1385,115 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       assertFalse("Errors found in write of " + status.getFileId(), status.hasErrors());
     }
   }
+
+  private FileStatus[] insertAndGetFilePaths(List<HoodieRecord> records, HoodieWriteClient client,
+                                             HoodieWriteConfig cfg, String commitTime) throws IOException {
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+    List<WriteStatus> statuses = client.insert(writeRecords, commitTime).collect();
+    assertNoWriteErrors(statuses);
+
+    metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+    HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, jsc);
+
+    Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
+    assertTrue(deltaCommit.isPresent());
+    Assert.assertEquals("Delta commit should be specified value", commitTime, deltaCommit.get().getTimestamp());
+
+    Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().lastInstant();
+    assertFalse(commit.isPresent());
+
+    FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
+    BaseFileOnlyView roView =
+        new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+    Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
+    assertTrue(!dataFilesToRead.findAny().isPresent());
+
+    roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+    dataFilesToRead = roView.getLatestBaseFiles();
+    assertTrue("should list the parquet files we wrote in the delta commit",
+        dataFilesToRead.findAny().isPresent());
+    return allFiles;
+  }
+
+  private FileStatus[] updateAndGetFilePaths(List<HoodieRecord> records, HoodieWriteClient client,
+                                             HoodieWriteConfig cfg, String commitTime) throws IOException {
+    Map<HoodieKey, HoodieRecord> recordsMap = new HashMap<>();
+    for (HoodieRecord rec : records) {
+      if (!recordsMap.containsKey(rec.getKey())) {
+        recordsMap.put(rec.getKey(), rec);
+      }
+    }
+
+    List<WriteStatus> statuses = client.upsert(jsc.parallelize(records, 1), commitTime).collect();
+    // Verify there are no errors
+    assertNoWriteErrors(statuses);
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
+    assertTrue(deltaCommit.isPresent());
+    assertEquals("Latest Delta commit should match specified time",
+        commitTime, deltaCommit.get().getTimestamp());
+
+    Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
+    assertFalse(commit.isPresent());
+    return HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
+  }
+
+  private FileStatus[] getROIncrementalFiles(String partitionPath, boolean stopAtCompaction)
+      throws Exception {
+    return getROIncrementalFiles(partitionPath, "000", -1, stopAtCompaction);
+  }
+
+  private FileStatus[]
+      getROIncrementalFiles(String partitionPath, String startCommitTime, int numCommitsToPull, boolean stopAtCompaction)
+      throws Exception {
+    HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.MERGE_ON_READ);
+    setupIncremental(roJobConf, startCommitTime, numCommitsToPull, stopAtCompaction);
+    FileInputFormat.setInputPaths(roJobConf, basePath + "/" + partitionPath);
+    return roInputFormat.listStatus(roJobConf);
+  }
+
+  private FileStatus[] getRTIncrementalFiles(String partitionPath)
+      throws Exception {
+    return getRTIncrementalFiles(partitionPath, "000", -1);
+  }
+
+  private FileStatus[] getRTIncrementalFiles(String partitionPath, String startCommitTime, int numCommitsToPull)
+      throws Exception {
+    HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.MERGE_ON_READ);
+    setupIncremental(rtJobConf, startCommitTime, numCommitsToPull, false);
+    FileInputFormat.setInputPaths(rtJobConf, basePath + "/" + partitionPath);
+    return rtInputFormat.listStatus(rtJobConf);
+  }
+
+  private void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull, boolean stopAtCompaction) {
+    String modePropertyName =
+        String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+    jobConf.set(modePropertyName, HoodieHiveUtil.INCREMENTAL_SCAN_MODE);
+
+    String startCommitTimestampName =
+        String.format(HoodieHiveUtil.HOODIE_START_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+    jobConf.set(startCommitTimestampName, startCommit);
+
+    String maxCommitPulls =
+        String.format(HoodieHiveUtil.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+    jobConf.setInt(maxCommitPulls, numberOfCommitsToPull);
+
+    String stopAtCompactionPropName =
+        String.format(HoodieHiveUtil.HOODIE_STOP_AT_COMPACTION_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+    jobConf.setBoolean(stopAtCompactionPropName, stopAtCompaction);
+  }
+
+  private void validateIncrementalFiles(String partitionPath, int expectedNumFiles,
+                                        FileStatus[] files, HoodieParquetInputFormat inputFormat,
+                                        JobConf jobConf, int expectedRecords, String... expectedCommits) {
+
+    assertEquals(expectedNumFiles, files.length);
+    Set<String> expectedCommitsSet = Arrays.asList(expectedCommits).stream().collect(Collectors.toSet());
+    List<GenericRecord> records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
+        Arrays.asList(basePath + "/" + partitionPath), basePath, jobConf, inputFormat);
+    assertEquals(expectedRecords, records.size());
+    Set<String> actualCommits = records.stream().map(r ->
+        r.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString()).collect(Collectors.toSet());
+    assertEquals(expectedCommitsSet, actualCommits);
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index 5fa4a0281..519e1f7f1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -112,7 +112,7 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
   }
 
   @Override
-  public HoodieTimeline getCommitsAndCompactionTimeline() {
+  public HoodieDefaultTimeline getCommitsAndCompactionTimeline() {
     Set<String> validActions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION);
     return new HoodieDefaultTimeline(instants.stream().filter(s -> validActions.contains(s.getAction())), details);
   }
@@ -136,6 +136,13 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
         details);
   }
 
+  @Override
+  public HoodieDefaultTimeline findInstantsBefore(String instantTime) {
+    return new HoodieDefaultTimeline(instants.stream()
+        .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), instantTime, LESSER)),
+        details);
+  }
+
   @Override
   public HoodieTimeline filter(Predicate<HoodieInstant> filter) {
     return new HoodieDefaultTimeline(instants.stream().filter(filter), details);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index cdd3a852c..05d41f99e 100755
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -143,6 +143,11 @@ public interface HoodieTimeline extends Serializable {
    */
   HoodieTimeline findInstantsAfter(String instantTime, int numCommits);
 
+  /**
+   * Create a new timeline with all the instants before the specified time.
+   */
+  HoodieTimeline findInstantsBefore(String instantTime);
+
   /**
    * Custom Filter of Instants.
   */
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
index 175894b20..3d8eff780 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
@@ -18,6 +18,7 @@ package org.apache.hudi.common.table.timeline;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.HoodieCommonTestHarness;
 import org.apache.hudi.common.model.HoodieTestUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -25,8 +26,6 @@ import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
-
-import org.apache.hadoop.fs.Path;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -153,10 +152,15 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
   public void testTimelineOperations() {
     timeline = new MockHoodieTimeline(Stream.of("01", "03", "05", "07", "09", "11", "13", "15", "17", "19"),
         Stream.of("21", "23"));
-    HoodieTestUtils.assertStreamEquals("", Stream.of("05", "07", "09", "11"), timeline.getCommitTimeline()
-        .filterCompletedInstants().findInstantsInRange("04", "11").getInstants().map(HoodieInstant::getTimestamp));
-    HoodieTestUtils.assertStreamEquals("", Stream.of("09", "11"), timeline.getCommitTimeline().filterCompletedInstants()
-        .findInstantsAfter("07", 2).getInstants().map(HoodieInstant::getTimestamp));
+    HoodieTestUtils.assertStreamEquals("findInstantsInRange should return 4 instants", Stream.of("05", "07", "09", "11"),
+        timeline.getCommitTimeline().filterCompletedInstants().findInstantsInRange("04", "11")
+            .getInstants().map(HoodieInstant::getTimestamp));
+    HoodieTestUtils.assertStreamEquals("findInstantsAfter 07 should return 2 instants", Stream.of("09", "11"),
+        timeline.getCommitTimeline().filterCompletedInstants().findInstantsAfter("07", 2)
+            .getInstants().map(HoodieInstant::getTimestamp));
+    HoodieTestUtils.assertStreamEquals("findInstantsBefore 07 should return 3 instants", Stream.of("01", "03", "05"),
+        timeline.getCommitTimeline().filterCompletedInstants().findInstantsBefore("07")
+            .getInstants().map(HoodieInstant::getTimestamp));
     assertFalse(timeline.empty());
     assertFalse(timeline.getCommitTimeline().filterPendingExcludingCompaction().empty());
     assertEquals("", 12, timeline.countInstants());
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java
index 42b544198..526639133 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java
@@ -38,6 +38,23 @@ public class HoodieHiveUtil {
   public static final String HOODIE_CONSUME_MODE_PATTERN = "hoodie.%s.consume.mode";
   public static final String HOODIE_START_COMMIT_PATTERN = "hoodie.%s.consume.start.timestamp";
   public static final String HOODIE_MAX_COMMIT_PATTERN = "hoodie.%s.consume.max.commits";
+  /*
+   * Boolean property to stop the incremental reader when there is a pending compaction.
+   * This is needed to prevent certain race conditions with RO views of MOR tables. Only applicable to RO views.
+   *
+   * example timeline:
+   *
+   * t0 -> create bucket1.parquet
+   * t1 -> create and append updates bucket1.log
+   * t2 -> request compaction
+   * t3 -> create bucket2.parquet
+   *
+   * if compaction at t2 takes a long time, incremental readers on RO tables can move to t3 and would skip the updates in t1
+   *
+   * To work around this problem, we want to stop returning data belonging to commits > t2.
+   * After compaction is complete, the incremental reader would see the updates in t2, t3, and so on.
+   */
+  public static final String HOODIE_STOP_AT_COMPACTION_PATTERN = "hoodie.%s.ro.stop.at.compaction";
   public static final String INCREMENTAL_SCAN_MODE = "INCREMENTAL";
   public static final String SNAPSHOT_SCAN_MODE = "SNAPSHOT";
   public static final String DEFAULT_SCAN_MODE = SNAPSHOT_SCAN_MODE;
@@ -46,6 +63,13 @@ public class HoodieHiveUtil {
   public static final int DEFAULT_LEVELS_TO_BASEPATH = 3;
   public static final Pattern HOODIE_CONSUME_MODE_PATTERN_STRING = Pattern.compile("hoodie\\.(.*)\\.consume\\.mode");
 
+  public static boolean stopAtCompaction(JobContext job, String tableName) {
+    String compactionPropName = String.format(HOODIE_STOP_AT_COMPACTION_PATTERN, tableName);
+    boolean stopAtCompaction = job.getConfiguration().getBoolean(compactionPropName, true);
+    LOG.info("Read stop at compaction - " + stopAtCompaction);
+    return stopAtCompaction;
+  }
+
   public static Integer readMaxCommits(JobContext job, String tableName) {
     String maxCommitName = String.format(HOODIE_MAX_COMMIT_PATTERN, tableName);
     int maxCommits = job.getConfiguration().getInt(maxCommitName, DEFAULT_MAX_COMMITS);
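For reference, a sketch of the per-table properties an incremental reader would set, including the new stop-at-compaction flag. The table name and values are hypothetical, and the wrapper class is not part of this patch; the pattern mirrors `setupIncremental` in the tests in this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hudi.hadoop.HoodieHiveUtil;

    class IncrementalReadConfigSketch { // hypothetical helper, not part of this patch
      static JobConf configure(Configuration hadoopConf, String tableName) {
        JobConf jobConf = new JobConf(hadoopConf);
        // Switch the reader into incremental mode and set the consumption window.
        jobConf.set(String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, tableName),
            HoodieHiveUtil.INCREMENTAL_SCAN_MODE);
        jobConf.set(String.format(HoodieHiveUtil.HOODIE_START_COMMIT_PATTERN, tableName), "000");
        jobConf.setInt(String.format(HoodieHiveUtil.HOODIE_MAX_COMMIT_PATTERN, tableName), -1); // -1 = all commits
        // New in this patch: RO incremental reads stop at the earliest pending compaction by
        // default (true); set to false to read past the requested compaction instant.
        jobConf.setBoolean(String.format(HoodieHiveUtil.HOODIE_STOP_AT_COMPACTION_PATTERN, tableName), true);
        return jobConf;
      }
    }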
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
index 8f6dd5999..6b0ecb9b0 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
@@ -18,17 +18,6 @@ package org.apache.hudi.hadoop;
 
-import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodiePartitionMetadata;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
-import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.exception.HoodieIOException;
-
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -42,6 +31,18 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -118,6 +119,36 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
     return returns.toArray(new FileStatus[returns.size()]);
   }
 
+  /**
+   * Filter any specific instants that we do not want to process.
+   * example timeline:
+   *
+   * t0 -> create bucket1.parquet
+   * t1 -> create and append updates bucket1.log
+   * t2 -> request compaction
+   * t3 -> create bucket2.parquet
+   *
+   * if compaction at t2 takes a long time, incremental readers on RO tables can move to t3 and would skip the updates in t1
+   *
+   * To work around this problem, we want to stop returning data belonging to commits > t2.
+   * After compaction is complete, the incremental reader would see the updates in t2, t3, and so on.
+   */
+  protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
+    HoodieDefaultTimeline commitsAndCompactionTimeline = timeline.getCommitsAndCompactionTimeline();
+    Option<HoodieInstant> pendingCompactionInstant =
+        commitsAndCompactionTimeline.filterPendingCompactionTimeline().firstInstant();
+    if (pendingCompactionInstant.isPresent()) {
+      HoodieDefaultTimeline instantsTimeline =
+          commitsAndCompactionTimeline.findInstantsBefore(pendingCompactionInstant.get().getTimestamp());
+      int numCommitsFilteredByCompaction = commitsAndCompactionTimeline.getCommitsTimeline().countInstants()
+          - instantsTimeline.getCommitsTimeline().countInstants();
+      LOG.info("Earliest pending compaction instant is: " + pendingCompactionInstant.get().getTimestamp()
+          + " skipping " + numCommitsFilteredByCompaction + " commits");
+
+      return instantsTimeline;
+    } else {
+      return timeline;
+    }
+  }
+
   /**
    * Achieves listStatus functionality for an incrementally queried table. Instead of listing all
    * partitions and then filtering based on the commits of interest, this logic first extracts the
@@ -126,10 +157,18 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
   private List<FileStatus> listStatusForIncrementalMode(
       JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
     String tableName = tableMetaClient.getTableConfig().getTableName();
-    HoodieTimeline timeline = tableMetaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
-    String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(Job.getInstance(job), tableName);
+    Job jobContext = Job.getInstance(job);
+    HoodieDefaultTimeline baseTimeline;
+    if (HoodieHiveUtil.stopAtCompaction(jobContext, tableName)) {
+      baseTimeline = filterInstantsTimeline(tableMetaClient.getActiveTimeline());
+    } else {
+      baseTimeline = tableMetaClient.getActiveTimeline();
+    }
+
+    HoodieTimeline timeline = baseTimeline.getCommitsTimeline().filterCompletedInstants();
+    String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(jobContext, tableName);
     // Total number of commits to return in this batch. Set this to -1 to get all the commits.
-    Integer maxCommits = HoodieHiveUtil.readMaxCommits(Job.getInstance(job), tableName);
+    Integer maxCommits = HoodieHiveUtil.readMaxCommits(jobContext, tableName);
     LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs);
     List<HoodieInstant> commitsToCheck = timeline.findInstantsAfter(lastIncrementalTs, maxCommits)
         .getInstants().collect(Collectors.toList());
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
index 833b1bb8d..ebb784df5 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
@@ -159,6 +160,12 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
     return super.listStatus(job);
   }
 
+  @Override
+  protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
+    // no specific filtering for Realtime format
+    return timeline;
+  }
+
   /**
    * Add a field to the existing fields projected.
    */
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/InputFormatTestUtil.java
index 3a036f5ba..0f7bc1e5c 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/InputFormatTestUtil.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/InputFormatTestUtil.java
@@ -50,8 +50,13 @@ public class InputFormatTestUtil {
     basePath.create();
     HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.getRoot().toString());
     File partitionPath = basePath.newFolder("2016", "05", "01");
+    return simulateInserts(partitionPath, "fileId1", numberOfFiles, commitNumber);
+  }
+
+  public static File simulateInserts(File partitionPath, String fileId, int numberOfFiles, String commitNumber)
+      throws IOException {
     for (int i = 0; i < numberOfFiles; i++) {
-      File dataFile = new File(partitionPath, FSUtils.makeDataFileName(commitNumber, TEST_WRITE_TOKEN, "fileid" + i));
+      File dataFile = new File(partitionPath, FSUtils.makeDataFileName(commitNumber, TEST_WRITE_TOKEN, fileId + i));
       dataFile.createNewFile();
     }
     return partitionPath;
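The `simulateInserts` overload above makes it easy to seed additional file groups at later commit times. A minimal sketch of the pattern the pending-compaction test below relies on (the wrapper class is hypothetical; the commit times and file ids match that test):

    import java.io.File;
    import java.io.IOException;
    import org.apache.hudi.hadoop.InputFormatTestUtil;
    import org.junit.rules.TemporaryFolder;

    class SimulateInsertsSketch { // hypothetical helper, not part of this patch
      static File seedTwoCommits(TemporaryFolder basePath) throws IOException {
        // Ten base files in file group "fileId1*" at commit 100 (via prepareTable) ...
        File partitionDir = InputFormatTestUtil.prepareTable(basePath, 10, "100");
        // ... and ten more in "fileId2*" at commit 400 via the new overload.
        InputFormatTestUtil.simulateInserts(partitionDir, "fileId2", 10, "400");
        return partitionDir;
      }
    }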
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
index 399fc1cfd..174175785 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
@@ -18,6 +18,7 @@ package org.apache.hudi.hadoop;
 
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieTestUtils;
@@ -32,6 +33,11 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -41,9 +47,11 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class TestHoodieParquetInputFormat {
@@ -61,6 +69,67 @@ public class TestHoodieParquetInputFormat {
   @Rule
   public TemporaryFolder basePath = new TemporaryFolder();
 
+  // Verify that HoodieParquetInputFormat does not return instants after a pending compaction
+  @Test
+  public void testPendingCompactionWithActiveCommits() throws IOException {
+    // setup 6 sample instants in the timeline
+    List<HoodieInstant> instants = new ArrayList<>();
+    HoodieInstant t1 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "1");
+    HoodieInstant t2 = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, "2");
+    HoodieInstant t3 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "3");
+    HoodieInstant t4 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "4");
+    HoodieInstant t5 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "5");
+    HoodieInstant t6 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "6");
+
+    instants.add(t1);
+    instants.add(t2);
+    instants.add(t3);
+    instants.add(t4);
+    instants.add(t5);
+    instants.add(t6);
+    HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath.getRoot().getAbsolutePath().toString());
+    HoodieActiveTimeline timeline = new HoodieActiveTimeline(metaClient);
+    timeline.setInstants(instants);
+
+    // Verify that filterInstantsTimeline does not return instants after the first pending compaction instant
+    HoodieTimeline filteredTimeline = new HoodieParquetInputFormat().filterInstantsTimeline(timeline);
+    assertTrue(filteredTimeline.containsInstant(t1));
+    assertTrue(filteredTimeline.containsInstant(t2));
+    assertFalse(filteredTimeline.containsInstant(t3));
+    assertFalse(filteredTimeline.containsInstant(t4));
+    assertFalse(filteredTimeline.containsInstant(t5));
+    assertFalse(filteredTimeline.containsInstant(t6));
+
+    // remove compaction instant and setup timeline again
+    instants.remove(t3);
+    timeline = new HoodieActiveTimeline(metaClient);
+    timeline.setInstants(instants);
+    filteredTimeline = new HoodieParquetInputFormat().filterInstantsTimeline(timeline);
+
+    // verify instants before the next pending compaction (t5) are returned
+    assertTrue(filteredTimeline.containsInstant(t1));
+    assertTrue(filteredTimeline.containsInstant(t2));
+    assertFalse(filteredTimeline.containsInstant(t3));
+    assertTrue(filteredTimeline.containsInstant(t4));
+    assertFalse(filteredTimeline.containsInstant(t5));
+    assertFalse(filteredTimeline.containsInstant(t6));
+
+    // remove the remaining compaction instant and setup timeline again
+    instants.remove(t5);
+    timeline = new HoodieActiveTimeline(metaClient);
+    timeline.setInstants(instants);
+    filteredTimeline = new HoodieParquetInputFormat().filterInstantsTimeline(timeline);
+
+    // verify all remaining instants are returned
+    assertTrue(filteredTimeline.containsInstant(t1));
+    assertTrue(filteredTimeline.containsInstant(t2));
+    assertFalse(filteredTimeline.containsInstant(t3));
+    assertTrue(filteredTimeline.containsInstant(t4));
+    assertFalse(filteredTimeline.containsInstant(t5));
+    assertTrue(filteredTimeline.containsInstant(t6));
+  }
+
   @Test
   public void testInputFormatLoad() throws IOException {
     // initial commit
@@ -133,6 +202,22 @@ public class TestHoodieParquetInputFormat {
     fileOutputStream.close();
   }
 
+  private File createCompactionFile(TemporaryFolder basePath, String commitTime)
+      throws IOException {
+    File file = new File(basePath.getRoot().toString() + "/.hoodie/",
+        HoodieTimeline.makeRequestedCompactionFileName(commitTime));
+    assertTrue(file.createNewFile());
+    FileOutputStream os = new FileOutputStream(file);
+    try {
+      HoodieCompactionPlan compactionPlan = HoodieCompactionPlan.newBuilder().setVersion(2).build();
+      // Write empty commit metadata
+      os.write(TimelineMetadataUtils.serializeCompactionPlan(compactionPlan).get());
+      return file;
+    } finally {
+      os.close();
+    }
+  }
+
   @Test
   public void testIncrementalWithMultipleCommits() throws IOException {
     // initial commit
@@ -228,6 +313,43 @@ public class TestHoodieParquetInputFormat {
     }
   }
 
+  // test incremental read does not go past compaction instant for RO views
+  @Test
+  public void testIncrementalWithPendingCompaction() throws IOException {
+    // initial commit
+    File partitionDir = InputFormatTestUtil.prepareTable(basePath, 10, "100");
+    createCommitFile(basePath, "100", "2016/05/01");
+
+    // simulate compaction requested at 300
+    File compactionFile = createCompactionFile(basePath, "300");
+
+    // write inserts into new bucket
+    InputFormatTestUtil.simulateInserts(partitionDir, "fileId2", 10, "400");
+    createCommitFile(basePath, "400", "2016/05/01");
+
+    // Add the paths
+    FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+    InputFormatTestUtil.setupIncremental(jobConf, "0", -1);
+    FileStatus[] files = inputFormat.listStatus(jobConf);
+    assertEquals("Pulling all commits from the beginning should not return instants after the requested compaction",
+        10, files.length);
+    ensureFilesInCommit("Pulling all commits from the beginning should not return instants after the requested compaction",
+        files, "100", 10);
+
+    // delete compaction and verify inserts show up
+    compactionFile.delete();
+    InputFormatTestUtil.setupIncremental(jobConf, "0", -1);
+    files = inputFormat.listStatus(jobConf);
+    assertEquals("After deleting the compaction, should get all the inserted files",
+        20, files.length);
+
+    ensureFilesInCommit("Pulling all commits from the beginning should return instants before the requested compaction",
+        files, "100", 10);
+    ensureFilesInCommit("Pulling all commits from the beginning should return instants after the requested compaction",
+        files, "400", 10);
+  }
+
   private void ensureRecordsInCommit(String msg, String commit, int expectedNumberOfRecordsInCommit,
       int totalExpected) throws IOException {
     int actualCount = 0;