From 37b7c65d8a3ede00ae16909a06e31c24f179998c Mon Sep 17 00:00:00 2001 From: yuzhaojing <32435329+yuzhaojing@users.noreply.github.com> Date: Tue, 29 Jun 2021 08:53:52 +0800 Subject: [PATCH] [HUDI-2084] Resend the uncommitted write metadata when start up (#3168) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 喻兆靖 --- .../hudi/client/HoodieFlinkWriteClient.java | 26 +-- .../org/apache/hudi/util/FlinkClientUtil.java | 9 + .../apache/hudi/sink/StreamWriteFunction.java | 155 +++++++++++---- .../apache/hudi/sink/StreamWriteOperator.java | 2 +- .../sink/StreamWriteOperatorCoordinator.java | 176 ++++++++++++------ .../sink/compact/HoodieFlinkCompactor.java | 2 +- ...cessEvent.java => WriteMetadataEvent.java} | 41 +++- .../org/apache/hudi/util/CompactionUtil.java | 11 +- .../org/apache/hudi/util/StreamerUtil.java | 14 +- .../apache/hudi/sink/StreamWriteITCase.java | 2 +- .../TestStreamWriteOperatorCoordinator.java | 43 ++++- .../hudi/sink/TestWriteCopyOnWrite.java | 42 +++-- .../utils/StreamWriteFunctionWrapper.java | 30 ++- 13 files changed, 387 insertions(+), 166 deletions(-) rename hudi-flink/src/main/java/org/apache/hudi/sink/event/{BatchWriteSuccessEvent.java => WriteMetadataEvent.java} (81%) diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index aa930f7b0..05e4481ec 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -58,6 +58,7 @@ import org.apache.hudi.table.MarkerFiles; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.compact.FlinkCompactHelpers; import org.apache.hudi.table.upgrade.FlinkUpgradeDowngrade; +import org.apache.hudi.util.FlinkClientUtil; import com.codahale.metrics.Timer; import org.apache.hadoop.conf.Configuration; @@ -174,7 +175,7 @@ public class HoodieFlinkWriteClient extends /** * Removes all existing records from the partitions affected and inserts the given HoodieRecords, into the table. * - * @param records HoodieRecords to insert + * @param records HoodieRecords to insert * @param instantTime Instant time of the commit * @return list of WriteStatus to inspect errors and counts */ @@ -194,7 +195,7 @@ public class HoodieFlinkWriteClient extends /** * Removes all existing records of the Hoodie table and inserts the given HoodieRecords, into the table. * - * @param records HoodieRecords to insert + * @param records HoodieRecords to insert * @param instantTime Instant time of the commit * @return list of WriteStatus to inspect errors and counts */ @@ -235,7 +236,7 @@ public class HoodieFlinkWriteClient extends HoodieTable>, List, List> table = getTableAndInitCtx(WriteOperationType.DELETE, instantTime); preWrite(instantTime, WriteOperationType.DELETE, table.getMetaClient()); - HoodieWriteMetadata> result = table.delete(context,instantTime, keys); + HoodieWriteMetadata> result = table.delete(context, instantTime, keys); return postWrite(result, instantTime, table); } @@ -391,11 +392,11 @@ public class HoodieFlinkWriteClient extends /** * Get or create a new write handle in order to reuse the file handles. * - * @param record The first record in the bucket - * @param config Write config - * @param instantTime The instant time - * @param table The table - * @param recordItr Record iterator + * @param record The first record in the bucket + * @param config Write config + * @param instantTime The instant time + * @param table The table + * @param recordItr Record iterator * @return Existing write handle or create a new one */ private HoodieWriteHandle getOrCreateWriteHandle( @@ -454,7 +455,8 @@ public class HoodieFlinkWriteClient extends } public String getLastPendingInstant(String actionType) { - HoodieTimeline unCompletedTimeline = getHoodieTable().getMetaClient().getCommitsTimeline().filterInflightsAndRequested(); + HoodieTimeline unCompletedTimeline = FlinkClientUtil.createMetaClient(basePath) + .getCommitsTimeline().filterInflightsAndRequested(); return unCompletedTimeline.getInstants() .filter(x -> x.getAction().equals(actionType)) .map(HoodieInstant::getTimestamp) @@ -465,7 +467,8 @@ public class HoodieFlinkWriteClient extends public String getLastCompletedInstant(HoodieTableType tableType) { final String commitType = CommitUtils.getCommitActionType(tableType); - HoodieTimeline completedTimeline = getHoodieTable().getMetaClient().getCommitsTimeline().filterCompletedInstants(); + HoodieTimeline completedTimeline = FlinkClientUtil.createMetaClient(basePath) + .getCommitsTimeline().filterCompletedInstants(); return completedTimeline.getInstants() .filter(x -> x.getAction().equals(commitType)) .map(HoodieInstant::getTimestamp) @@ -475,8 +478,7 @@ public class HoodieFlinkWriteClient extends } public void transitionRequestedToInflight(String commitType, String inFlightInstant) { - HoodieFlinkTable table = getHoodieTable(); - HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); + HoodieActiveTimeline activeTimeline = FlinkClientUtil.createMetaClient(basePath).getActiveTimeline(); HoodieInstant requested = new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, inFlightInstant); activeTimeline.transitionRequestedToInflight(requested, Option.empty(), config.shouldAllowMultiWriteOnSameInstant()); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java index 4112e2b52..65daf782f 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java @@ -18,6 +18,8 @@ package org.apache.hudi.util; +import org.apache.hudi.common.table.HoodieTableMetaClient; + import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils; import org.apache.flink.configuration.Configuration; import org.apache.hadoop.fs.Path; @@ -29,6 +31,13 @@ import java.io.File; */ public class FlinkClientUtil { + /** + * Creates the meta client. + */ + public static HoodieTableMetaClient createMetaClient(String basePath) { + return HoodieTableMetaClient.builder().setBasePath(basePath).setConf(FlinkClientUtil.getHadoopConf()).build(); + } + /** * Parses the file name from path. */ diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index 9b1f4d1b2..8edf02cf0 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -32,11 +32,14 @@ import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.index.HoodieIndex; -import org.apache.hudi.sink.event.BatchWriteSuccessEvent; +import org.apache.hudi.sink.event.WriteMetadataEvent; import org.apache.hudi.table.action.commit.FlinkWriteHelper; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; import org.apache.flink.runtime.state.FunctionInitializationContext; @@ -54,6 +57,7 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Random; import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; @@ -76,20 +80,18 @@ import java.util.stream.Collectors; * starts a new instant on the time line when a checkpoint triggers, the coordinator checkpoints always * start before its operator, so when this function starts a checkpoint, a REQUESTED instant already exists. * - *

In order to improve the throughput, The function process thread does not block data buffering - * after the checkpoint thread starts flushing the existing data buffer. So there is possibility that the next checkpoint - * batch was written to current checkpoint. When a checkpoint failure triggers the write rollback, there may be some duplicate records - * (e.g. the eager write batch), the semantics is still correct using the UPSERT operation. + *

The function process thread blocks data buffering after the checkpoint thread finishes flushing the existing data buffer until + * the current checkpoint succeed and the coordinator starts a new instant. Any error triggers the job failure during the metadata committing, + * when the job recovers from a failure, the write function re-send the write metadata to the coordinator to see if these metadata + * can re-commit, thus if unexpected error happens during the instant committing, the coordinator would retry to commit when the job + * recovers. * *

Fault Tolerance

* - *

The operator coordinator checks and commits the last instant then starts a new one when a checkpoint finished successfully. - * The operator rolls back the written data and throws to trigger a failover when any error occurs. - * This means one Hoodie instant may span one or more checkpoints(some checkpoints notifications may be skipped). - * If a checkpoint timed out, the next checkpoint would help to rewrite the left buffer data (clean the buffer in the last - * step of the #flushBuffer method). - * - *

The operator coordinator would try several times when committing the write status. + *

The operator coordinator checks and commits the last instant then starts a new one after a checkpoint finished successfully. + * It rolls back any inflight instant before it starts a new instant, this means one hoodie instant only span one checkpoint, + * the write function blocks data buffer flushing for the configured checkpoint timeout + * before it throws exception, any checkpoint failure would finally trigger the job failure. * *

Note: The function task requires the input stream be shuffled by the file IDs. * @@ -162,6 +164,16 @@ public class StreamWriteFunction */ private volatile boolean confirming = false; + /** + * List state of the write metadata events. + */ + private transient ListState writeMetadataState; + + /** + * Write status list for the current checkpoint. + */ + private List writeStatuses; + /** * Constructs a StreamingSinkFunction. * @@ -173,27 +185,43 @@ public class StreamWriteFunction @Override public void open(Configuration parameters) throws IOException { - this.taskID = getRuntimeContext().getIndexOfThisSubtask(); - this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext()); - this.actionType = CommitUtils.getCommitActionType( - WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)), - HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE))); this.tracer = new TotalSizeTracer(this.config); initBuffer(); initWriteFunction(); } @Override - public void initializeState(FunctionInitializationContext context) { - // no operation + public void initializeState(FunctionInitializationContext context) throws Exception { + this.taskID = getRuntimeContext().getIndexOfThisSubtask(); + this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext()); + this.actionType = CommitUtils.getCommitActionType( + WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)), + HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE))); + + this.writeStatuses = new ArrayList<>(); + this.writeMetadataState = context.getOperatorStateStore().getListState( + new ListStateDescriptor<>( + "write-metadata-state", + TypeInformation.of(WriteMetadataEvent.class) + )); + + if (context.isRestored()) { + restoreWriteMetadata(); + } else { + sendBootstrapEvent(); + } + // blocks flushing until the coordinator starts a new instant + this.confirming = true; } @Override - public void snapshotState(FunctionSnapshotContext functionSnapshotContext) { + public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception { // Based on the fact that the coordinator starts the checkpoint first, // it would check the validity. // wait for the buffer data flush out and request a new instant flushRemaining(false); + // Reload the snapshot state as the current state. + reloadWriteMetaState(); } @Override @@ -215,6 +243,7 @@ public class StreamWriteFunction public void endInput() { flushRemaining(true); this.writeClient.cleanHandles(); + this.writeStatuses.clear(); } // ------------------------------------------------------------------------- @@ -274,6 +303,49 @@ public class StreamWriteFunction } } + private void restoreWriteMetadata() throws Exception { + String lastInflight = this.writeClient.getLastPendingInstant(this.actionType); + boolean eventSent = false; + for (WriteMetadataEvent event : this.writeMetadataState.get()) { + if (Objects.equals(lastInflight, event.getInstantTime())) { + // The checkpoint succeed but the meta does not commit, + // re-commit the inflight instant + this.eventGateway.sendEventToCoordinator(event); + LOG.info("Send uncommitted write metadata event to coordinator, task[{}].", taskID); + eventSent = true; + } + } + if (!eventSent) { + sendBootstrapEvent(); + } + } + + private void sendBootstrapEvent() { + WriteMetadataEvent event = WriteMetadataEvent.builder() + .taskID(taskID) + .writeStatus(Collections.emptyList()) + .instantTime("") + .isBootstrap(true) + .build(); + this.eventGateway.sendEventToCoordinator(event); + LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID); + } + + /** + * Reload the write metadata state as the current checkpoint. + */ + private void reloadWriteMetaState() throws Exception { + this.writeMetadataState.clear(); + WriteMetadataEvent event = WriteMetadataEvent.builder() + .taskID(taskID) + .instantTime(currentInstant) + .writeStatus(new ArrayList<>(writeStatuses)) + .isBootstrap(true) + .build(); + this.writeMetadataState.add(event); + writeStatuses.clear(); + } + /** * Represents a data item in the buffer, this is needed to reduce the * memory footprint. @@ -477,23 +549,23 @@ public class StreamWriteFunction bucket.records.add(item); } - @SuppressWarnings("unchecked, rawtypes") - private boolean flushBucket(DataBucket bucket) { + private boolean hasData() { + return this.buckets.size() > 0 + && this.buckets.values().stream().anyMatch(bucket -> bucket.records.size() > 0); + } + + private String instantToWrite() { String instant = this.writeClient.getLastPendingInstant(this.actionType); - - if (instant == null) { - // in case there are empty checkpoints that has no input data - LOG.info("No inflight instant when flushing data, skip."); - return false; - } - // if exactly-once semantics turns on, // waits for the checkpoint notification until the checkpoint timeout threshold hits. if (confirming) { long waitingTime = 0L; long ckpTimeout = config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT); long interval = 500L; - while (instant == null || instant.equals(this.currentInstant)) { + // wait condition: + // 1. there is no inflight instant + // 2. the inflight instant does not change and the checkpoint has buffering data + while (instant == null || (instant.equals(this.currentInstant) && hasData())) { // sleep for a while try { if (waitingTime > ckpTimeout) { @@ -511,6 +583,18 @@ public class StreamWriteFunction // successfully. confirming = false; } + return instant; + } + + @SuppressWarnings("unchecked, rawtypes") + private boolean flushBucket(DataBucket bucket) { + String instant = instantToWrite(); + + if (instant == null) { + // in case there are empty checkpoints that has no input data + LOG.info("No inflight instant when flushing data, skip."); + return false; + } List records = bucket.writeBuffer(); ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records"); @@ -520,20 +604,22 @@ public class StreamWriteFunction bucket.preWrite(records); final List writeStatus = new ArrayList<>(writeFunction.apply(records, instant)); records.clear(); - final BatchWriteSuccessEvent event = BatchWriteSuccessEvent.builder() + final WriteMetadataEvent event = WriteMetadataEvent.builder() .taskID(taskID) .instantTime(instant) // the write instant may shift but the event still use the currentInstant. .writeStatus(writeStatus) .isLastBatch(false) .isEndInput(false) .build(); + this.eventGateway.sendEventToCoordinator(event); + writeStatuses.addAll(writeStatus); return true; } @SuppressWarnings("unchecked, rawtypes") private void flushRemaining(boolean isEndInput) { - this.currentInstant = this.writeClient.getLastPendingInstant(this.actionType); + this.currentInstant = instantToWrite(); if (this.currentInstant == null) { // in case there are empty checkpoints that has no input data throw new HoodieException("No inflight instant when flushing data!"); @@ -560,17 +646,20 @@ public class StreamWriteFunction LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, currentInstant); writeStatus = Collections.emptyList(); } - final BatchWriteSuccessEvent event = BatchWriteSuccessEvent.builder() + final WriteMetadataEvent event = WriteMetadataEvent.builder() .taskID(taskID) .instantTime(currentInstant) .writeStatus(writeStatus) .isLastBatch(true) .isEndInput(isEndInput) .build(); + this.eventGateway.sendEventToCoordinator(event); this.buckets.clear(); this.tracer.reset(); this.writeClient.cleanHandles(); + this.writeStatuses.addAll(writeStatus); + // blocks flushing until the coordinator starts a new instant this.confirming = true; } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperator.java index 3150d065a..b0f8328c1 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperator.java @@ -51,7 +51,7 @@ public class StreamWriteOperator } @Override - public void endInput() throws Exception { + public void endInput() { sinkFunction.endInput(); } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 7f6f8163c..84f3c0b1e 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -29,7 +29,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.sink.event.BatchWriteSuccessEvent; +import org.apache.hudi.sink.event.WriteMetadataEvent; import org.apache.hudi.sink.utils.CoordinatorExecutor; import org.apache.hudi.sink.utils.HiveSyncContext; import org.apache.hudi.sink.utils.NonThrownExecutor; @@ -97,7 +97,7 @@ public class StreamWriteOperatorCoordinator * Event buffer for one round of checkpointing. When all the elements are non-null and have the same * write instant, then the instant succeed and we can commit it. */ - private transient BatchWriteSuccessEvent[] eventBuffer; + private transient WriteMetadataEvent[] eventBuffer; /** * Task number of the operator. @@ -152,8 +152,6 @@ public class StreamWriteOperatorCoordinator this.tableState = TableState.create(conf); // init table, create it if not exists. initTableIfNotExists(this.conf); - // start a new instant - startInstant(); // start the executor this.executor = new CoordinatorExecutor(this.context, LOG); // start the executor if required @@ -201,7 +199,7 @@ public class StreamWriteOperatorCoordinator // for streaming mode, commits the ever received events anyway, // the stream write task snapshot and flush the data buffer synchronously in sequence, // so a successful checkpoint subsumes the old one(follows the checkpoint subsuming contract) - final boolean committed = commitInstant(); + final boolean committed = commitInstant(this.instant); if (committed) { // if async compaction is on, schedule the compaction if (asyncCompaction) { @@ -216,30 +214,8 @@ public class StreamWriteOperatorCoordinator ); } - private void syncHiveIfEnabled() { - if (conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED)) { - this.hiveSyncExecutor.execute(this::syncHive, "sync hive metadata for instant %s", this.instant); - } - } - - /** - * Sync hoodie table metadata to Hive metastore. - */ - public void syncHive() { - hiveSyncContext.hiveSyncTool().syncHoodieTable(); - } - - private void startInstant() { - final String instant = HoodieActiveTimeline.createNewInstantTime(); - this.writeClient.startCommitWithTime(instant, tableState.commitAction); - this.instant = instant; - this.writeClient.transitionRequestedToInflight(tableState.commitAction, this.instant); - LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant, - this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE)); - } - @Override - public void resetToCheckpoint(long checkpointID, @Nullable byte[] checkpointData) { + public void resetToCheckpoint(long checkpointID, byte[] checkpointData) { // no operation } @@ -248,27 +224,17 @@ public class StreamWriteOperatorCoordinator executor.execute( () -> { // no event to handle - ValidationUtils.checkState(operatorEvent instanceof BatchWriteSuccessEvent, - "The coordinator can only handle BatchWriteSuccessEvent"); - BatchWriteSuccessEvent event = (BatchWriteSuccessEvent) operatorEvent; - // the write task does not block after checkpointing(and before it receives a checkpoint success event), - // if it checkpoints succeed then flushes the data buffer again before this coordinator receives a checkpoint - // success event, the data buffer would flush with an older instant time. - ValidationUtils.checkState( - HoodieTimeline.compareTimestamps(instant, HoodieTimeline.GREATER_THAN_OR_EQUALS, event.getInstantTime()), - String.format("Receive an unexpected event for instant %s from task %d", - event.getInstantTime(), event.getTaskID())); - if (this.eventBuffer[event.getTaskID()] != null) { - this.eventBuffer[event.getTaskID()].mergeWith(event); + ValidationUtils.checkState(operatorEvent instanceof WriteMetadataEvent, + "The coordinator can only handle WriteMetaEvent"); + WriteMetadataEvent event = (WriteMetadataEvent) operatorEvent; + if (event.isBootstrap()) { + handleBootstrapEvent(event); + } else if (event.isEndInput()) { + handleEndInputEvent(event); } else { - this.eventBuffer[event.getTaskID()] = event; + handleWriteMetaEvent(event); } - if (event.isEndInput() && allEventsReceived()) { - // start to commit the instant. - commitInstant(); - // no compaction scheduling for batch mode - } - }, "handle write success event for instant %s", this.instant + }, "handle write metadata event for instant %s", this.instant ); } @@ -291,22 +257,108 @@ public class StreamWriteOperatorCoordinator this.hiveSyncContext = HiveSyncContext.create(conf); } - private void reset() { - this.eventBuffer = new BatchWriteSuccessEvent[this.parallelism]; + private void syncHiveIfEnabled() { + if (conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED)) { + this.hiveSyncExecutor.execute(this::syncHive, "sync hive metadata for instant %s", this.instant); + } } - /** Checks the buffer is ready to commit. */ + /** + * Sync hoodie table metadata to Hive metastore. + */ + public void syncHive() { + hiveSyncContext.hiveSyncTool().syncHoodieTable(); + } + + private void reset() { + this.eventBuffer = new WriteMetadataEvent[this.parallelism]; + } + + /** + * Checks the buffer is ready to commit. + */ private boolean allEventsReceived() { return Arrays.stream(eventBuffer) .allMatch(event -> event != null && event.isReady(this.instant)); } + private void addEventToBuffer(WriteMetadataEvent event) { + if (this.eventBuffer[event.getTaskID()] != null) { + this.eventBuffer[event.getTaskID()].mergeWith(event); + } else { + this.eventBuffer[event.getTaskID()] = event; + } + } + + private void startInstant() { + final String instant = HoodieActiveTimeline.createNewInstantTime(); + this.writeClient.startCommitWithTime(instant, tableState.commitAction); + this.instant = instant; + this.writeClient.transitionRequestedToInflight(tableState.commitAction, this.instant); + LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant, + this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE)); + } + + /** + * Initializes the instant. + * + *

Recommits the last inflight instant if the write metadata checkpoint successfully + * but was not committed due to some rare cases. + * + *

Starts a new instant, a writer can not flush data buffer + * until it finds a new inflight instant on the timeline. + */ + private void initInstant(String instant) { + HoodieTimeline completedTimeline = + StreamerUtil.createMetaClient(conf).getActiveTimeline().filterCompletedInstants(); + executor.execute(() -> { + if (instant.equals("") || completedTimeline.containsInstant(instant)) { + // the last instant committed successfully + reset(); + } else { + LOG.info("Recommit instant {}", instant); + commitInstant(instant); + } + // starts a new instant + startInstant(); + }, "initialize instant %s", instant); + } + + private void handleBootstrapEvent(WriteMetadataEvent event) { + addEventToBuffer(event); + if (Arrays.stream(eventBuffer).allMatch(Objects::nonNull)) { + // start to initialize the instant. + initInstant(event.getInstantTime()); + } + } + + private void handleEndInputEvent(WriteMetadataEvent event) { + addEventToBuffer(event); + if (allEventsReceived()) { + // start to commit the instant. + commitInstant(this.instant); + // no compaction scheduling for batch mode + } + } + + private void handleWriteMetaEvent(WriteMetadataEvent event) { + // the write task does not block after checkpointing(and before it receives a checkpoint success event), + // if it checkpoints succeed then flushes the data buffer again before this coordinator receives a checkpoint + // success event, the data buffer would flush with an older instant time. + ValidationUtils.checkState( + HoodieTimeline.compareTimestamps(this.instant, HoodieTimeline.GREATER_THAN_OR_EQUALS, event.getInstantTime()), + String.format("Receive an unexpected event for instant %s from task %d", + event.getInstantTime(), event.getTaskID())); + + addEventToBuffer(event); + } + /** * Commits the instant. * * @return true if the write statuses are committed successfully. */ - private boolean commitInstant() { + private boolean commitInstant(String instant) { if (Arrays.stream(eventBuffer).allMatch(Objects::isNull)) { // The last checkpoint finished successfully. return false; @@ -314,7 +366,7 @@ public class StreamWriteOperatorCoordinator List writeResults = Arrays.stream(eventBuffer) .filter(Objects::nonNull) - .map(BatchWriteSuccessEvent::getWriteStatuses) + .map(WriteMetadataEvent::getWriteStatuses) .flatMap(Collection::stream) .collect(Collectors.toList()); @@ -323,13 +375,15 @@ public class StreamWriteOperatorCoordinator reset(); return false; } - doCommit(writeResults); + doCommit(instant, writeResults); return true; } - /** Performs the actual commit action. */ + /** + * Performs the actual commit action. + */ @SuppressWarnings("unchecked") - private void doCommit(List writeResults) { + private void doCommit(String instant, List writeResults) { // commit or rollback long totalErrorRecords = writeResults.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L); long totalRecords = writeResults.stream().map(WriteStatus::getTotalRecords).reduce(Long::sum).orElse(0L); @@ -345,13 +399,13 @@ public class StreamWriteOperatorCoordinator final Map> partitionToReplacedFileIds = tableState.isOverwrite ? writeClient.getPartitionToReplacedFileIds(tableState.operationType, writeResults) : Collections.emptyMap(); - boolean success = writeClient.commit(this.instant, writeResults, Option.of(checkpointCommitMetadata), + boolean success = writeClient.commit(instant, writeResults, Option.of(checkpointCommitMetadata), tableState.commitAction, partitionToReplacedFileIds); if (success) { reset(); - LOG.info("Commit instant [{}] success!", this.instant); + LOG.info("Commit instant [{}] success!", instant); } else { - throw new HoodieException(String.format("Commit instant [%s] failed!", this.instant)); + throw new HoodieException(String.format("Commit instant [%s] failed!", instant)); } } else { LOG.error("Error when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords); @@ -364,13 +418,13 @@ public class StreamWriteOperatorCoordinator } }); // Rolls back instant - writeClient.rollback(this.instant); - throw new HoodieException(String.format("Commit instant [%s] failed and rolled back !", this.instant)); + writeClient.rollback(instant); + throw new HoodieException(String.format("Commit instant [%s] failed and rolled back !", instant)); } } @VisibleForTesting - public BatchWriteSuccessEvent[] getEventBuffer() { + public WriteMetadataEvent[] getEventBuffer() { return eventBuffer; } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java index f95f3e3ae..289d4f67c 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java @@ -58,7 +58,7 @@ public class HoodieFlinkCompactor { Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg); // create metaClient - HoodieTableMetaClient metaClient = CompactionUtil.createMetaClient(conf); + HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf); // get the table name conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName()); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/event/BatchWriteSuccessEvent.java b/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java similarity index 81% rename from hudi-flink/src/main/java/org/apache/hudi/sink/event/BatchWriteSuccessEvent.java rename to hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java index 186a470ea..662383b50 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/event/BatchWriteSuccessEvent.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java @@ -30,13 +30,14 @@ import java.util.Objects; /** * An operator event to mark successful checkpoint batch write. */ -public class BatchWriteSuccessEvent implements OperatorEvent { +public class WriteMetadataEvent implements OperatorEvent { private static final long serialVersionUID = 1L; private List writeStatuses; private final int taskID; private String instantTime; private boolean isLastBatch; + /** * Flag saying whether the event comes from the end of input, e.g. the source * is bounded, there are two cases in which this flag should be set to true: @@ -45,6 +46,11 @@ public class BatchWriteSuccessEvent implements OperatorEvent { */ private final boolean isEndInput; + /** + * Flag saying whether the event comes from bootstrap of a write function. + */ + private final boolean isBootstrap; + /** * Creates an event. * @@ -55,22 +61,25 @@ public class BatchWriteSuccessEvent implements OperatorEvent { * within an checkpoint interval, * if true, the whole data set of the checkpoint * has been flushed successfully + * @param isBootstrap Whether the event comes from the bootstrap */ - private BatchWriteSuccessEvent( + private WriteMetadataEvent( int taskID, String instantTime, List writeStatuses, boolean isLastBatch, - boolean isEndInput) { + boolean isEndInput, + boolean isBootstrap) { this.taskID = taskID; this.instantTime = instantTime; this.writeStatuses = new ArrayList<>(writeStatuses); this.isLastBatch = isLastBatch; this.isEndInput = isEndInput; + this.isBootstrap = isBootstrap; } /** - * Returns the builder for {@link BatchWriteSuccessEvent}. + * Returns the builder for {@link WriteMetadataEvent}. */ public static Builder builder() { return new Builder(); @@ -96,12 +105,16 @@ public class BatchWriteSuccessEvent implements OperatorEvent { return isEndInput; } + public boolean isBootstrap() { + return isBootstrap; + } + /** - * Merges this event with given {@link BatchWriteSuccessEvent} {@code other}. + * Merges this event with given {@link WriteMetadataEvent} {@code other}. * * @param other The event to be merged */ - public void mergeWith(BatchWriteSuccessEvent other) { + public void mergeWith(WriteMetadataEvent other) { ValidationUtils.checkArgument(this.taskID == other.taskID); // the instant time could be monotonically increasing this.instantTime = other.instantTime; @@ -112,7 +125,9 @@ public class BatchWriteSuccessEvent implements OperatorEvent { this.writeStatuses = statusList; } - /** Returns whether the event is ready to commit. */ + /** + * Returns whether the event is ready to commit. + */ public boolean isReady(String currentInstant) { return isLastBatch && this.instantTime.equals(currentInstant); } @@ -122,7 +137,7 @@ public class BatchWriteSuccessEvent implements OperatorEvent { // ------------------------------------------------------------------------- /** - * Builder for {@link BatchWriteSuccessEvent}. + * Builder for {@link WriteMetadataEvent}. */ public static class Builder { private List writeStatus; @@ -130,12 +145,13 @@ public class BatchWriteSuccessEvent implements OperatorEvent { private String instantTime; private boolean isLastBatch = false; private boolean isEndInput = false; + private boolean isBootstrap = false; - public BatchWriteSuccessEvent build() { + public WriteMetadataEvent build() { Objects.requireNonNull(taskID); Objects.requireNonNull(instantTime); Objects.requireNonNull(writeStatus); - return new BatchWriteSuccessEvent(taskID, instantTime, writeStatus, isLastBatch, isEndInput); + return new WriteMetadataEvent(taskID, instantTime, writeStatus, isLastBatch, isEndInput, isBootstrap); } public Builder taskID(int taskID) { @@ -162,5 +178,10 @@ public class BatchWriteSuccessEvent implements OperatorEvent { this.isEndInput = isEndInput; return this; } + + public Builder isBootstrap(boolean isBootstrap) { + this.isBootstrap = isBootstrap; + return this; + } } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java index 3d98ce9f3..e8927dc7f 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java @@ -26,11 +26,11 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.table.HoodieFlinkTable; import org.apache.avro.Schema; import org.apache.flink.configuration.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hudi.table.HoodieFlinkTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,13 +43,6 @@ public class CompactionUtil { private static final Logger LOG = LoggerFactory.getLogger(CompactionUtil.class); - /** - * Creates the metaClient. - */ - public static HoodieTableMetaClient createMetaClient(Configuration conf) { - return HoodieTableMetaClient.builder().setBasePath(conf.getString(FlinkOptions.PATH)).setConf(FlinkClientUtil.getHadoopConf()).build(); - } - /** * Gets compaction Instant time. */ @@ -72,7 +65,7 @@ public class CompactionUtil { * Sets up the avro schema string into the give configuration {@code conf} * through reading from the hoodie table metadata. * - * @param conf The configuration + * @param conf The configuration */ public static void setAvroSchema(Configuration conf, HoodieTableMetaClient metaClient) throws Exception { TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient); diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index e20f34fa9..d73b300b2 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -154,7 +154,7 @@ public class StreamerUtil { .withMaxMemoryMaxSize( conf.getInteger(FlinkOptions.WRITE_MERGE_MAX_MEMORY) * 1024 * 1024L, conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024L - ).build()) + ).build()) .forTable(conf.getString(FlinkOptions.TABLE_NAME)) .withStorageConfig(HoodieStorageConfig.newBuilder() .logFileDataBlockMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_BLOCK_SIZE) * 1024 * 1024) @@ -221,13 +221,16 @@ public class StreamerUtil { // some of the filesystems release the handles in #close method. } - /** Generates the bucket ID using format {partition path}_{fileID}. */ + /** + * Generates the bucket ID using format {partition path}_{fileID}. + */ public static String generateBucketKey(String partitionPath, String fileId) { return String.format("%s_%s", partitionPath, fileId); } /** * Returns whether needs to schedule the async compaction. + * * @param conf The flink configuration. */ public static boolean needsAsyncCompaction(Configuration conf) { @@ -237,6 +240,13 @@ public class StreamerUtil { && conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED); } + /** + * Creates the meta client. + */ + public static HoodieTableMetaClient createMetaClient(Configuration conf) { + return HoodieTableMetaClient.builder().setBasePath(conf.getString(FlinkOptions.PATH)).setConf(FlinkClientUtil.getHadoopConf()).build(); + } + /** * Creates the Flink write client. */ diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java index 8e67159a4..be4c05219 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java @@ -200,7 +200,7 @@ public class StreamWriteITCase extends TestLogger { conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ"); // create metaClient - HoodieTableMetaClient metaClient = CompactionUtil.createMetaClient(conf); + HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf); // set the table name conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName()); diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java index a2fdf227f..612bb7966 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java @@ -24,7 +24,7 @@ import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.configuration.FlinkOptions; -import org.apache.hudi.sink.event.BatchWriteSuccessEvent; +import org.apache.hudi.sink.event.WriteMetadataEvent; import org.apache.hudi.sink.utils.MockCoordinatorExecutor; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestConfigurations; @@ -70,6 +70,23 @@ public class TestStreamWriteOperatorCoordinator { TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()), context); coordinator.start(); coordinator.setExecutor(new MockCoordinatorExecutor(context)); + + final WriteMetadataEvent event0 = WriteMetadataEvent.builder() + .taskID(0) + .instantTime("") + .writeStatus(Collections.emptyList()) + .isBootstrap(true) + .build(); + + final WriteMetadataEvent event1 = WriteMetadataEvent.builder() + .taskID(1) + .instantTime("") + .writeStatus(Collections.emptyList()) + .isBootstrap(true) + .build(); + + coordinator.handleEventFromOperator(0, event0); + coordinator.handleEventFromOperator(1, event1); } @AfterEach @@ -85,7 +102,7 @@ public class TestStreamWriteOperatorCoordinator { WriteStatus writeStatus = new WriteStatus(true, 0.1D); writeStatus.setPartitionPath("par1"); writeStatus.setStat(new HoodieWriteStat()); - OperatorEvent event0 = BatchWriteSuccessEvent.builder() + OperatorEvent event0 = WriteMetadataEvent.builder() .taskID(0) .instantTime(instant) .writeStatus(Collections.singletonList(writeStatus)) @@ -95,7 +112,7 @@ public class TestStreamWriteOperatorCoordinator { WriteStatus writeStatus1 = new WriteStatus(false, 0.2D); writeStatus1.setPartitionPath("par2"); writeStatus1.setStat(new HoodieWriteStat()); - OperatorEvent event1 = BatchWriteSuccessEvent.builder() + OperatorEvent event1 = WriteMetadataEvent.builder() .taskID(1) .instantTime(instant) .writeStatus(Collections.singletonList(writeStatus1)) @@ -132,7 +149,7 @@ public class TestStreamWriteOperatorCoordinator { public void testReceiveInvalidEvent() { CompletableFuture future = new CompletableFuture<>(); coordinator.checkpointCoordinator(1, future); - OperatorEvent event = BatchWriteSuccessEvent.builder() + OperatorEvent event = WriteMetadataEvent.builder() .taskID(0) .instantTime("abc") .writeStatus(Collections.emptyList()) @@ -147,7 +164,7 @@ public class TestStreamWriteOperatorCoordinator { final CompletableFuture future = new CompletableFuture<>(); coordinator.checkpointCoordinator(1, future); String instant = coordinator.getInstant(); - OperatorEvent event = BatchWriteSuccessEvent.builder() + OperatorEvent event = WriteMetadataEvent.builder() .taskID(0) .instantTime(instant) .writeStatus(Collections.emptyList()) @@ -163,7 +180,7 @@ public class TestStreamWriteOperatorCoordinator { WriteStatus writeStatus1 = new WriteStatus(false, 0.2D); writeStatus1.setPartitionPath("par2"); writeStatus1.setStat(new HoodieWriteStat()); - OperatorEvent event1 = BatchWriteSuccessEvent.builder() + OperatorEvent event1 = WriteMetadataEvent.builder() .taskID(1) .instantTime(instant) .writeStatus(Collections.singletonList(writeStatus1)) @@ -186,20 +203,30 @@ public class TestStreamWriteOperatorCoordinator { coordinator.start(); coordinator.setExecutor(new MockCoordinatorExecutor(context)); + final WriteMetadataEvent event0 = WriteMetadataEvent.builder() + .taskID(0) + .instantTime("") + .writeStatus(Collections.emptyList()) + .isBootstrap(true) + .build(); + + coordinator.handleEventFromOperator(0, event0); + String instant = coordinator.getInstant(); assertNotEquals("", instant); WriteStatus writeStatus = new WriteStatus(true, 0.1D); writeStatus.setPartitionPath("par1"); writeStatus.setStat(new HoodieWriteStat()); - OperatorEvent event0 = BatchWriteSuccessEvent.builder() + + OperatorEvent event1 = WriteMetadataEvent.builder() .taskID(0) .instantTime(instant) .writeStatus(Collections.singletonList(writeStatus)) .isLastBatch(true) .build(); - coordinator.handleEventFromOperator(0, event0); + coordinator.handleEventFromOperator(0, event1); // never throw for hive synchronization now assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(1)); diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java index e1cb99e1c..90a3b344b 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -28,7 +28,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.FileSystemViewStorageType; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.sink.event.BatchWriteSuccessEvent; +import org.apache.hudi.sink.event.WriteMetadataEvent; import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestConfigurations; @@ -135,8 +135,8 @@ public class TestWriteCopyOnWrite { String instant = funcWrapper.getWriteClient().getLastPendingInstant(getTableType()); final OperatorEvent nextEvent = funcWrapper.getNextEvent(); - MatcherAssert.assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); - List writeStatuses = ((BatchWriteSuccessEvent) nextEvent).getWriteStatuses(); + MatcherAssert.assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class)); + List writeStatuses = ((WriteMetadataEvent) nextEvent).getWriteStatuses(); assertNotNull(writeStatuses); MatcherAssert.assertThat(writeStatuses.size(), is(4)); // write 4 partition files assertThat(writeStatuses.stream() @@ -162,8 +162,8 @@ public class TestWriteCopyOnWrite { assertNotEquals(instant, instant2); final OperatorEvent nextEvent2 = funcWrapper.getNextEvent(); - assertThat("The operator expect to send an event", nextEvent2, instanceOf(BatchWriteSuccessEvent.class)); - List writeStatuses2 = ((BatchWriteSuccessEvent) nextEvent2).getWriteStatuses(); + assertThat("The operator expect to send an event", nextEvent2, instanceOf(WriteMetadataEvent.class)); + List writeStatuses2 = ((WriteMetadataEvent) nextEvent2).getWriteStatuses(); assertNotNull(writeStatuses2); assertThat(writeStatuses2.size(), is(0)); // write empty statuses @@ -191,8 +191,8 @@ public class TestWriteCopyOnWrite { assertNotNull(instant); final OperatorEvent nextEvent = funcWrapper.getNextEvent(); - assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); - List writeStatuses = ((BatchWriteSuccessEvent) nextEvent).getWriteStatuses(); + assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class)); + List writeStatuses = ((WriteMetadataEvent) nextEvent).getWriteStatuses(); assertNotNull(writeStatuses); assertThat(writeStatuses.size(), is(0)); // no data write @@ -210,7 +210,9 @@ public class TestWriteCopyOnWrite { } // this returns early because there is no inflight instant - funcWrapper.checkpointFunction(2); + assertThrows(HoodieException.class, + () -> funcWrapper.checkpointFunction(2), + "Timeout(0ms) while waiting for"); // do not sent the write event and fails the checkpoint, // behaves like the last checkpoint is successful. funcWrapper.checkpointFails(2); @@ -232,7 +234,7 @@ public class TestWriteCopyOnWrite { .getLastPendingInstant(getTableType()); final OperatorEvent nextEvent = funcWrapper.getNextEvent(); - assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); + assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class)); funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); @@ -262,7 +264,7 @@ public class TestWriteCopyOnWrite { funcWrapper.checkpointFunction(1); OperatorEvent nextEvent = funcWrapper.getNextEvent(); - assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); + assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class)); funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); @@ -298,7 +300,7 @@ public class TestWriteCopyOnWrite { funcWrapper.checkpointFunction(1); OperatorEvent nextEvent = funcWrapper.getNextEvent(); - assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); + assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class)); funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); @@ -318,7 +320,7 @@ public class TestWriteCopyOnWrite { .getLastPendingInstant(getTableType()); nextEvent = funcWrapper.getNextEvent(); - assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); + assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class)); funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); @@ -343,7 +345,7 @@ public class TestWriteCopyOnWrite { funcWrapper.checkpointFunction(1); OperatorEvent nextEvent = funcWrapper.getNextEvent(); - assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); + assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class)); funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); @@ -363,7 +365,7 @@ public class TestWriteCopyOnWrite { .getLastPendingInstant(getTableType()); nextEvent = funcWrapper.getNextEvent(); - assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); + assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class)); funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); @@ -408,7 +410,7 @@ public class TestWriteCopyOnWrite { final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first final OperatorEvent event2 = funcWrapper.getNextEvent(); - assertThat("The operator expect to send an event", event2, instanceOf(BatchWriteSuccessEvent.class)); + assertThat("The operator expect to send an event", event2, instanceOf(WriteMetadataEvent.class)); funcWrapper.getCoordinator().handleEventFromOperator(0, event1); funcWrapper.getCoordinator().handleEventFromOperator(0, event2); @@ -470,7 +472,7 @@ public class TestWriteCopyOnWrite { final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first final OperatorEvent event2 = funcWrapper.getNextEvent(); - assertThat("The operator expect to send an event", event2, instanceOf(BatchWriteSuccessEvent.class)); + assertThat("The operator expect to send an event", event2, instanceOf(WriteMetadataEvent.class)); funcWrapper.getCoordinator().handleEventFromOperator(0, event1); funcWrapper.getCoordinator().handleEventFromOperator(0, event2); @@ -534,7 +536,7 @@ public class TestWriteCopyOnWrite { for (int i = 0; i < 2; i++) { final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first - assertThat("The operator expect to send an event", event, instanceOf(BatchWriteSuccessEvent.class)); + assertThat("The operator expect to send an event", event, instanceOf(WriteMetadataEvent.class)); funcWrapper.getCoordinator().handleEventFromOperator(0, event); } assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); @@ -592,7 +594,7 @@ public class TestWriteCopyOnWrite { funcWrapper.checkpointFunction(1); OperatorEvent nextEvent = funcWrapper.getNextEvent(); - assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); + assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class)); funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); @@ -634,7 +636,7 @@ public class TestWriteCopyOnWrite { .getLastPendingInstant(getTableType()); nextEvent = funcWrapper.getNextEvent(); - assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); + assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class)); checkWrittenData(tempFile, EXPECTED2); funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); @@ -673,7 +675,7 @@ public class TestWriteCopyOnWrite { for (int i = 0; i < 2; i++) { final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first - assertThat("The operator expect to send an event", event, instanceOf(BatchWriteSuccessEvent.class)); + assertThat("The operator expect to send an event", event, instanceOf(WriteMetadataEvent.class)); funcWrapper.getCoordinator().handleEventFromOperator(0, event); } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java index 84ba9da91..e78456b0e 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java @@ -27,7 +27,7 @@ import org.apache.hudi.sink.StreamWriteFunction; import org.apache.hudi.sink.StreamWriteOperatorCoordinator; import org.apache.hudi.sink.bootstrap.BootstrapFunction; import org.apache.hudi.sink.bootstrap.IndexRecord; -import org.apache.hudi.sink.event.BatchWriteSuccessEvent; +import org.apache.hudi.sink.event.WriteMetadataEvent; import org.apache.hudi.sink.partitioner.BucketAssignFunction; import org.apache.hudi.sink.partitioner.BucketAssignOperator; import org.apache.hudi.sink.transform.RowDataToHoodieFunction; @@ -70,15 +70,25 @@ public class StreamWriteFunctionWrapper { private final StreamWriteOperatorCoordinator coordinator; private final MockFunctionInitializationContext functionInitializationContext; - /** Function that converts row data to HoodieRecord. */ + /** + * Function that converts row data to HoodieRecord. + */ private RowDataToHoodieFunction> toHoodieFunction; - /** Function that load index in state. */ + /** + * Function that load index in state. + */ private BootstrapFunction, HoodieRecord> bootstrapFunction; - /** Function that assigns bucket ID. */ + /** + * Function that assigns bucket ID. + */ private BucketAssignFunction, HoodieRecord> bucketAssignerFunction; - /** BucketAssignOperator context. **/ + /** + * BucketAssignOperator context. + **/ private MockBucketAssignOperatorContext bucketAssignOperatorContext; - /** Stream write function. */ + /** + * Stream write function. + */ private StreamWriteFunction, Object> writeFunction; private CompactFunctionWrapper compactFunctionWrapper; @@ -133,8 +143,12 @@ public class StreamWriteFunctionWrapper { writeFunction = new StreamWriteFunction<>(conf); writeFunction.setRuntimeContext(runtimeContext); writeFunction.setOperatorEventGateway(gateway); + writeFunction.initializeState(this.functionInitializationContext); writeFunction.open(conf); + // handle the bootstrap event + coordinator.handleEventFromOperator(0, getNextEvent()); + if (asyncCompaction) { compactFunctionWrapper.openFunction(); } @@ -184,7 +198,7 @@ public class StreamWriteFunctionWrapper { writeFunction.processElement(hoodieRecords[0], null, null); } - public BatchWriteSuccessEvent[] getEventBuffer() { + public WriteMetadataEvent[] getEventBuffer() { return this.coordinator.getEventBuffer(); } @@ -201,7 +215,7 @@ public class StreamWriteFunctionWrapper { return this.writeFunction.getWriteClient(); } - public void checkpointFunction(long checkpointId) { + public void checkpointFunction(long checkpointId) throws Exception { // checkpoint the coordinator first this.coordinator.checkpointCoordinator(checkpointId, new CompletableFuture<>()); bucketAssignerFunction.snapshotState(null);