diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index f95ca6f4c..7467628e0 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -239,7 +239,7 @@ public class HoodieInputFormatUtils {
      * those partitions.
      */
     for (Path path : inputPaths) {
-      if (path.toString().contains(s)) {
+      if (path.toString().endsWith(s)) {
         return true;
       }
     }
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
index cb59db978..fb095fb2d 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
@@ -241,6 +241,34 @@ public class TestHoodieParquetInputFormat {
         "We should exclude commit 100 when returning incremental pull with start commit time as 100");
   }
 
+  @Test
+  public void testMultiPartitionTableIncremental() throws IOException {
+    // create hudi table and insert data to it
+    // create only one file in partition 2016/05/1 (commit 100)
+    File partitionDir1 = InputFormatTestUtil
+        .prepareMultiPartitionTable(basePath, baseFileFormat, 1, "100", "1");
+    createCommitFile(basePath, "100", "2016/05/1");
+
+    // insert new data to partition "2016/05/11"
+    // create only one file (commit 101, matching the commit file below)
+    File partitionDir2 = InputFormatTestUtil
+        .prepareMultiPartitionTable(basePath, baseFileFormat, 1, "101", "11");
+    createCommitFile(basePath, "101", "2016/05/11");
+
+    // now partitionDir2.getPath().contains(partitionDir1.getPath()), and HUDI-1817 will occur
+    assertEquals(true, partitionDir2.getPath().contains(partitionDir1.getPath()));
+
+    // set partitionDir2 to be the inputPaths of current inputFormat
+    FileInputFormat.setInputPaths(jobConf, partitionDir2.getPath());
+
+    // set incremental startCommit=0 and numberOfCommitsToPull=3 to pull all the data from hudi table
+    InputFormatTestUtil.setupIncremental(jobConf, "0", 3);
+
+    FileStatus[] files = inputFormat.listStatus(jobConf);
+    assertEquals(1, files.length,
+        "We should get one file from partition: " + partitionDir2.getPath());
+  }
+
   @Test
   public void testIncrementalEmptyPartitions() throws IOException {
     // initial commit
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
index 7a6332eb3..5d3c4469b 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
@@ -72,6 +72,17 @@ public class InputFormatTestUtil {
         commitNumber);
   }
 
+  public static File prepareMultiPartitionTable(java.nio.file.Path basePath, HoodieFileFormat baseFileFormat, int numberOfFiles,
+                                                String commitNumber, String finalLevelPartitionName)
+      throws IOException {
+    HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE,
+        baseFileFormat);
+    java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", "05", finalLevelPartitionName));
+    Files.createDirectories(partitionPath);
+    return simulateInserts(partitionPath.toFile(), baseFileFormat.getFileExtension(), "fileId1" + finalLevelPartitionName, numberOfFiles,
+        commitNumber);
+  }
+
   public static File simulateInserts(File partitionPath, String baseFileExtension,
       String fileId, int numberOfFiles, String commitNumber) throws IOException {