diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 8579f54b0..d0778f75d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -132,13 +132,6 @@ public class HoodieMergeHandle extends H return writerSchema; } - /** - * Returns the data file name. - */ - protected String generatesDataFileName() { - return FSUtils.makeDataFileName(instantTime, writeToken, fileId, hoodieTable.getBaseFileExtension()); - } - /** * Extract old file path, initialize StorageWriter and WriteStatus. */ @@ -155,11 +148,8 @@ public class HoodieMergeHandle extends H new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath)); partitionMetadata.trySave(getPartitionId()); - oldFilePath = new Path(config.getBasePath() + "/" + partitionPath + "/" + latestValidFilePath); - String newFileName = generatesDataFileName(); - String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/") - + newFileName).toString(); - newFilePath = new Path(config.getBasePath(), relativePath); + String newFileName = FSUtils.makeDataFileName(instantTime, writeToken, fileId, hoodieTable.getBaseFileExtension()); + makeOldAndNewFilePaths(partitionPath, latestValidFilePath, newFileName); LOG.info(String.format("Merging new data into oldPath %s, as newPath %s", oldFilePath.toString(), newFilePath.toString())); @@ -183,6 +173,11 @@ public class HoodieMergeHandle extends H } } + protected void makeOldAndNewFilePaths(String partitionPath, String oldFileName, String newFileName) { + oldFilePath = makeNewFilePath(partitionPath, oldFileName); + newFilePath = makeNewFilePath(partitionPath, newFileName); + } + /** * Initialize a spillable map for incoming records. 
*/ diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java index df0c2b6e2..c3608dc40 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java @@ -119,6 +119,15 @@ public abstract class HoodieWriteHandle hoodieTable.getMetaClient().getTableConfig().getBaseFileFormat().getFileExtension())); } + /** + * Make new file path with given file name. + */ + protected Path makeNewFilePath(String partitionPath, String fileName) { + String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/") + + fileName).toString(); + return new Path(config.getBasePath(), relativePath); + } + /** * Creates an empty marker file corresponding to storage writer path. * diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java index 556d98e35..876cce132 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java @@ -20,6 +20,7 @@ package org.apache.hudi.io; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.collection.Pair; @@ -27,9 +28,9 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieInsertException; import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.table.MarkerFiles; import org.apache.avro.Schema; +import 
org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -66,27 +67,70 @@ public class FlinkCreateHandle TaskContextSupplier taskContextSupplier) { super(config, instantTime, hoodieTable, partitionPath, fileId, writerSchemaIncludingAndExcludingMetadataPair, taskContextSupplier); + // delete invalid data files generated by task retry. + if (getAttemptId() > 0) { + deleteInvalidDataFile(getAttemptId() - 1); + } + } + + /** + * The flink checkpoints start in sequence and asynchronously, when one write task finishes the checkpoint(A) + * (thus the fs view got the written data files some of which may be invalid), + * it goes on with the next round checkpoint(B) write immediately, + * if it tries to reuse the last small data bucket(small file) of an invalid data file, + * finally, when the coordinator receives the checkpoint success event of checkpoint(A), + * the invalid data file would be cleaned, + * and this merger gets a FileNotFoundException when it closes the write file handle. + * + *

To solve, deletes the invalid data file eagerly + * so that the invalid file small bucket would never be reused. + * + * @param lastAttemptId The last attempt ID + */ + private void deleteInvalidDataFile(long lastAttemptId) { + final String lastWriteToken = FSUtils.makeWriteToken(getPartitionId(), getStageId(), lastAttemptId); + final String lastDataFileName = FSUtils.makeDataFileName(instantTime, + lastWriteToken, this.fileId, hoodieTable.getBaseFileExtension()); + final Path path = makeNewFilePath(partitionPath, lastDataFileName); + try { + if (fs.exists(path)) { + LOG.info("Deleting invalid INSERT file due to task retry: " + lastDataFileName); + fs.delete(path, false); + } + } catch (IOException e) { + throw new HoodieException("Error while deleting the INSERT file due to task retry: " + lastDataFileName, e); + } } @Override - protected void createMarkerFile(String partitionPath, String dataFileName) { - MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime); - boolean created = markerFiles.createIfNotExists(partitionPath, dataFileName, getIOType()); - if (!created) { - // If the marker file already exists, that means the write task - // was pulled up again with same data file name, removes the legacy - // data file first. - try { - if (fs.exists(path)) { - fs.delete(path, false); - LOG.warn("Legacy data file: " + path + " delete success"); - } - } catch (IOException e) { - throw new HoodieException("Error while deleting legacy data file: " + path, e); + public Path makeNewPath(String partitionPath) { + Path path = super.makeNewPath(partitionPath); + // If the data file already exists, it means the write task write new data bucket multiple times + // in one hoodie commit, rolls over to a new name instead. + + // Write to a new file which behaves like a different task write. 
+ try { + int rollNumber = 0; + while (fs.exists(path)) { + Path existing = path; + path = newFilePathWithRollover(rollNumber++); + LOG.warn("Duplicate write for INSERT bucket with path: " + existing + ", rolls over to new path: " + path); } + return path; + } catch (IOException e) { + throw new HoodieException("Checking existing path for create handle error: " + path, e); } } + /** + * Use the writeToken + "-" + rollNumber as the new writeToken of a mini-batch write. + */ + private Path newFilePathWithRollover(int rollNumber) { + final String dataFileName = FSUtils.makeDataFileName(instantTime, writeToken + "-" + rollNumber, fileId, + hoodieTable.getBaseFileExtension()); + return makeNewFilePath(partitionPath, dataFileName); + } + /** * Get the incremental write status. In mini-batch write mode, * this handle would be reused for a checkpoint bucket(the bucket is appended as mini-batches), @@ -111,7 +155,7 @@ public class FlinkCreateHandle } @Override - protected long computeTotalWriteBytes() throws IOException { + protected long computeTotalWriteBytes() { long fileSizeInBytes = computeFileSizeInBytes(); long incFileSizeInBytes = fileSizeInBytes - lastFileSize; this.lastFileSize = fileSizeInBytes; diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java index 7fffd6baf..023ee5f74 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.HoodieTable; import 
org.apache.hudi.table.MarkerFiles; @@ -78,18 +79,76 @@ public class FlinkMergeHandle TaskContextSupplier taskContextSupplier) { super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier); if (rolloverPaths == null) { - // #createMarkerFile may already initialize it already + // #makeOldAndNewFilePaths may have initialized it already rolloverPaths = new ArrayList<>(); } + // delete invalid data files generated by task retry. + if (getAttemptId() > 0) { + deleteInvalidDataFile(getAttemptId() - 1); + } } /** - * Use the fileId + "-" + rollNumber as the new fileId of a mini-batch write. + * The flink checkpoints start in sequence and asynchronously, when one write task finishes the checkpoint(A) + * (thus the fs view got the written data files some of which may be invalid), + * it goes on with the next round checkpoint(B) write immediately, + * if it tries to reuse the last small data bucket(small file) of an invalid data file, + * finally, when the coordinator receives the checkpoint success event of checkpoint(A), + * the invalid data file would be cleaned, + * and this merger gets a FileNotFoundException when it closes the write file handle. + * + *

To solve, deletes the invalid data file eagerly + * so that the invalid file small bucket would never be reused. + * + * @param lastAttemptId The last attempt ID */ - protected String generatesDataFileNameWithRollover() { + private void deleteInvalidDataFile(long lastAttemptId) { + final String lastWriteToken = FSUtils.makeWriteToken(getPartitionId(), getStageId(), lastAttemptId); + final String lastDataFileName = FSUtils.makeDataFileName(instantTime, + lastWriteToken, this.fileId, hoodieTable.getBaseFileExtension()); + final Path path = makeNewFilePath(partitionPath, lastDataFileName); + try { + if (fs.exists(path)) { + LOG.info("Deleting invalid MERGE base file due to task retry: " + lastDataFileName); + fs.delete(path, false); + } + } catch (IOException e) { + throw new HoodieException("Error while deleting the MERGE base file due to task retry: " + lastDataFileName, e); + } + } + + @Override + protected void makeOldAndNewFilePaths(String partitionPath, String oldFileName, String newFileName) { + // If the data file already exists, it means the write task write merge data bucket multiple times + // in one hoodie commit, rolls over to a new name instead. + + // Use the existing file path as the base file path (file1), + // and generates new file path with roll over number (file2). + // the incremental data set would merge into the file2 instead of file1. + // + // When the task finalizes in #finishWrite, the intermediate files would be cleaned. 
+ super.makeOldAndNewFilePaths(partitionPath, oldFileName, newFileName); + rolloverPaths = new ArrayList<>(); + try { + while (fs.exists(newFilePath)) { + oldFilePath = newFilePath; // override the old file name + rolloverPaths.add(oldFilePath); + newFileName = newFileNameWithRollover(rollNumber++); + newFilePath = makeNewFilePath(partitionPath, newFileName); + LOG.warn("Duplicate write for MERGE bucket with path: " + oldFilePath + ", rolls over to new path: " + newFilePath); + } + } catch (IOException e) { + throw new HoodieException("Checking existing path for merge handle error: " + newFilePath, e); + } + } + + /** + * Use the writeToken + "-" + rollNumber as the new writeToken of a mini-batch write. + */ + protected String newFileNameWithRollover(int rollNumber) { // make the intermediate file as hidden - return FSUtils.makeDataFileName("." + instantTime, - writeToken + "-" + rollNumber, this.fileId, hoodieTable.getBaseFileExtension()); + return FSUtils.makeDataFileName(instantTime, writeToken + "-" + rollNumber, + this.fileId, hoodieTable.getBaseFileExtension()); } public boolean shouldRollover() { @@ -109,25 +168,6 @@ public class FlinkMergeHandle return false; } - @Override - protected void createMarkerFile(String partitionPath, String dataFileName) { - MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime); - boolean created = markerFiles.createIfNotExists(partitionPath, dataFileName, getIOType()); - if (!created) { - // If the marker file already exists, that means the write task - // was pulled up again with same data file name, performs rolling over action here: - // use the new file path as the base file path (file1), - // and generates new file path with roll over number (file2). - // the incremental data set would merge into the file2 instead of file1. - // - // When the task do finalization in #finishWrite, the intermediate files would be cleaned. 
- oldFilePath = newFilePath; - rolloverPaths = new ArrayList<>(); - rolloverPaths.add(oldFilePath); - newFilePath = makeNewFilePathWithRollover(); - } - } - /** * * Rolls over the write handle to prepare for the next batch write. @@ -156,7 +196,13 @@ public class FlinkMergeHandle rolloverPaths.add(newFilePath); oldFilePath = newFilePath; - newFilePath = makeNewFilePathWithRollover(); + final String newFileName = newFileNameWithRollover(rollNumber); + newFilePath = makeNewFilePath(partitionPath, newFileName); + + // create the marker file so that the intermediate roll over files + // of the retry task can be cleaned. + MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime); + markerFiles.createIfNotExists(partitionPath, newFileName, getIOType()); try { fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config, writerSchemaWithMetafields, taskContextSupplier); @@ -168,16 +214,6 @@ public class FlinkMergeHandle newFilePath.toString())); } - /** - * Use the fileId + "-" + rollNumber as the new fileId of a mini-batch write. - */ - private Path makeNewFilePathWithRollover() { - String newFileName = generatesDataFileNameWithRollover(); - String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/") - + newFileName).toString(); - return new Path(config.getBasePath(), relativePath); - } - public void finishWrite() { // The file visibility should be kept by the configured ConsistencyGuard instance. 
rolloverPaths.add(newFilePath); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java index d8bdb9fba..cb8303e5f 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java @@ -31,10 +31,12 @@ import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.io.HoodieCreateHandle; @@ -318,6 +320,12 @@ public class HoodieFlinkCopyOnWriteTable extends throw new HoodieNotSupportedException("Savepoint and restore is not supported yet"); } + @Override + public void finalizeWrite(HoodieEngineContext context, String instantTs, List stats) throws HoodieIOException { + // do nothing because flink create and merge handles can clean the + // retry files by themselves. 
+ } + // ------------------------------------------------------------------------- // Used for compaction // ------------------------------------------------------------------------- diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 7752615ac..4bed143de 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -267,18 +267,19 @@ public class FlinkOptions { .defaultValue(4) .withDescription("Parallelism of tasks that do actual write, default is 4"); + public static final ConfigOption WRITE_TASK_MAX_SIZE = ConfigOptions + .key("write.task.max.size") + .doubleType() + .defaultValue(1024D) // 1GB + .withDescription("Maximum memory in MB for a write task, when the threshold hits,\n" + + "it flushes the max size data bucket to avoid OOM, default 1GB"); + public static final ConfigOption WRITE_BATCH_SIZE = ConfigOptions .key("write.batch.size") .doubleType() .defaultValue(64D) // 64MB .withDescription("Batch buffer size in MB to flush data into the underneath filesystem, default 64MB"); - public static final ConfigOption WRITE_RATE_LIMIT = ConfigOptions - .key("write.rate.limit") - .longType() - .defaultValue(-1L) // default no limit - .withDescription("Write records rate limit per second to reduce risk of OOM, default -1 (no limit)"); - public static final ConfigOption WRITE_LOG_BLOCK_SIZE = ConfigOptions .key("write.log_block.size") .intType() diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index 5c3cbb7b0..9db472f52 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -33,6 +33,7 @@ import 
org.apache.hudi.table.action.commit.FlinkWriteHelper; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; import org.apache.flink.runtime.state.FunctionInitializationContext; @@ -52,6 +53,7 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.function.BiFunction; +import java.util.stream.Collectors; /** * Sink function to write the data to the underneath filesystem. @@ -59,7 +61,8 @@ import java.util.function.BiFunction; *

Work Flow

* *

The function firstly buffers the data as a batch of {@link HoodieRecord}s, - * It flushes(write) the records batch when a batch exceeds the configured size {@link FlinkOptions#WRITE_BATCH_SIZE} + * It flushes(write) the records batch when the batch size exceeds the configured size {@link FlinkOptions#WRITE_BATCH_SIZE} + * or the total buffer size exceeds the configured size {@link FlinkOptions#WRITE_TASK_MAX_SIZE} * or a Flink checkpoint starts. After a batch has been written successfully, * the function notifies its operator coordinator {@link StreamWriteOperatorCoordinator} to mark a successful write. * @@ -91,7 +94,7 @@ import java.util.function.BiFunction; */ public class StreamWriteFunction extends KeyedProcessFunction - implements CheckpointedFunction { + implements CheckpointedFunction, CheckpointListener { private static final long serialVersionUID = 1L; @@ -134,6 +137,11 @@ public class StreamWriteFunction */ private transient String actionType; + /** + * Total size tracer. + */ + private transient TotalSizeTracer tracer; + /** * Constructs a StreamingSinkFunction. * @@ -150,6 +158,7 @@ public class StreamWriteFunction this.actionType = CommitUtils.getCommitActionType( WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)), HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE))); + this.tracer = new TotalSizeTracer(this.config); initBuffer(); initWriteFunction(); } @@ -168,7 +177,7 @@ public class StreamWriteFunction } @Override - public void processElement(I value, KeyedProcessFunction.Context ctx, Collector out) throws Exception { + public void processElement(I value, KeyedProcessFunction.Context ctx, Collector out) { bufferRecord(value); } @@ -180,6 +189,11 @@ public class StreamWriteFunction } } + @Override + public void notifyCheckpointComplete(long checkpointId) { + this.writeClient.cleanHandles(); + } + /** * End input action for batch source. 
*/ @@ -294,6 +308,44 @@ public class StreamWriteFunction } } + /** + * Tool to trace the total buffer size. It computes the maximum buffer size, + * if current buffer size is greater than the maximum buffer size, the data bucket + * flush triggers. + */ + private static class TotalSizeTracer { + private long bufferSize = 0L; + private final double maxBufferSize; + + TotalSizeTracer(Configuration conf) { + long mergeReaderMem = 100; // constant 100MB + long mergeMapMaxMem = conf.getInteger(FlinkOptions.WRITE_MERGE_MAX_MEMORY); + this.maxBufferSize = (conf.getDouble(FlinkOptions.WRITE_TASK_MAX_SIZE) - mergeReaderMem - mergeMapMaxMem) * 1024 * 1024; + final String errMsg = String.format("'%s' should be at least greater than '%s' plus merge reader memory(constant 100MB now)", + FlinkOptions.WRITE_TASK_MAX_SIZE.key(), FlinkOptions.WRITE_MERGE_MAX_MEMORY.key()); + ValidationUtils.checkState(this.maxBufferSize > 0, errMsg); + } + + /** + * Trace the given record size {@code recordSize}. + * + * @param recordSize The record size + * @return true if the buffer size exceeds the maximum buffer size + */ + boolean trace(long recordSize) { + this.bufferSize += recordSize; + return this.bufferSize > this.maxBufferSize; + } + + void countDown(long size) { + this.bufferSize -= size; + } + + public void reset() { + this.bufferSize = 0; + } + } + /** * Returns the bucket ID with the given value {@code value}. */ @@ -309,6 +361,9 @@ public class StreamWriteFunction *

Flush the data bucket first if the bucket records size is greater than * the configured value {@link FlinkOptions#WRITE_BATCH_SIZE}. * + *

Flush the max size data bucket if the total buffer size exceeds the configured + * threshold {@link FlinkOptions#WRITE_TASK_MAX_SIZE}. + * * @param value HoodieRecord */ private void bufferRecord(I value) { @@ -316,10 +371,21 @@ public class StreamWriteFunction DataBucket bucket = this.buckets.computeIfAbsent(bucketID, k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE))); - boolean needFlush = bucket.detector.detect(value); - if (needFlush) { + boolean flushBucket = bucket.detector.detect(value); + boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize); + if (flushBucket) { flushBucket(bucket); + this.tracer.countDown(bucket.detector.totalSize); bucket.reset(); + } else if (flushBuffer) { + // find the max size bucket and flush it out + List sortedBuckets = this.buckets.values().stream() + .sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize)) + .collect(Collectors.toList()); + final DataBucket bucketToFlush = sortedBuckets.get(0); + flushBucket(bucketToFlush); + this.tracer.countDown(bucketToFlush.detector.totalSize); + bucketToFlush.reset(); } bucket.records.add((HoodieRecord) value); } @@ -384,7 +450,7 @@ public class StreamWriteFunction .build(); this.eventGateway.sendEventToCoordinator(event); this.buckets.clear(); - this.writeClient.cleanHandles(); + this.tracer.reset(); this.currentInstant = ""; } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java index 4948462f1..5bd3c687e 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java @@ -44,8 +44,6 @@ import javax.annotation.Nullable; import java.io.IOException; import java.io.Serializable; import java.lang.reflect.Constructor; -import java.util.Random; -import 
java.util.concurrent.TimeUnit; /** * Function that transforms RowData to HoodieRecord. @@ -82,12 +80,6 @@ public class RowDataToHoodieFunction 0) { - this.rateLimiter = new RateLimiter(totalLimit / getRuntimeContext().getNumberOfParallelSubtasks()); - } } @SuppressWarnings("unchecked") @Override public O map(I i) throws Exception { - if (rateLimiter != null) { - final O hoodieRecord; - if (rateLimiter.sampling()) { - long startTime = System.currentTimeMillis(); - hoodieRecord = (O) toHoodieRecord(i); - long endTime = System.currentTimeMillis(); - rateLimiter.processTime(endTime - startTime); - } else { - hoodieRecord = (O) toHoodieRecord(i); - } - rateLimiter.sleepIfNeeded(); - return hoodieRecord; - } else { - return (O) toHoodieRecord(i); - } + return (O) toHoodieRecord(i); } /** @@ -191,43 +165,4 @@ public class RowDataToHoodieFunction 0, "rate should be positive"); - this.maxProcessTime = 1000 / rate; - } - - void processTime(long processTime) { - this.processTime = processTime; - this.timeToSleep = maxProcessTime - processTime; - } - - boolean sampling() { - // 0.01 sampling percentage - return processTime == -1 || random.nextInt(DENOMINATOR) == 1; - } - - void sleepIfNeeded() throws Exception { - if (timeToSleep > 0) { - TimeUnit.MILLISECONDS.sleep(timeToSleep); - } - } - } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java index 9e417e33f..7da980135 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -436,6 +436,68 @@ public class TestWriteCopyOnWrite { checkWrittenData(tempFile, expected, 1); } + @Test + public void testInsertWithSmallBufferSize() throws Exception { + // reset the config option + conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.001); // 1Kb buffer size + funcWrapper = new 
StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); + + // open the function and ingest data + funcWrapper.openFunction(); + // each record is 424 bytes. so 3 records expect to trigger buffer flush: + // flush the max size bucket once at a time. + for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) { + funcWrapper.invoke(rowData); + } + + Map> dataBuffer = funcWrapper.getDataBuffer(); + assertThat("Should have 1 data bucket", dataBuffer.size(), is(1)); + assertThat("2 records expect to flush out as a mini-batch", + dataBuffer.values().stream().findFirst().map(List::size).orElse(-1), + is(3)); + + // this triggers the data write and event send + funcWrapper.checkpointFunction(1); + dataBuffer = funcWrapper.getDataBuffer(); + assertThat("All data should be flushed out", dataBuffer.size(), is(0)); + + for (int i = 0; i < 2; i++) { + final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first + assertThat("The operator expect to send an event", event, instanceOf(BatchWriteSuccessEvent.class)); + funcWrapper.getCoordinator().handleEventFromOperator(0, event); + } + assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); + + String instant = funcWrapper.getWriteClient() + .getLastPendingInstant(getTableType()); + + funcWrapper.checkpointComplete(1); + + Map expected = getMiniBatchExpected(); + checkWrittenData(tempFile, expected, 1); + + // started a new instant already + checkInflightInstant(funcWrapper.getWriteClient()); + checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); + + // insert duplicates again + for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) { + funcWrapper.invoke(rowData); + } + + funcWrapper.checkpointFunction(2); + + for (int i = 0; i < 2; i++) { + final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first + funcWrapper.getCoordinator().handleEventFromOperator(0, event); + } + + 
funcWrapper.checkpointComplete(2); + + // Same as the original base file content. + checkWrittenData(tempFile, expected, 1); + } + Map getMiniBatchExpected() { Map expected = new HashMap<>(); expected.put("par1", "[id1,par1,id1,Danny,23,1,par1, " diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/transform/TestRowDataToHoodieFunction.java b/hudi-flink/src/test/java/org/apache/hudi/sink/transform/TestRowDataToHoodieFunction.java deleted file mode 100644 index 631b53345..000000000 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/transform/TestRowDataToHoodieFunction.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.hudi.sink.transform; - -import org.apache.hudi.configuration.FlinkOptions; -import org.apache.hudi.sink.utils.MockStreamingRuntimeContext; -import org.apache.hudi.utils.TestConfigurations; -import org.apache.hudi.utils.TestData; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.table.data.RowData; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; - -import java.io.File; - -import static org.junit.jupiter.api.Assertions.assertTrue; - -/** - * Test cases for {@link RowDataToHoodieFunction}. - */ -public class TestRowDataToHoodieFunction { - @TempDir - File tempFile; - - private Configuration conf; - - @BeforeEach - public void before() { - final String basePath = tempFile.getAbsolutePath(); - conf = TestConfigurations.getDefaultConf(basePath); - } - - @Test - void testRateLimit() throws Exception { - // at most 100 record per second - RowDataToHoodieFunction func1 = getFunc(100); - long instant1 = System.currentTimeMillis(); - for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) { - func1.map(rowData); - } - long instant2 = System.currentTimeMillis(); - long processTime1 = instant2 - instant1; - - // at most 1 record per second - RowDataToHoodieFunction func2 = getFunc(1); - long instant3 = System.currentTimeMillis(); - for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) { - func2.map(rowData); - } - long instant4 = System.currentTimeMillis(); - long processTime2 = instant4 - instant3; - - assertTrue(processTime2 > processTime1, "lower rate should have longer process time"); - assertTrue(processTime2 > 5000, "should process at least 5 seconds"); - } - - private RowDataToHoodieFunction getFunc(long rate) throws Exception { - conf.setLong(FlinkOptions.WRITE_RATE_LIMIT, rate); - RowDataToHoodieFunction func = - new RowDataToHoodieFunction<>(TestConfigurations.ROW_TYPE, conf); - func.setRuntimeContext(new 
MockStreamingRuntimeContext(false, 1, 1)); - func.open(conf); - return func; - } -} diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java index 5050109a8..a4b6c16a3 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java @@ -163,6 +163,7 @@ public class StreamWriteFunctionWrapper { functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId); coordinator.notifyCheckpointComplete(checkpointId); this.bucketAssignerFunction.notifyCheckpointComplete(checkpointId); + this.writeFunction.notifyCheckpointComplete(checkpointId); if (conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)) { try { compactFunctionWrapper.compact(checkpointId);