newRecordKeysSorted = new PriorityQueue<>();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieUnboundedCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieUnboundedCreateHandle.java
index 9ab44d0f6..ebbc7a5c2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieUnboundedCreateHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieUnboundedCreateHandle.java
@@ -28,11 +28,14 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import javax.annotation.concurrent.NotThreadSafe;
+
/**
* A HoodieCreateHandle which writes all data into a single file.
*
* Please use this with caution. This can end up creating very large files if not used correctly.
*/
+@NotThreadSafe
public class HoodieUnboundedCreateHandle extends HoodieCreateHandle {
private static final Logger LOG = LogManager.getLogger(HoodieUnboundedCreateHandle.class);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
index 5b3c69ddf..095cacc14 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
@@ -30,13 +30,18 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
+import javax.annotation.concurrent.NotThreadSafe;
+
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
/**
* HoodieParquetWriter extends the ParquetWriter to help limit the size of underlying file. Provides a way to check if
* the current file can take more records with the canWrite()
+ *
+ * ATTENTION: HoodieParquetWriter is not thread safe and developer should take care of the order of write and close
*/
+@NotThreadSafe
public class HoodieParquetWriter
extends ParquetWriter implements HoodieFileWriter {
@@ -106,4 +111,9 @@ public class HoodieParquetWriter extends
} catch (Exception e) {
throw new HoodieException(e);
} finally {
+ // HUDI-2875: mergeHandle is not thread safe, we should totally terminate record inputting
+ // and executor firstly and then close mergeHandle.
if (reader != null) {
reader.close();
}
- mergeHandle.close();
if (null != wrapper) {
wrapper.shutdownNow();
+ wrapper.awaitTermination();
}
+ mergeHandle.close();
}
}
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
index 78b3cb1dc..2377ea6ff 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
@@ -66,6 +66,7 @@ public class FlinkLazyInsertIterable extends Hood
} finally {
if (null != bufferedIteratorExecutor) {
bufferedIteratorExecutor.shutdownNow();
+ bufferedIteratorExecutor.awaitTermination();
}
}
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
index 38d4e60f6..313126552 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
@@ -102,13 +102,16 @@ public class FlinkMergeHelper extends BaseMergeHe
} catch (Exception e) {
throw new HoodieException(e);
} finally {
+ // HUDI-2875: mergeHandle is not thread safe, we should totally terminate record inputting
+ // and executor firstly and then close mergeHandle.
if (reader != null) {
reader.close();
}
- mergeHandle.close();
if (null != wrapper) {
wrapper.shutdownNow();
+ wrapper.awaitTermination();
}
+ mergeHandle.close();
}
}
}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java
index f91dd5019..9821aedc8 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java
@@ -74,6 +74,7 @@ public class JavaLazyInsertIterable extends Hoodi
} finally {
if (null != bufferedIteratorExecutor) {
bufferedIteratorExecutor.shutdownNow();
+ bufferedIteratorExecutor.awaitTermination();
}
}
}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java
index 7878d8577..46dd30a7c 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java
@@ -102,13 +102,16 @@ public class JavaMergeHelper extends BaseMergeHel
} catch (Exception e) {
throw new HoodieException(e);
} finally {
+ // HUDI-2875: mergeHandle is not thread safe, we should totally terminate record inputting
+ // and executor firstly and then close mergeHandle.
if (reader != null) {
reader.close();
}
- mergeHandle.close();
if (null != wrapper) {
wrapper.shutdownNow();
+ wrapper.awaitTermination();
}
+ mergeHandle.close();
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
index a8a9e49c0..df5bd2d3f 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
@@ -95,6 +95,7 @@ public class SparkLazyInsertIterable extends Hood
} finally {
if (null != bufferedIteratorExecutor) {
bufferedIteratorExecutor.shutdownNow();
+ bufferedIteratorExecutor.awaitTermination();
}
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
index e3d0e9b3c..96ac794dc 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
@@ -80,10 +80,11 @@ class OrcBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
} catch (Exception e) {
throw new HoodieException(e);
} finally {
- bootstrapHandle.close();
if (null != wrapper) {
wrapper.shutdownNow();
+ wrapper.awaitTermination();
}
+ bootstrapHandle.close();
}
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
index d07ea771b..5f45629ba 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
@@ -68,9 +68,9 @@ class ParquetBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
void executeBootstrap(HoodieBootstrapHandle, ?, ?, ?> bootstrapHandle,
Path sourceFilePath, KeyGeneratorInterface keyGenerator, String partitionPath, Schema avroSchema) throws Exception {
BoundedInMemoryExecutor wrapper = null;
+ ParquetReader reader =
+ AvroParquetReader.builder(sourceFilePath).withConf(table.getHadoopConf()).build();
try {
- ParquetReader reader =
- AvroParquetReader.builder(sourceFilePath).withConf(table.getHadoopConf()).build();
wrapper = new BoundedInMemoryExecutor(config.getWriteBufferLimitBytes(),
new ParquetReaderIterator(reader), new BootstrapRecordConsumer(bootstrapHandle), inp -> {
String recKey = keyGenerator.getKey(inp).getRecordKey();
@@ -84,10 +84,12 @@ class ParquetBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
} catch (Exception e) {
throw new HoodieException(e);
} finally {
- bootstrapHandle.close();
+ reader.close();
if (null != wrapper) {
wrapper.shutdownNow();
+ wrapper.awaitTermination();
}
+ bootstrapHandle.close();
}
}
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
index 91f9cbc96..a714d60d0 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
@@ -28,6 +28,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
@@ -35,6 +36,7 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.util.Iterator;
import java.util.List;
import scala.Tuple2;
@@ -105,6 +107,7 @@ public class TestBoundedInMemoryExecutorInSpark extends HoodieClientTestHarness
} finally {
if (executor != null) {
executor.shutdownNow();
+ executor.awaitTermination();
}
}
}
@@ -152,7 +155,49 @@ public class TestBoundedInMemoryExecutorInSpark extends HoodieClientTestHarness
} finally {
if (executor != null) {
executor.shutdownNow();
+ executor.awaitTermination();
}
}
}
+
+ @Test
+ public void testExecutorTermination() {
+ HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
+ when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024);
+ Iterator unboundedRecordIter = new Iterator() {
+ @Override
+ public boolean hasNext() {
+ return true;
+ }
+
+ @Override
+ public GenericRecord next() {
+ return dataGen.generateGenericRecord();
+ }
+ };
+
+ BoundedInMemoryQueueConsumer, Integer> consumer =
+ new BoundedInMemoryQueueConsumer, Integer>() {
+ @Override
+ protected void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult record) {
+ }
+
+ @Override
+ protected void finish() {
+ }
+
+ @Override
+ protected Integer getResult() {
+ return 0;
+ }
+ };
+
+ BoundedInMemoryExecutor>, Integer> executor =
+ new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), unboundedRecordIter,
+ consumer, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA),
+ getPreExecuteRunnable());
+ executor.shutdownNow();
+ boolean terminatedGracefully = executor.awaitTermination();
+ assertTrue(terminatedGracefully);
+ }
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
index d1e5e6608..46ef5dc40 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -48,7 +49,7 @@ import java.util.stream.Collectors;
public class BoundedInMemoryExecutor {
private static final Logger LOG = LogManager.getLogger(BoundedInMemoryExecutor.class);
-
+ private static final long TERMINATE_WAITING_TIME_SECS = 60L;
// Executor service used for launching write thread.
private final ExecutorService producerExecutorService;
// Executor service used for launching read thread.
@@ -168,6 +169,27 @@ public class BoundedInMemoryExecutor {
public void shutdownNow() {
producerExecutorService.shutdownNow();
consumerExecutorService.shutdownNow();
+ // close queue to force producer stop
+ queue.close();
+ }
+
+ public boolean awaitTermination() {
+ // if current thread has been interrupted before awaitTermination was called, we still give
+ // executor a chance to proceeding. So clear the interrupt flag and reset it if needed before return.
+ boolean interruptedBefore = Thread.interrupted();
+ boolean producerTerminated = false;
+ boolean consumerTerminated = false;
+ try {
+ producerTerminated = producerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS);
+ consumerTerminated = consumerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS);
+ } catch (InterruptedException ie) {
+ // fail silently for any other interruption
+ }
+ // reset interrupt flag if needed
+ if (interruptedBefore) {
+ Thread.currentThread().interrupt();
+ }
+ return producerTerminated && consumerTerminated;
}
public BoundedInMemoryQueue getQueue() {
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index cb4f55707..e05d5f6f3 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -860,12 +860,14 @@ public class HoodieTestDataGenerator implements AutoCloseable {
return false;
}
+ public GenericRecord generateGenericRecord() {
+ return generateGenericRecord(genPseudoRandomUUID(rand).toString(), "0",
+ genPseudoRandomUUID(rand).toString(), genPseudoRandomUUID(rand).toString(), rand.nextLong());
+ }
+
public List generateGenericRecords(int numRecords) {
List list = new ArrayList<>();
- IntStream.range(0, numRecords).forEach(i -> {
- list.add(generateGenericRecord(genPseudoRandomUUID(rand).toString(), "0",
- genPseudoRandomUUID(rand).toString(), genPseudoRandomUUID(rand).toString(), rand.nextLong()));
- });
+ IntStream.range(0, numRecords).forEach(i -> list.add(generateGenericRecord()));
return list;
}