[HUDI-2648] Retry FileSystem action instead of failed directly. (#3887)
Co-authored-by: yuezhang <yuezhang@freewheel.tv>
This commit is contained in:
@@ -470,7 +470,8 @@ public class SparkMain {
|
|||||||
HoodieTableMetaClient metaClient =
|
HoodieTableMetaClient metaClient =
|
||||||
HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(config.getBasePath())
|
HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(config.getBasePath())
|
||||||
.setLoadActiveTimelineOnLoad(false).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
|
.setLoadActiveTimelineOnLoad(false).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
|
||||||
.setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
|
.setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())))
|
||||||
|
.setFileSystemRetryConfig(config.getFileSystemRetryConfig()).build();
|
||||||
try {
|
try {
|
||||||
new UpgradeDowngrade(metaClient, config, new HoodieSparkEngineContext(jsc), SparkUpgradeDowngradeHelper.getInstance())
|
new UpgradeDowngrade(metaClient, config, new HoodieSparkEngineContext(jsc), SparkUpgradeDowngradeHelper.getInstance())
|
||||||
.run(HoodieTableVersion.valueOf(toVersion), null);
|
.run(HoodieTableVersion.valueOf(toVersion), null);
|
||||||
|
|||||||
@@ -134,7 +134,8 @@ public abstract class BaseHoodieClient implements Serializable, AutoCloseable {
|
|||||||
protected HoodieTableMetaClient createMetaClient(boolean loadActiveTimelineOnLoad) {
|
protected HoodieTableMetaClient createMetaClient(boolean loadActiveTimelineOnLoad) {
|
||||||
return HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(config.getBasePath())
|
return HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(config.getBasePath())
|
||||||
.setLoadActiveTimelineOnLoad(loadActiveTimelineOnLoad).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
|
.setLoadActiveTimelineOnLoad(loadActiveTimelineOnLoad).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
|
||||||
.setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
|
.setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())))
|
||||||
|
.setFileSystemRetryConfig(config.getFileSystemRetryConfig()).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
public Option<EmbeddedTimelineService> getTimelineServer() {
|
public Option<EmbeddedTimelineService> getTimelineServer() {
|
||||||
|
|||||||
@@ -31,6 +31,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig;
|
|||||||
import org.apache.hudi.common.config.TypedProperties;
|
import org.apache.hudi.common.config.TypedProperties;
|
||||||
import org.apache.hudi.common.engine.EngineType;
|
import org.apache.hudi.common.engine.EngineType;
|
||||||
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
|
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
|
||||||
|
import org.apache.hudi.common.fs.FileSystemRetryConfig;
|
||||||
import org.apache.hudi.common.model.HoodieCleaningPolicy;
|
import org.apache.hudi.common.model.HoodieCleaningPolicy;
|
||||||
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
|
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
|
||||||
import org.apache.hudi.common.model.HoodieFileFormat;
|
import org.apache.hudi.common.model.HoodieFileFormat;
|
||||||
@@ -447,6 +448,7 @@ public class HoodieWriteConfig extends HoodieConfig {
|
|||||||
.withDocumentation("Master control to disable all table services including archive, clean, compact, cluster, etc.");
|
.withDocumentation("Master control to disable all table services including archive, clean, compact, cluster, etc.");
|
||||||
|
|
||||||
private ConsistencyGuardConfig consistencyGuardConfig;
|
private ConsistencyGuardConfig consistencyGuardConfig;
|
||||||
|
private FileSystemRetryConfig fileSystemRetryConfig;
|
||||||
|
|
||||||
// Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled
|
// Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled
|
||||||
// We keep track of original config and rewritten config
|
// We keep track of original config and rewritten config
|
||||||
@@ -840,6 +842,7 @@ public class HoodieWriteConfig extends HoodieConfig {
|
|||||||
newProps.putAll(props);
|
newProps.putAll(props);
|
||||||
this.engineType = engineType;
|
this.engineType = engineType;
|
||||||
this.consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().fromProperties(newProps).build();
|
this.consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().fromProperties(newProps).build();
|
||||||
|
this.fileSystemRetryConfig = FileSystemRetryConfig.newBuilder().fromProperties(newProps).build();
|
||||||
this.clientSpecifiedViewStorageConfig = FileSystemViewStorageConfig.newBuilder().fromProperties(newProps).build();
|
this.clientSpecifiedViewStorageConfig = FileSystemViewStorageConfig.newBuilder().fromProperties(newProps).build();
|
||||||
this.viewStorageConfig = clientSpecifiedViewStorageConfig;
|
this.viewStorageConfig = clientSpecifiedViewStorageConfig;
|
||||||
this.hoodiePayloadConfig = HoodiePayloadConfig.newBuilder().fromProperties(newProps).build();
|
this.hoodiePayloadConfig = HoodiePayloadConfig.newBuilder().fromProperties(newProps).build();
|
||||||
@@ -1725,6 +1728,10 @@ public class HoodieWriteConfig extends HoodieConfig {
|
|||||||
return consistencyGuardConfig;
|
return consistencyGuardConfig;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public FileSystemRetryConfig getFileSystemRetryConfig() {
|
||||||
|
return fileSystemRetryConfig;
|
||||||
|
}
|
||||||
|
|
||||||
public void setConsistencyGuardConfig(ConsistencyGuardConfig consistencyGuardConfig) {
|
public void setConsistencyGuardConfig(ConsistencyGuardConfig consistencyGuardConfig) {
|
||||||
this.consistencyGuardConfig = consistencyGuardConfig;
|
this.consistencyGuardConfig = consistencyGuardConfig;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -53,7 +53,8 @@ public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
|
|||||||
HoodieTableMetaClient metaClient =
|
HoodieTableMetaClient metaClient =
|
||||||
HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath())
|
HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath())
|
||||||
.setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
|
.setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
|
||||||
.setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
|
.setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())))
|
||||||
|
.setFileSystemRetryConfig(config.getFileSystemRetryConfig()).build();
|
||||||
return HoodieFlinkTable.create(config, context, metaClient);
|
return HoodieFlinkTable.create(config, context, metaClient);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -63,7 +63,8 @@ public abstract class HoodieSparkTable<T extends HoodieRecordPayload>
|
|||||||
HoodieTableMetaClient metaClient =
|
HoodieTableMetaClient metaClient =
|
||||||
HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath())
|
HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath())
|
||||||
.setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
|
.setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
|
||||||
.setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
|
.setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())))
|
||||||
|
.setFileSystemRetryConfig(config.getFileSystemRetryConfig()).build();
|
||||||
return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, refreshTimeline);
|
return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, refreshTimeline);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,142 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.common.fs;
|
||||||
|
|
||||||
|
import org.apache.hudi.common.config.ConfigClassProperty;
|
||||||
|
import org.apache.hudi.common.config.ConfigGroups;
|
||||||
|
import org.apache.hudi.common.config.ConfigProperty;
|
||||||
|
import org.apache.hudi.common.config.HoodieConfig;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileReader;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Properties;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The file system retry relevant config options.
|
||||||
|
*/
|
||||||
|
@ConfigClassProperty(name = "FileSystem Guard Configurations",
|
||||||
|
groupName = ConfigGroups.Names.WRITE_CLIENT,
|
||||||
|
description = "The filesystem retry related config options, to help deal with runtime exception like list/get/put/delete performance issues.")
|
||||||
|
public class FileSystemRetryConfig extends HoodieConfig {
|
||||||
|
|
||||||
|
public static final ConfigProperty<String> FILESYSTEM_RETRY_ENABLE = ConfigProperty
|
||||||
|
.key("hoodie.filesystem.operation.retry.enable")
|
||||||
|
.defaultValue("false")
|
||||||
|
.sinceVersion("0.11.0")
|
||||||
|
.withDocumentation("Enabled to handle list/get/delete etc file system performance issue.");
|
||||||
|
|
||||||
|
public static final ConfigProperty<Long> INITIAL_RETRY_INTERVAL_MS = ConfigProperty
|
||||||
|
.key("hoodie.filesystem.operation.retry.initial_interval_ms")
|
||||||
|
.defaultValue(100L)
|
||||||
|
.sinceVersion("0.11.0")
|
||||||
|
.withDocumentation("Amount of time (in ms) to wait, before retry to do operations on storage.");
|
||||||
|
|
||||||
|
public static final ConfigProperty<Long> MAX_RETRY_INTERVAL_MS = ConfigProperty
|
||||||
|
.key("hoodie.filesystem.operation.retry.max_interval_ms")
|
||||||
|
.defaultValue(2000L)
|
||||||
|
.sinceVersion("0.11.0")
|
||||||
|
.withDocumentation("Maximum amount of time (in ms), to wait for next retry.");
|
||||||
|
|
||||||
|
public static final ConfigProperty<Integer> MAX_RETRY_NUMBERS = ConfigProperty
|
||||||
|
.key("hoodie.filesystem.operation.retry.max_numbers")
|
||||||
|
.defaultValue(4)
|
||||||
|
.sinceVersion("0.11.0")
|
||||||
|
.withDocumentation("Maximum number of retry actions to perform, with exponential backoff.");
|
||||||
|
|
||||||
|
public static final ConfigProperty<String> RETRY_EXCEPTIONS = ConfigProperty
|
||||||
|
.key("hoodie.filesystem.operation.retry.exceptions")
|
||||||
|
.defaultValue("")
|
||||||
|
.sinceVersion("0.11.0")
|
||||||
|
.withDocumentation("The class name of the Exception that needs to be re-tryed, separated by commas. "
|
||||||
|
+ "Default is empty which means retry all the IOException and RuntimeException from FileSystem");
|
||||||
|
|
||||||
|
private FileSystemRetryConfig() {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getInitialRetryIntervalMs() {
|
||||||
|
return getLong(INITIAL_RETRY_INTERVAL_MS);
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getMaxRetryIntervalMs() {
|
||||||
|
return getLong(MAX_RETRY_INTERVAL_MS);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMaxRetryNumbers() {
|
||||||
|
return getInt(MAX_RETRY_NUMBERS);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isFileSystemActionRetryEnable() {
|
||||||
|
return Boolean.parseBoolean(getStringOrDefault(FILESYSTEM_RETRY_ENABLE));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static FileSystemRetryConfig.Builder newBuilder() {
|
||||||
|
return new Builder();
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getRetryExceptions() {
|
||||||
|
return getString(RETRY_EXCEPTIONS);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The builder used to build filesystem configurations.
|
||||||
|
*/
|
||||||
|
public static class Builder {
|
||||||
|
|
||||||
|
private final FileSystemRetryConfig fileSystemRetryConfig = new FileSystemRetryConfig();
|
||||||
|
|
||||||
|
public Builder fromFile(File propertiesFile) throws IOException {
|
||||||
|
try (FileReader reader = new FileReader(propertiesFile)) {
|
||||||
|
fileSystemRetryConfig.getProps().load(reader);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder fromProperties(Properties props) {
|
||||||
|
this.fileSystemRetryConfig.getProps().putAll(props);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder withMaxRetryNumbers(int numbers) {
|
||||||
|
fileSystemRetryConfig.setValue(MAX_RETRY_NUMBERS, String.valueOf(numbers));
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder withInitialRetryIntervalMs(long intervalMs) {
|
||||||
|
fileSystemRetryConfig.setValue(INITIAL_RETRY_INTERVAL_MS, String.valueOf(intervalMs));
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder withMaxRetryIntervalMs(long intervalMs) {
|
||||||
|
fileSystemRetryConfig.setValue(MAX_RETRY_INTERVAL_MS, String.valueOf(intervalMs));
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder withFileSystemActionRetryEnabled(boolean enabled) {
|
||||||
|
fileSystemRetryConfig.setValue(FILESYSTEM_RETRY_ENABLE, String.valueOf(enabled));
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public FileSystemRetryConfig build() {
|
||||||
|
fileSystemRetryConfig.setDefaults(FileSystemRetryConfig.class.getName());
|
||||||
|
return fileSystemRetryConfig;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,257 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.common.fs;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.CreateFlag;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||||
|
import org.apache.hadoop.fs.Options;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.PathFilter;
|
||||||
|
import org.apache.hadoop.fs.RemoteIterator;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.util.Progressable;
|
||||||
|
import org.apache.hudi.common.util.RetryHelper;
|
||||||
|
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.EnumSet;
|
||||||
|
|
||||||
|
public class HoodieRetryWrapperFileSystem extends FileSystem {
|
||||||
|
|
||||||
|
private FileSystem fileSystem;
|
||||||
|
private long maxRetryIntervalMs;
|
||||||
|
private int maxRetryNumbers;
|
||||||
|
private long initialRetryIntervalMs;
|
||||||
|
private String retryExceptionsList;
|
||||||
|
|
||||||
|
public HoodieRetryWrapperFileSystem(FileSystem fs, long maxRetryIntervalMs, int maxRetryNumbers, long initialRetryIntervalMs, String retryExceptions) {
|
||||||
|
this.fileSystem = fs;
|
||||||
|
this.maxRetryIntervalMs = maxRetryIntervalMs;
|
||||||
|
this.maxRetryNumbers = maxRetryNumbers;
|
||||||
|
this.initialRetryIntervalMs = initialRetryIntervalMs;
|
||||||
|
this.retryExceptionsList = retryExceptions;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public URI getUri() {
|
||||||
|
return fileSystem.getUri();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
|
||||||
|
return (FSDataInputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.open(f, bufferSize)).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FSDataInputStream open(Path f) throws IOException {
|
||||||
|
return (FSDataInputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.open(f)).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FSDataOutputStream create(Path f,
|
||||||
|
FsPermission permission,
|
||||||
|
boolean overwrite,
|
||||||
|
int bufferSize,
|
||||||
|
short replication,
|
||||||
|
long blockSize,
|
||||||
|
Progressable progress) throws IOException {
|
||||||
|
return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList)
|
||||||
|
.tryWith(() -> fileSystem.create(f, permission, overwrite, bufferSize, replication, blockSize, progress)).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FSDataOutputStream create(Path f, boolean overwrite) throws IOException {
|
||||||
|
return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.create(f, overwrite)).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FSDataOutputStream create(Path f) throws IOException {
|
||||||
|
return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.create(f)).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FSDataOutputStream create(Path f, Progressable progress) throws IOException {
|
||||||
|
return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.create(f, progress)).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FSDataOutputStream create(Path f, short replication) throws IOException {
|
||||||
|
return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.create(f, replication)).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FSDataOutputStream create(Path f, short replication, Progressable progress) throws IOException {
|
||||||
|
return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.create(f, replication, progress)).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize) throws IOException {
|
||||||
|
return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList)
|
||||||
|
.tryWith(() -> fileSystem.create(f, overwrite, bufferSize)).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, Progressable progress)
|
||||||
|
throws IOException {
|
||||||
|
return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList)
|
||||||
|
.tryWith(() -> fileSystem.create(f, overwrite, bufferSize, progress)).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize,
|
||||||
|
Progressable progress) throws IOException {
|
||||||
|
return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList)
|
||||||
|
.tryWith(() -> fileSystem.create(f, overwrite, bufferSize, replication, blockSize, progress)).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FSDataOutputStream create(Path f, FsPermission permission, EnumSet<CreateFlag> flags, int bufferSize,
|
||||||
|
short replication, long blockSize, Progressable progress) throws IOException {
|
||||||
|
return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList)
|
||||||
|
.tryWith(() -> fileSystem.create(f, permission, flags, bufferSize, replication, blockSize, progress)).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FSDataOutputStream create(Path f, FsPermission permission, EnumSet<CreateFlag> flags, int bufferSize,
|
||||||
|
short replication, long blockSize, Progressable progress, Options.ChecksumOpt checksumOpt) throws IOException {
|
||||||
|
return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList)
|
||||||
|
.tryWith(() -> fileSystem.create(f, permission, flags, bufferSize, replication,
|
||||||
|
blockSize, progress, checksumOpt)).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize)
|
||||||
|
throws IOException {
|
||||||
|
return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList)
|
||||||
|
.tryWith(() -> fileSystem.create(f, overwrite, bufferSize, replication, blockSize)).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean createNewFile(Path f) throws IOException {
|
||||||
|
return (boolean) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.createNewFile(f)).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
|
||||||
|
return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.append(f, bufferSize, progress)).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FSDataOutputStream append(Path f) throws IOException {
|
||||||
|
return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.append(f)).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
|
||||||
|
return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.append(f, bufferSize)).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean rename(Path src, Path dst) throws IOException {
|
||||||
|
return (boolean) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.rename(src, dst)).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean delete(Path f, boolean recursive) throws IOException {
|
||||||
|
return (boolean) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.delete(f, recursive)).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean delete(Path f) throws IOException {
|
||||||
|
return (boolean) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.delete(f, true)).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
|
||||||
|
return (FileStatus[]) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.listStatus(f)).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FileStatus[] listStatus(Path f, PathFilter filter) throws IOException {
|
||||||
|
return (FileStatus[]) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.listStatus(f, filter)).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FileStatus[] listStatus(Path[] files) throws IOException {
|
||||||
|
return (FileStatus[]) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.listStatus(files)).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FileStatus[] listStatus(Path[] files, PathFilter filter) throws IOException {
|
||||||
|
return (FileStatus[]) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.listStatus(files, filter)).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FileStatus[] globStatus(Path pathPattern) throws IOException {
|
||||||
|
return (FileStatus[]) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.globStatus(pathPattern)).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException {
|
||||||
|
return (FileStatus[]) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.globStatus(pathPattern, filter)).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f) throws IOException {
|
||||||
|
return (RemoteIterator<LocatedFileStatus>) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.listLocatedStatus(f)).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RemoteIterator<LocatedFileStatus> listFiles(Path f, boolean recursive) throws IOException {
|
||||||
|
return (RemoteIterator<LocatedFileStatus>) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList)
|
||||||
|
.tryWith(() -> fileSystem.listFiles(f, recursive)).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setWorkingDirectory(Path newDir) {
|
||||||
|
fileSystem.setWorkingDirectory(newDir);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Path getWorkingDirectory() {
|
||||||
|
return fileSystem.getWorkingDirectory();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
|
||||||
|
return (boolean) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.mkdirs(f, permission)).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FileStatus getFileStatus(Path f) throws IOException {
|
||||||
|
return (FileStatus) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.getFileStatus(f)).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean exists(Path f) throws IOException {
|
||||||
|
return (boolean) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList).tryWith(() -> fileSystem.exists(f)).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Configuration getConf() {
|
||||||
|
return fileSystem.getConf();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -23,6 +23,8 @@ import org.apache.hudi.common.config.SerializableConfiguration;
|
|||||||
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
|
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
|
||||||
import org.apache.hudi.common.fs.FSUtils;
|
import org.apache.hudi.common.fs.FSUtils;
|
||||||
import org.apache.hudi.common.fs.FailSafeConsistencyGuard;
|
import org.apache.hudi.common.fs.FailSafeConsistencyGuard;
|
||||||
|
import org.apache.hudi.common.fs.FileSystemRetryConfig;
|
||||||
|
import org.apache.hudi.common.fs.HoodieRetryWrapperFileSystem;
|
||||||
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
|
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
|
||||||
import org.apache.hudi.common.fs.NoOpConsistencyGuard;
|
import org.apache.hudi.common.fs.NoOpConsistencyGuard;
|
||||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||||
@@ -100,12 +102,14 @@ public class HoodieTableMetaClient implements Serializable {
|
|||||||
private HoodieActiveTimeline activeTimeline;
|
private HoodieActiveTimeline activeTimeline;
|
||||||
private HoodieArchivedTimeline archivedTimeline;
|
private HoodieArchivedTimeline archivedTimeline;
|
||||||
private ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build();
|
private ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build();
|
||||||
|
private FileSystemRetryConfig fileSystemRetryConfig = FileSystemRetryConfig.newBuilder().build();
|
||||||
|
|
||||||
private HoodieTableMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad,
|
private HoodieTableMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad,
|
||||||
ConsistencyGuardConfig consistencyGuardConfig, Option<TimelineLayoutVersion> layoutVersion,
|
ConsistencyGuardConfig consistencyGuardConfig, Option<TimelineLayoutVersion> layoutVersion,
|
||||||
String payloadClassName) {
|
String payloadClassName, FileSystemRetryConfig fileSystemRetryConfig) {
|
||||||
LOG.info("Loading HoodieTableMetaClient from " + basePath);
|
LOG.info("Loading HoodieTableMetaClient from " + basePath);
|
||||||
this.consistencyGuardConfig = consistencyGuardConfig;
|
this.consistencyGuardConfig = consistencyGuardConfig;
|
||||||
|
this.fileSystemRetryConfig = fileSystemRetryConfig;
|
||||||
this.hadoopConf = new SerializableConfiguration(conf);
|
this.hadoopConf = new SerializableConfiguration(conf);
|
||||||
Path basePathDir = new Path(basePath);
|
Path basePathDir = new Path(basePath);
|
||||||
this.basePath = basePathDir.toString();
|
this.basePath = basePathDir.toString();
|
||||||
@@ -141,7 +145,8 @@ public class HoodieTableMetaClient implements Serializable {
|
|||||||
|
|
||||||
public static HoodieTableMetaClient reload(HoodieTableMetaClient oldMetaClient) {
|
public static HoodieTableMetaClient reload(HoodieTableMetaClient oldMetaClient) {
|
||||||
return HoodieTableMetaClient.builder().setConf(oldMetaClient.hadoopConf.get()).setBasePath(oldMetaClient.basePath).setLoadActiveTimelineOnLoad(oldMetaClient.loadActiveTimelineOnLoad)
|
return HoodieTableMetaClient.builder().setConf(oldMetaClient.hadoopConf.get()).setBasePath(oldMetaClient.basePath).setLoadActiveTimelineOnLoad(oldMetaClient.loadActiveTimelineOnLoad)
|
||||||
.setConsistencyGuardConfig(oldMetaClient.consistencyGuardConfig).setLayoutVersion(Option.of(oldMetaClient.timelineLayoutVersion)).setPayloadClassName(null).build();
|
.setConsistencyGuardConfig(oldMetaClient.consistencyGuardConfig).setLayoutVersion(Option.of(oldMetaClient.timelineLayoutVersion)).setPayloadClassName(null)
|
||||||
|
.setFileSystemRetryConfig(oldMetaClient.fileSystemRetryConfig).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -256,6 +261,14 @@ public class HoodieTableMetaClient implements Serializable {
|
|||||||
public HoodieWrapperFileSystem getFs() {
|
public HoodieWrapperFileSystem getFs() {
|
||||||
if (fs == null) {
|
if (fs == null) {
|
||||||
FileSystem fileSystem = FSUtils.getFs(metaPath, hadoopConf.newCopy());
|
FileSystem fileSystem = FSUtils.getFs(metaPath, hadoopConf.newCopy());
|
||||||
|
|
||||||
|
if (fileSystemRetryConfig.isFileSystemActionRetryEnable()) {
|
||||||
|
fileSystem = new HoodieRetryWrapperFileSystem(fileSystem,
|
||||||
|
fileSystemRetryConfig.getMaxRetryIntervalMs(),
|
||||||
|
fileSystemRetryConfig.getMaxRetryNumbers(),
|
||||||
|
fileSystemRetryConfig.getInitialRetryIntervalMs(),
|
||||||
|
fileSystemRetryConfig.getRetryExceptions());
|
||||||
|
}
|
||||||
ValidationUtils.checkArgument(!(fileSystem instanceof HoodieWrapperFileSystem),
|
ValidationUtils.checkArgument(!(fileSystem instanceof HoodieWrapperFileSystem),
|
||||||
"File System not expected to be that of HoodieWrapperFileSystem");
|
"File System not expected to be that of HoodieWrapperFileSystem");
|
||||||
fs = new HoodieWrapperFileSystem(fileSystem,
|
fs = new HoodieWrapperFileSystem(fileSystem,
|
||||||
@@ -266,6 +279,10 @@ public class HoodieTableMetaClient implements Serializable {
|
|||||||
return fs;
|
return fs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setFs(HoodieWrapperFileSystem fs) {
|
||||||
|
this.fs = fs;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return raw file-system.
|
* Return raw file-system.
|
||||||
*
|
*
|
||||||
@@ -305,6 +322,10 @@ public class HoodieTableMetaClient implements Serializable {
|
|||||||
return consistencyGuardConfig;
|
return consistencyGuardConfig;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public FileSystemRetryConfig getFileSystemRetryConfig() {
|
||||||
|
return fileSystemRetryConfig;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the archived commits as a timeline. This is costly operation, as all data from the archived files are read.
|
* Get the archived commits as a timeline. This is costly operation, as all data from the archived files are read.
|
||||||
* This should not be used, unless for historical debugging purposes.
|
* This should not be used, unless for historical debugging purposes.
|
||||||
@@ -578,6 +599,7 @@ public class HoodieTableMetaClient implements Serializable {
|
|||||||
private boolean loadActiveTimelineOnLoad = false;
|
private boolean loadActiveTimelineOnLoad = false;
|
||||||
private String payloadClassName = null;
|
private String payloadClassName = null;
|
||||||
private ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build();
|
private ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build();
|
||||||
|
private FileSystemRetryConfig fileSystemRetryConfig = FileSystemRetryConfig.newBuilder().build();
|
||||||
private Option<TimelineLayoutVersion> layoutVersion = Option.of(TimelineLayoutVersion.CURR_LAYOUT_VERSION);
|
private Option<TimelineLayoutVersion> layoutVersion = Option.of(TimelineLayoutVersion.CURR_LAYOUT_VERSION);
|
||||||
|
|
||||||
public Builder setConf(Configuration conf) {
|
public Builder setConf(Configuration conf) {
|
||||||
@@ -605,6 +627,11 @@ public class HoodieTableMetaClient implements Serializable {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder setFileSystemRetryConfig(FileSystemRetryConfig fileSystemRetryConfig) {
|
||||||
|
this.fileSystemRetryConfig = fileSystemRetryConfig;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public Builder setLayoutVersion(Option<TimelineLayoutVersion> layoutVersion) {
|
public Builder setLayoutVersion(Option<TimelineLayoutVersion> layoutVersion) {
|
||||||
this.layoutVersion = layoutVersion;
|
this.layoutVersion = layoutVersion;
|
||||||
return this;
|
return this;
|
||||||
@@ -614,7 +641,7 @@ public class HoodieTableMetaClient implements Serializable {
|
|||||||
ValidationUtils.checkArgument(conf != null, "Configuration needs to be set to init HoodieTableMetaClient");
|
ValidationUtils.checkArgument(conf != null, "Configuration needs to be set to init HoodieTableMetaClient");
|
||||||
ValidationUtils.checkArgument(basePath != null, "basePath needs to be set to init HoodieTableMetaClient");
|
ValidationUtils.checkArgument(basePath != null, "basePath needs to be set to init HoodieTableMetaClient");
|
||||||
return new HoodieTableMetaClient(conf, basePath,
|
return new HoodieTableMetaClient(conf, basePath,
|
||||||
loadActiveTimelineOnLoad, consistencyGuardConfig, layoutVersion, payloadClassName);
|
loadActiveTimelineOnLoad, consistencyGuardConfig, layoutVersion, payloadClassName, fileSystemRetryConfig);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,129 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.common.util;
|
||||||
|
|
||||||
|
import org.apache.log4j.LogManager;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
public class RetryHelper<T> {
|
||||||
|
private static final Logger LOG = LogManager.getLogger(RetryHelper.class);
|
||||||
|
private CheckedFunction<T> func;
|
||||||
|
private int num;
|
||||||
|
private long maxIntervalTime;
|
||||||
|
private long initialIntervalTime = 100L;
|
||||||
|
private String taskInfo = "N/A";
|
||||||
|
private List<? extends Class<? extends Exception>> retryExceptionsClasses;
|
||||||
|
|
||||||
|
public RetryHelper() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public RetryHelper(long maxRetryIntervalMs, int maxRetryNumbers, long initialRetryIntervalMs, String retryExceptions) {
|
||||||
|
this.num = maxRetryNumbers;
|
||||||
|
this.initialIntervalTime = initialRetryIntervalMs;
|
||||||
|
this.maxIntervalTime = maxRetryIntervalMs;
|
||||||
|
if (StringUtils.isNullOrEmpty(retryExceptions)) {
|
||||||
|
this.retryExceptionsClasses = new ArrayList<>();
|
||||||
|
} else {
|
||||||
|
this.retryExceptionsClasses = Arrays.stream(retryExceptions.split(","))
|
||||||
|
.map(exception -> (Exception) ReflectionUtils.loadClass(exception, ""))
|
||||||
|
.map(Exception::getClass)
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public RetryHelper(String taskInfo) {
|
||||||
|
this.taskInfo = taskInfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
public RetryHelper tryWith(CheckedFunction<T> func) {
|
||||||
|
this.func = func;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public T start() throws IOException {
|
||||||
|
int retries = 0;
|
||||||
|
T functionResult = null;
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
long waitTime = Math.min(getWaitTimeExp(retries), maxIntervalTime);
|
||||||
|
try {
|
||||||
|
functionResult = func.get();
|
||||||
|
break;
|
||||||
|
} catch (IOException | RuntimeException e) {
|
||||||
|
if (!checkIfExceptionInRetryList(e)) {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
if (retries++ >= num) {
|
||||||
|
LOG.error("Still failed to " + taskInfo + " after retried " + num + " times.", e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
LOG.warn("Catch Exception " + taskInfo + ", will retry after " + waitTime + " ms.", e);
|
||||||
|
try {
|
||||||
|
Thread.sleep(waitTime);
|
||||||
|
} catch (InterruptedException ex) {
|
||||||
|
// ignore InterruptedException here
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (retries > 0) {
|
||||||
|
LOG.info("Success to " + taskInfo + " after retried " + retries + " times.");
|
||||||
|
}
|
||||||
|
return functionResult;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean checkIfExceptionInRetryList(Exception e) {
|
||||||
|
boolean inRetryList = false;
|
||||||
|
|
||||||
|
// if users didn't set hoodie.filesystem.operation.retry.exceptions
|
||||||
|
// we will retry all the IOException and RuntimeException
|
||||||
|
if (retryExceptionsClasses.isEmpty()) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (Class<? extends Exception> clazz : retryExceptionsClasses) {
|
||||||
|
if (clazz.isInstance(e)) {
|
||||||
|
inRetryList = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return inRetryList;
|
||||||
|
}
|
||||||
|
|
||||||
|
private long getWaitTimeExp(int retryCount) {
|
||||||
|
Random random = new Random();
|
||||||
|
if (0 == retryCount) {
|
||||||
|
return initialIntervalTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
return (long) Math.pow(2, retryCount) * initialIntervalTime + random.nextInt(100);
|
||||||
|
}
|
||||||
|
|
||||||
|
@FunctionalInterface
|
||||||
|
public interface CheckedFunction<T> {
|
||||||
|
T get() throws IOException;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -68,8 +68,8 @@ public class TestFSUtils extends HoodieCommonTestHarness {
|
|||||||
private final long minRollbackToKeep = 10;
|
private final long minRollbackToKeep = 10;
|
||||||
private final long minCleanToKeep = 10;
|
private final long minCleanToKeep = 10;
|
||||||
|
|
||||||
private static final String TEST_WRITE_TOKEN = "1-0-1";
|
private static String TEST_WRITE_TOKEN = "1-0-1";
|
||||||
private static final String BASE_FILE_EXTENSION = HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension();
|
public static final String BASE_FILE_EXTENSION = HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension();
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
public final EnvironmentVariables environmentVariables = new EnvironmentVariables();
|
public final EnvironmentVariables environmentVariables = new EnvironmentVariables();
|
||||||
|
|||||||
@@ -0,0 +1,210 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version loop.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-loop.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.common.fs;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.RemoteIterator;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.util.Progressable;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests file system utils with retry wrapper enable.
|
||||||
|
* P.S extends TestFSUtils and setUp a HoodieWrapperFileSystem for metaClient which can test all the TestFSUtils uts with RetryWrapperEnable
|
||||||
|
*/
|
||||||
|
public class TestFSUtilsWithRetryWrapperEnable extends TestFSUtils {
|
||||||
|
|
||||||
|
private static final String EXCEPTION_MESSAGE = "Fake runtime exception here.";
|
||||||
|
private long maxRetryIntervalMs;
|
||||||
|
private int maxRetryNumbers;
|
||||||
|
private long initialRetryIntervalMs;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@BeforeEach
|
||||||
|
public void setUp() throws IOException {
|
||||||
|
initMetaClient();
|
||||||
|
basePath = "file:" + basePath;
|
||||||
|
FileSystemRetryConfig fileSystemRetryConfig = FileSystemRetryConfig.newBuilder().withFileSystemActionRetryEnabled(true).build();
|
||||||
|
maxRetryIntervalMs = fileSystemRetryConfig.getMaxRetryIntervalMs();
|
||||||
|
maxRetryNumbers = fileSystemRetryConfig.getMaxRetryNumbers();
|
||||||
|
initialRetryIntervalMs = fileSystemRetryConfig.getInitialRetryIntervalMs();
|
||||||
|
|
||||||
|
FakeRemoteFileSystem fakeFs = new FakeRemoteFileSystem(FSUtils.getFs(metaClient.getMetaPath(), metaClient.getHadoopConf()), 2);
|
||||||
|
FileSystem fileSystem = new HoodieRetryWrapperFileSystem(fakeFs, maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, "");
|
||||||
|
|
||||||
|
HoodieWrapperFileSystem fs = new HoodieWrapperFileSystem(fileSystem, new NoOpConsistencyGuard());
|
||||||
|
metaClient.setFs(fs);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test the scenario that fs keeps retrying until it fails.
|
||||||
|
@Test
|
||||||
|
public void testProcessFilesWithExceptions() throws Exception {
|
||||||
|
FakeRemoteFileSystem fakeFs = new FakeRemoteFileSystem(FSUtils.getFs(metaClient.getMetaPath(), metaClient.getHadoopConf()), 100);
|
||||||
|
FileSystem fileSystem = new HoodieRetryWrapperFileSystem(fakeFs, maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, "");
|
||||||
|
HoodieWrapperFileSystem fs = new HoodieWrapperFileSystem(fileSystem, new NoOpConsistencyGuard());
|
||||||
|
metaClient.setFs(fs);
|
||||||
|
List<String> folders =
|
||||||
|
Arrays.asList("2016/04/15", ".hoodie/.temp/2/2016/04/15");
|
||||||
|
folders.forEach(f -> assertThrows(RuntimeException.class, () -> metaClient.getFs().mkdirs(new Path(new Path(basePath), f))));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fake remote FileSystem which will throw RuntimeException something like AmazonS3Exception 503.
|
||||||
|
*/
|
||||||
|
class FakeRemoteFileSystem extends FileSystem {
|
||||||
|
|
||||||
|
private FileSystem fs;
|
||||||
|
private int count = 1;
|
||||||
|
private int loop;
|
||||||
|
|
||||||
|
public FakeRemoteFileSystem(FileSystem fs, int retryLoop) {
|
||||||
|
this.fs = fs;
|
||||||
|
this.loop = retryLoop;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public URI getUri() {
|
||||||
|
return fs.getUri();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
|
||||||
|
if (count % loop == 0) {
|
||||||
|
count++;
|
||||||
|
return fs.open(f, bufferSize);
|
||||||
|
} else {
|
||||||
|
count++;
|
||||||
|
throw new RuntimeException(EXCEPTION_MESSAGE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
|
||||||
|
if (count % loop == 0) {
|
||||||
|
count++;
|
||||||
|
return fs.create(f, permission, overwrite, bufferSize, replication, blockSize, progress);
|
||||||
|
} else {
|
||||||
|
count++;
|
||||||
|
throw new RuntimeException(EXCEPTION_MESSAGE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
|
||||||
|
if (count % loop == 0) {
|
||||||
|
count++;
|
||||||
|
return fs.append(f, bufferSize, progress);
|
||||||
|
} else {
|
||||||
|
count++;
|
||||||
|
throw new RuntimeException(EXCEPTION_MESSAGE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean rename(Path src, Path dst) throws IOException {
|
||||||
|
if (count % loop == 0) {
|
||||||
|
count++;
|
||||||
|
return fs.rename(src, dst);
|
||||||
|
} else {
|
||||||
|
count++;
|
||||||
|
throw new RuntimeException(EXCEPTION_MESSAGE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean delete(Path f, boolean recursive) throws IOException {
|
||||||
|
if (count % loop == 0) {
|
||||||
|
count++;
|
||||||
|
return fs.delete(f, recursive);
|
||||||
|
} else {
|
||||||
|
count++;
|
||||||
|
throw new RuntimeException(EXCEPTION_MESSAGE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
|
||||||
|
if (count % loop == 0) {
|
||||||
|
count++;
|
||||||
|
return fs.listStatus(f);
|
||||||
|
} else {
|
||||||
|
count++;
|
||||||
|
throw new RuntimeException(EXCEPTION_MESSAGE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setWorkingDirectory(Path newDir) {
|
||||||
|
fs.setWorkingDirectory(newDir);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Path getWorkingDirectory() {
|
||||||
|
return fs.getWorkingDirectory();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
|
||||||
|
if (count % loop == 0) {
|
||||||
|
count++;
|
||||||
|
return fs.mkdirs(f, permission);
|
||||||
|
} else {
|
||||||
|
count++;
|
||||||
|
throw new RuntimeException(EXCEPTION_MESSAGE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FileStatus getFileStatus(Path f) throws IOException {
|
||||||
|
if (count % loop == 0) {
|
||||||
|
count++;
|
||||||
|
return fs.getFileStatus(f);
|
||||||
|
} else {
|
||||||
|
count++;
|
||||||
|
throw new RuntimeException(EXCEPTION_MESSAGE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f) throws IOException {
|
||||||
|
return fs.listLocatedStatus(f);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Configuration getConf() {
|
||||||
|
return fs.getConf();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -126,6 +126,7 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
|
|||||||
HoodieActiveTimeline oldTimeline = new HoodieActiveTimeline(
|
HoodieActiveTimeline oldTimeline = new HoodieActiveTimeline(
|
||||||
HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePath())
|
HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePath())
|
||||||
.setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(metaClient.getConsistencyGuardConfig())
|
.setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(metaClient.getConsistencyGuardConfig())
|
||||||
|
.setFileSystemRetryConfig(metaClient.getFileSystemRetryConfig())
|
||||||
.setLayoutVersion(Option.of(new TimelineLayoutVersion(VERSION_0))).build());
|
.setLayoutVersion(Option.of(new TimelineLayoutVersion(VERSION_0))).build());
|
||||||
// Old Timeline writes both to aux and timeline folder
|
// Old Timeline writes both to aux and timeline folder
|
||||||
oldTimeline.saveToCompactionRequested(instant6, Option.of(dummy));
|
oldTimeline.saveToCompactionRequested(instant6, Option.of(dummy));
|
||||||
|
|||||||
Reference in New Issue
Block a user