[HUDI-687] Stop incremental reader on RO table when there is a pending compaction (#1396)

This commit is contained in:
satishkotha
2020-04-10 10:45:41 -07:00
committed by GitHub
parent 8c7cef3e50
commit c0f96e0726
11 changed files with 540 additions and 125 deletions

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.common;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.avro.Schema;
@@ -51,29 +52,36 @@ public class HoodieMergeOnReadTestUtils {
public static List<GenericRecord> getRecordsUsingInputFormat(List<String> inputPaths, String basePath,
Configuration conf) {
JobConf jobConf = new JobConf(conf);
return getRecordsUsingInputFormat(inputPaths, basePath, jobConf, new HoodieParquetRealtimeInputFormat());
}
public static List<GenericRecord> getRecordsUsingInputFormat(List<String> inputPaths,
String basePath,
JobConf jobConf,
HoodieParquetInputFormat inputFormat) {
Schema schema = HoodieAvroUtils.addMetadataFields(
new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA));
HoodieParquetRealtimeInputFormat inputFormat = new HoodieParquetRealtimeInputFormat();
setPropsForInputFormat(inputFormat, jobConf, schema, basePath);
return inputPaths.stream().map(path -> {
setInputPath(jobConf, path);
List<GenericRecord> records = new ArrayList<>();
try {
List<InputSplit> splits = Arrays.asList(inputFormat.getSplits(jobConf, 1));
RecordReader recordReader = inputFormat.getRecordReader(splits.get(0), jobConf, null);
Void key = (Void) recordReader.createKey();
ArrayWritable writable = (ArrayWritable) recordReader.createValue();
while (recordReader.next(key, writable)) {
GenericRecordBuilder newRecord = new GenericRecordBuilder(schema);
// writable returns an array with [field1, field2, _hoodie_commit_time,
// _hoodie_commit_seqno]
Writable[] values = writable.get();
final int[] fieldIndex = {0};
assert schema.getFields().size() <= values.length;
schema.getFields().forEach(field -> {
newRecord.set(field, values[fieldIndex[0]++]);
});
records.add(newRecord.build());
for (InputSplit split : splits) {
RecordReader recordReader = inputFormat.getRecordReader(split, jobConf, null);
Void key = (Void) recordReader.createKey();
ArrayWritable writable = (ArrayWritable) recordReader.createValue();
while (recordReader.next(key, writable)) {
GenericRecordBuilder newRecord = new GenericRecordBuilder(schema);
// writable returns an array with [field1, field2, _hoodie_commit_time,
// _hoodie_commit_seqno]
Writable[] values = writable.get();
assert schema.getFields().size() <= values.length;
schema.getFields().forEach(field -> {
newRecord.set(field, values[field.pos()]);
});
records.add(newRecord.build());
}
}
} catch (IOException ie) {
ie.printStackTrace();
@@ -85,7 +93,7 @@ public class HoodieMergeOnReadTestUtils {
}).orElse(new ArrayList<GenericRecord>());
}
private static void setPropsForInputFormat(HoodieParquetRealtimeInputFormat inputFormat, JobConf jobConf,
private static void setPropsForInputFormat(HoodieParquetInputFormat inputFormat, JobConf jobConf,
Schema schema, String basePath) {
List<Schema.Field> fields = schema.getFields();
String names = fields.stream().map(f -> f.name().toString()).collect(Collectors.joining(","));

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.table;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.HoodieClientTestHarness;
import org.apache.hudi.common.HoodieClientTestUtils;
@@ -29,9 +30,9 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetUtils;
@@ -39,11 +40,17 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hadoop.HoodieHiveUtil;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.table.HoodieCopyOnWriteTable.UpsertPartitioner;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroReadSupport;
@@ -64,7 +71,6 @@ import java.util.UUID;
import scala.Tuple2;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -129,6 +135,8 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
// Prepare the AvroParquetIO
HoodieWriteConfig config = makeHoodieClientConfig();
String firstCommitTime = HoodieTestUtils.makeNewCommitTime();
HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
writeClient.startCommitWithTime(firstCommitTime);
metaClient = HoodieTableMetaClient.reload(metaClient);
String partitionPath = "/2016/01/31";
@@ -154,30 +162,17 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
// Insert new records
final HoodieCopyOnWriteTable cowTable = table;
jsc.parallelize(Arrays.asList(1)).map(x -> {
return cowTable.handleInsert(firstCommitTime, FSUtils.createNewFileIdPfx(), records.iterator());
}).map(x -> HoodieClientTestUtils.collectStatuses(x)).collect();
writeClient.insert(jsc.parallelize(records, 1), firstCommitTime);
// We should have a parquet file generated (TODO: better control # files after we revise
// AvroParquetIO)
File parquetFile = null;
for (File file : new File(this.basePath + partitionPath).listFiles()) {
if (file.getName().endsWith(".parquet")) {
parquetFile = file;
break;
}
}
assertTrue(parquetFile != null);
FileStatus[] allFiles = getIncrementalFiles(partitionPath, "0", -1);
assertEquals(1, allFiles.length);
// Read out the bloom filter and make sure filter can answer record exist or not
Path parquetFilePath = new Path(parquetFile.getAbsolutePath());
Path parquetFilePath = allFiles[0].getPath();
BloomFilter filter = ParquetUtils.readBloomFilterFromParquetMetadata(jsc.hadoopConfiguration(), parquetFilePath);
for (HoodieRecord record : records) {
assertTrue(filter.mightContain(record.getRecordKey()));
}
// Create a commit file
new File(this.basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ FSUtils.getCommitTime(parquetFile.getName()) + ".commit").createNewFile();
// Read the parquet file, check the record content
List<GenericRecord> fileRecords = ParquetUtils.readAvroRecords(jsc.hadoopConfiguration(), parquetFilePath);
@@ -194,9 +189,6 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
TestRawTripPayload updateRowChanges1 = new TestRawTripPayload(updateRecordStr1);
HoodieRecord updatedRecord1 = new HoodieRecord(
new HoodieKey(updateRowChanges1.getRowKey(), updateRowChanges1.getPartitionPath()), updateRowChanges1);
updatedRecord1.unseal();
updatedRecord1.setCurrentLocation(new HoodieRecordLocation(null, FSUtils.getFileId(parquetFile.getName())));
updatedRecord1.seal();
TestRawTripPayload rowChange4 = new TestRawTripPayload(recordStr4);
HoodieRecord insertedRecord1 =
@@ -207,27 +199,16 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
Thread.sleep(1000);
String newCommitTime = HoodieTestUtils.makeNewCommitTime();
metaClient = HoodieTableMetaClient.reload(metaClient);
final HoodieCopyOnWriteTable newTable = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, jsc);
List<WriteStatus> statuses = jsc.parallelize(Arrays.asList(1)).map(x -> {
return newTable.handleUpdate(newCommitTime, updatedRecord1.getPartitionPath(),
updatedRecord1.getCurrentLocation().getFileId(), updatedRecords.iterator());
}).flatMap(x -> HoodieClientTestUtils.collectStatuses(x).iterator()).collect();
writeClient.startCommitWithTime(newCommitTime);
List<WriteStatus> statuses = writeClient.upsert(jsc.parallelize(updatedRecords), newCommitTime).collect();
allFiles = getIncrementalFiles(partitionPath, firstCommitTime, -1);
assertEquals(1, allFiles.length);
// verify the new file group is the same as the previous one
assertEquals(FSUtils.getFileId(parquetFilePath.getName()), FSUtils.getFileId(allFiles[0].getPath().getName()));
// Check the updated file
File updatedParquetFile = null;
for (File file : new File(basePath + "/2016/01/31").listFiles()) {
if (file.getName().endsWith(".parquet")) {
if (FSUtils.getFileId(file.getName()).equals(FSUtils.getFileId(parquetFile.getName()))
&& HoodieTimeline.compareTimestamps(FSUtils.getCommitTime(file.getName()),
FSUtils.getCommitTime(parquetFile.getName()), HoodieTimeline.GREATER)) {
updatedParquetFile = file;
break;
}
}
}
assertNotNull(updatedParquetFile);
// Check whether the record has been updated
Path updatedParquetFilePath = new Path(updatedParquetFile.getAbsolutePath());
Path updatedParquetFilePath = allFiles[0].getPath();
BloomFilter updatedFilter =
ParquetUtils.readBloomFilterFromParquetMetadata(jsc.hadoopConfiguration(), updatedParquetFilePath);
for (HoodieRecord record : records) {
@@ -254,6 +235,32 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
assertEquals(4, writeStatus.getStat().getNumWrites());// 3 rewritten records + 1 new record
}
private FileStatus[] getIncrementalFiles(String partitionPath, String startCommitTime, int numCommitsToPull)
throws Exception {
// initialize parquet input format
HoodieParquetInputFormat hoodieInputFormat = new HoodieParquetInputFormat();
JobConf jobConf = new JobConf(jsc.hadoopConfiguration());
hoodieInputFormat.setConf(jobConf);
HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.COPY_ON_WRITE);
setupIncremental(jobConf, startCommitTime, numCommitsToPull);
FileInputFormat.setInputPaths(jobConf, basePath + partitionPath);
return hoodieInputFormat.listStatus(jobConf);
}
private void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull) {
String modePropertyName =
String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
jobConf.set(modePropertyName, HoodieHiveUtil.INCREMENTAL_SCAN_MODE);
String startCommitTimestampName =
String.format(HoodieHiveUtil.HOODIE_START_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
jobConf.set(startCommitTimestampName, startCommit);
String maxCommitPulls =
String.format(HoodieHiveUtil.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
jobConf.setInt(maxCommitPulls, numberOfCommitsToPull);
}
private List<HoodieRecord> newHoodieRecords(int n, String time) throws Exception {
List<HoodieRecord> records = new ArrayList<>();
for (int i = 0; i < n; i++) {

View File

@@ -18,6 +18,8 @@
package org.apache.hudi.table;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
@@ -50,6 +52,9 @@ import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hadoop.HoodieHiveUtil;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndex.IndexType;
@@ -70,6 +75,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -80,6 +86,12 @@ import static org.junit.Assert.assertTrue;
public class TestMergeOnReadTable extends HoodieClientTestHarness {
private HoodieParquetInputFormat roInputFormat;
private JobConf roJobConf;
private HoodieParquetRealtimeInputFormat rtInputFormat;
private JobConf rtJobConf;
@Before
public void init() throws IOException {
initDFS();
@@ -89,6 +101,15 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
dfs.mkdirs(new Path(basePath));
HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.MERGE_ON_READ);
initTestDataGenerator();
// initialize parquet input format
roInputFormat = new HoodieParquetInputFormat();
roJobConf = new JobConf(jsc.hadoopConfiguration());
roInputFormat.setConf(roJobConf);
rtInputFormat = new HoodieParquetRealtimeInputFormat();
rtJobConf = new JobConf(jsc.hadoopConfiguration());
rtInputFormat.setConf(rtJobConf);
}
@After
@@ -114,63 +135,23 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
assertNoWriteErrors(statuses);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, jsc);
Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
assertTrue(deltaCommit.isPresent());
Assert.assertEquals("Delta commit should be 001", "001", deltaCommit.get().getTimestamp());
Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertFalse(commit.isPresent());
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
BaseFileOnlyView roView =
new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
assertTrue(!dataFilesToRead.findAny().isPresent());
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFilesToRead = roView.getLatestBaseFiles();
assertTrue("should list the parquet files we wrote in the delta commit",
dataFilesToRead.findAny().isPresent());
insertAndGetFilePaths(records, client, cfg, newCommitTime);
/**
* Write 2 (updates)
*/
newCommitTime = "004";
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUpdates(newCommitTime, 100);
Map<HoodieKey, HoodieRecord> recordsMap = new HashMap<>();
for (HoodieRecord rec : records) {
if (!recordsMap.containsKey(rec.getKey())) {
recordsMap.put(rec.getKey(), rec);
}
}
statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
// Verify there are no errors
assertNoWriteErrors(statuses);
metaClient = HoodieTableMetaClient.reload(metaClient);
deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
assertTrue(deltaCommit.isPresent());
assertEquals("Latest Delta commit should be 004", "004", deltaCommit.get().getTimestamp());
commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertFalse(commit.isPresent());
updateAndGetFilePaths(records, client, cfg, newCommitTime);
String compactionCommitTime = client.scheduleCompaction(Option.empty()).get().toString();
client.compact(compactionCommitTime);
allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFilesToRead = roView.getLatestBaseFiles();
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, jsc);
HoodieTableFileSystemView roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
assertTrue(dataFilesToRead.findAny().isPresent());
// verify that there is a commit
@@ -186,6 +167,101 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
}
}
// test incremental read does not go past compaction instant for RO views
// For RT views, incremental read can go past compaction
@Test
public void testIncrementalReadsWithCompaction() throws Exception {
String partitionPath = "2020/02/20"; // use only one partition for this test
dataGen = new HoodieTestDataGenerator(new String[] { partitionPath });
HoodieWriteConfig cfg = getConfig(true);
try (HoodieWriteClient client = getWriteClient(cfg);) {
/**
* Write 1 (only inserts)
*/
String commitTime1 = "001";
client.startCommitWithTime(commitTime1);
List<HoodieRecord> records001 = dataGen.generateInserts(commitTime1, 200);
insertAndGetFilePaths(records001, client, cfg, commitTime1);
// verify only one parquet file shows up with commit time 001
FileStatus[] incrementalROFiles = getROIncrementalFiles(partitionPath, true);
validateIncrementalFiles(partitionPath, 1, incrementalROFiles, roInputFormat,
roJobConf,200, commitTime1);
Path firstFilePath = incrementalROFiles[0].getPath();
FileStatus[] incrementalRTFiles = getRTIncrementalFiles(partitionPath);
validateIncrementalFiles(partitionPath, 1, incrementalRTFiles, rtInputFormat,
rtJobConf,200, commitTime1);
assertEquals(firstFilePath, incrementalRTFiles[0].getPath());
/**
* Write 2 (updates)
*/
String updateTime = "004";
client.startCommitWithTime(updateTime);
List<HoodieRecord> records004 = dataGen.generateUpdates(updateTime, 100);
updateAndGetFilePaths(records004, client, cfg, updateTime);
// verify RO incremental reads - only one parquet file shows up because updates go into log files
incrementalROFiles = getROIncrementalFiles(partitionPath, false);
validateIncrementalFiles(partitionPath, 1, incrementalROFiles, roInputFormat,
roJobConf, 200, commitTime1);
assertEquals(firstFilePath, incrementalROFiles[0].getPath());
// verify RT incremental reads include the updates as well
incrementalRTFiles = getRTIncrementalFiles(partitionPath);
validateIncrementalFiles(partitionPath, 1, incrementalRTFiles, rtInputFormat,
rtJobConf, 200, commitTime1, updateTime);
// request compaction, but do not perform compaction
String compactionCommitTime = "005";
client.scheduleCompactionAtInstant("005", Option.empty());
// verify RO incremental reads - only one parquet file shows up because updates go into log files
incrementalROFiles = getROIncrementalFiles(partitionPath, true);
validateIncrementalFiles(partitionPath,1, incrementalROFiles, roInputFormat,
roJobConf, 200, commitTime1);
// verify RT incremental reads include the updates as well
incrementalRTFiles = getRTIncrementalFiles(partitionPath);
validateIncrementalFiles(partitionPath, 1, incrementalRTFiles, rtInputFormat,
rtJobConf, 200, commitTime1, updateTime);
// write 3 - more inserts
String insertsTime = "006";
List<HoodieRecord> records006 = dataGen.generateInserts(insertsTime, 200);
client.startCommitWithTime(insertsTime);
insertAndGetFilePaths(records006, client, cfg, insertsTime);
incrementalROFiles = getROIncrementalFiles(partitionPath, true);
assertEquals(firstFilePath, incrementalROFiles[0].getPath());
// verify 006 does not show up in RO mode because of pending compaction
validateIncrementalFiles(partitionPath, 1, incrementalROFiles, roInputFormat,
roJobConf, 200, commitTime1);
// verify that if stopAtCompaction is disabled, inserts from "insertsTime" show up
incrementalROFiles = getROIncrementalFiles(partitionPath, false);
validateIncrementalFiles(partitionPath,2, incrementalROFiles, roInputFormat,
roJobConf, 400, commitTime1, insertsTime);
// verify 006 shows up in RT views
incrementalRTFiles = getRTIncrementalFiles(partitionPath);
validateIncrementalFiles(partitionPath, 2, incrementalRTFiles, rtInputFormat,
rtJobConf, 400, commitTime1, updateTime, insertsTime);
// perform the scheduled compaction
client.compact(compactionCommitTime);
incrementalROFiles = getROIncrementalFiles(partitionPath, "002", -1, true);
assertTrue(incrementalROFiles.length == 2);
// verify 006 shows up now that the compaction is complete
validateIncrementalFiles(partitionPath, 2, incrementalROFiles, roInputFormat,
roJobConf, 400, commitTime1, compactionCommitTime, insertsTime);
}
}
// Check if record level metadata is aggregated properly at the end of write.
@Test
public void testMetadataAggregateFromWriteStatus() throws Exception {
@@ -1309,4 +1385,115 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
assertFalse("Errors found in write of " + status.getFileId(), status.hasErrors());
}
}
private FileStatus[] insertAndGetFilePaths(List<HoodieRecord> records, HoodieWriteClient client,
HoodieWriteConfig cfg, String commitTime) throws IOException {
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
List<WriteStatus> statuses = client.insert(writeRecords, commitTime).collect();
assertNoWriteErrors(statuses);
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, jsc);
Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
assertTrue(deltaCommit.isPresent());
Assert.assertEquals("Delta commit should be specified value", commitTime, deltaCommit.get().getTimestamp());
Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().lastInstant();
assertFalse(commit.isPresent());
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
BaseFileOnlyView roView =
new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
assertTrue(!dataFilesToRead.findAny().isPresent());
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFilesToRead = roView.getLatestBaseFiles();
assertTrue("should list the parquet files we wrote in the delta commit",
dataFilesToRead.findAny().isPresent());
return allFiles;
}
private FileStatus[] updateAndGetFilePaths(List<HoodieRecord> records, HoodieWriteClient client,
HoodieWriteConfig cfg, String commitTime) throws IOException {
Map<HoodieKey, HoodieRecord> recordsMap = new HashMap<>();
for (HoodieRecord rec : records) {
if (!recordsMap.containsKey(rec.getKey())) {
recordsMap.put(rec.getKey(), rec);
}
}
List<WriteStatus> statuses = client.upsert(jsc.parallelize(records, 1), commitTime).collect();
// Verify there are no errors
assertNoWriteErrors(statuses);
metaClient = HoodieTableMetaClient.reload(metaClient);
Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
assertTrue(deltaCommit.isPresent());
assertEquals("Latest Delta commit should match specified time",
commitTime, deltaCommit.get().getTimestamp());
Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertFalse(commit.isPresent());
return HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
}
private FileStatus[] getROIncrementalFiles(String partitionPath, boolean stopAtCompaction)
throws Exception {
return getROIncrementalFiles(partitionPath, "000", -1, stopAtCompaction);
}
private FileStatus[] getROIncrementalFiles(String partitionPath, String startCommitTime, int numCommitsToPull, boolean stopAtCompaction)
throws Exception {
HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.MERGE_ON_READ);
setupIncremental(roJobConf, startCommitTime, numCommitsToPull, stopAtCompaction);
FileInputFormat.setInputPaths(roJobConf, basePath + "/" + partitionPath);
return roInputFormat.listStatus(roJobConf);
}
private FileStatus[] getRTIncrementalFiles(String partitionPath)
throws Exception {
return getRTIncrementalFiles(partitionPath, "000", -1);
}
private FileStatus[] getRTIncrementalFiles(String partitionPath, String startCommitTime, int numCommitsToPull)
throws Exception {
HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.MERGE_ON_READ);
setupIncremental(rtJobConf, startCommitTime, numCommitsToPull, false);
FileInputFormat.setInputPaths(rtJobConf, basePath + "/" + partitionPath);
return rtInputFormat.listStatus(rtJobConf);
}
private void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull, boolean stopAtCompaction) {
String modePropertyName =
String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
jobConf.set(modePropertyName, HoodieHiveUtil.INCREMENTAL_SCAN_MODE);
String startCommitTimestampName =
String.format(HoodieHiveUtil.HOODIE_START_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
jobConf.set(startCommitTimestampName, startCommit);
String maxCommitPulls =
String.format(HoodieHiveUtil.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
jobConf.setInt(maxCommitPulls, numberOfCommitsToPull);
String stopAtCompactionPropName =
String.format(HoodieHiveUtil.HOODIE_STOP_AT_COMPACTION_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
jobConf.setBoolean(stopAtCompactionPropName, stopAtCompaction);
}
private void validateIncrementalFiles(String partitionPath, int expectedNumFiles,
FileStatus[] files, HoodieParquetInputFormat inputFormat,
JobConf jobConf, int expectedRecords, String... expectedCommits) {
assertEquals(expectedNumFiles, files.length);
Set<String> expectedCommitsSet = Arrays.asList(expectedCommits).stream().collect(Collectors.toSet());
List<GenericRecord> records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
Arrays.asList(basePath + "/" + partitionPath), basePath, jobConf, inputFormat);
assertEquals(expectedRecords, records.size());
Set<String> actualCommits = records.stream().map(r ->
r.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString()).collect(Collectors.toSet());
assertEquals(expectedCommitsSet, actualCommits);
}
}

View File

@@ -112,7 +112,7 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
}
@Override
public HoodieTimeline getCommitsAndCompactionTimeline() {
public HoodieDefaultTimeline getCommitsAndCompactionTimeline() {
Set<String> validActions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION);
return new HoodieDefaultTimeline(instants.stream().filter(s -> validActions.contains(s.getAction())), details);
}
@@ -136,6 +136,13 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
details);
}
@Override
public HoodieDefaultTimeline findInstantsBefore(String instantTime) {
return new HoodieDefaultTimeline(instants.stream()
.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), instantTime, LESSER)),
details);
}
@Override
public HoodieTimeline filter(Predicate<HoodieInstant> filter) {
return new HoodieDefaultTimeline(instants.stream().filter(filter), details);

View File

@@ -143,6 +143,11 @@ public interface HoodieTimeline extends Serializable {
*/
HoodieTimeline findInstantsAfter(String instantTime, int numCommits);
/**
* Create a new Timeline with all instants before specified time.
*/
HoodieTimeline findInstantsBefore(String instantTime);
/**
* Custom Filter of Instants.
*/
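The RO input format in this change uses this new interface method to trim the visible timeline at the earliest pending compaction. A minimal sketch of that usage, assuming an active timeline already loaded as a HoodieDefaultTimeline (the helper name and variable names below are illustrative, not part of this change):
private static HoodieTimeline visibleTimelineForROIncrementalRead(HoodieDefaultTimeline activeTimeline) {
  // Illustrative only: hide commits that landed after the earliest pending compaction,
  // mirroring what filterInstantsTimeline in HoodieParquetInputFormat does later in this diff.
  Option<HoodieInstant> pendingCompaction = activeTimeline.getCommitsAndCompactionTimeline()
      .filterPendingCompactionTimeline().firstInstant();
  return pendingCompaction.isPresent()
      ? activeTimeline.findInstantsBefore(pendingCompaction.get().getTimestamp())
      : activeTimeline;
}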

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.common.table.timeline;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.HoodieCommonTestHarness;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -25,8 +26,6 @@ import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hadoop.fs.Path;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -153,10 +152,15 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
public void testTimelineOperations() {
timeline = new MockHoodieTimeline(Stream.of("01", "03", "05", "07", "09", "11", "13", "15", "17", "19"),
Stream.of("21", "23"));
HoodieTestUtils.assertStreamEquals("", Stream.of("05", "07", "09", "11"), timeline.getCommitTimeline()
.filterCompletedInstants().findInstantsInRange("04", "11").getInstants().map(HoodieInstant::getTimestamp));
HoodieTestUtils.assertStreamEquals("", Stream.of("09", "11"), timeline.getCommitTimeline().filterCompletedInstants()
.findInstantsAfter("07", 2).getInstants().map(HoodieInstant::getTimestamp));
HoodieTestUtils.assertStreamEquals("findInstantsInRange should return 4 instants", Stream.of("05", "07", "09", "11"),
timeline.getCommitTimeline().filterCompletedInstants().findInstantsInRange("04", "11")
.getInstants().map(HoodieInstant::getTimestamp));
HoodieTestUtils.assertStreamEquals("findInstantsAfter 07 should return 2 instants", Stream.of("09", "11"),
timeline.getCommitTimeline().filterCompletedInstants().findInstantsAfter("07", 2)
.getInstants().map(HoodieInstant::getTimestamp));
HoodieTestUtils.assertStreamEquals("findInstantsBefore 07 should return 3 instants", Stream.of("01", "03", "05"),
timeline.getCommitTimeline().filterCompletedInstants().findInstantsBefore("07")
.getInstants().map(HoodieInstant::getTimestamp));
assertFalse(timeline.empty());
assertFalse(timeline.getCommitTimeline().filterPendingExcludingCompaction().empty());
assertEquals("", 12, timeline.countInstants());

View File

@@ -38,6 +38,23 @@ public class HoodieHiveUtil {
public static final String HOODIE_CONSUME_MODE_PATTERN = "hoodie.%s.consume.mode";
public static final String HOODIE_START_COMMIT_PATTERN = "hoodie.%s.consume.start.timestamp";
public static final String HOODIE_MAX_COMMIT_PATTERN = "hoodie.%s.consume.max.commits";
/*
* Boolean property to stop incremental reader when there is a pending compaction.
* This is needed to prevent certain race conditions with RO views of MOR tables. Only applicable to RO views.
*
* Example timeline:
*
* t0 -> create bucket1.parquet
* t1 -> create and append updates bucket1.log
* t2 -> request compaction
* t3 -> create bucket2.parquet
*
* If compaction at t2 takes a long time, incremental readers on RO tables can move to t3 and skip the updates in t1.
*
* To work around this problem, we stop returning data belonging to commits > t2.
* After compaction is complete, the incremental reader sees updates in t2, t3, and so on.
*/
public static final String HOODIE_STOP_AT_COMPACTION_PATTERN = "hoodie.%s.ro.stop.at.compaction";
public static final String INCREMENTAL_SCAN_MODE = "INCREMENTAL";
public static final String SNAPSHOT_SCAN_MODE = "SNAPSHOT";
public static final String DEFAULT_SCAN_MODE = SNAPSHOT_SCAN_MODE;
@@ -46,6 +63,13 @@ public class HoodieHiveUtil {
public static final int DEFAULT_LEVELS_TO_BASEPATH = 3;
public static final Pattern HOODIE_CONSUME_MODE_PATTERN_STRING = Pattern.compile("hoodie\\.(.*)\\.consume\\.mode");
public static boolean stopAtCompaction(JobContext job, String tableName) {
String compactionPropName = String.format(HOODIE_STOP_AT_COMPACTION_PATTERN, tableName);
boolean stopAtCompaction = job.getConfiguration().getBoolean(compactionPropName, true);
LOG.info("Read stop at compaction - " + stopAtCompaction);
return stopAtCompaction;
}
public static Integer readMaxCommits(JobContext job, String tableName) {
String maxCommitName = String.format(HOODIE_MAX_COMMIT_PATTERN, tableName);
int maxCommits = job.getConfiguration().getInt(maxCommitName, DEFAULT_MAX_COMMITS);

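For context on how these properties fit together, a hypothetical incremental-read configuration might look like the following. The table name raw_trips and the literal values are assumptions for illustration; only the last property is new in this change, and it mirrors the test helpers added in this commit.
JobConf jobConf = new JobConf();
String table = "raw_trips";
// existing incremental-query properties
jobConf.set(String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, table), HoodieHiveUtil.INCREMENTAL_SCAN_MODE);
jobConf.set(String.format(HoodieHiveUtil.HOODIE_START_COMMIT_PATTERN, table), "000");
jobConf.setInt(String.format(HoodieHiveUtil.HOODIE_MAX_COMMIT_PATTERN, table), -1);
// new guard: defaults to true, so RO incremental reads stop at the earliest pending compaction;
// set it to false to read past the pending compaction and accept that updates in log files may be skipped
jobConf.setBoolean(String.format(HoodieHiveUtil.HOODIE_STOP_AT_COMPACTION_PATTERN, table), false);
With the default of true, listStatusForIncrementalMode in HoodieParquetInputFormat (below) filters the timeline at the pending compaction before selecting commits to pull.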
View File

@@ -18,17 +18,6 @@
package org.apache.hudi.hadoop;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -42,6 +31,18 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -118,6 +119,36 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
return returns.toArray(new FileStatus[returns.size()]);
}
/**
* Filter any specific instants that we do not want to process.
* Example timeline:
*
* t0 -> create bucket1.parquet
* t1 -> create and append updates bucket1.log
* t2 -> request compaction
* t3 -> create bucket2.parquet
*
* If compaction at t2 takes a long time, incremental readers on RO tables can move to t3 and skip the updates in t1.
*
* To work around this problem, we stop returning data belonging to commits > t2.
* After compaction is complete, the incremental reader sees updates in t2, t3, and so on.
*/
protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
HoodieDefaultTimeline commitsAndCompactionTimeline = timeline.getCommitsAndCompactionTimeline();
Option<HoodieInstant> pendingCompactionInstant = commitsAndCompactionTimeline.filterPendingCompactionTimeline().firstInstant();
if (pendingCompactionInstant.isPresent()) {
HoodieDefaultTimeline instantsTimeline = commitsAndCompactionTimeline.findInstantsBefore(pendingCompactionInstant.get().getTimestamp());
int numCommitsFilteredByCompaction = commitsAndCompactionTimeline.getCommitsTimeline().countInstants()
- instantsTimeline.getCommitsTimeline().countInstants();
LOG.info("Earliest pending compaction instant is: " + pendingCompactionInstant.get().getTimestamp()
+ " skipping " + numCommitsFilteredByCompaction + " commits");
return instantsTimeline;
} else {
return timeline;
}
}
/**
* Achieves listStatus functionality for an incrementally queried table. Instead of listing all
* partitions and then filtering based on the commits of interest, this logic first extracts the
@@ -126,10 +157,18 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
private List<FileStatus> listStatusForIncrementalMode(
JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
String tableName = tableMetaClient.getTableConfig().getTableName();
HoodieTimeline timeline = tableMetaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(Job.getInstance(job), tableName);
Job jobContext = Job.getInstance(job);
HoodieDefaultTimeline baseTimeline;
if (HoodieHiveUtil.stopAtCompaction(jobContext, tableName)) {
baseTimeline = filterInstantsTimeline(tableMetaClient.getActiveTimeline());
} else {
baseTimeline = tableMetaClient.getActiveTimeline();
}
HoodieTimeline timeline = baseTimeline.getCommitsTimeline().filterCompletedInstants();
String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(jobContext, tableName);
// Total number of commits to return in this batch. Set this to -1 to get all the commits.
Integer maxCommits = HoodieHiveUtil.readMaxCommits(Job.getInstance(job), tableName);
Integer maxCommits = HoodieHiveUtil.readMaxCommits(jobContext, tableName);
LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs);
List<HoodieInstant> commitsToCheck = timeline.findInstantsAfter(lastIncrementalTs, maxCommits)
.getInstants().collect(Collectors.toList());

View File

@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
@@ -159,6 +160,12 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
return super.listStatus(job);
}
@Override
protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
// no specific filtering for Realtime format
return timeline;
}
/**
* Add a field to the existing fields projected.
*/

View File

@@ -50,8 +50,13 @@ public class InputFormatTestUtil {
basePath.create();
HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.getRoot().toString());
File partitionPath = basePath.newFolder("2016", "05", "01");
return simulateInserts(partitionPath, "fileId1", numberOfFiles, commitNumber);
}
public static File simulateInserts(File partitionPath, String fileId, int numberOfFiles, String commitNumber)
throws IOException {
for (int i = 0; i < numberOfFiles; i++) {
File dataFile = new File(partitionPath, FSUtils.makeDataFileName(commitNumber, TEST_WRITE_TOKEN, "fileid" + i));
File dataFile = new File(partitionPath, FSUtils.makeDataFileName(commitNumber, TEST_WRITE_TOKEN, fileId + i));
dataFile.createNewFile();
}
return partitionPath;

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.hadoop;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTestUtils;
@@ -32,6 +33,11 @@ import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -41,9 +47,11 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class TestHoodieParquetInputFormat {
@@ -61,6 +69,67 @@ public class TestHoodieParquetInputFormat {
@Rule
public TemporaryFolder basePath = new TemporaryFolder();
// Verify that HoodieParquetInputFormat does not return instants after pending compaction
@Test
public void testPendingCompactionWithActiveCommits() throws IOException {
// setup 4 sample instants in timeline
List<HoodieInstant> instants = new ArrayList<>();
HoodieInstant t1 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "1");
HoodieInstant t2 = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, "2");
HoodieInstant t3 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "3");
HoodieInstant t4 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "4");
HoodieInstant t5 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "5");
HoodieInstant t6 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "6");
instants.add(t1);
instants.add(t2);
instants.add(t3);
instants.add(t4);
instants.add(t5);
instants.add(t6);
HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath.getRoot().getAbsolutePath().toString());
HoodieActiveTimeline timeline = new HoodieActiveTimeline(metaClient);
timeline.setInstants(instants);
// Verify filterInstantsTimeline does not return instants at or after the first pending compaction instant
HoodieTimeline filteredTimeline = new HoodieParquetInputFormat().filterInstantsTimeline(timeline);
assertTrue(filteredTimeline.containsInstant(t1));
assertTrue(filteredTimeline.containsInstant(t2));
assertFalse(filteredTimeline.containsInstant(t3));
assertFalse(filteredTimeline.containsInstant(t4));
assertFalse(filteredTimeline.containsInstant(t5));
assertFalse(filteredTimeline.containsInstant(t6));
// remove compaction instant and setup timeline again
instants.remove(t3);
timeline = new HoodieActiveTimeline(metaClient);
timeline.setInstants(instants);
filteredTimeline = new HoodieParquetInputFormat().filterInstantsTimeline(timeline);
// verify instants before the next pending compaction (t5) are returned.
assertTrue(filteredTimeline.containsInstant(t1));
assertTrue(filteredTimeline.containsInstant(t2));
assertFalse(filteredTimeline.containsInstant(t3));
assertTrue(filteredTimeline.containsInstant(t4));
assertFalse(filteredTimeline.containsInstant(t5));
assertFalse(filteredTimeline.containsInstant(t6));
// remove remaining compaction instant and setup timeline again
instants.remove(t5);
timeline = new HoodieActiveTimeline(metaClient);
timeline.setInstants(instants);
filteredTimeline = new HoodieParquetInputFormat().filterInstantsTimeline(timeline);
// verify all remaining instants are returned.
assertTrue(filteredTimeline.containsInstant(t1));
assertTrue(filteredTimeline.containsInstant(t2));
assertFalse(filteredTimeline.containsInstant(t3));
assertTrue(filteredTimeline.containsInstant(t4));
assertFalse(filteredTimeline.containsInstant(t5));
assertTrue(filteredTimeline.containsInstant(t6));
}
@Test
public void testInputFormatLoad() throws IOException {
// initial commit
@@ -133,6 +202,22 @@ public class TestHoodieParquetInputFormat {
fileOutputStream.close();
}
private File createCompactionFile(TemporaryFolder basePath, String commitTime)
throws IOException {
File file = new File(basePath.getRoot().toString() + "/.hoodie/",
HoodieTimeline.makeRequestedCompactionFileName(commitTime));
assertTrue(file.createNewFile());
FileOutputStream os = new FileOutputStream(file);
try {
HoodieCompactionPlan compactionPlan = HoodieCompactionPlan.newBuilder().setVersion(2).build();
// Write a minimal compaction plan as the requested-compaction content
os.write(TimelineMetadataUtils.serializeCompactionPlan(compactionPlan).get());
return file;
} finally {
os.close();
}
}
@Test
public void testIncrementalWithMultipleCommits() throws IOException {
// initial commit
@@ -228,6 +313,43 @@ public class TestHoodieParquetInputFormat {
}
}
// test incremental read does not go past compaction instant for RO views
@Test
public void testIncrementalWithPendingCompaction() throws IOException {
// initial commit
File partitionDir = InputFormatTestUtil.prepareTable(basePath, 10, "100");
createCommitFile(basePath, "100", "2016/05/01");
// simulate compaction requested at 300
File compactionFile = createCompactionFile(basePath, "300");
// write inserts into new bucket
InputFormatTestUtil.simulateInserts(partitionDir, "fileId2", 10, "400");
createCommitFile(basePath, "400", "2016/05/01");
// Add the paths
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
InputFormatTestUtil.setupIncremental(jobConf, "0", -1);
FileStatus[] files = inputFormat.listStatus(jobConf);
assertEquals("Pulling all commit from beginning, should not return instants after begin compaction",
10, files.length);
ensureFilesInCommit("Pulling all commit from beginning, should not return instants after begin compaction",
files, "100", 10);
// delete compaction and verify inserts show up
compactionFile.delete();
InputFormatTestUtil.setupIncremental(jobConf, "0", -1);
files = inputFormat.listStatus(jobConf);
assertEquals("after deleting compaction, should get all inserted files",
20, files.length);
ensureFilesInCommit("Pulling all commit from beginning, should return instants before requested compaction",
files, "100", 10);
ensureFilesInCommit("Pulling all commit from beginning, should return instants after requested compaction",
files, "400", 10);
}
private void ensureRecordsInCommit(String msg, String commit, int expectedNumberOfRecordsInCommit,
int totalExpected) throws IOException {
int actualCount = 0;