diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index a84e116a6..6cbbeecb2 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -380,9 +380,19 @@ public class HoodieFlinkWriteClient extends * would close the underneath file handles. */ public void cleanHandles() { - this.bucketToHandles.values().forEach(handle -> { - ((MiniBatchHandle) handle).finishWrite(); - }); + this.bucketToHandles.values() + .forEach(handle -> ((MiniBatchHandle) handle).finishWrite()); + this.bucketToHandles.clear(); + } + + /** + * Clean the write handles within a checkpoint interval, this operation + * would close the underneath file handles, if any error happens, clean the + * corrupted data file. + */ + public void cleanHandlesGracefully() { + this.bucketToHandles.values() + .forEach(handle -> ((MiniBatchHandle) handle).closeGracefully()); this.bucketToHandles.clear(); } @@ -438,12 +448,6 @@ public class HoodieFlinkWriteClient extends return table; } - public List getInflightsAndRequestedInstants(String commitType) { - HoodieTimeline unCompletedTimeline = getHoodieTable().getMetaClient().getCommitsTimeline().filterInflightsAndRequested(); - return unCompletedTimeline.getInstants().filter(x -> x.getAction().equals(commitType)).map(HoodieInstant::getTimestamp) - .collect(Collectors.toList()); - } - public String getLastPendingInstant(HoodieTableType tableType) { final String actionType = CommitUtils.getCommitActionType(tableType); return getLastPendingInstant(actionType); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java index 
bc6eba8eb..c859d261c 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.MarkerFiles; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,12 +47,12 @@ import java.util.List; * @param Key type * @param Output type */ -public class FlinkAppendHandle extends HoodieAppendHandle implements MiniBatchHandle { +public class FlinkAppendHandle + extends HoodieAppendHandle implements MiniBatchHandle { private static final Logger LOG = LoggerFactory.getLogger(FlinkAppendHandle.class); - private boolean needBootStrap = true; - // Total number of bytes written to file - private long sizeInBytes = 0; + + private boolean shouldRollover = false; public FlinkAppendHandle( HoodieWriteConfig config, @@ -64,6 +65,17 @@ public class FlinkAppendHandle extends H super(config, instantTime, hoodieTable, partitionPath, fileId, recordItr, taskContextSupplier); } + @Override + protected void createMarkerFile(String partitionPath, String dataFileName) { + // In some rare cases, the task was pulled up again with same write file name, + // for e.g, reuse the small log files from last commit instant. + + // Just skip the marker file creation if it already exists, the new data would append to + // the file directly. + MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime); + markerFiles.createIfNotExists(partitionPath, dataFileName, getIOType()); + } + @Override protected boolean needsUpdateLocation() { return false; @@ -80,12 +92,8 @@ public class FlinkAppendHandle extends H return true; } - /** - * Returns whether there is need to bootstrap this file handle. - * E.G. 
the first time that the handle is created. - */ - public boolean isNeedBootStrap() { - return this.needBootStrap; + public boolean shouldRollover() { + return this.shouldRollover; } /** @@ -98,7 +106,7 @@ public class FlinkAppendHandle extends H @Override public List close() { - needBootStrap = false; + shouldRollover = true; // flush any remaining records to disk appendDataAndDeleteBlocks(header); // need to fix that the incremental write size in bytes may be lost @@ -118,4 +126,15 @@ public class FlinkAppendHandle extends H throw new HoodieUpsertException("Failed to close append handle", e); } } + + @Override + public void closeGracefully() { + try { + finishWrite(); + } catch (Throwable throwable) { + // The intermediate log file can still append based on the incremental MERGE semantics, + // there is no need to delete the file. + LOG.warn("Error while trying to dispose the APPEND handle", throwable); + } + } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java index fc0f8efe0..fe5bf99b7 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java @@ -185,6 +185,22 @@ public class FlinkCreateHandle } } + @Override + public void closeGracefully() { + try { + finishWrite(); + } catch (Throwable throwable) { + LOG.warn("Error while trying to dispose the CREATE handle", throwable); + try { + fs.delete(path, false); + LOG.info("Deleting the intermediate CREATE data file: " + path + " success!"); + } catch (IOException e) { + // logging a warning and ignore the exception. + LOG.warn("Deleting the intermediate CREATE data file: " + path + " failed", e); + } + } + } + /** * Performs actions to durably, persist the current changes and returns a WriteStatus object. 
*/ diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java index 023ee5f74..518ea6928 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java @@ -238,4 +238,20 @@ public class FlinkMergeHandle throw new HoodieIOException("Error when rename the temporary roll file: " + lastPath + " to: " + desiredPath, e); } } + + @Override + public void closeGracefully() { + try { + finishWrite(); + } catch (Throwable throwable) { + LOG.warn("Error while trying to dispose the MERGE handle", throwable); + try { + fs.delete(newFilePath, false); + LOG.info("Deleting the intermediate MERGE data file: " + newFilePath + " success!"); + } catch (IOException e) { + // logging a warning and ignore the exception. + LOG.warn("Deleting the intermediate MERGE data file: " + newFilePath + " failed", e); + } + } + } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java index 2cae807a9..30ac31724 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java @@ -22,9 +22,24 @@ package org.apache.hudi.io; * Hoodie write handle that supports write as mini-batch. */ public interface MiniBatchHandle { + + /** + * Returns whether the handle should roll over to new, + * E.G. the handle has written some intermediate data buffer already. + */ + default boolean shouldRollover() { + return false; + } + /** * Finish the write of multiple mini-batches. Usually these mini-bathes - * come from a checkpoint interval. + * come from one checkpoint interval. 
*/ void finishWrite(); + + /** + * Close the file handle gracefully. If any error happens during the close, + * clean up the file so that no corrupted data file is left behind. + */ + void closeGracefully(); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/BaseFlinkDeltaCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/BaseFlinkDeltaCommitActionExecutor.java index 8d0857eb7..674a8fd21 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/BaseFlinkDeltaCommitActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/BaseFlinkDeltaCommitActionExecutor.java @@ -49,7 +49,7 @@ public abstract class BaseFlinkDeltaCommitActionExecutor> handleUpdate(String partitionPath, String fileId, Iterator> recordItr) { FlinkAppendHandle appendHandle = (FlinkAppendHandle) writeHandle; - if (!appendHandle.isNeedBootStrap()) { + if (appendHandle.shouldRollover()) { appendHandle.appendNewRecords(recordItr); } appendHandle.doAppend(); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index 5a04a35da..d9cb38912 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -200,7 +200,7 @@ public class StreamWriteFunction @Override public void close() { if (this.writeClient != null) { - this.writeClient.cleanHandles(); + this.writeClient.cleanHandlesGracefully(); this.writeClient.close(); } } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 6244d6510..ddbb3ddf1 100644 --- 
a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -249,7 +249,7 @@ public class StreamWriteOperatorCoordinator "The coordinator can only handle BatchWriteSuccessEvent"); BatchWriteSuccessEvent event = (BatchWriteSuccessEvent) operatorEvent; // the write task does not block after checkpointing(and before it receives a checkpoint success event), - // if it it checkpoints succeed then flushes the data buffer again before this coordinator receives a checkpoint + // if it checkpoints succeed then flushes the data buffer again before this coordinator receives a checkpoint // success event, the data buffer would flush with an older instant time. ValidationUtils.checkState( HoodieTimeline.compareTimestamps(instant, HoodieTimeline.GREATER_THAN_OR_EQUALS, event.getInstantTime()), diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index 02ab280a3..808e9c7e7 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -102,7 +102,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab // Utilities // ------------------------------------------------------------------------- - /** Validate required options. e.g record key and pre combine key. + /** Validate required options. For e.g, record key and pre_combine key. * * @param conf The table options * @param schema The table schema @@ -115,17 +115,17 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab Arrays.stream(conf.get(FlinkOptions.RECORD_KEY_FIELD).split(",")) .filter(field -> !fields.contains(field)) .findAny() - .ifPresent(e -> { - throw new ValidationException("The " + e + " field not exists in table schema." 
- + "Please define primary key or modify hoodie.datasource.write.recordkey.field option."); + .ifPresent(f -> { + throw new ValidationException("Field '" + f + "' does not exist in the table schema." + + "Please define primary key or modify 'hoodie.datasource.write.recordkey.field' option."); }); } - // validate pre combine key + // validate pre_combine key String preCombineField = conf.get(FlinkOptions.PRECOMBINE_FIELD); if (!fields.contains(preCombineField)) { - throw new ValidationException("The " + preCombineField + " field not exists in table schema." - + "Please check write.precombine.field option."); + throw new ValidationException("Field " + preCombineField + " does not exist in the table schema." + + "Please check 'write.precombine.field' option."); } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java index 69627f23a..41c587c7a 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java @@ -392,9 +392,8 @@ public class HoodieDataSourceITCase extends AbstractTestBase { assertRowsEquals(result, "[id1,Sophia,18,1970-01-01T00:00:05,par5]"); } - @ParameterizedTest - @EnumSource(value = ExecMode.class) - void testStreamReadEmptyTablePath(ExecMode execMode) throws Exception { + @Test + void testStreamReadEmptyTablePath() throws Exception { // create an empty table Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); StreamerUtil.initTableIfNotExists(conf);