diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java index 1b6d10b17..bb7c2d89a 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java @@ -470,7 +470,8 @@ public class SparkMain { HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(config.getBasePath()) .setLoadActiveTimelineOnLoad(false).setConsistencyGuardConfig(config.getConsistencyGuardConfig()) - .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build(); + .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))) + .setFileSystemRetryConfig(config.getFileSystemRetryConfig()).build(); try { new UpgradeDowngrade(metaClient, config, new HoodieSparkEngineContext(jsc), SparkUpgradeDowngradeHelper.getInstance()) .run(HoodieTableVersion.valueOf(toVersion), null); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java index 8ea61d9a6..3f208a0f8 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java @@ -134,7 +134,8 @@ public abstract class BaseHoodieClient implements Serializable, AutoCloseable { protected HoodieTableMetaClient createMetaClient(boolean loadActiveTimelineOnLoad) { return HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(config.getBasePath()) .setLoadActiveTimelineOnLoad(loadActiveTimelineOnLoad).setConsistencyGuardConfig(config.getConsistencyGuardConfig()) - .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build(); + .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))) + .setFileSystemRetryConfig(config.getFileSystemRetryConfig()).build(); } public Option getTimelineServer() { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index e3efe418f..3797efab6 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -31,6 +31,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.fs.ConsistencyGuardConfig; +import org.apache.hudi.common.fs.FileSystemRetryConfig; import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieFileFormat; @@ -447,6 +448,7 @@ public class HoodieWriteConfig extends HoodieConfig { .withDocumentation("Master control to disable all table services including archive, clean, compact, cluster, etc."); private ConsistencyGuardConfig consistencyGuardConfig; + private FileSystemRetryConfig fileSystemRetryConfig; // Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled // We keep track of original config and rewritten config @@ -840,6 +842,7 @@ public class HoodieWriteConfig extends HoodieConfig { newProps.putAll(props); this.engineType = engineType; this.consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().fromProperties(newProps).build(); + this.fileSystemRetryConfig = FileSystemRetryConfig.newBuilder().fromProperties(newProps).build(); this.clientSpecifiedViewStorageConfig = FileSystemViewStorageConfig.newBuilder().fromProperties(newProps).build(); this.viewStorageConfig = clientSpecifiedViewStorageConfig; this.hoodiePayloadConfig = HoodiePayloadConfig.newBuilder().fromProperties(newProps).build(); @@ -1725,6 +1728,10 @@ public class HoodieWriteConfig extends HoodieConfig { return consistencyGuardConfig; } + public FileSystemRetryConfig getFileSystemRetryConfig() { + return fileSystemRetryConfig; + } + public void setConsistencyGuardConfig(ConsistencyGuardConfig consistencyGuardConfig) { this.consistencyGuardConfig = consistencyGuardConfig; } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java index 164b00e2d..2f08a55c9 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java @@ -53,7 +53,8 @@ public abstract class HoodieFlinkTable HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath()) .setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig()) - .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build(); + .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))) + .setFileSystemRetryConfig(config.getFileSystemRetryConfig()).build(); return HoodieFlinkTable.create(config, context, metaClient); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java index 35c9ab3a0..118438c60 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java @@ -63,7 +63,8 @@ public abstract class HoodieSparkTable HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath()) .setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig()) - .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build(); + .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))) + .setFileSystemRetryConfig(config.getFileSystemRetryConfig()).build(); return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, refreshTimeline); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FileSystemRetryConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FileSystemRetryConfig.java new file mode 100644 index 000000000..c7f99ece7 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FileSystemRetryConfig.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.fs; + +import org.apache.hudi.common.config.ConfigClassProperty; +import org.apache.hudi.common.config.ConfigGroups; +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.config.HoodieConfig; + +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.Properties; + +/** + * The file system retry relevant config options. + */ +@ConfigClassProperty(name = "FileSystem Guard Configurations", + groupName = ConfigGroups.Names.WRITE_CLIENT, + description = "The filesystem retry related config options, to help deal with runtime exception like list/get/put/delete performance issues.") +public class FileSystemRetryConfig extends HoodieConfig { + + public static final ConfigProperty FILESYSTEM_RETRY_ENABLE = ConfigProperty + .key("hoodie.filesystem.operation.retry.enable") + .defaultValue("false") + .sinceVersion("0.11.0") + .withDocumentation("Enabled to handle list/get/delete etc file system performance issue."); + + public static final ConfigProperty INITIAL_RETRY_INTERVAL_MS = ConfigProperty + .key("hoodie.filesystem.operation.retry.initial_interval_ms") + .defaultValue(100L) + .sinceVersion("0.11.0") + .withDocumentation("Amount of time (in ms) to wait, before retry to do operations on storage."); + + public static final ConfigProperty MAX_RETRY_INTERVAL_MS = ConfigProperty + .key("hoodie.filesystem.operation.retry.max_interval_ms") + .defaultValue(2000L) + .sinceVersion("0.11.0") + .withDocumentation("Maximum amount of time (in ms), to wait for next retry."); + + public static final ConfigProperty MAX_RETRY_NUMBERS = ConfigProperty + .key("hoodie.filesystem.operation.retry.max_numbers") + .defaultValue(4) + .sinceVersion("0.11.0") + .withDocumentation("Maximum number of retry actions to perform, with exponential backoff."); + + public static final ConfigProperty RETRY_EXCEPTIONS = ConfigProperty + .key("hoodie.filesystem.operation.retry.exceptions") + .defaultValue("") + .sinceVersion("0.11.0") + .withDocumentation("The class name of the Exception that needs to be re-tryed, separated by commas. " + + "Default is empty which means retry all the IOException and RuntimeException from FileSystem"); + + private FileSystemRetryConfig() { + super(); + } + + public long getInitialRetryIntervalMs() { + return getLong(INITIAL_RETRY_INTERVAL_MS); + } + + public long getMaxRetryIntervalMs() { + return getLong(MAX_RETRY_INTERVAL_MS); + } + + public int getMaxRetryNumbers() { + return getInt(MAX_RETRY_NUMBERS); + } + + public boolean isFileSystemActionRetryEnable() { + return Boolean.parseBoolean(getStringOrDefault(FILESYSTEM_RETRY_ENABLE)); + } + + public static FileSystemRetryConfig.Builder newBuilder() { + return new Builder(); + } + + public String getRetryExceptions() { + return getString(RETRY_EXCEPTIONS); + } + + /** + * The builder used to build filesystem configurations. + */ + public static class Builder { + + private final FileSystemRetryConfig fileSystemRetryConfig = new FileSystemRetryConfig(); + + public Builder fromFile(File propertiesFile) throws IOException { + try (FileReader reader = new FileReader(propertiesFile)) { + fileSystemRetryConfig.getProps().load(reader); + return this; + } + } + + public Builder fromProperties(Properties props) { + this.fileSystemRetryConfig.getProps().putAll(props); + return this; + } + + public Builder withMaxRetryNumbers(int numbers) { + fileSystemRetryConfig.setValue(MAX_RETRY_NUMBERS, String.valueOf(numbers)); + return this; + } + + public Builder withInitialRetryIntervalMs(long intervalMs) { + fileSystemRetryConfig.setValue(INITIAL_RETRY_INTERVAL_MS, String.valueOf(intervalMs)); + return this; + } + + public Builder withMaxRetryIntervalMs(long intervalMs) { + fileSystemRetryConfig.setValue(MAX_RETRY_INTERVAL_MS, String.valueOf(intervalMs)); + return this; + } + + public Builder withFileSystemActionRetryEnabled(boolean enabled) { + fileSystemRetryConfig.setValue(FILESYSTEM_RETRY_ENABLE, String.valueOf(enabled)); + return this; + } + + public FileSystemRetryConfig build() { + fileSystemRetryConfig.setDefaults(FileSystemRetryConfig.class.getName()); + return fileSystemRetryConfig; + } + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieRetryWrapperFileSystem.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieRetryWrapperFileSystem.java new file mode 100644 index 000000000..075f811a4 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieRetryWrapperFileSystem.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.fs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.Progressable; +import org.apache.hudi.common.util.RetryHelper; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.util.EnumSet; + +public class HoodieRetryWrapperFileSystem extends FileSystem { + + private FileSystem fileSystem; + private long maxRetryIntervalMs; + private int maxRetryNumbers; + private long initialRetryIntervalMs; + private String retryExceptionsList; + + public HoodieRetryWrapperFileSystem(FileSystem fs, long maxRetryIntervalMs, int maxRetryNumbers, long initialRetryIntervalMs, String retryExceptions) { + this.fileSystem = fs; + this.maxRetryIntervalMs = maxRetryIntervalMs; + this.maxRetryNumbers = maxRetryNumbers; + this.initialRetryIntervalMs = initialRetryIntervalMs; + this.retryExceptionsList = retryExceptions; + + } + + @Override + public URI getUri() { + return fileSystem.getUri(); + } + + @Override + public FSDataInputStream open(Path f, int bufferSize) throws IOException { + return (FSDataInputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.open(f, bufferSize)).start(); + } + + @Override + public FSDataInputStream open(Path f) throws IOException { + return (FSDataInputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.open(f)).start(); + } + + @Override + public FSDataOutputStream create(Path f, + FsPermission permission, + boolean overwrite, + int bufferSize, + short replication, + long blockSize, + Progressable progress) throws IOException { + return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList) + .tryWith(() -> fileSystem.create(f, permission, overwrite, bufferSize, replication, blockSize, progress)).start(); + } + + @Override + public FSDataOutputStream create(Path f, boolean overwrite) throws IOException { + return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.create(f, overwrite)).start(); + } + + @Override + public FSDataOutputStream create(Path f) throws IOException { + return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.create(f)).start(); + } + + @Override + public FSDataOutputStream create(Path f, Progressable progress) throws IOException { + return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.create(f, progress)).start(); + } + + @Override + public FSDataOutputStream create(Path f, short replication) throws IOException { + return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.create(f, replication)).start(); + } + + @Override + public FSDataOutputStream create(Path f, short replication, Progressable progress) throws IOException { + return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.create(f, replication, progress)).start(); + } + + @Override + public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize) throws IOException { + return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList) + .tryWith(() -> fileSystem.create(f, overwrite, bufferSize)).start(); + } + + @Override + public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, Progressable progress) + throws IOException { + return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList) + .tryWith(() -> fileSystem.create(f, overwrite, bufferSize, progress)).start(); + } + + @Override + public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize, + Progressable progress) throws IOException { + return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList) + .tryWith(() -> fileSystem.create(f, overwrite, bufferSize, replication, blockSize, progress)).start(); + } + + @Override + public FSDataOutputStream create(Path f, FsPermission permission, EnumSet flags, int bufferSize, + short replication, long blockSize, Progressable progress) throws IOException { + return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList) + .tryWith(() -> fileSystem.create(f, permission, flags, bufferSize, replication, blockSize, progress)).start(); + } + + @Override + public FSDataOutputStream create(Path f, FsPermission permission, EnumSet flags, int bufferSize, + short replication, long blockSize, Progressable progress, Options.ChecksumOpt checksumOpt) throws IOException { + return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList) + .tryWith(() -> fileSystem.create(f, permission, flags, bufferSize, replication, + blockSize, progress, checksumOpt)).start(); + } + + @Override + public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize) + throws IOException { + return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList) + .tryWith(() -> fileSystem.create(f, overwrite, bufferSize, replication, blockSize)).start(); + } + + @Override + public boolean createNewFile(Path f) throws IOException { + return (boolean) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.createNewFile(f)).start(); + } + + @Override + public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException { + return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.append(f, bufferSize, progress)).start(); + } + + @Override + public FSDataOutputStream append(Path f) throws IOException { + return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.append(f)).start(); + } + + @Override + public FSDataOutputStream append(Path f, int bufferSize) throws IOException { + return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.append(f, bufferSize)).start(); + } + + @Override + public boolean rename(Path src, Path dst) throws IOException { + return (boolean) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.rename(src, dst)).start(); + } + + @Override + public boolean delete(Path f, boolean recursive) throws IOException { + return (boolean) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.delete(f, recursive)).start(); + } + + @Override + public boolean delete(Path f) throws IOException { + return (boolean) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.delete(f, true)).start(); + } + + @Override + public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException { + return (FileStatus[]) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.listStatus(f)).start(); + } + + @Override + public FileStatus[] listStatus(Path f, PathFilter filter) throws IOException { + return (FileStatus[]) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.listStatus(f, filter)).start(); + } + + @Override + public FileStatus[] listStatus(Path[] files) throws IOException { + return (FileStatus[]) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.listStatus(files)).start(); + } + + @Override + public FileStatus[] listStatus(Path[] files, PathFilter filter) throws IOException { + return (FileStatus[]) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.listStatus(files, filter)).start(); + } + + @Override + public FileStatus[] globStatus(Path pathPattern) throws IOException { + return (FileStatus[]) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.globStatus(pathPattern)).start(); + } + + @Override + public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException { + return (FileStatus[]) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.globStatus(pathPattern, filter)).start(); + } + + @Override + public RemoteIterator listLocatedStatus(Path f) throws IOException { + return (RemoteIterator) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.listLocatedStatus(f)).start(); + } + + @Override + public RemoteIterator listFiles(Path f, boolean recursive) throws IOException { + return (RemoteIterator) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList) + .tryWith(() -> fileSystem.listFiles(f, recursive)).start(); + } + + @Override + public void setWorkingDirectory(Path newDir) { + fileSystem.setWorkingDirectory(newDir); + } + + @Override + public Path getWorkingDirectory() { + return fileSystem.getWorkingDirectory(); + } + + @Override + public boolean mkdirs(Path f, FsPermission permission) throws IOException { + return (boolean) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.mkdirs(f, permission)).start(); + } + + @Override + public FileStatus getFileStatus(Path f) throws IOException { + return (FileStatus) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.getFileStatus(f)).start(); + } + + @Override + public boolean exists(Path f) throws IOException { + return (boolean) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.exists(f)).start(); + } + + @Override + public Configuration getConf() { + return fileSystem.getConf(); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 5ad3b329a..740d569a5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -23,6 +23,8 @@ import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.FailSafeConsistencyGuard; +import org.apache.hudi.common.fs.FileSystemRetryConfig; +import org.apache.hudi.common.fs.HoodieRetryWrapperFileSystem; import org.apache.hudi.common.fs.HoodieWrapperFileSystem; import org.apache.hudi.common.fs.NoOpConsistencyGuard; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -100,12 +102,14 @@ public class HoodieTableMetaClient implements Serializable { private HoodieActiveTimeline activeTimeline; private HoodieArchivedTimeline archivedTimeline; private ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build(); + private FileSystemRetryConfig fileSystemRetryConfig = FileSystemRetryConfig.newBuilder().build(); private HoodieTableMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad, ConsistencyGuardConfig consistencyGuardConfig, Option layoutVersion, - String payloadClassName) { + String payloadClassName, FileSystemRetryConfig fileSystemRetryConfig) { LOG.info("Loading HoodieTableMetaClient from " + basePath); this.consistencyGuardConfig = consistencyGuardConfig; + this.fileSystemRetryConfig = fileSystemRetryConfig; this.hadoopConf = new SerializableConfiguration(conf); Path basePathDir = new Path(basePath); this.basePath = basePathDir.toString(); @@ -141,7 +145,8 @@ public class HoodieTableMetaClient implements Serializable { public static HoodieTableMetaClient reload(HoodieTableMetaClient oldMetaClient) { return HoodieTableMetaClient.builder().setConf(oldMetaClient.hadoopConf.get()).setBasePath(oldMetaClient.basePath).setLoadActiveTimelineOnLoad(oldMetaClient.loadActiveTimelineOnLoad) - .setConsistencyGuardConfig(oldMetaClient.consistencyGuardConfig).setLayoutVersion(Option.of(oldMetaClient.timelineLayoutVersion)).setPayloadClassName(null).build(); + .setConsistencyGuardConfig(oldMetaClient.consistencyGuardConfig).setLayoutVersion(Option.of(oldMetaClient.timelineLayoutVersion)).setPayloadClassName(null) + .setFileSystemRetryConfig(oldMetaClient.fileSystemRetryConfig).build(); } /** @@ -256,6 +261,14 @@ public class HoodieTableMetaClient implements Serializable { public HoodieWrapperFileSystem getFs() { if (fs == null) { FileSystem fileSystem = FSUtils.getFs(metaPath, hadoopConf.newCopy()); + + if (fileSystemRetryConfig.isFileSystemActionRetryEnable()) { + fileSystem = new HoodieRetryWrapperFileSystem(fileSystem, + fileSystemRetryConfig.getMaxRetryIntervalMs(), + fileSystemRetryConfig.getMaxRetryNumbers(), + fileSystemRetryConfig.getInitialRetryIntervalMs(), + fileSystemRetryConfig.getRetryExceptions()); + } ValidationUtils.checkArgument(!(fileSystem instanceof HoodieWrapperFileSystem), "File System not expected to be that of HoodieWrapperFileSystem"); fs = new HoodieWrapperFileSystem(fileSystem, @@ -266,6 +279,10 @@ public class HoodieTableMetaClient implements Serializable { return fs; } + public void setFs(HoodieWrapperFileSystem fs) { + this.fs = fs; + } + /** * Return raw file-system. * @@ -305,6 +322,10 @@ public class HoodieTableMetaClient implements Serializable { return consistencyGuardConfig; } + public FileSystemRetryConfig getFileSystemRetryConfig() { + return fileSystemRetryConfig; + } + /** * Get the archived commits as a timeline. This is costly operation, as all data from the archived files are read. * This should not be used, unless for historical debugging purposes. @@ -578,6 +599,7 @@ public class HoodieTableMetaClient implements Serializable { private boolean loadActiveTimelineOnLoad = false; private String payloadClassName = null; private ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build(); + private FileSystemRetryConfig fileSystemRetryConfig = FileSystemRetryConfig.newBuilder().build(); private Option layoutVersion = Option.of(TimelineLayoutVersion.CURR_LAYOUT_VERSION); public Builder setConf(Configuration conf) { @@ -605,6 +627,11 @@ public class HoodieTableMetaClient implements Serializable { return this; } + public Builder setFileSystemRetryConfig(FileSystemRetryConfig fileSystemRetryConfig) { + this.fileSystemRetryConfig = fileSystemRetryConfig; + return this; + } + public Builder setLayoutVersion(Option layoutVersion) { this.layoutVersion = layoutVersion; return this; @@ -614,7 +641,7 @@ public class HoodieTableMetaClient implements Serializable { ValidationUtils.checkArgument(conf != null, "Configuration needs to be set to init HoodieTableMetaClient"); ValidationUtils.checkArgument(basePath != null, "basePath needs to be set to init HoodieTableMetaClient"); return new HoodieTableMetaClient(conf, basePath, - loadActiveTimelineOnLoad, consistencyGuardConfig, layoutVersion, payloadClassName); + loadActiveTimelineOnLoad, consistencyGuardConfig, layoutVersion, payloadClassName, fileSystemRetryConfig); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java b/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java new file mode 100644 index 000000000..067c5ee40 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.stream.Collectors; + +public class RetryHelper { + private static final Logger LOG = LogManager.getLogger(RetryHelper.class); + private CheckedFunction func; + private int num; + private long maxIntervalTime; + private long initialIntervalTime = 100L; + private String taskInfo = "N/A"; + private List> retryExceptionsClasses; + + public RetryHelper() { + } + + public RetryHelper(long maxRetryIntervalMs, int maxRetryNumbers, long initialRetryIntervalMs, String retryExceptions) { + this.num = maxRetryNumbers; + this.initialIntervalTime = initialRetryIntervalMs; + this.maxIntervalTime = maxRetryIntervalMs; + if (StringUtils.isNullOrEmpty(retryExceptions)) { + this.retryExceptionsClasses = new ArrayList<>(); + } else { + this.retryExceptionsClasses = Arrays.stream(retryExceptions.split(",")) + .map(exception -> (Exception) ReflectionUtils.loadClass(exception, "")) + .map(Exception::getClass) + .collect(Collectors.toList()); + } + } + + public RetryHelper(String taskInfo) { + this.taskInfo = taskInfo; + } + + public RetryHelper tryWith(CheckedFunction func) { + this.func = func; + return this; + } + + public T start() throws IOException { + int retries = 0; + T functionResult = null; + + while (true) { + long waitTime = Math.min(getWaitTimeExp(retries), maxIntervalTime); + try { + functionResult = func.get(); + break; + } catch (IOException | RuntimeException e) { + if (!checkIfExceptionInRetryList(e)) { + throw e; + } + if (retries++ >= num) { + LOG.error("Still failed to " + taskInfo + " after retried " + num + " times.", e); + throw e; + } + LOG.warn("Catch Exception " + taskInfo + ", will retry after " + waitTime + " ms.", e); + try { + Thread.sleep(waitTime); + } catch (InterruptedException ex) { + // ignore InterruptedException here + } + } + } + + if (retries > 0) { + LOG.info("Success to " + taskInfo + " after retried " + retries + " times."); + } + return functionResult; + } + + private boolean checkIfExceptionInRetryList(Exception e) { + boolean inRetryList = false; + + // if users didn't set hoodie.filesystem.operation.retry.exceptions + // we will retry all the IOException and RuntimeException + if (retryExceptionsClasses.isEmpty()) { + return true; + } + + for (Class clazz : retryExceptionsClasses) { + if (clazz.isInstance(e)) { + inRetryList = true; + break; + } + } + return inRetryList; + } + + private long getWaitTimeExp(int retryCount) { + Random random = new Random(); + if (0 == retryCount) { + return initialIntervalTime; + } + + return (long) Math.pow(2, retryCount) * initialIntervalTime + random.nextInt(100); + } + + @FunctionalInterface + public interface CheckedFunction { + T get() throws IOException; + } +} \ No newline at end of file diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java index e4460ce62..f51702a44 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java @@ -68,8 +68,8 @@ public class TestFSUtils extends HoodieCommonTestHarness { private final long minRollbackToKeep = 10; private final long minCleanToKeep = 10; - private static final String TEST_WRITE_TOKEN = "1-0-1"; - private static final String BASE_FILE_EXTENSION = HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension(); + private static String TEST_WRITE_TOKEN = "1-0-1"; + public static final String BASE_FILE_EXTENSION = HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension(); @Rule public final EnvironmentVariables environmentVariables = new EnvironmentVariables(); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtilsWithRetryWrapperEnable.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtilsWithRetryWrapperEnable.java new file mode 100644 index 000000000..0b849ebec --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtilsWithRetryWrapperEnable.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version loop.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-loop.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.fs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.Progressable; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.util.Arrays; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** + * Tests file system utils with retry wrapper enable. + * P.S extends TestFSUtils and setUp a HoodieWrapperFileSystem for metaClient which can test all the TestFSUtils uts with RetryWrapperEnable + */ +public class TestFSUtilsWithRetryWrapperEnable extends TestFSUtils { + + private static final String EXCEPTION_MESSAGE = "Fake runtime exception here."; + private long maxRetryIntervalMs; + private int maxRetryNumbers; + private long initialRetryIntervalMs; + + @Override + @BeforeEach + public void setUp() throws IOException { + initMetaClient(); + basePath = "file:" + basePath; + FileSystemRetryConfig fileSystemRetryConfig = FileSystemRetryConfig.newBuilder().withFileSystemActionRetryEnabled(true).build(); + maxRetryIntervalMs = fileSystemRetryConfig.getMaxRetryIntervalMs(); + maxRetryNumbers = fileSystemRetryConfig.getMaxRetryNumbers(); + initialRetryIntervalMs = fileSystemRetryConfig.getInitialRetryIntervalMs(); + + FakeRemoteFileSystem fakeFs = new FakeRemoteFileSystem(FSUtils.getFs(metaClient.getMetaPath(), metaClient.getHadoopConf()), 2); + FileSystem fileSystem = new HoodieRetryWrapperFileSystem(fakeFs, maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, ""); + + HoodieWrapperFileSystem fs = new HoodieWrapperFileSystem(fileSystem, new NoOpConsistencyGuard()); + metaClient.setFs(fs); + } + + // Test the scenario that fs keeps retrying until it fails. + @Test + public void testProcessFilesWithExceptions() throws Exception { + FakeRemoteFileSystem fakeFs = new FakeRemoteFileSystem(FSUtils.getFs(metaClient.getMetaPath(), metaClient.getHadoopConf()), 100); + FileSystem fileSystem = new HoodieRetryWrapperFileSystem(fakeFs, maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, ""); + HoodieWrapperFileSystem fs = new HoodieWrapperFileSystem(fileSystem, new NoOpConsistencyGuard()); + metaClient.setFs(fs); + List folders = + Arrays.asList("2016/04/15", ".hoodie/.temp/2/2016/04/15"); + folders.forEach(f -> assertThrows(RuntimeException.class, () -> metaClient.getFs().mkdirs(new Path(new Path(basePath), f)))); + } + + /** + * Fake remote FileSystem which will throw RuntimeException something like AmazonS3Exception 503. + */ + class FakeRemoteFileSystem extends FileSystem { + + private FileSystem fs; + private int count = 1; + private int loop; + + public FakeRemoteFileSystem(FileSystem fs, int retryLoop) { + this.fs = fs; + this.loop = retryLoop; + } + + @Override + public URI getUri() { + return fs.getUri(); + } + + @Override + public FSDataInputStream open(Path f, int bufferSize) throws IOException { + if (count % loop == 0) { + count++; + return fs.open(f, bufferSize); + } else { + count++; + throw new RuntimeException(EXCEPTION_MESSAGE); + } + } + + @Override + public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { + if (count % loop == 0) { + count++; + return fs.create(f, permission, overwrite, bufferSize, replication, blockSize, progress); + } else { + count++; + throw new RuntimeException(EXCEPTION_MESSAGE); + } + } + + @Override + public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException { + if (count % loop == 0) { + count++; + return fs.append(f, bufferSize, progress); + } else { + count++; + throw new RuntimeException(EXCEPTION_MESSAGE); + } + } + + @Override + public boolean rename(Path src, Path dst) throws IOException { + if (count % loop == 0) { + count++; + return fs.rename(src, dst); + } else { + count++; + throw new RuntimeException(EXCEPTION_MESSAGE); + } + } + + @Override + public boolean delete(Path f, boolean recursive) throws IOException { + if (count % loop == 0) { + count++; + return fs.delete(f, recursive); + } else { + count++; + throw new RuntimeException(EXCEPTION_MESSAGE); + } + } + + @Override + public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException { + if (count % loop == 0) { + count++; + return fs.listStatus(f); + } else { + count++; + throw new RuntimeException(EXCEPTION_MESSAGE); + } + } + + @Override + public void setWorkingDirectory(Path newDir) { + fs.setWorkingDirectory(newDir); + } + + @Override + public Path getWorkingDirectory() { + return fs.getWorkingDirectory(); + } + + @Override + public boolean mkdirs(Path f, FsPermission permission) throws IOException { + if (count % loop == 0) { + count++; + return fs.mkdirs(f, permission); + } else { + count++; + throw new RuntimeException(EXCEPTION_MESSAGE); + } + } + + @Override + public FileStatus getFileStatus(Path f) throws IOException { + if (count % loop == 0) { + count++; + return fs.getFileStatus(f); + } else { + count++; + throw new RuntimeException(EXCEPTION_MESSAGE); + } + } + + @Override + public RemoteIterator listLocatedStatus(Path f) throws IOException { + return fs.listLocatedStatus(f); + } + + @Override + public Configuration getConf() { + return fs.getConf(); + } + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java index 9d89c2a6b..576cfd7cb 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java @@ -126,6 +126,7 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness { HoodieActiveTimeline oldTimeline = new HoodieActiveTimeline( HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePath()) .setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(metaClient.getConsistencyGuardConfig()) + .setFileSystemRetryConfig(metaClient.getFileSystemRetryConfig()) .setLayoutVersion(Option.of(new TimelineLayoutVersion(VERSION_0))).build()); // Old Timeline writes both to aux and timeline folder oldTimeline.saveToCompactionRequested(instant6, Option.of(dummy));