diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 33a16c0dd..3f1b17bc0 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -89,7 +89,7 @@ public class FlinkOptions { public static final ConfigOption INDEX_GLOBAL_ENABLED = ConfigOptions .key("index.global.enabled") .booleanType() - .defaultValue(true) + .defaultValue(false) .withDescription("Whether to update index for the old partition path\n" + "if same key record with different partition path came in, default true"); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index e35419adc..2a38f310b 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -20,7 +20,10 @@ package org.apache.hudi.sink; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.util.CommitUtils; @@ -178,7 +181,7 @@ public class StreamWriteFunction @Override public void processElement(I value, KeyedProcessFunction.Context ctx, Collector out) { - bufferRecord(value); + bufferRecord((HoodieRecord) value); } @Override @@ -211,7 +214,7 @@ public class StreamWriteFunction public Map> getDataBuffer() { Map> ret = new HashMap<>(); for (Map.Entry entry : buckets.entrySet()) { - ret.put(entry.getKey(), 
entry.getValue().records); + ret.put(entry.getKey(), entry.getValue().writeBuffer()); } return ret; } @@ -254,16 +257,77 @@ public class StreamWriteFunction } } + /** + * Represents a data item in the buffer, this is needed to reduce the + * memory footprint. + * + *
<p>
A {@link HoodieRecord} was firstly transformed into a {@link DataItem} + * for buffering, it then transforms back to the {@link HoodieRecord} before flushing. + */ + private static class DataItem { + private final String key; // record key + private final String instant; // 'U' or 'I' + private final HoodieRecordPayload data; // record payload + + private DataItem(String key, String instant, HoodieRecordPayload data) { + this.key = key; + this.instant = instant; + this.data = data; + } + + public static DataItem fromHoodieRecord(HoodieRecord record) { + return new DataItem( + record.getRecordKey(), + record.getCurrentLocation().getInstantTime(), + record.getData()); + } + + public HoodieRecord toHoodieRecord(String partitionPath) { + HoodieKey hoodieKey = new HoodieKey(this.key, partitionPath); + HoodieRecord record = new HoodieRecord<>(hoodieKey, data); + HoodieRecordLocation loc = new HoodieRecordLocation(instant, null); + record.setCurrentLocation(loc); + return record; + } + } + /** * Data bucket. */ private static class DataBucket { - private final List records; + private final List records; private final BufferSizeDetector detector; + private final String partitionPath; + private final String fileID; - private DataBucket(Double batchSize) { + private DataBucket(Double batchSize, HoodieRecord hoodieRecord) { this.records = new ArrayList<>(); this.detector = new BufferSizeDetector(batchSize); + this.partitionPath = hoodieRecord.getPartitionPath(); + this.fileID = hoodieRecord.getCurrentLocation().getFileId(); + } + + /** + * Prepare the write data buffer: + * + *
<p>
+   * <ul>
+   *   <li>Patch up all the records with correct partition path;</li>
+   *   <li>Patch up the first record with correct partition path and fileID.</li>
+   * </ul>
+ */ + public List writeBuffer() { + // rewrite all the records with new record key + List recordList = records.stream() + .map(record -> record.toHoodieRecord(partitionPath)) + .collect(Collectors.toList()); + // rewrite the first record with expected fileID + HoodieRecord first = recordList.get(0); + HoodieRecord record = new HoodieRecord<>(first.getKey(), first.getData()); + HoodieRecordLocation newLoc = new HoodieRecordLocation(first.getCurrentLocation().getInstantTime(), fileID); + record.setCurrentLocation(newLoc); + + recordList.set(0, record); + return recordList; } public void reset() { @@ -349,8 +413,7 @@ public class StreamWriteFunction /** * Returns the bucket ID with the given value {@code value}. */ - private String getBucketID(I value) { - HoodieRecord record = (HoodieRecord) value; + private String getBucketID(HoodieRecord record) { final String fileId = record.getCurrentLocation().getFileId(); return StreamerUtil.generateBucketKey(record.getPartitionPath(), fileId); } @@ -366,12 +429,13 @@ public class StreamWriteFunction * * @param value HoodieRecord */ - private void bufferRecord(I value) { + private void bufferRecord(HoodieRecord value) { final String bucketID = getBucketID(value); DataBucket bucket = this.buckets.computeIfAbsent(bucketID, - k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE))); - boolean flushBucket = bucket.detector.detect(value); + k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE), value)); + final DataItem item = DataItem.fromHoodieRecord(value); + boolean flushBucket = bucket.detector.detect(item); boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize); if (flushBucket) { flushBucket(bucket); @@ -387,7 +451,7 @@ public class StreamWriteFunction this.tracer.countDown(bucketToFlush.detector.totalSize); bucketToFlush.reset(); } - bucket.records.add((HoodieRecord) value); + bucket.records.add(item); } @SuppressWarnings("unchecked, rawtypes") @@ -400,7 +464,7 @@ public 
class StreamWriteFunction return; } - List records = bucket.records; + List records = bucket.writeBuffer(); ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records"); if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) { records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1); @@ -431,12 +495,13 @@ public class StreamWriteFunction // The records are partitioned by the bucket ID and each batch sent to // the writer belongs to one bucket. .forEach(bucket -> { - List records = bucket.records; + List records = bucket.writeBuffer(); if (records.size() > 0) { if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) { records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1); } writeStatus.addAll(writeFunction.apply(records, currentInstant)); + bucket.reset(); } }); } else { diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java index 5ad20d5be..4c51275a5 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java @@ -219,9 +219,7 @@ public class BucketAssignFunction> updateIndexState(recordKey, partitionPath, location); } } - record.unseal(); record.setCurrentLocation(location); - record.seal(); out.collect((O) record); } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java index 21fbc5aae..b962afb57 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -378,21 +378,21 @@ public class TestWriteCopyOnWrite { @Test public void testInsertWithMiniBatches() throws Exception { // reset the config option - 
conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.001); // 1Kb batch size + conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch size funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); // open the function and ingest data funcWrapper.openFunction(); - // Each record is 424 bytes. so 3 records expect to trigger a mini-batch write + // Each record is 208 bytes. so 4 records expect to trigger a mini-batch write for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) { funcWrapper.invoke(rowData); } Map> dataBuffer = funcWrapper.getDataBuffer(); assertThat("Should have 1 data bucket", dataBuffer.size(), is(1)); - assertThat("2 records expect to flush out as a mini-batch", + assertThat("3 records expect to flush out as a mini-batch", dataBuffer.values().stream().findFirst().map(List::size).orElse(-1), - is(3)); + is(2)); // this triggers the data write and event send funcWrapper.checkpointFunction(1); @@ -439,12 +439,12 @@ public class TestWriteCopyOnWrite { @Test public void testInsertWithSmallBufferSize() throws Exception { // reset the config option - conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.001); // 1Kb buffer size + conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0006); // 630 bytes buffer size funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); // open the function and ingest data funcWrapper.openFunction(); - // each record is 424 bytes. so 3 records expect to trigger buffer flush: + // each record is 208 bytes. so 4 records expect to trigger buffer flush: // flush the max size bucket once at a time. 
for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) { funcWrapper.invoke(rowData); @@ -452,9 +452,9 @@ public class TestWriteCopyOnWrite { Map> dataBuffer = funcWrapper.getDataBuffer(); assertThat("Should have 1 data bucket", dataBuffer.size(), is(1)); - assertThat("2 records expect to flush out as a mini-batch", + assertThat("3 records expect to flush out as a mini-batch", dataBuffer.values().stream().findFirst().map(List::size).orElse(-1), - is(3)); + is(2)); // this triggers the data write and event send funcWrapper.checkpointFunction(1); @@ -500,8 +500,9 @@ public class TestWriteCopyOnWrite { Map getMiniBatchExpected() { Map expected = new HashMap<>(); - // the last 3 lines are merged + // the last 2 lines are merged expected.put("par1", "[" + + "id1,par1,id1,Danny,23,1,par1, " + "id1,par1,id1,Danny,23,1,par1, " + "id1,par1,id1,Danny,23,1,par1]"); return expected; diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java index af8707034..bb80cf156 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java @@ -401,6 +401,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase { Map options = new HashMap<>(); options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); + options.put(FlinkOptions.INDEX_GLOBAL_ENABLED.key(), "true"); options.put(FlinkOptions.INSERT_DROP_DUPS.key(), "true"); String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options); streamTableEnv.executeSql(hoodieTableDDL);