diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index d0778f75d..9800749ee 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -106,9 +106,16 @@ public class HoodieMergeHandle extends H public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, Iterator> recordItr, String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) { + this(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier, + hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).get()); + } + + public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, + Iterator> recordItr, String partitionPath, String fileId, + TaskContextSupplier taskContextSupplier, HoodieBaseFile baseFile) { super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier); init(fileId, recordItr); - init(fileId, partitionPath, hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).get()); + init(fileId, partitionPath, baseFile); } /** @@ -173,6 +180,10 @@ public class HoodieMergeHandle extends H } } + protected void setWriteStatusPath() { + writeStatus.getStat().setPath(new Path(config.getBasePath()), newFilePath); + } + protected void makeOldAndNewFilePaths(String partitionPath, String oldFileName, String newFileName) { oldFilePath = makeNewFilePath(partitionPath, oldFileName); newFilePath = makeNewFilePath(partitionPath, newFileName); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 6cbbeecb2..9ba0961a8 
100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -47,6 +47,7 @@ import org.apache.hudi.index.FlinkHoodieIndex; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.io.FlinkAppendHandle; import org.apache.hudi.io.FlinkCreateHandle; +import org.apache.hudi.io.FlinkMergeAndReplaceHandle; import org.apache.hudi.io.FlinkMergeHandle; import org.apache.hudi.io.HoodieWriteHandle; import org.apache.hudi.io.MiniBatchHandle; @@ -376,12 +377,10 @@ public class HoodieFlinkWriteClient extends } /** - * Clean the write handles within a checkpoint interval, this operation - * would close the underneath file handles. + * Clean the write handles within a checkpoint interval. + * All the handles should have been closed already. */ public void cleanHandles() { - this.bucketToHandles.values() - .forEach(handle -> ((MiniBatchHandle) handle).finishWrite()); this.bucketToHandles.clear(); } @@ -414,10 +413,18 @@ public class HoodieFlinkWriteClient extends Iterator> recordItr) { final HoodieRecordLocation loc = record.getCurrentLocation(); final String fileID = loc.getFileId(); - if (bucketToHandles.containsKey(fileID)) { - return bucketToHandles.get(fileID); - } final String partitionPath = record.getPartitionPath(); + if (bucketToHandles.containsKey(fileID)) { + MiniBatchHandle lastHandle = (MiniBatchHandle) bucketToHandles.get(fileID); + if (lastHandle.shouldReplace()) { + HoodieWriteHandle writeHandle = new FlinkMergeAndReplaceHandle<>( + config, instantTime, table, recordItr, partitionPath, fileID, table.getTaskContextSupplier(), + lastHandle.getWritePath()); + this.bucketToHandles.put(fileID, writeHandle); // override with new replace handle + return writeHandle; + } + } + final boolean isDelta = table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ); final HoodieWriteHandle writeHandle; if 
(isDelta) { diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java index c859d261c..987f3350d 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java @@ -23,36 +23,32 @@ import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.MarkerFiles; +import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.ArrayList; import java.util.Iterator; import java.util.List; /** - * A {@link HoodieAppendHandle} that supports append write incrementally(mini-batches). + * A {@link HoodieAppendHandle} that supports APPEND write incrementally(mini-batches). * - *

For the first mini-batch, it initialize and set up the next file path to write, - * but does not close the file writer until all the mini-batches write finish. Each mini-batch - * data are appended to this handle, the back-up writer may rollover on condition. + *

For the first mini-batch, it initializes and sets up the next file path to write, + * then closes the file writer. The subsequent mini-batches are appended to the same file + * through a different append handle with the same write file name. * - *

The back-up writer may rollover on condition(for e.g, the filesystem does not support append + * or the file size hits the configured threshold). */ public class FlinkAppendHandle extends HoodieAppendHandle implements MiniBatchHandle { private static final Logger LOG = LoggerFactory.getLogger(FlinkAppendHandle.class); - private boolean shouldRollover = false; + private boolean isClosed = false; public FlinkAppendHandle( HoodieWriteConfig config, @@ -92,49 +88,37 @@ public class FlinkAppendHandle return true; } - public boolean shouldRollover() { - return this.shouldRollover; - } - - /** - * Appends new records into this append handle. - * @param recordItr The new records iterator - */ - public void appendNewRecords(Iterator> recordItr) { - this.recordItr = recordItr; - } - @Override public List close() { - shouldRollover = true; - // flush any remaining records to disk - appendDataAndDeleteBlocks(header); - // need to fix that the incremental write size in bytes may be lost - List ret = new ArrayList<>(statuses); - statuses.clear(); - return ret; - } - - @Override - public void finishWrite() { - LOG.info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + recordsWritten); try { - if (writer != null) { - writer.close(); - } - } catch (IOException e) { - throw new HoodieUpsertException("Failed to close append handle", e); + return super.close(); + } finally { + this.isClosed = true; } } @Override public void closeGracefully() { + if (isClosed) { + return; + } try { - finishWrite(); + close(); } catch (Throwable throwable) { // The intermediate log file can still append based on the incremental MERGE semantics, // there is no need to delete the file. 
LOG.warn("Error while trying to dispose the APPEND handle", throwable); } } + + @Override + public Path getWritePath() { + return writer.getLogFile().getPath(); + } + + @Override + public boolean shouldReplace() { + // log files can append new data buffer directly + return false; + } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java index fe5bf99b7..2de6a5724 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java @@ -23,11 +23,9 @@ import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.exception.HoodieInsertException; import org.apache.hudi.table.HoodieTable; import org.apache.avro.Schema; @@ -36,27 +34,24 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; -import java.util.Collections; import java.util.List; /** - * A {@link HoodieCreateHandle} that supports create write incrementally(mini-batches). + * A {@link HoodieCreateHandle} that supports CREATE write incrementally(mini-batches). * - *

For the first mini-batch, it initialize and set up the next file path to write, - * but does not close the file writer until all the mini-batches write finish. Each mini-batch - * data are appended to the same file. + *

For the first mini-batch, it initializes and sets up the next file path to write, + * then closes the file writer. The subsequent mini-batches are appended to a file with new name, + * the new file would then rename to this file name, + * behaves like each mini-batch data are appended to the same file. * - * @param Payload type - * @param Input type - * @param Key type - * @param Output type + * @see FlinkMergeAndReplaceHandle */ public class FlinkCreateHandle extends HoodieCreateHandle implements MiniBatchHandle { private static final Logger LOG = LogManager.getLogger(FlinkCreateHandle.class); - private long lastFileSize = 0L; - private long totalRecordsWritten = 0L; + + private boolean isClosed = false; public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) { @@ -138,57 +133,22 @@ public class FlinkCreateHandle return makeNewFilePath(partitionPath, dataFileName); } - /** - * Get the incremental write status. In mini-batch write mode, - * this handle would be reused for a checkpoint bucket(the bucket is appended as mini-batches), - * thus, after a mini-batch append finish, we do not close the underneath writer but return - * the incremental WriteStatus instead. 
- * - * @return the incremental write status - */ - private WriteStatus getIncrementalWriteStatus() { + @Override + public List close() { try { - setupWriteStatus(); - // reset the write status - totalRecordsWritten += recordsWritten; - recordsWritten = 0; - recordsDeleted = 0; - insertRecordsWritten = 0; - timer = new HoodieTimer().startTimer(); - writeStatus.setTotalErrorRecords(0); - return writeStatus; - } catch (IOException e) { - throw new HoodieInsertException("Failed to close the Insert Handle for path " + path, e); - } - } - - @Override - protected long computeTotalWriteBytes() { - long fileSizeInBytes = computeFileSizeInBytes(); - long incFileSizeInBytes = fileSizeInBytes - lastFileSize; - this.lastFileSize = fileSizeInBytes; - return incFileSizeInBytes; - } - - @Override - protected long computeFileSizeInBytes() { - return fileWriter.getBytesWritten(); - } - - @Override - public void finishWrite() { - LOG.info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + totalRecordsWritten); - try { - fileWriter.close(); - } catch (IOException e) { - throw new HoodieInsertException("Failed to close the Insert Handle for path " + path, e); + return super.close(); + } finally { + this.isClosed = true; } } @Override public void closeGracefully() { + if (isClosed) { + return; + } try { - finishWrite(); + close(); } catch (Throwable throwable) { LOG.warn("Error while trying to dispose the CREATE handle", throwable); try { @@ -201,11 +161,8 @@ public class FlinkCreateHandle } } - /** - * Performs actions to durably, persist the current changes and returns a WriteStatus object. 
- */ @Override - public List close() { - return Collections.singletonList(getIncrementalWriteStatus()); + public Path getWritePath() { + return path; } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java new file mode 100644 index 000000000..44d630ac8 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.io; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.table.HoodieTable; + +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +/** + * A {@link HoodieMergeHandle} that supports MERGE write incrementally(small data buffers). + * + *

This handle is needed from the second mini-batch write onwards for a COW data bucket + * when the data bucket is written using multiple mini-batches. + * + *

For the incremental data buffer, it initializes and sets up the next file path to write, + * then closes the file and rename to the old file name, + * behaves like the new data buffer are appended to the old file. + */ +public class FlinkMergeAndReplaceHandle + extends HoodieMergeHandle + implements MiniBatchHandle { + + private static final Logger LOG = LogManager.getLogger(FlinkMergeAndReplaceHandle.class); + + private boolean isClosed = false; + + public FlinkMergeAndReplaceHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, + Iterator> recordItr, String partitionPath, String fileId, + TaskContextSupplier taskContextSupplier, Path basePath) { + super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier, + new HoodieBaseFile(basePath.toString())); + // delete invalid data files generated by task retry. + if (getAttemptId() > 0) { + deleteInvalidDataFile(getAttemptId() - 1); + } + } + + /** + * The flink checkpoints start in sequence and asynchronously, when one write task finish the checkpoint(A) + * (thus the fs view got the written data files some of which may be invalid), + * it goes on with the next round checkpoint(B) write immediately, + * if it tries to reuse the last small data bucket(small file) of an invalid data file, + * finally, when the coordinator receives the checkpoint success event of checkpoint(A), + * the invalid data file would be cleaned, + * and this merger got a FileNotFoundException when it close the write file handle. + * + *

To solve, deletes the invalid data file eagerly + * so that the invalid file small bucket would never be reused. + * + * @param lastAttemptId The last attempt ID + */ + private void deleteInvalidDataFile(long lastAttemptId) { + final String lastWriteToken = FSUtils.makeWriteToken(getPartitionId(), getStageId(), lastAttemptId); + final String lastDataFileName = FSUtils.makeDataFileName(instantTime, + lastWriteToken, this.fileId, hoodieTable.getBaseFileExtension()); + final Path path = makeNewFilePath(partitionPath, lastDataFileName); + try { + if (fs.exists(path)) { + LOG.info("Deleting invalid MERGE and REPLACE base file due to task retry: " + lastDataFileName); + fs.delete(path, false); + } + } catch (IOException e) { + throw new HoodieException("Error while deleting the MERGE and REPLACE base file due to task retry: " + lastDataFileName, e); + } + } + + @Override + protected void createMarkerFile(String partitionPath, String dataFileName) { + // no need to create any marker file for intermediate file. + } + + @Override + protected void makeOldAndNewFilePaths(String partitionPath, String oldFileName, String newFileName) { + // old and new file name expects to be the same. 
+ if (!oldFileName.equals(newFileName)) { + LOG.warn("MERGE and REPLACE handle expect the same name for old and new files,\n" + + "while got new file: " + newFileName + " with old file: " + oldFileName + ",\n" + + "this rarely happens when the checkpoint success event was not received yet\n" + + "but the write task flush with new instant time, which does not break the UPSERT semantics"); + } + super.makeOldAndNewFilePaths(partitionPath, oldFileName, newFileName); + try { + int rollNumber = 0; + while (fs.exists(newFilePath)) { + Path oldPath = newFilePath; + newFileName = newFileNameWithRollover(rollNumber++); + newFilePath = makeNewFilePath(partitionPath, newFileName); + LOG.warn("Duplicate write for MERGE and REPLACE handle with path: " + oldPath + ", rolls over to new path: " + newFilePath); + } + } catch (IOException e) { + throw new HoodieException("Checking existing path for merge and replace handle error: " + newFilePath, e); + } + } + + /** + * Use the writeToken + "-" + rollNumber as the new writeToken of a mini-batch write. + */ + protected String newFileNameWithRollover(int rollNumber) { + // make the intermediate file as hidden + return FSUtils.makeDataFileName(instantTime, writeToken + "-" + rollNumber, + this.fileId, hoodieTable.getBaseFileExtension()); + } + + @Override + protected void setWriteStatusPath() { + // should still report the old file path. + writeStatus.getStat().setPath(new Path(config.getBasePath()), oldFilePath); + } + + boolean needsUpdateLocation() { + // No need to update location for Flink hoodie records because all the records are pre-tagged + // with the desired locations. + return false; + } + + public void finalizeWrite() { + // The file visibility should be kept by the configured ConsistencyGuard instance. 
+ try { + fs.delete(oldFilePath, false); + } catch (IOException e) { + throw new HoodieIOException("Error while cleaning the old base file: " + oldFilePath, e); + } + try { + fs.rename(newFilePath, oldFilePath); + } catch (IOException e) { + throw new HoodieIOException("Error while renaming the temporary roll file: " + + newFilePath + " to old base file name: " + oldFilePath, e); + } + } + + @Override + public List close() { + try { + List writeStatuses = super.close(); + finalizeWrite(); + return writeStatuses; + } finally { + this.isClosed = true; + } + } + + @Override + public void closeGracefully() { + if (isClosed) { + return; + } + try { + close(); + } catch (Throwable throwable) { + LOG.warn("Error while trying to dispose the MERGE handle", throwable); + try { + fs.delete(newFilePath, false); + LOG.info("Deleting the intermediate MERGE and REPLACE data file: " + newFilePath + " success!"); + } catch (IOException e) { + // logging a warning and ignore the exception. + LOG.warn("Deleting the intermediate MERGE and REPLACE data file: " + newFilePath + " failed", e); + } + } + } + + @Override + public Path getWritePath() { + return oldFilePath; + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java index 518ea6928..870a6aae2 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java @@ -23,12 +23,10 @@ import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import 
org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.table.MarkerFiles; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; @@ -40,7 +38,7 @@ import java.util.Iterator; import java.util.List; /** - * A {@link HoodieMergeHandle} that supports merge write incrementally(small data buffers). + * A {@link HoodieMergeHandle} that supports MERGE write incrementally(small data buffers). * *

For a new data buffer, it initialize and set up the next file path to write, * and closes the file path when the data buffer write finish. When next data buffer @@ -48,10 +46,7 @@ import java.util.List; * for a checkpoint round, it renames the last new file path as the desired file name * (name with the expected file ID). * - * @param Payload type - * @param Input type - * @param Key type - * @param Output type + * @see FlinkMergeAndReplaceHandle */ public class FlinkMergeHandle extends HoodieMergeHandle @@ -59,15 +54,7 @@ public class FlinkMergeHandle private static final Logger LOG = LogManager.getLogger(FlinkMergeHandle.class); - /** - * Records the current file handles number that rolls over. - */ - private int rollNumber = 0; - - /** - * Whether the handle should roll over to new, E.G. the handle has written some intermediate files already. - */ - private volatile boolean shouldRollover = false; + private boolean isClosed = false; /** * Records the rolled over file paths. @@ -130,6 +117,7 @@ public class FlinkMergeHandle super.makeOldAndNewFilePaths(partitionPath, oldFileName, newFileName); rolloverPaths = new ArrayList<>(); try { + int rollNumber = 0; while (fs.exists(newFilePath)) { oldFilePath = newFilePath; // override the old file name rolloverPaths.add(oldFilePath); @@ -151,15 +139,15 @@ public class FlinkMergeHandle this.fileId, hoodieTable.getBaseFileExtension()); } - public boolean shouldRollover() { - return shouldRollover; - } - @Override public List close() { - List writeStatus = super.close(); - this.shouldRollover = true; - return writeStatus; + try { + List writeStatus = super.close(); + finalizeWrite(); + return writeStatus; + } finally { + this.isClosed = true; + } } boolean needsUpdateLocation() { @@ -168,53 +156,7 @@ public class FlinkMergeHandle return false; } - /** - * - * Rolls over the write handle to prepare for the next batch write. - * - *

It tweaks the handle state as following: - * - *

    - *
  • Increment the {@code rollNumber}
  • - *
  • Book-keep the last file path, these files (except the last one) are temporary that need to be cleaned
  • - *
  • Make the last new file path as old
  • - *
  • Initialize the new file path and file writer
  • - *
- * - * @param newRecordsItr The records iterator to update - */ - public void rollOver(Iterator> newRecordsItr) { - init(this.fileId, newRecordsItr); - this.recordsWritten = 0; - this.recordsDeleted = 0; - this.updatedRecordsWritten = 0; - this.insertRecordsWritten = 0; - this.writeStatus.setTotalErrorRecords(0); - this.timer = new HoodieTimer().startTimer(); - - rollNumber += 1; - - rolloverPaths.add(newFilePath); - oldFilePath = newFilePath; - final String newFileName = newFileNameWithRollover(rollNumber); - newFilePath = makeNewFilePath(partitionPath, newFileName); - - // create the marker file so that the intermediate roll over files - // of the retry task can be cleaned. - MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime); - markerFiles.createIfNotExists(partitionPath, newFileName, getIOType()); - - try { - fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config, writerSchemaWithMetafields, taskContextSupplier); - } catch (IOException e) { - throw new HoodieIOException("Error when creating file writer for path " + newFilePath, e); - } - - LOG.info(String.format("Merging new data into oldPath %s, as newPath %s", oldFilePath.toString(), - newFilePath.toString())); - } - - public void finishWrite() { + public void finalizeWrite() { // The file visibility should be kept by the configured ConsistencyGuard instance. 
rolloverPaths.add(newFilePath); if (rolloverPaths.size() == 1) { @@ -241,8 +183,11 @@ public class FlinkMergeHandle @Override public void closeGracefully() { + if (isClosed) { + return; + } try { - finishWrite(); + close(); } catch (Throwable throwable) { LOG.warn("Error while trying to dispose the MERGE handle", throwable); try { @@ -254,4 +199,9 @@ public class FlinkMergeHandle } } } + + @Override + public Path getWritePath() { + return newFilePath; + } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java index 30ac31724..5f022d0ae 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java @@ -18,28 +18,36 @@ package org.apache.hudi.io; +import org.apache.hadoop.fs.Path; + /** * Hoodie write handle that supports write as mini-batch. */ public interface MiniBatchHandle { /** - * Returns whether the handle should roll over to new, - * E.G. the handle has written some intermediate data buffer already. + * Finalize the write of one mini-batch. Usually these mini-batches + * come from one checkpoint interval. The file handle may roll over to a new name + * if the name conflicts, giving a chance to clean the intermediate file. */ - default boolean shouldRollover() { - return false; - } - - /** - * Finish the write of multiple mini-batches. Usually these mini-bathes - * come from one checkpoint interval. - */ - void finishWrite(); + default void finalizeWrite() {} /** * Close the file handle gracefully, if any error happens during the file handle close, * clean the file to not left corrupted file. */ void closeGracefully(); + + /** + * Returns the write file path. 
+ */ + Path getWritePath(); + + /** + * Whether the old write file should be replaced with the same name new file + * using content merged with incremental new data batch. + */ + default boolean shouldReplace() { + return true; + } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java index f3e10e8ca..5cfd28be2 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java @@ -32,11 +32,9 @@ import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCommitException; -import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.execution.FlinkLazyInsertIterable; import org.apache.hudi.io.ExplicitWriteHandleFactory; -import org.apache.hudi.io.FlinkMergeHandle; import org.apache.hudi.io.HoodieCreateHandle; import org.apache.hudi.io.HoodieMergeHandle; import org.apache.hudi.io.HoodieWriteHandle; @@ -160,10 +158,6 @@ public abstract class BaseFlinkCommitActionExecutor> getPartitionToReplacedFileIds(List writeStatuses) { - return Collections.emptyMap(); - } - @Override protected boolean isWorkloadProfileNeeded() { return true; @@ -177,26 +171,29 @@ public abstract class BaseFlinkCommitActionExecutor) Collections.EMPTY_LIST).iterator(); } // these are updates - HoodieMergeHandle upsertHandle = getUpdateHandle(recordItr); + HoodieMergeHandle upsertHandle = (HoodieMergeHandle) this.writeHandle; return handleUpdateInternal(upsertHandle, fileId); } @@ -234,19 +231,6 @@ public abstract class 
BaseFlinkCommitActionExecutor> recordItr) { - if (table.requireSortedRecords()) { - throw new HoodieNotSupportedException("Sort records are not supported in Flink streaming write"); - } else { - FlinkMergeHandle writeHandle = (FlinkMergeHandle) this.writeHandle; - // add the incremental records. - if (writeHandle.shouldRollover()) { - writeHandle.rollOver(recordItr); - } - return writeHandle; - } - } - @Override public Iterator> handleInsert(String idPfx, Iterator> recordItr) throws Exception { diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java index 5e9d8cb19..6391750fd 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java @@ -27,7 +27,6 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.io.FlinkMergeHandle; import org.apache.hudi.io.HoodieMergeHandle; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; @@ -62,17 +61,14 @@ public class FlinkMergeHelper extends AbstractMer @Override public void runMerge(HoodieTable>, List, List> table, - HoodieMergeHandle>, List, List> upsertHandle) throws IOException { - final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation(); - Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf()); - HoodieMergeHandle>, List, List> mergeHandle = upsertHandle; - HoodieBaseFile baseFile = mergeHandle.baseFileForMerge(); - + HoodieMergeHandle>, List, List> mergeHandle) throws IOException { final 
GenericDatumWriter gWriter; final GenericDatumReader gReader; Schema readSchema; - if (isFirstTimeMerge(mergeHandle) - && (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent())) { + + final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation(); + HoodieBaseFile baseFile = mergeHandle.baseFileForMerge(); + if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) { readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema(); gWriter = new GenericDatumWriter<>(readSchema); gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetafields()); @@ -83,10 +79,11 @@ public class FlinkMergeHelper extends AbstractMer } BoundedInMemoryExecutor wrapper = null; + Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf()); HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath()); try { final Iterator readerIterator; - if (isFirstTimeMerge(mergeHandle) && baseFile.getBootstrapBaseFile().isPresent()) { + if (baseFile.getBootstrapBaseFile().isPresent()) { readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation); } else { readerIterator = reader.getRecordIterator(readSchema); @@ -114,9 +111,4 @@ public class FlinkMergeHelper extends AbstractMer } } } - - private static boolean isFirstTimeMerge(HoodieMergeHandle mergeHandle) { - return mergeHandle instanceof FlinkMergeHandle && !((FlinkMergeHandle) mergeHandle).shouldRollover(); - } - } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/BaseFlinkDeltaCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/BaseFlinkDeltaCommitActionExecutor.java index 674a8fd21..1fe98204b 100644 --- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/BaseFlinkDeltaCommitActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/BaseFlinkDeltaCommitActionExecutor.java @@ -49,9 +49,6 @@ public abstract class BaseFlinkDeltaCommitActionExecutor> handleUpdate(String partitionPath, String fileId, Iterator> recordItr) { FlinkAppendHandle appendHandle = (FlinkAppendHandle) writeHandle; - if (appendHandle.shouldRollover()) { - appendHandle.appendNewRecords(recordItr); - } appendHandle.doAppend(); List writeStatuses = appendHandle.close(); return Collections.singletonList(writeStatuses).iterator(); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index d9cb38912..e35419adc 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -142,22 +142,6 @@ public class StreamWriteFunction */ private transient TotalSizeTracer tracer; - /** - * Flag saying whether the write task is waiting for the checkpoint success notification - * after it finished a checkpoint. - * - *

The flag is needed because the write task does not block during the waiting time interval, - * some data buckets still flush out with old instant time. There are two cases that the flush may produce - * corrupted files if the old instant is committed successfully: - * 1) the write handle was writing data but interrupted, left a corrupted parquet file; - * 2) the write handle finished the write but was not closed, left an empty parquet file. - * - *

To solve, when this flag was set to true, we flush the data buffer with a new instant time = old instant time + 1ms, - * the new instant time would affect the write file name. The filesystem view does not recognize the file as committed because - * it always filters the data files based on successful commit time. - */ - private volatile boolean confirming = false; - /** * Constructs a StreamingSinkFunction. * @@ -207,7 +191,7 @@ public class StreamWriteFunction @Override public void notifyCheckpointComplete(long checkpointId) { - this.confirming = false; + this.writeClient.cleanHandles(); } /** @@ -416,16 +400,12 @@ public class StreamWriteFunction return; } - // if we are waiting for the checkpoint notification, shift the write instant time. - boolean shift = confirming && StreamerUtil.equal(instant, this.currentInstant); - final String flushInstant = shift ? StreamerUtil.instantTimePlus(instant, 1) : instant; - List records = bucket.records; ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records"); if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) { records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1); } - final List writeStatus = new ArrayList<>(writeFunction.apply(records, flushInstant)); + final List writeStatus = new ArrayList<>(writeFunction.apply(records, instant)); final BatchWriteSuccessEvent event = BatchWriteSuccessEvent.builder() .taskID(taskID) .instantTime(instant) // the write instant may shift but the event still use the currentInstant. 
@@ -473,7 +453,5 @@ public class StreamWriteFunction this.eventGateway.sendEventToCoordinator(event); this.buckets.clear(); this.tracer.reset(); - this.writeClient.cleanHandles(); - this.confirming = true; } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java index 7da980135..21fbc5aae 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -500,9 +500,8 @@ public class TestWriteCopyOnWrite { Map getMiniBatchExpected() { Map expected = new HashMap<>(); - expected.put("par1", "[id1,par1,id1,Danny,23,1,par1, " - + "id1,par1,id1,Danny,23,1,par1, " - + "id1,par1,id1,Danny,23,1,par1, " + // the last 3 lines are merged + expected.put("par1", "[" + "id1,par1,id1,Danny,23,1,par1, " + "id1,par1,id1,Danny,23,1,par1]"); return expected; diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java index 5050109a8..a4b6c16a3 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java @@ -163,6 +163,7 @@ public class StreamWriteFunctionWrapper { functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId); coordinator.notifyCheckpointComplete(checkpointId); this.bucketAssignerFunction.notifyCheckpointComplete(checkpointId); + this.writeFunction.notifyCheckpointComplete(checkpointId); if (conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)) { try { compactFunctionWrapper.compact(checkpointId);