diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatWriter.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatWriter.java index ee1dcf559..fa17711ba 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatWriter.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatWriter.java @@ -46,6 +46,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { private final Integer bufferSize; private final Short replication; private FSDataOutputStream output; + private static final String APPEND_UNAVAILABLE_EXCEPTION_MESSAGE = "not sufficiently replicated yet"; /** * @param fs @@ -69,6 +70,18 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { try { this.output = fs.append(path, bufferSize); } catch (RemoteException e) { + if (e.getMessage().contains(APPEND_UNAVAILABLE_EXCEPTION_MESSAGE)) { + // This issue happens when all replicas for a file are down and/or being decommissioned. + // The fs.append() API could append to the last block for a file. If the last block is full, a new block is + // appended to. In a scenario when a lot of DN's are decommissioned, it can happen that DN's holding all + // replicas for a block/file are decommissioned together. During this process, all these blocks will start to + // get replicated to other active DataNodes but this process might take time (can be of the order of few + // hours). During this time, if a fs.append() API is invoked for a file whose last block is eligible to be + // appended to, then the NN will throw an exception saying that it couldn't find any active replica with the + // last block. Find more information here : https://issues.apache.org/jira/browse/HDFS-6325 + log.warn("Failed to open an append stream to the log file. Opening a new log file..", e); + createNewFile(); + } // this happens when either another task executor writing to this file died or // data node is going down if (e.getClassName().equals(AlreadyBeingCreatedException.class.getName()) @@ -86,9 +99,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { } catch (IOException ioe) { if (ioe.getMessage().equalsIgnoreCase("Not supported")) { log.info("Append not supported. Opening a new log file.."); - this.logFile = logFile.rollOver(fs); - this.output = fs.create(this.logFile.getPath(), false, bufferSize, replication, - WriterBuilder.DEFAULT_SIZE_THRESHOLD, null); + createNewFile(); } else { throw ioe; } @@ -192,6 +203,12 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { return this; } + private void createNewFile() throws IOException { + this.logFile = logFile.rollOver(fs); + this.output = fs.create(this.logFile.getPath(), false, bufferSize, replication, + WriterBuilder.DEFAULT_SIZE_THRESHOLD, null); + } + @Override public void close() throws IOException { flush(); diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatAppendFailureTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatAppendFailureTest.java new file mode 100644 index 000000000..76a03a347 --- /dev/null +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatAppendFailureTest.java @@ -0,0 +1,141 @@ +/* + * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.common.table.log; + +import static com.uber.hoodie.common.util.SchemaTestUtil.getSimpleSchema; + +import com.google.common.collect.Maps; +import com.uber.hoodie.common.minicluster.MiniClusterUtil; +import com.uber.hoodie.common.model.HoodieArchivedLogFile; +import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer; +import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock; +import com.uber.hoodie.common.table.log.block.HoodieLogBlock; +import com.uber.hoodie.common.util.SchemaTestUtil; +import java.io.File; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeoutException; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * This class is intentionally using a different way of setting up the MiniDFSCluster and not relying on + * {@link MiniClusterUtil} to reproduce append() issue : https://issues.apache.org/jira/browse/HDFS-6325 + * Reference : https://issues.apache.org/jira/secure/attachment/12645053/HDFS-6325.patch + */ +public class HoodieLogFormatAppendFailureTest { + + private static File baseDir; + private static MiniDFSCluster cluster; + + @BeforeClass + public static void setUpClass() throws IOException { + // NOTE : The MiniClusterDFS leaves behind the directory under which the cluster was created + baseDir = new File("/tmp/" + UUID.randomUUID().toString()); + FileUtil.fullyDelete(baseDir); + // Append is not supported in LocalFileSystem. HDFS needs to be setup. + Configuration conf = new Configuration(); + // lower heartbeat interval for fast recognition of DN + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath()); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000); + conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); + conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 3000); + cluster = new MiniDFSCluster.Builder(conf).checkExitOnShutdown(true).numDataNodes(4).build(); + } + + @AfterClass + public static void tearDownClass() { + cluster.shutdown(true); + // Force clean up the directory under which the cluster was created + FileUtil.fullyDelete(baseDir); + } + + @Test(timeout = 60000) + public void testFailedToGetAppendStreamFromHDFSNameNode() throws IOException, URISyntaxException, + InterruptedException, TimeoutException { + + // Use some fs like LocalFileSystem, that does not support appends + String uuid = UUID.randomUUID().toString(); + Path localPartitionPath = new Path("/tmp/"); + FileSystem fs = cluster.getFileSystem(); + Path testPath = new Path(localPartitionPath, uuid); + fs.mkdirs(testPath); + + // Some data & append.
+ List records = SchemaTestUtil.generateTestRecords(0, 10); + Map header = Maps.newHashMap(); + header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100"); + header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString()); + HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header); + + Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(testPath) + .withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION).withFileId("commits" + + ".archive").overBaseCommit("") + .withFs(fs).build(); + + writer = writer.appendBlock(dataBlock); + // get the current log file version to compare later + int logFileVersion = writer.getLogFile().getLogVersion(); + Path logFilePath = writer.getLogFile().getPath(); + writer.close(); + + // Wait for 3 times replication of file + DFSTestUtil.waitReplication(fs, logFilePath, (short) 3); + // Shut down all DNs that have the last block location for the file + LocatedBlocks lbs = cluster.getFileSystem().getClient().getNamenode() + .getBlockLocations("/tmp/" + uuid + "/" + logFilePath.getName(), 0, Long.MAX_VALUE); + List dnsOfCluster = cluster.getDataNodes(); + DatanodeInfo[] dnsWithLocations = lbs.getLastLocatedBlock().getLocations(); + for (DataNode dn : dnsOfCluster) { + for (DatanodeInfo loc : dnsWithLocations) { + if (dn.getDatanodeId().equals(loc)) { + dn.shutdown(); + cluster.stopDataNode(dn.getDisplayName()); + DFSTestUtil.waitForDatanodeDeath(dn); + } + } + } + // Wait for the replication of this file to go down to 0 + DFSTestUtil.waitReplication(fs, logFilePath, (short) 0); + + // Opening a new Writer right now will throw IOException. The code should handle this, rollover the logfile and + // return a new writer with a bumped up logVersion + writer = HoodieLogFormat.newWriterBuilder().onParentPath(testPath) + .withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION).withFileId("commits" + ".archive") + .overBaseCommit("") + .withFs(fs).build(); + // The log version should be different for this new writer + Assert.assertFalse(writer.getLogFile().getLogVersion() == logFileVersion); + } + +} diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestExternalSpillableMap.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestExternalSpillableMap.java index c62aa1cc4..bb385dc1a 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestExternalSpillableMap.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestExternalSpillableMap.java @@ -30,7 +30,6 @@ import com.uber.hoodie.common.util.SchemaTestUtil; import com.uber.hoodie.common.util.SpillableMapTestUtils; import com.uber.hoodie.common.util.collection.converter.HoodieRecordConverter; import com.uber.hoodie.common.util.collection.converter.StringConverter; -import java.io.File; import java.io.IOException; import java.io.UncheckedIOException; import java.net.URISyntaxException; @@ -182,12 +181,6 @@ public class TestExternalSpillableMap { } } - @Test - public void simpleTestWithExceptionValidateFileIsRemoved() throws Exception { - File file = new File(FAILURE_OUTPUT_PATH); - assertFalse(file.exists()); - } - @Test public void testDataCorrectnessWithUpsertsToDataInMapAndOnDisk() throws IOException, URISyntaxException {