From 1b27259b530225e3c76fda684f888cad78045d3c Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Sun, 25 Apr 2021 23:06:53 +0800 Subject: [PATCH] [HUDI-1844] Add option to flush when total buckets memory exceeds the threshold (#2877) Current code supports flushing as per-bucket memory usage, while the buckets may still take too much memory for bootstrap from history data. When the threshold hits, flush out half of the buckets with bigger buffer size. --- .../hudi/configuration/FlinkOptions.java | 12 ++- .../apache/hudi/sink/StreamWriteFunction.java | 86 ++++++++++++++++--- .../hudi/sink/TestWriteCopyOnWrite.java | 64 +++++++++++++- .../hudi/table/HoodieDataSourceITCase.java | 2 +- 4 files changed, 147 insertions(+), 17 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 3a942affb..de9b950f1 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -267,11 +267,17 @@ public class FlinkOptions { .defaultValue(4) .withDescription("Parallelism of tasks that do actual write, default is 4"); - public static final ConfigOption WRITE_BATCH_SIZE = ConfigOptions - .key("write.batch.size.MB") + public static final ConfigOption WRITE_BUFFER_SIZE = ConfigOptions + .key("write.buffer.size.MB") + .doubleType() + .defaultValue(256D) // 256MB + .withDescription("Total buffer size in MB to flush data into the underneath filesystem, default 256MB"); + + public static final ConfigOption WRITE_BUCKET_SIZE = ConfigOptions + .key("write.bucket.size.MB") .doubleType() .defaultValue(64D) // 64MB - .withDescription("Batch buffer size in MB to flush data into the underneath filesystem, default 64MB"); + .withDescription("Bucket size in MB to flush data into the underneath filesystem, default 64MB"); public static final ConfigOption WRITE_LOG_BLOCK_SIZE = ConfigOptions .key("write.log_block.size.MB") diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index 8244226b3..36bd0edae 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -41,18 +41,21 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.util.Collector; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Random; import java.util.function.BiFunction; +import java.util.stream.Collectors; /** * Sink function to write the data to the underneath filesystem. @@ -60,7 +63,7 @@ import java.util.function.BiFunction; *

Work Flow

* *

The function firstly buffers the data as a batch of {@link HoodieRecord}s, - * It flushes(write) the records batch when a batch exceeds the configured size {@link FlinkOptions#WRITE_BATCH_SIZE} + * It flushes(write) the records batch when a batch exceeds the configured size {@link FlinkOptions#WRITE_BUCKET_SIZE} * or a Flink checkpoint starts. After a batch has been written successfully, * the function notifies its operator coordinator {@link StreamWriteOperatorCoordinator} to mark a successful write. * @@ -98,6 +101,11 @@ public class StreamWriteFunction private static final Logger LOG = LoggerFactory.getLogger(StreamWriteFunction.class); + /** + * Write buffer size detector. + */ + private transient BufferSizeDetector detector; + /** * Write buffer as buckets for a checkpoint. The key is bucket ID. */ @@ -223,6 +231,7 @@ public class StreamWriteFunction // ------------------------------------------------------------------------- private void initBuffer() { + this.detector = new BufferSizeDetector(this.config.getDouble(FlinkOptions.WRITE_BUFFER_SIZE)); this.buckets = new LinkedHashMap<>(); } @@ -249,18 +258,49 @@ public class StreamWriteFunction /** * Data bucket. */ - private static class DataBucket { + private static class DataBucket implements Comparable { private final List records; - private final BufferSizeDetector detector; + private final BucketSizeTracer tracer; private DataBucket(Double batchSize) { this.records = new ArrayList<>(); - this.detector = new BufferSizeDetector(batchSize); + this.tracer = new BucketSizeTracer(batchSize); } public void reset() { this.records.clear(); - this.detector.reset(); + this.tracer.reset(); + } + + @Override + public int compareTo(@NotNull DataBucket other) { + return Double.compare(tracer.threshold, other.tracer.threshold); + } + } + + /** + * Tool to detect if to flush out the existing bucket. + */ + private static class BucketSizeTracer { + private final double threshold; + + private long totalSize = 0L; + + BucketSizeTracer(double bucketSizeMb) { + this.threshold = bucketSizeMb * 1024 * 1024; + } + + /** + * Trace the bucket size with given record size, + * returns true if the bucket size exceeds specified threshold. + */ + boolean trace(long recordSize) { + totalSize += recordSize; + return totalSize > this.threshold; + } + + void reset() { + this.totalSize = 0L; } } @@ -272,13 +312,13 @@ public class StreamWriteFunction private final Random random = new Random(47); private static final int DENOMINATOR = 100; - private final double batchSizeBytes; + private final double threshold; private long lastRecordSize = -1L; private long totalSize = 0L; BufferSizeDetector(double batchSizeMb) { - this.batchSizeBytes = batchSizeMb * 1024 * 1024; + this.threshold = batchSizeMb * 1024 * 1024; } boolean detect(Object record) { @@ -286,7 +326,7 @@ public class StreamWriteFunction lastRecordSize = ObjectSizeCalculator.getObjectSize(record); } totalSize += lastRecordSize; - return totalSize > this.batchSizeBytes; + return totalSize > this.threshold; } boolean sampling() { @@ -298,6 +338,10 @@ public class StreamWriteFunction this.lastRecordSize = -1L; this.totalSize = 0L; } + + public void countDown(long bucketSize) { + this.totalSize -= bucketSize; + } } /** @@ -313,17 +357,34 @@ public class StreamWriteFunction * Buffers the given record. * *

Flush the data bucket first if the bucket records size is greater than - * the configured value {@link FlinkOptions#WRITE_BATCH_SIZE}. + * the configured value {@link FlinkOptions#WRITE_BUCKET_SIZE}. * * @param value HoodieRecord */ private void bufferRecord(I value) { + boolean flushBuffer = detector.detect(value); + if (flushBuffer) { + List sortedBuckets = this.buckets.values().stream() + .sorted(Comparator.comparingDouble(b -> b.tracer.totalSize)) + .collect(Collectors.toList()); + // flush half number of buckets to avoid flushing too small buckets + // which cause small files. + int numBucketsToFlush = (sortedBuckets.size() + 1) / 2; + LOG.info("Flush {} data buckets because the total buffer size [{} bytes] exceeds the threshold [{} bytes]", + numBucketsToFlush, detector.totalSize, detector.threshold); + for (int i = 0; i < numBucketsToFlush; i++) { + DataBucket bucket = sortedBuckets.get(i); + flushBucket(bucket); + detector.countDown(bucket.tracer.totalSize); + bucket.reset(); + } + } final String bucketID = getBucketID(value); DataBucket bucket = this.buckets.computeIfAbsent(bucketID, - k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE))); - boolean needFlush = bucket.detector.detect(value); - if (needFlush) { + k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BUCKET_SIZE))); + boolean flushBucket = bucket.tracer.trace(detector.lastRecordSize); + if (flushBucket) { flushBucket(bucket); bucket.reset(); } @@ -390,6 +451,7 @@ public class StreamWriteFunction .build(); this.eventGateway.sendEventToCoordinator(event); this.buckets.clear(); + this.detector.reset(); this.currentInstant = ""; } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java index 9e417e33f..3d1461b41 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -378,7 +378,7 @@ public class TestWriteCopyOnWrite { @Test public void testInsertWithMiniBatches() throws Exception { // reset the config option - conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.001); // 1Kb batch size + conf.setDouble(FlinkOptions.WRITE_BUCKET_SIZE, 0.001); // 1Kb batch size funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); // open the function and ingest data @@ -436,6 +436,68 @@ public class TestWriteCopyOnWrite { checkWrittenData(tempFile, expected, 1); } + @Test + public void testInsertWithSmallBuffer() throws Exception { + // reset the config option + conf.setDouble(FlinkOptions.WRITE_BUFFER_SIZE, 0.001); // 1Kb buffer size + funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); + + // open the function and ingest data + funcWrapper.openFunction(); + // each record is 424 bytes. so 3 records expect to trigger buffer flush: + // flush half of the buckets once at a time. + for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) { + funcWrapper.invoke(rowData); + } + + Map> dataBuffer = funcWrapper.getDataBuffer(); + assertThat("Should have 1 data bucket", dataBuffer.size(), is(1)); + assertThat("4 records expect to flush out as a mini-batch", + dataBuffer.values().stream().findFirst().map(List::size).orElse(-1), + is(1)); + + // this triggers the data write and event send + funcWrapper.checkpointFunction(1); + dataBuffer = funcWrapper.getDataBuffer(); + assertThat("All data should be flushed out", dataBuffer.size(), is(0)); + + for (int i = 0; i < 3; i++) { + final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first + assertThat("The operator expect to send an event", event, instanceOf(BatchWriteSuccessEvent.class)); + funcWrapper.getCoordinator().handleEventFromOperator(0, event); + } + assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); + + String instant = funcWrapper.getWriteClient() + .getLastPendingInstant(getTableType()); + + funcWrapper.checkpointComplete(1); + + Map expected = getMiniBatchExpected(); + checkWrittenData(tempFile, expected, 1); + + // started a new instant already + checkInflightInstant(funcWrapper.getWriteClient()); + checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); + + // insert duplicates again + for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) { + funcWrapper.invoke(rowData); + } + + funcWrapper.checkpointFunction(2); + + for (int i = 0; i < 3; i++) { + final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first + funcWrapper.getCoordinator().handleEventFromOperator(0, event); + } + + funcWrapper.checkpointComplete(2); + + // Same the original base file content. + checkWrittenData(tempFile, expected, 1); + } + Map getMiniBatchExpected() { Map expected = new HashMap<>(); expected.put("par1", "[id1,par1,id1,Danny,23,1,par1, " diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java index fe652c5ff..2f2dcb26b 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java @@ -330,7 +330,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase { TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv; Map options = new HashMap<>(); options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); - options.put(FlinkOptions.WRITE_BATCH_SIZE.key(), "0.001"); + options.put(FlinkOptions.WRITE_BUCKET_SIZE.key(), "0.001"); String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options); tableEnv.executeSql(hoodieTableDDL);