From 9c369c607df2816ea2cd1221fb6d879e3fb8f74c Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Tue, 6 Apr 2021 19:06:41 +0800 Subject: [PATCH] [HUDI-1757] Assigns the buckets by record key for Flink writer (#2757) Currently we assign the buckets by record partition path which could cause a hotspot if the partition field is datetime type. Changes to assign buckets by grouping the records with their key first; the assignment is valid only if there is no conflict (two tasks write to the same bucket). This patch also changes the coordinator execution to be asynchronous. --- .../apache/hudi/index/HoodieIndexUtils.java | 1 - .../hudi/client/HoodieFlinkWriteClient.java | 12 +- .../hudi/common/util/ReflectionUtils.java | 2 +- .../hudi/configuration/FlinkOptions.java | 8 +- .../org/apache/hudi/sink/CleanFunction.java | 8 +- .../apache/hudi/sink/StreamWriteFunction.java | 48 +--- .../sink/StreamWriteOperatorCoordinator.java | 256 ++++++++---------- .../hudi/sink/compact/CompactFunction.java | 42 ++- .../sink/compact/CompactionCommitSink.java | 52 ++-- .../partitioner/BucketAssignFunction.java | 90 +----- .../hudi/sink/partitioner/BucketAssigner.java | 25 +- .../sink/partitioner/BucketAssigners.java | 8 +- .../delta/DeltaBucketAssigner.java | 20 +- .../transform/RowDataToHoodieFunction.java | 75 ++++- .../hudi/sink/utils/CoordinatorExecutor.java | 57 ++++ .../hudi/sink/utils/NonThrownExecutor.java | 22 +- .../apache/hudi/table/HoodieTableSink.java | 4 +- .../org/apache/hudi/util/StreamerUtil.java | 20 +- .../apache/hudi/sink/StreamWriteITCase.java | 12 +- .../TestStreamWriteOperatorCoordinator.java | 68 +++-- .../hudi/sink/TestWriteCopyOnWrite.java | 23 +- .../sink/partitioner/TestBucketAssigner.java | 114 +++++++- .../sink/utils/CompactFunctionWrapper.java | 5 + .../sink/utils/MockCoordinatorExecutor.java | 51 ++++ .../utils/StreamWriteFunctionWrapper.java | 15 +- 25 files changed, 638 insertions(+), 400 deletions(-) create mode 100644 
hudi-flink/src/main/java/org/apache/hudi/sink/utils/CoordinatorExecutor.java create mode 100644 hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockCoordinatorExecutor.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java index 49c81b790..e5426ca11 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java @@ -41,7 +41,6 @@ public class HoodieIndexUtils { * Fetches Pair of partition path and {@link HoodieBaseFile}s for interested partitions. * * @param partition Partition of interest - * @param context Instance of {@link HoodieEngineContext} to use * @param hoodieTable Instance of {@link HoodieTable} of interest * @return the list of {@link HoodieBaseFile} */ diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 6a6bcedbf..7eeec4c70 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -260,11 +260,17 @@ public class HoodieFlinkWriteClient extends * but cleaning action should trigger after all the write actions within a * checkpoint finish. 
* - * @param instantTime The latest successful commit time + * @param table Table to commit on + * @param metadata Commit Metadata corresponding to committed instant + * @param instantTime Instant Time + * @param extraMetadata Additional Metadata passed by user */ - public void postCommit(String instantTime) { + @Override + protected void postCommit(HoodieTable>, List, List> table, + HoodieCommitMetadata metadata, + String instantTime, + Option> extraMetadata) { try { - HoodieTable table = createTable(config, hadoopConf); // Delete the marker directory for the instant. new MarkerFiles(createTable(config, hadoopConf), instantTime) .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java index 23a87e770..bb30b2a1f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java @@ -47,7 +47,7 @@ public class ReflectionUtils { private static Map> clazzCache = new HashMap<>(); - private static Class getClass(String clazzName) { + public static Class getClass(String clazzName) { if (!clazzCache.containsKey(clazzName)) { try { Class clazz = Class.forName(clazzName); diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 8691d91b5..b66be2b1e 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -255,7 +255,7 @@ public class FlinkOptions { public static final ConfigOption WRITE_BATCH_SIZE = ConfigOptions .key("write.batch.size.MB") .doubleType() - .defaultValue(128D) // 128MB + .defaultValue(2D) // 2MB .withDescription("Batch buffer size in MB to flush data into the 
underneath filesystem"); // ------------------------------------------------------------------------ @@ -294,6 +294,12 @@ public class FlinkOptions { .defaultValue(3600) // default 1 hour .withDescription("Max delta seconds time needed to trigger compaction, default 1 hour"); + public static final ConfigOption COMPACTION_MAX_MEMORY = ConfigOptions + .key("compaction.max_memory") + .intType() + .defaultValue(100) // default 100 MB + .withDescription("Max memory in MB for compaction spillable map, default 100MB"); + public static final ConfigOption CLEAN_ASYNC_ENABLED = ConfigOptions .key("clean.async.enabled") .booleanType() diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java index 7b875ff8e..14dd827a5 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java @@ -30,6 +30,8 @@ import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Sink function that cleans the old commits. 
@@ -40,6 +42,8 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction; */ public class CleanFunction extends AbstractRichFunction implements SinkFunction, CheckpointedFunction, CheckpointListener { + private static final Logger LOG = LoggerFactory.getLogger(CleanFunction.class); + private final Configuration conf; private HoodieFlinkWriteClient writeClient; @@ -56,7 +60,7 @@ public class CleanFunction extends AbstractRichFunction super.open(parameters); if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) { this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext()); - this.executor = new NonThrownExecutor(); + this.executor = new NonThrownExecutor(LOG); } } @@ -70,7 +74,7 @@ public class CleanFunction extends AbstractRichFunction // ensure to switch the isCleaning flag this.isCleaning = false; } - }, "wait for cleaning finish", ""); + }, "wait for cleaning finish"); } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index 364d28ebb..b0321acda 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -31,9 +31,9 @@ import org.apache.hudi.table.action.commit.FlinkWriteHelper; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; -import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; @@ -50,8 +50,6 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import 
java.util.Random; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; /** @@ -103,21 +101,6 @@ public class StreamWriteFunction */ private transient Map buckets; - /** - * The buffer lock to control data buffering/flushing. - */ - private transient ReentrantLock bufferLock; - - /** - * The condition to decide whether to add new records into the buffer. - */ - private transient Condition addToBufferCondition; - - /** - * Flag saying whether there is an on-going checkpoint. - */ - private volatile boolean onCheckpointing = false; - /** * Config options. */ @@ -169,32 +152,15 @@ public class StreamWriteFunction @Override public void snapshotState(FunctionSnapshotContext functionSnapshotContext) { - bufferLock.lock(); - try { - // Based on the fact that the coordinator starts the checkpoint first, - // it would check the validity. - this.onCheckpointing = true; - // wait for the buffer data flush out and request a new instant - flushRemaining(false); - // signal the task thread to start buffering - addToBufferCondition.signal(); - } finally { - this.onCheckpointing = false; - bufferLock.unlock(); - } + // Based on the fact that the coordinator starts the checkpoint first, + // it would check the validity. 
+ // wait for the buffer data flush out and request a new instant + flushRemaining(false); } @Override public void processElement(I value, KeyedProcessFunction.Context ctx, Collector out) throws Exception { - bufferLock.lock(); - try { - if (onCheckpointing) { - addToBufferCondition.await(); - } - bufferRecord(value); - } finally { - bufferLock.unlock(); - } + bufferRecord(value); } @Override @@ -247,8 +213,6 @@ public class StreamWriteFunction private void initBuffer() { this.buckets = new LinkedHashMap<>(); - this.bufferLock = new ReentrantLock(); - this.addToBufferCondition = this.bufferLock.newCondition(); } private void initWriteFunction() { diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 51149e2dc..54a7603a5 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -26,27 +26,20 @@ import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.sink.event.BatchWriteSuccessEvent; +import org.apache.hudi.sink.utils.CoordinatorExecutor; import org.apache.hudi.sink.utils.HiveSyncContext; import org.apache.hudi.sink.utils.NonThrownExecutor; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.OperatorEvent; -import org.apache.flink.util.Preconditions; import 
org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; @@ -64,7 +57,7 @@ import static org.apache.hudi.util.StreamerUtil.initTableIfNotExists; *

This coordinator starts a new instant when a new checkpoint starts. It commits the instant when all the * operator tasks write the buffer successfully for a round of checkpoint. * - *

If there is no data for a round of checkpointing, it rolls back the metadata. + *

If there is no data for a round of checkpointing, it resets the events buffer and returns early. * * @see StreamWriteFunction for the work flow and semantics */ @@ -77,20 +70,20 @@ public class StreamWriteOperatorCoordinator */ private final Configuration conf; + /** + * Coordinator context. + */ + private final Context context; + /** * Write client. */ private transient HoodieFlinkWriteClient writeClient; - /** - * Current data buffering checkpoint. - */ - private long inFlightCheckpoint = -1; - /** * Current REQUESTED instant, for validation. */ - private String instant = ""; + private volatile String instant = ""; /** * Event buffer for one round of checkpointing. When all the elements are non-null and have the same @@ -111,7 +104,12 @@ public class StreamWriteOperatorCoordinator /** * A single-thread executor to handle all the asynchronous jobs of the coordinator. */ - private NonThrownExecutor executor; + private CoordinatorExecutor executor; + + /** + * A single-thread executor to handle asynchronous hive sync. + */ + private NonThrownExecutor hiveSyncExecutor; /** * Context that holds variables for asynchronous hive sync. @@ -121,14 +119,15 @@ public class StreamWriteOperatorCoordinator /** * Constructs a StreamingSinkOperatorCoordinator. 
* - * @param conf The config options - * @param parallelism The operator task number + * @param conf The config options + * @param context The coordinator context */ public StreamWriteOperatorCoordinator( Configuration conf, - int parallelism) { + Context context) { this.conf = conf; - this.parallelism = parallelism; + this.context = context; + this.parallelism = context.currentParallelism(); this.needsScheduleCompaction = StreamerUtil.needsScheduleCompaction(conf); } @@ -142,6 +141,8 @@ public class StreamWriteOperatorCoordinator initTableIfNotExists(this.conf); // start a new instant startInstant(); + // start the executor + this.executor = new CoordinatorExecutor(this.context, LOG); // start the executor if required if (conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED)) { initHiveSync(); @@ -162,38 +163,46 @@ public class StreamWriteOperatorCoordinator @Override public void checkpointCoordinator(long checkpointId, CompletableFuture result) { - try { - this.inFlightCheckpoint = checkpointId; - result.complete(writeCheckpointBytes()); - } catch (Throwable throwable) { - // when a checkpoint fails, throws directly. - result.completeExceptionally( - new CompletionException( - String.format("Failed to checkpoint Instant %s for source %s", - this.instant, this.getClass().getSimpleName()), throwable)); - } + executor.execute( + () -> { + try { + result.complete(new byte[0]); + } catch (Throwable throwable) { + // when a checkpoint fails, throws directly. + result.completeExceptionally( + new CompletionException( + String.format("Failed to checkpoint Instant %s for source %s", + this.instant, this.getClass().getSimpleName()), throwable)); + } + }, "taking checkpoint %d", checkpointId + ); } @Override public void notifyCheckpointComplete(long checkpointId) { - // start to commit the instant. 
- final String errorMsg = String.format("Instant [%s] has a complete checkpoint [%d],\n" - + "but the coordinator has not received full write success events,\n" - + "rolls back the instant and rethrow", this.instant, checkpointId); - checkAndForceCommit(errorMsg); - // if async compaction is on, schedule the compaction - if (needsScheduleCompaction) { - writeClient.scheduleCompaction(Option.empty()); - } + executor.execute( + () -> { + // for streaming mode, commits the ever received events anyway, + // the stream write task snapshot and flush the data buffer synchronously in sequence, + // so a successful checkpoint subsumes the old one(follows the checkpoint subsuming contract) + final boolean committed = commitInstant(); + if (committed) { + // if async compaction is on, schedule the compaction + if (needsScheduleCompaction) { + writeClient.scheduleCompaction(Option.empty()); + } + // start new instant. + startInstant(); + } + }, "commits the instant %s", this.instant + ); // sync Hive if is enabled syncHiveIfEnabled(); - // start new instant. 
- startInstant(); } private void syncHiveIfEnabled() { if (conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED)) { - this.executor.execute(this::syncHive, "sync hive metadata", this.instant); + this.hiveSyncExecutor.execute(this::syncHive, "sync hive metadata for instant %s", this.instant); } } @@ -211,43 +220,38 @@ public class StreamWriteOperatorCoordinator this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE)); } - public void notifyCheckpointAborted(long checkpointId) { - Preconditions.checkState(inFlightCheckpoint == checkpointId, - "The aborted checkpoint should always be the last checkpoint"); - checkAndForceCommit("The last checkpoint was aborted, roll back the last write and throw"); - } - @Override public void resetToCheckpoint(long checkpointID, @Nullable byte[] checkpointData) throws Exception { - if (checkpointData != null) { - // restore when any checkpoint completed - deserializeCheckpointAndRestore(checkpointData); - } + // no operation } @Override public void handleEventFromOperator(int i, OperatorEvent operatorEvent) { - // no event to handle - ValidationUtils.checkState(operatorEvent instanceof BatchWriteSuccessEvent, - "The coordinator can only handle BatchWriteSuccessEvent"); - BatchWriteSuccessEvent event = (BatchWriteSuccessEvent) operatorEvent; - // the write task does not block after checkpointing(and before it receives a checkpoint success event), - // if it it checkpoints succeed then flushes the data buffer again before this coordinator receives a checkpoint - // success event, the data buffer would flush with an older instant time. 
- ValidationUtils.checkState( - HoodieTimeline.compareTimestamps(instant, HoodieTimeline.GREATER_THAN_OR_EQUALS, event.getInstantTime()), - String.format("Receive an unexpected event for instant %s from task %d", - event.getInstantTime(), event.getTaskID())); - if (this.eventBuffer[event.getTaskID()] != null) { - this.eventBuffer[event.getTaskID()].mergeWith(event); - } else { - this.eventBuffer[event.getTaskID()] = event; - } - if (event.isEndInput() && checkReady()) { - // start to commit the instant. - doCommit(); - // no compaction scheduling for batch mode - } + executor.execute( + () -> { + // no event to handle + ValidationUtils.checkState(operatorEvent instanceof BatchWriteSuccessEvent, + "The coordinator can only handle BatchWriteSuccessEvent"); + BatchWriteSuccessEvent event = (BatchWriteSuccessEvent) operatorEvent; + // the write task does not block after checkpointing(and before it receives a checkpoint success event), + // if it it checkpoints succeed then flushes the data buffer again before this coordinator receives a checkpoint + // success event, the data buffer would flush with an older instant time. + ValidationUtils.checkState( + HoodieTimeline.compareTimestamps(instant, HoodieTimeline.GREATER_THAN_OR_EQUALS, event.getInstantTime()), + String.format("Receive an unexpected event for instant %s from task %d", + event.getInstantTime(), event.getTaskID())); + if (this.eventBuffer[event.getTaskID()] != null) { + this.eventBuffer[event.getTaskID()].mergeWith(event); + } else { + this.eventBuffer[event.getTaskID()] = event; + } + if (event.isEndInput() && allEventsReceived()) { + // start to commit the instant. 
+ commitInstant(); + // no compaction scheduling for batch mode + } + }, "handle write success event for instant %s", this.instant + ); } @Override @@ -265,85 +269,31 @@ public class StreamWriteOperatorCoordinator // ------------------------------------------------------------------------- private void initHiveSync() { - this.executor = new NonThrownExecutor(); + this.hiveSyncExecutor = new NonThrownExecutor(LOG); this.hiveSyncContext = HiveSyncContext.create(conf); } - static byte[] readBytes(DataInputStream in, int size) throws IOException { - byte[] bytes = new byte[size]; - in.readFully(bytes); - return bytes; - } - - /** - * Serialize the coordinator state. The current implementation may not be super efficient, - * but it should not matter that much because most of the state should be rather small. - * Large states themselves may already be a problem regardless of how the serialization - * is implemented. - * - * @return A byte array containing the serialized state of the source coordinator. - * @throws IOException When something goes wrong in serialization. - */ - private byte[] writeCheckpointBytes() throws IOException { - try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream out = new DataOutputViewStreamWrapper(baos)) { - - out.writeLong(this.inFlightCheckpoint); - byte[] serializedInstant = this.instant.getBytes(); - out.writeInt(serializedInstant.length); - out.write(serializedInstant); - out.flush(); - return baos.toByteArray(); - } - } - - /** - * Restore the state of this source coordinator from the state bytes. - * - * @param bytes The checkpoint bytes that was returned from {@link #writeCheckpointBytes()} - * @throws Exception When the deserialization failed. 
- */ - private void deserializeCheckpointAndRestore(byte[] bytes) throws Exception { - try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes); - DataInputStream in = new DataInputViewStreamWrapper(bais)) { - long checkpointID = in.readLong(); - int serializedInstantSize = in.readInt(); - byte[] serializedInstant = readBytes(in, serializedInstantSize); - this.inFlightCheckpoint = checkpointID; - this.instant = new String(serializedInstant); - } - } - private void reset() { - this.instant = ""; this.eventBuffer = new BatchWriteSuccessEvent[this.parallelism]; } - private void checkAndForceCommit(String errMsg) { - if (!checkReady()) { - // forced but still has inflight instant - String inflightInstant = writeClient.getInflightAndRequestedInstant(this.conf.getString(FlinkOptions.TABLE_TYPE)); - if (inflightInstant != null) { - assert inflightInstant.equals(this.instant); - writeClient.rollback(this.instant); - throw new HoodieException(errMsg); - } - if (Arrays.stream(eventBuffer).allMatch(Objects::isNull)) { - // The last checkpoint finished successfully. - return; - } - } - doCommit(); - } - /** Checks the buffer is ready to commit. */ - private boolean checkReady() { + private boolean allEventsReceived() { return Arrays.stream(eventBuffer) .allMatch(event -> event != null && event.isReady(this.instant)); } - /** Performs the actual commit action. */ - private void doCommit() { + /** + * Commits the instant. + * + * @return true if the write statuses are committed successfully. + */ + private boolean commitInstant() { + if (Arrays.stream(eventBuffer).allMatch(Objects::isNull)) { + // The last checkpoint finished successfully. 
+ return false; + } + List writeResults = Arrays.stream(eventBuffer) .filter(Objects::nonNull) .map(BatchWriteSuccessEvent::getWriteStatuses) @@ -351,12 +301,16 @@ public class StreamWriteOperatorCoordinator .collect(Collectors.toList()); if (writeResults.size() == 0) { - // No data has written, clear the metadata file - this.writeClient.deletePendingInstant(this.conf.getString(FlinkOptions.TABLE_TYPE), this.instant); + // No data has written, reset the buffer and returns early reset(); - return; + return false; } + doCommit(writeResults); + return true; + } + /** Performs the actual commit action. */ + private void doCommit(List writeResults) { // commit or rollback long totalErrorRecords = writeResults.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L); long totalRecords = writeResults.stream().map(WriteStatus::getTotalRecords).reduce(Long::sum).orElse(0L); @@ -371,7 +325,6 @@ public class StreamWriteOperatorCoordinator boolean success = writeClient.commit(this.instant, writeResults, Option.of(checkpointCommitMetadata)); if (success) { - writeClient.postCommit(this.instant); reset(); LOG.info("Commit instant [{}] success!", this.instant); } else { @@ -409,6 +362,19 @@ public class StreamWriteOperatorCoordinator return writeClient; } + @VisibleForTesting + public Context getContext() { + return context; + } + + @VisibleForTesting + public void setExecutor(CoordinatorExecutor executor) throws Exception { + if (this.executor != null) { + this.executor.close(); + } + this.executor = executor; + } + // ------------------------------------------------------------------------- // Inner Class // ------------------------------------------------------------------------- @@ -432,7 +398,7 @@ public class StreamWriteOperatorCoordinator @Override public OperatorCoordinator create(Context context) { - return new StreamWriteOperatorCoordinator(this.conf, context.currentParallelism()); + return new StreamWriteOperatorCoordinator(this.conf, context); } } 
} diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java index 7f4f7b97b..ee8678b44 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java @@ -21,13 +21,17 @@ package org.apache.hudi.sink.compact; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.CompactionOperation; +import org.apache.hudi.sink.utils.NonThrownExecutor; import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable; import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor; import org.apache.hudi.util.StreamerUtil; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.util.Collector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.List; @@ -36,6 +40,7 @@ import java.util.List; * In order to execute scalable, the input should shuffle by the compact event {@link CompactionPlanEvent}. */ public class CompactFunction extends KeyedProcessFunction { + private static final Logger LOG = LoggerFactory.getLogger(CompactFunction.class); /** * Config options. @@ -52,6 +57,11 @@ public class CompactFunction extends KeyedProcessFunction collector) throws Exception { final String instantTime = event.getCompactionInstantTime(); final CompactionOperation compactionOperation = event.getOperation(); + // executes the compaction task asynchronously to not block the checkpoint barrier propagate. 
+ executor.execute( + () -> { + HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor(); + List writeStatuses = compactor.compact( + new HoodieFlinkCopyOnWriteTable<>( + this.writeClient.getConfig(), + this.writeClient.getEngineContext(), + this.writeClient.getHoodieTable().getMetaClient()), + this.writeClient.getHoodieTable().getMetaClient(), + this.writeClient.getConfig(), + compactionOperation, + instantTime); + collector.collect(new CompactionCommitEvent(instantTime, writeStatuses, taskID)); + }, "Execute compaction for instant %s from task %d", instantTime, taskID + ); + } - HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor(); - List writeStatuses = compactor.compact( - new HoodieFlinkCopyOnWriteTable<>( - this.writeClient.getConfig(), - this.writeClient.getEngineContext(), - this.writeClient.getHoodieTable().getMetaClient()), - this.writeClient.getHoodieTable().getMetaClient(), - this.writeClient.getConfig(), - compactionOperation, - instantTime); - collector.collect(new CompactionCommitEvent(instantTime, writeStatuses, taskID)); + @VisibleForTesting + public void setExecutor(NonThrownExecutor executor) { + this.executor = executor; } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java index 86dae20d2..41831cd46 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java @@ -23,8 +23,6 @@ import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieWriteStat; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; import 
org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.sink.CleanFunction; @@ -37,8 +35,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; -import java.util.Objects; +import java.util.Map; import java.util.stream.Collectors; /** @@ -67,13 +66,9 @@ public class CompactionCommitSink extends CleanFunction { /** * Buffer to collect the event from each compact task {@code CompactFunction}. + * The key is the instant time. */ - private transient List commitBuffer; - - /** - * Current on-going compaction instant time. - */ - private String compactionInstantTime; + private transient Map> commitBuffer; public CompactionCommitSink(Configuration conf) { super(conf); @@ -84,36 +79,32 @@ public class CompactionCommitSink extends CleanFunction { public void open(Configuration parameters) throws Exception { super.open(parameters); this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext()); - this.commitBuffer = new ArrayList<>(); + this.commitBuffer = new HashMap<>(); } @Override public void invoke(CompactionCommitEvent event, Context context) throws Exception { - if (compactionInstantTime == null) { - compactionInstantTime = event.getInstant(); - } else if (!event.getInstant().equals(compactionInstantTime)) { - // last compaction still not finish, rolls it back - HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(this.compactionInstantTime); - writeClient.rollbackInflightCompaction(inflightInstant); - this.compactionInstantTime = event.getInstant(); - } - this.commitBuffer.add(event); - commitIfNecessary(); + final String instant = event.getInstant(); + commitBuffer.computeIfAbsent(instant, k -> new ArrayList<>()) + .add(event); + commitIfNecessary(instant, commitBuffer.get(instant)); } /** * Condition to commit: the commit buffer has equal size with the 
compaction plan operations * and all the compact commit event {@link CompactionCommitEvent} has the same compaction instant time. + * + * @param instant Compaction commit instant time + * @param events Commit events ever received for the instant */ - private void commitIfNecessary() throws IOException { + private void commitIfNecessary(String instant, List events) throws IOException { HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan( - this.writeClient.getHoodieTable().getMetaClient(), compactionInstantTime); - boolean isReady = compactionPlan.getOperations().size() == commitBuffer.size() - && commitBuffer.stream().allMatch(event -> event != null && Objects.equals(event.getInstant(), compactionInstantTime)); + this.writeClient.getHoodieTable().getMetaClient(), instant); + boolean isReady = compactionPlan.getOperations().size() == events.size(); if (!isReady) { return; } - List statuses = this.commitBuffer.stream() + List statuses = events.stream() .map(CompactionCommitEvent::getWriteStatuses) .flatMap(Collection::stream) .collect(Collectors.toList()); @@ -127,16 +118,15 @@ public class CompactionCommitSink extends CleanFunction { } metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, writeClient.getConfig().getSchema()); this.writeClient.completeCompaction( - metadata, statuses, this.writeClient.getHoodieTable(), compactionInstantTime); + metadata, statuses, this.writeClient.getHoodieTable(), instant); } // commit the compaction - this.writeClient.commitCompaction(compactionInstantTime, statuses, Option.empty()); + this.writeClient.commitCompaction(instant, statuses, Option.empty()); // reset the status - reset(); + reset(instant); } - private void reset() { - this.commitBuffer.clear(); - this.compactionInstantTime = null; + private void reset(String instant) { + this.commitBuffer.remove(instant); } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java 
b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java index 5ea5f924e..9c23259fc 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java @@ -21,7 +21,6 @@ package org.apache.hudi.sink.partitioner; import org.apache.hudi.client.FlinkTaskContextSupplier; import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.common.config.SerializableConfiguration; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; @@ -56,8 +55,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; /** * The function to build the write profile incrementally for records within a checkpoint, @@ -106,29 +103,11 @@ public class BucketAssignFunction> private final boolean isChangingRecords; - /** - * All the partition paths when the task starts. It is used to help checking whether all the partitions - * are loaded into the state. - */ - private transient Set initialPartitionsToLoad; - /** * State to book-keep which partition is loaded into the index state {@code indexState}. */ private MapState partitionLoadState; - /** - * Whether all partitions are loaded, if it is true, - * we can only check the state for locations. - */ - private boolean allPartitionsLoaded = false; - - /** - * Flag saying whether to check that all the partitions are loaded. - * So that there is chance that flag {@code allPartitionsLoaded} becomes true. 
- */ - private boolean checkPartition = true; - public BucketAssignFunction(Configuration conf) { this.conf = conf; this.isChangingRecords = WriteOperationType.isChangingRecords( @@ -144,12 +123,11 @@ public class BucketAssignFunction> new SerializableConfiguration(this.hadoopConf), new FlinkTaskContextSupplier(getRuntimeContext())); this.bucketAssigner = BucketAssigners.create( + getRuntimeContext().getIndexOfThisSubtask(), + getRuntimeContext().getNumberOfParallelSubtasks(), HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)), context, writeConfig); - - // initialize and check the partitions load state - loadInitialPartitions(); } @Override @@ -181,15 +159,7 @@ public class BucketAssignFunction> final BucketInfo bucketInfo; final HoodieRecordLocation location; - // Checks whether all the partitions are loaded first. - if (checkPartition && !allPartitionsLoaded) { - checkPartitionsLoaded(); - checkPartition = false; - } - - if (!allPartitionsLoaded - && initialPartitionsToLoad.contains(hoodieKey.getPartitionPath()) // this is an existing partition - && !partitionLoadState.contains(hoodieKey.getPartitionPath())) { + if (!partitionLoadState.contains(hoodieKey.getPartitionPath())) { // If the partition records are never loaded, load the records first. loadRecords(hoodieKey.getPartitionPath()); } @@ -226,9 +196,6 @@ public class BucketAssignFunction> public void notifyCheckpointComplete(long l) { // Refresh the table state when there are new commits. 
this.bucketAssigner.refreshTable(); - if (!allPartitionsLoaded) { - checkPartition = true; - } } /** @@ -241,12 +208,21 @@ public class BucketAssignFunction> HoodieTable hoodieTable = bucketAssigner.getTable(); List latestBaseFiles = HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, hoodieTable); + final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks(); + final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks(); + final int taskID = getRuntimeContext().getIndexOfThisSubtask(); for (HoodieBaseFile baseFile : latestBaseFiles) { List hoodieKeys = ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hadoopConf, new Path(baseFile.getPath())); hoodieKeys.forEach(hoodieKey -> { try { - this.indexState.put(hoodieKey, new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId())); + // Reference: org.apache.flink.streaming.api.datastream.KeyedStream, + // the input records is shuffled by record key + boolean shouldLoad = KeyGroupRangeAssignment.assignKeyToParallelOperator( + hoodieKey.getRecordKey(), maxParallelism, parallelism) == taskID; + if (shouldLoad) { + this.indexState.put(hoodieKey, new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId())); + } } catch (Exception e) { throw new HoodieIOException("Error when load record keys from file: " + baseFile); } @@ -256,49 +232,9 @@ public class BucketAssignFunction> partitionLoadState.put(partitionPath, 0); } - /** - * Loads the existing partitions for this task. 
- */ - private void loadInitialPartitions() { - List allPartitionPaths = FSUtils.getAllPartitionPaths(this.context, - this.conf.getString(FlinkOptions.PATH), false, false, false); - final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks(); - final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks(); - final int taskID = getRuntimeContext().getIndexOfThisSubtask(); - // reference: org.apache.flink.streaming.api.datastream.KeyedStream - this.initialPartitionsToLoad = allPartitionPaths.stream() - .filter(partition -> KeyGroupRangeAssignment.assignKeyToParallelOperator(partition, maxParallelism, parallelism) == taskID) - .collect(Collectors.toSet()); - } - - /** - * Checks whether all the partitions of the table are loaded into the state, - * set the flag {@code allPartitionsLoaded} to true if it is. - */ - private void checkPartitionsLoaded() { - for (String partition : this.initialPartitionsToLoad) { - try { - if (!this.partitionLoadState.contains(partition)) { - return; - } - } catch (Exception e) { - LOG.warn("Error when check whether all partitions are loaded, ignored", e); - throw new HoodieException(e); - } - } - this.allPartitionsLoaded = true; - } - - @VisibleForTesting - public boolean isAllPartitionsLoaded() { - return this.allPartitionsLoaded; - } - @VisibleForTesting public void clearIndexState() { - this.allPartitionsLoaded = false; this.indexState.clear(); - loadInitialPartitions(); } @VisibleForTesting diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java index 58bbe9c4a..d89ad8319 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java @@ -60,6 +60,16 @@ import java.util.stream.Collectors; public class BucketAssigner { private static final Logger LOG = 
LogManager.getLogger(BucketAssigner.class); + /** + * Task ID. + */ + private final int taskID; + + /** + * Number of tasks. + */ + private final int numTasks; + /** * Remembers what type each bucket is for later. */ @@ -104,12 +114,16 @@ public class BucketAssigner { private final Map newFileAssignStates; public BucketAssigner( + int taskID, + int numTasks, HoodieFlinkEngineContext context, HoodieWriteConfig config) { bucketInfoMap = new HashMap<>(); partitionSmallFilesMap = new HashMap<>(); smallFileAssignStates = new HashMap<>(); newFileAssignStates = new HashMap<>(); + this.taskID = taskID; + this.numTasks = numTasks; this.context = context; this.config = config; this.table = HoodieFlinkTable.create(this.config, this.context); @@ -187,7 +201,7 @@ public class BucketAssigner { if (partitionSmallFilesMap.containsKey(partitionPath)) { return partitionSmallFilesMap.get(partitionPath); } - List smallFiles = getSmallFiles(partitionPath); + List smallFiles = smallFilesOfThisTask(getSmallFiles(partitionPath)); if (smallFiles.size() > 0) { LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles); partitionSmallFilesMap.put(partitionPath, smallFiles); @@ -240,6 +254,15 @@ public class BucketAssigner { return smallFileLocations; } + private List smallFilesOfThisTask(List smallFiles) { + // computes the small files to write inserts for this task. + List smallFilesOfThisTask = new ArrayList<>(); + for (int i = taskID; i < smallFiles.size(); i += numTasks) { + smallFilesOfThisTask.add(smallFiles.get(i)); + } + return smallFilesOfThisTask; + } + /** * Obtains the average record size based on records written during previous commits. Used for estimating how many * records pack into one file. 
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java index f5703f16f..237ec27b2 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java @@ -33,20 +33,24 @@ public abstract class BucketAssigners { /** * Creates a {@code BucketAssigner}. * + * @param taskID The task ID + * @param numTasks The number of tasks * @param tableType The table type * @param context The engine context * @param config The configuration * @return the bucket assigner instance */ public static BucketAssigner create( + int taskID, + int numTasks, HoodieTableType tableType, HoodieFlinkEngineContext context, HoodieWriteConfig config) { switch (tableType) { case COPY_ON_WRITE: - return new BucketAssigner(context, config); + return new BucketAssigner(taskID, numTasks, context, config); case MERGE_ON_READ: - return new DeltaBucketAssigner(context, config); + return new DeltaBucketAssigner(taskID, numTasks, context, config); default: throw new AssertionError(); } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/delta/DeltaBucketAssigner.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/delta/DeltaBucketAssigner.java index 895f593fa..deb8250e5 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/delta/DeltaBucketAssigner.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/delta/DeltaBucketAssigner.java @@ -40,8 +40,12 @@ import java.util.stream.Collectors; *

Note: assumes the index can always index log files for Flink write. */ public class DeltaBucketAssigner extends BucketAssigner { - public DeltaBucketAssigner(HoodieFlinkEngineContext context, HoodieWriteConfig config) { - super(context, config); + public DeltaBucketAssigner( + int taskID, + int numTasks, + HoodieFlinkEngineContext context, + HoodieWriteConfig config) { + super(taskID, numTasks, context, config); } @Override @@ -77,11 +81,13 @@ public class DeltaBucketAssigner extends BucketAssigner { sf.sizeBytes = getTotalFileSize(smallFileSlice); smallFileLocations.add(sf); } else { - HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get(); - sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()), - FSUtils.getFileIdFromLogPath(logFile.getPath())); - sf.sizeBytes = getTotalFileSize(smallFileSlice); - smallFileLocations.add(sf); + smallFileSlice.getLogFiles().findFirst().ifPresent(logFile -> { + // in case there is something error, and the file slice has no log file + sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()), + FSUtils.getFileIdFromLogPath(logFile.getPath())); + sf.sizeBytes = getTotalFileSize(smallFileSlice); + smallFileLocations.add(sf); + }); } } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java index 1d41003c5..5bd3c687e 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java @@ -23,6 +23,9 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.util.Option; +import 
org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.keygen.KeyGenerator; import org.apache.hudi.util.RowDataToAvroConverters; @@ -36,7 +39,11 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.RowKind; +import javax.annotation.Nullable; + import java.io.IOException; +import java.io.Serializable; +import java.lang.reflect.Constructor; /** * Function that transforms RowData to HoodieRecord. @@ -63,6 +70,11 @@ public class RowDataToHoodieFunction(hoodieKey, payload); } + + /** + * Util to create hoodie pay load instance. + */ + private static class PayloadCreation implements Serializable { + private static final long serialVersionUID = 1L; + + private final boolean shouldCombine; + private final Constructor constructor; + private final String preCombineField; + + private PayloadCreation( + boolean shouldCombine, + Constructor constructor, + @Nullable String preCombineField) { + this.shouldCombine = shouldCombine; + this.constructor = constructor; + this.preCombineField = preCombineField; + } + + public static PayloadCreation instance(Configuration conf) throws Exception { + boolean shouldCombine = conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS) + || WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)) == WriteOperationType.UPSERT; + String preCombineField = null; + final Class[] argTypes; + final Constructor constructor; + if (shouldCombine) { + preCombineField = conf.getString(FlinkOptions.PRECOMBINE_FIELD); + argTypes = new Class[] {GenericRecord.class, Comparable.class}; + } else { + argTypes = new Class[] {Option.class}; + } + final String clazz = conf.getString(FlinkOptions.PAYLOAD_CLASS); + constructor = ReflectionUtils.getClass(clazz).getConstructor(argTypes); + return new PayloadCreation(shouldCombine, constructor, preCombineField); + } + + public 
HoodieRecordPayload createPayload(GenericRecord record, boolean isDelete) throws Exception { + if (shouldCombine) { + ValidationUtils.checkState(preCombineField != null); + Comparable orderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal(record, + preCombineField, false); + return (HoodieRecordPayload) constructor.newInstance( + isDelete ? null : record, orderingVal); + } else { + return (HoodieRecordPayload) this.constructor.newInstance(Option.of(record)); + } + } + } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/CoordinatorExecutor.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/CoordinatorExecutor.java new file mode 100644 index 000000000..b5957679b --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/CoordinatorExecutor.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.utils; + +import org.apache.hudi.exception.HoodieException; + +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.slf4j.Logger; + +import java.util.concurrent.TimeUnit; + +/** + * Coordinator executor that executes the tasks asynchronously, it fails the job + * for any task exceptions. + * + *

We need this because the coordinator methods are called by + * the Job Manager's main thread (mailbox thread), executes the methods asynchronously + * to avoid blocking the main thread. + */ +public class CoordinatorExecutor extends NonThrownExecutor { + private final OperatorCoordinator.Context context; + + public CoordinatorExecutor(OperatorCoordinator.Context context, Logger logger) { + super(logger); + this.context = context; + } + + @Override + protected void exceptionHook(String actionString, Throwable t) { + this.context.failJob(new HoodieException(actionString, t)); + } + + @Override + public void close() throws Exception { + // wait for the remaining tasks to finish. + executor.shutdown(); + // We do not expect this to actually block for long. At this point, there should + // be very few task running in the executor, if any. + executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java index 1d0542e46..87c9c0179 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java @@ -21,7 +21,6 @@ package org.apache.hudi.sink.utils; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.function.ThrowingRunnable; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -31,15 +30,16 @@ import java.util.concurrent.TimeUnit; * An executor service that catches all the throwable with logging. */ public class NonThrownExecutor implements AutoCloseable { - private static final Logger LOG = LoggerFactory.getLogger(NonThrownExecutor.class); + private final Logger logger; /** * A single-thread executor to handle all the asynchronous jobs. 
*/ - private final ExecutorService executor; + protected final ExecutorService executor; - public NonThrownExecutor() { + public NonThrownExecutor(Logger logger) { this.executor = Executors.newSingleThreadExecutor(); + this.logger = logger; } /** @@ -48,24 +48,30 @@ public class NonThrownExecutor implements AutoCloseable { public void execute( final ThrowingRunnable action, final String actionName, - final String instant) { + final Object... actionParams) { executor.execute( () -> { + final String actionString = String.format(actionName, actionParams); try { action.run(); - LOG.info("Executor executes action [{}] for instant [{}] success!", actionName, instant); + logger.info("Executor executes action [{}] success!", actionString); } catch (Throwable t) { // if we have a JVM critical error, promote it immediately, there is a good // chance the // logging or job failing will not succeed any more ExceptionUtils.rethrowIfFatalErrorOrOOM(t); - final String errMsg = String.format("Executor executes action [%s] error", actionName); - LOG.error(errMsg, t); + final String errMsg = String.format("Executor executes action [%s] error", actionString); + logger.error(errMsg, t); + exceptionHook(errMsg, t); } }); } + protected void exceptionHook(String errMsg, Throwable t) { + // for sub-class to override. 
+ } + @Override public void close() throws Exception { if (executor != null) { diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java index 28568f719..a568a3f5c 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java @@ -69,8 +69,8 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning { DataStream pipeline = dataStream .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class)) - // Key-by partition path, to avoid multiple subtasks write to a partition at the same time - .keyBy(HoodieRecord::getPartitionPath) + // Key-by record key, to avoid multiple subtasks write to a bucket at the same time + .keyBy(HoodieRecord::getRecordKey) .transform( "bucket_assigner", TypeInformation.of(HoodieRecord.class), diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 900ec4165..8fed30b7a 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -33,6 +33,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.TablePathUtils; import org.apache.hudi.config.HoodieCompactionConfig; +import org.apache.hudi.config.HoodieMemoryConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; @@ -179,19 +180,6 @@ public class StreamerUtil { } } - /** - * Create a payload class via reflection, do not ordering/precombine value. 
- */ - public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record) - throws IOException { - try { - return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass, - new Class[] {Option.class}, Option.of(record)); - } catch (Throwable e) { - throw new IOException("Could not create payload for class: " + payloadClass, e); - } - } - public static HoodieWriteConfig getHoodieClientConfig(FlinkStreamerConfig conf) { return getHoodieClientConfig(FlinkOptions.fromStreamerConfig(conf)); } @@ -215,6 +203,12 @@ public class StreamerUtil { // actually Flink cleaning is always with parallelism 1 now .withCleanerParallelism(20) .build()) + .withMemoryConfig( + HoodieMemoryConfig.newBuilder() + .withMaxMemoryMaxSize( + conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024L, + conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024L + ).build()) .forTable(conf.getString(FlinkOptions.TABLE_NAME)) .withAutoCommit(false) .withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf))); diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java index 94e09758a..361fcef12 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java @@ -120,8 +120,8 @@ public class StreamWriteITCase extends TestLogger { .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8))) .setParallelism(4) .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class)) - // Key-by partition path, to avoid multiple subtasks write to a partition at the same time - .keyBy(HoodieRecord::getPartitionPath) + // Key-by record key, to avoid multiple subtasks write to a bucket at the same time + .keyBy(HoodieRecord::getRecordKey) .transform( "bucket_assigner", TypeInformation.of(HoodieRecord.class), @@ -179,8 
+179,8 @@ public class StreamWriteITCase extends TestLogger { .name("instant_generator") .uid("instant_generator_id") - // Keyby partition path, to avoid multiple subtasks writing to a partition at the same time - .keyBy(HoodieRecord::getPartitionPath) + // Key-by record key, to avoid multiple subtasks write to a bucket at the same time + .keyBy(HoodieRecord::getRecordKey) // use the bucket assigner to generate bucket IDs .transform( "bucket_assigner", @@ -249,8 +249,8 @@ public class StreamWriteITCase extends TestLogger { .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8))) .setParallelism(4) .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class)) - // Key-by partition path, to avoid multiple subtasks write to a partition at the same time - .keyBy(HoodieRecord::getPartitionPath) + // Key-by record key, to avoid multiple subtasks write to a bucket at the same time + .keyBy(HoodieRecord::getRecordKey) .transform( "bucket_assigner", TypeInformation.of(HoodieRecord.class), diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java index 321abfad7..0afd41418 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java @@ -23,12 +23,15 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.configuration.FlinkOptions; -import org.apache.hudi.exception.HoodieException; import org.apache.hudi.sink.event.BatchWriteSuccessEvent; +import org.apache.hudi.sink.utils.MockCoordinatorExecutor; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestConfigurations; import 
org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -42,11 +45,12 @@ import java.io.IOException; import java.util.Collections; import java.util.concurrent.CompletableFuture; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertNotEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; /** @@ -60,9 +64,11 @@ public class TestStreamWriteOperatorCoordinator { @BeforeEach public void before() throws Exception { + OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(new OperatorID(), 2); coordinator = new StreamWriteOperatorCoordinator( - TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()), 2); + TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()), context); coordinator.start(); + coordinator.setExecutor(new MockCoordinatorExecutor(context)); } @AfterEach @@ -99,8 +105,8 @@ public class TestStreamWriteOperatorCoordinator { coordinator.notifyCheckpointComplete(1); String inflight = coordinator.getWriteClient() - .getInflightAndRequestedInstant("COPY_ON_WRITE"); - String lastCompleted = coordinator.getWriteClient().getLastCompletedInstant("COPY_ON_WRITE"); + .getInflightAndRequestedInstant(FlinkOptions.TABLE_TYPE_COPY_ON_WRITE); + String lastCompleted = 
coordinator.getWriteClient().getLastCompletedInstant(FlinkOptions.TABLE_TYPE_COPY_ON_WRITE); assertThat("Instant should be complete", lastCompleted, is(instant)); assertNotEquals("", inflight, "Should start a new instant"); assertNotEquals(instant, inflight, "Should start a new instant"); @@ -131,27 +137,43 @@ public class TestStreamWriteOperatorCoordinator { .instantTime("abc") .writeStatus(Collections.emptyList()) .build(); - assertThrows(IllegalStateException.class, - () -> coordinator.handleEventFromOperator(0, event), + + assertError(() -> coordinator.handleEventFromOperator(0, event), "Receive an unexpected event for instant abc from task 0"); } @Test - public void testCheckpointCompleteWithException() { + public void testCheckpointCompleteWithPartialEvents() { final CompletableFuture future = new CompletableFuture<>(); coordinator.checkpointCoordinator(1, future); - String inflightInstant = coordinator.getInstant(); + String instant = coordinator.getInstant(); OperatorEvent event = BatchWriteSuccessEvent.builder() .taskID(0) - .instantTime(inflightInstant) + .instantTime(instant) .writeStatus(Collections.emptyList()) .build(); coordinator.handleEventFromOperator(0, event); - assertThrows(HoodieException.class, - () -> coordinator.notifyCheckpointComplete(1), - "org.apache.hudi.exception.HoodieException: Instant [20210330153432] has a complete checkpoint [1],\n" - + "but the coordinator has not received full write success events,\n" - + "rolls back the instant and rethrow"); + + assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(1), + "Returns early for empty write results"); + String lastCompleted = coordinator.getWriteClient().getLastCompletedInstant(FlinkOptions.TABLE_TYPE_COPY_ON_WRITE); + assertNull(lastCompleted, "Returns early for empty write results"); + assertNull(coordinator.getEventBuffer()[0]); + + WriteStatus writeStatus1 = new WriteStatus(false, 0.2D); + writeStatus1.setPartitionPath("par2"); + writeStatus1.setStat(new 
HoodieWriteStat()); + OperatorEvent event1 = BatchWriteSuccessEvent.builder() + .taskID(1) + .instantTime(instant) + .writeStatus(Collections.singletonList(writeStatus1)) + .isLastBatch(true) + .build(); + coordinator.handleEventFromOperator(1, event1); + assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(2), + "Commits the instant with partial events anyway"); + lastCompleted = coordinator.getWriteClient().getLastCompletedInstant(FlinkOptions.TABLE_TYPE_COPY_ON_WRITE); + assertThat("Commits the instant with partial events anyway", lastCompleted, is(instant)); } @Test @@ -159,8 +181,10 @@ public class TestStreamWriteOperatorCoordinator { // override the default configuration Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); conf.setBoolean(FlinkOptions.HIVE_SYNC_ENABLED, true); - coordinator = new StreamWriteOperatorCoordinator(conf, 1); + OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(new OperatorID(), 1); + coordinator = new StreamWriteOperatorCoordinator(conf, context); coordinator.start(); + coordinator.setExecutor(new MockCoordinatorExecutor(context)); String instant = coordinator.getInstant(); assertNotEquals("", instant); @@ -180,4 +204,16 @@ public class TestStreamWriteOperatorCoordinator { // never throw for hive synchronization now assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(1)); } + + // ------------------------------------------------------------------------- + // Utilities + // ------------------------------------------------------------------------- + + private void assertError(Runnable runnable, String message) { + runnable.run(); + // wait a little while for the task to finish + assertThat(coordinator.getContext(), instanceOf(MockOperatorCoordinatorContext.class)); + MockOperatorCoordinatorContext context = (MockOperatorCoordinatorContext) coordinator.getContext(); + assertTrue(context.isJobFailed(), message); + } } diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java index 0e53cfaca..2384f7ef9 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.configuration.FlinkOptions; -import org.apache.hudi.exception.HoodieException; import org.apache.hudi.sink.event.BatchWriteSuccessEvent; import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper; import org.apache.hudi.utils.TestConfigurations; @@ -53,11 +52,10 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; /** - * Test cases for StreamingSinkFunction. + * Test cases for stream write. 
*/ public class TestWriteCopyOnWrite { @@ -193,19 +191,19 @@ public class TestWriteCopyOnWrite { assertThat(writeStatuses.size(), is(0)); // no data write // fails the checkpoint - assertThrows(HoodieException.class, - () -> funcWrapper.checkpointFails(1), - "The last checkpoint was aborted, roll back the last write and throw"); + funcWrapper.checkpointFails(1); + assertFalse(funcWrapper.getCoordinatorContext().isJobFailed(), + "The last checkpoint was aborted, ignore the events"); - // the instant metadata should be cleared - checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, null); + // the instant metadata should be reused + checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant); checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, null); for (RowData rowData : TestData.DATA_SET_INSERT) { funcWrapper.invoke(rowData); } - // this returns early cause there is no inflight instant + // this returns early because there is no inflight instant funcWrapper.checkpointFunction(2); // do not sent the write event and fails the checkpoint, // behaves like the last checkpoint is successful. @@ -501,16 +499,11 @@ public class TestWriteCopyOnWrite { assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant); - assertFalse(funcWrapper.isAllPartitionsLoaded(), - "All partitions assume to be loaded into the index state"); + funcWrapper.checkpointComplete(2); // the coordinator checkpoint commits the inflight instant. 
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); checkWrittenData(tempFile, EXPECTED2); - // next element triggers all partitions load check - funcWrapper.invoke(TestData.DATA_SET_INSERT.get(0)); - assertTrue(funcWrapper.isAllPartitionsLoaded(), - "All partitions assume to be loaded into the index state"); } // ------------------------------------------------------------------------- diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java b/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java index 04cee442a..33457b509 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java @@ -146,6 +146,55 @@ public class TestBucketAssigner { assertBucketEquals(bucketInfo, "par3", BucketType.INSERT); } + /** + * Test that only partial small files are assigned to the task. 
+ */ + @Test + public void testInsertWithPartialSmallFiles() { + SmallFile f0 = new SmallFile(); + f0.location = new HoodieRecordLocation("t0", "f0"); + f0.sizeBytes = 12; + + SmallFile f1 = new SmallFile(); + f1.location = new HoodieRecordLocation("t0", "f1"); + f1.sizeBytes = 122879; // no left space to append new records to this bucket + + SmallFile f2 = new SmallFile(); + f2.location = new HoodieRecordLocation("t0", "f2"); + f2.sizeBytes = 56; + + Map> smallFilesMap = new HashMap<>(); + smallFilesMap.put("par1", Arrays.asList(f0, f1, f2)); + + MockBucketAssigner mockBucketAssigner = new MockBucketAssigner(0, 2, context, writeConfig, smallFilesMap); + BucketInfo bucketInfo = mockBucketAssigner.addInsert("par1"); + assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0"); + + mockBucketAssigner.addInsert("par1"); + bucketInfo = mockBucketAssigner.addInsert("par1"); + assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0"); + + bucketInfo = mockBucketAssigner.addInsert("par3"); + assertBucketEquals(bucketInfo, "par3", BucketType.INSERT); + + bucketInfo = mockBucketAssigner.addInsert("par3"); + assertBucketEquals(bucketInfo, "par3", BucketType.INSERT); + + MockBucketAssigner mockBucketAssigner2 = new MockBucketAssigner(1, 2, context, writeConfig, smallFilesMap); + BucketInfo bucketInfo2 = mockBucketAssigner2.addInsert("par1"); + assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f1"); + + mockBucketAssigner2.addInsert("par1"); + bucketInfo2 = mockBucketAssigner2.addInsert("par1"); + assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f1"); + + bucketInfo2 = mockBucketAssigner2.addInsert("par3"); + assertBucketEquals(bucketInfo2, "par3", BucketType.INSERT); + + bucketInfo2 = mockBucketAssigner2.addInsert("par3"); + assertBucketEquals(bucketInfo2, "par3", BucketType.INSERT); + } + @Test public void testUpdateAndInsertWithSmallFiles() { SmallFile f0 = new SmallFile(); @@ -187,6 +236,60 @@ public class TestBucketAssigner { 
assertBucketEquals(bucketInfo, "par2", BucketType.UPDATE, "f2"); } + /** + * Test that only partial small files are assigned to the task. + */ + @Test + public void testUpdateAndInsertWithPartialSmallFiles() { + SmallFile f0 = new SmallFile(); + f0.location = new HoodieRecordLocation("t0", "f0"); + f0.sizeBytes = 12; + + SmallFile f1 = new SmallFile(); + f1.location = new HoodieRecordLocation("t0", "f1"); + f1.sizeBytes = 122879; // no left space to append new records to this bucket + + SmallFile f2 = new SmallFile(); + f2.location = new HoodieRecordLocation("t0", "f2"); + f2.sizeBytes = 56; + + Map> smallFilesMap = new HashMap<>(); + smallFilesMap.put("par1", Arrays.asList(f0, f1, f2)); + + MockBucketAssigner mockBucketAssigner = new MockBucketAssigner(0, 2, context, writeConfig, smallFilesMap); + mockBucketAssigner.addUpdate("par1", "f0"); + + BucketInfo bucketInfo = mockBucketAssigner.addInsert("par1"); + assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0"); + + mockBucketAssigner.addInsert("par1"); + bucketInfo = mockBucketAssigner.addInsert("par1"); + assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0"); + + mockBucketAssigner.addUpdate("par1", "f2"); + + mockBucketAssigner.addInsert("par1"); + bucketInfo = mockBucketAssigner.addInsert("par1"); + assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0"); + + + MockBucketAssigner mockBucketAssigner2 = new MockBucketAssigner(1, 2, context, writeConfig, smallFilesMap); + mockBucketAssigner2.addUpdate("par1", "f0"); + + BucketInfo bucketInfo2 = mockBucketAssigner2.addInsert("par1"); + assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f1"); + + mockBucketAssigner2.addInsert("par1"); + bucketInfo2 = mockBucketAssigner2.addInsert("par1"); + assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f1"); + + mockBucketAssigner2.addUpdate("par1", "f2"); + + mockBucketAssigner2.addInsert("par1"); + bucketInfo2 = mockBucketAssigner2.addInsert("par1"); + 
assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f1"); + } + private void assertBucketEquals( BucketInfo bucketInfo, String partition, @@ -220,7 +323,16 @@ public class TestBucketAssigner { HoodieFlinkEngineContext context, HoodieWriteConfig config, Map> smallFilesMap) { - super(context, config); + this(0, 1, context, config, smallFilesMap); + } + + MockBucketAssigner( + int taskID, + int numTasks, + HoodieFlinkEngineContext context, + HoodieWriteConfig config, + Map> smallFilesMap) { + super(taskID, numTasks, context, config); this.smallFilesMap = smallFilesMap; } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java index e8796ec72..fe3dd818e 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java @@ -28,7 +28,9 @@ import org.apache.hudi.sink.compact.CompactionPlanOperator; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext; import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.streaming.api.operators.Output; @@ -78,6 +80,9 @@ public class CompactFunctionWrapper { compactFunction = new CompactFunction(conf); compactFunction.setRuntimeContext(runtimeContext); compactFunction.open(conf); + final NonThrownExecutor syncExecutor = new MockCoordinatorExecutor( + new MockOperatorCoordinatorContext(new OperatorID(), 1)); + compactFunction.setExecutor(syncExecutor); commitSink = new 
CompactionCommitSink(conf); commitSink.setRuntimeContext(runtimeContext); diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockCoordinatorExecutor.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockCoordinatorExecutor.java new file mode 100644 index 000000000..099dfd63f --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockCoordinatorExecutor.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.utils; + +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.function.ThrowingRunnable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A mock {@link CoordinatorExecutor} that executes the actions synchronously. + */ +public class MockCoordinatorExecutor extends CoordinatorExecutor { + private static final Logger LOG = LoggerFactory.getLogger(MockCoordinatorExecutor.class); + + public MockCoordinatorExecutor(OperatorCoordinator.Context context) { + super(context, LOG); + } + + @Override + public void execute(ThrowingRunnable action, String actionName, Object... 
actionParams) { + final String actionString = String.format(actionName, actionParams); + try { + action.run(); + LOG.info("Executor executes action [{}] success!", actionString); + } catch (Throwable t) { + // if we have a JVM critical error, promote it immediately, there is a good + // chance the + // logging or job failing will not succeed any more + ExceptionUtils.rethrowIfFatalErrorOrOOM(t); + exceptionHook(actionString, t); + } + } +} diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java index 72f2e89a6..a4b6c16a3 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java @@ -33,7 +33,9 @@ import org.apache.hudi.utils.TestConfigurations; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext; import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; @@ -57,6 +59,7 @@ public class StreamWriteFunctionWrapper { private final IOManager ioManager; private final StreamingRuntimeContext runtimeContext; private final MockOperatorEventGateway gateway; + private final MockOperatorCoordinatorContext coordinatorContext; private final StreamWriteOperatorCoordinator coordinator; private final MockFunctionInitializationContext functionInitializationContext; @@ -84,13 +87,15 @@ public class StreamWriteFunctionWrapper { this.gateway = new 
MockOperatorEventGateway(); this.conf = conf; // one function - this.coordinator = new StreamWriteOperatorCoordinator(conf, 1); + this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1); + this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext); this.functionInitializationContext = new MockFunctionInitializationContext(); this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf); } public void openFunction() throws Exception { this.coordinator.start(); + this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext)); toHoodieFunction = new RowDataToHoodieFunction<>(TestConfigurations.ROW_TYPE, conf); toHoodieFunction.setRuntimeContext(runtimeContext); toHoodieFunction.open(conf); @@ -181,6 +186,10 @@ public class StreamWriteFunctionWrapper { return coordinator; } + public MockOperatorCoordinatorContext getCoordinatorContext() { + return coordinatorContext; + } + public void clearIndexState() { this.bucketAssignerFunction.clearIndexState(); } @@ -188,8 +197,4 @@ public class StreamWriteFunctionWrapper { public boolean isKeyInState(HoodieKey hoodieKey) { return this.bucketAssignerFunction.isKeyInState(hoodieKey); } - - public boolean isAllPartitionsLoaded() { - return this.bucketAssignerFunction.isAllPartitionsLoaded(); - } }