1
0

[HUDI-2469] [Kafka Connect] Replace json based payload with protobuf for Transaction protocol. (#3694)

* Substitute Control Event with protobuf

* Fix tests

* Fix unit tests

* Add javadocs

* Add javadocs

* Address reviewer comments

Co-authored-by: Rajesh Mahindra <rmahindra@Rajeshs-MacBook-Pro.local>
This commit is contained in:
rmahindra123
2021-10-19 14:29:48 -07:00
committed by GitHub
parent 46f0496a08
commit 3686c25fae
24 changed files with 354 additions and 461 deletions

View File

@@ -95,10 +95,10 @@ while getopts ":n:f:k:m:r:l:p:s:-:" opt; do
done
# First delete the existing topic
#${KAFKA_HOME}/bin/kafka-topics.sh --delete --topic ${kafkaTopicName} --bootstrap-server localhost:9092
${KAFKA_HOME}/bin/kafka-topics.sh --delete --topic ${kafkaTopicName} --bootstrap-server localhost:9092
# Create the topic with 4 partitions
#${KAFKA_HOME}/bin/kafka-topics.sh --create --topic ${kafkaTopicName} --partitions $numKafkaPartitions --replication-factor 1 --bootstrap-server localhost:9092
${KAFKA_HOME}/bin/kafka-topics.sh --create --topic ${kafkaTopicName} --partitions $numKafkaPartitions --replication-factor 1 --bootstrap-server localhost:9092
# Setup the schema registry
export SCHEMA=$(sed 's|/\*|\n&|g;s|*/|&\n|g' ${schemaFile} | sed '/\/\*/,/*\//d' | jq tostring)

View File

@@ -63,6 +63,25 @@
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
</plugin>
<plugin>
<groupId>com.github.os72</groupId>
<artifactId>protoc-jar-maven-plugin</artifactId>
<version>3.1.0.1</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<protocVersion>${protoc.version}</protocVersion>
<inputDirectories>
<include>src/main/resources</include>
</inputDirectories>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<resources>
@@ -138,6 +157,13 @@
</exclusions>
</dependency>
<!-- Protobuf -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${proto.version}</version>
</dependency>
<!-- Logging -->
<dependency>
<groupId>log4j</groupId>

View File

@@ -25,6 +25,7 @@ import org.apache.hudi.connect.transaction.TransactionCoordinator;
import org.apache.hudi.connect.transaction.TransactionParticipant;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
@@ -36,7 +37,6 @@ import org.apache.kafka.connect.sink.SinkTask;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@@ -51,7 +51,6 @@ public class HoodieSinkTask extends SinkTask {
public static final String TASK_ID_CONFIG_NAME = "task.id";
private static final Logger LOG = LogManager.getLogger(HoodieSinkTask.class);
private static final int COORDINATOR_KAFKA_PARTITION = 0;
private final Map<TopicPartition, TransactionCoordinator> transactionCoordinators;
private final Map<TopicPartition, TransactionParticipant> transactionParticipants;
@@ -113,7 +112,7 @@ public class HoodieSinkTask extends SinkTask {
}
try {
transactionParticipants.get(partition).processRecords();
} catch (IOException exception) {
} catch (HoodieIOException exception) {
throw new RetriableException("Intermittent write errors for Hudi "
+ " for the topic/partition: " + partition.topic() + ":" + partition.partition()
+ " , ensuring kafka connect will retry ", exception);
@@ -164,7 +163,7 @@ public class HoodieSinkTask extends SinkTask {
// make sure we apply the WAL, and only reuse the temp file if the starting offset is still
// valid. For now, we prefer the simpler solution that may result in a bit of wasted effort.
for (TopicPartition partition : partitions) {
if (partition.partition() == COORDINATOR_KAFKA_PARTITION) {
if (partition.partition() == ConnectTransactionCoordinator.COORDINATOR_KAFKA_PARTITION) {
if (transactionCoordinators.containsKey(partition)) {
transactionCoordinators.get(partition).stop();
transactionCoordinators.remove(partition);
@@ -188,7 +187,7 @@ public class HoodieSinkTask extends SinkTask {
for (TopicPartition partition : partitions) {
try {
// If the partition is 0, instantiate the Leader
if (partition.partition() == COORDINATOR_KAFKA_PARTITION) {
if (partition.partition() == ConnectTransactionCoordinator.COORDINATOR_KAFKA_PARTITION) {
ConnectTransactionCoordinator coordinator = new ConnectTransactionCoordinator(
connectConfigs,
partition,

View File

@@ -18,17 +18,16 @@
package org.apache.hudi.connect.kafka;
import org.apache.hudi.connect.transaction.ControlEvent;
import org.apache.hudi.connect.ControlMessage;
import org.apache.hudi.connect.transaction.TransactionCoordinator;
import org.apache.hudi.connect.transaction.TransactionParticipant;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -66,7 +65,7 @@ public class KafkaConnectControlAgent implements KafkaControlAgent {
// List of TransactionParticipants per Kafka Topic
private final Map<String, ConcurrentLinkedQueue<TransactionParticipant>> partitionWorkers;
private final KafkaControlProducer producer;
private KafkaConsumer<String, ControlEvent> consumer;
private KafkaConsumer<String, byte[]> consumer;
public KafkaConnectControlAgent(String bootstrapServers,
String controlTopicName) {
@@ -118,7 +117,7 @@ public class KafkaConnectControlAgent implements KafkaControlAgent {
}
@Override
public void publishMessage(ControlEvent message) {
public void publishMessage(ControlMessage message) {
producer.publishMessage(message);
}
@@ -128,28 +127,28 @@ public class KafkaConnectControlAgent implements KafkaControlAgent {
// Todo fetch the worker id or name instead of a uuid.
props.put(ConsumerConfig.GROUP_ID_CONFIG, "hudi-control-group" + UUID.randomUUID().toString());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaJsonDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
// Since we are using Kafka Control Topic as a RPC like interface,
// we want consumers to only process messages that are sent after they come online
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
consumer = new KafkaConsumer<>(props, new StringDeserializer(),
new KafkaJsonDeserializer<>(ControlEvent.class));
consumer = new KafkaConsumer<>(props, new StringDeserializer(), new ByteArrayDeserializer());
consumer.subscribe(Collections.singletonList(controlTopicName));
executorService.submit(() -> {
while (true) {
ConsumerRecords<String, ControlEvent> records;
ConsumerRecords<String, byte[]> records;
records = consumer.poll(Duration.ofMillis(KAFKA_POLL_TIMEOUT_MS));
for (ConsumerRecord<String, ControlEvent> record : records) {
for (ConsumerRecord<String, byte[]> record : records) {
try {
LOG.debug(String.format("Kafka consumerGroupId = %s topic = %s, partition = %s, offset = %s, customer = %s, country = %s",
"", record.topic(), record.partition(), record.offset(), record.key(), record.value()));
ControlEvent message = record.value();
String senderTopic = message.senderPartition().topic();
if (message.getSenderType().equals(ControlEvent.SenderType.COORDINATOR)) {
ControlMessage message = ControlMessage.parseFrom(record.value());
String senderTopic = message.getTopicName();
if (message.getReceiverType().equals(ControlMessage.EntityType.PARTICIPANT)) {
if (partitionWorkers.containsKey(senderTopic)) {
for (TransactionParticipant partitionWorker : partitionWorkers.get(senderTopic)) {
partitionWorker.processControlEvent(message);
@@ -157,11 +156,9 @@ public class KafkaConnectControlAgent implements KafkaControlAgent {
} else {
LOG.warn(String.format("Failed to send message for unregistered participants for topic %s", senderTopic));
}
} else if (message.getSenderType().equals(ControlEvent.SenderType.PARTICIPANT)) {
} else if (message.getReceiverType().equals(ControlMessage.EntityType.COORDINATOR)) {
if (topicCoordinators.containsKey(senderTopic)) {
topicCoordinators.get(senderTopic).processControlEvent(message);
} else {
LOG.warn(String.format("Failed to send message for unregistered coordinator for topic %s", senderTopic));
}
} else {
LOG.warn(String.format("Sender type of Control Message unknown %s", message.getSenderType().name()));
@@ -200,31 +197,4 @@ public class KafkaConnectControlAgent implements KafkaControlAgent {
}
}
}
/**
* Deserializes the incoming Kafka records for the Control Topic.
*
* @param <T> represents the object that is sent over the Control Topic.
*/
public static class KafkaJsonDeserializer<T> implements Deserializer<T> {
private static final Logger LOG = LogManager.getLogger(KafkaJsonDeserializer.class);
private final Class<T> type;
KafkaJsonDeserializer(Class<T> type) {
this.type = type;
}
@Override
public T deserialize(String s, byte[] bytes) {
ObjectMapper mapper = new ObjectMapper();
T obj = null;
try {
obj = mapper.readValue(bytes, type);
} catch (Exception e) {
LOG.error(e.getMessage());
}
return obj;
}
}
}

View File

@@ -18,7 +18,7 @@
package org.apache.hudi.connect.kafka;
import org.apache.hudi.connect.transaction.ControlEvent;
import org.apache.hudi.connect.ControlMessage;
import org.apache.hudi.connect.transaction.TransactionCoordinator;
import org.apache.hudi.connect.transaction.TransactionParticipant;
@@ -37,5 +37,5 @@ public interface KafkaControlAgent {
void deregisterTransactionCoordinator(TransactionCoordinator coordinator);
void publishMessage(ControlEvent message);
void publishMessage(ControlMessage message);
}

View File

@@ -18,16 +18,13 @@
package org.apache.hudi.connect.kafka;
import org.apache.hudi.connect.transaction.ControlEvent;
import org.apache.hudi.connect.ControlMessage;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -45,7 +42,7 @@ public class KafkaControlProducer {
private final String bootstrapServers;
private final String controlTopicName;
private Producer<String, ControlEvent> producer;
private Producer<String, byte[]> producer;
public KafkaControlProducer(String bootstrapServers, String controlTopicName) {
this.bootstrapServers = bootstrapServers;
@@ -57,12 +54,12 @@ public class KafkaControlProducer {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaJsonSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
producer = new KafkaProducer<>(
props,
new StringSerializer(),
new KafkaJsonSerializer()
new ByteArraySerializer()
);
}
@@ -70,28 +67,9 @@ public class KafkaControlProducer {
producer.close();
}
public void publishMessage(ControlEvent message) {
ProducerRecord<String, ControlEvent> record
= new ProducerRecord<>(controlTopicName, message.key(), message);
public void publishMessage(ControlMessage message) {
ProducerRecord<String, byte[]> record
= new ProducerRecord<>(controlTopicName, message.getType().name(), message.toByteArray());
producer.send(record);
}
public static class KafkaJsonSerializer implements Serializer<ControlEvent> {
private static final Logger LOG = LogManager.getLogger(KafkaJsonSerializer.class);
@Override
public byte[] serialize(String topic, ControlEvent data) {
byte[] retVal = null;
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
try {
retVal = objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
LOG.error("Fatal error during serialization of Kafka Control Message ", e);
}
return retVal;
}
}
}

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.connect.transaction;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.connect.ControlMessage;
import org.apache.hudi.connect.kafka.KafkaControlAgent;
import org.apache.hudi.connect.utils.KafkaConnectUtils;
import org.apache.hudi.connect.writers.ConnectTransactionServices;
@@ -53,6 +54,8 @@ import java.util.stream.Collectors;
*/
public class ConnectTransactionCoordinator implements TransactionCoordinator, Runnable {
public static final int COORDINATOR_KAFKA_PARTITION = 0;
private static final Logger LOG = LogManager.getLogger(ConnectTransactionCoordinator.class);
private static final String BOOTSTRAP_SERVERS_CFG = "bootstrap.servers";
private static final String KAFKA_OFFSET_KEY = "kafka.commit.offsets";
@@ -158,17 +161,18 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru
}
@Override
public void processControlEvent(ControlEvent message) {
public void processControlEvent(ControlMessage message) {
CoordinatorEvent.CoordinatorEventType type;
if (message.getMsgType().equals(ControlEvent.MsgType.WRITE_STATUS)) {
if (message.getType().equals(ControlMessage.EventType.WRITE_STATUS)) {
type = CoordinatorEvent.CoordinatorEventType.WRITE_STATUS;
} else {
LOG.warn(String.format("The Coordinator should not be receiving messages of type %s", message.getMsgType().name()));
LOG.warn(String.format("The Coordinator should not be receiving messages of type %s",
message.getType().name()));
return;
}
CoordinatorEvent event = new CoordinatorEvent(type,
message.senderPartition().topic(),
message.getTopicName(),
message.getCommitTime());
event.setMessage(message);
submitEvent(event);
@@ -242,15 +246,7 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru
partitionsWriteStatusReceived.clear();
try {
currentCommitTime = transactionServices.startCommit();
ControlEvent message = new ControlEvent.Builder(
ControlEvent.MsgType.START_COMMIT,
ControlEvent.SenderType.COORDINATOR,
currentCommitTime,
partition)
.setCoordinatorInfo(
new ControlEvent.CoordinatorInfo(globalCommittedKafkaOffsets))
.build();
kafkaControlClient.publishMessage(message);
kafkaControlClient.publishMessage(buildControlMessage(ControlMessage.EventType.START_COMMIT));
currentState = State.STARTED_COMMIT;
// schedule a timeout for ending the current commit
submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.END_COMMIT,
@@ -268,14 +264,7 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru
private void endExistingCommit() {
try {
ControlEvent message = new ControlEvent.Builder(
ControlEvent.MsgType.END_COMMIT,
ControlEvent.SenderType.COORDINATOR,
currentCommitTime,
partition)
.setCoordinatorInfo(new ControlEvent.CoordinatorInfo(globalCommittedKafkaOffsets))
.build();
kafkaControlClient.publishMessage(message);
kafkaControlClient.publishMessage(buildControlMessage(ControlMessage.EventType.END_COMMIT));
} catch (Exception exception) {
LOG.warn(String.format("Could not send END_COMMIT message for partition %s and commitTime %s", partition, currentCommitTime), exception);
}
@@ -289,13 +278,11 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru
configs.getCoordinatorWriteTimeoutSecs(), TimeUnit.SECONDS);
}
private void onReceiveWriteStatus(ControlEvent message) {
ControlEvent.ParticipantInfo participantInfo = message.getParticipantInfo();
if (participantInfo.getOutcomeType().equals(ControlEvent.OutcomeType.WRITE_SUCCESS)) {
int partition = message.senderPartition().partition();
partitionsWriteStatusReceived.put(partition, participantInfo.writeStatuses());
currentConsumedKafkaOffsets.put(partition, participantInfo.getKafkaCommitOffset());
}
private void onReceiveWriteStatus(ControlMessage message) {
ControlMessage.ParticipantInfo participantInfo = message.getParticipantInfo();
int partition = message.getSenderPartition();
partitionsWriteStatusReceived.put(partition, KafkaConnectUtils.getWriteStatuses(participantInfo));
currentConsumedKafkaOffsets.put(partition, participantInfo.getKafkaOffset());
if (partitionsWriteStatusReceived.size() >= numPartitions
&& currentState.equals(State.ENDED_COMMIT)) {
// Commit the kafka offsets to the commit file
@@ -311,7 +298,7 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru
currentState = State.WRITE_STATUS_RCVD;
globalCommittedKafkaOffsets.putAll(currentConsumedKafkaOffsets);
submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.ACK_COMMIT,
partition.topic(),
message.getTopicName(),
currentCommitTime));
} catch (Exception exception) {
LOG.error("Fatal error while committing file", exception);
@@ -334,15 +321,7 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru
private void submitAckCommit() {
try {
ControlEvent message = new ControlEvent.Builder(
ControlEvent.MsgType.ACK_COMMIT,
ControlEvent.SenderType.COORDINATOR,
currentCommitTime,
partition)
.setCoordinatorInfo(
new ControlEvent.CoordinatorInfo(globalCommittedKafkaOffsets))
.build();
kafkaControlClient.publishMessage(message);
kafkaControlClient.publishMessage(buildControlMessage(ControlMessage.EventType.ACK_COMMIT));
} catch (Exception exception) {
LOG.warn(String.format("Could not send ACK_COMMIT message for partition %s and commitTime %s", partition, currentCommitTime), exception);
}
@@ -397,4 +376,20 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru
public interface KafkaPartitionProvider {
int getLatestNumPartitions(String bootstrapServers, String topicName);
}
private ControlMessage buildControlMessage(ControlMessage.EventType eventType) {
return ControlMessage.newBuilder()
.setProtocolVersion(KafkaConnectConfigs.CURRENT_PROTOCOL_VERSION)
.setType(eventType)
.setTopicName(partition.topic())
.setSenderType(ControlMessage.EntityType.COORDINATOR)
.setSenderPartition(partition.partition())
.setReceiverType(ControlMessage.EntityType.PARTICIPANT)
.setCommitTime(currentCommitTime)
.setCoordinatorInfo(
ControlMessage.CoordinatorInfo.newBuilder()
.putAllGlobalKafkaCommitOffsets(globalCommittedKafkaOffsets)
.build()
).build();
}
}

View File

@@ -19,11 +19,14 @@
package org.apache.hudi.connect.transaction;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.connect.ControlMessage;
import org.apache.hudi.connect.kafka.KafkaControlAgent;
import org.apache.hudi.connect.utils.KafkaConnectUtils;
import org.apache.hudi.connect.writers.ConnectWriterProvider;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.connect.writers.KafkaConnectWriterProvider;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -46,7 +49,7 @@ public class ConnectTransactionParticipant implements TransactionParticipant {
private static final Logger LOG = LogManager.getLogger(ConnectTransactionParticipant.class);
private final LinkedList<SinkRecord> buffer;
private final BlockingQueue<ControlEvent> controlEvents;
private final BlockingQueue<ControlMessage> controlEvents;
private final TopicPartition partition;
private final SinkTaskContext context;
private final KafkaControlAgent kafkaControlAgent;
@@ -95,7 +98,7 @@ public class ConnectTransactionParticipant implements TransactionParticipant {
}
@Override
public void processControlEvent(ControlEvent message) {
public void processControlEvent(ControlMessage message) {
controlEvents.add(message);
}
@@ -110,10 +113,10 @@ public class ConnectTransactionParticipant implements TransactionParticipant {
}
@Override
public void processRecords() throws IOException {
public void processRecords() {
while (!controlEvents.isEmpty()) {
ControlEvent message = controlEvents.poll();
switch (message.getMsgType()) {
ControlMessage message = controlEvents.poll();
switch (message.getType()) {
case START_COMMIT:
handleStartCommit(message);
break;
@@ -127,14 +130,14 @@ public class ConnectTransactionParticipant implements TransactionParticipant {
// ignore write status since its only processed by leader
break;
default:
throw new IllegalStateException("HudiTransactionParticipant received incorrect state " + message.getMsgType());
throw new IllegalStateException("HudiTransactionParticipant received incorrect state " + message.getType().name());
}
}
writeRecords();
}
private void handleStartCommit(ControlEvent message) {
private void handleStartCommit(ControlMessage message) {
// If there is an existing/ongoing transaction locally
// but it failed globally since we received another START_COMMIT instead of an END_COMMIT or ACK_COMMIT,
// so close it and start new transaction
@@ -152,7 +155,7 @@ public class ConnectTransactionParticipant implements TransactionParticipant {
}
}
private void handleEndCommit(ControlEvent message) throws IOException {
private void handleEndCommit(ControlMessage message) {
if (ongoingTransactionInfo == null) {
LOG.warn(String.format("END_COMMIT %s is received while we were NOT in active transaction", message.getCommitTime()));
return;
@@ -172,21 +175,32 @@ public class ConnectTransactionParticipant implements TransactionParticipant {
try {
//sendWriterStatus
List<WriteStatus> writeStatuses = ongoingTransactionInfo.getWriter().close();
ControlEvent writeStatusEvent = new ControlEvent.Builder(ControlEvent.MsgType.WRITE_STATUS,
ControlEvent.SenderType.PARTICIPANT, ongoingTransactionInfo.getCommitTime(), partition)
.setParticipantInfo(new ControlEvent.ParticipantInfo(
writeStatuses,
ongoingTransactionInfo.getLastWrittenKafkaOffset(),
ControlEvent.OutcomeType.WRITE_SUCCESS))
.build();
ControlMessage writeStatusEvent = ControlMessage.newBuilder()
.setProtocolVersion(KafkaConnectConfigs.CURRENT_PROTOCOL_VERSION)
.setType(ControlMessage.EventType.WRITE_STATUS)
.setTopicName(partition.topic())
.setSenderType(ControlMessage.EntityType.PARTICIPANT)
.setSenderPartition(partition.partition())
.setReceiverType(ControlMessage.EntityType.COORDINATOR)
.setReceiverPartition(ConnectTransactionCoordinator.COORDINATOR_KAFKA_PARTITION)
.setCommitTime(ongoingTransactionInfo.getCommitTime())
.setParticipantInfo(
ControlMessage.ParticipantInfo.newBuilder()
.setWriteStatus(KafkaConnectUtils.buildWriteStatuses(writeStatuses))
.setKafkaOffset(ongoingTransactionInfo.getLastWrittenKafkaOffset())
.build()
).build();
kafkaControlAgent.publishMessage(writeStatusEvent);
} catch (Exception exception) {
LOG.error(String.format("Error writing records and ending commit %s for partition %s", message.getCommitTime(), partition.partition()), exception);
throw new IOException(String.format("Error writing records and ending commit %s for partition %s", message.getCommitTime(), partition.partition()), exception);
throw new HoodieIOException(String.format("Error writing records and ending commit %s for partition %s", message.getCommitTime(), partition.partition()),
new IOException(exception));
}
}
private void handleAckCommit(ControlEvent message) {
private void handleAckCommit(ControlMessage message) {
// Update lastKafkCommitedOffset locally.
if (ongoingTransactionInfo != null && committedKafkaOffset < ongoingTransactionInfo.getLastWrittenKafkaOffset()) {
committedKafkaOffset = ongoingTransactionInfo.getLastWrittenKafkaOffset();
@@ -224,15 +238,15 @@ public class ConnectTransactionParticipant implements TransactionParticipant {
try {
ongoingTransactionInfo.getWriter().close();
ongoingTransactionInfo = null;
} catch (IOException exception) {
} catch (HoodieIOException exception) {
LOG.warn("Error received while trying to cleanup existing transaction", exception);
}
}
}
private void syncKafkaOffsetWithLeader(ControlEvent message) {
if (message.getCoordinatorInfo() != null) {
Long coordinatorCommittedKafkaOffset = message.getCoordinatorInfo().getGlobalKafkaCommitOffsets().get(partition.partition());
private void syncKafkaOffsetWithLeader(ControlMessage message) {
if (message.getCoordinatorInfo().getGlobalKafkaCommitOffsetsMap().containsKey(partition.partition())) {
Long coordinatorCommittedKafkaOffset = message.getCoordinatorInfo().getGlobalKafkaCommitOffsetsMap().get(partition.partition());
// Recover kafka committed offsets, treating the commit offset from the coordinator
// as the source of truth
if (coordinatorCommittedKafkaOffset != null && coordinatorCommittedKafkaOffset >= 0) {

View File

@@ -1,237 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.connect.transaction;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.util.SerializationUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
/**
* The events sent over the Kafka Control Topic between the
* coordinator and the followers, in order to ensure
* coordination across all the writes.
*/
@SuppressWarnings("checkstyle:VisibilityModifier")
public class ControlEvent implements Serializable {
private static final Logger LOG = LogManager.getLogger(ControlEvent.class);
private static final int CURRENT_VERSION = 0;
private final int version = CURRENT_VERSION;
private MsgType msgType;
private SenderType senderType;
private String commitTime;
private byte[] senderPartition;
private CoordinatorInfo coordinatorInfo;
private ParticipantInfo participantInfo;
public ControlEvent() {
}
public ControlEvent(MsgType msgType,
SenderType senderType,
String commitTime,
byte[] senderPartition,
CoordinatorInfo coordinatorInfo,
ParticipantInfo participantInfo) {
this.msgType = msgType;
this.senderType = senderType;
this.commitTime = commitTime;
this.senderPartition = senderPartition;
this.coordinatorInfo = coordinatorInfo;
this.participantInfo = participantInfo;
}
public String key() {
return msgType.name().toLowerCase(Locale.ROOT);
}
public MsgType getMsgType() {
return msgType;
}
public SenderType getSenderType() {
return senderType;
}
public String getCommitTime() {
return commitTime;
}
public byte[] getSenderPartition() {
return senderPartition;
}
public TopicPartition senderPartition() {
return SerializationUtils.deserialize(senderPartition);
}
public CoordinatorInfo getCoordinatorInfo() {
return coordinatorInfo;
}
public ParticipantInfo getParticipantInfo() {
return participantInfo;
}
public int getVersion() {
return version;
}
@Override
public String toString() {
return String.format("%s %s %s %s %s %s", version, msgType.name(), commitTime,
Arrays.toString(senderPartition),
(coordinatorInfo == null) ? "" : coordinatorInfo.toString(),
(participantInfo == null) ? "" : participantInfo.toString());
}
/**
* Builder that helps build {@link ControlEvent}.
*/
public static class Builder {
private final MsgType msgType;
private SenderType senderType;
private final String commitTime;
private final byte[] senderPartition;
private CoordinatorInfo coordinatorInfo;
private ParticipantInfo participantInfo;
public Builder(MsgType msgType, SenderType senderType, String commitTime, TopicPartition senderPartition) throws IOException {
this.msgType = msgType;
this.senderType = senderType;
this.commitTime = commitTime;
this.senderPartition = SerializationUtils.serialize(senderPartition);
}
public Builder setCoordinatorInfo(CoordinatorInfo coordinatorInfo) {
this.coordinatorInfo = coordinatorInfo;
return this;
}
public Builder setParticipantInfo(ParticipantInfo participantInfo) {
this.participantInfo = participantInfo;
return this;
}
public ControlEvent build() {
return new ControlEvent(msgType, senderType, commitTime, senderPartition, coordinatorInfo, participantInfo);
}
}
/**
* The info sent by the {@link TransactionCoordinator} to one or more
* {@link TransactionParticipant}s.
*/
public static class CoordinatorInfo implements Serializable {
private Map<Integer, Long> globalKafkaCommitOffsets;
public CoordinatorInfo() {
}
public CoordinatorInfo(Map<Integer, Long> globalKafkaCommitOffsets) {
this.globalKafkaCommitOffsets = globalKafkaCommitOffsets;
}
public Map<Integer, Long> getGlobalKafkaCommitOffsets() {
return (globalKafkaCommitOffsets == null) ? new HashMap<>() : globalKafkaCommitOffsets;
}
@Override
public String toString() {
return String.format("%s", globalKafkaCommitOffsets.keySet().stream()
.map(key -> key + "=" + globalKafkaCommitOffsets.get(key))
.collect(Collectors.joining(", ", "{", "}")));
}
}
/**
* The info sent by a {@link TransactionParticipant} instances to the
* {@link TransactionCoordinator}.
*/
public static class ParticipantInfo implements Serializable {
private byte[] writeStatusList;
private long kafkaCommitOffset;
private OutcomeType outcomeType;
public ParticipantInfo() {
}
public ParticipantInfo(List<WriteStatus> writeStatuses, long kafkaCommitOffset, OutcomeType outcomeType) throws IOException {
this.writeStatusList = SerializationUtils.serialize(writeStatuses);
this.kafkaCommitOffset = kafkaCommitOffset;
this.outcomeType = outcomeType;
}
public byte[] getWriteStatusList() {
return writeStatusList;
}
public List<WriteStatus> writeStatuses() {
return SerializationUtils.deserialize(writeStatusList);
}
public long getKafkaCommitOffset() {
return kafkaCommitOffset;
}
public OutcomeType getOutcomeType() {
return outcomeType;
}
@Override
public String toString() {
return String.format("%s %s %s", Arrays.toString(writeStatusList), kafkaCommitOffset, outcomeType.name());
}
}
/**
* Type of Control Event.
*/
public enum MsgType {
START_COMMIT,
END_COMMIT,
ACK_COMMIT,
WRITE_STATUS,
}
public enum SenderType {
COORDINATOR,
PARTICIPANT
}
public enum OutcomeType {
WRITE_SUCCESS,
}
}

View File

@@ -18,6 +18,8 @@
package org.apache.hudi.connect.transaction;
import org.apache.hudi.connect.ControlMessage;
/**
* The events within the Coordinator that trigger
* the state changes in the state machine of
@@ -28,7 +30,7 @@ public class CoordinatorEvent {
private final CoordinatorEventType eventType;
private final String topicName;
private final String commitTime;
private ControlEvent message;
private ControlMessage message;
public CoordinatorEvent(CoordinatorEventType eventType,
String topicName,
@@ -50,11 +52,11 @@ public class CoordinatorEvent {
return commitTime;
}
public ControlEvent getMessage() {
public ControlMessage getMessage() {
return message;
}
public void setMessage(ControlEvent message) {
public void setMessage(ControlMessage message) {
this.message = message;
}

View File

@@ -18,6 +18,8 @@
package org.apache.hudi.connect.transaction;
import org.apache.hudi.connect.ControlMessage;
import org.apache.kafka.common.TopicPartition;
/**
@@ -36,5 +38,5 @@ public interface TransactionCoordinator {
TopicPartition getPartition();
/* Called when a control event is received from the Kafka control topic */
void processControlEvent(ControlEvent message);
void processControlEvent(ControlMessage message);
}

View File

@@ -18,11 +18,11 @@
package org.apache.hudi.connect.transaction;
import org.apache.hudi.connect.ControlMessage;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import java.io.IOException;
/**
* Interface for the Participant that
* manages Writes for a
@@ -37,11 +37,11 @@ public interface TransactionParticipant {
void buffer(SinkRecord record);
void processRecords() throws IOException;
void processRecords();
TopicPartition getPartition();
void processControlEvent(ControlEvent message);
void processControlEvent(ControlMessage message);
long getLastKafkaCommittedOffset();
}

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.connect.utils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
@@ -26,7 +27,9 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SerializationUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.connect.ControlMessage;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.CustomAvroKeyGenerator;
@@ -34,6 +37,7 @@ import org.apache.hudi.keygen.CustomKeyGenerator;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import com.google.protobuf.ByteString;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
@@ -42,10 +46,12 @@ import org.apache.kafka.common.KafkaFuture;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
@@ -76,6 +82,7 @@ public class KafkaConnectUtils {
/**
* Returns the default Hadoop Configuration.
*
* @return
*/
public static Configuration getDefaultHadoopConf() {
@@ -86,6 +93,7 @@ public class KafkaConnectUtils {
/**
* Extract the record fields.
*
* @param keyGenerator key generator Instance of the keygenerator.
* @return Returns the record key columns separated by comma.
*/
@@ -97,7 +105,7 @@ public class KafkaConnectUtils {
* Extract partition columns directly if an instance of class {@link BaseKeyGenerator},
* else extract partition columns from the properties.
*
* @param keyGenerator key generator Instance of the keygenerator.
* @param keyGenerator key generator Instance of the keygenerator.
* @param typedProperties properties from the config.
* @return partition columns Returns the partition columns separated by comma.
*/
@@ -142,7 +150,7 @@ public class KafkaConnectUtils {
return Option.empty();
}
}
public static String hashDigest(String stringToHash) {
MessageDigest md;
try {
@@ -154,4 +162,32 @@ public class KafkaConnectUtils {
byte[] digest = Objects.requireNonNull(md).digest(stringToHash.getBytes(StandardCharsets.UTF_8));
return StringUtils.toHexString(digest).toUpperCase();
}
/**
* Build Protobuf message containing the Hudi {@link WriteStatus}.
*
* @param writeStatuses The list of Hudi {@link WriteStatus}.
* @return the protobuf message {@link org.apache.hudi.connect.ControlMessage.ConnectWriteStatus}
* that wraps the Hudi {@link WriteStatus}.
* @throws IOException thrown if the conversion failed.
*/
public static ControlMessage.ConnectWriteStatus buildWriteStatuses(List<WriteStatus> writeStatuses) throws IOException {
return ControlMessage.ConnectWriteStatus.newBuilder()
.setSerializedWriteStatus(
ByteString.copyFrom(
SerializationUtils.serialize(writeStatuses)))
.build();
}
/**
* Unwrap the Hudi {@link WriteStatus} from the received Protobuf message.
*
* @param participantInfo The {@link ControlMessage.ParticipantInfo} that contains the
* underlying {@link WriteStatus} sent by the participants.
* @return the list of {@link WriteStatus} returned by Hudi on a write transaction.
*/
public static List<WriteStatus> getWriteStatuses(ControlMessage.ParticipantInfo participantInfo) {
ControlMessage.ConnectWriteStatus connectWriteStatus = participantInfo.getWriteStatus();
return SerializationUtils.deserialize(connectWriteStatus.getSerializedWriteStatus().toByteArray());
}
}

View File

@@ -91,11 +91,11 @@ public abstract class AbstractConnectWriter implements ConnectWriter<WriteStatus
}
@Override
public List<WriteStatus> close() throws IOException {
public List<WriteStatus> close() {
return flushRecords();
}
protected abstract void writeHudiRecord(HoodieRecord<?> record);
protected abstract List<WriteStatus> flushRecords() throws IOException;
protected abstract List<WriteStatus> flushRecords();
}

View File

@@ -92,7 +92,7 @@ public class BufferedConnectWriter extends AbstractConnectWriter {
}
@Override
public List<WriteStatus> flushRecords() throws IOException {
public List<WriteStatus> flushRecords() {
try {
LOG.info("Number of entries in MemoryBasedMap => "
+ bufferedRecords.getInMemoryMapNumEntries()
@@ -122,7 +122,7 @@ public class BufferedConnectWriter extends AbstractConnectWriter {
LOG.info("Flushed hudi records and got writeStatuses: " + writeStatuses);
return writeStatuses;
} catch (Exception e) {
throw new IOException("Write records failed", e);
throw new HoodieIOException("Write records failed", new IOException(e));
}
}
}

View File

@@ -27,5 +27,5 @@ public interface ConnectWriter<T> {
void writeRecord(SinkRecord record) throws IOException;
List<T> close() throws IOException;
List<T> close();
}

View File

@@ -39,6 +39,7 @@ import java.util.Properties;
description = "Configurations for Kafka Connect Sink Connector for Hudi.")
public class KafkaConnectConfigs extends HoodieConfig {
public static final int CURRENT_PROTOCOL_VERSION = 0;
public static final String KAFKA_VALUE_CONVERTER = "value.converter";
public static final ConfigProperty<String> KAFKA_BOOTSTRAP_SERVERS = ConfigProperty

View File

@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Wire-format definition of the control-plane messages exchanged between the
// Kafka Connect transaction coordinator and participants over the control topic.
syntax = "proto3";

option java_multiple_files = true;
option java_package = "org.apache.hudi.connect";
option java_outer_classname = "ConnectControl";

package connect;

// A single control event. The sender is either the coordinator or a participant
// (see sender_type); exactly one of the payload sub-messages is set accordingly.
// NOTE(review): field names mix camelCase (protocolVersion, commitTime) and
// snake_case (topic_name, sender_type); the proto style guide recommends
// snake_case. Renaming would keep tag numbers (wire-compatible) and the
// generated Java accessors, but would change JSON field names — confirm before
// normalizing.
message ControlMessage {
  // Protocol version; presumably mirrors KafkaConnectConfigs.CURRENT_PROTOCOL_VERSION — TODO confirm.
  uint32 protocolVersion = 1;
  // Kind of control event (commit lifecycle or write-status report).
  EventType type = 2;
  // Kafka topic this transaction applies to.
  string topic_name = 3;
  // Who sent this message and from which Kafka partition.
  EntityType sender_type = 4;
  uint32 sender_partition = 5;
  // Intended receiver and its Kafka partition.
  EntityType receiver_type = 6;
  uint32 receiver_partition = 7;
  // Hudi commit time of the transaction this event belongs to.
  string commitTime = 8;

  // Payload depends on the sender: coordinator_info for COORDINATOR senders,
  // participant_info for PARTICIPANT senders.
  oneof payload {
    CoordinatorInfo coordinator_info = 9;
    ParticipantInfo participant_info = 10;
  }

  // Coordinator payload: committed Kafka offsets, keyed by partition number.
  message CoordinatorInfo {
    map<int32, int64> globalKafkaCommitOffsets = 1;
  }

  // Participant payload: the write statuses and the Kafka offset written up to.
  message ParticipantInfo {
    ConnectWriteStatus writeStatus = 1;
    uint64 kafkaOffset = 2;
  }

  // Wraps the Java-serialized List<WriteStatus>
  // (see KafkaConnectUtils.buildWriteStatuses / getWriteStatuses).
  message ConnectWriteStatus {
    bytes serializedWriteStatus = 1;
  }

  enum EventType {
    START_COMMIT = 0;
    END_COMMIT = 1;
    ACK_COMMIT = 2;
    WRITE_STATUS = 3;
  }

  enum EntityType {
    COORDINATOR = 0;
    PARTICIPANT = 1;
  }
}

View File

@@ -0,0 +1,23 @@
###
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###
log4j.rootLogger=INFO, A1
# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender
# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

View File

@@ -22,9 +22,9 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.connect.transaction.ConnectTransactionCoordinator;
import org.apache.hudi.connect.transaction.ControlEvent;
import org.apache.hudi.connect.transaction.TransactionCoordinator;
import org.apache.hudi.connect.transaction.TransactionParticipant;
import org.apache.hudi.connect.utils.KafkaConnectUtils;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.helper.MockConnectTransactionServices;
@@ -108,7 +108,7 @@ public class TestConnectTransactionCoordinator {
private final int maxNumberCommitRounds;
private final Map<Integer, Long> kafkaOffsetsCommitted;
private ControlEvent.MsgType expectedMsgType;
private ControlMessage.EventType expectedMsgType;
private int numberCommitRounds;
public MockParticipant(MockKafkaControlAgent kafkaControlAgent,
@@ -121,7 +121,7 @@ public class TestConnectTransactionCoordinator {
this.maxNumberCommitRounds = maxNumberCommitRounds;
this.partition = new TopicPartition(TOPIC_NAME, (NUM_PARTITIONS - 1));
this.kafkaOffsetsCommitted = new HashMap<>();
expectedMsgType = ControlEvent.MsgType.START_COMMIT;
expectedMsgType = ControlMessage.EventType.START_COMMIT;
numberCommitRounds = 0;
}
@@ -149,9 +149,9 @@ public class TestConnectTransactionCoordinator {
}
@Override
public void processControlEvent(ControlEvent message) {
assertEquals(message.getSenderType(), ControlEvent.SenderType.COORDINATOR);
assertEquals(message.senderPartition().topic(), partition.topic());
public void processControlEvent(ControlMessage message) {
assertEquals(message.getSenderType(), ControlMessage.EntityType.COORDINATOR);
assertEquals(message.getTopicName(), partition.topic());
testScenarios(message);
}
@@ -160,24 +160,24 @@ public class TestConnectTransactionCoordinator {
return 0;
}
private void testScenarios(ControlEvent message) {
assertEquals(expectedMsgType, message.getMsgType());
private void testScenarios(ControlMessage message) {
assertEquals(expectedMsgType, message.getType());
switch (message.getMsgType()) {
switch (message.getType()) {
case START_COMMIT:
expectedMsgType = ControlEvent.MsgType.END_COMMIT;
expectedMsgType = ControlMessage.EventType.END_COMMIT;
break;
case END_COMMIT:
assertEquals(kafkaOffsetsCommitted, message.getCoordinatorInfo().getGlobalKafkaCommitOffsets());
int numSuccessPartitions;
Map<Integer, Long> kafkaOffsets = new HashMap<>();
List<ControlEvent> controlEvents = new ArrayList<>();
List<ControlMessage> controlEvents = new ArrayList<>();
// Prepare the WriteStatuses for all partitions
for (int i = 1; i <= NUM_PARTITIONS; i++) {
try {
long kafkaOffset = (long) (Math.random() * 10000);
kafkaOffsets.put(i, kafkaOffset);
ControlEvent event = successWriteStatus(
ControlMessage event = successWriteStatus(
message.getCommitTime(),
new TopicPartition(TOPIC_NAME, i),
kafkaOffset);
@@ -191,11 +191,11 @@ public class TestConnectTransactionCoordinator {
case ALL_CONNECT_TASKS_SUCCESS:
numSuccessPartitions = NUM_PARTITIONS;
kafkaOffsetsCommitted.putAll(kafkaOffsets);
expectedMsgType = ControlEvent.MsgType.ACK_COMMIT;
expectedMsgType = ControlMessage.EventType.ACK_COMMIT;
break;
case SUBSET_CONNECT_TASKS_FAILED:
numSuccessPartitions = NUM_PARTITIONS / 2;
expectedMsgType = ControlEvent.MsgType.START_COMMIT;
expectedMsgType = ControlMessage.EventType.START_COMMIT;
break;
default:
throw new HoodieException("Unknown test scenario " + testScenario);
@@ -210,18 +210,18 @@ public class TestConnectTransactionCoordinator {
if (numberCommitRounds >= maxNumberCommitRounds) {
latch.countDown();
}
expectedMsgType = ControlEvent.MsgType.START_COMMIT;
expectedMsgType = ControlMessage.EventType.START_COMMIT;
break;
default:
throw new HoodieException("Illegal control message type " + message.getMsgType());
throw new HoodieException("Illegal control message type " + message.getType());
}
if (message.getMsgType().equals(ControlEvent.MsgType.START_COMMIT)) {
if (message.getType().equals(ControlMessage.EventType.START_COMMIT)) {
if (numberCommitRounds >= maxNumberCommitRounds) {
latch.countDown();
}
numberCommitRounds++;
expectedMsgType = ControlEvent.MsgType.END_COMMIT;
expectedMsgType = ControlMessage.EventType.END_COMMIT;
}
}
@@ -230,24 +230,29 @@ public class TestConnectTransactionCoordinator {
ALL_CONNECT_TASKS_SUCCESS
}
private static ControlEvent successWriteStatus(String commitTime,
TopicPartition partition,
long kafkaOffset) throws Exception {
private static ControlMessage successWriteStatus(String commitTime,
TopicPartition partition,
long kafkaOffset) throws Exception {
// send WS
WriteStatus writeStatus = new WriteStatus();
WriteStatus status = new WriteStatus(false, 1.0);
for (int i = 0; i < 1000; i++) {
status.markSuccess(mock(HoodieRecord.class), Option.empty());
}
return new ControlEvent.Builder(ControlEvent.MsgType.WRITE_STATUS,
ControlEvent.SenderType.PARTICIPANT,
commitTime,
partition)
.setParticipantInfo(new ControlEvent.ParticipantInfo(
Collections.singletonList(writeStatus),
kafkaOffset,
ControlEvent.OutcomeType.WRITE_SUCCESS))
.build();
return ControlMessage.newBuilder()
.setType(ControlMessage.EventType.WRITE_STATUS)
.setTopicName(partition.topic())
.setSenderType(ControlMessage.EntityType.PARTICIPANT)
.setSenderPartition(partition.partition())
.setReceiverType(ControlMessage.EntityType.COORDINATOR)
.setReceiverPartition(ConnectTransactionCoordinator.COORDINATOR_KAFKA_PARTITION)
.setCommitTime(commitTime)
.setParticipantInfo(
ControlMessage.ParticipantInfo.newBuilder()
.setWriteStatus(KafkaConnectUtils.buildWriteStatuses(Collections.singletonList(writeStatus)))
.setKafkaOffset(kafkaOffset)
.build()
).build();
}
}
}

View File

@@ -21,7 +21,6 @@ package org.apache.hudi.connect;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.connect.kafka.KafkaControlAgent;
import org.apache.hudi.connect.transaction.ConnectTransactionParticipant;
import org.apache.hudi.connect.transaction.ControlEvent;
import org.apache.hudi.connect.transaction.TransactionCoordinator;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.exception.HoodieException;
@@ -76,16 +75,16 @@ public class TestConnectTransactionParticipant {
break;
case COORDINATOR_FAILED_AFTER_START_COMMIT:
testKafkaConnect.putRecordsToParticipant();
coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT);
coordinator.sendEventFromCoordinator(ControlMessage.EventType.START_COMMIT);
testKafkaConnect.putRecordsToParticipant();
// Coordinator Failed
initializeCoordinator();
break;
case COORDINATOR_FAILED_AFTER_END_COMMIT:
testKafkaConnect.putRecordsToParticipant();
coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT);
coordinator.sendEventFromCoordinator(ControlMessage.EventType.START_COMMIT);
testKafkaConnect.putRecordsToParticipant();
coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT);
coordinator.sendEventFromCoordinator(ControlMessage.EventType.END_COMMIT);
expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant();
// Coordinator Failed
initializeCoordinator();
@@ -95,13 +94,13 @@ public class TestConnectTransactionParticipant {
}
// Regular Case or Coordinator Recovery Case
coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT);
coordinator.sendEventFromCoordinator(ControlMessage.EventType.START_COMMIT);
expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant();
assertTrue(testKafkaConnect.isResumed());
coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT);
coordinator.sendEventFromCoordinator(ControlMessage.EventType.END_COMMIT);
testKafkaConnect.putRecordsToParticipant();
assertTrue(testKafkaConnect.isPaused());
coordinator.sendEventFromCoordinator(ControlEvent.MsgType.ACK_COMMIT);
coordinator.sendEventFromCoordinator(ControlMessage.EventType.ACK_COMMIT);
testKafkaConnect.putRecordsToParticipant();
assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten);
// Ensure Coordinator and participant are in sync in the kafka offsets
@@ -122,13 +121,13 @@ public class TestConnectTransactionParticipant {
testKafkaConnect.putRecordsToParticipant();
// Participant fails
initializeParticipant();
coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT);
coordinator.sendEventFromCoordinator(ControlMessage.EventType.START_COMMIT);
expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant();
assertTrue(testKafkaConnect.isResumed());
coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT);
coordinator.sendEventFromCoordinator(ControlMessage.EventType.END_COMMIT);
testKafkaConnect.putRecordsToParticipant();
assertTrue(testKafkaConnect.isPaused());
coordinator.sendEventFromCoordinator(ControlEvent.MsgType.ACK_COMMIT);
coordinator.sendEventFromCoordinator(ControlMessage.EventType.ACK_COMMIT);
testKafkaConnect.putRecordsToParticipant();
assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten);
// Ensure Coordinator and participant are in sync in the kafka offsets
@@ -136,15 +135,15 @@ public class TestConnectTransactionParticipant {
break;
case FAILURE_AFTER_START_COMMIT:
testKafkaConnect.putRecordsToParticipant();
coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT);
coordinator.sendEventFromCoordinator(ControlMessage.EventType.START_COMMIT);
testKafkaConnect.putRecordsToParticipant();
// Participant fails
initializeParticipant();
testKafkaConnect.putRecordsToParticipant();
coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT);
coordinator.sendEventFromCoordinator(ControlMessage.EventType.END_COMMIT);
testKafkaConnect.putRecordsToParticipant();
assertTrue(testKafkaConnect.isPaused());
coordinator.sendEventFromCoordinator(ControlEvent.MsgType.ACK_COMMIT);
coordinator.sendEventFromCoordinator(ControlMessage.EventType.ACK_COMMIT);
testKafkaConnect.putRecordsToParticipant();
assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten);
// Ensure Coordinator and participant are in sync in the kafka offsets
@@ -152,17 +151,17 @@ public class TestConnectTransactionParticipant {
break;
case FAILURE_AFTER_END_COMMIT:
testKafkaConnect.putRecordsToParticipant();
coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT);
coordinator.sendEventFromCoordinator(ControlMessage.EventType.START_COMMIT);
testKafkaConnect.putRecordsToParticipant();
coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT);
coordinator.sendEventFromCoordinator(ControlMessage.EventType.END_COMMIT);
testKafkaConnect.putRecordsToParticipant();
// Participant fails
initializeParticipant();
testKafkaConnect.putRecordsToParticipant();
coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT);
coordinator.sendEventFromCoordinator(ControlMessage.EventType.END_COMMIT);
testKafkaConnect.putRecordsToParticipant();
assertTrue(testKafkaConnect.isPaused());
coordinator.sendEventFromCoordinator(ControlEvent.MsgType.ACK_COMMIT);
coordinator.sendEventFromCoordinator(ControlMessage.EventType.ACK_COMMIT);
testKafkaConnect.putRecordsToParticipant();
assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten);
// Ensure Coordinator and participant are in sync in the kafka offsets
@@ -203,7 +202,7 @@ public class TestConnectTransactionParticipant {
private final KafkaControlAgent kafkaControlAgent;
private final TopicPartition partition;
private Option<ControlEvent> lastReceivedWriteStatusEvent;
private Option<ControlMessage> lastReceivedWriteStatusEvent;
private long committedKafkaOffset;
public MockCoordinator(KafkaControlAgent kafkaControlAgent) {
@@ -213,26 +212,30 @@ public class TestConnectTransactionParticipant {
committedKafkaOffset = 0L;
}
public void sendEventFromCoordinator(
ControlEvent.MsgType type) {
public void sendEventFromCoordinator(ControlMessage.EventType type) {
try {
if (type.equals(ControlEvent.MsgType.START_COMMIT)) {
if (type.equals(ControlMessage.EventType.START_COMMIT)) {
++currentCommitTime;
}
kafkaControlAgent.publishMessage(new ControlEvent.Builder(
type,
ControlEvent.SenderType.COORDINATOR,
String.valueOf(currentCommitTime),
partition)
.setCoordinatorInfo(new ControlEvent.CoordinatorInfo(
Collections.singletonMap(PARTITION_NUMBER, committedKafkaOffset)))
.build());
kafkaControlAgent.publishMessage(
ControlMessage.newBuilder()
.setType(type)
.setTopicName(partition.topic())
.setSenderType(ControlMessage.EntityType.COORDINATOR)
.setSenderPartition(partition.partition())
.setReceiverType(ControlMessage.EntityType.PARTICIPANT)
.setCommitTime(String.valueOf(currentCommitTime))
.setCoordinatorInfo(
ControlMessage.CoordinatorInfo.newBuilder()
.putAllGlobalKafkaCommitOffsets(Collections.singletonMap(PARTITION_NUMBER, committedKafkaOffset))
.build()
).build());
} catch (Exception exception) {
throw new HoodieException("Fatal error sending control event to Participant");
}
}
public Option<ControlEvent> getLastReceivedWriteStatusEvent() {
public Option<ControlMessage> getLastReceivedWriteStatusEvent() {
return lastReceivedWriteStatusEvent;
}
@@ -256,11 +259,11 @@ public class TestConnectTransactionParticipant {
}
@Override
public void processControlEvent(ControlEvent message) {
if (message.getMsgType().equals(ControlEvent.MsgType.WRITE_STATUS)) {
public void processControlEvent(ControlMessage message) {
if (message.getType().equals(ControlMessage.EventType.WRITE_STATUS)) {
lastReceivedWriteStatusEvent = Option.of(message);
assertTrue(message.getParticipantInfo().getKafkaCommitOffset() >= committedKafkaOffset);
committedKafkaOffset = message.getParticipantInfo().getKafkaCommitOffset();
assertTrue(message.getParticipantInfo().getKafkaOffset() >= committedKafkaOffset);
committedKafkaOffset = message.getParticipantInfo().getKafkaOffset();
}
}
}

View File

@@ -18,8 +18,8 @@
package org.apache.hudi.helper;
import org.apache.hudi.connect.ControlMessage;
import org.apache.hudi.connect.kafka.KafkaControlAgent;
import org.apache.hudi.connect.transaction.ControlEvent;
import org.apache.hudi.connect.transaction.TransactionCoordinator;
import org.apache.hudi.connect.transaction.TransactionParticipant;
import org.apache.hudi.exception.HoodieException;
@@ -70,10 +70,10 @@ public class MockKafkaControlAgent implements KafkaControlAgent {
}
@Override
public void publishMessage(ControlEvent message) {
public void publishMessage(ControlMessage message) {
try {
String topic = message.senderPartition().topic();
if (message.getSenderType().equals(ControlEvent.SenderType.COORDINATOR)) {
String topic = message.getTopicName();
if (message.getSenderType().equals(ControlMessage.EntityType.COORDINATOR)) {
for (TransactionParticipant participant : participants.get(topic)) {
participant.processControlEvent(message);
}

View File

@@ -157,10 +157,19 @@
<scope>compile</scope>
</dependency>
<!-- Protobuf -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${proto.version}</version>
<scope>compile</scope>
</dependency>
<!-- Hadoop -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<scope>compile</scope>
<exclusions>
<exclusion>

View File

@@ -156,6 +156,8 @@
<zk-curator.version>2.7.1</zk-curator.version>
<antlr.version>4.7</antlr.version>
<aws.sdk.version>1.12.22</aws.sdk.version>
<proto.version>3.17.3</proto.version>
<protoc.version>3.1.0</protoc.version>
</properties>
<scm>