diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestSparkBoundedInMemoryExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestSparkBoundedInMemoryExecutor.java index fd41a1680..ecb18c6bc 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestSparkBoundedInMemoryExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestSparkBoundedInMemoryExecutor.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.testutils.HoodieClientTestHarness; import org.apache.avro.generic.IndexedRecord; @@ -38,6 +39,8 @@ import scala.Tuple2; import static org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformFunction; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -97,4 +100,51 @@ public class TestSparkBoundedInMemoryExecutor extends HoodieClientTestHarness { } } } + + @Test + public void testInterruptExecutor() { + final List hoodieRecords = dataGen.generateInserts(instantTime, 100); + + HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class); + when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024); + BoundedInMemoryQueueConsumer, Integer> consumer = + new BoundedInMemoryQueueConsumer, Integer>() { + + @Override + protected void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult record) { + try { + while (true) { + Thread.sleep(1000); + } + } catch (InterruptedException ie) { + return; + } + } + + @Override + protected void finish() { + } + + @Override + protected Integer getResult() { + return 0; + } + }; + + SparkBoundedInMemoryExecutor>, Integer> executor = null; + try { + executor = new SparkBoundedInMemoryExecutor(hoodieWriteConfig, hoodieRecords.iterator(), consumer, + getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA)); + SparkBoundedInMemoryExecutor>, Integer> finalExecutor = executor; + + Thread.currentThread().interrupt(); + + assertThrows(HoodieException.class, () -> finalExecutor.execute()); + assertTrue(Thread.interrupted()); + } finally { + if (executor != null) { + executor.shutdownNow(); + } + } + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java index cec9ab61a..95a501f6e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java @@ -139,6 +139,10 @@ public class BoundedInMemoryExecutor { Future future = startConsumer(); // Wait for consumer to be done return future.get(); + } catch (InterruptedException ie) { + shutdownNow(); + Thread.currentThread().interrupt(); + throw new HoodieException(ie); } catch (Exception e) { throw new HoodieException(e); }