[HUDI-3834] Fixing performance hits in reading Column Stats Index (#5266)
Fixing performance hits in reading Column Stats Index: [HUDI-3834] There's substantial performance degradation in Avro 1.10 default generated Builder classes: they by default rely on SpecificData.getForSchema that load corresponding model's class using reflection, which takes a hit when executed on the hot-path (this was bringing overall runtime to read full Column Stats Index of 800k records to 60s, whereas now it's taking mere 3s) Addressing memory churn by over-used Hadoop's Path creation: Path ctor is not a lightweight sequence and produces quite a bit of memory churn adding pressure on GC. Cleaning such avoidable allocations up to make sure there's no unnecessarily added pressure on GC.
This commit is contained in:
@@ -27,7 +27,6 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext;
|
||||
import org.apache.hudi.common.model.HoodieAvroPayload;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieTableType;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
|
||||
@@ -37,7 +36,6 @@ import org.apache.hudi.config.HoodieIndexConfig;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.index.HoodieIndex;
|
||||
import org.apache.hudi.testutils.HoodieClientTestBase;
|
||||
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
@@ -61,21 +59,18 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
*/
|
||||
public class ITTestClusteringCommand extends AbstractShellIntegrationTest {
|
||||
|
||||
private String tablePath;
|
||||
private String tableName;
|
||||
|
||||
@BeforeEach
|
||||
public void init() throws IOException {
|
||||
tableName = "test_table_" + ITTestClusteringCommand.class.getName();
|
||||
tablePath = Paths.get(basePath, tableName).toString();
|
||||
basePath = Paths.get(basePath, tableName).toString();
|
||||
|
||||
HoodieCLI.conf = jsc.hadoopConfiguration();
|
||||
// Create table and connect
|
||||
new TableCommand().createTable(
|
||||
tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(),
|
||||
basePath, tableName, HoodieTableType.COPY_ON_WRITE.name(),
|
||||
"", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
|
||||
metaClient.setBasePath(tablePath);
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
|
||||
initMetaClient();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -168,7 +163,7 @@ public class ITTestClusteringCommand extends AbstractShellIntegrationTest {
|
||||
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
|
||||
|
||||
// Create the write client to write some records in
|
||||
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
|
||||
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
|
||||
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
|
||||
.withDeleteParallelism(2).forTable(tableName)
|
||||
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
|
||||
|
||||
@@ -54,16 +54,16 @@ public class ITTestCommitsCommand extends AbstractShellIntegrationTest {
|
||||
|
||||
@BeforeEach
|
||||
public void init() throws IOException {
|
||||
String tableName = "test_table_" + ITTestCommitsCommand.class.getName();
|
||||
String tablePath = Paths.get(basePath, tableName).toString();
|
||||
tableName = "test_table_" + ITTestCommitsCommand.class.getName();
|
||||
basePath = Paths.get(basePath, tableName).toString();
|
||||
|
||||
HoodieCLI.conf = jsc.hadoopConfiguration();
|
||||
// Create table and connect
|
||||
new TableCommand().createTable(
|
||||
tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(),
|
||||
basePath, tableName, HoodieTableType.COPY_ON_WRITE.name(),
|
||||
"", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
|
||||
metaClient.setBasePath(tablePath);
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
|
||||
initMetaClient();
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -32,7 +32,6 @@ import org.apache.hudi.common.model.HoodieKey;
|
||||
import org.apache.hudi.common.model.HoodieLogFile;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieTableType;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
|
||||
@@ -48,7 +47,6 @@ import org.apache.hudi.index.HoodieIndex;
|
||||
import org.apache.hudi.testutils.HoodieClientTestBase;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.shell.core.CommandResult;
|
||||
@@ -73,21 +71,18 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
*/
|
||||
public class ITTestCompactionCommand extends AbstractShellIntegrationTest {
|
||||
|
||||
private String tablePath;
|
||||
private String tableName;
|
||||
|
||||
@BeforeEach
|
||||
public void init() throws IOException {
|
||||
tableName = "test_table_" + ITTestCompactionCommand.class.getName();
|
||||
tablePath = Paths.get(basePath, tableName).toString();
|
||||
basePath = Paths.get(basePath, tableName).toString();
|
||||
|
||||
HoodieCLI.conf = jsc.hadoopConfiguration();
|
||||
// Create table and connect
|
||||
new TableCommand().createTable(
|
||||
tablePath, tableName, HoodieTableType.MERGE_ON_READ.name(),
|
||||
basePath, tableName, HoodieTableType.MERGE_ON_READ.name(),
|
||||
"", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
|
||||
metaClient.setBasePath(tablePath);
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
|
||||
initMetaClient();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -298,7 +293,7 @@ public class ITTestCompactionCommand extends AbstractShellIntegrationTest {
|
||||
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
|
||||
|
||||
// Create the write client to write some records in
|
||||
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
|
||||
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
|
||||
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
|
||||
.withDeleteParallelism(2).forTable(tableName)
|
||||
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
|
||||
|
||||
@@ -96,22 +96,26 @@ public class FSUtils {
|
||||
return conf;
|
||||
}
|
||||
|
||||
public static FileSystem getFs(String path, Configuration conf) {
|
||||
public static FileSystem getFs(String pathStr, Configuration conf) {
|
||||
return getFs(new Path(pathStr), conf);
|
||||
}
|
||||
|
||||
public static FileSystem getFs(Path path, Configuration conf) {
|
||||
FileSystem fs;
|
||||
prepareHadoopConf(conf);
|
||||
try {
|
||||
fs = new Path(path).getFileSystem(conf);
|
||||
fs = path.getFileSystem(conf);
|
||||
} catch (IOException e) {
|
||||
throw new HoodieIOException("Failed to get instance of " + FileSystem.class.getName(), e);
|
||||
}
|
||||
return fs;
|
||||
}
|
||||
|
||||
public static FileSystem getFs(String path, Configuration conf, boolean localByDefault) {
|
||||
public static FileSystem getFs(String pathStr, Configuration conf, boolean localByDefault) {
|
||||
if (localByDefault) {
|
||||
return getFs(addSchemeIfLocalPath(path).toString(), conf);
|
||||
return getFs(addSchemeIfLocalPath(pathStr), conf);
|
||||
}
|
||||
return getFs(path, conf);
|
||||
return getFs(pathStr, conf);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -178,7 +182,7 @@ public class FSUtils {
|
||||
}
|
||||
|
||||
public static String getCommitTime(String fullFileName) {
|
||||
if (isLogFile(new Path(fullFileName))) {
|
||||
if (isLogFile(fullFileName)) {
|
||||
return fullFileName.split("_")[1].split("\\.")[0];
|
||||
}
|
||||
return fullFileName.split("_")[2].split("\\.")[0];
|
||||
@@ -461,8 +465,12 @@ public class FSUtils {
|
||||
}
|
||||
|
||||
public static boolean isLogFile(Path logPath) {
|
||||
Matcher matcher = LOG_FILE_PATTERN.matcher(logPath.getName());
|
||||
return matcher.find() && logPath.getName().contains(".log");
|
||||
return isLogFile(logPath.getName());
|
||||
}
|
||||
|
||||
public static boolean isLogFile(String fileName) {
|
||||
Matcher matcher = LOG_FILE_PATTERN.matcher(fileName);
|
||||
return matcher.find() && fileName.contains(".log");
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -48,7 +48,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
import org.apache.hudi.hadoop.FileNameCachingPath;
|
||||
import org.apache.hudi.hadoop.CachingPath;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
@@ -142,7 +142,7 @@ public class HoodieWrapperFileSystem extends FileSystem {
|
||||
try {
|
||||
newURI = new URI(newScheme, oldURI.getUserInfo(), oldURI.getHost(), oldURI.getPort(), oldURI.getPath(),
|
||||
oldURI.getQuery(), oldURI.getFragment());
|
||||
return new FileNameCachingPath(newURI);
|
||||
return new CachingPath(newURI);
|
||||
} catch (URISyntaxException e) {
|
||||
// TODO - Better Exception handling
|
||||
throw new RuntimeException(e);
|
||||
|
||||
@@ -31,26 +31,35 @@ import java.util.Objects;
|
||||
public class BaseFile implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
private transient FileStatus fileStatus;
|
||||
private final String fullPath;
|
||||
private final String fileName;
|
||||
private long fileLen;
|
||||
|
||||
public BaseFile(BaseFile dataFile) {
|
||||
this.fileStatus = dataFile.fileStatus;
|
||||
this.fullPath = dataFile.fullPath;
|
||||
this.fileLen = dataFile.fileLen;
|
||||
this(dataFile.fileStatus,
|
||||
dataFile.fullPath,
|
||||
dataFile.getFileName(),
|
||||
dataFile.getFileLen());
|
||||
}
|
||||
|
||||
public BaseFile(FileStatus fileStatus) {
|
||||
this.fileStatus = fileStatus;
|
||||
this.fullPath = fileStatus.getPath().toString();
|
||||
this.fileLen = fileStatus.getLen();
|
||||
this(fileStatus,
|
||||
fileStatus.getPath().toString(),
|
||||
fileStatus.getPath().getName(),
|
||||
fileStatus.getLen());
|
||||
}
|
||||
|
||||
public BaseFile(String filePath) {
|
||||
this.fileStatus = null;
|
||||
this.fullPath = filePath;
|
||||
this.fileLen = -1;
|
||||
this(null, filePath, getFileName(filePath), -1);
|
||||
}
|
||||
|
||||
private BaseFile(FileStatus fileStatus, String fullPath, String fileName, long fileLen) {
|
||||
this.fileStatus = fileStatus;
|
||||
this.fullPath = fullPath;
|
||||
this.fileLen = fileLen;
|
||||
this.fileName = fileName;
|
||||
}
|
||||
|
||||
public String getPath() {
|
||||
@@ -58,7 +67,7 @@ public class BaseFile implements Serializable {
|
||||
}
|
||||
|
||||
public String getFileName() {
|
||||
return new Path(fullPath).getName();
|
||||
return fileName;
|
||||
}
|
||||
|
||||
public FileStatus getFileStatus() {
|
||||
@@ -98,4 +107,8 @@ public class BaseFile implements Serializable {
|
||||
public String toString() {
|
||||
return "BaseFile{fullPath=" + fullPath + ", fileLen=" + fileLen + '}';
|
||||
}
|
||||
|
||||
private static String getFileName(String fullPath) {
|
||||
return new Path(fullPath).getName();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -49,6 +49,8 @@ import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PathFilter;
|
||||
import org.apache.hudi.hadoop.CachingPath;
|
||||
import org.apache.hudi.hadoop.SerializablePath;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
@@ -84,7 +86,6 @@ public class HoodieTableMetaClient implements Serializable {
|
||||
public static final String BOOTSTRAP_INDEX_ROOT_FOLDER_PATH = AUXILIARYFOLDER_NAME + Path.SEPARATOR + ".bootstrap";
|
||||
public static final String HEARTBEAT_FOLDER_NAME = METAFOLDER_NAME + Path.SEPARATOR + ".heartbeat";
|
||||
public static final String METADATA_TABLE_FOLDER_PATH = METAFOLDER_NAME + Path.SEPARATOR + "metadata";
|
||||
public static final String COLUMN_STATISTICS_INDEX_NAME = ".colstatsindex";
|
||||
public static final String BOOTSTRAP_INDEX_BY_PARTITION_FOLDER_PATH = BOOTSTRAP_INDEX_ROOT_FOLDER_PATH
|
||||
+ Path.SEPARATOR + ".partitions";
|
||||
public static final String BOOTSTRAP_INDEX_BY_FILE_ID_FOLDER_PATH = BOOTSTRAP_INDEX_ROOT_FOLDER_PATH + Path.SEPARATOR
|
||||
@@ -94,9 +95,13 @@ public class HoodieTableMetaClient implements Serializable {
|
||||
|
||||
public static final String MARKER_EXTN = ".marker";
|
||||
|
||||
private String basePath;
|
||||
// NOTE: Since those two parameters lay on the hot-path of a lot of computations, we
|
||||
// use tailored extension of the {@code Path} class allowing to avoid repetitive
|
||||
// computations secured by its immutability
|
||||
private SerializablePath basePath;
|
||||
private SerializablePath metaPath;
|
||||
|
||||
private transient HoodieWrapperFileSystem fs;
|
||||
private String metaPath;
|
||||
private boolean loadActiveTimelineOnLoad;
|
||||
private SerializableConfiguration hadoopConf;
|
||||
private HoodieTableType tableType;
|
||||
@@ -114,13 +119,11 @@ public class HoodieTableMetaClient implements Serializable {
|
||||
this.consistencyGuardConfig = consistencyGuardConfig;
|
||||
this.fileSystemRetryConfig = fileSystemRetryConfig;
|
||||
this.hadoopConf = new SerializableConfiguration(conf);
|
||||
Path basePathDir = new Path(basePath);
|
||||
this.basePath = basePathDir.toString();
|
||||
this.metaPath = new Path(basePath, METAFOLDER_NAME).toString();
|
||||
Path metaPathDir = new Path(this.metaPath);
|
||||
this.basePath = new SerializablePath(new CachingPath(basePath));
|
||||
this.metaPath = new SerializablePath(new CachingPath(basePath, METAFOLDER_NAME));
|
||||
this.fs = getFs();
|
||||
TableNotFoundException.checkTableValidity(fs, basePathDir, metaPathDir);
|
||||
this.tableConfig = new HoodieTableConfig(fs, metaPath, payloadClassName);
|
||||
TableNotFoundException.checkTableValidity(fs, this.basePath.get(), metaPath.get());
|
||||
this.tableConfig = new HoodieTableConfig(fs, metaPath.toString(), payloadClassName);
|
||||
this.tableType = tableConfig.getTableType();
|
||||
Option<TimelineLayoutVersion> tableConfigVersion = tableConfig.getTimelineLayoutVersion();
|
||||
if (layoutVersion.isPresent() && tableConfigVersion.isPresent()) {
|
||||
@@ -147,8 +150,13 @@ public class HoodieTableMetaClient implements Serializable {
|
||||
public HoodieTableMetaClient() {}
|
||||
|
||||
public static HoodieTableMetaClient reload(HoodieTableMetaClient oldMetaClient) {
|
||||
return HoodieTableMetaClient.builder().setConf(oldMetaClient.hadoopConf.get()).setBasePath(oldMetaClient.basePath).setLoadActiveTimelineOnLoad(oldMetaClient.loadActiveTimelineOnLoad)
|
||||
.setConsistencyGuardConfig(oldMetaClient.consistencyGuardConfig).setLayoutVersion(Option.of(oldMetaClient.timelineLayoutVersion)).setPayloadClassName(null)
|
||||
return HoodieTableMetaClient.builder()
|
||||
.setConf(oldMetaClient.hadoopConf.get())
|
||||
.setBasePath(oldMetaClient.basePath.toString())
|
||||
.setLoadActiveTimelineOnLoad(oldMetaClient.loadActiveTimelineOnLoad)
|
||||
.setConsistencyGuardConfig(oldMetaClient.consistencyGuardConfig)
|
||||
.setLayoutVersion(Option.of(oldMetaClient.timelineLayoutVersion))
|
||||
.setPayloadClassName(null)
|
||||
.setFileSystemRetryConfig(oldMetaClient.fileSystemRetryConfig).build();
|
||||
}
|
||||
|
||||
@@ -159,6 +167,7 @@ public class HoodieTableMetaClient implements Serializable {
|
||||
*/
|
||||
private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
|
||||
in.defaultReadObject();
|
||||
|
||||
fs = null; // will be lazily initialized
|
||||
}
|
||||
|
||||
@@ -167,10 +176,19 @@ public class HoodieTableMetaClient implements Serializable {
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Base path
|
||||
* Returns base path of the table
|
||||
*/
|
||||
public Path getBasePathV2() {
|
||||
return basePath.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Base path
|
||||
* @deprecated please use {@link #getBasePathV2()}
|
||||
*/
|
||||
@Deprecated
|
||||
public String getBasePath() {
|
||||
return basePath;
|
||||
return basePath.get().toString(); // this invocation is cached
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -184,21 +202,14 @@ public class HoodieTableMetaClient implements Serializable {
|
||||
* @return Meta path
|
||||
*/
|
||||
public String getMetaPath() {
|
||||
return metaPath;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Column Statistics index path
|
||||
*/
|
||||
public String getColumnStatsIndexPath() {
|
||||
return new Path(metaPath, COLUMN_STATISTICS_INDEX_NAME).toString();
|
||||
return metaPath.get().toString(); // this invocation is cached
|
||||
}
|
||||
|
||||
/**
|
||||
* @return schema folder path
|
||||
*/
|
||||
public String getSchemaFolderName() {
|
||||
return new Path(metaPath, SCHEMA_FOLDER_NAME).toString();
|
||||
return new Path(metaPath.get(), SCHEMA_FOLDER_NAME).toString();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -270,7 +281,7 @@ public class HoodieTableMetaClient implements Serializable {
|
||||
*/
|
||||
public HoodieWrapperFileSystem getFs() {
|
||||
if (fs == null) {
|
||||
FileSystem fileSystem = FSUtils.getFs(metaPath, hadoopConf.newCopy());
|
||||
FileSystem fileSystem = FSUtils.getFs(metaPath.get(), hadoopConf.newCopy());
|
||||
|
||||
if (fileSystemRetryConfig.isFileSystemActionRetryEnable()) {
|
||||
fileSystem = new HoodieRetryWrapperFileSystem(fileSystem,
|
||||
@@ -437,8 +448,7 @@ public class HoodieTableMetaClient implements Serializable {
|
||||
return metaClient;
|
||||
}
|
||||
|
||||
public static void initializeBootstrapDirsIfNotExists(Configuration hadoopConf,
|
||||
String basePath, FileSystem fs) throws IOException {
|
||||
public static void initializeBootstrapDirsIfNotExists(Configuration hadoopConf, String basePath, FileSystem fs) throws IOException {
|
||||
|
||||
// Create bootstrap index by partition folder if it does not exist
|
||||
final Path bootstrap_index_folder_by_partition =
|
||||
@@ -542,7 +552,7 @@ public class HoodieTableMetaClient implements Serializable {
|
||||
*/
|
||||
public List<HoodieInstant> scanHoodieInstantsFromFileSystem(Set<String> includedExtensions,
|
||||
boolean applyLayoutVersionFilters) throws IOException {
|
||||
return scanHoodieInstantsFromFileSystem(new Path(metaPath), includedExtensions, applyLayoutVersionFilters);
|
||||
return scanHoodieInstantsFromFileSystem(metaPath.get(), includedExtensions, applyLayoutVersionFilters);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -599,19 +609,7 @@ public class HoodieTableMetaClient implements Serializable {
|
||||
}
|
||||
|
||||
public void initializeBootstrapDirsIfNotExists() throws IOException {
|
||||
initializeBootstrapDirsIfNotExists(getHadoopConf(), basePath, getFs());
|
||||
}
|
||||
|
||||
public void setBasePath(String basePath) {
|
||||
this.basePath = basePath;
|
||||
}
|
||||
|
||||
public void setMetaPath(String metaPath) {
|
||||
this.metaPath = metaPath;
|
||||
}
|
||||
|
||||
public void setActiveTimeline(HoodieActiveTimeline activeTimeline) {
|
||||
this.activeTimeline = activeTimeline;
|
||||
initializeBootstrapDirsIfNotExists(getHadoopConf(), basePath.toString(), getFs());
|
||||
}
|
||||
|
||||
public static Builder builder() {
|
||||
|
||||
@@ -95,7 +95,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
|
||||
private BootstrapIndex bootstrapIndex;
|
||||
|
||||
private String getPartitionPathFromFilePath(String fullPath) {
|
||||
return FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), new Path(fullPath).getParent());
|
||||
return FSUtils.getRelativePartitionPath(metaClient.getBasePathV2(), new Path(fullPath).getParent());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -172,7 +172,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
|
||||
|
||||
Map<Pair<String, String>, List<HoodieLogFile>> logFiles = logFileStream.collect(Collectors.groupingBy((logFile) -> {
|
||||
String partitionPathStr =
|
||||
FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), logFile.getPath().getParent());
|
||||
FSUtils.getRelativePartitionPath(metaClient.getBasePathV2(), logFile.getPath().getParent());
|
||||
return Pair.of(partitionPathStr, logFile.getFileId());
|
||||
}));
|
||||
|
||||
@@ -299,7 +299,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
|
||||
try {
|
||||
LOG.info("Building file system view for partition (" + partitionPathStr + ")");
|
||||
|
||||
Path partitionPath = FSUtils.getPartitionPath(metaClient.getBasePath(), partitionPathStr);
|
||||
Path partitionPath = FSUtils.getPartitionPath(metaClient.getBasePathV2(), partitionPathStr);
|
||||
long beginLsTs = System.currentTimeMillis();
|
||||
FileStatus[] statuses = listPartition(partitionPath);
|
||||
long endLsTs = System.currentTimeMillis();
|
||||
|
||||
@@ -20,19 +20,47 @@ package org.apache.hudi.hadoop;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
import javax.annotation.concurrent.ThreadSafe;
|
||||
import java.io.Serializable;
|
||||
import java.net.URI;
|
||||
|
||||
/**
|
||||
* This is an extension of the {@code Path} class allowing to avoid repetitive
|
||||
* computations (like {@code getFileName}, {@code toString}) which are secured
|
||||
* by its immutability
|
||||
*
|
||||
* NOTE: This class is thread-safe
|
||||
*/
|
||||
public class FileNameCachingPath extends Path {
|
||||
@ThreadSafe
|
||||
public class CachingPath extends Path implements Serializable {
|
||||
|
||||
// NOTE: volatile keyword is redundant here and put mostly for reader notice, since all
|
||||
// NOTE: `volatile` keyword is redundant here and put mostly for reader notice, since all
|
||||
// reads/writes to references are always atomic (including 64-bit JVMs)
|
||||
// https://docs.oracle.com/javase/specs/jls/se8/html/jls-17.html#jls-17.7
|
||||
private volatile String fileName;
|
||||
private volatile String fullPathStr;
|
||||
|
||||
public FileNameCachingPath(URI aUri) {
|
||||
public CachingPath(String parent, String child) {
|
||||
super(parent, child);
|
||||
}
|
||||
|
||||
public CachingPath(Path parent, String child) {
|
||||
super(parent, child);
|
||||
}
|
||||
|
||||
public CachingPath(String parent, Path child) {
|
||||
super(parent, child);
|
||||
}
|
||||
|
||||
public CachingPath(Path parent, Path child) {
|
||||
super(parent, child);
|
||||
}
|
||||
|
||||
public CachingPath(String pathString) throws IllegalArgumentException {
|
||||
super(pathString);
|
||||
}
|
||||
|
||||
public CachingPath(URI aUri) {
|
||||
super(aUri);
|
||||
}
|
||||
|
||||
@@ -45,4 +73,14 @@ public class FileNameCachingPath extends Path {
|
||||
}
|
||||
return fileName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
// This value could be overwritten concurrently and that's okay, since
|
||||
// {@code Path} is immutable
|
||||
if (fullPathStr == null) {
|
||||
fullPathStr = super.toString();
|
||||
}
|
||||
return fullPathStr;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,69 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.hadoop;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.ObjectInputStream;
|
||||
import java.io.ObjectOutputStream;
|
||||
import java.io.Serializable;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* {@link Serializable} wrapper encapsulating {@link Path}
|
||||
*/
|
||||
public class SerializablePath implements Serializable {
|
||||
|
||||
private Path path;
|
||||
|
||||
public SerializablePath(Path path) {
|
||||
this.path = path;
|
||||
}
|
||||
|
||||
public Path get() {
|
||||
return path;
|
||||
}
|
||||
|
||||
private void writeObject(ObjectOutputStream out) throws IOException {
|
||||
out.writeUTF(path.toString());
|
||||
}
|
||||
|
||||
private void readObject(ObjectInputStream in) throws IOException {
|
||||
String pathStr = in.readUTF();
|
||||
path = new CachingPath(pathStr);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return path.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
SerializablePath that = (SerializablePath) o;
|
||||
return Objects.equals(path, that.path);
|
||||
}
|
||||
}
|
||||
@@ -54,6 +54,7 @@ import org.apache.hudi.common.util.hash.FileIndexID;
|
||||
import org.apache.hudi.common.util.hash.PartitionIndexID;
|
||||
import org.apache.hudi.exception.HoodieMetadataException;
|
||||
import org.apache.hudi.io.storage.HoodieHFileReader;
|
||||
import org.apache.hudi.util.Lazy;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.math.BigDecimal;
|
||||
@@ -143,6 +144,31 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
|
||||
|
||||
private static final Conversions.DecimalConversion AVRO_DECIMAL_CONVERSION = new Conversions.DecimalConversion();
|
||||
|
||||
// NOTE: PLEASE READ CAREFULLY
|
||||
//
|
||||
// In Avro 1.10 generated builders rely on {@code SpecificData.getForSchema} invocation that in turn
|
||||
// does use reflection to load the code-gen'd class corresponding to the Avro record model. This has
|
||||
// serious adverse effects in terms of performance when gets executed on the hot-path (both, in terms
|
||||
// of runtime and efficiency).
|
||||
//
|
||||
// To work this around instead of using default code-gen'd builder invoking {@code SpecificData.getForSchema},
|
||||
// we instead rely on overloaded ctor accepting another instance of the builder: {@code Builder(Builder)},
|
||||
// which bypasses such invocation. Following corresponding builder's stubs are statically initialized
|
||||
// to be used exactly for that purpose.
|
||||
//
|
||||
// You can find more details in HUDI-3834
|
||||
private static final Lazy<HoodieMetadataColumnStats.Builder> METADATA_COLUMN_STATS_BUILDER_STUB = Lazy.lazily(HoodieMetadataColumnStats::newBuilder);
|
||||
private static final Lazy<StringWrapper.Builder> STRING_WRAPPER_BUILDER_STUB = Lazy.lazily(StringWrapper::newBuilder);
|
||||
private static final Lazy<BytesWrapper.Builder> BYTES_WRAPPER_BUILDER_STUB = Lazy.lazily(BytesWrapper::newBuilder);
|
||||
private static final Lazy<DoubleWrapper.Builder> DOUBLE_WRAPPER_BUILDER_STUB = Lazy.lazily(DoubleWrapper::newBuilder);
|
||||
private static final Lazy<FloatWrapper.Builder> FLOAT_WRAPPER_BUILDER_STUB = Lazy.lazily(FloatWrapper::newBuilder);
|
||||
private static final Lazy<LongWrapper.Builder> LONG_WRAPPER_BUILDER_STUB = Lazy.lazily(LongWrapper::newBuilder);
|
||||
private static final Lazy<IntWrapper.Builder> INT_WRAPPER_BUILDER_STUB = Lazy.lazily(IntWrapper::newBuilder);
|
||||
private static final Lazy<BooleanWrapper.Builder> BOOLEAN_WRAPPER_BUILDER_STUB = Lazy.lazily(BooleanWrapper::newBuilder);
|
||||
private static final Lazy<TimestampMicrosWrapper.Builder> TIMESTAMP_MICROS_WRAPPER_BUILDER_STUB = Lazy.lazily(TimestampMicrosWrapper::newBuilder);
|
||||
private static final Lazy<DecimalWrapper.Builder> DECIMAL_WRAPPER_BUILDER_STUB = Lazy.lazily(DecimalWrapper::newBuilder);
|
||||
private static final Lazy<DateWrapper.Builder> DATE_WRAPPER_BUILDER_STUB = Lazy.lazily(DateWrapper::newBuilder);
|
||||
|
||||
private String key = null;
|
||||
private int type = 0;
|
||||
private Map<String, HoodieMetadataFileInfo> filesystemMetadata = null;
|
||||
@@ -201,7 +227,7 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
|
||||
checkArgument(record.getSchema().getField(SCHEMA_FIELD_ID_COLUMN_STATS) == null,
|
||||
String.format("Valid %s record expected for type: %s", SCHEMA_FIELD_ID_COLUMN_STATS, METADATA_TYPE_COLUMN_STATS));
|
||||
} else {
|
||||
columnStatMetadata = HoodieMetadataColumnStats.newBuilder()
|
||||
columnStatMetadata = HoodieMetadataColumnStats.newBuilder(METADATA_COLUMN_STATS_BUILDER_STUB.get())
|
||||
.setFileName((String) columnStatsRecord.get(COLUMN_STATS_FIELD_FILE_NAME))
|
||||
.setColumnName((String) columnStatsRecord.get(COLUMN_STATS_FIELD_COLUMN_NAME))
|
||||
.setMinValue(columnStatsRecord.get(COLUMN_STATS_FIELD_MIN_VALUE))
|
||||
@@ -605,7 +631,7 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
|
||||
.max(Comparator.naturalOrder())
|
||||
.orElse(null);
|
||||
|
||||
return HoodieMetadataColumnStats.newBuilder()
|
||||
return HoodieMetadataColumnStats.newBuilder(METADATA_COLUMN_STATS_BUILDER_STUB.get())
|
||||
.setFileName(newColumnStats.getFileName())
|
||||
.setColumnName(newColumnStats.getColumnName())
|
||||
.setMinValue(wrapStatisticValue(minValue))
|
||||
@@ -653,11 +679,13 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
|
||||
LocalDate localDate = statValue instanceof LocalDate
|
||||
? (LocalDate) statValue
|
||||
: ((Date) statValue).toLocalDate();
|
||||
return DateWrapper.newBuilder().setValue((int) localDate.toEpochDay()).build();
|
||||
return DateWrapper.newBuilder(DATE_WRAPPER_BUILDER_STUB.get())
|
||||
.setValue((int) localDate.toEpochDay())
|
||||
.build();
|
||||
} else if (statValue instanceof BigDecimal) {
|
||||
Schema valueSchema = DecimalWrapper.SCHEMA$.getField("value").schema();
|
||||
BigDecimal upcastDecimal = tryUpcastDecimal((BigDecimal) statValue, (LogicalTypes.Decimal) valueSchema.getLogicalType());
|
||||
return DecimalWrapper.newBuilder()
|
||||
return DecimalWrapper.newBuilder(DECIMAL_WRAPPER_BUILDER_STUB.get())
|
||||
.setValue(AVRO_DECIMAL_CONVERSION.toBytes(upcastDecimal, valueSchema, valueSchema.getLogicalType()))
|
||||
.build();
|
||||
} else if (statValue instanceof Timestamp) {
|
||||
@@ -665,23 +693,23 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
|
||||
// rely on logical types to do proper encoding of the native Java types,
|
||||
// and hereby have to encode statistic manually
|
||||
Instant instant = ((Timestamp) statValue).toInstant();
|
||||
return TimestampMicrosWrapper.newBuilder()
|
||||
return TimestampMicrosWrapper.newBuilder(TIMESTAMP_MICROS_WRAPPER_BUILDER_STUB.get())
|
||||
.setValue(instantToMicros(instant))
|
||||
.build();
|
||||
} else if (statValue instanceof Boolean) {
|
||||
return BooleanWrapper.newBuilder().setValue((Boolean) statValue).build();
|
||||
return BooleanWrapper.newBuilder(BOOLEAN_WRAPPER_BUILDER_STUB.get()).setValue((Boolean) statValue).build();
|
||||
} else if (statValue instanceof Integer) {
|
||||
return IntWrapper.newBuilder().setValue((Integer) statValue).build();
|
||||
return IntWrapper.newBuilder(INT_WRAPPER_BUILDER_STUB.get()).setValue((Integer) statValue).build();
|
||||
} else if (statValue instanceof Long) {
|
||||
return LongWrapper.newBuilder().setValue((Long) statValue).build();
|
||||
return LongWrapper.newBuilder(LONG_WRAPPER_BUILDER_STUB.get()).setValue((Long) statValue).build();
|
||||
} else if (statValue instanceof Float) {
|
||||
return FloatWrapper.newBuilder().setValue((Float) statValue).build();
|
||||
return FloatWrapper.newBuilder(FLOAT_WRAPPER_BUILDER_STUB.get()).setValue((Float) statValue).build();
|
||||
} else if (statValue instanceof Double) {
|
||||
return DoubleWrapper.newBuilder().setValue((Double) statValue).build();
|
||||
return DoubleWrapper.newBuilder(DOUBLE_WRAPPER_BUILDER_STUB.get()).setValue((Double) statValue).build();
|
||||
} else if (statValue instanceof ByteBuffer) {
|
||||
return BytesWrapper.newBuilder().setValue((ByteBuffer) statValue).build();
|
||||
return BytesWrapper.newBuilder(BYTES_WRAPPER_BUILDER_STUB.get()).setValue((ByteBuffer) statValue).build();
|
||||
} else if (statValue instanceof String || statValue instanceof Utf8) {
|
||||
return StringWrapper.newBuilder().setValue(statValue.toString()).build();
|
||||
return StringWrapper.newBuilder(STRING_WRAPPER_BUILDER_STUB.get()).setValue(statValue.toString()).build();
|
||||
} else {
|
||||
throw new UnsupportedOperationException(String.format("Unsupported type of the statistic (%s)", statValue.getClass()));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user