diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CustomizedThreadFactory.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CustomizedThreadFactory.java new file mode 100644 index 000000000..738be514b --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CustomizedThreadFactory.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util; + +import org.jetbrains.annotations.NotNull; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicLong; + +/** + * A thread factory for creation of threads + */ +public class CustomizedThreadFactory implements ThreadFactory { + + private static final AtomicLong POOL_NUM = new AtomicLong(1); + private final AtomicLong threadNum = new AtomicLong(1); + + private final String threadName; + private final boolean daemon; + + public CustomizedThreadFactory() { + this("pool-" + POOL_NUM.getAndIncrement(), false); + } + + public CustomizedThreadFactory(String threadNamePrefix) { + this(threadNamePrefix, false); + } + + public CustomizedThreadFactory(String threadNamePrefix, boolean daemon) { + this.threadName = threadNamePrefix + "-thread-"; + this.daemon = daemon; + } + + @Override + public Thread newThread(@NotNull Runnable r) { + Thread runThread = new Thread(r); + runThread.setDaemon(daemon); + runThread.setName(threadName + threadNum.getAndIncrement()); + return runThread; + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java index 68b840a47..d1e5e6608 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java @@ -18,6 +18,7 @@ package org.apache.hudi.common.util.queue; +import org.apache.hudi.common.util.CustomizedThreadFactory; import org.apache.hudi.common.util.DefaultSizeEstimator; import org.apache.hudi.common.util.Functions; import org.apache.hudi.common.util.Option; @@ -48,8 +49,10 @@ public class BoundedInMemoryExecutor { private static final Logger LOG = LogManager.getLogger(BoundedInMemoryExecutor.class); - // Executor service used for launching writer thread. - private final ExecutorService executorService; + // Executor service used for launching write thread. + private final ExecutorService producerExecutorService; + // Executor service used for launching read thread. + private final ExecutorService consumerExecutorService; // Used for buffering records which is controlled by HoodieWriteConfig#WRITE_BUFFER_LIMIT_BYTES. private final BoundedInMemoryQueue queue; // Producers @@ -60,28 +63,30 @@ public class BoundedInMemoryExecutor { private final Runnable preExecuteRunnable; public BoundedInMemoryExecutor(final long bufferLimitInBytes, final Iterator inputItr, - BoundedInMemoryQueueConsumer consumer, Function transformFunction, Runnable preExecuteRunnable) { + BoundedInMemoryQueueConsumer consumer, Function transformFunction, Runnable preExecuteRunnable) { this(bufferLimitInBytes, new IteratorBasedQueueProducer<>(inputItr), Option.of(consumer), transformFunction, preExecuteRunnable); } public BoundedInMemoryExecutor(final long bufferLimitInBytes, BoundedInMemoryQueueProducer producer, - Option> consumer, final Function transformFunction) { + Option> consumer, final Function transformFunction) { this(bufferLimitInBytes, producer, consumer, transformFunction, Functions.noop()); } public BoundedInMemoryExecutor(final long bufferLimitInBytes, BoundedInMemoryQueueProducer producer, - Option> consumer, final Function transformFunction, Runnable preExecuteRunnable) { + Option> consumer, final Function transformFunction, Runnable preExecuteRunnable) { this(bufferLimitInBytes, Collections.singletonList(producer), consumer, transformFunction, new DefaultSizeEstimator<>(), preExecuteRunnable); } public BoundedInMemoryExecutor(final long bufferLimitInBytes, List> producers, - Option> consumer, final Function transformFunction, - final SizeEstimator sizeEstimator, Runnable preExecuteRunnable) { + Option> consumer, final Function transformFunction, + final SizeEstimator sizeEstimator, Runnable preExecuteRunnable) { this.producers = producers; this.consumer = consumer; this.preExecuteRunnable = preExecuteRunnable; - // Ensure single thread for each producer thread and one for consumer - this.executorService = Executors.newFixedThreadPool(producers.size() + 1); + // Ensure fixed thread for each producer thread + this.producerExecutorService = Executors.newFixedThreadPool(producers.size(), new CustomizedThreadFactory("producer")); + // Ensure single thread for consumer + this.consumerExecutorService = Executors.newSingleThreadExecutor(new CustomizedThreadFactory("consumer")); this.queue = new BoundedInMemoryQueue<>(bufferLimitInBytes, transformFunction, sizeEstimator); } @@ -92,7 +97,7 @@ public class BoundedInMemoryExecutor { // Latch to control when and which producer thread will close the queue final CountDownLatch latch = new CountDownLatch(producers.size()); final ExecutorCompletionService completionService = - new ExecutorCompletionService(executorService); + new ExecutorCompletionService(producerExecutorService); producers.stream().map(producer -> { return completionService.submit(() -> { try { @@ -122,7 +127,7 @@ public class BoundedInMemoryExecutor { */ private Future startConsumer() { return consumer.map(consumer -> { - return executorService.submit(() -> { + return consumerExecutorService.submit(() -> { LOG.info("starting consumer thread"); preExecuteRunnable.run(); try { @@ -143,7 +148,7 @@ public class BoundedInMemoryExecutor { */ public E execute() { try { - ExecutorCompletionService producerService = startProducers(); + startProducers(); Future future = startConsumer(); // Wait for consumer to be done return future.get(); @@ -161,7 +166,8 @@ public class BoundedInMemoryExecutor { } public void shutdownNow() { - executorService.shutdownNow(); + producerExecutorService.shutdownNow(); + consumerExecutorService.shutdownNow(); } public BoundedInMemoryQueue getQueue() { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCustomizedThreadFactory.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCustomizedThreadFactory.java new file mode 100644 index 000000000..797643a2c --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCustomizedThreadFactory.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.locks.LockSupport; + +public class TestCustomizedThreadFactory { + + @Test + public void testThreadPrefix() throws ExecutionException, InterruptedException { + int nThreads = 100; + String threadNamePrefix = "consumer"; + ExecutorService executorService = Executors.newFixedThreadPool(nThreads, new CustomizedThreadFactory(threadNamePrefix)); + for (int i = 0; i < nThreads; i++) { + Future resultFuture = executorService.submit(() -> { + LockSupport.parkNanos(10000000L); + String name = Thread.currentThread().getName(); + return name.startsWith(threadNamePrefix); + }); + Boolean result = resultFuture.get(); + Assertions.assertTrue(result); + } + } + + @Test + public void testDefaultThreadPrefix() throws ExecutionException, InterruptedException { + int nThreads = 100; + String defaultThreadNamePrefix = "pool-1"; + ExecutorService executorService = Executors.newFixedThreadPool(nThreads, new CustomizedThreadFactory()); + for (int i = 0; i < nThreads; i++) { + Future resultFuture = executorService.submit(() -> { + LockSupport.parkNanos(10000000L); + String name = Thread.currentThread().getName(); + return name.startsWith(defaultThreadNamePrefix); + }); + Boolean result = resultFuture.get(); + Assertions.assertTrue(result); + } + } + + @Test + public void testDaemonThread() throws ExecutionException, InterruptedException { + int nThreads = 100; + String threadNamePrefix = "consumer"; + ExecutorService executorService = Executors.newFixedThreadPool(nThreads, new CustomizedThreadFactory(threadNamePrefix, true)); + for (int i = 0; i < nThreads; i++) { + Future resultFuture = executorService.submit(() -> { + LockSupport.parkNanos(10000000L); + String name = Thread.currentThread().getName(); + boolean daemon = Thread.currentThread().isDaemon(); + return name.startsWith(threadNamePrefix) && daemon; + }); + Boolean result = resultFuture.get(); + Assertions.assertTrue(result); + } + } +}