
[HUDI-1895] Close the file handles gracefully for flink write function to avoid corrupted files (#2938)

Danny Chan
2021-05-12 18:44:10 +08:00
committed by GitHub
parent 5a8b2a4f86
commit b98c9ab439
10 changed files with 103 additions and 34 deletions

View File

@@ -380,9 +380,19 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
   * would close the underneath file handles.
   */
  public void cleanHandles() {
-    this.bucketToHandles.values().forEach(handle -> {
-      ((MiniBatchHandle) handle).finishWrite();
-    });
+    this.bucketToHandles.values()
+        .forEach(handle -> ((MiniBatchHandle) handle).finishWrite());
+    this.bucketToHandles.clear();
+  }
+
+  /**
+   * Clean the write handles within a checkpoint interval, this operation
+   * would close the underneath file handles, if any error happens, clean the
+   * corrupted data file.
+   */
+  public void cleanHandlesGracefully() {
+    this.bucketToHandles.values()
+        .forEach(handle -> ((MiniBatchHandle) handle).closeGracefully());
     this.bucketToHandles.clear();
   }
@@ -438,12 +448,6 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
     return table;
   }
 
-  public List<String> getInflightsAndRequestedInstants(String commitType) {
-    HoodieTimeline unCompletedTimeline = getHoodieTable().getMetaClient().getCommitsTimeline().filterInflightsAndRequested();
-    return unCompletedTimeline.getInstants().filter(x -> x.getAction().equals(commitType)).map(HoodieInstant::getTimestamp)
-        .collect(Collectors.toList());
-  }
-
   public String getLastPendingInstant(HoodieTableType tableType) {
     final String actionType = CommitUtils.getCommitActionType(tableType);
     return getLastPendingInstant(actionType);

View File

@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.MarkerFiles;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,12 +47,12 @@ import java.util.List;
  * @param <K> Key type
  * @param <O> Output type
  */
-public class FlinkAppendHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieAppendHandle<T, I, K, O> implements MiniBatchHandle {
+public class FlinkAppendHandle<T extends HoodieRecordPayload, I, K, O>
+    extends HoodieAppendHandle<T, I, K, O> implements MiniBatchHandle {
 
   private static final Logger LOG = LoggerFactory.getLogger(FlinkAppendHandle.class);
 
-  private boolean needBootStrap = true;
-  // Total number of bytes written to file
-  private long sizeInBytes = 0;
+  private boolean shouldRollover = false;
 
   public FlinkAppendHandle(
       HoodieWriteConfig config,
@@ -64,6 +65,17 @@ public class FlinkAppendHandle<T extends HoodieRecordPayload, I, K, O> extends H
     super(config, instantTime, hoodieTable, partitionPath, fileId, recordItr, taskContextSupplier);
   }
 
+  @Override
+  protected void createMarkerFile(String partitionPath, String dataFileName) {
+    // In some rare cases, the task was pulled up again with same write file name,
+    // for e.g, reuse the small log files from last commit instant.
+    // Just skip the marker file creation if it already exists, the new data would append to
+    // the file directly.
+    MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime);
+    markerFiles.createIfNotExists(partitionPath, dataFileName, getIOType());
+  }
+
   @Override
   protected boolean needsUpdateLocation() {
     return false;
@@ -80,12 +92,8 @@ public class FlinkAppendHandle<T extends HoodieRecordPayload, I, K, O> extends H
     return true;
   }
 
-  /**
-   * Returns whether there is need to bootstrap this file handle.
-   * E.G. the first time that the handle is created.
-   */
-  public boolean isNeedBootStrap() {
-    return this.needBootStrap;
+  public boolean shouldRollover() {
+    return this.shouldRollover;
   }
 
   /**
@@ -98,7 +106,7 @@ public class FlinkAppendHandle<T extends HoodieRecordPayload, I, K, O> extends H
   @Override
   public List<WriteStatus> close() {
-    needBootStrap = false;
+    shouldRollover = true;
     // flush any remaining records to disk
     appendDataAndDeleteBlocks(header);
     // need to fix that the incremental write size in bytes may be lost
@@ -118,4 +126,15 @@ public class FlinkAppendHandle<T extends HoodieRecordPayload, I, K, O> extends H
       throw new HoodieUpsertException("Failed to close append handle", e);
     }
   }
+
+  @Override
+  public void closeGracefully() {
+    try {
+      finishWrite();
+    } catch (Throwable throwable) {
+      // The intermediate log file can still append based on the incremental MERGE semantics,
+      // there is no need to delete the file.
+      LOG.warn("Error while trying to dispose the APPEND handle", throwable);
+    }
+  }
 }

View File

@@ -185,6 +185,22 @@ public class FlinkCreateHandle<T extends HoodieRecordPayload, I, K, O>
     }
   }
 
+  @Override
+  public void closeGracefully() {
+    try {
+      finishWrite();
+    } catch (Throwable throwable) {
+      LOG.warn("Error while trying to dispose the CREATE handle", throwable);
+      try {
+        fs.delete(path, false);
+        LOG.info("Deleting the intermediate CREATE data file: " + path + " success!");
+      } catch (IOException e) {
+        // logging a warning and ignore the exception.
+        LOG.warn("Deleting the intermediate CREATE data file: " + path + " failed", e);
+      }
+    }
+  }
+
   /**
    * Performs actions to durably, persist the current changes and returns a WriteStatus object.
    */

View File

@@ -238,4 +238,20 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
       throw new HoodieIOException("Error when rename the temporary roll file: " + lastPath + " to: " + desiredPath, e);
     }
   }
+
+  @Override
+  public void closeGracefully() {
+    try {
+      finishWrite();
+    } catch (Throwable throwable) {
+      LOG.warn("Error while trying to dispose the MERGE handle", throwable);
+      try {
+        fs.delete(newFilePath, false);
+        LOG.info("Deleting the intermediate MERGE data file: " + newFilePath + " success!");
+      } catch (IOException e) {
+        // logging a warning and ignore the exception.
+        LOG.warn("Deleting the intermediate MERGE data file: " + newFilePath + " failed", e);
+      }
+    }
+  }
 }

View File

@@ -22,9 +22,24 @@ package org.apache.hudi.io;
  * Hoodie write handle that supports write as mini-batch.
  */
 public interface MiniBatchHandle {
+
+  /**
+   * Returns whether the handle should roll over to new,
+   * E.G. the handle has written some intermediate data buffer already.
+   */
+  default boolean shouldRollover() {
+    return false;
+  }
+
   /**
    * Finish the write of multiple mini-batches. Usually these mini-bathes
-   * come from a checkpoint interval.
+   * come from one checkpoint interval.
    */
   void finishWrite();
+
+  /**
+   * Close the file handle gracefully, if any error happens during the file handle close,
+   * clean the file to not left corrupted file.
+   */
+  void closeGracefully();
 }
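
The two hooks above define the handle lifecycle this commit relies on: finishWrite() finalizes a handle on the normal checkpoint path, while closeGracefully() disposes of it on teardown and cleans up any corrupted file. Below is a minimal sketch of a caller driving that contract, mirroring cleanHandles()/cleanHandlesGracefully() in HoodieFlinkWriteClient above; the MiniBatchWriterSketch class and its method names are hypothetical, for illustration only.

import org.apache.hudi.io.MiniBatchHandle;

import java.util.HashMap;
import java.util.Map;

// Hypothetical driver that keeps one MiniBatchHandle per file bucket.
public class MiniBatchWriterSketch {
  private final Map<String, MiniBatchHandle> bucketToHandles = new HashMap<>();

  // Normal path: the checkpoint data buffer was flushed, finalize every handle.
  public void onCheckpointComplete() {
    bucketToHandles.values().forEach(MiniBatchHandle::finishWrite);
    bucketToHandles.clear();
  }

  // Teardown/failure path: close each handle gracefully so a half-written
  // file is removed instead of being left corrupted.
  public void onClose() {
    bucketToHandles.values().forEach(MiniBatchHandle::closeGracefully);
    bucketToHandles.clear();
  }
}

Per the handle implementations above, closeGracefully() deletes the partial base file for the CREATE and MERGE handles, but keeps the log file for the APPEND handle, since later log appends can still be merged incrementally.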

View File

@@ -49,7 +49,7 @@ public abstract class BaseFlinkDeltaCommitActionExecutor<T extends HoodieRecordP
   @Override
   public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
     FlinkAppendHandle appendHandle = (FlinkAppendHandle) writeHandle;
-    if (!appendHandle.isNeedBootStrap()) {
+    if (appendHandle.shouldRollover()) {
       appendHandle.appendNewRecords(recordItr);
     }
     appendHandle.doAppend();

View File

@@ -200,7 +200,7 @@ public class StreamWriteFunction<K, I, O>
   @Override
   public void close() {
     if (this.writeClient != null) {
-      this.writeClient.cleanHandles();
+      this.writeClient.cleanHandlesGracefully();
       this.writeClient.close();
     }
   }

View File

@@ -249,7 +249,7 @@ public class StreamWriteOperatorCoordinator
"The coordinator can only handle BatchWriteSuccessEvent"); "The coordinator can only handle BatchWriteSuccessEvent");
BatchWriteSuccessEvent event = (BatchWriteSuccessEvent) operatorEvent; BatchWriteSuccessEvent event = (BatchWriteSuccessEvent) operatorEvent;
// the write task does not block after checkpointing(and before it receives a checkpoint success event), // the write task does not block after checkpointing(and before it receives a checkpoint success event),
// if it it checkpoints succeed then flushes the data buffer again before this coordinator receives a checkpoint // if it checkpoints succeed then flushes the data buffer again before this coordinator receives a checkpoint
// success event, the data buffer would flush with an older instant time. // success event, the data buffer would flush with an older instant time.
ValidationUtils.checkState( ValidationUtils.checkState(
HoodieTimeline.compareTimestamps(instant, HoodieTimeline.GREATER_THAN_OR_EQUALS, event.getInstantTime()), HoodieTimeline.compareTimestamps(instant, HoodieTimeline.GREATER_THAN_OR_EQUALS, event.getInstantTime()),

View File

@@ -102,7 +102,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
   // Utilities
   // -------------------------------------------------------------------------
 
-  /** Validate required options. e.g record key and pre combine key.
+  /** Validate required options. For e.g, record key and pre_combine key.
    *
    * @param conf The table options
    * @param schema The table schema
@@ -115,17 +115,17 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
       Arrays.stream(conf.get(FlinkOptions.RECORD_KEY_FIELD).split(","))
           .filter(field -> !fields.contains(field))
           .findAny()
-          .ifPresent(e -> {
-            throw new ValidationException("The " + e + " field not exists in table schema."
-                + "Please define primary key or modify hoodie.datasource.write.recordkey.field option.");
+          .ifPresent(f -> {
+            throw new ValidationException("Field '" + f + "' does not exist in the table schema."
+                + "Please define primary key or modify 'hoodie.datasource.write.recordkey.field' option.");
           });
     }
 
-    // validate pre combine key
+    // validate pre_combine key
     String preCombineField = conf.get(FlinkOptions.PRECOMBINE_FIELD);
     if (!fields.contains(preCombineField)) {
-      throw new ValidationException("The " + preCombineField + " field not exists in table schema."
-          + "Please check write.precombine.field option.");
+      throw new ValidationException("Field " + preCombineField + " does not exist in the table schema."
+          + "Please check 'write.precombine.field' option.");
     }
   }

View File

@@ -392,9 +392,8 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     assertRowsEquals(result, "[id1,Sophia,18,1970-01-01T00:00:05,par5]");
   }
 
-  @ParameterizedTest
-  @EnumSource(value = ExecMode.class)
-  void testStreamReadEmptyTablePath(ExecMode execMode) throws Exception {
+  @Test
+  void testStreamReadEmptyTablePath() throws Exception {
     // create an empty table
     Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     StreamerUtil.initTableIfNotExists(conf);