
[HUDI-1705] Flush as per data bucket for mini-batch write (#2695)

Detects the buffer size for each data bucket before flushing, so that we
avoid flushing data buckets with only a few records.
Danny Chan, 2021-03-19 16:30:54 +08:00 (committed by GitHub)
parent 1277c62398
commit f74828fca1
4 changed files with 88 additions and 47 deletions
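
The idea of the change, as a standalone sketch before diving into the diff: instead of one global size detector that flushes the whole buffer, each bucket accumulates its own size estimate, and an incoming record only triggers a flush of its own bucket. The code below is an illustration, not the Hudi code; the class PerBucketBuffer and its methods are hypothetical names.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of per-bucket mini-batch flushing (hypothetical, not
// the Hudi code): each bucket tracks its own buffered size, so a record only
// flushes its own bucket, and buckets with few records keep buffering.
public class PerBucketBuffer<T> {
  private final Map<String, List<T>> buckets = new LinkedHashMap<>();
  private final Map<String, Long> bufferedBytes = new LinkedHashMap<>();
  private final long batchSizeBytes;

  public PerBucketBuffer(long batchSizeBytes) {
    this.batchSizeBytes = batchSizeBytes;
  }

  /** Buffers one record, flushing the target bucket first if it grew too large. */
  public void add(String bucketId, T record, long estimatedBytes) {
    long projected = bufferedBytes.getOrDefault(bucketId, 0L) + estimatedBytes;
    if (projected > batchSizeBytes) {
      flush(bucketId);                // flush only this bucket's records
      projected = estimatedBytes;     // the new record starts a fresh mini-batch
    }
    buckets.computeIfAbsent(bucketId, k -> new ArrayList<>()).add(record);
    bufferedBytes.put(bucketId, projected);
  }

  private void flush(String bucketId) {
    List<T> records = buckets.remove(bucketId);
    bufferedBytes.remove(bucketId);
    if (records != null && !records.isEmpty()) {
      // hand `records` to the writer as one mini-batch for this bucket
    }
  }
}

This mirrors the flush-then-buffer order used in the commit: the size check runs before the record is added, the oversized bucket is flushed and reset, and the incoming record starts the next batch.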

StreamWriteFunction.java

@@ -26,6 +26,7 @@ import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.ObjectSizeCalculator;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
@@ -47,6 +48,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -100,9 +102,9 @@ public class StreamWriteFunction<K, I, O>
   private static final Logger LOG = LoggerFactory.getLogger(StreamWriteFunction.class);

   /**
-   * Write buffer for a checkpoint.
+   * Write buffer as buckets for a checkpoint. The key is bucket ID.
    */
-  private transient Map<String, List<HoodieRecord>> buffer;
+  private transient Map<String, DataBucket> buckets;

   /**
    * The buffer lock to control data buffering/flushing.
@@ -146,11 +148,6 @@ public class StreamWriteFunction<K, I, O>
    */
   private transient OperatorEventGateway eventGateway;

-  /**
-   * The detector that tells if to flush the data as mini-batch.
-   */
-  private transient BufferSizeDetector detector;
-
   /**
    * Constructs a StreamingSinkFunction.
    *
@@ -163,7 +160,6 @@ public class StreamWriteFunction<K, I, O>
   @Override
   public void open(Configuration parameters) throws IOException {
     this.taskID = getRuntimeContext().getIndexOfThisSubtask();
-    this.detector = new BufferSizeDetector(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE));
     initBuffer();
     initWriteClient();
     initWriteFunction();
@@ -182,7 +178,7 @@ public class StreamWriteFunction<K, I, O>
       // it would check the validity.
       this.onCheckpointing = true;
       // wait for the buffer data flush out and request a new instant
-      flushBuffer(true, false);
+      flushRemaining(false);
       // signal the task thread to start buffering
       addToBufferCondition.signal();
     } finally {
@@ -198,8 +194,7 @@ public class StreamWriteFunction<K, I, O>
       if (onCheckpointing) {
         addToBufferCondition.await();
       }
-      flushBufferOnCondition(value);
-      putDataIntoBuffer(value);
+      bufferRecord(value);
     } finally {
       bufferLock.unlock();
     }
@@ -221,7 +216,7 @@ public class StreamWriteFunction<K, I, O>
   /**
    * End input action for batch source.
    */
   public void endInput() {
-    flushBuffer(true, true);
+    flushRemaining(true);
     this.writeClient.cleanHandles();
   }
@@ -231,8 +226,12 @@ public class StreamWriteFunction<K, I, O>
   @VisibleForTesting
   @SuppressWarnings("rawtypes")
-  public Map<String, List<HoodieRecord>> getBuffer() {
-    return buffer;
+  public Map<String, List<HoodieRecord>> getDataBuffer() {
+    Map<String, List<HoodieRecord>> ret = new HashMap<>();
+    for (Map.Entry<String, DataBucket> entry : buckets.entrySet()) {
+      ret.put(entry.getKey(), entry.getValue().records);
+    }
+    return ret;
   }

   @VisibleForTesting
   @SuppressWarnings("rawtypes")
@@ -250,7 +249,7 @@ public class StreamWriteFunction<K, I, O>
   // -------------------------------------------------------------------------

   private void initBuffer() {
-    this.buffer = new LinkedHashMap<>();
+    this.buckets = new LinkedHashMap<>();
     this.bufferLock = new ReentrantLock();
     this.addToBufferCondition = this.bufferLock.newCondition();
   }
@@ -278,6 +277,24 @@ public class StreamWriteFunction<K, I, O>
     }
   }

+  /**
+   * Data bucket.
+   */
+  private static class DataBucket {
+    private final List<HoodieRecord> records;
+    private final BufferSizeDetector detector;
+
+    private DataBucket(Double batchSize) {
+      this.records = new ArrayList<>();
+      this.detector = new BufferSizeDetector(batchSize);
+    }
+
+    public void reset() {
+      this.records.clear();
+      this.detector.reset();
+    }
+  }
+
   /**
    * Tool to detect if to flush out the existing buffer.
    * Sampling the record to compute the size with 0.01 percentage.
@@ -314,32 +331,62 @@ public class StreamWriteFunction<K, I, O>
     }
   }

-  private void putDataIntoBuffer(I value) {
+  /**
+   * Returns the bucket ID with the given value {@code value}.
+   */
+  private String getBucketID(I value) {
     HoodieRecord<?> record = (HoodieRecord<?>) value;
     final String fileId = record.getCurrentLocation().getFileId();
-    final String key = StreamerUtil.generateBucketKey(record.getPartitionPath(), fileId);
-    if (!this.buffer.containsKey(key)) {
-      this.buffer.put(key, new ArrayList<>());
-    }
-    this.buffer.get(key).add(record);
+    return StreamerUtil.generateBucketKey(record.getPartitionPath(), fileId);
   }

   /**
-   * Flush the data buffer if the buffer size is greater than
+   * Buffers the given record.
+   *
+   * <p>Flush the data bucket first if the bucket records size is greater than
    * the configured value {@link FlinkOptions#WRITE_BATCH_SIZE}.
    *
    * @param value HoodieRecord
    */
-  private void flushBufferOnCondition(I value) {
-    boolean needFlush = this.detector.detect(value);
+  private void bufferRecord(I value) {
+    final String bucketID = getBucketID(value);
+    DataBucket bucket = this.buckets.computeIfAbsent(bucketID,
+        k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE)));
+    boolean needFlush = bucket.detector.detect(value);
     if (needFlush) {
-      flushBuffer(false, false);
-      this.detector.reset();
+      flushBucket(bucket);
+      bucket.reset();
     }
+    bucket.records.add((HoodieRecord<?>) value);
   }

   @SuppressWarnings("unchecked, rawtypes")
-  private void flushBuffer(boolean isLastBatch, boolean isEndInput) {
+  private void flushBucket(DataBucket bucket) {
+    this.currentInstant = this.writeClient.getInflightAndRequestedInstant(this.config.get(FlinkOptions.TABLE_TYPE));
+    if (this.currentInstant == null) {
+      // in case there are empty checkpoints that has no input data
+      LOG.info("No inflight instant when flushing data, cancel.");
+      return;
+    }
+
+    List<HoodieRecord> records = bucket.records;
+    ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records");
+    if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
+      records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
+    }
+    final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, currentInstant));
+    final BatchWriteSuccessEvent event = BatchWriteSuccessEvent.builder()
+        .taskID(taskID)
+        .instantTime(currentInstant)
+        .writeStatus(writeStatus)
+        .isLastBatch(false)
+        .isEndInput(false)
+        .build();
+    this.eventGateway.sendEventToCoordinator(event);
+  }
+
+  @SuppressWarnings("unchecked, rawtypes")
+  private void flushRemaining(boolean isEndInput) {
     this.currentInstant = this.writeClient.getInflightAndRequestedInstant(this.config.get(FlinkOptions.TABLE_TYPE));
     if (this.currentInstant == null) {
      // in case there are empty checkpoints that has no input data
@@ -347,12 +394,13 @@ public class StreamWriteFunction<K, I, O>
       return;
     }
     final List<WriteStatus> writeStatus;
-    if (buffer.size() > 0) {
+    if (buckets.size() > 0) {
       writeStatus = new ArrayList<>();
-      this.buffer.values()
+      this.buckets.values()
           // The records are partitioned by the bucket ID and each batch sent to
          // the writer belongs to one bucket.
-          .forEach(records -> {
+          .forEach(bucket -> {
+            List<HoodieRecord> records = bucket.records;
             if (records.size() > 0) {
               if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
                 records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
@@ -368,11 +416,11 @@ public class StreamWriteFunction<K, I, O>
           .taskID(taskID)
           .instantTime(currentInstant)
           .writeStatus(writeStatus)
-          .isLastBatch(isLastBatch)
+          .isLastBatch(true)
           .isEndInput(isEndInput)
           .build();
       this.eventGateway.sendEventToCoordinator(event);
-      this.buffer.clear();
+      this.buckets.clear();
       this.currentInstant = "";
     }
   }
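
The hunks above lean on BufferSizeDetector, whose implementation is not part of this diff; only its doc comment ("Sampling the record to compute the size with 0.01 percentage") and its usage are visible. A minimal sketch of how such a sampling detector could work, under that stated assumption (the class below and its details are illustrative, not the hudi-flink code):

import java.util.Random;
import java.util.function.ToLongFunction;

// Assumption-based sketch of a sampling size detector in the spirit of
// BufferSizeDetector: measuring every record (e.g. via a size calculator
// such as ObjectSizeCalculator) is expensive, so only ~1% of records are
// measured and the last sampled size stands in for the rest.
class SamplingSizeDetector<T> {
  private static final double SAMPLE_RATE = 0.01;
  private final long batchSizeBytes;
  private final ToLongFunction<T> sizeOf;
  private final Random random = new Random();
  private long lastSampledBytes;
  private long totalBytes;

  SamplingSizeDetector(double batchSizeMb, ToLongFunction<T> sizeOf) {
    this.batchSizeBytes = (long) (batchSizeMb * 1024 * 1024);
    this.sizeOf = sizeOf;
  }

  /** Returns true when the estimated buffered size exceeds the batch size. */
  boolean detect(T record) {
    if (lastSampledBytes == 0L || random.nextDouble() < SAMPLE_RATE) {
      lastSampledBytes = sizeOf.applyAsLong(record);
    }
    totalBytes += lastSampledBytes;
    return totalBytes > batchSizeBytes;
  }

  /** Called after a flush so the next mini-batch starts from zero. */
  void reset() {
    this.totalBytes = 0L;
  }
}

Each DataBucket owns one such detector, which is why bucket.reset() clears both the buffered records and the detector's running total.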

TestWriteCopyOnWrite.java

@@ -399,6 +399,7 @@ public class TestWriteCopyOnWrite {
     // this triggers the data write and event send
     funcWrapper.checkpointFunction(1);

+    dataBuffer = funcWrapper.getDataBuffer();
     assertThat("All data should be flushed out", dataBuffer.size(), is(0));

     final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first
@@ -430,10 +431,8 @@ public class TestWriteCopyOnWrite {
     final OperatorEvent event3 = funcWrapper.getNextEvent(); // remove the first event first
     final OperatorEvent event4 = funcWrapper.getNextEvent();
-    final OperatorEvent event5 = funcWrapper.getNextEvent();
     funcWrapper.getCoordinator().handleEventFromOperator(0, event3);
     funcWrapper.getCoordinator().handleEventFromOperator(0, event4);
-    funcWrapper.getCoordinator().handleEventFromOperator(0, event5);

     funcWrapper.checkpointComplete(2);

     // Same the original base file content.

StreamWriteFunctionWrapper.java

@@ -137,7 +137,7 @@ public class StreamWriteFunctionWrapper<I> {
   }

   public Map<String, List<HoodieRecord>> getDataBuffer() {
-    return this.writeFunction.getBuffer();
+    return this.writeFunction.getDataBuffer();
   }

   @SuppressWarnings("rawtypes")

ContinuousFileSource.java

@@ -33,11 +33,11 @@ import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;

-import java.io.BufferedReader;
-import java.io.FileReader;
+import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;

@@ -151,15 +151,9 @@ public class ContinuousFileSource implements StreamTableSource<RowData> {
   }

   private void loadDataBuffer() {
-    this.dataBuffer = new ArrayList<>();
-    try (BufferedReader reader =
-             new BufferedReader(new FileReader(this.path.toString()))) {
-      String line = reader.readLine();
-      while (line != null) {
-        this.dataBuffer.add(line);
-        // read next line
-        line = reader.readLine();
-      }
+    try {
+      new File(this.path.toString()).exists();
+      this.dataBuffer = Files.readAllLines(Paths.get(this.path.toUri()));
     } catch (IOException e) {
       throw new RuntimeException("Read file " + this.path + " error", e);
     }