[HUDI-1817] Fix getting incorrect partition path while using incr query by spark-sql (#2858)
This commit is contained in:
@@ -239,7 +239,7 @@ public class HoodieInputFormatUtils {
|
|||||||
* those partitions.
|
* those partitions.
|
||||||
*/
|
*/
|
||||||
for (Path path : inputPaths) {
|
for (Path path : inputPaths) {
|
||||||
if (path.toString().contains(s)) {
|
if (path.toString().endsWith(s)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -52,6 +52,7 @@ import java.io.File;
|
|||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.nio.file.Paths;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
@@ -241,6 +242,38 @@ public class TestHoodieParquetInputFormat {
|
|||||||
"We should exclude commit 100 when returning incremental pull with start commit time as 100");
|
"We should exclude commit 100 when returning incremental pull with start commit time as 100");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultiPartitionTableIncremental() throws IOException {
|
||||||
|
// initial commit
|
||||||
|
java.nio.file.Path tablePath = Paths.get(basePath.toString(), "raw_trips");
|
||||||
|
|
||||||
|
// create hudi table and insert data to it
|
||||||
|
// create only one file
|
||||||
|
File partitionDir1 = InputFormatTestUtil
|
||||||
|
.prepareMultiPartitionTable(basePath, baseFileFormat, 1, "100", "1");
|
||||||
|
createCommitFile(basePath, "100", "2016/05/1");
|
||||||
|
|
||||||
|
// insert new data to partition "2016/05/11"
|
||||||
|
// create only one file
|
||||||
|
File partitionDir2 = InputFormatTestUtil
|
||||||
|
.prepareMultiPartitionTable(basePath, baseFileFormat, 1, "100", "11");
|
||||||
|
createCommitFile(basePath, "101", "2016/05/11");
|
||||||
|
|
||||||
|
|
||||||
|
// now partitionDir2.getPath().contain(partitionDir1.getPath()), and hudi-1817 will occur
|
||||||
|
assertEquals(true, partitionDir2.getPath().contains(partitionDir1.getPath()));
|
||||||
|
|
||||||
|
// set partitionDir2 to be the inputPaths of current inputFormat
|
||||||
|
FileInputFormat.setInputPaths(jobConf, partitionDir2.getPath());
|
||||||
|
|
||||||
|
// set incremental startCommit=0 and numberOfCommitsToPull=3 to pull all the data from hudi table
|
||||||
|
InputFormatTestUtil.setupIncremental(jobConf, "0", 3);
|
||||||
|
|
||||||
|
FileStatus[] files = inputFormat.listStatus(jobConf);
|
||||||
|
assertEquals(1, files.length,
|
||||||
|
"We should get one file from partition: " + partitionDir2.getPath());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testIncrementalEmptyPartitions() throws IOException {
|
public void testIncrementalEmptyPartitions() throws IOException {
|
||||||
// initial commit
|
// initial commit
|
||||||
|
|||||||
@@ -72,6 +72,17 @@ public class InputFormatTestUtil {
|
|||||||
commitNumber);
|
commitNumber);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static File prepareMultiPartitionTable(java.nio.file.Path basePath, HoodieFileFormat baseFileFormat, int numberOfFiles,
|
||||||
|
String commitNumber, String finalLevelPartitionName)
|
||||||
|
throws IOException {
|
||||||
|
HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE,
|
||||||
|
baseFileFormat);
|
||||||
|
java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", "05", finalLevelPartitionName));
|
||||||
|
Files.createDirectories(partitionPath);
|
||||||
|
return simulateInserts(partitionPath.toFile(), baseFileFormat.getFileExtension(), "fileId1" + finalLevelPartitionName, numberOfFiles,
|
||||||
|
commitNumber);
|
||||||
|
}
|
||||||
|
|
||||||
public static File simulateInserts(File partitionPath, String baseFileExtension, String fileId, int numberOfFiles,
|
public static File simulateInserts(File partitionPath, String baseFileExtension, String fileId, int numberOfFiles,
|
||||||
String commitNumber)
|
String commitNumber)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|||||||
Reference in New Issue
Block a user