diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java index 49c81b790..e5426ca11 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java @@ -41,7 +41,6 @@ public class HoodieIndexUtils { * Fetches Pair of partition path and {@link HoodieBaseFile}s for interested partitions. * * @param partition Partition of interest - * @param context Instance of {@link HoodieEngineContext} to use * @param hoodieTable Instance of {@link HoodieTable} of interest * @return the list of {@link HoodieBaseFile} */ diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 6a6bcedbf..7eeec4c70 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -260,11 +260,17 @@ public class HoodieFlinkWriteClient extends * but cleaning action should trigger after all the write actions within a * checkpoint finish. * - * @param instantTime The latest successful commit time + * @param table Table to commit on + * @param metadata Commit Metadata corresponding to committed instant + * @param instantTime Instant Time + * @param extraMetadata Additional Metadata passed by user */ - public void postCommit(String instantTime) { + @Override + protected void postCommit(HoodieTable>, List, List> table, + HoodieCommitMetadata metadata, + String instantTime, + Option> extraMetadata) { try { - HoodieTable table = createTable(config, hadoopConf); // Delete the marker directory for the instant. new MarkerFiles(createTable(config, hadoopConf), instantTime) .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java index 23a87e770..bb30b2a1f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java @@ -47,7 +47,7 @@ public class ReflectionUtils { private static Map> clazzCache = new HashMap<>(); - private static Class getClass(String clazzName) { + public static Class getClass(String clazzName) { if (!clazzCache.containsKey(clazzName)) { try { Class clazz = Class.forName(clazzName); diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 8691d91b5..b66be2b1e 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -255,7 +255,7 @@ public class FlinkOptions { public static final ConfigOption WRITE_BATCH_SIZE = ConfigOptions .key("write.batch.size.MB") .doubleType() - .defaultValue(128D) // 128MB + .defaultValue(2D) // 2MB .withDescription("Batch buffer size in MB to flush data into the underneath filesystem"); // ------------------------------------------------------------------------ @@ -294,6 +294,12 @@ public class FlinkOptions { .defaultValue(3600) // default 1 hour .withDescription("Max delta seconds time needed to trigger compaction, default 1 hour"); + public static final ConfigOption COMPACTION_MAX_MEMORY = ConfigOptions + .key("compaction.max_memory") + .intType() + .defaultValue(100) // default 100 MB + .withDescription("Max memory in MB for compaction spillable map, default 100MB"); + public static final ConfigOption CLEAN_ASYNC_ENABLED = ConfigOptions .key("clean.async.enabled") .booleanType() diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java index 7b875ff8e..14dd827a5 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java @@ -30,6 +30,8 @@ import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Sink function that cleans the old commits. @@ -40,6 +42,8 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction; */ public class CleanFunction extends AbstractRichFunction implements SinkFunction, CheckpointedFunction, CheckpointListener { + private static final Logger LOG = LoggerFactory.getLogger(CleanFunction.class); + private final Configuration conf; private HoodieFlinkWriteClient writeClient; @@ -56,7 +60,7 @@ public class CleanFunction extends AbstractRichFunction super.open(parameters); if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) { this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext()); - this.executor = new NonThrownExecutor(); + this.executor = new NonThrownExecutor(LOG); } } @@ -70,7 +74,7 @@ public class CleanFunction extends AbstractRichFunction // ensure to switch the isCleaning flag this.isCleaning = false; } - }, "wait for cleaning finish", ""); + }, "wait for cleaning finish"); } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index 364d28ebb..b0321acda 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -31,9 +31,9 @@ import org.apache.hudi.table.action.commit.FlinkWriteHelper; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; -import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; @@ -50,8 +50,6 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Random; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; /** @@ -103,21 +101,6 @@ public class StreamWriteFunction */ private transient Map buckets; - /** - * The buffer lock to control data buffering/flushing. - */ - private transient ReentrantLock bufferLock; - - /** - * The condition to decide whether to add new records into the buffer. - */ - private transient Condition addToBufferCondition; - - /** - * Flag saying whether there is an on-going checkpoint. - */ - private volatile boolean onCheckpointing = false; - /** * Config options. */ @@ -169,32 +152,15 @@ public class StreamWriteFunction @Override public void snapshotState(FunctionSnapshotContext functionSnapshotContext) { - bufferLock.lock(); - try { - // Based on the fact that the coordinator starts the checkpoint first, - // it would check the validity. - this.onCheckpointing = true; - // wait for the buffer data flush out and request a new instant - flushRemaining(false); - // signal the task thread to start buffering - addToBufferCondition.signal(); - } finally { - this.onCheckpointing = false; - bufferLock.unlock(); - } + // Based on the fact that the coordinator starts the checkpoint first, + // it would check the validity. + // wait for the buffer data flush out and request a new instant + flushRemaining(false); } @Override public void processElement(I value, KeyedProcessFunction.Context ctx, Collector out) throws Exception { - bufferLock.lock(); - try { - if (onCheckpointing) { - addToBufferCondition.await(); - } - bufferRecord(value); - } finally { - bufferLock.unlock(); - } + bufferRecord(value); } @Override @@ -247,8 +213,6 @@ public class StreamWriteFunction private void initBuffer() { this.buckets = new LinkedHashMap<>(); - this.bufferLock = new ReentrantLock(); - this.addToBufferCondition = this.bufferLock.newCondition(); } private void initWriteFunction() { diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 51149e2dc..54a7603a5 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -26,27 +26,20 @@ import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.sink.event.BatchWriteSuccessEvent; +import org.apache.hudi.sink.utils.CoordinatorExecutor; import org.apache.hudi.sink.utils.HiveSyncContext; import org.apache.hudi.sink.utils.NonThrownExecutor; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.OperatorEvent; -import org.apache.flink.util.Preconditions; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; @@ -64,7 +57,7 @@ import static org.apache.hudi.util.StreamerUtil.initTableIfNotExists; *

This coordinator starts a new instant when a new checkpoint starts. It commits the instant when all the * operator tasks write the buffer successfully for a round of checkpoint. * - *

If there is no data for a round of checkpointing, it rolls back the metadata. + *

If there is no data for a round of checkpointing, it resets the events buffer and returns early. * * @see StreamWriteFunction for the work flow and semantics */ @@ -77,20 +70,20 @@ public class StreamWriteOperatorCoordinator */ private final Configuration conf; + /** + * Coordinator context. + */ + private final Context context; + /** * Write client. */ private transient HoodieFlinkWriteClient writeClient; - /** - * Current data buffering checkpoint. - */ - private long inFlightCheckpoint = -1; - /** * Current REQUESTED instant, for validation. */ - private String instant = ""; + private volatile String instant = ""; /** * Event buffer for one round of checkpointing. When all the elements are non-null and have the same @@ -111,7 +104,12 @@ public class StreamWriteOperatorCoordinator /** * A single-thread executor to handle all the asynchronous jobs of the coordinator. */ - private NonThrownExecutor executor; + private CoordinatorExecutor executor; + + /** + * A single-thread executor to handle asynchronous hive sync. + */ + private NonThrownExecutor hiveSyncExecutor; /** * Context that holds variables for asynchronous hive sync. @@ -121,14 +119,15 @@ public class StreamWriteOperatorCoordinator /** * Constructs a StreamingSinkOperatorCoordinator. * - * @param conf The config options - * @param parallelism The operator task number + * @param conf The config options + * @param context The coordinator context */ public StreamWriteOperatorCoordinator( Configuration conf, - int parallelism) { + Context context) { this.conf = conf; - this.parallelism = parallelism; + this.context = context; + this.parallelism = context.currentParallelism(); this.needsScheduleCompaction = StreamerUtil.needsScheduleCompaction(conf); } @@ -142,6 +141,8 @@ public class StreamWriteOperatorCoordinator initTableIfNotExists(this.conf); // start a new instant startInstant(); + // start the executor + this.executor = new CoordinatorExecutor(this.context, LOG); // start the executor if required if (conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED)) { initHiveSync(); @@ -162,38 +163,46 @@ public class StreamWriteOperatorCoordinator @Override public void checkpointCoordinator(long checkpointId, CompletableFuture result) { - try { - this.inFlightCheckpoint = checkpointId; - result.complete(writeCheckpointBytes()); - } catch (Throwable throwable) { - // when a checkpoint fails, throws directly. - result.completeExceptionally( - new CompletionException( - String.format("Failed to checkpoint Instant %s for source %s", - this.instant, this.getClass().getSimpleName()), throwable)); - } + executor.execute( + () -> { + try { + result.complete(new byte[0]); + } catch (Throwable throwable) { + // when a checkpoint fails, throws directly. + result.completeExceptionally( + new CompletionException( + String.format("Failed to checkpoint Instant %s for source %s", + this.instant, this.getClass().getSimpleName()), throwable)); + } + }, "taking checkpoint %d", checkpointId + ); } @Override public void notifyCheckpointComplete(long checkpointId) { - // start to commit the instant. - final String errorMsg = String.format("Instant [%s] has a complete checkpoint [%d],\n" - + "but the coordinator has not received full write success events,\n" - + "rolls back the instant and rethrow", this.instant, checkpointId); - checkAndForceCommit(errorMsg); - // if async compaction is on, schedule the compaction - if (needsScheduleCompaction) { - writeClient.scheduleCompaction(Option.empty()); - } + executor.execute( + () -> { + // for streaming mode, commits the ever received events anyway, + // the stream write task snapshot and flush the data buffer synchronously in sequence, + // so a successful checkpoint subsumes the old one(follows the checkpoint subsuming contract) + final boolean committed = commitInstant(); + if (committed) { + // if async compaction is on, schedule the compaction + if (needsScheduleCompaction) { + writeClient.scheduleCompaction(Option.empty()); + } + // start new instant. + startInstant(); + } + }, "commits the instant %s", this.instant + ); // sync Hive if is enabled syncHiveIfEnabled(); - // start new instant. - startInstant(); } private void syncHiveIfEnabled() { if (conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED)) { - this.executor.execute(this::syncHive, "sync hive metadata", this.instant); + this.hiveSyncExecutor.execute(this::syncHive, "sync hive metadata for instant %s", this.instant); } } @@ -211,43 +220,38 @@ public class StreamWriteOperatorCoordinator this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE)); } - public void notifyCheckpointAborted(long checkpointId) { - Preconditions.checkState(inFlightCheckpoint == checkpointId, - "The aborted checkpoint should always be the last checkpoint"); - checkAndForceCommit("The last checkpoint was aborted, roll back the last write and throw"); - } - @Override public void resetToCheckpoint(long checkpointID, @Nullable byte[] checkpointData) throws Exception { - if (checkpointData != null) { - // restore when any checkpoint completed - deserializeCheckpointAndRestore(checkpointData); - } + // no operation } @Override public void handleEventFromOperator(int i, OperatorEvent operatorEvent) { - // no event to handle - ValidationUtils.checkState(operatorEvent instanceof BatchWriteSuccessEvent, - "The coordinator can only handle BatchWriteSuccessEvent"); - BatchWriteSuccessEvent event = (BatchWriteSuccessEvent) operatorEvent; - // the write task does not block after checkpointing(and before it receives a checkpoint success event), - // if it it checkpoints succeed then flushes the data buffer again before this coordinator receives a checkpoint - // success event, the data buffer would flush with an older instant time. - ValidationUtils.checkState( - HoodieTimeline.compareTimestamps(instant, HoodieTimeline.GREATER_THAN_OR_EQUALS, event.getInstantTime()), - String.format("Receive an unexpected event for instant %s from task %d", - event.getInstantTime(), event.getTaskID())); - if (this.eventBuffer[event.getTaskID()] != null) { - this.eventBuffer[event.getTaskID()].mergeWith(event); - } else { - this.eventBuffer[event.getTaskID()] = event; - } - if (event.isEndInput() && checkReady()) { - // start to commit the instant. - doCommit(); - // no compaction scheduling for batch mode - } + executor.execute( + () -> { + // no event to handle + ValidationUtils.checkState(operatorEvent instanceof BatchWriteSuccessEvent, + "The coordinator can only handle BatchWriteSuccessEvent"); + BatchWriteSuccessEvent event = (BatchWriteSuccessEvent) operatorEvent; + // the write task does not block after checkpointing(and before it receives a checkpoint success event), + // if it it checkpoints succeed then flushes the data buffer again before this coordinator receives a checkpoint + // success event, the data buffer would flush with an older instant time. + ValidationUtils.checkState( + HoodieTimeline.compareTimestamps(instant, HoodieTimeline.GREATER_THAN_OR_EQUALS, event.getInstantTime()), + String.format("Receive an unexpected event for instant %s from task %d", + event.getInstantTime(), event.getTaskID())); + if (this.eventBuffer[event.getTaskID()] != null) { + this.eventBuffer[event.getTaskID()].mergeWith(event); + } else { + this.eventBuffer[event.getTaskID()] = event; + } + if (event.isEndInput() && allEventsReceived()) { + // start to commit the instant. + commitInstant(); + // no compaction scheduling for batch mode + } + }, "handle write success event for instant %s", this.instant + ); } @Override @@ -265,85 +269,31 @@ public class StreamWriteOperatorCoordinator // ------------------------------------------------------------------------- private void initHiveSync() { - this.executor = new NonThrownExecutor(); + this.hiveSyncExecutor = new NonThrownExecutor(LOG); this.hiveSyncContext = HiveSyncContext.create(conf); } - static byte[] readBytes(DataInputStream in, int size) throws IOException { - byte[] bytes = new byte[size]; - in.readFully(bytes); - return bytes; - } - - /** - * Serialize the coordinator state. The current implementation may not be super efficient, - * but it should not matter that much because most of the state should be rather small. - * Large states themselves may already be a problem regardless of how the serialization - * is implemented. - * - * @return A byte array containing the serialized state of the source coordinator. - * @throws IOException When something goes wrong in serialization. - */ - private byte[] writeCheckpointBytes() throws IOException { - try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream out = new DataOutputViewStreamWrapper(baos)) { - - out.writeLong(this.inFlightCheckpoint); - byte[] serializedInstant = this.instant.getBytes(); - out.writeInt(serializedInstant.length); - out.write(serializedInstant); - out.flush(); - return baos.toByteArray(); - } - } - - /** - * Restore the state of this source coordinator from the state bytes. - * - * @param bytes The checkpoint bytes that was returned from {@link #writeCheckpointBytes()} - * @throws Exception When the deserialization failed. - */ - private void deserializeCheckpointAndRestore(byte[] bytes) throws Exception { - try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes); - DataInputStream in = new DataInputViewStreamWrapper(bais)) { - long checkpointID = in.readLong(); - int serializedInstantSize = in.readInt(); - byte[] serializedInstant = readBytes(in, serializedInstantSize); - this.inFlightCheckpoint = checkpointID; - this.instant = new String(serializedInstant); - } - } - private void reset() { - this.instant = ""; this.eventBuffer = new BatchWriteSuccessEvent[this.parallelism]; } - private void checkAndForceCommit(String errMsg) { - if (!checkReady()) { - // forced but still has inflight instant - String inflightInstant = writeClient.getInflightAndRequestedInstant(this.conf.getString(FlinkOptions.TABLE_TYPE)); - if (inflightInstant != null) { - assert inflightInstant.equals(this.instant); - writeClient.rollback(this.instant); - throw new HoodieException(errMsg); - } - if (Arrays.stream(eventBuffer).allMatch(Objects::isNull)) { - // The last checkpoint finished successfully. - return; - } - } - doCommit(); - } - /** Checks the buffer is ready to commit. */ - private boolean checkReady() { + private boolean allEventsReceived() { return Arrays.stream(eventBuffer) .allMatch(event -> event != null && event.isReady(this.instant)); } - /** Performs the actual commit action. */ - private void doCommit() { + /** + * Commits the instant. + * + * @return true if the write statuses are committed successfully. + */ + private boolean commitInstant() { + if (Arrays.stream(eventBuffer).allMatch(Objects::isNull)) { + // The last checkpoint finished successfully. + return false; + } + List writeResults = Arrays.stream(eventBuffer) .filter(Objects::nonNull) .map(BatchWriteSuccessEvent::getWriteStatuses) @@ -351,12 +301,16 @@ public class StreamWriteOperatorCoordinator .collect(Collectors.toList()); if (writeResults.size() == 0) { - // No data has written, clear the metadata file - this.writeClient.deletePendingInstant(this.conf.getString(FlinkOptions.TABLE_TYPE), this.instant); + // No data has written, reset the buffer and returns early reset(); - return; + return false; } + doCommit(writeResults); + return true; + } + /** Performs the actual commit action. */ + private void doCommit(List writeResults) { // commit or rollback long totalErrorRecords = writeResults.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L); long totalRecords = writeResults.stream().map(WriteStatus::getTotalRecords).reduce(Long::sum).orElse(0L); @@ -371,7 +325,6 @@ public class StreamWriteOperatorCoordinator boolean success = writeClient.commit(this.instant, writeResults, Option.of(checkpointCommitMetadata)); if (success) { - writeClient.postCommit(this.instant); reset(); LOG.info("Commit instant [{}] success!", this.instant); } else { @@ -409,6 +362,19 @@ public class StreamWriteOperatorCoordinator return writeClient; } + @VisibleForTesting + public Context getContext() { + return context; + } + + @VisibleForTesting + public void setExecutor(CoordinatorExecutor executor) throws Exception { + if (this.executor != null) { + this.executor.close(); + } + this.executor = executor; + } + // ------------------------------------------------------------------------- // Inner Class // ------------------------------------------------------------------------- @@ -432,7 +398,7 @@ public class StreamWriteOperatorCoordinator @Override public OperatorCoordinator create(Context context) { - return new StreamWriteOperatorCoordinator(this.conf, context.currentParallelism()); + return new StreamWriteOperatorCoordinator(this.conf, context); } } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java index 7f4f7b97b..ee8678b44 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java @@ -21,13 +21,17 @@ package org.apache.hudi.sink.compact; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.CompactionOperation; +import org.apache.hudi.sink.utils.NonThrownExecutor; import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable; import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor; import org.apache.hudi.util.StreamerUtil; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.util.Collector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.List; @@ -36,6 +40,7 @@ import java.util.List; * In order to execute scalable, the input should shuffle by the compact event {@link CompactionPlanEvent}. */ public class CompactFunction extends KeyedProcessFunction { + private static final Logger LOG = LoggerFactory.getLogger(CompactFunction.class); /** * Config options. @@ -52,6 +57,11 @@ public class CompactFunction extends KeyedProcessFunction collector) throws Exception { final String instantTime = event.getCompactionInstantTime(); final CompactionOperation compactionOperation = event.getOperation(); + // executes the compaction task asynchronously to not block the checkpoint barrier propagate. + executor.execute( + () -> { + HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor(); + List writeStatuses = compactor.compact( + new HoodieFlinkCopyOnWriteTable<>( + this.writeClient.getConfig(), + this.writeClient.getEngineContext(), + this.writeClient.getHoodieTable().getMetaClient()), + this.writeClient.getHoodieTable().getMetaClient(), + this.writeClient.getConfig(), + compactionOperation, + instantTime); + collector.collect(new CompactionCommitEvent(instantTime, writeStatuses, taskID)); + }, "Execute compaction for instant %s from task %d", instantTime, taskID + ); + } - HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor(); - List writeStatuses = compactor.compact( - new HoodieFlinkCopyOnWriteTable<>( - this.writeClient.getConfig(), - this.writeClient.getEngineContext(), - this.writeClient.getHoodieTable().getMetaClient()), - this.writeClient.getHoodieTable().getMetaClient(), - this.writeClient.getConfig(), - compactionOperation, - instantTime); - collector.collect(new CompactionCommitEvent(instantTime, writeStatuses, taskID)); + @VisibleForTesting + public void setExecutor(NonThrownExecutor executor) { + this.executor = executor; } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java index 86dae20d2..41831cd46 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java @@ -23,8 +23,6 @@ import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieWriteStat; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.sink.CleanFunction; @@ -37,8 +35,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; -import java.util.Objects; +import java.util.Map; import java.util.stream.Collectors; /** @@ -67,13 +66,9 @@ public class CompactionCommitSink extends CleanFunction { /** * Buffer to collect the event from each compact task {@code CompactFunction}. + * The key is the instant time. */ - private transient List commitBuffer; - - /** - * Current on-going compaction instant time. - */ - private String compactionInstantTime; + private transient Map> commitBuffer; public CompactionCommitSink(Configuration conf) { super(conf); @@ -84,36 +79,32 @@ public class CompactionCommitSink extends CleanFunction { public void open(Configuration parameters) throws Exception { super.open(parameters); this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext()); - this.commitBuffer = new ArrayList<>(); + this.commitBuffer = new HashMap<>(); } @Override public void invoke(CompactionCommitEvent event, Context context) throws Exception { - if (compactionInstantTime == null) { - compactionInstantTime = event.getInstant(); - } else if (!event.getInstant().equals(compactionInstantTime)) { - // last compaction still not finish, rolls it back - HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(this.compactionInstantTime); - writeClient.rollbackInflightCompaction(inflightInstant); - this.compactionInstantTime = event.getInstant(); - } - this.commitBuffer.add(event); - commitIfNecessary(); + final String instant = event.getInstant(); + commitBuffer.computeIfAbsent(instant, k -> new ArrayList<>()) + .add(event); + commitIfNecessary(instant, commitBuffer.get(instant)); } /** * Condition to commit: the commit buffer has equal size with the compaction plan operations * and all the compact commit event {@link CompactionCommitEvent} has the same compaction instant time. + * + * @param instant Compaction commit instant time + * @param events Commit events ever received for the instant */ - private void commitIfNecessary() throws IOException { + private void commitIfNecessary(String instant, List events) throws IOException { HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan( - this.writeClient.getHoodieTable().getMetaClient(), compactionInstantTime); - boolean isReady = compactionPlan.getOperations().size() == commitBuffer.size() - && commitBuffer.stream().allMatch(event -> event != null && Objects.equals(event.getInstant(), compactionInstantTime)); + this.writeClient.getHoodieTable().getMetaClient(), instant); + boolean isReady = compactionPlan.getOperations().size() == events.size(); if (!isReady) { return; } - List statuses = this.commitBuffer.stream() + List statuses = events.stream() .map(CompactionCommitEvent::getWriteStatuses) .flatMap(Collection::stream) .collect(Collectors.toList()); @@ -127,16 +118,15 @@ public class CompactionCommitSink extends CleanFunction { } metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, writeClient.getConfig().getSchema()); this.writeClient.completeCompaction( - metadata, statuses, this.writeClient.getHoodieTable(), compactionInstantTime); + metadata, statuses, this.writeClient.getHoodieTable(), instant); } // commit the compaction - this.writeClient.commitCompaction(compactionInstantTime, statuses, Option.empty()); + this.writeClient.commitCompaction(instant, statuses, Option.empty()); // reset the status - reset(); + reset(instant); } - private void reset() { - this.commitBuffer.clear(); - this.compactionInstantTime = null; + private void reset(String instant) { + this.commitBuffer.remove(instant); } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java index 5ea5f924e..9c23259fc 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java @@ -21,7 +21,6 @@ package org.apache.hudi.sink.partitioner; import org.apache.hudi.client.FlinkTaskContextSupplier; import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.common.config.SerializableConfiguration; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; @@ -56,8 +55,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; /** * The function to build the write profile incrementally for records within a checkpoint, @@ -106,29 +103,11 @@ public class BucketAssignFunction> private final boolean isChangingRecords; - /** - * All the partition paths when the task starts. It is used to help checking whether all the partitions - * are loaded into the state. - */ - private transient Set initialPartitionsToLoad; - /** * State to book-keep which partition is loaded into the index state {@code indexState}. */ private MapState partitionLoadState; - /** - * Whether all partitions are loaded, if it is true, - * we can only check the state for locations. - */ - private boolean allPartitionsLoaded = false; - - /** - * Flag saying whether to check that all the partitions are loaded. - * So that there is chance that flag {@code allPartitionsLoaded} becomes true. - */ - private boolean checkPartition = true; - public BucketAssignFunction(Configuration conf) { this.conf = conf; this.isChangingRecords = WriteOperationType.isChangingRecords( @@ -144,12 +123,11 @@ public class BucketAssignFunction> new SerializableConfiguration(this.hadoopConf), new FlinkTaskContextSupplier(getRuntimeContext())); this.bucketAssigner = BucketAssigners.create( + getRuntimeContext().getIndexOfThisSubtask(), + getRuntimeContext().getNumberOfParallelSubtasks(), HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)), context, writeConfig); - - // initialize and check the partitions load state - loadInitialPartitions(); } @Override @@ -181,15 +159,7 @@ public class BucketAssignFunction> final BucketInfo bucketInfo; final HoodieRecordLocation location; - // Checks whether all the partitions are loaded first. - if (checkPartition && !allPartitionsLoaded) { - checkPartitionsLoaded(); - checkPartition = false; - } - - if (!allPartitionsLoaded - && initialPartitionsToLoad.contains(hoodieKey.getPartitionPath()) // this is an existing partition - && !partitionLoadState.contains(hoodieKey.getPartitionPath())) { + if (!partitionLoadState.contains(hoodieKey.getPartitionPath())) { // If the partition records are never loaded, load the records first. loadRecords(hoodieKey.getPartitionPath()); } @@ -226,9 +196,6 @@ public class BucketAssignFunction> public void notifyCheckpointComplete(long l) { // Refresh the table state when there are new commits. this.bucketAssigner.refreshTable(); - if (!allPartitionsLoaded) { - checkPartition = true; - } } /** @@ -241,12 +208,21 @@ public class BucketAssignFunction> HoodieTable hoodieTable = bucketAssigner.getTable(); List latestBaseFiles = HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, hoodieTable); + final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks(); + final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks(); + final int taskID = getRuntimeContext().getIndexOfThisSubtask(); for (HoodieBaseFile baseFile : latestBaseFiles) { List hoodieKeys = ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hadoopConf, new Path(baseFile.getPath())); hoodieKeys.forEach(hoodieKey -> { try { - this.indexState.put(hoodieKey, new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId())); + // Reference: org.apache.flink.streaming.api.datastream.KeyedStream, + // the input records is shuffled by record key + boolean shouldLoad = KeyGroupRangeAssignment.assignKeyToParallelOperator( + hoodieKey.getRecordKey(), maxParallelism, parallelism) == taskID; + if (shouldLoad) { + this.indexState.put(hoodieKey, new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId())); + } } catch (Exception e) { throw new HoodieIOException("Error when load record keys from file: " + baseFile); } @@ -256,49 +232,9 @@ public class BucketAssignFunction> partitionLoadState.put(partitionPath, 0); } - /** - * Loads the existing partitions for this task. - */ - private void loadInitialPartitions() { - List allPartitionPaths = FSUtils.getAllPartitionPaths(this.context, - this.conf.getString(FlinkOptions.PATH), false, false, false); - final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks(); - final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks(); - final int taskID = getRuntimeContext().getIndexOfThisSubtask(); - // reference: org.apache.flink.streaming.api.datastream.KeyedStream - this.initialPartitionsToLoad = allPartitionPaths.stream() - .filter(partition -> KeyGroupRangeAssignment.assignKeyToParallelOperator(partition, maxParallelism, parallelism) == taskID) - .collect(Collectors.toSet()); - } - - /** - * Checks whether all the partitions of the table are loaded into the state, - * set the flag {@code allPartitionsLoaded} to true if it is. - */ - private void checkPartitionsLoaded() { - for (String partition : this.initialPartitionsToLoad) { - try { - if (!this.partitionLoadState.contains(partition)) { - return; - } - } catch (Exception e) { - LOG.warn("Error when check whether all partitions are loaded, ignored", e); - throw new HoodieException(e); - } - } - this.allPartitionsLoaded = true; - } - - @VisibleForTesting - public boolean isAllPartitionsLoaded() { - return this.allPartitionsLoaded; - } - @VisibleForTesting public void clearIndexState() { - this.allPartitionsLoaded = false; this.indexState.clear(); - loadInitialPartitions(); } @VisibleForTesting diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java index 58bbe9c4a..d89ad8319 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java @@ -60,6 +60,16 @@ import java.util.stream.Collectors; public class BucketAssigner { private static final Logger LOG = LogManager.getLogger(BucketAssigner.class); + /** + * Task ID. + */ + private final int taskID; + + /** + * Number of tasks. + */ + private final int numTasks; + /** * Remembers what type each bucket is for later. */ @@ -104,12 +114,16 @@ public class BucketAssigner { private final Map newFileAssignStates; public BucketAssigner( + int taskID, + int numTasks, HoodieFlinkEngineContext context, HoodieWriteConfig config) { bucketInfoMap = new HashMap<>(); partitionSmallFilesMap = new HashMap<>(); smallFileAssignStates = new HashMap<>(); newFileAssignStates = new HashMap<>(); + this.taskID = taskID; + this.numTasks = numTasks; this.context = context; this.config = config; this.table = HoodieFlinkTable.create(this.config, this.context); @@ -187,7 +201,7 @@ public class BucketAssigner { if (partitionSmallFilesMap.containsKey(partitionPath)) { return partitionSmallFilesMap.get(partitionPath); } - List smallFiles = getSmallFiles(partitionPath); + List smallFiles = smallFilesOfThisTask(getSmallFiles(partitionPath)); if (smallFiles.size() > 0) { LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles); partitionSmallFilesMap.put(partitionPath, smallFiles); @@ -240,6 +254,15 @@ public class BucketAssigner { return smallFileLocations; } + private List smallFilesOfThisTask(List smallFiles) { + // computes the small files to write inserts for this task. + List smallFilesOfThisTask = new ArrayList<>(); + for (int i = taskID; i < smallFiles.size(); i += numTasks) { + smallFilesOfThisTask.add(smallFiles.get(i)); + } + return smallFilesOfThisTask; + } + /** * Obtains the average record size based on records written during previous commits. Used for estimating how many * records pack into one file. diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java index f5703f16f..237ec27b2 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java @@ -33,20 +33,24 @@ public abstract class BucketAssigners { /** * Creates a {@code BucketAssigner}. * + * @param taskID The task ID + * @param numTasks The number of tasks * @param tableType The table type * @param context The engine context * @param config The configuration * @return the bucket assigner instance */ public static BucketAssigner create( + int taskID, + int numTasks, HoodieTableType tableType, HoodieFlinkEngineContext context, HoodieWriteConfig config) { switch (tableType) { case COPY_ON_WRITE: - return new BucketAssigner(context, config); + return new BucketAssigner(taskID, numTasks, context, config); case MERGE_ON_READ: - return new DeltaBucketAssigner(context, config); + return new DeltaBucketAssigner(taskID, numTasks, context, config); default: throw new AssertionError(); } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/delta/DeltaBucketAssigner.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/delta/DeltaBucketAssigner.java index 895f593fa..deb8250e5 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/delta/DeltaBucketAssigner.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/delta/DeltaBucketAssigner.java @@ -40,8 +40,12 @@ import java.util.stream.Collectors; *

Note: assumes the index can always index log files for Flink write. */ public class DeltaBucketAssigner extends BucketAssigner { - public DeltaBucketAssigner(HoodieFlinkEngineContext context, HoodieWriteConfig config) { - super(context, config); + public DeltaBucketAssigner( + int taskID, + int numTasks, + HoodieFlinkEngineContext context, + HoodieWriteConfig config) { + super(taskID, numTasks, context, config); } @Override @@ -77,11 +81,13 @@ public class DeltaBucketAssigner extends BucketAssigner { sf.sizeBytes = getTotalFileSize(smallFileSlice); smallFileLocations.add(sf); } else { - HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get(); - sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()), - FSUtils.getFileIdFromLogPath(logFile.getPath())); - sf.sizeBytes = getTotalFileSize(smallFileSlice); - smallFileLocations.add(sf); + smallFileSlice.getLogFiles().findFirst().ifPresent(logFile -> { + // in case there is something error, and the file slice has no log file + sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()), + FSUtils.getFileIdFromLogPath(logFile.getPath())); + sf.sizeBytes = getTotalFileSize(smallFileSlice); + smallFileLocations.add(sf); + }); } } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java index 1d41003c5..5bd3c687e 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java @@ -23,6 +23,9 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.keygen.KeyGenerator; import org.apache.hudi.util.RowDataToAvroConverters; @@ -36,7 +39,11 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.RowKind; +import javax.annotation.Nullable; + import java.io.IOException; +import java.io.Serializable; +import java.lang.reflect.Constructor; /** * Function that transforms RowData to HoodieRecord. @@ -63,6 +70,11 @@ public class RowDataToHoodieFunction(hoodieKey, payload); } + + /** + * Util to create hoodie pay load instance. + */ + private static class PayloadCreation implements Serializable { + private static final long serialVersionUID = 1L; + + private final boolean shouldCombine; + private final Constructor constructor; + private final String preCombineField; + + private PayloadCreation( + boolean shouldCombine, + Constructor constructor, + @Nullable String preCombineField) { + this.shouldCombine = shouldCombine; + this.constructor = constructor; + this.preCombineField = preCombineField; + } + + public static PayloadCreation instance(Configuration conf) throws Exception { + boolean shouldCombine = conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS) + || WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)) == WriteOperationType.UPSERT; + String preCombineField = null; + final Class[] argTypes; + final Constructor constructor; + if (shouldCombine) { + preCombineField = conf.getString(FlinkOptions.PRECOMBINE_FIELD); + argTypes = new Class[] {GenericRecord.class, Comparable.class}; + } else { + argTypes = new Class[] {Option.class}; + } + final String clazz = conf.getString(FlinkOptions.PAYLOAD_CLASS); + constructor = ReflectionUtils.getClass(clazz).getConstructor(argTypes); + return new PayloadCreation(shouldCombine, constructor, preCombineField); + } + + public HoodieRecordPayload createPayload(GenericRecord record, boolean isDelete) throws Exception { + if (shouldCombine) { + ValidationUtils.checkState(preCombineField != null); + Comparable orderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal(record, + preCombineField, false); + return (HoodieRecordPayload) constructor.newInstance( + isDelete ? null : record, orderingVal); + } else { + return (HoodieRecordPayload) this.constructor.newInstance(Option.of(record)); + } + } + } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/CoordinatorExecutor.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/CoordinatorExecutor.java new file mode 100644 index 000000000..b5957679b --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/CoordinatorExecutor.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.utils; + +import org.apache.hudi.exception.HoodieException; + +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.slf4j.Logger; + +import java.util.concurrent.TimeUnit; + +/** + * Coordinator executor that executes the tasks asynchronously, it fails the job + * for any task exceptions. + * + *

We need this because the coordinator methods are called by + * the Job Manager's main thread (mailbox thread), executes the methods asynchronously + * to avoid blocking the main thread. + */ +public class CoordinatorExecutor extends NonThrownExecutor { + private final OperatorCoordinator.Context context; + + public CoordinatorExecutor(OperatorCoordinator.Context context, Logger logger) { + super(logger); + this.context = context; + } + + @Override + protected void exceptionHook(String actionString, Throwable t) { + this.context.failJob(new HoodieException(actionString, t)); + } + + @Override + public void close() throws Exception { + // wait for the remaining tasks to finish. + executor.shutdown(); + // We do not expect this to actually block for long. At this point, there should + // be very few task running in the executor, if any. + executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java index 1d0542e46..87c9c0179 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java @@ -21,7 +21,6 @@ package org.apache.hudi.sink.utils; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.function.ThrowingRunnable; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -31,15 +30,16 @@ import java.util.concurrent.TimeUnit; * An executor service that catches all the throwable with logging. */ public class NonThrownExecutor implements AutoCloseable { - private static final Logger LOG = LoggerFactory.getLogger(NonThrownExecutor.class); + private final Logger logger; /** * A single-thread executor to handle all the asynchronous jobs. */ - private final ExecutorService executor; + protected final ExecutorService executor; - public NonThrownExecutor() { + public NonThrownExecutor(Logger logger) { this.executor = Executors.newSingleThreadExecutor(); + this.logger = logger; } /** @@ -48,24 +48,30 @@ public class NonThrownExecutor implements AutoCloseable { public void execute( final ThrowingRunnable action, final String actionName, - final String instant) { + final Object... actionParams) { executor.execute( () -> { + final String actionString = String.format(actionName, actionParams); try { action.run(); - LOG.info("Executor executes action [{}] for instant [{}] success!", actionName, instant); + logger.info("Executor executes action [{}] success!", actionString); } catch (Throwable t) { // if we have a JVM critical error, promote it immediately, there is a good // chance the // logging or job failing will not succeed any more ExceptionUtils.rethrowIfFatalErrorOrOOM(t); - final String errMsg = String.format("Executor executes action [%s] error", actionName); - LOG.error(errMsg, t); + final String errMsg = String.format("Executor executes action [%s] error", actionString); + logger.error(errMsg, t); + exceptionHook(errMsg, t); } }); } + protected void exceptionHook(String errMsg, Throwable t) { + // for sub-class to override. + } + @Override public void close() throws Exception { if (executor != null) { diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java index 28568f719..a568a3f5c 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java @@ -69,8 +69,8 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning { DataStream pipeline = dataStream .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class)) - // Key-by partition path, to avoid multiple subtasks write to a partition at the same time - .keyBy(HoodieRecord::getPartitionPath) + // Key-by record key, to avoid multiple subtasks write to a bucket at the same time + .keyBy(HoodieRecord::getRecordKey) .transform( "bucket_assigner", TypeInformation.of(HoodieRecord.class), diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 900ec4165..8fed30b7a 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -33,6 +33,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.TablePathUtils; import org.apache.hudi.config.HoodieCompactionConfig; +import org.apache.hudi.config.HoodieMemoryConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; @@ -179,19 +180,6 @@ public class StreamerUtil { } } - /** - * Create a payload class via reflection, do not ordering/precombine value. - */ - public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record) - throws IOException { - try { - return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass, - new Class[] {Option.class}, Option.of(record)); - } catch (Throwable e) { - throw new IOException("Could not create payload for class: " + payloadClass, e); - } - } - public static HoodieWriteConfig getHoodieClientConfig(FlinkStreamerConfig conf) { return getHoodieClientConfig(FlinkOptions.fromStreamerConfig(conf)); } @@ -215,6 +203,12 @@ public class StreamerUtil { // actually Flink cleaning is always with parallelism 1 now .withCleanerParallelism(20) .build()) + .withMemoryConfig( + HoodieMemoryConfig.newBuilder() + .withMaxMemoryMaxSize( + conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024L, + conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024L + ).build()) .forTable(conf.getString(FlinkOptions.TABLE_NAME)) .withAutoCommit(false) .withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf))); diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java index 94e09758a..361fcef12 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java @@ -120,8 +120,8 @@ public class StreamWriteITCase extends TestLogger { .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8))) .setParallelism(4) .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class)) - // Key-by partition path, to avoid multiple subtasks write to a partition at the same time - .keyBy(HoodieRecord::getPartitionPath) + // Key-by record key, to avoid multiple subtasks write to a bucket at the same time + .keyBy(HoodieRecord::getRecordKey) .transform( "bucket_assigner", TypeInformation.of(HoodieRecord.class), @@ -179,8 +179,8 @@ public class StreamWriteITCase extends TestLogger { .name("instant_generator") .uid("instant_generator_id") - // Keyby partition path, to avoid multiple subtasks writing to a partition at the same time - .keyBy(HoodieRecord::getPartitionPath) + // Key-by record key, to avoid multiple subtasks write to a bucket at the same time + .keyBy(HoodieRecord::getRecordKey) // use the bucket assigner to generate bucket IDs .transform( "bucket_assigner", @@ -249,8 +249,8 @@ public class StreamWriteITCase extends TestLogger { .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8))) .setParallelism(4) .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class)) - // Key-by partition path, to avoid multiple subtasks write to a partition at the same time - .keyBy(HoodieRecord::getPartitionPath) + // Key-by record key, to avoid multiple subtasks write to a bucket at the same time + .keyBy(HoodieRecord::getRecordKey) .transform( "bucket_assigner", TypeInformation.of(HoodieRecord.class), diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java index 321abfad7..0afd41418 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java @@ -23,12 +23,15 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.configuration.FlinkOptions; -import org.apache.hudi.exception.HoodieException; import org.apache.hudi.sink.event.BatchWriteSuccessEvent; +import org.apache.hudi.sink.utils.MockCoordinatorExecutor; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestConfigurations; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -42,11 +45,12 @@ import java.io.IOException; import java.util.Collections; import java.util.concurrent.CompletableFuture; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertNotEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; /** @@ -60,9 +64,11 @@ public class TestStreamWriteOperatorCoordinator { @BeforeEach public void before() throws Exception { + OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(new OperatorID(), 2); coordinator = new StreamWriteOperatorCoordinator( - TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()), 2); + TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()), context); coordinator.start(); + coordinator.setExecutor(new MockCoordinatorExecutor(context)); } @AfterEach @@ -99,8 +105,8 @@ public class TestStreamWriteOperatorCoordinator { coordinator.notifyCheckpointComplete(1); String inflight = coordinator.getWriteClient() - .getInflightAndRequestedInstant("COPY_ON_WRITE"); - String lastCompleted = coordinator.getWriteClient().getLastCompletedInstant("COPY_ON_WRITE"); + .getInflightAndRequestedInstant(FlinkOptions.TABLE_TYPE_COPY_ON_WRITE); + String lastCompleted = coordinator.getWriteClient().getLastCompletedInstant(FlinkOptions.TABLE_TYPE_COPY_ON_WRITE); assertThat("Instant should be complete", lastCompleted, is(instant)); assertNotEquals("", inflight, "Should start a new instant"); assertNotEquals(instant, inflight, "Should start a new instant"); @@ -131,27 +137,43 @@ public class TestStreamWriteOperatorCoordinator { .instantTime("abc") .writeStatus(Collections.emptyList()) .build(); - assertThrows(IllegalStateException.class, - () -> coordinator.handleEventFromOperator(0, event), + + assertError(() -> coordinator.handleEventFromOperator(0, event), "Receive an unexpected event for instant abc from task 0"); } @Test - public void testCheckpointCompleteWithException() { + public void testCheckpointCompleteWithPartialEvents() { final CompletableFuture future = new CompletableFuture<>(); coordinator.checkpointCoordinator(1, future); - String inflightInstant = coordinator.getInstant(); + String instant = coordinator.getInstant(); OperatorEvent event = BatchWriteSuccessEvent.builder() .taskID(0) - .instantTime(inflightInstant) + .instantTime(instant) .writeStatus(Collections.emptyList()) .build(); coordinator.handleEventFromOperator(0, event); - assertThrows(HoodieException.class, - () -> coordinator.notifyCheckpointComplete(1), - "org.apache.hudi.exception.HoodieException: Instant [20210330153432] has a complete checkpoint [1],\n" - + "but the coordinator has not received full write success events,\n" - + "rolls back the instant and rethrow"); + + assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(1), + "Returns early for empty write results"); + String lastCompleted = coordinator.getWriteClient().getLastCompletedInstant(FlinkOptions.TABLE_TYPE_COPY_ON_WRITE); + assertNull(lastCompleted, "Returns early for empty write results"); + assertNull(coordinator.getEventBuffer()[0]); + + WriteStatus writeStatus1 = new WriteStatus(false, 0.2D); + writeStatus1.setPartitionPath("par2"); + writeStatus1.setStat(new HoodieWriteStat()); + OperatorEvent event1 = BatchWriteSuccessEvent.builder() + .taskID(1) + .instantTime(instant) + .writeStatus(Collections.singletonList(writeStatus1)) + .isLastBatch(true) + .build(); + coordinator.handleEventFromOperator(1, event1); + assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(2), + "Commits the instant with partial events anyway"); + lastCompleted = coordinator.getWriteClient().getLastCompletedInstant(FlinkOptions.TABLE_TYPE_COPY_ON_WRITE); + assertThat("Commits the instant with partial events anyway", lastCompleted, is(instant)); } @Test @@ -159,8 +181,10 @@ public class TestStreamWriteOperatorCoordinator { // override the default configuration Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); conf.setBoolean(FlinkOptions.HIVE_SYNC_ENABLED, true); - coordinator = new StreamWriteOperatorCoordinator(conf, 1); + OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(new OperatorID(), 1); + coordinator = new StreamWriteOperatorCoordinator(conf, context); coordinator.start(); + coordinator.setExecutor(new MockCoordinatorExecutor(context)); String instant = coordinator.getInstant(); assertNotEquals("", instant); @@ -180,4 +204,16 @@ public class TestStreamWriteOperatorCoordinator { // never throw for hive synchronization now assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(1)); } + + // ------------------------------------------------------------------------- + // Utilities + // ------------------------------------------------------------------------- + + private void assertError(Runnable runnable, String message) { + runnable.run(); + // wait a little while for the task to finish + assertThat(coordinator.getContext(), instanceOf(MockOperatorCoordinatorContext.class)); + MockOperatorCoordinatorContext context = (MockOperatorCoordinatorContext) coordinator.getContext(); + assertTrue(context.isJobFailed(), message); + } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java index 0e53cfaca..2384f7ef9 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.configuration.FlinkOptions; -import org.apache.hudi.exception.HoodieException; import org.apache.hudi.sink.event.BatchWriteSuccessEvent; import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper; import org.apache.hudi.utils.TestConfigurations; @@ -53,11 +52,10 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; /** - * Test cases for StreamingSinkFunction. + * Test cases for stream write. */ public class TestWriteCopyOnWrite { @@ -193,19 +191,19 @@ public class TestWriteCopyOnWrite { assertThat(writeStatuses.size(), is(0)); // no data write // fails the checkpoint - assertThrows(HoodieException.class, - () -> funcWrapper.checkpointFails(1), - "The last checkpoint was aborted, roll back the last write and throw"); + funcWrapper.checkpointFails(1); + assertFalse(funcWrapper.getCoordinatorContext().isJobFailed(), + "The last checkpoint was aborted, ignore the events"); - // the instant metadata should be cleared - checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, null); + // the instant metadata should be reused + checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant); checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, null); for (RowData rowData : TestData.DATA_SET_INSERT) { funcWrapper.invoke(rowData); } - // this returns early cause there is no inflight instant + // this returns early because there is no inflight instant funcWrapper.checkpointFunction(2); // do not sent the write event and fails the checkpoint, // behaves like the last checkpoint is successful. @@ -501,16 +499,11 @@ public class TestWriteCopyOnWrite { assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant); - assertFalse(funcWrapper.isAllPartitionsLoaded(), - "All partitions assume to be loaded into the index state"); + funcWrapper.checkpointComplete(2); // the coordinator checkpoint commits the inflight instant. checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); checkWrittenData(tempFile, EXPECTED2); - // next element triggers all partitions load check - funcWrapper.invoke(TestData.DATA_SET_INSERT.get(0)); - assertTrue(funcWrapper.isAllPartitionsLoaded(), - "All partitions assume to be loaded into the index state"); } // ------------------------------------------------------------------------- diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java b/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java index 04cee442a..33457b509 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java @@ -146,6 +146,55 @@ public class TestBucketAssigner { assertBucketEquals(bucketInfo, "par3", BucketType.INSERT); } + /** + * Test that only partial small files are assigned to the task. + */ + @Test + public void testInsertWithPartialSmallFiles() { + SmallFile f0 = new SmallFile(); + f0.location = new HoodieRecordLocation("t0", "f0"); + f0.sizeBytes = 12; + + SmallFile f1 = new SmallFile(); + f1.location = new HoodieRecordLocation("t0", "f1"); + f1.sizeBytes = 122879; // no left space to append new records to this bucket + + SmallFile f2 = new SmallFile(); + f2.location = new HoodieRecordLocation("t0", "f2"); + f2.sizeBytes = 56; + + Map> smallFilesMap = new HashMap<>(); + smallFilesMap.put("par1", Arrays.asList(f0, f1, f2)); + + MockBucketAssigner mockBucketAssigner = new MockBucketAssigner(0, 2, context, writeConfig, smallFilesMap); + BucketInfo bucketInfo = mockBucketAssigner.addInsert("par1"); + assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0"); + + mockBucketAssigner.addInsert("par1"); + bucketInfo = mockBucketAssigner.addInsert("par1"); + assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0"); + + bucketInfo = mockBucketAssigner.addInsert("par3"); + assertBucketEquals(bucketInfo, "par3", BucketType.INSERT); + + bucketInfo = mockBucketAssigner.addInsert("par3"); + assertBucketEquals(bucketInfo, "par3", BucketType.INSERT); + + MockBucketAssigner mockBucketAssigner2 = new MockBucketAssigner(1, 2, context, writeConfig, smallFilesMap); + BucketInfo bucketInfo2 = mockBucketAssigner2.addInsert("par1"); + assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f1"); + + mockBucketAssigner2.addInsert("par1"); + bucketInfo2 = mockBucketAssigner2.addInsert("par1"); + assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f1"); + + bucketInfo2 = mockBucketAssigner2.addInsert("par3"); + assertBucketEquals(bucketInfo2, "par3", BucketType.INSERT); + + bucketInfo2 = mockBucketAssigner2.addInsert("par3"); + assertBucketEquals(bucketInfo2, "par3", BucketType.INSERT); + } + @Test public void testUpdateAndInsertWithSmallFiles() { SmallFile f0 = new SmallFile(); @@ -187,6 +236,60 @@ public class TestBucketAssigner { assertBucketEquals(bucketInfo, "par2", BucketType.UPDATE, "f2"); } + /** + * Test that only partial small files are assigned to the task. + */ + @Test + public void testUpdateAndInsertWithPartialSmallFiles() { + SmallFile f0 = new SmallFile(); + f0.location = new HoodieRecordLocation("t0", "f0"); + f0.sizeBytes = 12; + + SmallFile f1 = new SmallFile(); + f1.location = new HoodieRecordLocation("t0", "f1"); + f1.sizeBytes = 122879; // no left space to append new records to this bucket + + SmallFile f2 = new SmallFile(); + f2.location = new HoodieRecordLocation("t0", "f2"); + f2.sizeBytes = 56; + + Map> smallFilesMap = new HashMap<>(); + smallFilesMap.put("par1", Arrays.asList(f0, f1, f2)); + + MockBucketAssigner mockBucketAssigner = new MockBucketAssigner(0, 2, context, writeConfig, smallFilesMap); + mockBucketAssigner.addUpdate("par1", "f0"); + + BucketInfo bucketInfo = mockBucketAssigner.addInsert("par1"); + assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0"); + + mockBucketAssigner.addInsert("par1"); + bucketInfo = mockBucketAssigner.addInsert("par1"); + assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0"); + + mockBucketAssigner.addUpdate("par1", "f2"); + + mockBucketAssigner.addInsert("par1"); + bucketInfo = mockBucketAssigner.addInsert("par1"); + assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0"); + + + MockBucketAssigner mockBucketAssigner2 = new MockBucketAssigner(1, 2, context, writeConfig, smallFilesMap); + mockBucketAssigner2.addUpdate("par1", "f0"); + + BucketInfo bucketInfo2 = mockBucketAssigner2.addInsert("par1"); + assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f1"); + + mockBucketAssigner2.addInsert("par1"); + bucketInfo2 = mockBucketAssigner2.addInsert("par1"); + assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f1"); + + mockBucketAssigner2.addUpdate("par1", "f2"); + + mockBucketAssigner2.addInsert("par1"); + bucketInfo2 = mockBucketAssigner2.addInsert("par1"); + assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f1"); + } + private void assertBucketEquals( BucketInfo bucketInfo, String partition, @@ -220,7 +323,16 @@ public class TestBucketAssigner { HoodieFlinkEngineContext context, HoodieWriteConfig config, Map> smallFilesMap) { - super(context, config); + this(0, 1, context, config, smallFilesMap); + } + + MockBucketAssigner( + int taskID, + int numTasks, + HoodieFlinkEngineContext context, + HoodieWriteConfig config, + Map> smallFilesMap) { + super(taskID, numTasks, context, config); this.smallFilesMap = smallFilesMap; } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java index e8796ec72..fe3dd818e 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java @@ -28,7 +28,9 @@ import org.apache.hudi.sink.compact.CompactionPlanOperator; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext; import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.streaming.api.operators.Output; @@ -78,6 +80,9 @@ public class CompactFunctionWrapper { compactFunction = new CompactFunction(conf); compactFunction.setRuntimeContext(runtimeContext); compactFunction.open(conf); + final NonThrownExecutor syncExecutor = new MockCoordinatorExecutor( + new MockOperatorCoordinatorContext(new OperatorID(), 1)); + compactFunction.setExecutor(syncExecutor); commitSink = new CompactionCommitSink(conf); commitSink.setRuntimeContext(runtimeContext); diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockCoordinatorExecutor.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockCoordinatorExecutor.java new file mode 100644 index 000000000..099dfd63f --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockCoordinatorExecutor.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.utils; + +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.function.ThrowingRunnable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A mock {@link CoordinatorExecutor} that executes the actions synchronously. + */ +public class MockCoordinatorExecutor extends CoordinatorExecutor { + private static final Logger LOG = LoggerFactory.getLogger(MockCoordinatorExecutor.class); + + public MockCoordinatorExecutor(OperatorCoordinator.Context context) { + super(context, LOG); + } + + @Override + public void execute(ThrowingRunnable action, String actionName, Object... actionParams) { + final String actionString = String.format(actionName, actionParams); + try { + action.run(); + LOG.info("Executor executes action [{}] success!", actionString); + } catch (Throwable t) { + // if we have a JVM critical error, promote it immediately, there is a good + // chance the + // logging or job failing will not succeed any more + ExceptionUtils.rethrowIfFatalErrorOrOOM(t); + exceptionHook(actionString, t); + } + } +} diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java index 72f2e89a6..a4b6c16a3 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java @@ -33,7 +33,9 @@ import org.apache.hudi.utils.TestConfigurations; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext; import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; @@ -57,6 +59,7 @@ public class StreamWriteFunctionWrapper { private final IOManager ioManager; private final StreamingRuntimeContext runtimeContext; private final MockOperatorEventGateway gateway; + private final MockOperatorCoordinatorContext coordinatorContext; private final StreamWriteOperatorCoordinator coordinator; private final MockFunctionInitializationContext functionInitializationContext; @@ -84,13 +87,15 @@ public class StreamWriteFunctionWrapper { this.gateway = new MockOperatorEventGateway(); this.conf = conf; // one function - this.coordinator = new StreamWriteOperatorCoordinator(conf, 1); + this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1); + this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext); this.functionInitializationContext = new MockFunctionInitializationContext(); this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf); } public void openFunction() throws Exception { this.coordinator.start(); + this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext)); toHoodieFunction = new RowDataToHoodieFunction<>(TestConfigurations.ROW_TYPE, conf); toHoodieFunction.setRuntimeContext(runtimeContext); toHoodieFunction.open(conf); @@ -181,6 +186,10 @@ public class StreamWriteFunctionWrapper { return coordinator; } + public MockOperatorCoordinatorContext getCoordinatorContext() { + return coordinatorContext; + } + public void clearIndexState() { this.bucketAssignerFunction.clearIndexState(); } @@ -188,8 +197,4 @@ public class StreamWriteFunctionWrapper { public boolean isKeyInState(HoodieKey hoodieKey) { return this.bucketAssignerFunction.isKeyInState(hoodieKey); } - - public boolean isAllPartitionsLoaded() { - return this.bucketAssignerFunction.isAllPartitionsLoaded(); - } }