[HUDI-2515] Add close when producing records failed (#3746)
This commit is contained in:
@@ -19,7 +19,7 @@
|
|||||||
package org.apache.hudi.common.util;
|
package org.apache.hudi.common.util;
|
||||||
|
|
||||||
import org.apache.hudi.common.util.queue.BoundedInMemoryQueue;
|
import org.apache.hudi.common.util.queue.BoundedInMemoryQueue;
|
||||||
import org.apache.hudi.exception.HoodieIOException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
|
|
||||||
import org.apache.parquet.hadoop.ParquetReader;
|
import org.apache.parquet.hadoop.ParquetReader;
|
||||||
|
|
||||||
@@ -49,8 +49,9 @@ public class ParquetReaderIterator<T> implements Iterator<T> {
|
|||||||
this.next = parquetReader.read();
|
this.next = parquetReader.read();
|
||||||
}
|
}
|
||||||
return this.next != null;
|
return this.next != null;
|
||||||
} catch (IOException io) {
|
} catch (Exception e) {
|
||||||
throw new HoodieIOException("unable to read next record from parquet file ", io);
|
FileIOUtils.closeQuietly(parquetReader);
|
||||||
|
throw new HoodieException("unable to read next record from parquet file ", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -60,14 +61,15 @@ public class ParquetReaderIterator<T> implements Iterator<T> {
|
|||||||
// To handle case when next() is called before hasNext()
|
// To handle case when next() is called before hasNext()
|
||||||
if (this.next == null) {
|
if (this.next == null) {
|
||||||
if (!hasNext()) {
|
if (!hasNext()) {
|
||||||
throw new HoodieIOException("No more records left to read from parquet file");
|
throw new HoodieException("No more records left to read from parquet file");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
T retVal = this.next;
|
T retVal = this.next;
|
||||||
this.next = parquetReader.read();
|
this.next = parquetReader.read();
|
||||||
return retVal;
|
return retVal;
|
||||||
} catch (IOException io) {
|
} catch (Exception e) {
|
||||||
throw new HoodieIOException("unable to read next record from parquet file ", io);
|
FileIOUtils.closeQuietly(parquetReader);
|
||||||
|
throw new HoodieException("unable to read next record from parquet file ", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -172,7 +172,7 @@ public class BoundedInMemoryQueue<I, O> implements Iterable<O> {
|
|||||||
/**
|
/**
|
||||||
* Inserts record into queue after applying transformation.
|
* Inserts record into queue after applying transformation.
|
||||||
*
|
*
|
||||||
* @param t Item to be queueed
|
* @param t Item to be queued
|
||||||
*/
|
*/
|
||||||
public void insertRecord(I t) throws Exception {
|
public void insertRecord(I t) throws Exception {
|
||||||
// If already closed, throw exception
|
// If already closed, throw exception
|
||||||
@@ -222,7 +222,7 @@ public class BoundedInMemoryQueue<I, O> implements Iterable<O> {
|
|||||||
throw new HoodieException(e);
|
throw new HoodieException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Check one more time here as it is possible producer errored out and closed immediately
|
// Check one more time here as it is possible producer erred out and closed immediately
|
||||||
throwExceptionIfFailed();
|
throwExceptionIfFailed();
|
||||||
|
|
||||||
if (newRecord != null && newRecord.isPresent()) {
|
if (newRecord != null && newRecord.isPresent()) {
|
||||||
@@ -244,6 +244,7 @@ public class BoundedInMemoryQueue<I, O> implements Iterable<O> {
|
|||||||
|
|
||||||
private void throwExceptionIfFailed() {
|
private void throwExceptionIfFailed() {
|
||||||
if (this.hasFailed.get() != null) {
|
if (this.hasFailed.get() != null) {
|
||||||
|
close();
|
||||||
throw new HoodieException("operation has failed", this.hasFailed.get());
|
throw new HoodieException("operation has failed", this.hasFailed.get());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,7 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hudi.common.util;
|
package org.apache.hudi.common.util;
|
||||||
|
|
||||||
import org.apache.hudi.exception.HoodieIOException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
|
|
||||||
import org.apache.parquet.hadoop.ParquetReader;
|
import org.apache.parquet.hadoop.ParquetReader;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
@@ -30,6 +30,8 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
|
|||||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
public class TestParquetReaderIterator {
|
public class TestParquetReaderIterator {
|
||||||
@@ -59,6 +61,7 @@ public class TestParquetReaderIterator {
|
|||||||
assertEquals(1, iterator.next());
|
assertEquals(1, iterator.next());
|
||||||
// no more entries to iterate on
|
// no more entries to iterate on
|
||||||
assertFalse(iterator.hasNext());
|
assertFalse(iterator.hasNext());
|
||||||
assertThrows(HoodieIOException.class, iterator::next, "should throw an exception since there is only 1 record");
|
assertThrows(HoodieException.class, iterator::next, "should throw an exception since there is only 1 record");
|
||||||
|
verify(reader, times(1)).close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user