[HUDI-1900] Always close the file handle for a flink mini-batch write (#2943)

Close the file handle eagerly to avoid corrupted files as much as
possible.
Danny Chan
2021-05-14 10:25:18 +08:00
committed by GitHub
parent 6f7ff7e8ca
commit ad77cf42ba
13 changed files with 340 additions and 276 deletions

View File

@@ -106,9 +106,16 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
TaskContextSupplier taskContextSupplier) {
this(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier,
hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).get());
}
public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
TaskContextSupplier taskContextSupplier, HoodieBaseFile baseFile) {
super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
init(fileId, recordItr);
init(fileId, partitionPath, hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).get());
init(fileId, partitionPath, baseFile);
}
/**
@@ -173,6 +180,10 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
}
}
protected void setWriteStatusPath() {
writeStatus.getStat().setPath(new Path(config.getBasePath()), newFilePath);
}
protected void makeOldAndNewFilePaths(String partitionPath, String oldFileName, String newFileName) {
oldFilePath = makeNewFilePath(partitionPath, oldFileName);
newFilePath = makeNewFilePath(partitionPath, newFileName);
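
Net effect of this HoodieMergeHandle change: the original constructor still resolves the latest base file from the table's file-system view, but now simply delegates to a new overload that accepts the base file explicitly, so a caller can merge against an arbitrary file (for example, the intermediate file of the previous mini-batch). A minimal sketch of that delegation pattern, using simplified placeholder types rather than Hudi's real ones:

class MergeHandleSketch {
  interface BaseFileView {
    String latestBaseFile(String partitionPath, String fileId);
  }

  private final String baseFilePath;

  // Original entry point: resolve the latest base file from the view, then delegate.
  MergeHandleSketch(BaseFileView view, String partitionPath, String fileId) {
    this(view.latestBaseFile(partitionPath, fileId));
  }

  // New entry point: the caller chooses which base file to merge against.
  MergeHandleSketch(String baseFilePath) {
    this.baseFilePath = baseFilePath;
  }
}
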

View File

@@ -47,6 +47,7 @@ import org.apache.hudi.index.FlinkHoodieIndex;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.FlinkAppendHandle;
import org.apache.hudi.io.FlinkCreateHandle;
import org.apache.hudi.io.FlinkMergeAndReplaceHandle;
import org.apache.hudi.io.FlinkMergeHandle;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.io.MiniBatchHandle;
@@ -376,12 +377,10 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
}
/**
* Clean the write handles within a checkpoint interval, this operation
* would close the underneath file handles.
* Clean the write handles within a checkpoint interval.
* All the handles should have been closed already.
*/
public void cleanHandles() {
this.bucketToHandles.values()
.forEach(handle -> ((MiniBatchHandle) handle).finishWrite());
this.bucketToHandles.clear();
}
@@ -414,10 +413,18 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
Iterator<HoodieRecord<T>> recordItr) {
final HoodieRecordLocation loc = record.getCurrentLocation();
final String fileID = loc.getFileId();
if (bucketToHandles.containsKey(fileID)) {
return bucketToHandles.get(fileID);
}
final String partitionPath = record.getPartitionPath();
if (bucketToHandles.containsKey(fileID)) {
MiniBatchHandle lastHandle = (MiniBatchHandle) bucketToHandles.get(fileID);
if (lastHandle.shouldReplace()) {
HoodieWriteHandle<?, ?, ?, ?> writeHandle = new FlinkMergeAndReplaceHandle<>(
config, instantTime, table, recordItr, partitionPath, fileID, table.getTaskContextSupplier(),
lastHandle.getWritePath());
this.bucketToHandles.put(fileID, writeHandle); // override with new replace handle
return writeHandle;
}
}
final boolean isDelta = table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ);
final HoodieWriteHandle<?, ?, ?, ?> writeHandle;
if (isDelta) {
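
The getOrCreateWriteHandle change above keys the decision on the previously cached handle: if the bucket already has a handle and that handle wrote a base file (shouldReplace() returns true), the next mini-batch is served by a FlinkMergeAndReplaceHandle built over the last handle's write path, and the cache entry is overridden. A rough sketch of that lookup-and-replace flow, with placeholder types (Handle and the factory functions) standing in for the real Hudi classes:

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

class HandleCacheSketch {
  interface Handle {
    boolean shouldReplace();
    String getWritePath();
  }

  private final Map<String, Handle> bucketToHandles = new HashMap<>();

  Handle getOrCreate(String fileId,
                     Function<String, Handle> mergeAndReplaceOverPath,
                     Supplier<Handle> freshHandle) {
    Handle last = bucketToHandles.get(fileId);
    if (last != null && last.shouldReplace()) {
      // Merge the new mini-batch into the file the previous handle produced.
      Handle replacing = mergeAndReplaceOverPath.apply(last.getWritePath());
      bucketToHandles.put(fileId, replacing); // override with the new replace handle
      return replacing;
    }
    // First mini-batch for this bucket (or an append-style handle): create a fresh handle.
    Handle created = freshHandle.get();
    bucketToHandles.put(fileId, created);
    return created;
  }
}
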

View File

@@ -23,36 +23,32 @@ import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
* A {@link HoodieAppendHandle} that supports append write incrementally(mini-batches).
* A {@link HoodieAppendHandle} that supports APPEND write incrementally(mini-batches).
*
* <p>For the first mini-batch, it initialize and set up the next file path to write,
* but does not close the file writer until all the mini-batches write finish. Each mini-batch
* data are appended to this handle, the back-up writer may rollover on condition.
* <p>For the first mini-batch, it initializes and sets up the next file path to write,
* then closes the file writer. The subsequent mini-batches are appended to the same file
* through a different append handle with the same write file name.
*
* @param <T> Payload type
* @param <I> Input type
* @param <K> Key type
* @param <O> Output type
* <p>The back-up writer may roll over on condition (e.g., the filesystem does not support append
* or the file size hits the configured threshold).
*/
public class FlinkAppendHandle<T extends HoodieRecordPayload, I, K, O>
extends HoodieAppendHandle<T, I, K, O> implements MiniBatchHandle {
private static final Logger LOG = LoggerFactory.getLogger(FlinkAppendHandle.class);
private boolean shouldRollover = false;
private boolean isClosed = false;
public FlinkAppendHandle(
HoodieWriteConfig config,
@@ -92,49 +88,37 @@ public class FlinkAppendHandle<T extends HoodieRecordPayload, I, K, O>
return true;
}
public boolean shouldRollover() {
return this.shouldRollover;
}
/**
* Appends new records into this append handle.
* @param recordItr The new records iterator
*/
public void appendNewRecords(Iterator<HoodieRecord<T>> recordItr) {
this.recordItr = recordItr;
}
@Override
public List<WriteStatus> close() {
shouldRollover = true;
// flush any remaining records to disk
appendDataAndDeleteBlocks(header);
// need to fix that the incremental write size in bytes may be lost
List<WriteStatus> ret = new ArrayList<>(statuses);
statuses.clear();
return ret;
}
@Override
public void finishWrite() {
LOG.info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + recordsWritten);
try {
if (writer != null) {
writer.close();
}
} catch (IOException e) {
throw new HoodieUpsertException("Failed to close append handle", e);
return super.close();
} finally {
this.isClosed = true;
}
}
@Override
public void closeGracefully() {
if (isClosed) {
return;
}
try {
finishWrite();
close();
} catch (Throwable throwable) {
// The intermediate log file can still append based on the incremental MERGE semantics,
// there is no need to delete the file.
LOG.warn("Error while trying to dispose the APPEND handle", throwable);
}
}
@Override
public Path getWritePath() {
return writer.getLogFile().getPath();
}
@Override
public boolean shouldReplace() {
// log files can append new data buffer directly
return false;
}
}
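
The pattern FlinkAppendHandle now follows is close-per-mini-batch with an idempotent closeGracefully() safety net: the normal path flushes and closes the writer and returns the statuses, while the graceful path only runs if the handle was never closed and merely logs on failure, since later batches can still append to the log file. A condensed sketch of that lifecycle with placeholder types (not the real Hudi signatures):

import java.util.Collections;
import java.util.List;

class AppendHandleLifecycleSketch {
  private boolean isClosed = false;

  // Normal path: flush the buffered records, close the underlying writer, report statuses.
  List<String> close() {
    try {
      // appendDataAndDeleteBlocks(...) and writer.close() would happen here in the real handle.
      return Collections.emptyList();
    } finally {
      isClosed = true;
    }
  }

  // Safety net on task disposal: must be idempotent and must never throw.
  void closeGracefully() {
    if (isClosed) {
      return;
    }
    try {
      close();
    } catch (Throwable t) {
      // The intermediate log file can still be appended to later, so it is not deleted here.
      System.err.println("Error while trying to dispose the APPEND handle: " + t);
    }
  }
}
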

View File

@@ -23,11 +23,9 @@ import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
@@ -36,27 +34,24 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
/**
* A {@link HoodieCreateHandle} that supports create write incrementally(mini-batches).
* A {@link HoodieCreateHandle} that supports CREATE write incrementally(mini-batches).
*
* <p>For the first mini-batch, it initialize and set up the next file path to write,
* but does not close the file writer until all the mini-batches write finish. Each mini-batch
* data are appended to the same file.
* <p>For the first mini-batch, it initializes and sets up the next file path to write,
* then closes the file writer. Each subsequent mini-batch is written to a file with a new name,
* and that new file is then renamed to this file name,
* so it behaves as if each mini-batch were appended to the same file.
*
* @param <T> Payload type
* @param <I> Input type
* @param <K> Key type
* @param <O> Output type
* @see FlinkMergeAndReplaceHandle
*/
public class FlinkCreateHandle<T extends HoodieRecordPayload, I, K, O>
extends HoodieCreateHandle<T, I, K, O> implements MiniBatchHandle {
private static final Logger LOG = LogManager.getLogger(FlinkCreateHandle.class);
private long lastFileSize = 0L;
private long totalRecordsWritten = 0L;
private boolean isClosed = false;
public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
@@ -138,57 +133,22 @@ public class FlinkCreateHandle<T extends HoodieRecordPayload, I, K, O>
return makeNewFilePath(partitionPath, dataFileName);
}
/**
* Get the incremental write status. In mini-batch write mode,
* this handle would be reused for a checkpoint bucket(the bucket is appended as mini-batches),
* thus, after a mini-batch append finish, we do not close the underneath writer but return
* the incremental WriteStatus instead.
*
* @return the incremental write status
*/
private WriteStatus getIncrementalWriteStatus() {
@Override
public List<WriteStatus> close() {
try {
setupWriteStatus();
// reset the write status
totalRecordsWritten += recordsWritten;
recordsWritten = 0;
recordsDeleted = 0;
insertRecordsWritten = 0;
timer = new HoodieTimer().startTimer();
writeStatus.setTotalErrorRecords(0);
return writeStatus;
} catch (IOException e) {
throw new HoodieInsertException("Failed to close the Insert Handle for path " + path, e);
}
}
@Override
protected long computeTotalWriteBytes() {
long fileSizeInBytes = computeFileSizeInBytes();
long incFileSizeInBytes = fileSizeInBytes - lastFileSize;
this.lastFileSize = fileSizeInBytes;
return incFileSizeInBytes;
}
@Override
protected long computeFileSizeInBytes() {
return fileWriter.getBytesWritten();
}
@Override
public void finishWrite() {
LOG.info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + totalRecordsWritten);
try {
fileWriter.close();
} catch (IOException e) {
throw new HoodieInsertException("Failed to close the Insert Handle for path " + path, e);
return super.close();
} finally {
this.isClosed = true;
}
}
@Override
public void closeGracefully() {
if (isClosed) {
return;
}
try {
finishWrite();
close();
} catch (Throwable throwable) {
LOG.warn("Error while trying to dispose the CREATE handle", throwable);
try {
@@ -201,11 +161,8 @@ public class FlinkCreateHandle<T extends HoodieRecordPayload, I, K, O>
}
}
/**
* Performs actions to durably, persist the current changes and returns a WriteStatus object.
*/
@Override
public List<WriteStatus> close() {
return Collections.singletonList(getIncrementalWriteStatus());
public Path getWritePath() {
return path;
}
}
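
A detail worth calling out in FlinkCreateHandle is the incremental byte accounting: because several mini-batches report stats against the same logical file, each close reports only the bytes added since the previous report, by diffing against the last observed file size. A tiny sketch of that bookkeeping (names are illustrative):

class IncrementalBytesSketch {
  private long lastFileSize = 0L;

  // Report only the bytes written by the current mini-batch.
  long computeTotalWriteBytes(long currentFileSizeInBytes) {
    long incremental = currentFileSizeInBytes - lastFileSize;
    lastFileSize = currentFileSizeInBytes;
    return incremental;
  }
}

So if the file grows to 4 MB after the first batch and to 7 MB after the second, the two reports are 4 MB and 3 MB respectively.
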

View File

@@ -0,0 +1,196 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.io;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
/**
* A {@link HoodieMergeHandle} that supports MERGE write incrementally(small data buffers).
*
* <p>This handle is needed from the second mini-batch write onward for a COW data bucket
* when the data bucket is written using multiple mini-batches.
*
* <p>For the incremental data buffer, it initializes and sets up the next file path to write,
* then closes the file and renames it to the old file name,
* so it behaves as if the new data buffer were appended to the old file.
*/
public class FlinkMergeAndReplaceHandle<T extends HoodieRecordPayload, I, K, O>
extends HoodieMergeHandle<T, I, K, O>
implements MiniBatchHandle {
private static final Logger LOG = LogManager.getLogger(FlinkMergeAndReplaceHandle.class);
private boolean isClosed = false;
public FlinkMergeAndReplaceHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
TaskContextSupplier taskContextSupplier, Path basePath) {
super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier,
new HoodieBaseFile(basePath.toString()));
// delete invalid data files generated by task retry.
if (getAttemptId() > 0) {
deleteInvalidDataFile(getAttemptId() - 1);
}
}
/**
* Flink checkpoints run in sequence and asynchronously. When one write task finishes checkpoint (A)
* (so the fs view already sees the written data files, some of which may be invalid),
* it immediately continues with the next checkpoint (B) write
* and may try to reuse the last small data bucket (small file) of an invalid data file.
* When the coordinator finally receives the success event of checkpoint (A),
* the invalid data file is cleaned,
* and this merger would hit a FileNotFoundException when it closes the write file handle.
*
* <p>To solve this, delete the invalid data file eagerly
* so that the small bucket of the invalid file is never reused.
*
* @param lastAttemptId The last attempt ID
*/
private void deleteInvalidDataFile(long lastAttemptId) {
final String lastWriteToken = FSUtils.makeWriteToken(getPartitionId(), getStageId(), lastAttemptId);
final String lastDataFileName = FSUtils.makeDataFileName(instantTime,
lastWriteToken, this.fileId, hoodieTable.getBaseFileExtension());
final Path path = makeNewFilePath(partitionPath, lastDataFileName);
try {
if (fs.exists(path)) {
LOG.info("Deleting invalid MERGE and REPLACE base file due to task retry: " + lastDataFileName);
fs.delete(path, false);
}
} catch (IOException e) {
throw new HoodieException("Error while deleting the MERGE and REPLACE base file due to task retry: " + lastDataFileName, e);
}
}
@Override
protected void createMarkerFile(String partitionPath, String dataFileName) {
// no need to create any marker file for intermediate file.
}
@Override
protected void makeOldAndNewFilePaths(String partitionPath, String oldFileName, String newFileName) {
// old and new file name expects to be the same.
if (!oldFileName.equals(newFileName)) {
LOG.warn("MERGE and REPLACE handle expect the same name for old and new files,\n"
+ "while got new file: " + newFileName + " with old file: " + oldFileName + ",\n"
+ "this rarely happens when the checkpoint success event was not received yet\n"
+ "but the write task flush with new instant time, which does not break the UPSERT semantics");
}
super.makeOldAndNewFilePaths(partitionPath, oldFileName, newFileName);
try {
int rollNumber = 0;
while (fs.exists(newFilePath)) {
Path oldPath = newFilePath;
newFileName = newFileNameWithRollover(rollNumber++);
newFilePath = makeNewFilePath(partitionPath, newFileName);
LOG.warn("Duplicate write for MERGE and REPLACE handle with path: " + oldPath + ", rolls over to new path: " + newFilePath);
}
} catch (IOException e) {
throw new HoodieException("Checking existing path for merge and replace handle error: " + newFilePath, e);
}
}
/**
* Use the writeToken + "-" + rollNumber as the new writeToken of a mini-batch write.
*/
protected String newFileNameWithRollover(int rollNumber) {
// make the intermediate file as hidden
return FSUtils.makeDataFileName(instantTime, writeToken + "-" + rollNumber,
this.fileId, hoodieTable.getBaseFileExtension());
}
@Override
protected void setWriteStatusPath() {
// should still report the old file path.
writeStatus.getStat().setPath(new Path(config.getBasePath()), oldFilePath);
}
boolean needsUpdateLocation() {
// No need to update location for Flink hoodie records because all the records are pre-tagged
// with the desired locations.
return false;
}
public void finalizeWrite() {
// The file visibility should be kept by the configured ConsistencyGuard instance.
try {
fs.delete(oldFilePath, false);
} catch (IOException e) {
throw new HoodieIOException("Error while cleaning the old base file: " + oldFilePath, e);
}
try {
fs.rename(newFilePath, oldFilePath);
} catch (IOException e) {
throw new HoodieIOException("Error while renaming the temporary roll file: "
+ newFilePath + " to old base file name: " + oldFilePath, e);
}
}
@Override
public List<WriteStatus> close() {
try {
List<WriteStatus> writeStatuses = super.close();
finalizeWrite();
return writeStatuses;
} finally {
this.isClosed = true;
}
}
@Override
public void closeGracefully() {
if (isClosed) {
return;
}
try {
close();
} catch (Throwable throwable) {
LOG.warn("Error while trying to dispose the MERGE handle", throwable);
try {
fs.delete(newFilePath, false);
LOG.info("Deleting the intermediate MERGE and REPLACE data file: " + newFilePath + " success!");
} catch (IOException e) {
// logging a warning and ignore the exception.
LOG.warn("Deleting the intermediate MERGE and REPLACE data file: " + newFilePath + " failed", e);
}
}
}
@Override
public Path getWritePath() {
return oldFilePath;
}
}
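
The finalizeWrite() step above boils down to delete-then-rename on the filesystem: drop the old base file and move the freshly merged file onto its name, so after every mini-batch the bucket is represented by a single file under the original name. A minimal sketch of that step against the Hadoop FileSystem API (the helper class and its names are illustrative, not Hudi code):

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

final class ReplaceFinalizeSketch {
  // Make newFilePath take over oldFilePath's name: delete the old file, then rename the new one.
  static void replace(FileSystem fs, Path oldFilePath, Path newFilePath) throws IOException {
    if (fs.exists(oldFilePath)) {
      fs.delete(oldFilePath, false); // non-recursive delete of the previous base file
    }
    if (!fs.rename(newFilePath, oldFilePath)) {
      throw new IOException("Failed to rename " + newFilePath + " to " + oldFilePath);
    }
  }
}
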

View File

@@ -23,12 +23,10 @@ import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
@@ -40,7 +38,7 @@ import java.util.Iterator;
import java.util.List;
/**
* A {@link HoodieMergeHandle} that supports merge write incrementally(small data buffers).
* A {@link HoodieMergeHandle} that supports MERGE write incrementally(small data buffers).
*
* <p>For a new data buffer, it initializes and sets up the next file path to write,
* and closes the file when the data buffer write finishes. When the next data buffer
@@ -48,10 +46,7 @@ import java.util.List;
* for a checkpoint round, it renames the last new file path as the desired file name
* (name with the expected file ID).
*
* @param <T> Payload type
* @param <I> Input type
* @param <K> Key type
* @param <O> Output type
* @see FlinkMergeAndReplaceHandle
*/
public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
extends HoodieMergeHandle<T, I, K, O>
@@ -59,15 +54,7 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
private static final Logger LOG = LogManager.getLogger(FlinkMergeHandle.class);
/**
* Records the current file handles number that rolls over.
*/
private int rollNumber = 0;
/**
* Whether the handle should roll over to new, E.G. the handle has written some intermediate files already.
*/
private volatile boolean shouldRollover = false;
private boolean isClosed = false;
/**
* Records the rolled over file paths.
@@ -130,6 +117,7 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
super.makeOldAndNewFilePaths(partitionPath, oldFileName, newFileName);
rolloverPaths = new ArrayList<>();
try {
int rollNumber = 0;
while (fs.exists(newFilePath)) {
oldFilePath = newFilePath; // override the old file name
rolloverPaths.add(oldFilePath);
@@ -151,15 +139,15 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
this.fileId, hoodieTable.getBaseFileExtension());
}
public boolean shouldRollover() {
return shouldRollover;
}
@Override
public List<WriteStatus> close() {
List<WriteStatus> writeStatus = super.close();
this.shouldRollover = true;
return writeStatus;
try {
List<WriteStatus> writeStatus = super.close();
finalizeWrite();
return writeStatus;
} finally {
this.isClosed = true;
}
}
boolean needsUpdateLocation() {
@@ -168,53 +156,7 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
return false;
}
/**
*
* Rolls over the write handle to prepare for the next batch write.
*
* <p>It tweaks the handle state as following:
*
* <ul>
* <li>Increment the {@code rollNumber}</li>
* <li>Book-keep the last file path, these files (except the last one) are temporary that need to be cleaned</li>
* <li>Make the last new file path as old</li>
* <li>Initialize the new file path and file writer</li>
* </ul>
*
* @param newRecordsItr The records iterator to update
*/
public void rollOver(Iterator<HoodieRecord<T>> newRecordsItr) {
init(this.fileId, newRecordsItr);
this.recordsWritten = 0;
this.recordsDeleted = 0;
this.updatedRecordsWritten = 0;
this.insertRecordsWritten = 0;
this.writeStatus.setTotalErrorRecords(0);
this.timer = new HoodieTimer().startTimer();
rollNumber += 1;
rolloverPaths.add(newFilePath);
oldFilePath = newFilePath;
final String newFileName = newFileNameWithRollover(rollNumber);
newFilePath = makeNewFilePath(partitionPath, newFileName);
// create the marker file so that the intermediate roll over files
// of the retry task can be cleaned.
MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime);
markerFiles.createIfNotExists(partitionPath, newFileName, getIOType());
try {
fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config, writerSchemaWithMetafields, taskContextSupplier);
} catch (IOException e) {
throw new HoodieIOException("Error when creating file writer for path " + newFilePath, e);
}
LOG.info(String.format("Merging new data into oldPath %s, as newPath %s", oldFilePath.toString(),
newFilePath.toString()));
}
public void finishWrite() {
public void finalizeWrite() {
// The file visibility should be kept by the configured ConsistencyGuard instance.
rolloverPaths.add(newFilePath);
if (rolloverPaths.size() == 1) {
@@ -241,8 +183,11 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
@Override
public void closeGracefully() {
if (isClosed) {
return;
}
try {
finishWrite();
close();
} catch (Throwable throwable) {
LOG.warn("Error while trying to dispose the MERGE handle", throwable);
try {
@@ -254,4 +199,9 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
}
}
}
@Override
public Path getWritePath() {
return newFilePath;
}
}
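
The rollover logic in makeOldAndNewFilePaths is a small loop: while the intended output name already exists (because an earlier mini-batch in this checkpoint wrote it), treat that existing file as the merge input, remember it for later cleanup, and pick a new output name with an incremented roll number. A stripped-down sketch, with a predicate standing in for fs.exists and plain strings for paths:

import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;
import java.util.function.Predicate;

class RolloverNamingSketch {
  String oldPath;
  String newPath;
  final List<String> rolloverPaths = new ArrayList<>();

  void makeOldAndNewFilePaths(String oldName, String newName,
                              Predicate<String> exists, IntFunction<String> nameWithRollover) {
    oldPath = oldName;
    newPath = newName;
    int rollNumber = 0;
    while (exists.test(newPath)) {
      oldPath = newPath;          // the file from the previous mini-batch becomes the merge input
      rolloverPaths.add(oldPath); // intermediates to clean up once the final file is in place
      newPath = nameWithRollover.apply(rollNumber++);
    }
  }
}
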

View File

@@ -18,28 +18,36 @@
package org.apache.hudi.io;
import org.apache.hadoop.fs.Path;
/**
* Hoodie write handle that supports write as mini-batch.
*/
public interface MiniBatchHandle {
/**
* Returns whether the handle should roll over to new,
* E.G. the handle has written some intermediate data buffer already.
* Finalize the write of one mini-batch. Usually these mini-batches
* come from one checkpoint interval. The file handle may roll over to a new name
* if the name conflicts, giving a chance to clean up the intermediate file.
*/
default boolean shouldRollover() {
return false;
}
/**
* Finish the write of multiple mini-batches. Usually these mini-bathes
* come from one checkpoint interval.
*/
void finishWrite();
default void finalizeWrite() {}
/**
* Close the file handle gracefully; if any error happens while closing the file handle,
* clean up the file so that no corrupted file is left behind.
*/
void closeGracefully();
/**
* Returns the write file path.
*/
Path getWritePath();
/**
* Whether the old write file should be replaced by a new file with the same name,
* whose content is the old content merged with the incremental new data batch.
*/
default boolean shouldReplace() {
return true;
}
}
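
Read as a contract, MiniBatchHandle lets the owner of the handle cache dispose everything at the end of a checkpoint without knowing the concrete handle type: each handle is expected to have been closed on its own mini-batch path, and closeGracefully() is the idempotent fallback that also cleans up a corrupted file if the close failed. A short sketch of such a caller over the interface shown above (the helper class and its method are hypothetical):

import java.util.Collection;
import org.apache.hudi.io.MiniBatchHandle;

final class MiniBatchCleanupSketch {
  // Dispose all cached handles for a checkpoint, tolerating per-handle failures.
  static void disposeAll(Collection<MiniBatchHandle> handles) {
    for (MiniBatchHandle handle : handles) {
      handle.closeGracefully();
    }
    handles.clear();
  }
}
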

View File

@@ -32,11 +32,9 @@ import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.execution.FlinkLazyInsertIterable;
import org.apache.hudi.io.ExplicitWriteHandleFactory;
import org.apache.hudi.io.FlinkMergeHandle;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.HoodieWriteHandle;
@@ -160,10 +158,6 @@ public abstract class BaseFlinkCommitActionExecutor<T extends HoodieRecordPayloa
}
}
protected Map<String, List<String>> getPartitionToReplacedFileIds(List<WriteStatus> writeStatuses) {
return Collections.emptyMap();
}
@Override
protected boolean isWorkloadProfileNeeded() {
return true;
@@ -177,26 +171,29 @@ public abstract class BaseFlinkCommitActionExecutor<T extends HoodieRecordPayloa
BucketType bucketType,
Iterator recordItr) {
try {
switch (bucketType) {
case INSERT:
return handleInsert(fileIdHint, recordItr);
case UPDATE:
if (this.writeHandle instanceof HoodieCreateHandle) {
// During one checkpoint interval, an insert record could also be updated,
// for example, for an operation sequence of a record:
// I, U, | U, U
// - batch1 - | - batch2 -
// the first batch(batch1) operation triggers an INSERT bucket,
// the second batch batch2 tries to reuse the same bucket
// and append instead of UPDATE.
if (this.writeHandle instanceof HoodieCreateHandle) {
// During one checkpoint interval, an insert record could also be updated,
// for example, for an operation sequence of a record:
// I, U, | U, U
// - batch1 - | - batch2 -
// the first batch(batch1) operation triggers an INSERT bucket,
// the second batch batch2 tries to reuse the same bucket
// and append instead of UPDATE.
return handleInsert(fileIdHint, recordItr);
} else if (this.writeHandle instanceof HoodieMergeHandle) {
return handleUpdate(partitionPath, fileIdHint, recordItr);
} else {
switch (bucketType) {
case INSERT:
return handleInsert(fileIdHint, recordItr);
}
return handleUpdate(partitionPath, fileIdHint, recordItr);
default:
throw new HoodieUpsertException("Unknown bucketType " + bucketType + " for partition :" + partitionPath);
case UPDATE:
return handleUpdate(partitionPath, fileIdHint, recordItr);
default:
throw new AssertionError();
}
}
} catch (Throwable t) {
String msg = "Error upserting bucketType " + bucketType + " for partition :" + partitionPath;
String msg = "Error upsetting bucketType " + bucketType + " for partition :" + partitionPath;
LOG.error(msg, t);
throw new HoodieUpsertException(msg, t);
}
@@ -212,7 +209,7 @@ public abstract class BaseFlinkCommitActionExecutor<T extends HoodieRecordPayloa
return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
}
// these are updates
HoodieMergeHandle upsertHandle = getUpdateHandle(recordItr);
HoodieMergeHandle<?, ?, ?, ?> upsertHandle = (HoodieMergeHandle<?, ?, ?, ?>) this.writeHandle;
return handleUpdateInternal(upsertHandle, fileId);
}
@@ -234,19 +231,6 @@ public abstract class BaseFlinkCommitActionExecutor<T extends HoodieRecordPayloa
return Collections.singletonList(upsertHandle.writeStatuses()).iterator();
}
protected FlinkMergeHandle getUpdateHandle(Iterator<HoodieRecord<T>> recordItr) {
if (table.requireSortedRecords()) {
throw new HoodieNotSupportedException("Sort records are not supported in Flink streaming write");
} else {
FlinkMergeHandle writeHandle = (FlinkMergeHandle) this.writeHandle;
// add the incremental records.
if (writeHandle.shouldRollover()) {
writeHandle.rollOver(recordItr);
}
return writeHandle;
}
}
@Override
public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr)
throws Exception {
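
The rewritten handleUpsertPartition above dispatches on the concrete handle type before falling back to the bucket type: a record that was inserted in an earlier mini-batch may show up as an UPDATE later in the same checkpoint, yet it must keep going through its cached CREATE handle rather than a merge. A compact sketch of that dispatch order, with placeholder types instead of the real Hudi handles:

class UpsertDispatchSketch {
  enum BucketType { INSERT, UPDATE }

  interface Handle {}
  static final class CreateHandle implements Handle {}
  static final class MergeHandle implements Handle {}

  // Prefer the cached handle's type; only consult the planned bucket type when neither matches.
  static String dispatch(Handle cachedHandle, BucketType bucketType) {
    if (cachedHandle instanceof CreateHandle) {
      return "handleInsert"; // keep appending to the CREATE handle opened by an earlier batch
    }
    if (cachedHandle instanceof MergeHandle) {
      return "handleUpdate";
    }
    switch (bucketType) {
      case INSERT:
        return "handleInsert";
      case UPDATE:
        return "handleUpdate";
      default:
        throw new AssertionError();
    }
  }
}
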

View File

@@ -27,7 +27,6 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.FlinkMergeHandle;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -62,17 +61,14 @@ public class FlinkMergeHelper<T extends HoodieRecordPayload> extends AbstractMer
@Override
public void runMerge(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
HoodieMergeHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> upsertHandle) throws IOException {
final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation();
Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
HoodieMergeHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> mergeHandle = upsertHandle;
HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
HoodieMergeHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> mergeHandle) throws IOException {
final GenericDatumWriter<GenericRecord> gWriter;
final GenericDatumReader<GenericRecord> gReader;
Schema readSchema;
if (isFirstTimeMerge(mergeHandle)
&& (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent())) {
final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation();
HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) {
readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema();
gWriter = new GenericDatumWriter<>(readSchema);
gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetafields());
@@ -83,10 +79,11 @@ public class FlinkMergeHelper<T extends HoodieRecordPayload> extends AbstractMer
}
BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.<GenericRecord>getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());
try {
final Iterator<GenericRecord> readerIterator;
if (isFirstTimeMerge(mergeHandle) && baseFile.getBootstrapBaseFile().isPresent()) {
if (baseFile.getBootstrapBaseFile().isPresent()) {
readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation);
} else {
readerIterator = reader.getRecordIterator(readSchema);
@@ -114,9 +111,4 @@ public class FlinkMergeHelper<T extends HoodieRecordPayload> extends AbstractMer
}
}
}
private static boolean isFirstTimeMerge(HoodieMergeHandle<?, ?, ?, ?> mergeHandle) {
return mergeHandle instanceof FlinkMergeHandle && !((FlinkMergeHandle) mergeHandle).shouldRollover();
}
}

View File

@@ -49,9 +49,6 @@ public abstract class BaseFlinkDeltaCommitActionExecutor<T extends HoodieRecordP
@Override
public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
FlinkAppendHandle appendHandle = (FlinkAppendHandle) writeHandle;
if (appendHandle.shouldRollover()) {
appendHandle.appendNewRecords(recordItr);
}
appendHandle.doAppend();
List<WriteStatus> writeStatuses = appendHandle.close();
return Collections.singletonList(writeStatuses).iterator();

View File

@@ -142,22 +142,6 @@ public class StreamWriteFunction<K, I, O>
*/
private transient TotalSizeTracer tracer;
/**
* Flag saying whether the write task is waiting for the checkpoint success notification
* after it finished a checkpoint.
*
* <p>The flag is needed because the write task does not block during the waiting time interval,
* some data buckets still flush out with old instant time. There are two cases that the flush may produce
* corrupted files if the old instant is committed successfully:
* 1) the write handle was writing data but interrupted, left a corrupted parquet file;
* 2) the write handle finished the write but was not closed, left an empty parquet file.
*
* <p>To solve, when this flag was set to true, we flush the data buffer with a new instant time = old instant time + 1ms,
* the new instant time would affect the write file name. The filesystem view does not recognize the file as committed because
* it always filters the data files based on successful commit time.
*/
private volatile boolean confirming = false;
/**
* Constructs a StreamingSinkFunction.
*
@@ -207,7 +191,7 @@ public class StreamWriteFunction<K, I, O>
@Override
public void notifyCheckpointComplete(long checkpointId) {
this.confirming = false;
this.writeClient.cleanHandles();
}
/**
@@ -416,16 +400,12 @@ public class StreamWriteFunction<K, I, O>
return;
}
// if we are waiting for the checkpoint notification, shift the write instant time.
boolean shift = confirming && StreamerUtil.equal(instant, this.currentInstant);
final String flushInstant = shift ? StreamerUtil.instantTimePlus(instant, 1) : instant;
List<HoodieRecord> records = bucket.records;
ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records");
if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
}
final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, flushInstant));
final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, instant));
final BatchWriteSuccessEvent event = BatchWriteSuccessEvent.builder()
.taskID(taskID)
.instantTime(instant) // the write instant may shift but the event still use the currentInstant.
@@ -473,7 +453,5 @@ public class StreamWriteFunction<K, I, O>
this.eventGateway.sendEventToCoordinator(event);
this.buckets.clear();
this.tracer.reset();
this.writeClient.cleanHandles();
this.confirming = true;
}
}

View File

@@ -500,9 +500,8 @@ public class TestWriteCopyOnWrite {
Map<String, String> getMiniBatchExpected() {
Map<String, String> expected = new HashMap<>();
expected.put("par1", "[id1,par1,id1,Danny,23,1,par1, "
+ "id1,par1,id1,Danny,23,1,par1, "
+ "id1,par1,id1,Danny,23,1,par1, "
// the last 3 lines are merged
expected.put("par1", "["
+ "id1,par1,id1,Danny,23,1,par1, "
+ "id1,par1,id1,Danny,23,1,par1]");
return expected;

View File

@@ -163,6 +163,7 @@ public class StreamWriteFunctionWrapper<I> {
functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId);
coordinator.notifyCheckpointComplete(checkpointId);
this.bucketAssignerFunction.notifyCheckpointComplete(checkpointId);
this.writeFunction.notifyCheckpointComplete(checkpointId);
if (conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)) {
try {
compactFunctionWrapper.compact(checkpointId);