1
0

Explicitly handle lack of append() support during LogWriting

This commit is contained in:
Vinoth Chandar
2018-11-27 16:54:46 -08:00
committed by vinoth chandar
parent d0fde47458
commit fa65db9c4c
4 changed files with 130 additions and 31 deletions

View File

@@ -16,16 +16,15 @@
package com.uber.hoodie.io.storage;
import com.uber.hoodie.common.storage.StorageSchemes;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.exception.HoodieIOException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
@@ -60,20 +59,6 @@ import org.apache.hadoop.util.Progressable;
public class HoodieWrapperFileSystem extends FileSystem {
public static final String HOODIE_SCHEME_PREFIX = "hoodie-";
private static final Set<String> SUPPORT_SCHEMES;
static {
SUPPORT_SCHEMES = new HashSet<>();
SUPPORT_SCHEMES.add("file");
SUPPORT_SCHEMES.add("hdfs");
SUPPORT_SCHEMES.add("s3");
SUPPORT_SCHEMES.add("s3a");
// Hoodie currently relies on underlying object store being fully
// consistent so only regional buckets should be used.
SUPPORT_SCHEMES.add("gs");
SUPPORT_SCHEMES.add("viewfs");
}
private ConcurrentMap<String, SizeAwareFSDataOutputStream> openStreams = new
ConcurrentHashMap<>();
@@ -104,7 +89,7 @@ public class HoodieWrapperFileSystem extends FileSystem {
public static String getHoodieScheme(String scheme) {
String newScheme;
if (SUPPORT_SCHEMES.contains(scheme)) {
if (StorageSchemes.isSchemeSupported(scheme)) {
newScheme = HOODIE_SCHEME_PREFIX + scheme;
} else {
throw new IllegalArgumentException(

View File

@@ -0,0 +1,66 @@
/*
* Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.storage;
import java.util.Arrays;
/**
* All the supported storage schemes in Hoodie.
*/
public enum StorageSchemes {
// Local filesystem
FILE("file", false),
// Hadoop File System
HDFS("hdfs", true),
// Apache Ignite FS
IGNITE("igfs", true),
// AWS S3
S3A("s3a", false),
S3("s3", false),
// Google Cloud Storage
GCS("gs", false),
// View FS for federated setups. If federating across cloud stores, then append support is false
VIEWFS("viewfs", true);
private String scheme;
private boolean supportsAppend;
StorageSchemes(String scheme, boolean supportsAppend) {
this.scheme = scheme;
this.supportsAppend = supportsAppend;
}
public String getScheme() {
return scheme;
}
public boolean supportsAppend() {
return supportsAppend;
}
public static boolean isSchemeSupported(String scheme) {
return Arrays.stream(values()).filter(s -> s.getScheme().equals(scheme)).count() > 0;
}
public static boolean isAppendSupported(String scheme) {
if (!isSchemeSupported(scheme)) {
throw new IllegalArgumentException("Unsupported scheme :" + scheme);
}
return Arrays.stream(StorageSchemes.values())
.filter(s -> s.supportsAppend() && s.scheme.equals(scheme)).count() > 0;
}
}

View File

@@ -17,6 +17,7 @@
package com.uber.hoodie.common.table.log;
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.storage.StorageSchemes;
import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer;
import com.uber.hoodie.common.table.log.HoodieLogFormat.WriterBuilder;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
@@ -68,26 +69,32 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
Path path = logFile.getPath();
if (fs.exists(path)) {
log.info(logFile + " exists. Appending to existing file");
try {
this.output = fs.append(path, bufferSize);
} catch (RemoteException e) {
log.warn("Remote Exception, attempting to handle or recover lease", e);
handleAppendExceptionOrRecoverLease(path, e);
} catch (IOException ioe) {
if (ioe.getMessage().equalsIgnoreCase("Not supported")) {
log.info("Append not supported. Opening a new log file..");
this.logFile = logFile.rollOver(fs);
createNewFile();
} else {
throw ioe;
boolean isAppendSupported = StorageSchemes.isAppendSupported(fs.getScheme());
if (isAppendSupported) {
log.info(logFile + " exists. Appending to existing file");
try {
this.output = fs.append(path, bufferSize);
} catch (RemoteException e) {
log.warn("Remote Exception, attempting to handle or recover lease", e);
handleAppendExceptionOrRecoverLease(path, e);
} catch (IOException ioe) {
if (ioe.getMessage().toLowerCase().contains("not supported")) {
// may still happen if scheme is viewfs.
isAppendSupported = false;
} else {
throw ioe;
}
}
}
if (!isAppendSupported) {
this.logFile = logFile.rollOver(fs);
log.info("Append not supported.. Rolling over to " + logFile);
createNewFile();
}
} else {
log.info(logFile + " does not exist. Create a new file");
// Block size does not matter as we will always manually autoflush
createNewFile();
// TODO - append a file level meta block
}
}

View File

@@ -0,0 +1,41 @@
/*
* Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.storage;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.junit.Test;
public class TestStorageSchemes {
@Test
public void testStorageSchemes() {
assertTrue(StorageSchemes.isSchemeSupported("hdfs"));
assertFalse(StorageSchemes.isSchemeSupported("s2"));
assertFalse(StorageSchemes.isAppendSupported("s3a"));
assertFalse(StorageSchemes.isAppendSupported("gs"));
assertTrue(StorageSchemes.isAppendSupported("viewfs"));
try {
StorageSchemes.isAppendSupported("s2");
fail("Should throw exception for unsupported schemes");
} catch (IllegalArgumentException ignore) {
// expected.
}
}
}