[HUDI-1886] Avoid to generates corrupted files for flink sink (#2929)
This commit is contained in:
@@ -55,6 +55,7 @@ public class FlinkCreateHandle<T extends HoodieRecordPayload, I, K, O>
|
|||||||
|
|
||||||
private static final Logger LOG = LogManager.getLogger(FlinkCreateHandle.class);
|
private static final Logger LOG = LogManager.getLogger(FlinkCreateHandle.class);
|
||||||
private long lastFileSize = 0L;
|
private long lastFileSize = 0L;
|
||||||
|
private long totalRecordsWritten = 0L;
|
||||||
|
|
||||||
public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
|
public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
|
||||||
String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
|
String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
|
||||||
@@ -143,6 +144,7 @@ public class FlinkCreateHandle<T extends HoodieRecordPayload, I, K, O>
|
|||||||
try {
|
try {
|
||||||
setupWriteStatus();
|
setupWriteStatus();
|
||||||
// reset the write status
|
// reset the write status
|
||||||
|
totalRecordsWritten += recordsWritten;
|
||||||
recordsWritten = 0;
|
recordsWritten = 0;
|
||||||
recordsDeleted = 0;
|
recordsDeleted = 0;
|
||||||
insertRecordsWritten = 0;
|
insertRecordsWritten = 0;
|
||||||
@@ -169,7 +171,7 @@ public class FlinkCreateHandle<T extends HoodieRecordPayload, I, K, O>
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void finishWrite() {
|
public void finishWrite() {
|
||||||
LOG.info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + recordsWritten);
|
LOG.info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + totalRecordsWritten);
|
||||||
try {
|
try {
|
||||||
fileWriter.close();
|
fileWriter.close();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
|||||||
@@ -142,6 +142,22 @@ public class StreamWriteFunction<K, I, O>
|
|||||||
*/
|
*/
|
||||||
private transient TotalSizeTracer tracer;
|
private transient TotalSizeTracer tracer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Flag saying whether the write task is waiting for the checkpoint success notification
|
||||||
|
* after it finished a checkpoint.
|
||||||
|
*
|
||||||
|
* <p>The flag is needed because the write task does not block during the waiting time interval, so
|
||||||
|
* some data buckets still flush out with the old instant time. There are two cases in which the flush may produce
|
||||||
|
* corrupted files if the old instant is committed successfully:
|
||||||
|
* 1) the write handle was writing data but interrupted, left a corrupted parquet file;
|
||||||
|
* 2) the write handle finished the write but was not closed, left an empty parquet file.
|
||||||
|
*
|
||||||
|
* <p>To solve this, while this flag is set to true, we flush the data buffer with a new instant time = old instant time + 1ms,
|
||||||
|
* the new instant time would affect the write file name. The filesystem view does not recognize the file as committed because
|
||||||
|
* it always filters the data files based on successful commit time.
|
||||||
|
*/
|
||||||
|
private volatile boolean confirming = false;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs a StreamingSinkFunction.
|
* Constructs a StreamingSinkFunction.
|
||||||
*
|
*
|
||||||
@@ -191,7 +207,7 @@ public class StreamWriteFunction<K, I, O>
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void notifyCheckpointComplete(long checkpointId) {
|
public void notifyCheckpointComplete(long checkpointId) {
|
||||||
this.writeClient.cleanHandles();
|
this.confirming = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -392,21 +408,27 @@ public class StreamWriteFunction<K, I, O>
|
|||||||
|
|
||||||
@SuppressWarnings("unchecked, rawtypes")
|
@SuppressWarnings("unchecked, rawtypes")
|
||||||
private void flushBucket(DataBucket bucket) {
|
private void flushBucket(DataBucket bucket) {
|
||||||
this.currentInstant = this.writeClient.getLastPendingInstant(this.actionType);
|
final String instant = this.writeClient.getLastPendingInstant(this.actionType);
|
||||||
if (this.currentInstant == null) {
|
|
||||||
|
if (instant == null) {
|
||||||
// in case there are empty checkpoints that has no input data
|
// in case there are empty checkpoints that has no input data
|
||||||
LOG.info("No inflight instant when flushing data, cancel.");
|
LOG.info("No inflight instant when flushing data, cancel.");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// if we are waiting for the checkpoint notification, shift the write instant time.
|
||||||
|
boolean shift = confirming && StreamerUtil.equal(instant, this.currentInstant);
|
||||||
|
final String flushInstant = shift ? StreamerUtil.instantTimePlus(instant, 1) : instant;
|
||||||
|
|
||||||
List<HoodieRecord> records = bucket.records;
|
List<HoodieRecord> records = bucket.records;
|
||||||
ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records");
|
ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records");
|
||||||
if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
|
if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
|
||||||
records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
|
records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
|
||||||
}
|
}
|
||||||
final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, currentInstant));
|
final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, flushInstant));
|
||||||
final BatchWriteSuccessEvent event = BatchWriteSuccessEvent.builder()
|
final BatchWriteSuccessEvent event = BatchWriteSuccessEvent.builder()
|
||||||
.taskID(taskID)
|
.taskID(taskID)
|
||||||
.instantTime(currentInstant)
|
.instantTime(instant) // the write instant may shift but the event still use the currentInstant.
|
||||||
.writeStatus(writeStatus)
|
.writeStatus(writeStatus)
|
||||||
.isLastBatch(false)
|
.isLastBatch(false)
|
||||||
.isEndInput(false)
|
.isEndInput(false)
|
||||||
@@ -451,6 +473,7 @@ public class StreamWriteFunction<K, I, O>
|
|||||||
this.eventGateway.sendEventToCoordinator(event);
|
this.eventGateway.sendEventToCoordinator(event);
|
||||||
this.buckets.clear();
|
this.buckets.clear();
|
||||||
this.tracer.reset();
|
this.tracer.reset();
|
||||||
this.currentInstant = "";
|
this.writeClient.cleanHandles();
|
||||||
|
this.confirming = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -58,6 +58,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
|
|||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
|
|
||||||
import java.io.BufferedReader;
|
import java.io.BufferedReader;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.StringReader;
|
import java.io.StringReader;
|
||||||
@@ -324,4 +326,19 @@ public class StreamerUtil {
|
|||||||
|
|
||||||
return new HoodieFlinkWriteClient<>(context, getHoodieClientConfig(conf));
|
return new HoodieFlinkWriteClient<>(context, getHoodieClientConfig(conf));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds the given number of milliseconds to the old instant time and returns the result.
|
||||||
|
*/
|
||||||
|
public static String instantTimePlus(String oldInstant, long milliseconds) {
|
||||||
|
long oldTime = Long.parseLong(oldInstant);
|
||||||
|
return String.valueOf(oldTime + milliseconds);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Copied from Objects#equal.
|
||||||
|
*/
|
||||||
|
public static boolean equal(@Nullable Object a, @Nullable Object b) {
|
||||||
|
return a == b || (a != null && a.equals(b));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -163,7 +163,6 @@ public class StreamWriteFunctionWrapper<I> {
|
|||||||
functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId);
|
functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId);
|
||||||
coordinator.notifyCheckpointComplete(checkpointId);
|
coordinator.notifyCheckpointComplete(checkpointId);
|
||||||
this.bucketAssignerFunction.notifyCheckpointComplete(checkpointId);
|
this.bucketAssignerFunction.notifyCheckpointComplete(checkpointId);
|
||||||
this.writeFunction.notifyCheckpointComplete(checkpointId);
|
|
||||||
if (conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)) {
|
if (conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)) {
|
||||||
try {
|
try {
|
||||||
compactFunctionWrapper.compact(checkpointId);
|
compactFunctionWrapper.compact(checkpointId);
|
||||||
|
|||||||
Reference in New Issue
Block a user