[HUDI-3454] Fix partition name in all code paths for LogRecordScanner (#5252)
* Depend on FSUtils#getRelativePartitionPath(basePath, logFilePath.getParent) to get the partition. * If the list of log file paths in the split is empty, then fallback to usual behaviour.
This commit is contained in:
@@ -18,7 +18,6 @@
|
||||
|
||||
package org.apache.hudi.table.format;
|
||||
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.model.HoodieOperation;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
|
||||
@@ -43,6 +42,7 @@ import org.apache.flink.table.data.RowData;
|
||||
import org.apache.flink.types.RowKind;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
@@ -52,6 +52,10 @@ import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static org.apache.hudi.common.fs.FSUtils.getFs;
|
||||
import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
|
||||
import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
|
||||
|
||||
/**
|
||||
* Utilities for format.
|
||||
*/
|
||||
@@ -124,11 +128,13 @@ public class FormatUtils {
|
||||
Schema logSchema,
|
||||
Configuration config,
|
||||
boolean withOperationField) {
|
||||
FileSystem fs = FSUtils.getFs(split.getTablePath(), config);
|
||||
return HoodieMergedLogRecordScanner.newBuilder()
|
||||
String basePath = split.getTablePath();
|
||||
List<String> logPaths = split.getLogPaths().get();
|
||||
FileSystem fs = getFs(basePath, config);
|
||||
HoodieMergedLogRecordScanner.Builder logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
|
||||
.withFileSystem(fs)
|
||||
.withBasePath(split.getTablePath())
|
||||
.withLogFilePaths(split.getLogPaths().get())
|
||||
.withBasePath(basePath)
|
||||
.withLogFilePaths(logPaths)
|
||||
.withReaderSchema(logSchema)
|
||||
.withLatestInstantTime(split.getLatestCommit())
|
||||
.withReadBlocksLazily(
|
||||
@@ -144,8 +150,12 @@ public class FormatUtils {
|
||||
config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
|
||||
HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
|
||||
.withInstantRange(split.getInstantRange())
|
||||
.withOperationField(withOperationField)
|
||||
.build();
|
||||
.withOperationField(withOperationField);
|
||||
if (!isNullOrEmpty(logPaths)) {
|
||||
logRecordScannerBuilder
|
||||
.withPartition(getRelativePartitionPath(new Path(basePath), new Path(logPaths.get(0)).getParent()));
|
||||
}
|
||||
return logRecordScannerBuilder.build();
|
||||
}
|
||||
|
||||
private static HoodieUnMergedLogRecordScanner unMergedLogScanner(
|
||||
@@ -153,7 +163,7 @@ public class FormatUtils {
|
||||
Schema logSchema,
|
||||
Configuration config,
|
||||
HoodieUnMergedLogRecordScanner.LogRecordScannerCallback callback) {
|
||||
FileSystem fs = FSUtils.getFs(split.getTablePath(), config);
|
||||
FileSystem fs = getFs(split.getTablePath(), config);
|
||||
return HoodieUnMergedLogRecordScanner.newBuilder()
|
||||
.withFileSystem(fs)
|
||||
.withBasePath(split.getTablePath())
|
||||
@@ -234,8 +244,8 @@ public class FormatUtils {
|
||||
HoodieWriteConfig writeConfig,
|
||||
Configuration hadoopConf) {
|
||||
String basePath = writeConfig.getBasePath();
|
||||
return HoodieMergedLogRecordScanner.newBuilder()
|
||||
.withFileSystem(FSUtils.getFs(basePath, hadoopConf))
|
||||
HoodieMergedLogRecordScanner.Builder logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
|
||||
.withFileSystem(getFs(basePath, hadoopConf))
|
||||
.withBasePath(basePath)
|
||||
.withLogFilePaths(logPaths)
|
||||
.withReaderSchema(logSchema)
|
||||
@@ -246,8 +256,12 @@ public class FormatUtils {
|
||||
.withMaxMemorySizeInBytes(writeConfig.getMaxMemoryPerPartitionMerge())
|
||||
.withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
|
||||
.withDiskMapType(writeConfig.getCommonConfig().getSpillableDiskMapType())
|
||||
.withBitCaskDiskMapCompressionEnabled(writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
|
||||
.build();
|
||||
.withBitCaskDiskMapCompressionEnabled(writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled());
|
||||
if (!isNullOrEmpty(logPaths)) {
|
||||
logRecordScannerBuilder
|
||||
.withPartition(getRelativePartitionPath(new Path(basePath), new Path(logPaths.get(0)).getParent()));
|
||||
}
|
||||
return logRecordScannerBuilder.build();
|
||||
}
|
||||
|
||||
private static Boolean string2Boolean(String s) {
|
||||
|
||||
Reference in New Issue
Block a user