1
0

[HUDI-2126] The coordinator send events to write function when there are no data for the checkpoint (#3219)

This commit is contained in:
Danny Chan
2021-07-05 11:34:18 +08:00
committed by GitHub
parent 6a71412f78
commit 98ec017bc8
4 changed files with 78 additions and 10 deletions

View File

@@ -32,6 +32,7 @@ import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.sink.event.CommitAckEvent;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.table.action.commit.FlinkWriteHelper;
import org.apache.hudi.util.StreamerUtil;
@@ -41,6 +42,7 @@ import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -346,6 +348,12 @@ public class StreamWriteFunction<K, I, O>
writeStatuses.clear();
}
public void handleOperatorEvent(OperatorEvent event) {
ValidationUtils.checkArgument(event instanceof CommitAckEvent,
"The write function can only handle CommitAckEvent");
this.confirming = false;
}
/**
* Represents a data item in the buffer, this is needed to reduce the
* memory footprint.
@@ -558,14 +566,14 @@ public class StreamWriteFunction<K, I, O>
String instant = this.writeClient.getLastPendingInstant(this.actionType);
// if exactly-once semantics turns on,
// waits for the checkpoint notification until the checkpoint timeout threshold hits.
if (confirming) {
long waitingTime = 0L;
long ckpTimeout = config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT);
long interval = 500L;
long waitingTime = 0L;
long ckpTimeout = config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT);
long interval = 500L;
while (confirming) {
// wait condition:
// 1. there is no inflight instant
// 2. the inflight instant does not change and the checkpoint has buffering data
while (instant == null || (instant.equals(this.currentInstant) && hasData)) {
if (instant == null || (instant.equals(this.currentInstant) && hasData)) {
// sleep for a while
try {
if (waitingTime > ckpTimeout) {
@@ -578,10 +586,11 @@ public class StreamWriteFunction<K, I, O>
}
// refresh the inflight instant
instant = this.writeClient.getLastPendingInstant(this.actionType);
} else {
// the inflight instant changed, which means the last instant was committed
// successfully.
confirming = false;
}
// the inflight instant changed, which means the last instant was committed
// successfully.
confirming = false;
}
return instant;
}

View File

@@ -42,8 +42,8 @@ public class StreamWriteOperator<I>
}
@Override
public void handleOperatorEvent(OperatorEvent operatorEvent) {
// do nothing
public void handleOperatorEvent(OperatorEvent event) {
this.sinkFunction.handleOperatorEvent(event);
}
void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {

View File

@@ -29,6 +29,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.event.CommitAckEvent;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.utils.CoordinatorExecutor;
import org.apache.hudi.sink.utils.HiveSyncContext;
@@ -40,6 +41,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,6 +58,7 @@ import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.apache.hudi.util.StreamerUtil.initTableIfNotExists;
@@ -353,6 +356,26 @@ public class StreamWriteOperatorCoordinator
addEventToBuffer(event);
}
/**
* The coordinator reuses the instant if there is no data for this round of checkpoint,
* sends the commit ack events to unblock the flushing.
*/
private void sendCommitAckEvents() {
CompletableFuture<?>[] futures = IntStream.range(0, this.parallelism)
.mapToObj(taskID -> {
try {
return this.context.sendEvent(CommitAckEvent.getInstance(), taskID);
} catch (TaskNotRunningException e) {
throw new HoodieException("Error while sending commit ack event to task [" + taskID + "] error", e);
}
}).toArray(CompletableFuture<?>[]::new);
try {
CompletableFuture.allOf(futures).get();
} catch (Exception e) {
throw new HoodieException("Error while waiting for the commit ack events to finish sending", e);
}
}
/**
* Commits the instant.
*
@@ -373,6 +396,8 @@ public class StreamWriteOperatorCoordinator
if (writeResults.size() == 0) {
// No data has written, reset the buffer and returns early
reset();
// Send commit ack event to the write function to unblock the flushing
sendCommitAckEvents();
return false;
}
doCommit(instant, writeResults);

View File

@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.event;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
/**
* An operator event to mark successful instant commit.
*/
public class CommitAckEvent implements OperatorEvent {
private static final long serialVersionUID = 1L;
private static final CommitAckEvent INSTANCE = new CommitAckEvent();
public static CommitAckEvent getInstance() {
return INSTANCE;
}
}