[HUDI-1395] Fix partition path using FSUtils (#2312)
Fixed the logic to get partition path in Copier and Exporter utilities.
This commit is contained in:
@@ -111,7 +111,7 @@ public class HoodieSnapshotCopier implements Serializable {
|
|||||||
|
|
||||||
// also need to copy over partition metadata
|
// also need to copy over partition metadata
|
||||||
Path partitionMetaFile =
|
Path partitionMetaFile =
|
||||||
new Path(new Path(baseDir, partition), HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE);
|
new Path(FSUtils.getPartitionPath(baseDir, partition), HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE);
|
||||||
if (fs1.exists(partitionMetaFile)) {
|
if (fs1.exists(partitionMetaFile)) {
|
||||||
filePaths.add(new Tuple2<>(partition, partitionMetaFile.toString()));
|
filePaths.add(new Tuple2<>(partition, partitionMetaFile.toString()));
|
||||||
}
|
}
|
||||||
@@ -122,7 +122,7 @@ public class HoodieSnapshotCopier implements Serializable {
|
|||||||
context.foreach(filesToCopy, tuple -> {
|
context.foreach(filesToCopy, tuple -> {
|
||||||
String partition = tuple._1();
|
String partition = tuple._1();
|
||||||
Path sourceFilePath = new Path(tuple._2());
|
Path sourceFilePath = new Path(tuple._2());
|
||||||
Path toPartitionPath = new Path(outputDir, partition);
|
Path toPartitionPath = FSUtils.getPartitionPath(outputDir, partition);
|
||||||
FileSystem ifs = FSUtils.getFs(baseDir, serConf.newCopy());
|
FileSystem ifs = FSUtils.getFs(baseDir, serConf.newCopy());
|
||||||
|
|
||||||
if (!ifs.exists(toPartitionPath)) {
|
if (!ifs.exists(toPartitionPath)) {
|
||||||
|
|||||||
@@ -208,7 +208,7 @@ public class HoodieSnapshotExporter {
|
|||||||
dataFiles.forEach(hoodieDataFile -> filePaths.add(new Tuple2<>(partition, hoodieDataFile.getPath())));
|
dataFiles.forEach(hoodieDataFile -> filePaths.add(new Tuple2<>(partition, hoodieDataFile.getPath())));
|
||||||
// also need to copy over partition metadata
|
// also need to copy over partition metadata
|
||||||
Path partitionMetaFile =
|
Path partitionMetaFile =
|
||||||
new Path(new Path(cfg.sourceBasePath, partition), HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE);
|
new Path(FSUtils.getPartitionPath(cfg.sourceBasePath, partition), HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE);
|
||||||
FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy());
|
FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy());
|
||||||
if (fs.exists(partitionMetaFile)) {
|
if (fs.exists(partitionMetaFile)) {
|
||||||
filePaths.add(new Tuple2<>(partition, partitionMetaFile.toString()));
|
filePaths.add(new Tuple2<>(partition, partitionMetaFile.toString()));
|
||||||
@@ -219,7 +219,7 @@ public class HoodieSnapshotExporter {
|
|||||||
context.foreach(files, tuple -> {
|
context.foreach(files, tuple -> {
|
||||||
String partition = tuple._1();
|
String partition = tuple._1();
|
||||||
Path sourceFilePath = new Path(tuple._2());
|
Path sourceFilePath = new Path(tuple._2());
|
||||||
Path toPartitionPath = new Path(cfg.targetOutputPath, partition);
|
Path toPartitionPath = FSUtils.getPartitionPath(cfg.targetOutputPath, partition);
|
||||||
FileSystem fs = FSUtils.getFs(cfg.targetOutputPath, serConf.newCopy());
|
FileSystem fs = FSUtils.getFs(cfg.targetOutputPath, serConf.newCopy());
|
||||||
|
|
||||||
if (!fs.exists(toPartitionPath)) {
|
if (!fs.exists(toPartitionPath)) {
|
||||||
|
|||||||
Reference in New Issue
Block a user