From 7a9d48d126b1474f1bb7cfbe8b1c0b0391ef49d2 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Sun, 10 Apr 2022 10:42:06 -0700 Subject: [PATCH] [HUDI-3834] Fixing performance hits in reading Column Stats Index (#5266) Fixing performance hits in reading Column Stats Index: [HUDI-3834] There is a substantial performance degradation in the Builder classes that Avro 1.10 generates by default: they rely on SpecificData.getForSchema, which loads the corresponding model's class using reflection and takes a hit when executed on the hot path (this was bringing the overall runtime of reading the full Column Stats Index of 800k records to 60s, whereas now it takes a mere 3s). Addressing memory churn caused by overuse of Hadoop's Path creation: the Path ctor is not a lightweight sequence and produces quite a bit of memory churn, adding pressure on GC. Cleaning up such avoidable allocations to make sure there's no unnecessary added pressure on GC. --- .../cli/integ/ITTestClusteringCommand.java | 15 ++-- .../hudi/cli/integ/ITTestCommitsCommand.java | 10 +-- .../cli/integ/ITTestCompactionCommand.java | 15 ++-- .../org/apache/hudi/common/fs/FSUtils.java | 24 ++++-- .../common/fs/HoodieWrapperFileSystem.java | 4 +- .../apache/hudi/common/model/BaseFile.java | 33 +++++--- .../common/table/HoodieTableMetaClient.java | 76 +++++++++---------- .../view/AbstractTableFileSystemView.java | 6 +- ...eNameCachingPath.java => CachingPath.java} | 44 ++++++++++- .../apache/hudi/hadoop/SerializablePath.java | 69 +++++++++++++++++ .../hudi/metadata/HoodieMetadataPayload.java | 52 ++++++++++--- 11 files changed, 246 insertions(+), 102 deletions(-) rename hudi-common/src/main/java/org/apache/hudi/hadoop/{FileNameCachingPath.java => CachingPath.java} (54%) create mode 100644 hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestClusteringCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestClusteringCommand.java index 
17075f9d3..97d3d91fb 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestClusteringCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestClusteringCommand.java @@ -27,7 +27,6 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; @@ -37,7 +36,6 @@ import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.testutils.HoodieClientTestBase; - import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.junit.jupiter.api.BeforeEach; @@ -61,21 +59,18 @@ import static org.junit.jupiter.api.Assertions.assertTrue; */ public class ITTestClusteringCommand extends AbstractShellIntegrationTest { - private String tablePath; - private String tableName; - @BeforeEach public void init() throws IOException { tableName = "test_table_" + ITTestClusteringCommand.class.getName(); - tablePath = Paths.get(basePath, tableName).toString(); + basePath = Paths.get(basePath, tableName).toString(); HoodieCLI.conf = jsc.hadoopConfiguration(); // Create table and connect new TableCommand().createTable( - tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(), + basePath, tableName, HoodieTableType.COPY_ON_WRITE.name(), "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload"); - metaClient.setBasePath(tablePath); - metaClient = HoodieTableMetaClient.reload(metaClient); + + initMetaClient(); } /** @@ -168,7 +163,7 @@ public class ITTestClusteringCommand 
extends AbstractShellIntegrationTest { HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); // Create the write client to write some records in - HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath) + HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .withDeleteParallelism(2).forTable(tableName) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build(); diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCommitsCommand.java index 18f4a387d..fd533be09 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCommitsCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCommitsCommand.java @@ -54,16 +54,16 @@ public class ITTestCommitsCommand extends AbstractShellIntegrationTest { @BeforeEach public void init() throws IOException { - String tableName = "test_table_" + ITTestCommitsCommand.class.getName(); - String tablePath = Paths.get(basePath, tableName).toString(); + tableName = "test_table_" + ITTestCommitsCommand.class.getName(); + basePath = Paths.get(basePath, tableName).toString(); HoodieCLI.conf = jsc.hadoopConfiguration(); // Create table and connect new TableCommand().createTable( - tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(), + basePath, tableName, HoodieTableType.COPY_ON_WRITE.name(), "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload"); - metaClient.setBasePath(tablePath); - metaClient = HoodieTableMetaClient.reload(metaClient); + + initMetaClient(); } /** diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCompactionCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCompactionCommand.java index 4734f45e7..267cee70f 100644 --- 
a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCompactionCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCompactionCommand.java @@ -32,7 +32,6 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; @@ -48,7 +47,6 @@ import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.testutils.HoodieClientTestBase; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; - import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.shell.core.CommandResult; @@ -73,21 +71,18 @@ import static org.junit.jupiter.api.Assertions.assertTrue; */ public class ITTestCompactionCommand extends AbstractShellIntegrationTest { - private String tablePath; - private String tableName; - @BeforeEach public void init() throws IOException { tableName = "test_table_" + ITTestCompactionCommand.class.getName(); - tablePath = Paths.get(basePath, tableName).toString(); + basePath = Paths.get(basePath, tableName).toString(); HoodieCLI.conf = jsc.hadoopConfiguration(); // Create table and connect new TableCommand().createTable( - tablePath, tableName, HoodieTableType.MERGE_ON_READ.name(), + basePath, tableName, HoodieTableType.MERGE_ON_READ.name(), "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload"); - metaClient.setBasePath(tablePath); - metaClient = HoodieTableMetaClient.reload(metaClient); + + initMetaClient(); } /** @@ -298,7 +293,7 @@ public class ITTestCompactionCommand extends AbstractShellIntegrationTest { HoodieTestDataGenerator 
dataGen = new HoodieTestDataGenerator(); // Create the write client to write some records in - HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath) + HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .withDeleteParallelism(2).forTable(tableName) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index 1bde88d3b..79badb48a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -96,22 +96,26 @@ public class FSUtils { return conf; } - public static FileSystem getFs(String path, Configuration conf) { + public static FileSystem getFs(String pathStr, Configuration conf) { + return getFs(new Path(pathStr), conf); + } + + public static FileSystem getFs(Path path, Configuration conf) { FileSystem fs; prepareHadoopConf(conf); try { - fs = new Path(path).getFileSystem(conf); + fs = path.getFileSystem(conf); } catch (IOException e) { throw new HoodieIOException("Failed to get instance of " + FileSystem.class.getName(), e); } return fs; } - public static FileSystem getFs(String path, Configuration conf, boolean localByDefault) { + public static FileSystem getFs(String pathStr, Configuration conf, boolean localByDefault) { if (localByDefault) { - return getFs(addSchemeIfLocalPath(path).toString(), conf); + return getFs(addSchemeIfLocalPath(pathStr), conf); } - return getFs(path, conf); + return getFs(pathStr, conf); } /** @@ -178,7 +182,7 @@ public class FSUtils { } public static String getCommitTime(String fullFileName) { - if (isLogFile(new Path(fullFileName))) { + if (isLogFile(fullFileName)) { return fullFileName.split("_")[1].split("\\.")[0]; } return 
fullFileName.split("_")[2].split("\\.")[0]; @@ -461,8 +465,12 @@ public class FSUtils { } public static boolean isLogFile(Path logPath) { - Matcher matcher = LOG_FILE_PATTERN.matcher(logPath.getName()); - return matcher.find() && logPath.getName().contains(".log"); + return isLogFile(logPath.getName()); + } + + public static boolean isLogFile(String fileName) { + Matcher matcher = LOG_FILE_PATTERN.matcher(fileName); + return matcher.find() && fileName.contains(".log"); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java index 4bbd94384..a79d1571a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java @@ -48,7 +48,7 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Progressable; -import org.apache.hudi.hadoop.FileNameCachingPath; +import org.apache.hudi.hadoop.CachingPath; import java.io.IOException; import java.net.URI; @@ -142,7 +142,7 @@ public class HoodieWrapperFileSystem extends FileSystem { try { newURI = new URI(newScheme, oldURI.getUserInfo(), oldURI.getHost(), oldURI.getPort(), oldURI.getPath(), oldURI.getQuery(), oldURI.getFragment()); - return new FileNameCachingPath(newURI); + return new CachingPath(newURI); } catch (URISyntaxException e) { // TODO - Better Exception handling throw new RuntimeException(e); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java index f12c207ee..cd35861b7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java @@ -31,26 +31,35 @@ import 
java.util.Objects; public class BaseFile implements Serializable { private static final long serialVersionUID = 1L; + private transient FileStatus fileStatus; private final String fullPath; + private final String fileName; private long fileLen; public BaseFile(BaseFile dataFile) { - this.fileStatus = dataFile.fileStatus; - this.fullPath = dataFile.fullPath; - this.fileLen = dataFile.fileLen; + this(dataFile.fileStatus, + dataFile.fullPath, + dataFile.getFileName(), + dataFile.getFileLen()); } public BaseFile(FileStatus fileStatus) { - this.fileStatus = fileStatus; - this.fullPath = fileStatus.getPath().toString(); - this.fileLen = fileStatus.getLen(); + this(fileStatus, + fileStatus.getPath().toString(), + fileStatus.getPath().getName(), + fileStatus.getLen()); } public BaseFile(String filePath) { - this.fileStatus = null; - this.fullPath = filePath; - this.fileLen = -1; + this(null, filePath, getFileName(filePath), -1); + } + + private BaseFile(FileStatus fileStatus, String fullPath, String fileName, long fileLen) { + this.fileStatus = fileStatus; + this.fullPath = fullPath; + this.fileLen = fileLen; + this.fileName = fileName; } public String getPath() { @@ -58,7 +67,7 @@ public class BaseFile implements Serializable { } public String getFileName() { - return new Path(fullPath).getName(); + return fileName; } public FileStatus getFileStatus() { @@ -98,4 +107,8 @@ public class BaseFile implements Serializable { public String toString() { return "BaseFile{fullPath=" + fullPath + ", fileLen=" + fileLen + '}'; } + + private static String getFileName(String fullPath) { + return new Path(fullPath).getName(); + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 38b5509cd..23623976d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -49,6 +49,8 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hudi.hadoop.CachingPath; +import org.apache.hudi.hadoop.SerializablePath; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -84,7 +86,6 @@ public class HoodieTableMetaClient implements Serializable { public static final String BOOTSTRAP_INDEX_ROOT_FOLDER_PATH = AUXILIARYFOLDER_NAME + Path.SEPARATOR + ".bootstrap"; public static final String HEARTBEAT_FOLDER_NAME = METAFOLDER_NAME + Path.SEPARATOR + ".heartbeat"; public static final String METADATA_TABLE_FOLDER_PATH = METAFOLDER_NAME + Path.SEPARATOR + "metadata"; - public static final String COLUMN_STATISTICS_INDEX_NAME = ".colstatsindex"; public static final String BOOTSTRAP_INDEX_BY_PARTITION_FOLDER_PATH = BOOTSTRAP_INDEX_ROOT_FOLDER_PATH + Path.SEPARATOR + ".partitions"; public static final String BOOTSTRAP_INDEX_BY_FILE_ID_FOLDER_PATH = BOOTSTRAP_INDEX_ROOT_FOLDER_PATH + Path.SEPARATOR @@ -94,9 +95,13 @@ public class HoodieTableMetaClient implements Serializable { public static final String MARKER_EXTN = ".marker"; - private String basePath; + // NOTE: Since those two parameters lay on the hot-path of a lot of computations, we + // use tailored extension of the {@code Path} class allowing to avoid repetitive + // computations secured by its immutability + private SerializablePath basePath; + private SerializablePath metaPath; + private transient HoodieWrapperFileSystem fs; - private String metaPath; private boolean loadActiveTimelineOnLoad; private SerializableConfiguration hadoopConf; private HoodieTableType tableType; @@ -114,13 +119,11 @@ public class HoodieTableMetaClient implements Serializable { this.consistencyGuardConfig = consistencyGuardConfig; this.fileSystemRetryConfig = fileSystemRetryConfig; this.hadoopConf 
= new SerializableConfiguration(conf); - Path basePathDir = new Path(basePath); - this.basePath = basePathDir.toString(); - this.metaPath = new Path(basePath, METAFOLDER_NAME).toString(); - Path metaPathDir = new Path(this.metaPath); + this.basePath = new SerializablePath(new CachingPath(basePath)); + this.metaPath = new SerializablePath(new CachingPath(basePath, METAFOLDER_NAME)); this.fs = getFs(); - TableNotFoundException.checkTableValidity(fs, basePathDir, metaPathDir); - this.tableConfig = new HoodieTableConfig(fs, metaPath, payloadClassName); + TableNotFoundException.checkTableValidity(fs, this.basePath.get(), metaPath.get()); + this.tableConfig = new HoodieTableConfig(fs, metaPath.toString(), payloadClassName); this.tableType = tableConfig.getTableType(); Option tableConfigVersion = tableConfig.getTimelineLayoutVersion(); if (layoutVersion.isPresent() && tableConfigVersion.isPresent()) { @@ -147,8 +150,13 @@ public class HoodieTableMetaClient implements Serializable { public HoodieTableMetaClient() {} public static HoodieTableMetaClient reload(HoodieTableMetaClient oldMetaClient) { - return HoodieTableMetaClient.builder().setConf(oldMetaClient.hadoopConf.get()).setBasePath(oldMetaClient.basePath).setLoadActiveTimelineOnLoad(oldMetaClient.loadActiveTimelineOnLoad) - .setConsistencyGuardConfig(oldMetaClient.consistencyGuardConfig).setLayoutVersion(Option.of(oldMetaClient.timelineLayoutVersion)).setPayloadClassName(null) + return HoodieTableMetaClient.builder() + .setConf(oldMetaClient.hadoopConf.get()) + .setBasePath(oldMetaClient.basePath.toString()) + .setLoadActiveTimelineOnLoad(oldMetaClient.loadActiveTimelineOnLoad) + .setConsistencyGuardConfig(oldMetaClient.consistencyGuardConfig) + .setLayoutVersion(Option.of(oldMetaClient.timelineLayoutVersion)) + .setPayloadClassName(null) .setFileSystemRetryConfig(oldMetaClient.fileSystemRetryConfig).build(); } @@ -159,6 +167,7 @@ public class HoodieTableMetaClient implements Serializable { */ private void 
readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { in.defaultReadObject(); + fs = null; // will be lazily initialized } @@ -167,10 +176,19 @@ public class HoodieTableMetaClient implements Serializable { } /** - * @return Base path + * Returns base path of the table */ + public Path getBasePathV2() { + return basePath.get(); + } + + /** + * @return Base path + * @deprecated please use {@link #getBasePathV2()} + */ + @Deprecated public String getBasePath() { - return basePath; + return basePath.get().toString(); // this invocation is cached } /** @@ -184,21 +202,14 @@ public class HoodieTableMetaClient implements Serializable { * @return Meta path */ public String getMetaPath() { - return metaPath; - } - - /** - * @return Column Statistics index path - */ - public String getColumnStatsIndexPath() { - return new Path(metaPath, COLUMN_STATISTICS_INDEX_NAME).toString(); + return metaPath.get().toString(); // this invocation is cached } /** * @return schema folder path */ public String getSchemaFolderName() { - return new Path(metaPath, SCHEMA_FOLDER_NAME).toString(); + return new Path(metaPath.get(), SCHEMA_FOLDER_NAME).toString(); } /** @@ -270,7 +281,7 @@ public class HoodieTableMetaClient implements Serializable { */ public HoodieWrapperFileSystem getFs() { if (fs == null) { - FileSystem fileSystem = FSUtils.getFs(metaPath, hadoopConf.newCopy()); + FileSystem fileSystem = FSUtils.getFs(metaPath.get(), hadoopConf.newCopy()); if (fileSystemRetryConfig.isFileSystemActionRetryEnable()) { fileSystem = new HoodieRetryWrapperFileSystem(fileSystem, @@ -437,8 +448,7 @@ public class HoodieTableMetaClient implements Serializable { return metaClient; } - public static void initializeBootstrapDirsIfNotExists(Configuration hadoopConf, - String basePath, FileSystem fs) throws IOException { + public static void initializeBootstrapDirsIfNotExists(Configuration hadoopConf, String basePath, FileSystem fs) throws IOException { // Create bootstrap index 
by partition folder if it does not exist final Path bootstrap_index_folder_by_partition = @@ -542,7 +552,7 @@ public class HoodieTableMetaClient implements Serializable { */ public List scanHoodieInstantsFromFileSystem(Set includedExtensions, boolean applyLayoutVersionFilters) throws IOException { - return scanHoodieInstantsFromFileSystem(new Path(metaPath), includedExtensions, applyLayoutVersionFilters); + return scanHoodieInstantsFromFileSystem(metaPath.get(), includedExtensions, applyLayoutVersionFilters); } /** @@ -599,19 +609,7 @@ public class HoodieTableMetaClient implements Serializable { } public void initializeBootstrapDirsIfNotExists() throws IOException { - initializeBootstrapDirsIfNotExists(getHadoopConf(), basePath, getFs()); - } - - public void setBasePath(String basePath) { - this.basePath = basePath; - } - - public void setMetaPath(String metaPath) { - this.metaPath = metaPath; - } - - public void setActiveTimeline(HoodieActiveTimeline activeTimeline) { - this.activeTimeline = activeTimeline; + initializeBootstrapDirsIfNotExists(getHadoopConf(), basePath.toString(), getFs()); } public static Builder builder() { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java index 208d7ef2b..c6e618ac4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java @@ -95,7 +95,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV private BootstrapIndex bootstrapIndex; private String getPartitionPathFromFilePath(String fullPath) { - return FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), new Path(fullPath).getParent()); + return FSUtils.getRelativePartitionPath(metaClient.getBasePathV2(), new Path(fullPath).getParent()); } /** 
@@ -172,7 +172,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV Map, List> logFiles = logFileStream.collect(Collectors.groupingBy((logFile) -> { String partitionPathStr = - FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), logFile.getPath().getParent()); + FSUtils.getRelativePartitionPath(metaClient.getBasePathV2(), logFile.getPath().getParent()); return Pair.of(partitionPathStr, logFile.getFileId()); })); @@ -299,7 +299,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV try { LOG.info("Building file system view for partition (" + partitionPathStr + ")"); - Path partitionPath = FSUtils.getPartitionPath(metaClient.getBasePath(), partitionPathStr); + Path partitionPath = FSUtils.getPartitionPath(metaClient.getBasePathV2(), partitionPathStr); long beginLsTs = System.currentTimeMillis(); FileStatus[] statuses = listPartition(partitionPath); long endLsTs = System.currentTimeMillis(); diff --git a/hudi-common/src/main/java/org/apache/hudi/hadoop/FileNameCachingPath.java b/hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java similarity index 54% rename from hudi-common/src/main/java/org/apache/hudi/hadoop/FileNameCachingPath.java rename to hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java index 873f7f98f..01b3eb9d4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/hadoop/FileNameCachingPath.java +++ b/hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java @@ -20,19 +20,47 @@ package org.apache.hudi.hadoop; import org.apache.hadoop.fs.Path; +import javax.annotation.concurrent.ThreadSafe; +import java.io.Serializable; import java.net.URI; /** + * This is an extension of the {@code Path} class allowing to avoid repetitive + * computations (like {@code getFileName}, {@code toString}) which are secured + * by its immutability + * * NOTE: This class is thread-safe */ -public class FileNameCachingPath extends Path { +@ThreadSafe +public class 
CachingPath extends Path implements Serializable { - // NOTE: volatile keyword is redundant here and put mostly for reader notice, since all + // NOTE: `volatile` keyword is redundant here and put mostly for reader notice, since all // reads/writes to references are always atomic (including 64-bit JVMs) // https://docs.oracle.com/javase/specs/jls/se8/html/jls-17.html#jls-17.7 private volatile String fileName; + private volatile String fullPathStr; - public FileNameCachingPath(URI aUri) { + public CachingPath(String parent, String child) { + super(parent, child); + } + + public CachingPath(Path parent, String child) { + super(parent, child); + } + + public CachingPath(String parent, Path child) { + super(parent, child); + } + + public CachingPath(Path parent, Path child) { + super(parent, child); + } + + public CachingPath(String pathString) throws IllegalArgumentException { + super(pathString); + } + + public CachingPath(URI aUri) { super(aUri); } @@ -45,4 +73,14 @@ public class FileNameCachingPath extends Path { } return fileName; } + + @Override + public String toString() { + // This value could be overwritten concurrently and that's okay, since + // {@code Path} is immutable + if (fullPathStr == null) { + fullPathStr = super.toString(); + } + return fullPathStr; + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java b/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java new file mode 100644 index 000000000..5ad2307ef --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hadoop; + +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.Objects; + +/** + * {@link Serializable} wrapper encapsulating {@link Path} + */ +public class SerializablePath implements Serializable { + + private Path path; + + public SerializablePath(Path path) { + this.path = path; + } + + public Path get() { + return path; + } + + private void writeObject(ObjectOutputStream out) throws IOException { + out.writeUTF(path.toString()); + } + + private void readObject(ObjectInputStream in) throws IOException { + String pathStr = in.readUTF(); + path = new CachingPath(pathStr); + } + + @Override + public String toString() { + return path.toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SerializablePath that = (SerializablePath) o; + return Objects.equals(path, that.path); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java index 58d186f97..c9bdc59da 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java @@ -54,6 +54,7 @@ import org.apache.hudi.common.util.hash.FileIndexID; import org.apache.hudi.common.util.hash.PartitionIndexID; 
import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.io.storage.HoodieHFileReader; +import org.apache.hudi.util.Lazy; import java.io.IOException; import java.math.BigDecimal; @@ -143,6 +144,31 @@ public class HoodieMetadataPayload implements HoodieRecordPayload METADATA_COLUMN_STATS_BUILDER_STUB = Lazy.lazily(HoodieMetadataColumnStats::newBuilder); + private static final Lazy STRING_WRAPPER_BUILDER_STUB = Lazy.lazily(StringWrapper::newBuilder); + private static final Lazy BYTES_WRAPPER_BUILDER_STUB = Lazy.lazily(BytesWrapper::newBuilder); + private static final Lazy DOUBLE_WRAPPER_BUILDER_STUB = Lazy.lazily(DoubleWrapper::newBuilder); + private static final Lazy FLOAT_WRAPPER_BUILDER_STUB = Lazy.lazily(FloatWrapper::newBuilder); + private static final Lazy LONG_WRAPPER_BUILDER_STUB = Lazy.lazily(LongWrapper::newBuilder); + private static final Lazy INT_WRAPPER_BUILDER_STUB = Lazy.lazily(IntWrapper::newBuilder); + private static final Lazy BOOLEAN_WRAPPER_BUILDER_STUB = Lazy.lazily(BooleanWrapper::newBuilder); + private static final Lazy TIMESTAMP_MICROS_WRAPPER_BUILDER_STUB = Lazy.lazily(TimestampMicrosWrapper::newBuilder); + private static final Lazy DECIMAL_WRAPPER_BUILDER_STUB = Lazy.lazily(DecimalWrapper::newBuilder); + private static final Lazy DATE_WRAPPER_BUILDER_STUB = Lazy.lazily(DateWrapper::newBuilder); + private String key = null; private int type = 0; private Map filesystemMetadata = null; @@ -201,7 +227,7 @@ public class HoodieMetadataPayload implements HoodieRecordPayload