diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 3a942affb..de9b950f1 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -267,11 +267,17 @@ public class FlinkOptions {
       .defaultValue(4)
       .withDescription("Parallelism of tasks that do actual write, default is 4");

-  public static final ConfigOption<Double> WRITE_BATCH_SIZE = ConfigOptions
-      .key("write.batch.size.MB")
+  public static final ConfigOption<Double> WRITE_BUFFER_SIZE = ConfigOptions
+      .key("write.buffer.size.MB")
+      .doubleType()
+      .defaultValue(256D) // 256MB
+      .withDescription("Total buffer size in MB to flush data into the underneath filesystem, default 256MB");
+
+  public static final ConfigOption<Double> WRITE_BUCKET_SIZE = ConfigOptions
+      .key("write.bucket.size.MB")
       .doubleType()
       .defaultValue(64D) // 64MB
-      .withDescription("Batch buffer size in MB to flush data into the underneath filesystem, default 64MB");
+      .withDescription("Bucket size in MB to flush data into the underneath filesystem, default 64MB");

   public static final ConfigOption<Double> WRITE_LOG_BLOCK_SIZE = ConfigOptions
       .key("write.log_block.size.MB")
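The two new options are layered: write.buffer.size.MB caps the total bytes a write task buffers across all of its data buckets, while write.bucket.size.MB caps a single bucket. A rough tuning sketch under that reading (the values are illustrative, not recommendations; only the Configuration and FlinkOptions APIs already used in this patch are assumed):

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;

    public class WriteSizeTuning {
      public static void main(String[] args) {
        // Keep the total buffer cap well above the per-bucket cap; otherwise
        // the buffer-level flush always fires before any single bucket can
        // grow to a reasonable file size.
        Configuration conf = new Configuration();
        conf.setDouble(FlinkOptions.WRITE_BUFFER_SIZE, 512D); // MB, total per write task
        conf.setDouble(FlinkOptions.WRITE_BUCKET_SIZE, 64D);  // MB, per data bucket
        System.out.println(conf);
      }
    }
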
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 8244226b3..36bd0edae 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -41,18 +41,21 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.util.Collector;
+import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.function.BiFunction;
+import java.util.stream.Collectors;

 /**
  * Sink function to write the data to the underneath filesystem.
@@ -60,7 +63,7 @@ import java.util.function.BiFunction;
  *
  * <p><h2>Work Flow</h2>
  *
  * <p>The function firstly buffers the data as a batch of {@link HoodieRecord}s,
- * It flushes(write) the records batch when a batch exceeds the configured size {@link FlinkOptions#WRITE_BATCH_SIZE}
+ * It flushes(write) the records batch when a batch exceeds the configured size {@link FlinkOptions#WRITE_BUCKET_SIZE}
  * or a Flink checkpoint starts. After a batch has been written successfully,
  * the function notifies its operator coordinator {@link StreamWriteOperatorCoordinator} to mark a successful write.
  *
@@ -98,6 +101,11 @@ public class StreamWriteFunction<K, I, O>

   private static final Logger LOG = LoggerFactory.getLogger(StreamWriteFunction.class);

+  /**
+   * Write buffer size detector.
+   */
+  private transient BufferSizeDetector detector;
+
   /**
    * Write buffer as buckets for a checkpoint. The key is bucket ID.
    */
@@ -223,6 +231,7 @@ public class StreamWriteFunction<K, I, O>
   // -------------------------------------------------------------------------

   private void initBuffer() {
+    this.detector = new BufferSizeDetector(this.config.getDouble(FlinkOptions.WRITE_BUFFER_SIZE));
     this.buckets = new LinkedHashMap<>();
   }

@@ -249,18 +258,49 @@ public class StreamWriteFunction<K, I, O>
   /**
    * Data bucket.
    */
-  private static class DataBucket {
+  private static class DataBucket implements Comparable<DataBucket> {
     private final List<HoodieRecord> records;
-    private final BufferSizeDetector detector;
+    private final BucketSizeTracer tracer;

     private DataBucket(Double batchSize) {
       this.records = new ArrayList<>();
-      this.detector = new BufferSizeDetector(batchSize);
+      this.tracer = new BucketSizeTracer(batchSize);
     }

     public void reset() {
       this.records.clear();
-      this.detector.reset();
+      this.tracer.reset();
+    }
+
+    @Override
+    public int compareTo(@NotNull DataBucket other) {
+      return Long.compare(tracer.totalSize, other.tracer.totalSize);
+    }
+  }
+
+  /**
+   * Tool to detect whether to flush out the existing bucket.
+   */
+  private static class BucketSizeTracer {
+    private final double threshold;
+
+    private long totalSize = 0L;
+
+    BucketSizeTracer(double bucketSizeMb) {
+      this.threshold = bucketSizeMb * 1024 * 1024;
+    }
+
+    /**
+     * Traces the bucket size with the given record size,
+     * returns true if the bucket size exceeds the specified threshold.
+     */
+    boolean trace(long recordSize) {
+      totalSize += recordSize;
+      return totalSize > this.threshold;
+    }
+
+    void reset() {
+      this.totalSize = 0L;
     }
   }

@@ -272,13 +312,13 @@ public class StreamWriteFunction<K, I, O>
     private final Random random = new Random(47);
     private static final int DENOMINATOR = 100;

-    private final double batchSizeBytes;
+    private final double threshold;

     private long lastRecordSize = -1L;
     private long totalSize = 0L;

     BufferSizeDetector(double batchSizeMb) {
-      this.batchSizeBytes = batchSizeMb * 1024 * 1024;
+      this.threshold = batchSizeMb * 1024 * 1024;
     }

     boolean detect(Object record) {
@@ -286,7 +326,7 @@ public class StreamWriteFunction<K, I, O>
         lastRecordSize = ObjectSizeCalculator.getObjectSize(record);
       }
       totalSize += lastRecordSize;
-      return totalSize > this.batchSizeBytes;
+      return totalSize > this.threshold;
     }

     boolean sampling() {
@@ -298,6 +338,10 @@ public class StreamWriteFunction<K, I, O>
       this.lastRecordSize = -1L;
       this.totalSize = 0L;
     }
+
+    public void countDown(long bucketSize) {
+      this.totalSize -= bucketSize;
+    }
   }

   /**
@@ -313,17 +357,34 @@ public class StreamWriteFunction<K, I, O>
    /**
     * Buffers the given record.
     *
     * <p>Flush the data bucket first if the bucket records size is greater than
-    * the configured value {@link FlinkOptions#WRITE_BATCH_SIZE}.
+    * the configured value {@link FlinkOptions#WRITE_BUCKET_SIZE}.
     *
     * @param value HoodieRecord
     */
   private void bufferRecord(I value) {
+    boolean flushBuffer = detector.detect(value);
+    if (flushBuffer) {
+      List<DataBucket> sortedBuckets = this.buckets.values().stream()
+          .sorted(Comparator.reverseOrder())
+          .collect(Collectors.toList());
+      // flush the bigger half of the buckets to avoid flushing too small
+      // buckets which cause small files.
+      int numBucketsToFlush = (sortedBuckets.size() + 1) / 2;
+      LOG.info("Flush {} data buckets because the total buffer size [{} bytes] exceeds the threshold [{} bytes]",
+          numBucketsToFlush, detector.totalSize, detector.threshold);
+      for (int i = 0; i < numBucketsToFlush; i++) {
+        DataBucket bucket = sortedBuckets.get(i);
+        flushBucket(bucket);
+        detector.countDown(bucket.tracer.totalSize);
+        bucket.reset();
+      }
+    }
     final String bucketID = getBucketID(value);
     DataBucket bucket = this.buckets.computeIfAbsent(bucketID,
-        k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE)));
-    boolean needFlush = bucket.detector.detect(value);
-    if (needFlush) {
+        k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BUCKET_SIZE)));
+    boolean flushBucket = bucket.tracer.trace(detector.lastRecordSize);
+    if (flushBucket) {
       flushBucket(bucket);
       bucket.reset();
     }
   }
@@ -390,6 +451,7 @@ public class StreamWriteFunction<K, I, O>
         .build();
     this.eventGateway.sendEventToCoordinator(event);
     this.buckets.clear();
+    this.detector.reset();
     this.currentInstant = "";
   }
 }
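Taken together, bufferRecord now applies a two-level flush policy: the task-wide BufferSizeDetector decides when to spill, and the per-bucket BucketSizeTracer decides which single bucket is full. A standalone sketch of that policy with simplified bookkeeping (FlushPolicySketch and its members are illustrative, not the actual Hudi classes; sizes are in bytes):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class FlushPolicySketch {
      static final long BUFFER_CAP = 256L << 20; // total cap across all buckets
      static final long BUCKET_CAP = 64L << 20;  // cap for a single bucket

      private final Map<String, Long> bucketSizes = new LinkedHashMap<>();
      private long totalSize = 0L;

      /** Buffers one record and returns the bucket IDs that should flush now. */
      public List<String> onRecord(String bucketId, long recordSize) {
        List<String> toFlush = new ArrayList<>();
        totalSize += recordSize;
        // Level 1: the whole buffer exceeds its cap -> spill the bigger half
        // of the buckets, so small buckets keep accumulating into larger files.
        if (totalSize > BUFFER_CAP) {
          bucketSizes.entrySet().stream()
              .sorted(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder()))
              .limit((bucketSizes.size() + 1) / 2)
              .forEach(e -> toFlush.add(e.getKey()));
          for (String id : toFlush) {
            totalSize -= bucketSizes.remove(id); // mirrors BufferSizeDetector#countDown
          }
        }
        // Then buffer the record and apply the per-bucket cap (level 2).
        long bucketSize = bucketSizes.merge(bucketId, recordSize, Long::sum);
        if (bucketSize > BUCKET_CAP) {
          toFlush.add(bucketId);
          totalSize -= bucketSizes.remove(bucketId);
        }
        return toFlush;
      }

      public static void main(String[] args) {
        FlushPolicySketch policy = new FlushPolicySketch();
        for (int i = 0; i < 10_000; i++) {
          List<String> flushes = policy.onRecord("bucket-" + (i % 7), 64L << 10);
          if (!flushes.isEmpty()) {
            System.out.println("record " + i + " -> flush " + flushes);
          }
        }
      }
    }

Spilling the bigger half frees the most memory per flush while letting the small buckets keep growing, which is the small-file avoidance the inline comment describes.
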
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 9e417e33f..3d1461b41 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -378,7 +378,7 @@ public class TestWriteCopyOnWrite {
   @Test
   public void testInsertWithMiniBatches() throws Exception {
     // reset the config option
-    conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.001); // 1Kb batch size
+    conf.setDouble(FlinkOptions.WRITE_BUCKET_SIZE, 0.001); // 1Kb bucket size
     funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);

     // open the function and ingest data
@@ -436,6 +436,68 @@ public class TestWriteCopyOnWrite {
     checkWrittenData(tempFile, expected, 1);
   }

+  @Test
+  public void testInsertWithSmallBuffer() throws Exception {
+    // reset the config option
+    conf.setDouble(FlinkOptions.WRITE_BUFFER_SIZE, 0.001); // 1Kb buffer size
+    funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
+
+    // open the function and ingest data
+    funcWrapper.openFunction();
+    // each record is 424 bytes, so the 3rd record triggers a buffer flush:
+    // half of the buckets are flushed out at a time.
+    for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
+      funcWrapper.invoke(rowData);
+    }
+
+    Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
+    assertThat("Should have 1 data bucket", dataBuffer.size(), is(1));
+    assertThat("4 records expect to flush out as mini-batches, 1 record left in the buffer",
+        dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
+        is(1));
+
+    // this triggers the data write and event send
+    funcWrapper.checkpointFunction(1);
+    dataBuffer = funcWrapper.getDataBuffer();
+    assertThat("All data should be flushed out", dataBuffer.size(), is(0));
+
+    for (int i = 0; i < 3; i++) {
+      final OperatorEvent event = funcWrapper.getNextEvent(); // take the next pending event
+      assertThat("The operator expects to send an event", event, instanceOf(BatchWriteSuccessEvent.class));
+      funcWrapper.getCoordinator().handleEventFromOperator(0, event);
+    }
+    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
+
+    String instant = funcWrapper.getWriteClient()
+        .getLastPendingInstant(getTableType());
+
+    funcWrapper.checkpointComplete(1);
+
+    Map<String, String> expected = getMiniBatchExpected();
+    checkWrittenData(tempFile, expected, 1);
+
+    // a new instant has started already
+    checkInflightInstant(funcWrapper.getWriteClient());
+    checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
+
+    // insert duplicates again
+    for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
+      funcWrapper.invoke(rowData);
+    }
+
+    funcWrapper.checkpointFunction(2);
+
+    for (int i = 0; i < 3; i++) {
+      final OperatorEvent event = funcWrapper.getNextEvent(); // take the next pending event
+      funcWrapper.getCoordinator().handleEventFromOperator(0, event);
+    }
+
+    funcWrapper.checkpointComplete(2);
+
+    // same as the original base file content
+    checkWrittenData(tempFile, expected, 1);
+  }
+
   Map<String, String> getMiniBatchExpected() {
     Map<String, String> expected = new HashMap<>();
     expected.put("par1", "[id1,par1,id1,Danny,23,1,par1, "
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index fe652c5ff..2f2dcb26b 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -330,7 +330,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
     Map<String, String> options = new HashMap<>();
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
-    options.put(FlinkOptions.WRITE_BATCH_SIZE.key(), "0.001");
+    options.put(FlinkOptions.WRITE_BUCKET_SIZE.key(), "0.001");
     String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
     tableEnv.executeSql(hoodieTableDDL);
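
To spell out the arithmetic behind testInsertWithSmallBuffer: assuming the duplicate data set holds 5 rows of the same key (an assumption, but consistent with the 3 BatchWriteSuccessEvents the test consumes) and the 424-byte record estimate quoted in the test comment, the 0.001 MB cap works out to 1048.576 bytes:

    public class SmallBufferTrace {
      public static void main(String[] args) {
        final double cap = 0.001 * 1024 * 1024; // write.buffer.size.MB = 0.001 -> 1048.576 bytes
        final long recordSize = 424L;           // estimate quoted in the test comment
        long total = 0L;
        int miniBatches = 0;
        for (int i = 1; i <= 5; i++) {          // 5 duplicate rows, single bucket
          total += recordSize;                  // BufferSizeDetector#detect
          if (total > cap) {                    // fires on the 3rd and 5th record
            long bucketSize = total - recordSize; // the bucket holds the earlier rows only
            total -= bucketSize;                // BufferSizeDetector#countDown
            miniBatches++;
            System.out.printf("record %d: mini-batch of %d bytes flushed%n", i, bucketSize);
          }
        }
        // Two mini-batches flush 4 records and 1 record is left for the
        // checkpoint flush: hence 3 events and a single 1-record bucket.
        System.out.printf("mini-batches=%d, leftover=%d bytes%n", miniBatches, total);
      }
    }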