From 6e9c5dd76548911abd346fad61a0dd6d3c623af9 Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Thu, 29 Apr 2021 20:32:10 +0800 Subject: [PATCH] [HUDI-1863] Add rate limiter to Flink writer to avoid OOM for bootstrap (#2891) --- .../hudi/configuration/FlinkOptions.java | 20 ++-- .../apache/hudi/sink/StreamWriteFunction.java | 102 +++--------------- .../transform/RowDataToHoodieFunction.java | 67 +++++++++++- .../hudi/source/StreamReadOperator.java | 6 +- .../hudi/sink/TestWriteCopyOnWrite.java | 64 +---------- .../TestRowDataToHoodieFunction.java | 83 ++++++++++++++ .../hudi/table/HoodieDataSourceITCase.java | 2 +- 7 files changed, 177 insertions(+), 167 deletions(-) create mode 100644 hudi-flink/src/test/java/org/apache/hudi/sink/transform/TestRowDataToHoodieFunction.java diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index d132d5322..7752615ac 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -267,20 +267,20 @@ public class FlinkOptions { .defaultValue(4) .withDescription("Parallelism of tasks that do actual write, default is 4"); - public static final ConfigOption WRITE_BUFFER_SIZE = ConfigOptions - .key("write.buffer.size.MB") - .doubleType() - .defaultValue(256D) // 256MB - .withDescription("Total buffer size in MB to flush data into the underneath filesystem, default 256MB"); - - public static final ConfigOption WRITE_BUCKET_SIZE = ConfigOptions - .key("write.bucket.size.MB") + public static final ConfigOption WRITE_BATCH_SIZE = ConfigOptions + .key("write.batch.size") .doubleType() .defaultValue(64D) // 64MB - .withDescription("Bucket size in MB to flush data into the underneath filesystem, default 64MB"); + .withDescription("Batch buffer size in MB to flush data into the underneath filesystem, default 64MB"); + + public static final ConfigOption WRITE_RATE_LIMIT = ConfigOptions + .key("write.rate.limit") + .longType() + .defaultValue(-1L) // default no limit + .withDescription("Write records rate limit per second to reduce risk of OOM, default -1 (no limit)"); public static final ConfigOption WRITE_LOG_BLOCK_SIZE = ConfigOptions - .key("write.log_block.size.MB") + .key("write.log_block.size") .intType() .defaultValue(128) .withDescription("Max log block size in MB for log file, default 128MB"); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index d316ee11d..8244226b3 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -41,21 +41,18 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.util.Collector; -import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Random; import java.util.function.BiFunction; -import java.util.stream.Collectors; /** * Sink function to write the data to the underneath filesystem. @@ -63,8 +60,7 @@ import java.util.stream.Collectors; *

Work Flow

* *

The function firstly buffers the data as a batch of {@link HoodieRecord}s, - * It flushes(write) the records bucket when the bucket size exceeds the configured threshold {@link FlinkOptions#WRITE_BUCKET_SIZE} - * or the whole data buffer size exceeds the configured threshold {@link FlinkOptions#WRITE_BUFFER_SIZE} + * It flushes(write) the records batch when a batch exceeds the configured size {@link FlinkOptions#WRITE_BATCH_SIZE} * or a Flink checkpoint starts. After a batch has been written successfully, * the function notifies its operator coordinator {@link StreamWriteOperatorCoordinator} to mark a successful write. * @@ -102,11 +98,6 @@ public class StreamWriteFunction private static final Logger LOG = LoggerFactory.getLogger(StreamWriteFunction.class); - /** - * Write buffer size detector. - */ - private transient BufferSizeDetector detector; - /** * Write buffer as buckets for a checkpoint. The key is bucket ID. */ @@ -232,7 +223,6 @@ public class StreamWriteFunction // ------------------------------------------------------------------------- private void initBuffer() { - this.detector = new BufferSizeDetector(this.config.getDouble(FlinkOptions.WRITE_BUFFER_SIZE)); this.buckets = new LinkedHashMap<>(); } @@ -259,49 +249,18 @@ public class StreamWriteFunction /** * Data bucket. */ - private static class DataBucket implements Comparable { + private static class DataBucket { private final List records; - private final BucketSizeTracer tracer; + private final BufferSizeDetector detector; private DataBucket(Double batchSize) { this.records = new ArrayList<>(); - this.tracer = new BucketSizeTracer(batchSize); + this.detector = new BufferSizeDetector(batchSize); } public void reset() { this.records.clear(); - this.tracer.reset(); - } - - @Override - public int compareTo(@NotNull DataBucket other) { - return Double.compare(tracer.threshold, other.tracer.threshold); - } - } - - /** - * Tool to detect if to flush out the existing bucket. - */ - private static class BucketSizeTracer { - private final double threshold; - - private long totalSize = 0L; - - BucketSizeTracer(double bucketSizeMb) { - this.threshold = bucketSizeMb * 1024 * 1024; - } - - /** - * Trace the bucket size with given record size, - * returns true if the bucket size exceeds specified threshold. - */ - boolean trace(long recordSize) { - totalSize += recordSize; - return totalSize > this.threshold; - } - - void reset() { - this.totalSize = 0L; + this.detector.reset(); } } @@ -313,13 +272,13 @@ public class StreamWriteFunction private final Random random = new Random(47); private static final int DENOMINATOR = 100; - private final double threshold; + private final double batchSizeBytes; private long lastRecordSize = -1L; private long totalSize = 0L; BufferSizeDetector(double batchSizeMb) { - this.threshold = batchSizeMb * 1024 * 1024; + this.batchSizeBytes = batchSizeMb * 1024 * 1024; } boolean detect(Object record) { @@ -327,7 +286,7 @@ public class StreamWriteFunction lastRecordSize = ObjectSizeCalculator.getObjectSize(record); } totalSize += lastRecordSize; - return totalSize > this.threshold; + return totalSize > this.batchSizeBytes; } boolean sampling() { @@ -339,10 +298,6 @@ public class StreamWriteFunction this.lastRecordSize = -1L; this.totalSize = 0L; } - - public void countDown(long bucketSize) { - this.totalSize -= bucketSize; - } } /** @@ -357,49 +312,19 @@ public class StreamWriteFunction /** * Buffers the given record. * - *

Flush the data bucket first if one of the condition meets: - * - *

    - *
  • The bucket size is greater than the configured value {@link FlinkOptions#WRITE_BUCKET_SIZE}.
  • - *
  • Flush half of the data buckets if the whole buffer size - * exceeds the configured threshold {@link FlinkOptions#WRITE_BUFFER_SIZE}.
  • - *
+ *

Flush the data bucket first if the bucket records size is greater than + * the configured value {@link FlinkOptions#WRITE_BATCH_SIZE}. * * @param value HoodieRecord */ private void bufferRecord(I value) { - boolean flushBuffer = detector.detect(value); - if (flushBuffer) { - List sortedBuckets = this.buckets.values().stream() - .filter(b -> b.records.size() > 0) - .sorted(Comparator.comparingLong(b -> b.tracer.totalSize)) - .collect(Collectors.toList()); - // flush half bytes size of buckets to avoid flushing too small buckets - // which cause small files. - long totalSize = detector.totalSize; - long flushedBytes = 0; - for (DataBucket bucket : sortedBuckets) { - final long bucketSize = bucket.tracer.totalSize; - flushBucket(bucket); - detector.countDown(bucketSize); - bucket.reset(); - - flushedBytes += bucketSize; - if (flushedBytes > detector.totalSize / 2) { - break; - } - } - LOG.info("Flush {} bytes data buckets because the total buffer size {} bytes exceeds the threshold {} bytes", - flushedBytes, totalSize, detector.threshold); - } final String bucketID = getBucketID(value); DataBucket bucket = this.buckets.computeIfAbsent(bucketID, - k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BUCKET_SIZE))); - boolean flushBucket = bucket.tracer.trace(detector.lastRecordSize); - if (flushBucket) { + k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE))); + boolean needFlush = bucket.detector.detect(value); + if (needFlush) { flushBucket(bucket); - detector.countDown(bucket.tracer.totalSize); bucket.reset(); } bucket.records.add((HoodieRecord) value); @@ -465,7 +390,6 @@ public class StreamWriteFunction .build(); this.eventGateway.sendEventToCoordinator(event); this.buckets.clear(); - this.detector.reset(); this.currentInstant = ""; } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java index 5bd3c687e..4948462f1 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java @@ -44,6 +44,8 @@ import javax.annotation.Nullable; import java.io.IOException; import java.io.Serializable; import java.lang.reflect.Constructor; +import java.util.Random; +import java.util.concurrent.TimeUnit; /** * Function that transforms RowData to HoodieRecord. @@ -80,6 +82,12 @@ public class RowDataToHoodieFunction 0) { + this.rateLimiter = new RateLimiter(totalLimit / getRuntimeContext().getNumberOfParallelSubtasks()); + } } @SuppressWarnings("unchecked") @Override public O map(I i) throws Exception { - return (O) toHoodieRecord(i); + if (rateLimiter != null) { + final O hoodieRecord; + if (rateLimiter.sampling()) { + long startTime = System.currentTimeMillis(); + hoodieRecord = (O) toHoodieRecord(i); + long endTime = System.currentTimeMillis(); + rateLimiter.processTime(endTime - startTime); + } else { + hoodieRecord = (O) toHoodieRecord(i); + } + rateLimiter.sleepIfNeeded(); + return hoodieRecord; + } else { + return (O) toHoodieRecord(i); + } } /** @@ -165,4 +191,43 @@ public class RowDataToHoodieFunction 0, "rate should be positive"); + this.maxProcessTime = 1000 / rate; + } + + void processTime(long processTime) { + this.processTime = processTime; + this.timeToSleep = maxProcessTime - processTime; + } + + boolean sampling() { + // 0.01 sampling percentage + return processTime == -1 || random.nextInt(DENOMINATOR) == 1; + } + + void sleepIfNeeded() throws Exception { + if (timeToSleep > 0) { + TimeUnit.MILLISECONDS.sleep(timeToSleep); + } + } + } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadOperator.java b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadOperator.java index 86372e33b..e2f5f7b95 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadOperator.java @@ -80,7 +80,7 @@ public class StreamReadOperator extends AbstractStreamOperator // them to the executor. This state is used to ensure that only one read task is in that splits queue at a time, so that // read tasks do not accumulate ahead of checkpoint tasks. When there is a read task in the queue, this is set to RUNNING. // When there are no more files to read, this will be set to IDLE. - private transient SplitState currentSplitState; + private transient volatile SplitState currentSplitState; private StreamReadOperator(MergeOnReadInputFormat format, ProcessingTimeService timeService, MailboxExecutor mailboxExecutor) { @@ -141,7 +141,7 @@ public class StreamReadOperator extends AbstractStreamOperator private void enqueueProcessSplits() { if (currentSplitState == SplitState.IDLE && !splits.isEmpty()) { currentSplitState = SplitState.RUNNING; - executor.execute(this::processSplits, this.getClass().getSimpleName()); + executor.execute(this::processSplits, "process input split"); } } @@ -155,8 +155,8 @@ public class StreamReadOperator extends AbstractStreamOperator // This log is important to indicate the consuming process, there is only one log message for one data bucket. LOG.info("Processing input split : {}", split); - format.open(split); try { + format.open(split); RowData nextElement = null; while (!format.reachedEnd()) { nextElement = format.nextRecord(nextElement); diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java index 3d1461b41..9e417e33f 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -378,7 +378,7 @@ public class TestWriteCopyOnWrite { @Test public void testInsertWithMiniBatches() throws Exception { // reset the config option - conf.setDouble(FlinkOptions.WRITE_BUCKET_SIZE, 0.001); // 1Kb batch size + conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.001); // 1Kb batch size funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); // open the function and ingest data @@ -436,68 +436,6 @@ public class TestWriteCopyOnWrite { checkWrittenData(tempFile, expected, 1); } - @Test - public void testInsertWithSmallBuffer() throws Exception { - // reset the config option - conf.setDouble(FlinkOptions.WRITE_BUFFER_SIZE, 0.001); // 1Kb buffer size - funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); - - // open the function and ingest data - funcWrapper.openFunction(); - // each record is 424 bytes. so 3 records expect to trigger buffer flush: - // flush half of the buckets once at a time. - for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) { - funcWrapper.invoke(rowData); - } - - Map> dataBuffer = funcWrapper.getDataBuffer(); - assertThat("Should have 1 data bucket", dataBuffer.size(), is(1)); - assertThat("4 records expect to flush out as a mini-batch", - dataBuffer.values().stream().findFirst().map(List::size).orElse(-1), - is(1)); - - // this triggers the data write and event send - funcWrapper.checkpointFunction(1); - dataBuffer = funcWrapper.getDataBuffer(); - assertThat("All data should be flushed out", dataBuffer.size(), is(0)); - - for (int i = 0; i < 3; i++) { - final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first - assertThat("The operator expect to send an event", event, instanceOf(BatchWriteSuccessEvent.class)); - funcWrapper.getCoordinator().handleEventFromOperator(0, event); - } - assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); - - String instant = funcWrapper.getWriteClient() - .getLastPendingInstant(getTableType()); - - funcWrapper.checkpointComplete(1); - - Map expected = getMiniBatchExpected(); - checkWrittenData(tempFile, expected, 1); - - // started a new instant already - checkInflightInstant(funcWrapper.getWriteClient()); - checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); - - // insert duplicates again - for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) { - funcWrapper.invoke(rowData); - } - - funcWrapper.checkpointFunction(2); - - for (int i = 0; i < 3; i++) { - final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first - funcWrapper.getCoordinator().handleEventFromOperator(0, event); - } - - funcWrapper.checkpointComplete(2); - - // Same the original base file content. - checkWrittenData(tempFile, expected, 1); - } - Map getMiniBatchExpected() { Map expected = new HashMap<>(); expected.put("par1", "[id1,par1,id1,Danny,23,1,par1, " diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/transform/TestRowDataToHoodieFunction.java b/hudi-flink/src/test/java/org/apache/hudi/sink/transform/TestRowDataToHoodieFunction.java new file mode 100644 index 000000000..631b53345 --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/transform/TestRowDataToHoodieFunction.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.transform; + +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.sink.utils.MockStreamingRuntimeContext; +import org.apache.hudi.utils.TestConfigurations; +import org.apache.hudi.utils.TestData; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.data.RowData; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Test cases for {@link RowDataToHoodieFunction}. + */ +public class TestRowDataToHoodieFunction { + @TempDir + File tempFile; + + private Configuration conf; + + @BeforeEach + public void before() { + final String basePath = tempFile.getAbsolutePath(); + conf = TestConfigurations.getDefaultConf(basePath); + } + + @Test + void testRateLimit() throws Exception { + // at most 100 record per second + RowDataToHoodieFunction func1 = getFunc(100); + long instant1 = System.currentTimeMillis(); + for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) { + func1.map(rowData); + } + long instant2 = System.currentTimeMillis(); + long processTime1 = instant2 - instant1; + + // at most 1 record per second + RowDataToHoodieFunction func2 = getFunc(1); + long instant3 = System.currentTimeMillis(); + for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) { + func2.map(rowData); + } + long instant4 = System.currentTimeMillis(); + long processTime2 = instant4 - instant3; + + assertTrue(processTime2 > processTime1, "lower rate should have longer process time"); + assertTrue(processTime2 > 5000, "should process at least 5 seconds"); + } + + private RowDataToHoodieFunction getFunc(long rate) throws Exception { + conf.setLong(FlinkOptions.WRITE_RATE_LIMIT, rate); + RowDataToHoodieFunction func = + new RowDataToHoodieFunction<>(TestConfigurations.ROW_TYPE, conf); + func.setRuntimeContext(new MockStreamingRuntimeContext(false, 1, 1)); + func.open(conf); + return func; + } +} diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java index 2f2dcb26b..fe652c5ff 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java @@ -330,7 +330,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase { TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv; Map options = new HashMap<>(); options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); - options.put(FlinkOptions.WRITE_BUCKET_SIZE.key(), "0.001"); + options.put(FlinkOptions.WRITE_BATCH_SIZE.key(), "0.001"); String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options); tableEnv.executeSql(hoodieTableDDL);