
[HUDI-2875] Make HoodieParquetWriter Thread safe and memory executor exit gracefully (#4264)

Author: guanziyue
Committed: 2022-05-06 04:49:34 +08:00 (via GitHub)
Parent: d794f4fbf9
Commit: abb4893b25
17 changed files with 121 additions and 12 deletions

View File

@@ -35,6 +35,8 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import javax.annotation.concurrent.NotThreadSafe;
+
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Iterator;
@@ -66,6 +68,7 @@ import java.util.Map;
  * Users should ensure there are no duplicates when "insert" operation is used and if the respective config is enabled. So, above scenario should not
  * happen and every batch should have new records to be inserted. Above example is for illustration purposes only.
  */
+@NotThreadSafe
 public class HoodieConcatHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieMergeHandle<T, I, K, O> {
   private static final Logger LOG = LogManager.getLogger(HoodieConcatHandle.class);
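Note: @NotThreadSafe (from the JSR-305 javax.annotation.concurrent package) is documentation only; it adds no locking at runtime. If a handle ever had to be touched from more than one thread, the callers would have to serialize access themselves. A minimal sketch of that caller-side responsibility, assuming jsr305 on the classpath; the wrapper class and names are hypothetical and not part of this commit:

import javax.annotation.concurrent.NotThreadSafe;

// Documentation only: the annotation changes nothing at runtime.
@NotThreadSafe
class UnsafeHandle {
  private long recordsWritten = 0;

  void write(String record) {
    recordsWritten++; // unguarded mutable state: racy if called concurrently
  }
}

// Hypothetical caller-side guard: all access funneled through one lock.
class GuardedHandle {
  private final UnsafeHandle delegate = new UnsafeHandle();

  synchronized void write(String record) {
    delegate.write(record);
  }
}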

View File

@@ -42,12 +42,15 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import javax.annotation.concurrent.NotThreadSafe;
+
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+@NotThreadSafe
 public class HoodieCreateHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieWriteHandle<T, I, K, O> {
   private static final Logger LOG = LogManager.getLogger(HoodieCreateHandle.class);

View File

@@ -54,6 +54,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import javax.annotation.concurrent.NotThreadSafe;
+
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
@@ -91,6 +93,7 @@ import java.util.Set;
  *
  * </p>
  */
+@NotThreadSafe
 public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieWriteHandle<T, I, K, O> {
   private static final Logger LOG = LogManager.getLogger(HoodieMergeHandle.class);

View File

@@ -32,6 +32,8 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.avro.generic.GenericRecord;
+import javax.annotation.concurrent.NotThreadSafe;
+
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
@@ -45,6 +47,7 @@ import java.util.Queue;
  * The implementation performs a merge-sort by comparing the key of the record being written to the list of
  * keys in newRecordKeys (sorted in-memory).
  */
+@NotThreadSafe
 public class HoodieSortedMergeHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieMergeHandle<T, I, K, O> {
   private final Queue<String> newRecordKeysSorted = new PriorityQueue<>();

View File

@@ -28,11 +28,14 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import javax.annotation.concurrent.NotThreadSafe;
+
 /**
  * A HoodieCreateHandle which writes all data into a single file.
  * <p>
  * Please use this with caution. This can end up creating very large files if not used correctly.
  */
+@NotThreadSafe
 public class HoodieUnboundedCreateHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieCreateHandle<T, I, K, O> {
   private static final Logger LOG = LogManager.getLogger(HoodieUnboundedCreateHandle.class);

View File

@@ -30,13 +30,18 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.parquet.hadoop.ParquetWriter;
+import javax.annotation.concurrent.NotThreadSafe;
+
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * HoodieParquetWriter extends the ParquetWriter to help limit the size of underlying file. Provides a way to check if
  * the current file can take more records with the <code>canWrite()</code>
+ *
+ * ATTENTION: HoodieParquetWriter is not thread safe, and callers must take care of the ordering of write and close calls.
  */
+@NotThreadSafe
 public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends IndexedRecord>
     extends ParquetWriter<IndexedRecord> implements HoodieFileWriter<R> {
@@ -106,4 +111,9 @@ public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends Indexe
       writeSupport.add(key);
     }
   }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+  }
 }
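Note: the overridden close() makes the writer's termination point explicit, but the ATTENTION contract still falls on the caller: every write() must happen-before close(). A sketch of a coordinator honoring that contract; the class and names are hypothetical, and the 60-second timeout mirrors the constant this commit introduces in BoundedInMemoryExecutor:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class WriterCoordinator {
  // Ensure the producing thread has fully stopped before close(), since a
  // write() racing with close() on a non-thread-safe writer can corrupt the file.
  static void writeAndClose(Runnable produceRecords, AutoCloseable writer) throws Exception {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      pool.submit(produceRecords).get(); // block until all writes are done
    } finally {
      pool.shutdownNow();
      pool.awaitTermination(60, TimeUnit.SECONDS); // termination happens-before the close below
      writer.close(); // safe: no write can race with close now
    }
  }
}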

View File

@@ -148,13 +148,16 @@ public class HoodieMergeHelper<T extends HoodieRecordPayload> extends
     } catch (Exception e) {
       throw new HoodieException(e);
     } finally {
+      // HUDI-2875: mergeHandle is not thread safe, so fully stop record ingestion
+      // and the executor first, and only then close mergeHandle.
       if (reader != null) {
         reader.close();
       }
-      mergeHandle.close();
       if (null != wrapper) {
         wrapper.shutdownNow();
+        wrapper.awaitTermination();
       }
+      mergeHandle.close();
     }
   }
 }
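Note: this finally block establishes the ordering the comment describes, and the same shape recurs in the Flink and Java merge helpers below: interrupt the executor, wait for both of its threads to actually exit, and only then close the handle. A condensed sketch of the pattern, with hypothetical names:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class TeardownOrder {
  // Closing the handle while executor threads may still write to it is the
  // bug HUDI-2875 fixes; awaitTermination is what makes the close safe.
  static void tearDown(ExecutorService executor, AutoCloseable handle) throws Exception {
    executor.shutdownNow();                                             // 1. interrupt producer/consumer
    boolean quiesced = executor.awaitTermination(60, TimeUnit.SECONDS); // 2. wait for both to exit
    if (!quiesced) {
      throw new IllegalStateException("threads still running; closing now would race");
    }
    handle.close();                                                     // 3. no writers remain
  }
}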

View File

@@ -66,6 +66,7 @@ public class FlinkLazyInsertIterable<T extends HoodieRecordPayload> extends Hood
     } finally {
       if (null != bufferedIteratorExecutor) {
         bufferedIteratorExecutor.shutdownNow();
+        bufferedIteratorExecutor.awaitTermination();
       }
     }
   }
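Note: shutdownNow() only delivers interrupts and returns immediately; it does not wait. Without the added awaitTermination(), the caller could proceed (and eventually close the write handle) while the consumer thread is still draining. A self-contained demonstration, with a hypothetical sleep task standing in for the consumer:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownNowDemo {
  public static void main(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    pool.submit(() -> {
      try {
        Thread.sleep(10_000); // stands in for a consumer still writing records
      } catch (InterruptedException ie) {
        // interrupted by shutdownNow: clean up and exit
      }
    });
    pool.shutdownNow(); // returns at once; the task may still be running here
    boolean done = pool.awaitTermination(60, TimeUnit.SECONDS);
    System.out.println("safe to close resources: " + done);
  }
}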

View File

@@ -102,13 +102,16 @@ public class FlinkMergeHelper<T extends HoodieRecordPayload> extends BaseMergeHe
     } catch (Exception e) {
       throw new HoodieException(e);
     } finally {
+      // HUDI-2875: mergeHandle is not thread safe, so fully stop record ingestion
+      // and the executor first, and only then close mergeHandle.
       if (reader != null) {
         reader.close();
       }
-      mergeHandle.close();
       if (null != wrapper) {
         wrapper.shutdownNow();
+        wrapper.awaitTermination();
       }
+      mergeHandle.close();
     }
   }
 }

View File

@@ -74,6 +74,7 @@ public class JavaLazyInsertIterable<T extends HoodieRecordPayload> extends Hoodi
     } finally {
       if (null != bufferedIteratorExecutor) {
         bufferedIteratorExecutor.shutdownNow();
+        bufferedIteratorExecutor.awaitTermination();
       }
     }
   }

View File

@@ -102,13 +102,16 @@ public class JavaMergeHelper<T extends HoodieRecordPayload> extends BaseMergeHel
     } catch (Exception e) {
       throw new HoodieException(e);
     } finally {
+      // HUDI-2875: mergeHandle is not thread safe, so fully stop record ingestion
+      // and the executor first, and only then close mergeHandle.
       if (reader != null) {
         reader.close();
       }
-      mergeHandle.close();
       if (null != wrapper) {
         wrapper.shutdownNow();
+        wrapper.awaitTermination();
      }
+      mergeHandle.close();
     }
   }

View File

@@ -95,6 +95,7 @@ public class SparkLazyInsertIterable<T extends HoodieRecordPayload> extends Hood
     } finally {
       if (null != bufferedIteratorExecutor) {
         bufferedIteratorExecutor.shutdownNow();
+        bufferedIteratorExecutor.awaitTermination();
       }
     }
   }

View File

@@ -80,10 +80,11 @@ class OrcBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
     } catch (Exception e) {
       throw new HoodieException(e);
     } finally {
-      bootstrapHandle.close();
       if (null != wrapper) {
         wrapper.shutdownNow();
+        wrapper.awaitTermination();
       }
+      bootstrapHandle.close();
     }
   }
 }

View File

@@ -68,9 +68,9 @@ class ParquetBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
   void executeBootstrap(HoodieBootstrapHandle<?, ?, ?, ?> bootstrapHandle,
                         Path sourceFilePath, KeyGeneratorInterface keyGenerator, String partitionPath, Schema avroSchema) throws Exception {
     BoundedInMemoryExecutor<GenericRecord, HoodieRecord, Void> wrapper = null;
+    ParquetReader<IndexedRecord> reader =
+        AvroParquetReader.<IndexedRecord>builder(sourceFilePath).withConf(table.getHadoopConf()).build();
     try {
-      ParquetReader<IndexedRecord> reader =
-          AvroParquetReader.<IndexedRecord>builder(sourceFilePath).withConf(table.getHadoopConf()).build();
       wrapper = new BoundedInMemoryExecutor<GenericRecord, HoodieRecord, Void>(config.getWriteBufferLimitBytes(),
           new ParquetReaderIterator(reader), new BootstrapRecordConsumer(bootstrapHandle), inp -> {
             String recKey = keyGenerator.getKey(inp).getRecordKey();
@@ -84,10 +84,12 @@ class ParquetBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
     } catch (Exception e) {
       throw new HoodieException(e);
     } finally {
-      bootstrapHandle.close();
+      reader.close();
       if (null != wrapper) {
         wrapper.shutdownNow();
+        wrapper.awaitTermination();
       }
+      bootstrapHandle.close();
     }
   }
 }
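Note: hoisting the reader construction above the try block is what allows the finally clause to close it; a variable declared inside try is out of scope in finally. A minimal illustration of the scoping rule, with hypothetical names:

import java.io.Closeable;
import java.io.IOException;

class ReaderScopeDemo {
  static Closeable openReader() {
    return () -> { }; // stand-in for AvroParquetReader.builder(...).build()
  }

  static void process() throws IOException {
    Closeable reader = openReader(); // acquired before try: visible in finally
    try {
      // ... pump records from the reader into the executor ...
    } finally {
      reader.close(); // would not compile if 'reader' were declared inside try
    }
  }
}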

View File

@@ -28,6 +28,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.spark.TaskContext;
 import org.apache.spark.TaskContext$;
@@ -35,6 +36,7 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import java.util.Iterator;
 import java.util.List;
 import scala.Tuple2;
@@ -105,6 +107,7 @@ public class TestBoundedInMemoryExecutorInSpark extends HoodieClientTestHarness
     } finally {
       if (executor != null) {
         executor.shutdownNow();
+        executor.awaitTermination();
       }
     }
   }
@@ -152,7 +155,49 @@ public class TestBoundedInMemoryExecutorInSpark extends HoodieClientTestHarness
     } finally {
       if (executor != null) {
         executor.shutdownNow();
+        executor.awaitTermination();
       }
     }
   }
+
+  @Test
+  public void testExecutorTermination() {
+    HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
+    when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024);
+    Iterator<GenericRecord> unboundedRecordIter = new Iterator<GenericRecord>() {
+      @Override
+      public boolean hasNext() {
+        return true;
+      }
+
+      @Override
+      public GenericRecord next() {
+        return dataGen.generateGenericRecord();
+      }
+    };
+
+    BoundedInMemoryQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer =
+        new BoundedInMemoryQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() {
+          @Override
+          protected void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> record) {
+          }
+
+          @Override
+          protected void finish() {
+          }
+
+          @Override
+          protected Integer getResult() {
+            return 0;
+          }
+        };
+
+    BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor =
+        new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), unboundedRecordIter,
+            consumer, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA),
+            getPreExecuteRunnable());
+    executor.shutdownNow();
+    boolean terminatedGracefully = executor.awaitTermination();
+    assertTrue(terminatedGracefully);
+  }
 }

View File

@@ -37,6 +37,7 @@ import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -48,7 +49,7 @@ import java.util.stream.Collectors;
 public class BoundedInMemoryExecutor<I, O, E> {
 
   private static final Logger LOG = LogManager.getLogger(BoundedInMemoryExecutor.class);
+  private static final long TERMINATE_WAITING_TIME_SECS = 60L;
   // Executor service used for launching write thread.
   private final ExecutorService producerExecutorService;
   // Executor service used for launching read thread.
@@ -168,6 +169,27 @@ public class BoundedInMemoryExecutor<I, O, E> {
   public void shutdownNow() {
     producerExecutorService.shutdownNow();
     consumerExecutorService.shutdownNow();
+    // close the queue to force the producer to stop
+    queue.close();
+  }
+
+  public boolean awaitTermination() {
+    // If the current thread was interrupted before awaitTermination was called, we still give the
+    // executor a chance to proceed. So clear the interrupt flag and restore it if needed before returning.
+    boolean interruptedBefore = Thread.interrupted();
+    boolean producerTerminated = false;
+    boolean consumerTerminated = false;
+    try {
+      producerTerminated = producerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS);
+      consumerTerminated = consumerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS);
+    } catch (InterruptedException ie) {
+      // fail silently on interruption during the wait
+    }
+    // restore the interrupt flag if needed
+    if (interruptedBefore) {
+      Thread.currentThread().interrupt();
+    }
+    return producerTerminated && consumerTerminated;
   }
 
   public BoundedInMemoryQueue<I, O> getQueue() {
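Note: awaitTermination() uses the standard idiom for tolerating a pre-existing interrupt: Thread.interrupted() clears the flag so the timed waits get a chance to run, and the flag is restored before returning so the caller still observes it. The same idiom in isolation, against a hypothetical wait target:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class InterruptPreservingWait {
  // Waits even if the calling thread already has its interrupt flag set,
  // and hands the original interrupt back to the caller afterwards.
  static boolean await(CountDownLatch latch, long seconds) {
    boolean interruptedBefore = Thread.interrupted(); // reads AND clears the flag
    boolean done = false;
    try {
      done = latch.await(seconds, TimeUnit.SECONDS);
    } catch (InterruptedException ie) {
      // a fresh interrupt arrived during the wait itself; treat as "not terminated"
    }
    if (interruptedBefore) {
      Thread.currentThread().interrupt(); // restore the pre-existing interrupt
    }
    return done;
  }
}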

View File

@@ -860,12 +860,14 @@ public class HoodieTestDataGenerator implements AutoCloseable {
     return false;
   }
 
+  public GenericRecord generateGenericRecord() {
+    return generateGenericRecord(genPseudoRandomUUID(rand).toString(), "0",
+        genPseudoRandomUUID(rand).toString(), genPseudoRandomUUID(rand).toString(), rand.nextLong());
+  }
+
   public List<GenericRecord> generateGenericRecords(int numRecords) {
     List<GenericRecord> list = new ArrayList<>();
-    IntStream.range(0, numRecords).forEach(i -> {
-      list.add(generateGenericRecord(genPseudoRandomUUID(rand).toString(), "0",
-          genPseudoRandomUUID(rand).toString(), genPseudoRandomUUID(rand).toString(), rand.nextLong()));
-    });
+    IntStream.range(0, numRecords).forEach(i -> list.add(generateGenericRecord()));
     return list;
   }