[HUDI-1911] Reuse the partition path and file group id for flink write data buffer (#2961)
Reuse to reduce memory footprint.
This commit is contained in:
@@ -89,7 +89,7 @@ public class FlinkOptions {
|
|||||||
public static final ConfigOption<Boolean> INDEX_GLOBAL_ENABLED = ConfigOptions
|
public static final ConfigOption<Boolean> INDEX_GLOBAL_ENABLED = ConfigOptions
|
||||||
.key("index.global.enabled")
|
.key("index.global.enabled")
|
||||||
.booleanType()
|
.booleanType()
|
||||||
.defaultValue(true)
|
.defaultValue(false)
|
||||||
.withDescription("Whether to update index for the old partition path\n"
|
.withDescription("Whether to update index for the old partition path\n"
|
||||||
+ "if same key record with different partition path came in, default true");
|
+ "if same key record with different partition path came in, default true");
|
||||||
|
|
||||||
|
|||||||
@@ -20,7 +20,10 @@ package org.apache.hudi.sink;
|
|||||||
|
|
||||||
import org.apache.hudi.client.HoodieFlinkWriteClient;
|
import org.apache.hudi.client.HoodieFlinkWriteClient;
|
||||||
import org.apache.hudi.client.WriteStatus;
|
import org.apache.hudi.client.WriteStatus;
|
||||||
|
import org.apache.hudi.common.model.HoodieKey;
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
|
import org.apache.hudi.common.model.HoodieRecordLocation;
|
||||||
|
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||||
import org.apache.hudi.common.model.HoodieTableType;
|
import org.apache.hudi.common.model.HoodieTableType;
|
||||||
import org.apache.hudi.common.model.WriteOperationType;
|
import org.apache.hudi.common.model.WriteOperationType;
|
||||||
import org.apache.hudi.common.util.CommitUtils;
|
import org.apache.hudi.common.util.CommitUtils;
|
||||||
@@ -178,7 +181,7 @@ public class StreamWriteFunction<K, I, O>
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void processElement(I value, KeyedProcessFunction<K, I, O>.Context ctx, Collector<O> out) {
|
public void processElement(I value, KeyedProcessFunction<K, I, O>.Context ctx, Collector<O> out) {
|
||||||
bufferRecord(value);
|
bufferRecord((HoodieRecord<?>) value);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -211,7 +214,7 @@ public class StreamWriteFunction<K, I, O>
|
|||||||
public Map<String, List<HoodieRecord>> getDataBuffer() {
|
public Map<String, List<HoodieRecord>> getDataBuffer() {
|
||||||
Map<String, List<HoodieRecord>> ret = new HashMap<>();
|
Map<String, List<HoodieRecord>> ret = new HashMap<>();
|
||||||
for (Map.Entry<String, DataBucket> entry : buckets.entrySet()) {
|
for (Map.Entry<String, DataBucket> entry : buckets.entrySet()) {
|
||||||
ret.put(entry.getKey(), entry.getValue().records);
|
ret.put(entry.getKey(), entry.getValue().writeBuffer());
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@@ -254,16 +257,77 @@ public class StreamWriteFunction<K, I, O>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Represents a data item in the buffer, this is needed to reduce the
|
||||||
|
* memory footprint.
|
||||||
|
*
|
||||||
|
* <p>A {@link HoodieRecord} was firstly transformed into a {@link DataItem}
|
||||||
|
* for buffering, it then transforms back to the {@link HoodieRecord} before flushing.
|
||||||
|
*/
|
||||||
|
private static class DataItem {
|
||||||
|
private final String key; // record key
|
||||||
|
private final String instant; // 'U' or 'I'
|
||||||
|
private final HoodieRecordPayload<?> data; // record payload
|
||||||
|
|
||||||
|
private DataItem(String key, String instant, HoodieRecordPayload<?> data) {
|
||||||
|
this.key = key;
|
||||||
|
this.instant = instant;
|
||||||
|
this.data = data;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static DataItem fromHoodieRecord(HoodieRecord<?> record) {
|
||||||
|
return new DataItem(
|
||||||
|
record.getRecordKey(),
|
||||||
|
record.getCurrentLocation().getInstantTime(),
|
||||||
|
record.getData());
|
||||||
|
}
|
||||||
|
|
||||||
|
public HoodieRecord<?> toHoodieRecord(String partitionPath) {
|
||||||
|
HoodieKey hoodieKey = new HoodieKey(this.key, partitionPath);
|
||||||
|
HoodieRecord<?> record = new HoodieRecord<>(hoodieKey, data);
|
||||||
|
HoodieRecordLocation loc = new HoodieRecordLocation(instant, null);
|
||||||
|
record.setCurrentLocation(loc);
|
||||||
|
return record;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Data bucket.
|
* Data bucket.
|
||||||
*/
|
*/
|
||||||
private static class DataBucket {
|
private static class DataBucket {
|
||||||
private final List<HoodieRecord> records;
|
private final List<DataItem> records;
|
||||||
private final BufferSizeDetector detector;
|
private final BufferSizeDetector detector;
|
||||||
|
private final String partitionPath;
|
||||||
|
private final String fileID;
|
||||||
|
|
||||||
private DataBucket(Double batchSize) {
|
private DataBucket(Double batchSize, HoodieRecord<?> hoodieRecord) {
|
||||||
this.records = new ArrayList<>();
|
this.records = new ArrayList<>();
|
||||||
this.detector = new BufferSizeDetector(batchSize);
|
this.detector = new BufferSizeDetector(batchSize);
|
||||||
|
this.partitionPath = hoodieRecord.getPartitionPath();
|
||||||
|
this.fileID = hoodieRecord.getCurrentLocation().getFileId();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Prepare the write data buffer:
|
||||||
|
*
|
||||||
|
* <ul>
|
||||||
|
* <li>Patch up all the records with correct partition path;</li>
|
||||||
|
* <li>Patch up the first record with correct partition path and fileID.</li>
|
||||||
|
* </ul>
|
||||||
|
*/
|
||||||
|
public List<HoodieRecord> writeBuffer() {
|
||||||
|
// rewrite all the records with new record key
|
||||||
|
List<HoodieRecord> recordList = records.stream()
|
||||||
|
.map(record -> record.toHoodieRecord(partitionPath))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
// rewrite the first record with expected fileID
|
||||||
|
HoodieRecord<?> first = recordList.get(0);
|
||||||
|
HoodieRecord<?> record = new HoodieRecord<>(first.getKey(), first.getData());
|
||||||
|
HoodieRecordLocation newLoc = new HoodieRecordLocation(first.getCurrentLocation().getInstantTime(), fileID);
|
||||||
|
record.setCurrentLocation(newLoc);
|
||||||
|
|
||||||
|
recordList.set(0, record);
|
||||||
|
return recordList;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void reset() {
|
public void reset() {
|
||||||
@@ -349,8 +413,7 @@ public class StreamWriteFunction<K, I, O>
|
|||||||
/**
|
/**
|
||||||
* Returns the bucket ID with the given value {@code value}.
|
* Returns the bucket ID with the given value {@code value}.
|
||||||
*/
|
*/
|
||||||
private String getBucketID(I value) {
|
private String getBucketID(HoodieRecord<?> record) {
|
||||||
HoodieRecord<?> record = (HoodieRecord<?>) value;
|
|
||||||
final String fileId = record.getCurrentLocation().getFileId();
|
final String fileId = record.getCurrentLocation().getFileId();
|
||||||
return StreamerUtil.generateBucketKey(record.getPartitionPath(), fileId);
|
return StreamerUtil.generateBucketKey(record.getPartitionPath(), fileId);
|
||||||
}
|
}
|
||||||
@@ -366,12 +429,13 @@ public class StreamWriteFunction<K, I, O>
|
|||||||
*
|
*
|
||||||
* @param value HoodieRecord
|
* @param value HoodieRecord
|
||||||
*/
|
*/
|
||||||
private void bufferRecord(I value) {
|
private void bufferRecord(HoodieRecord<?> value) {
|
||||||
final String bucketID = getBucketID(value);
|
final String bucketID = getBucketID(value);
|
||||||
|
|
||||||
DataBucket bucket = this.buckets.computeIfAbsent(bucketID,
|
DataBucket bucket = this.buckets.computeIfAbsent(bucketID,
|
||||||
k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE)));
|
k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE), value));
|
||||||
boolean flushBucket = bucket.detector.detect(value);
|
final DataItem item = DataItem.fromHoodieRecord(value);
|
||||||
|
boolean flushBucket = bucket.detector.detect(item);
|
||||||
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
|
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
|
||||||
if (flushBucket) {
|
if (flushBucket) {
|
||||||
flushBucket(bucket);
|
flushBucket(bucket);
|
||||||
@@ -387,7 +451,7 @@ public class StreamWriteFunction<K, I, O>
|
|||||||
this.tracer.countDown(bucketToFlush.detector.totalSize);
|
this.tracer.countDown(bucketToFlush.detector.totalSize);
|
||||||
bucketToFlush.reset();
|
bucketToFlush.reset();
|
||||||
}
|
}
|
||||||
bucket.records.add((HoodieRecord<?>) value);
|
bucket.records.add(item);
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked, rawtypes")
|
@SuppressWarnings("unchecked, rawtypes")
|
||||||
@@ -400,7 +464,7 @@ public class StreamWriteFunction<K, I, O>
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
List<HoodieRecord> records = bucket.records;
|
List<HoodieRecord> records = bucket.writeBuffer();
|
||||||
ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records");
|
ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records");
|
||||||
if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
|
if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
|
||||||
records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
|
records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
|
||||||
@@ -431,12 +495,13 @@ public class StreamWriteFunction<K, I, O>
|
|||||||
// The records are partitioned by the bucket ID and each batch sent to
|
// The records are partitioned by the bucket ID and each batch sent to
|
||||||
// the writer belongs to one bucket.
|
// the writer belongs to one bucket.
|
||||||
.forEach(bucket -> {
|
.forEach(bucket -> {
|
||||||
List<HoodieRecord> records = bucket.records;
|
List<HoodieRecord> records = bucket.writeBuffer();
|
||||||
if (records.size() > 0) {
|
if (records.size() > 0) {
|
||||||
if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
|
if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
|
||||||
records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
|
records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
|
||||||
}
|
}
|
||||||
writeStatus.addAll(writeFunction.apply(records, currentInstant));
|
writeStatus.addAll(writeFunction.apply(records, currentInstant));
|
||||||
|
bucket.reset();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@@ -219,9 +219,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
|
|||||||
updateIndexState(recordKey, partitionPath, location);
|
updateIndexState(recordKey, partitionPath, location);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
record.unseal();
|
|
||||||
record.setCurrentLocation(location);
|
record.setCurrentLocation(location);
|
||||||
record.seal();
|
|
||||||
out.collect((O) record);
|
out.collect((O) record);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -378,21 +378,21 @@ public class TestWriteCopyOnWrite {
|
|||||||
@Test
|
@Test
|
||||||
public void testInsertWithMiniBatches() throws Exception {
|
public void testInsertWithMiniBatches() throws Exception {
|
||||||
// reset the config option
|
// reset the config option
|
||||||
conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.001); // 1Kb batch size
|
conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch size
|
||||||
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
|
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
|
||||||
|
|
||||||
// open the function and ingest data
|
// open the function and ingest data
|
||||||
funcWrapper.openFunction();
|
funcWrapper.openFunction();
|
||||||
// Each record is 424 bytes. so 3 records expect to trigger a mini-batch write
|
// Each record is 208 bytes. so 4 records expect to trigger a mini-batch write
|
||||||
for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
|
for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
|
||||||
funcWrapper.invoke(rowData);
|
funcWrapper.invoke(rowData);
|
||||||
}
|
}
|
||||||
|
|
||||||
Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
|
Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
|
||||||
assertThat("Should have 1 data bucket", dataBuffer.size(), is(1));
|
assertThat("Should have 1 data bucket", dataBuffer.size(), is(1));
|
||||||
assertThat("2 records expect to flush out as a mini-batch",
|
assertThat("3 records expect to flush out as a mini-batch",
|
||||||
dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
|
dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
|
||||||
is(3));
|
is(2));
|
||||||
|
|
||||||
// this triggers the data write and event send
|
// this triggers the data write and event send
|
||||||
funcWrapper.checkpointFunction(1);
|
funcWrapper.checkpointFunction(1);
|
||||||
@@ -439,12 +439,12 @@ public class TestWriteCopyOnWrite {
|
|||||||
@Test
|
@Test
|
||||||
public void testInsertWithSmallBufferSize() throws Exception {
|
public void testInsertWithSmallBufferSize() throws Exception {
|
||||||
// reset the config option
|
// reset the config option
|
||||||
conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.001); // 1Kb buffer size
|
conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0006); // 630 bytes buffer size
|
||||||
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
|
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
|
||||||
|
|
||||||
// open the function and ingest data
|
// open the function and ingest data
|
||||||
funcWrapper.openFunction();
|
funcWrapper.openFunction();
|
||||||
// each record is 424 bytes. so 3 records expect to trigger buffer flush:
|
// each record is 208 bytes. so 4 records expect to trigger buffer flush:
|
||||||
// flush the max size bucket once at a time.
|
// flush the max size bucket once at a time.
|
||||||
for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
|
for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
|
||||||
funcWrapper.invoke(rowData);
|
funcWrapper.invoke(rowData);
|
||||||
@@ -452,9 +452,9 @@ public class TestWriteCopyOnWrite {
|
|||||||
|
|
||||||
Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
|
Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
|
||||||
assertThat("Should have 1 data bucket", dataBuffer.size(), is(1));
|
assertThat("Should have 1 data bucket", dataBuffer.size(), is(1));
|
||||||
assertThat("2 records expect to flush out as a mini-batch",
|
assertThat("3 records expect to flush out as a mini-batch",
|
||||||
dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
|
dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
|
||||||
is(3));
|
is(2));
|
||||||
|
|
||||||
// this triggers the data write and event send
|
// this triggers the data write and event send
|
||||||
funcWrapper.checkpointFunction(1);
|
funcWrapper.checkpointFunction(1);
|
||||||
@@ -500,8 +500,9 @@ public class TestWriteCopyOnWrite {
|
|||||||
|
|
||||||
Map<String, String> getMiniBatchExpected() {
|
Map<String, String> getMiniBatchExpected() {
|
||||||
Map<String, String> expected = new HashMap<>();
|
Map<String, String> expected = new HashMap<>();
|
||||||
// the last 3 lines are merged
|
// the last 2 lines are merged
|
||||||
expected.put("par1", "["
|
expected.put("par1", "["
|
||||||
|
+ "id1,par1,id1,Danny,23,1,par1, "
|
||||||
+ "id1,par1,id1,Danny,23,1,par1, "
|
+ "id1,par1,id1,Danny,23,1,par1, "
|
||||||
+ "id1,par1,id1,Danny,23,1,par1]");
|
+ "id1,par1,id1,Danny,23,1,par1]");
|
||||||
return expected;
|
return expected;
|
||||||
|
|||||||
@@ -401,6 +401,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
|
|||||||
|
|
||||||
Map<String, String> options = new HashMap<>();
|
Map<String, String> options = new HashMap<>();
|
||||||
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
|
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
|
||||||
|
options.put(FlinkOptions.INDEX_GLOBAL_ENABLED.key(), "true");
|
||||||
options.put(FlinkOptions.INSERT_DROP_DUPS.key(), "true");
|
options.put(FlinkOptions.INSERT_DROP_DUPS.key(), "true");
|
||||||
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
|
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
|
||||||
streamTableEnv.executeSql(hoodieTableDDL);
|
streamTableEnv.executeSql(hoodieTableDDL);
|
||||||
|
|||||||
Reference in New Issue
Block a user