From fa65db9c4cd8ce1c71d46789c9bb8e0fb03de98a Mon Sep 17 00:00:00 2001 From: Vinoth Chandar Date: Tue, 27 Nov 2018 16:54:46 -0800 Subject: [PATCH] Explicitly handle lack of append() support during LogWriting --- .../io/storage/HoodieWrapperFileSystem.java | 19 +----- .../hoodie/common/storage/StorageSchemes.java | 66 +++++++++++++++++++ .../table/log/HoodieLogFormatWriter.java | 35 ++++++---- .../common/storage/TestStorageSchemes.java | 41 ++++++++++++ 4 files changed, 130 insertions(+), 31 deletions(-) create mode 100644 hoodie-common/src/main/java/com/uber/hoodie/common/storage/StorageSchemes.java create mode 100644 hoodie-common/src/test/java/com/uber/hoodie/common/storage/TestStorageSchemes.java diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieWrapperFileSystem.java b/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieWrapperFileSystem.java index 390e73abe..57c32311b 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieWrapperFileSystem.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieWrapperFileSystem.java @@ -16,16 +16,15 @@ package com.uber.hoodie.io.storage; +import com.uber.hoodie.common.storage.StorageSchemes; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.exception.HoodieIOException; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.EnumSet; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.conf.Configuration; @@ -60,20 +59,6 @@ import org.apache.hadoop.util.Progressable; public class HoodieWrapperFileSystem extends FileSystem { public static final String HOODIE_SCHEME_PREFIX = "hoodie-"; - private static final Set SUPPORT_SCHEMES; - - static { - SUPPORT_SCHEMES = new HashSet<>(); - SUPPORT_SCHEMES.add("file"); - SUPPORT_SCHEMES.add("hdfs"); - SUPPORT_SCHEMES.add("s3"); - SUPPORT_SCHEMES.add("s3a"); - - // Hoodie currently relies on underlying object store being fully - // consistent so only regional buckets should be used. - SUPPORT_SCHEMES.add("gs"); - SUPPORT_SCHEMES.add("viewfs"); - } private ConcurrentMap openStreams = new ConcurrentHashMap<>(); @@ -104,7 +89,7 @@ public class HoodieWrapperFileSystem extends FileSystem { public static String getHoodieScheme(String scheme) { String newScheme; - if (SUPPORT_SCHEMES.contains(scheme)) { + if (StorageSchemes.isSchemeSupported(scheme)) { newScheme = HOODIE_SCHEME_PREFIX + scheme; } else { throw new IllegalArgumentException( diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/storage/StorageSchemes.java b/hoodie-common/src/main/java/com/uber/hoodie/common/storage/StorageSchemes.java new file mode 100644 index 000000000..f42238ed6 --- /dev/null +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/storage/StorageSchemes.java @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.common.storage; + +import java.util.Arrays; + +/** + * All the supported storage schemes in Hoodie. + */ +public enum StorageSchemes { + // Local filesystem + FILE("file", false), + // Hadoop File System + HDFS("hdfs", true), + // Apache Ignite FS + IGNITE("igfs", true), + // AWS S3 + S3A("s3a", false), + S3("s3", false), + // Google Cloud Storage + GCS("gs", false), + // View FS for federated setups. If federating across cloud stores, then append support is false + VIEWFS("viewfs", true); + + private String scheme; + private boolean supportsAppend; + + StorageSchemes(String scheme, boolean supportsAppend) { + this.scheme = scheme; + this.supportsAppend = supportsAppend; + } + + public String getScheme() { + return scheme; + } + + public boolean supportsAppend() { + return supportsAppend; + } + + public static boolean isSchemeSupported(String scheme) { + return Arrays.stream(values()).filter(s -> s.getScheme().equals(scheme)).count() > 0; + } + + public static boolean isAppendSupported(String scheme) { + if (!isSchemeSupported(scheme)) { + throw new IllegalArgumentException("Unsupported scheme :" + scheme); + } + return Arrays.stream(StorageSchemes.values()) + .filter(s -> s.supportsAppend() && s.scheme.equals(scheme)).count() > 0; + } +} diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatWriter.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatWriter.java index c75e666bd..28b2501d0 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatWriter.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatWriter.java @@ -17,6 +17,7 @@ package com.uber.hoodie.common.table.log; import com.uber.hoodie.common.model.HoodieLogFile; +import com.uber.hoodie.common.storage.StorageSchemes; import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer; import com.uber.hoodie.common.table.log.HoodieLogFormat.WriterBuilder; import com.uber.hoodie.common.table.log.block.HoodieLogBlock; @@ -68,26 +69,32 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { Path path = logFile.getPath(); if (fs.exists(path)) { - log.info(logFile + " exists. Appending to existing file"); - try { - this.output = fs.append(path, bufferSize); - } catch (RemoteException e) { - log.warn("Remote Exception, attempting to handle or recover lease", e); - handleAppendExceptionOrRecoverLease(path, e); - } catch (IOException ioe) { - if (ioe.getMessage().equalsIgnoreCase("Not supported")) { - log.info("Append not supported. Opening a new log file.."); - this.logFile = logFile.rollOver(fs); - createNewFile(); - } else { - throw ioe; + boolean isAppendSupported = StorageSchemes.isAppendSupported(fs.getScheme()); + if (isAppendSupported) { + log.info(logFile + " exists. Appending to existing file"); + try { + this.output = fs.append(path, bufferSize); + } catch (RemoteException e) { + log.warn("Remote Exception, attempting to handle or recover lease", e); + handleAppendExceptionOrRecoverLease(path, e); + } catch (IOException ioe) { + if (ioe.getMessage().toLowerCase().contains("not supported")) { + // may still happen if scheme is viewfs. + isAppendSupported = false; + } else { + throw ioe; + } } } + if (!isAppendSupported) { + this.logFile = logFile.rollOver(fs); + log.info("Append not supported.. Rolling over to " + logFile); + createNewFile(); + } } else { log.info(logFile + " does not exist. Create a new file"); // Block size does not matter as we will always manually autoflush createNewFile(); - // TODO - append a file level meta block } } diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/storage/TestStorageSchemes.java b/hoodie-common/src/test/java/com/uber/hoodie/common/storage/TestStorageSchemes.java new file mode 100644 index 000000000..0b8a04914 --- /dev/null +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/storage/TestStorageSchemes.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.common.storage; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import org.junit.Test; + +public class TestStorageSchemes { + + @Test + public void testStorageSchemes() { + assertTrue(StorageSchemes.isSchemeSupported("hdfs")); + assertFalse(StorageSchemes.isSchemeSupported("s2")); + assertFalse(StorageSchemes.isAppendSupported("s3a")); + assertFalse(StorageSchemes.isAppendSupported("gs")); + assertTrue(StorageSchemes.isAppendSupported("viewfs")); + try { + StorageSchemes.isAppendSupported("s2"); + fail("Should throw exception for unsupported schemes"); + } catch (IllegalArgumentException ignore) { + // expected. + } + } +}