From f239187da8052680bde355efbaa33149d37b4280 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=91=A3=E5=8F=AF=E4=BC=A6?= Date: Tue, 2 Nov 2021 19:43:20 +0800 Subject: [PATCH] [HUDI-2515] Add close when producing records failed (#3746) --- .../hudi/common/util/ParquetReaderIterator.java | 14 ++++++++------ .../common/util/queue/BoundedInMemoryQueue.java | 5 +++-- .../common/util/TestParquetReaderIterator.java | 7 +++++-- 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java index 20c79dd78..5970e02d6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java @@ -19,7 +19,7 @@ package org.apache.hudi.common.util; import org.apache.hudi.common.util.queue.BoundedInMemoryQueue; -import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieException; import org.apache.parquet.hadoop.ParquetReader; @@ -49,8 +49,9 @@ public class ParquetReaderIterator implements Iterator { this.next = parquetReader.read(); } return this.next != null; - } catch (IOException io) { - throw new HoodieIOException("unable to read next record from parquet file ", io); + } catch (Exception e) { + FileIOUtils.closeQuietly(parquetReader); + throw new HoodieException("unable to read next record from parquet file ", e); } } @@ -60,14 +61,15 @@ public class ParquetReaderIterator implements Iterator { // To handle case when next() is called before hasNext() if (this.next == null) { if (!hasNext()) { - throw new HoodieIOException("No more records left to read from parquet file"); + throw new HoodieException("No more records left to read from parquet file"); } } T retVal = this.next; this.next = parquetReader.read(); return retVal; - } catch (IOException io) { - throw new HoodieIOException("unable to read next record from parquet file ", io); + } catch (Exception e) { + FileIOUtils.closeQuietly(parquetReader); + throw new HoodieException("unable to read next record from parquet file ", e); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java index 4d55249d1..dfe33b49e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java @@ -172,7 +172,7 @@ public class BoundedInMemoryQueue implements Iterable { /** * Inserts record into queue after applying transformation. * - * @param t Item to be queueed + * @param t Item to be queued */ public void insertRecord(I t) throws Exception { // If already closed, throw exception @@ -222,7 +222,7 @@ public class BoundedInMemoryQueue implements Iterable { throw new HoodieException(e); } } - // Check one more time here as it is possible producer errored out and closed immediately + // Check one more time here as it is possible producer erred out and closed immediately throwExceptionIfFailed(); if (newRecord != null && newRecord.isPresent()) { @@ -244,6 +244,7 @@ public class BoundedInMemoryQueue implements Iterable { private void throwExceptionIfFailed() { if (this.hasFailed.get() != null) { + close(); throw new HoodieException("operation has failed", this.hasFailed.get()); } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetReaderIterator.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetReaderIterator.java index 799ed248b..37fead492 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetReaderIterator.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetReaderIterator.java @@ -18,7 +18,7 @@ package org.apache.hudi.common.util; -import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieException; import org.apache.parquet.hadoop.ParquetReader; import org.junit.jupiter.api.Test; @@ -30,6 +30,8 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class TestParquetReaderIterator { @@ -59,6 +61,7 @@ public class TestParquetReaderIterator { assertEquals(1, iterator.next()); // no more entries to iterate on assertFalse(iterator.hasNext()); - assertThrows(HoodieIOException.class, iterator::next, "should throw an exception since there is only 1 record"); + assertThrows(HoodieException.class, iterator::next, "should throw an exception since there is only 1 record"); + verify(reader, times(1)).close(); } }