[HUDI-1844] Add option to flush when total buckets memory exceeds the threshold (#2877)
Current code supports flushing as per-bucket memory usage, while the buckets may still take too much memory for bootstrap from history data. When the threshold hits, flush out half of the buckets with bigger buffer size.
This commit is contained in:
@@ -267,11 +267,17 @@ public class FlinkOptions {
|
||||
.defaultValue(4)
|
||||
.withDescription("Parallelism of tasks that do actual write, default is 4");
|
||||
|
||||
public static final ConfigOption<Double> WRITE_BATCH_SIZE = ConfigOptions
|
||||
.key("write.batch.size.MB")
|
||||
public static final ConfigOption<Double> WRITE_BUFFER_SIZE = ConfigOptions
|
||||
.key("write.buffer.size.MB")
|
||||
.doubleType()
|
||||
.defaultValue(256D) // 256MB
|
||||
.withDescription("Total buffer size in MB to flush data into the underneath filesystem, default 256MB");
|
||||
|
||||
public static final ConfigOption<Double> WRITE_BUCKET_SIZE = ConfigOptions
|
||||
.key("write.bucket.size.MB")
|
||||
.doubleType()
|
||||
.defaultValue(64D) // 64MB
|
||||
.withDescription("Batch buffer size in MB to flush data into the underneath filesystem, default 64MB");
|
||||
.withDescription("Bucket size in MB to flush data into the underneath filesystem, default 64MB");
|
||||
|
||||
public static final ConfigOption<Integer> WRITE_LOG_BLOCK_SIZE = ConfigOptions
|
||||
.key("write.log_block.size.MB")
|
||||
|
||||
@@ -41,18 +41,21 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext;
|
||||
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
|
||||
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
|
||||
import org.apache.flink.util.Collector;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Sink function to write the data to the underneath filesystem.
|
||||
@@ -60,7 +63,7 @@ import java.util.function.BiFunction;
|
||||
* <p><h2>Work Flow</h2>
|
||||
*
|
||||
* <p>The function firstly buffers the data as a batch of {@link HoodieRecord}s,
|
||||
* It flushes(write) the records batch when a batch exceeds the configured size {@link FlinkOptions#WRITE_BATCH_SIZE}
|
||||
* It flushes(write) the records batch when a batch exceeds the configured size {@link FlinkOptions#WRITE_BUCKET_SIZE}
|
||||
* or a Flink checkpoint starts. After a batch has been written successfully,
|
||||
* the function notifies its operator coordinator {@link StreamWriteOperatorCoordinator} to mark a successful write.
|
||||
*
|
||||
@@ -98,6 +101,11 @@ public class StreamWriteFunction<K, I, O>
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(StreamWriteFunction.class);
|
||||
|
||||
/**
|
||||
* Write buffer size detector.
|
||||
*/
|
||||
private transient BufferSizeDetector detector;
|
||||
|
||||
/**
|
||||
* Write buffer as buckets for a checkpoint. The key is bucket ID.
|
||||
*/
|
||||
@@ -223,6 +231,7 @@ public class StreamWriteFunction<K, I, O>
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
private void initBuffer() {
|
||||
this.detector = new BufferSizeDetector(this.config.getDouble(FlinkOptions.WRITE_BUFFER_SIZE));
|
||||
this.buckets = new LinkedHashMap<>();
|
||||
}
|
||||
|
||||
@@ -249,18 +258,49 @@ public class StreamWriteFunction<K, I, O>
|
||||
/**
|
||||
* Data bucket.
|
||||
*/
|
||||
private static class DataBucket {
|
||||
private static class DataBucket implements Comparable<DataBucket> {
|
||||
private final List<HoodieRecord> records;
|
||||
private final BufferSizeDetector detector;
|
||||
private final BucketSizeTracer tracer;
|
||||
|
||||
private DataBucket(Double batchSize) {
|
||||
this.records = new ArrayList<>();
|
||||
this.detector = new BufferSizeDetector(batchSize);
|
||||
this.tracer = new BucketSizeTracer(batchSize);
|
||||
}
|
||||
|
||||
public void reset() {
|
||||
this.records.clear();
|
||||
this.detector.reset();
|
||||
this.tracer.reset();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(@NotNull DataBucket other) {
|
||||
return Double.compare(tracer.threshold, other.tracer.threshold);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tool to detect if to flush out the existing bucket.
|
||||
*/
|
||||
private static class BucketSizeTracer {
|
||||
private final double threshold;
|
||||
|
||||
private long totalSize = 0L;
|
||||
|
||||
BucketSizeTracer(double bucketSizeMb) {
|
||||
this.threshold = bucketSizeMb * 1024 * 1024;
|
||||
}
|
||||
|
||||
/**
|
||||
* Trace the bucket size with given record size,
|
||||
* returns true if the bucket size exceeds specified threshold.
|
||||
*/
|
||||
boolean trace(long recordSize) {
|
||||
totalSize += recordSize;
|
||||
return totalSize > this.threshold;
|
||||
}
|
||||
|
||||
void reset() {
|
||||
this.totalSize = 0L;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -272,13 +312,13 @@ public class StreamWriteFunction<K, I, O>
|
||||
private final Random random = new Random(47);
|
||||
private static final int DENOMINATOR = 100;
|
||||
|
||||
private final double batchSizeBytes;
|
||||
private final double threshold;
|
||||
|
||||
private long lastRecordSize = -1L;
|
||||
private long totalSize = 0L;
|
||||
|
||||
BufferSizeDetector(double batchSizeMb) {
|
||||
this.batchSizeBytes = batchSizeMb * 1024 * 1024;
|
||||
this.threshold = batchSizeMb * 1024 * 1024;
|
||||
}
|
||||
|
||||
boolean detect(Object record) {
|
||||
@@ -286,7 +326,7 @@ public class StreamWriteFunction<K, I, O>
|
||||
lastRecordSize = ObjectSizeCalculator.getObjectSize(record);
|
||||
}
|
||||
totalSize += lastRecordSize;
|
||||
return totalSize > this.batchSizeBytes;
|
||||
return totalSize > this.threshold;
|
||||
}
|
||||
|
||||
boolean sampling() {
|
||||
@@ -298,6 +338,10 @@ public class StreamWriteFunction<K, I, O>
|
||||
this.lastRecordSize = -1L;
|
||||
this.totalSize = 0L;
|
||||
}
|
||||
|
||||
public void countDown(long bucketSize) {
|
||||
this.totalSize -= bucketSize;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -313,17 +357,34 @@ public class StreamWriteFunction<K, I, O>
|
||||
* Buffers the given record.
|
||||
*
|
||||
* <p>Flush the data bucket first if the bucket records size is greater than
|
||||
* the configured value {@link FlinkOptions#WRITE_BATCH_SIZE}.
|
||||
* the configured value {@link FlinkOptions#WRITE_BUCKET_SIZE}.
|
||||
*
|
||||
* @param value HoodieRecord
|
||||
*/
|
||||
private void bufferRecord(I value) {
|
||||
boolean flushBuffer = detector.detect(value);
|
||||
if (flushBuffer) {
|
||||
List<DataBucket> sortedBuckets = this.buckets.values().stream()
|
||||
.sorted(Comparator.comparingDouble(b -> b.tracer.totalSize))
|
||||
.collect(Collectors.toList());
|
||||
// flush half number of buckets to avoid flushing too small buckets
|
||||
// which cause small files.
|
||||
int numBucketsToFlush = (sortedBuckets.size() + 1) / 2;
|
||||
LOG.info("Flush {} data buckets because the total buffer size [{} bytes] exceeds the threshold [{} bytes]",
|
||||
numBucketsToFlush, detector.totalSize, detector.threshold);
|
||||
for (int i = 0; i < numBucketsToFlush; i++) {
|
||||
DataBucket bucket = sortedBuckets.get(i);
|
||||
flushBucket(bucket);
|
||||
detector.countDown(bucket.tracer.totalSize);
|
||||
bucket.reset();
|
||||
}
|
||||
}
|
||||
final String bucketID = getBucketID(value);
|
||||
|
||||
DataBucket bucket = this.buckets.computeIfAbsent(bucketID,
|
||||
k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE)));
|
||||
boolean needFlush = bucket.detector.detect(value);
|
||||
if (needFlush) {
|
||||
k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BUCKET_SIZE)));
|
||||
boolean flushBucket = bucket.tracer.trace(detector.lastRecordSize);
|
||||
if (flushBucket) {
|
||||
flushBucket(bucket);
|
||||
bucket.reset();
|
||||
}
|
||||
@@ -390,6 +451,7 @@ public class StreamWriteFunction<K, I, O>
|
||||
.build();
|
||||
this.eventGateway.sendEventToCoordinator(event);
|
||||
this.buckets.clear();
|
||||
this.detector.reset();
|
||||
this.currentInstant = "";
|
||||
}
|
||||
}
|
||||
|
||||
@@ -378,7 +378,7 @@ public class TestWriteCopyOnWrite {
|
||||
@Test
|
||||
public void testInsertWithMiniBatches() throws Exception {
|
||||
// reset the config option
|
||||
conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.001); // 1Kb batch size
|
||||
conf.setDouble(FlinkOptions.WRITE_BUCKET_SIZE, 0.001); // 1Kb batch size
|
||||
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
|
||||
|
||||
// open the function and ingest data
|
||||
@@ -436,6 +436,68 @@ public class TestWriteCopyOnWrite {
|
||||
checkWrittenData(tempFile, expected, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInsertWithSmallBuffer() throws Exception {
|
||||
// reset the config option
|
||||
conf.setDouble(FlinkOptions.WRITE_BUFFER_SIZE, 0.001); // 1Kb buffer size
|
||||
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
|
||||
|
||||
// open the function and ingest data
|
||||
funcWrapper.openFunction();
|
||||
// each record is 424 bytes. so 3 records expect to trigger buffer flush:
|
||||
// flush half of the buckets once at a time.
|
||||
for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
|
||||
funcWrapper.invoke(rowData);
|
||||
}
|
||||
|
||||
Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
|
||||
assertThat("Should have 1 data bucket", dataBuffer.size(), is(1));
|
||||
assertThat("4 records expect to flush out as a mini-batch",
|
||||
dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
|
||||
is(1));
|
||||
|
||||
// this triggers the data write and event send
|
||||
funcWrapper.checkpointFunction(1);
|
||||
dataBuffer = funcWrapper.getDataBuffer();
|
||||
assertThat("All data should be flushed out", dataBuffer.size(), is(0));
|
||||
|
||||
for (int i = 0; i < 3; i++) {
|
||||
final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first
|
||||
assertThat("The operator expect to send an event", event, instanceOf(BatchWriteSuccessEvent.class));
|
||||
funcWrapper.getCoordinator().handleEventFromOperator(0, event);
|
||||
}
|
||||
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
|
||||
|
||||
String instant = funcWrapper.getWriteClient()
|
||||
.getLastPendingInstant(getTableType());
|
||||
|
||||
funcWrapper.checkpointComplete(1);
|
||||
|
||||
Map<String, String> expected = getMiniBatchExpected();
|
||||
checkWrittenData(tempFile, expected, 1);
|
||||
|
||||
// started a new instant already
|
||||
checkInflightInstant(funcWrapper.getWriteClient());
|
||||
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
|
||||
|
||||
// insert duplicates again
|
||||
for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
|
||||
funcWrapper.invoke(rowData);
|
||||
}
|
||||
|
||||
funcWrapper.checkpointFunction(2);
|
||||
|
||||
for (int i = 0; i < 3; i++) {
|
||||
final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first
|
||||
funcWrapper.getCoordinator().handleEventFromOperator(0, event);
|
||||
}
|
||||
|
||||
funcWrapper.checkpointComplete(2);
|
||||
|
||||
// Same the original base file content.
|
||||
checkWrittenData(tempFile, expected, 1);
|
||||
}
|
||||
|
||||
Map<String, String> getMiniBatchExpected() {
|
||||
Map<String, String> expected = new HashMap<>();
|
||||
expected.put("par1", "[id1,par1,id1,Danny,23,1,par1, "
|
||||
|
||||
@@ -330,7 +330,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
|
||||
TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
|
||||
Map<String, String> options = new HashMap<>();
|
||||
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
|
||||
options.put(FlinkOptions.WRITE_BATCH_SIZE.key(), "0.001");
|
||||
options.put(FlinkOptions.WRITE_BUCKET_SIZE.key(), "0.001");
|
||||
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
|
||||
tableEnv.executeSql(hoodieTableDDL);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user