[HUDI-1490] Incremental Query should work even when there are partitions that have no incremental changes (#2371)
* Incremental Query should work even when there are partitions that have no incremental changes Co-authored-by: Sivabalan Narayanan <sivabala@uber.com>
This commit is contained in:
committed by
GitHub
parent
e807bb895e
commit
3ec9270e8e
@@ -236,7 +236,7 @@ public class HoodieInputFormatUtils {
|
||||
return false;
|
||||
})
|
||||
.collect(Collectors.joining(","));
|
||||
return Option.of(incrementalInputPaths);
|
||||
return StringUtils.isNullOrEmpty(incrementalInputPaths) ? Option.empty() : Option.of(incrementalInputPaths);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -117,7 +117,6 @@ public class TestHoodieParquetInputFormat {
|
||||
assertFalse(filteredTimeline.containsInstant(t5));
|
||||
assertFalse(filteredTimeline.containsInstant(t6));
|
||||
|
||||
|
||||
// remove compaction instant and setup timeline again
|
||||
instants.remove(t3);
|
||||
timeline = new HoodieActiveTimeline(metaClient);
|
||||
@@ -239,6 +238,33 @@ public class TestHoodieParquetInputFormat {
|
||||
"We should exclude commit 100 when returning incremental pull with start commit time as 100");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrementalEmptyPartitions() throws IOException {
|
||||
// initial commit
|
||||
File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100");
|
||||
createCommitFile(basePath, "100", "2016/05/01");
|
||||
|
||||
// Add the paths
|
||||
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
|
||||
|
||||
InputFormatTestUtil.setupIncremental(jobConf, "000", 1);
|
||||
|
||||
FileStatus[] files = inputFormat.listStatus(jobConf);
|
||||
assertEquals(10, files.length,
|
||||
"We should include only 1 commit 100 when returning incremental pull with start commit time as 100");
|
||||
ensureFilesInCommit("Pulling 1 commits from 000, should get us the 10 files from 100 commit", files, "100", 10);
|
||||
|
||||
// Add new commit only to a new partition
|
||||
partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "200");
|
||||
createCommitFile(basePath, "200", "2017/05/01");
|
||||
|
||||
InputFormatTestUtil.setupIncremental(jobConf, "100", 1);
|
||||
files = inputFormat.listStatus(jobConf);
|
||||
|
||||
assertEquals(0, files.length,
|
||||
"We should exclude commit 200 when returning incremental pull with start commit time as 100 as filePaths does not include new partition");
|
||||
}
|
||||
|
||||
private void createCommitFile(java.nio.file.Path basePath, String commitNumber, String partitionPath)
|
||||
throws IOException {
|
||||
List<HoodieWriteStat> writeStats = HoodieTestUtils.generateFakeHoodieWriteStat(1);
|
||||
@@ -355,7 +381,7 @@ public class TestHoodieParquetInputFormat {
|
||||
String incrementalMode1 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[0]);
|
||||
conf.set(incrementalMode1, HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
|
||||
String incrementalMode2 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[1]);
|
||||
conf.set(incrementalMode2,HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
|
||||
conf.set(incrementalMode2, HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
|
||||
String incrementalMode3 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "db3.model_trips");
|
||||
conf.set(incrementalMode3, HoodieHiveUtils.INCREMENTAL_SCAN_MODE.toLowerCase());
|
||||
String defaultmode = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "db3.first_trips");
|
||||
|
||||
Reference in New Issue
Block a user