diff --git a/hudi-kafka-connect/demo/setupKafka.sh b/hudi-kafka-connect/demo/setupKafka.sh
index 81968a4e7..20edb1ceb 100755
--- a/hudi-kafka-connect/demo/setupKafka.sh
+++ b/hudi-kafka-connect/demo/setupKafka.sh
@@ -95,10 +95,10 @@ while getopts ":n:f:k:m:r:l:p:s:-:" opt; do
done
# First delete the existing topic
-#${KAFKA_HOME}/bin/kafka-topics.sh --delete --topic ${kafkaTopicName} --bootstrap-server localhost:9092
+${KAFKA_HOME}/bin/kafka-topics.sh --delete --topic ${kafkaTopicName} --bootstrap-server localhost:9092
# Create the topic with 4 partitions
-#${KAFKA_HOME}/bin/kafka-topics.sh --create --topic ${kafkaTopicName} --partitions $numKafkaPartitions --replication-factor 1 --bootstrap-server localhost:9092
+${KAFKA_HOME}/bin/kafka-topics.sh --create --topic ${kafkaTopicName} --partitions $numKafkaPartitions --replication-factor 1 --bootstrap-server localhost:9092
# Setup the schema registry
export SCHEMA=$(sed 's|/\*|\n&|g;s|*/|&\n|g' ${schemaFile} | sed '/\/\*/,/*\//d' | jq tostring)
diff --git a/hudi-kafka-connect/pom.xml b/hudi-kafka-connect/pom.xml
index 7742f3b31..7a79f265c 100644
--- a/hudi-kafka-connect/pom.xml
+++ b/hudi-kafka-connect/pom.xml
@@ -63,6 +63,25 @@
        <groupId>org.apache.rat</groupId>
        <artifactId>apache-rat-plugin</artifactId>
      </plugin>
+     <plugin>
+       <groupId>com.github.os72</groupId>
+       <artifactId>protoc-jar-maven-plugin</artifactId>
+       <version>3.1.0.1</version>
+       <executions>
+         <execution>
+           <phase>generate-sources</phase>
+           <goals>
+             <goal>run</goal>
+           </goals>
+           <configuration>
+             <protocVersion>${protoc.version}</protocVersion>
+             <inputDirectories>
+               <include>src/main/resources</include>
+             </inputDirectories>
+           </configuration>
+         </execution>
+       </executions>
+     </plugin>
@@ -138,6 +157,13 @@
+
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>${proto.version}</version>
+    </dependency>
+
      <groupId>log4j</groupId>
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkTask.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkTask.java
index a937a8b82..c14a86656 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkTask.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkTask.java
@@ -25,6 +25,7 @@ import org.apache.hudi.connect.transaction.TransactionCoordinator;
import org.apache.hudi.connect.transaction.TransactionParticipant;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
@@ -36,7 +37,6 @@ import org.apache.kafka.connect.sink.SinkTask;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@@ -51,7 +51,6 @@ public class HoodieSinkTask extends SinkTask {
public static final String TASK_ID_CONFIG_NAME = "task.id";
private static final Logger LOG = LogManager.getLogger(HoodieSinkTask.class);
- private static final int COORDINATOR_KAFKA_PARTITION = 0;
private final Map<TopicPartition, TransactionCoordinator> transactionCoordinators;
private final Map<TopicPartition, TransactionParticipant> transactionParticipants;
@@ -113,7 +112,7 @@ public class HoodieSinkTask extends SinkTask {
}
try {
transactionParticipants.get(partition).processRecords();
- } catch (IOException exception) {
+ } catch (HoodieIOException exception) {
throw new RetriableException("Intermittent write errors for Hudi "
+ " for the topic/partition: " + partition.topic() + ":" + partition.partition()
+ " , ensuring kafka connect will retry ", exception);
@@ -164,7 +163,7 @@ public class HoodieSinkTask extends SinkTask {
// make sure we apply the WAL, and only reuse the temp file if the starting offset is still
// valid. For now, we prefer the simpler solution that may result in a bit of wasted effort.
for (TopicPartition partition : partitions) {
- if (partition.partition() == COORDINATOR_KAFKA_PARTITION) {
+ if (partition.partition() == ConnectTransactionCoordinator.COORDINATOR_KAFKA_PARTITION) {
if (transactionCoordinators.containsKey(partition)) {
transactionCoordinators.get(partition).stop();
transactionCoordinators.remove(partition);
@@ -188,7 +187,7 @@ public class HoodieSinkTask extends SinkTask {
for (TopicPartition partition : partitions) {
try {
// If the partition is 0, instantiate the Leader
- if (partition.partition() == COORDINATOR_KAFKA_PARTITION) {
+ if (partition.partition() == ConnectTransactionCoordinator.COORDINATOR_KAFKA_PARTITION) {
ConnectTransactionCoordinator coordinator = new ConnectTransactionCoordinator(
connectConfigs,
partition,
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaConnectControlAgent.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaConnectControlAgent.java
index a115147ae..776beafbd 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaConnectControlAgent.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaConnectControlAgent.java
@@ -18,17 +18,16 @@
package org.apache.hudi.connect.kafka;
-import org.apache.hudi.connect.transaction.ControlEvent;
+import org.apache.hudi.connect.ControlMessage;
import org.apache.hudi.connect.transaction.TransactionCoordinator;
import org.apache.hudi.connect.transaction.TransactionParticipant;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -66,7 +65,7 @@ public class KafkaConnectControlAgent implements KafkaControlAgent {
// List of TransactionParticipants per Kafka Topic
private final Map<String, List<TransactionParticipant>> partitionWorkers;
private final KafkaControlProducer producer;
- private KafkaConsumer<String, ControlEvent> consumer;
+ private KafkaConsumer<String, byte[]> consumer;
public KafkaConnectControlAgent(String bootstrapServers,
String controlTopicName) {
@@ -118,7 +117,7 @@ public class KafkaConnectControlAgent implements KafkaControlAgent {
}
@Override
- public void publishMessage(ControlEvent message) {
+ public void publishMessage(ControlMessage message) {
producer.publishMessage(message);
}
@@ -128,28 +127,28 @@ public class KafkaConnectControlAgent implements KafkaControlAgent {
// Todo fetch the worker id or name instead of a uuid.
props.put(ConsumerConfig.GROUP_ID_CONFIG, "hudi-control-group" + UUID.randomUUID().toString());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaJsonDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
// Since we are using Kafka Control Topic as a RPC like interface,
// we want consumers to only process messages that are sent after they come online
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
- consumer = new KafkaConsumer<>(props, new StringDeserializer(),
- new KafkaJsonDeserializer<>(ControlEvent.class));
+ consumer = new KafkaConsumer<>(props, new StringDeserializer(), new ByteArrayDeserializer());
consumer.subscribe(Collections.singletonList(controlTopicName));
executorService.submit(() -> {
while (true) {
- ConsumerRecords<String, ControlEvent> records;
+ ConsumerRecords<String, byte[]> records;
records = consumer.poll(Duration.ofMillis(KAFKA_POLL_TIMEOUT_MS));
- for (ConsumerRecord<String, ControlEvent> record : records) {
+ for (ConsumerRecord<String, byte[]> record : records) {
try {
LOG.debug(String.format("Kafka consumerGroupId = %s topic = %s, partition = %s, offset = %s, key = %s, value = %s",
"", record.topic(), record.partition(), record.offset(), record.key(), record.value()));
- ControlEvent message = record.value();
- String senderTopic = message.senderPartition().topic();
- if (message.getSenderType().equals(ControlEvent.SenderType.COORDINATOR)) {
+ ControlMessage message = ControlMessage.parseFrom(record.value());
+ String senderTopic = message.getTopicName();
+
+ if (message.getReceiverType().equals(ControlMessage.EntityType.PARTICIPANT)) {
if (partitionWorkers.containsKey(senderTopic)) {
for (TransactionParticipant partitionWorker : partitionWorkers.get(senderTopic)) {
partitionWorker.processControlEvent(message);
@@ -157,11 +156,9 @@ public class KafkaConnectControlAgent implements KafkaControlAgent {
} else {
LOG.warn(String.format("Failed to send message for unregistered participants for topic %s", senderTopic));
}
- } else if (message.getSenderType().equals(ControlEvent.SenderType.PARTICIPANT)) {
+ } else if (message.getReceiverType().equals(ControlMessage.EntityType.COORDINATOR)) {
if (topicCoordinators.containsKey(senderTopic)) {
topicCoordinators.get(senderTopic).processControlEvent(message);
- } else {
- LOG.warn(String.format("Failed to send message for unregistered coordinator for topic %s", senderTopic));
}
} else {
LOG.warn(String.format("Sender type of Control Message unknown %s", message.getSenderType().name()));
@@ -200,31 +197,4 @@ public class KafkaConnectControlAgent implements KafkaControlAgent {
}
}
}
-
- /**
- * Deserializes the incoming Kafka records for the Control Topic.
- *
- * @param <T> represents the object that is sent over the Control Topic.
- */
- public static class KafkaJsonDeserializer<T> implements Deserializer<T> {
-
- private static final Logger LOG = LogManager.getLogger(KafkaJsonDeserializer.class);
- private final Class<T> type;
-
- KafkaJsonDeserializer(Class<T> type) {
- this.type = type;
- }
-
- @Override
- public T deserialize(String s, byte[] bytes) {
- ObjectMapper mapper = new ObjectMapper();
- T obj = null;
- try {
- obj = mapper.readValue(bytes, type);
- } catch (Exception e) {
- LOG.error(e.getMessage());
- }
- return obj;
- }
- }
}
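With the JSON deserializer gone, the control topic now carries raw protobuf bytes end to end. The serialize/deserialize round trip can be exercised in isolation; a minimal sketch using the generated ControlMessage class (the topic name and commit time below are illustrative values, not from the patch):

```java
import org.apache.hudi.connect.ControlMessage;

import com.google.protobuf.InvalidProtocolBufferException;

public final class ControlMessageRoundTrip {
  public static void main(String[] args) throws InvalidProtocolBufferException {
    // Build a START_COMMIT message roughly the way the coordinator would.
    ControlMessage sent = ControlMessage.newBuilder()
        .setProtocolVersion(0)
        .setType(ControlMessage.EventType.START_COMMIT)
        .setTopicName("hudi-test-topic")          // illustrative value
        .setSenderType(ControlMessage.EntityType.COORDINATOR)
        .setReceiverType(ControlMessage.EntityType.PARTICIPANT)
        .setCommitTime("20211101120000")          // illustrative value
        .build();

    // The producer ships these bytes via ByteArraySerializer ...
    byte[] wire = sent.toByteArray();

    // ... and the consumer re-hydrates them after ByteArrayDeserializer.
    ControlMessage received = ControlMessage.parseFrom(wire);
    System.out.println(received.getType() + " -> " + received.getReceiverType());
  }
}
```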
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaControlAgent.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaControlAgent.java
index ea5177eb5..85b843557 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaControlAgent.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaControlAgent.java
@@ -18,7 +18,7 @@
package org.apache.hudi.connect.kafka;
-import org.apache.hudi.connect.transaction.ControlEvent;
+import org.apache.hudi.connect.ControlMessage;
import org.apache.hudi.connect.transaction.TransactionCoordinator;
import org.apache.hudi.connect.transaction.TransactionParticipant;
@@ -37,5 +37,5 @@ public interface KafkaControlAgent {
void deregisterTransactionCoordinator(TransactionCoordinator coordinator);
- void publishMessage(ControlEvent message);
+ void publishMessage(ControlMessage message);
}
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaControlProducer.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaControlProducer.java
index a23251e35..530e57059 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaControlProducer.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaControlProducer.java
@@ -18,16 +18,13 @@
package org.apache.hudi.connect.kafka;
-import org.apache.hudi.connect.transaction.ControlEvent;
+import org.apache.hudi.connect.ControlMessage;
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.PropertyAccessor;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -45,7 +42,7 @@ public class KafkaControlProducer {
private final String bootstrapServers;
private final String controlTopicName;
- private Producer<String, ControlEvent> producer;
+ private Producer<String, byte[]> producer;
public KafkaControlProducer(String bootstrapServers, String controlTopicName) {
this.bootstrapServers = bootstrapServers;
@@ -57,12 +54,12 @@ public class KafkaControlProducer {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaJsonSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
producer = new KafkaProducer<>(
props,
new StringSerializer(),
- new KafkaJsonSerializer()
+ new ByteArraySerializer()
);
}
@@ -70,28 +67,9 @@ public class KafkaControlProducer {
producer.close();
}
- public void publishMessage(ControlEvent message) {
- ProducerRecord<String, ControlEvent> record
- = new ProducerRecord<>(controlTopicName, message.key(), message);
+ public void publishMessage(ControlMessage message) {
+ ProducerRecord<String, byte[]> record
+ = new ProducerRecord<>(controlTopicName, message.getType().name(), message.toByteArray());
producer.send(record);
}
-
- public static class KafkaJsonSerializer implements Serializer<ControlEvent> {
-
- private static final Logger LOG = LogManager.getLogger(KafkaJsonSerializer.class);
-
- @Override
- public byte[] serialize(String topic, ControlEvent data) {
- byte[] retVal = null;
- ObjectMapper objectMapper = new ObjectMapper();
- objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
-
- try {
- retVal = objectMapper.writeValueAsBytes(data);
- } catch (Exception e) {
- LOG.error("Fatal error during serialization of Kafka Control Message ", e);
- }
- return retVal;
- }
- }
}
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java
index 73a30c610..7acd875b6 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java
@@ -20,6 +20,7 @@ package org.apache.hudi.connect.transaction;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.connect.ControlMessage;
import org.apache.hudi.connect.kafka.KafkaControlAgent;
import org.apache.hudi.connect.utils.KafkaConnectUtils;
import org.apache.hudi.connect.writers.ConnectTransactionServices;
@@ -53,6 +54,8 @@ import java.util.stream.Collectors;
*/
public class ConnectTransactionCoordinator implements TransactionCoordinator, Runnable {
+ public static final int COORDINATOR_KAFKA_PARTITION = 0;
+
private static final Logger LOG = LogManager.getLogger(ConnectTransactionCoordinator.class);
private static final String BOOTSTRAP_SERVERS_CFG = "bootstrap.servers";
private static final String KAFKA_OFFSET_KEY = "kafka.commit.offsets";
@@ -158,17 +161,18 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Runnable {
}
@Override
- public void processControlEvent(ControlEvent message) {
+ public void processControlEvent(ControlMessage message) {
CoordinatorEvent.CoordinatorEventType type;
- if (message.getMsgType().equals(ControlEvent.MsgType.WRITE_STATUS)) {
+ if (message.getType().equals(ControlMessage.EventType.WRITE_STATUS)) {
type = CoordinatorEvent.CoordinatorEventType.WRITE_STATUS;
} else {
- LOG.warn(String.format("The Coordinator should not be receiving messages of type %s", message.getMsgType().name()));
+ LOG.warn(String.format("The Coordinator should not be receiving messages of type %s",
+ message.getType().name()));
return;
}
CoordinatorEvent event = new CoordinatorEvent(type,
- message.senderPartition().topic(),
+ message.getTopicName(),
message.getCommitTime());
event.setMessage(message);
submitEvent(event);
@@ -242,15 +246,7 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Runnable {
partitionsWriteStatusReceived.clear();
try {
currentCommitTime = transactionServices.startCommit();
- ControlEvent message = new ControlEvent.Builder(
- ControlEvent.MsgType.START_COMMIT,
- ControlEvent.SenderType.COORDINATOR,
- currentCommitTime,
- partition)
- .setCoordinatorInfo(
- new ControlEvent.CoordinatorInfo(globalCommittedKafkaOffsets))
- .build();
- kafkaControlClient.publishMessage(message);
+ kafkaControlClient.publishMessage(buildControlMessage(ControlMessage.EventType.START_COMMIT));
currentState = State.STARTED_COMMIT;
// schedule a timeout for ending the current commit
submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.END_COMMIT,
@@ -268,14 +264,7 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Runnable {
private void endExistingCommit() {
try {
- ControlEvent message = new ControlEvent.Builder(
- ControlEvent.MsgType.END_COMMIT,
- ControlEvent.SenderType.COORDINATOR,
- currentCommitTime,
- partition)
- .setCoordinatorInfo(new ControlEvent.CoordinatorInfo(globalCommittedKafkaOffsets))
- .build();
- kafkaControlClient.publishMessage(message);
+ kafkaControlClient.publishMessage(buildControlMessage(ControlMessage.EventType.END_COMMIT));
} catch (Exception exception) {
LOG.warn(String.format("Could not send END_COMMIT message for partition %s and commitTime %s", partition, currentCommitTime), exception);
}
@@ -289,13 +278,11 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Runnable {
configs.getCoordinatorWriteTimeoutSecs(), TimeUnit.SECONDS);
}
- private void onReceiveWriteStatus(ControlEvent message) {
- ControlEvent.ParticipantInfo participantInfo = message.getParticipantInfo();
- if (participantInfo.getOutcomeType().equals(ControlEvent.OutcomeType.WRITE_SUCCESS)) {
- int partition = message.senderPartition().partition();
- partitionsWriteStatusReceived.put(partition, participantInfo.writeStatuses());
- currentConsumedKafkaOffsets.put(partition, participantInfo.getKafkaCommitOffset());
- }
+ private void onReceiveWriteStatus(ControlMessage message) {
+ ControlMessage.ParticipantInfo participantInfo = message.getParticipantInfo();
+ int partition = message.getSenderPartition();
+ partitionsWriteStatusReceived.put(partition, KafkaConnectUtils.getWriteStatuses(participantInfo));
+ currentConsumedKafkaOffsets.put(partition, participantInfo.getKafkaOffset());
if (partitionsWriteStatusReceived.size() >= numPartitions
&& currentState.equals(State.ENDED_COMMIT)) {
// Commit the kafka offsets to the commit file
@@ -311,7 +298,7 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Runnable {
currentState = State.WRITE_STATUS_RCVD;
globalCommittedKafkaOffsets.putAll(currentConsumedKafkaOffsets);
submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.ACK_COMMIT,
- partition.topic(),
+ message.getTopicName(),
currentCommitTime));
} catch (Exception exception) {
LOG.error("Fatal error while committing file", exception);
@@ -334,15 +321,7 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Runnable {
private void submitAckCommit() {
try {
- ControlEvent message = new ControlEvent.Builder(
- ControlEvent.MsgType.ACK_COMMIT,
- ControlEvent.SenderType.COORDINATOR,
- currentCommitTime,
- partition)
- .setCoordinatorInfo(
- new ControlEvent.CoordinatorInfo(globalCommittedKafkaOffsets))
- .build();
- kafkaControlClient.publishMessage(message);
+ kafkaControlClient.publishMessage(buildControlMessage(ControlMessage.EventType.ACK_COMMIT));
} catch (Exception exception) {
LOG.warn(String.format("Could not send ACK_COMMIT message for partition %s and commitTime %s", partition, currentCommitTime), exception);
}
@@ -397,4 +376,20 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Runnable {
public interface KafkaPartitionProvider {
int getLatestNumPartitions(String bootstrapServers, String topicName);
}
+
+ private ControlMessage buildControlMessage(ControlMessage.EventType eventType) {
+ return ControlMessage.newBuilder()
+ .setProtocolVersion(KafkaConnectConfigs.CURRENT_PROTOCOL_VERSION)
+ .setType(eventType)
+ .setTopicName(partition.topic())
+ .setSenderType(ControlMessage.EntityType.COORDINATOR)
+ .setSenderPartition(partition.partition())
+ .setReceiverType(ControlMessage.EntityType.PARTICIPANT)
+ .setCommitTime(currentCommitTime)
+ .setCoordinatorInfo(
+ ControlMessage.CoordinatorInfo.newBuilder()
+ .putAllGlobalKafkaCommitOffsets(globalCommittedKafkaOffsets)
+ .build()
+ ).build();
+ }
}
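Note that coordinator_info and participant_info live in a proto3 oneof (see ControlMessage.proto below), so buildControlMessage's setCoordinatorInfo call also guarantees no stale participant payload rides along. A small sketch of the generated oneof accessors, with illustrative partition/offset values:

```java
import org.apache.hudi.connect.ControlMessage;

public final class PayloadOneofDemo {
  public static void main(String[] args) {
    // Illustrative values only: partition 0 committed at offset 42.
    ControlMessage m = ControlMessage.newBuilder()
        .setType(ControlMessage.EventType.START_COMMIT)
        .setCoordinatorInfo(
            ControlMessage.CoordinatorInfo.newBuilder()
                .putGlobalKafkaCommitOffsets(0, 42L))
        .build();

    // Prints COORDINATOR_INFO: the generated oneof accessor reports the set branch.
    System.out.println(m.getPayloadCase());

    // The unset branch reads as a default instance (kafkaOffset == 0), never null.
    System.out.println(m.getParticipantInfo().getKafkaOffset());
  }
}
```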
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionParticipant.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionParticipant.java
index c3950717d..19556dca4 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionParticipant.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionParticipant.java
@@ -19,11 +19,14 @@
package org.apache.hudi.connect.transaction;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.connect.ControlMessage;
import org.apache.hudi.connect.kafka.KafkaControlAgent;
+import org.apache.hudi.connect.utils.KafkaConnectUtils;
import org.apache.hudi.connect.writers.ConnectWriterProvider;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.connect.writers.KafkaConnectWriterProvider;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -46,7 +49,7 @@ public class ConnectTransactionParticipant implements TransactionParticipant {
private static final Logger LOG = LogManager.getLogger(ConnectTransactionParticipant.class);
private final LinkedList<SinkRecord> buffer;
- private final BlockingQueue<ControlEvent> controlEvents;
+ private final BlockingQueue<ControlMessage> controlEvents;
private final TopicPartition partition;
private final SinkTaskContext context;
private final KafkaControlAgent kafkaControlAgent;
@@ -95,7 +98,7 @@ public class ConnectTransactionParticipant implements TransactionParticipant {
}
@Override
- public void processControlEvent(ControlEvent message) {
+ public void processControlEvent(ControlMessage message) {
controlEvents.add(message);
}
@@ -110,10 +113,10 @@ public class ConnectTransactionParticipant implements TransactionParticipant {
}
@Override
- public void processRecords() throws IOException {
+ public void processRecords() {
while (!controlEvents.isEmpty()) {
- ControlEvent message = controlEvents.poll();
- switch (message.getMsgType()) {
+ ControlMessage message = controlEvents.poll();
+ switch (message.getType()) {
case START_COMMIT:
handleStartCommit(message);
break;
@@ -127,14 +130,14 @@ public class ConnectTransactionParticipant implements TransactionParticipant {
// ignore write status since its only processed by leader
break;
default:
- throw new IllegalStateException("HudiTransactionParticipant received incorrect state " + message.getMsgType());
+ throw new IllegalStateException("HudiTransactionParticipant received incorrect state " + message.getType().name());
}
}
writeRecords();
}
- private void handleStartCommit(ControlEvent message) {
+ private void handleStartCommit(ControlMessage message) {
// If there is an existing/ongoing transaction locally
// but it failed globally since we received another START_COMMIT instead of an END_COMMIT or ACK_COMMIT,
// so close it and start new transaction
@@ -152,7 +155,7 @@ public class ConnectTransactionParticipant implements TransactionParticipant {
}
}
- private void handleEndCommit(ControlEvent message) throws IOException {
+ private void handleEndCommit(ControlMessage message) {
if (ongoingTransactionInfo == null) {
LOG.warn(String.format("END_COMMIT %s is received while we were NOT in active transaction", message.getCommitTime()));
return;
@@ -172,21 +175,32 @@ public class ConnectTransactionParticipant implements TransactionParticipant {
try {
//sendWriterStatus
List<WriteStatus> writeStatuses = ongoingTransactionInfo.getWriter().close();
- ControlEvent writeStatusEvent = new ControlEvent.Builder(ControlEvent.MsgType.WRITE_STATUS,
- ControlEvent.SenderType.PARTICIPANT, ongoingTransactionInfo.getCommitTime(), partition)
- .setParticipantInfo(new ControlEvent.ParticipantInfo(
- writeStatuses,
- ongoingTransactionInfo.getLastWrittenKafkaOffset(),
- ControlEvent.OutcomeType.WRITE_SUCCESS))
- .build();
+
+ ControlMessage writeStatusEvent = ControlMessage.newBuilder()
+ .setProtocolVersion(KafkaConnectConfigs.CURRENT_PROTOCOL_VERSION)
+ .setType(ControlMessage.EventType.WRITE_STATUS)
+ .setTopicName(partition.topic())
+ .setSenderType(ControlMessage.EntityType.PARTICIPANT)
+ .setSenderPartition(partition.partition())
+ .setReceiverType(ControlMessage.EntityType.COORDINATOR)
+ .setReceiverPartition(ConnectTransactionCoordinator.COORDINATOR_KAFKA_PARTITION)
+ .setCommitTime(ongoingTransactionInfo.getCommitTime())
+ .setParticipantInfo(
+ ControlMessage.ParticipantInfo.newBuilder()
+ .setWriteStatus(KafkaConnectUtils.buildWriteStatuses(writeStatuses))
+ .setKafkaOffset(ongoingTransactionInfo.getLastWrittenKafkaOffset())
+ .build()
+ ).build();
+
kafkaControlAgent.publishMessage(writeStatusEvent);
} catch (Exception exception) {
LOG.error(String.format("Error writing records and ending commit %s for partition %s", message.getCommitTime(), partition.partition()), exception);
- throw new IOException(String.format("Error writing records and ending commit %s for partition %s", message.getCommitTime(), partition.partition()), exception);
+ throw new HoodieIOException(String.format("Error writing records and ending commit %s for partition %s", message.getCommitTime(), partition.partition()),
+ new IOException(exception));
}
}
- private void handleAckCommit(ControlEvent message) {
+ private void handleAckCommit(ControlMessage message) {
// Update lastKafkaCommittedOffset locally.
if (ongoingTransactionInfo != null && committedKafkaOffset < ongoingTransactionInfo.getLastWrittenKafkaOffset()) {
committedKafkaOffset = ongoingTransactionInfo.getLastWrittenKafkaOffset();
@@ -224,15 +238,15 @@ public class ConnectTransactionParticipant implements TransactionParticipant {
try {
ongoingTransactionInfo.getWriter().close();
ongoingTransactionInfo = null;
- } catch (IOException exception) {
+ } catch (HoodieIOException exception) {
LOG.warn("Error received while trying to cleanup existing transaction", exception);
}
}
}
- private void syncKafkaOffsetWithLeader(ControlEvent message) {
- if (message.getCoordinatorInfo() != null) {
- Long coordinatorCommittedKafkaOffset = message.getCoordinatorInfo().getGlobalKafkaCommitOffsets().get(partition.partition());
+ private void syncKafkaOffsetWithLeader(ControlMessage message) {
+ if (message.getCoordinatorInfo().getGlobalKafkaCommitOffsetsMap().containsKey(partition.partition())) {
+ Long coordinatorCommittedKafkaOffset = message.getCoordinatorInfo().getGlobalKafkaCommitOffsetsMap().get(partition.partition());
// Recover kafka committed offsets, treating the commit offset from the coordinator
// as the source of truth
if (coordinatorCommittedKafkaOffset != null && coordinatorCommittedKafkaOffset >= 0) {
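The null check that used to guard getCoordinatorInfo() is gone because proto3 getters never return null: an absent payload materializes as a default instance with an empty offsets map, so containsKey/getOrDefault is the right guard. A compressed sketch of the recovery lookup (the helper name is hypothetical):

```java
import org.apache.hudi.connect.ControlMessage;

public final class OffsetRecoverySketch {
  // Hypothetical helper: returns the coordinator-committed offset for a kafka
  // partition, or -1 when the message carried no CoordinatorInfo (an unset
  // proto3 message reads as a default instance with an empty map, not null).
  static long committedOffset(ControlMessage message, int kafkaPartition) {
    return message.getCoordinatorInfo()
        .getGlobalKafkaCommitOffsetsMap()
        .getOrDefault(kafkaPartition, -1L);
  }
}
```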
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ControlEvent.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ControlEvent.java
deleted file mode 100644
index 5a35e7a16..000000000
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ControlEvent.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.connect.transaction;
-
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.util.SerializationUtils;
-
-import org.apache.kafka.common.TopicPartition;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-/**
- * The events sent over the Kafka Control Topic between the
- * coordinator and the followers, in order to ensure
- * coordination across all the writes.
- */
-@SuppressWarnings("checkstyle:VisibilityModifier")
-public class ControlEvent implements Serializable {
-
- private static final Logger LOG = LogManager.getLogger(ControlEvent.class);
- private static final int CURRENT_VERSION = 0;
-
- private final int version = CURRENT_VERSION;
- private MsgType msgType;
- private SenderType senderType;
- private String commitTime;
- private byte[] senderPartition;
- private CoordinatorInfo coordinatorInfo;
- private ParticipantInfo participantInfo;
-
- public ControlEvent() {
- }
-
- public ControlEvent(MsgType msgType,
- SenderType senderType,
- String commitTime,
- byte[] senderPartition,
- CoordinatorInfo coordinatorInfo,
- ParticipantInfo participantInfo) {
- this.msgType = msgType;
- this.senderType = senderType;
- this.commitTime = commitTime;
- this.senderPartition = senderPartition;
- this.coordinatorInfo = coordinatorInfo;
- this.participantInfo = participantInfo;
- }
-
- public String key() {
- return msgType.name().toLowerCase(Locale.ROOT);
- }
-
- public MsgType getMsgType() {
- return msgType;
- }
-
- public SenderType getSenderType() {
- return senderType;
- }
-
- public String getCommitTime() {
- return commitTime;
- }
-
- public byte[] getSenderPartition() {
- return senderPartition;
- }
-
- public TopicPartition senderPartition() {
- return SerializationUtils.deserialize(senderPartition);
- }
-
- public CoordinatorInfo getCoordinatorInfo() {
- return coordinatorInfo;
- }
-
- public ParticipantInfo getParticipantInfo() {
- return participantInfo;
- }
-
- public int getVersion() {
- return version;
- }
-
- @Override
- public String toString() {
- return String.format("%s %s %s %s %s %s", version, msgType.name(), commitTime,
- Arrays.toString(senderPartition),
- (coordinatorInfo == null) ? "" : coordinatorInfo.toString(),
- (participantInfo == null) ? "" : participantInfo.toString());
- }
-
- /**
- * Builder that helps build {@link ControlEvent}.
- */
- public static class Builder {
-
- private final MsgType msgType;
- private SenderType senderType;
- private final String commitTime;
- private final byte[] senderPartition;
- private CoordinatorInfo coordinatorInfo;
- private ParticipantInfo participantInfo;
-
- public Builder(MsgType msgType, SenderType senderType, String commitTime, TopicPartition senderPartition) throws IOException {
- this.msgType = msgType;
- this.senderType = senderType;
- this.commitTime = commitTime;
- this.senderPartition = SerializationUtils.serialize(senderPartition);
- }
-
- public Builder setCoordinatorInfo(CoordinatorInfo coordinatorInfo) {
- this.coordinatorInfo = coordinatorInfo;
- return this;
- }
-
- public Builder setParticipantInfo(ParticipantInfo participantInfo) {
- this.participantInfo = participantInfo;
- return this;
- }
-
- public ControlEvent build() {
- return new ControlEvent(msgType, senderType, commitTime, senderPartition, coordinatorInfo, participantInfo);
- }
- }
-
- /**
- * The info sent by the {@link TransactionCoordinator} to one or more
- * {@link TransactionParticipant}s.
- */
- public static class CoordinatorInfo implements Serializable {
-
- private Map<Integer, Long> globalKafkaCommitOffsets;
-
- public CoordinatorInfo() {
- }
-
- public CoordinatorInfo(Map<Integer, Long> globalKafkaCommitOffsets) {
- this.globalKafkaCommitOffsets = globalKafkaCommitOffsets;
- }
-
- public Map<Integer, Long> getGlobalKafkaCommitOffsets() {
- return (globalKafkaCommitOffsets == null) ? new HashMap<>() : globalKafkaCommitOffsets;
- }
-
- @Override
- public String toString() {
- return String.format("%s", globalKafkaCommitOffsets.keySet().stream()
- .map(key -> key + "=" + globalKafkaCommitOffsets.get(key))
- .collect(Collectors.joining(", ", "{", "}")));
- }
- }
-
- /**
- * The info sent by a {@link TransactionParticipant} instances to the
- * {@link TransactionCoordinator}.
- */
- public static class ParticipantInfo implements Serializable {
-
- private byte[] writeStatusList;
- private long kafkaCommitOffset;
- private OutcomeType outcomeType;
-
- public ParticipantInfo() {
- }
-
- public ParticipantInfo(List<WriteStatus> writeStatuses, long kafkaCommitOffset, OutcomeType outcomeType) throws IOException {
- this.writeStatusList = SerializationUtils.serialize(writeStatuses);
- this.kafkaCommitOffset = kafkaCommitOffset;
- this.outcomeType = outcomeType;
- }
-
- public byte[] getWriteStatusList() {
- return writeStatusList;
- }
-
- public List<WriteStatus> writeStatuses() {
- return SerializationUtils.deserialize(writeStatusList);
- }
-
- public long getKafkaCommitOffset() {
- return kafkaCommitOffset;
- }
-
- public OutcomeType getOutcomeType() {
- return outcomeType;
- }
-
- @Override
- public String toString() {
- return String.format("%s %s %s", Arrays.toString(writeStatusList), kafkaCommitOffset, outcomeType.name());
- }
- }
-
- /**
- * Type of Control Event.
- */
- public enum MsgType {
- START_COMMIT,
- END_COMMIT,
- ACK_COMMIT,
- WRITE_STATUS,
- }
-
- public enum SenderType {
- COORDINATOR,
- PARTICIPANT
- }
-
- public enum OutcomeType {
- WRITE_SUCCESS,
- }
-}
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/CoordinatorEvent.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/CoordinatorEvent.java
index a0e2654cd..f9f467a83 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/CoordinatorEvent.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/CoordinatorEvent.java
@@ -18,6 +18,8 @@
package org.apache.hudi.connect.transaction;
+import org.apache.hudi.connect.ControlMessage;
+
/**
* The events within the Coordinator that trigger
* the state changes in the state machine of
@@ -28,7 +30,7 @@ public class CoordinatorEvent {
private final CoordinatorEventType eventType;
private final String topicName;
private final String commitTime;
- private ControlEvent message;
+ private ControlMessage message;
public CoordinatorEvent(CoordinatorEventType eventType,
String topicName,
@@ -50,11 +52,11 @@ public class CoordinatorEvent {
return commitTime;
}
- public ControlEvent getMessage() {
+ public ControlMessage getMessage() {
return message;
}
- public void setMessage(ControlEvent message) {
+ public void setMessage(ControlMessage message) {
this.message = message;
}
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionCoordinator.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionCoordinator.java
index 04f8a2e3c..d6759d84c 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionCoordinator.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionCoordinator.java
@@ -18,6 +18,8 @@
package org.apache.hudi.connect.transaction;
+import org.apache.hudi.connect.ControlMessage;
+
import org.apache.kafka.common.TopicPartition;
/**
@@ -36,5 +38,5 @@ public interface TransactionCoordinator {
TopicPartition getPartition();
/* Called when a control event is received from the Kafka control topic */
- void processControlEvent(ControlEvent message);
+ void processControlEvent(ControlMessage message);
}
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionParticipant.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionParticipant.java
index c19d1b849..d27b14ef4 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionParticipant.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionParticipant.java
@@ -18,11 +18,11 @@
package org.apache.hudi.connect.transaction;
+import org.apache.hudi.connect.ControlMessage;
+
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
-import java.io.IOException;
-
/**
* Interface for the Participant that
* manages Writes for a
@@ -37,11 +37,11 @@ public interface TransactionParticipant {
void buffer(SinkRecord record);
- void processRecords() throws IOException;
+ void processRecords();
TopicPartition getPartition();
- void processControlEvent(ControlEvent message);
+ void processControlEvent(ControlMessage message);
long getLastKafkaCommittedOffset();
}
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
index 70cfa953b..3c77063dd 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
@@ -18,6 +18,7 @@
package org.apache.hudi.connect.utils;
+import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
@@ -26,7 +27,9 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.SerializationUtils;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.connect.ControlMessage;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.CustomAvroKeyGenerator;
@@ -34,6 +37,7 @@ import org.apache.hudi.keygen.CustomKeyGenerator;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import com.google.protobuf.ByteString;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
@@ -42,10 +46,12 @@ import org.apache.kafka.common.KafkaFuture;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
@@ -76,6 +82,7 @@ public class KafkaConnectUtils {
/**
* Returns the default Hadoop Configuration.
+ *
* @return
*/
public static Configuration getDefaultHadoopConf() {
@@ -86,6 +93,7 @@ public class KafkaConnectUtils {
/**
* Extract the record fields.
+ *
* @param keyGenerator key generator Instance of the keygenerator.
* @return Returns the record key columns separated by comma.
*/
@@ -97,7 +105,7 @@ public class KafkaConnectUtils {
* Extract partition columns directly if an instance of class {@link BaseKeyGenerator},
* else extract partition columns from the properties.
*
- * @param keyGenerator key generator Instance of the keygenerator.
+ * @param keyGenerator key generator Instance of the keygenerator.
* @param typedProperties properties from the config.
* @return partition columns Returns the partition columns separated by comma.
*/
@@ -142,7 +150,7 @@ public class KafkaConnectUtils {
return Option.empty();
}
}
-
+
public static String hashDigest(String stringToHash) {
MessageDigest md;
try {
@@ -154,4 +162,32 @@ public class KafkaConnectUtils {
byte[] digest = Objects.requireNonNull(md).digest(stringToHash.getBytes(StandardCharsets.UTF_8));
return StringUtils.toHexString(digest).toUpperCase();
}
+
+ /**
+ * Build Protobuf message containing the Hudi {@link WriteStatus}.
+ *
+ * @param writeStatuses The list of Hudi {@link WriteStatus}.
+ * @return the protobuf message {@link org.apache.hudi.connect.ControlMessage.ConnectWriteStatus}
+ * that wraps the Hudi {@link WriteStatus}.
+ * @throws IOException thrown if the conversion failed.
+ */
+ public static ControlMessage.ConnectWriteStatus buildWriteStatuses(List<WriteStatus> writeStatuses) throws IOException {
+ return ControlMessage.ConnectWriteStatus.newBuilder()
+ .setSerializedWriteStatus(
+ ByteString.copyFrom(
+ SerializationUtils.serialize(writeStatuses)))
+ .build();
+ }
+
+ /**
+ * Unwrap the Hudi {@link WriteStatus} from the received Protobuf message.
+ *
+ * @param participantInfo The {@link ControlMessage.ParticipantInfo} that contains the
+ * underlying {@link WriteStatus} sent by the participants.
+ * @return the list of {@link WriteStatus} returned by Hudi on a write transaction.
+ */
+ public static List<WriteStatus> getWriteStatuses(ControlMessage.ParticipantInfo participantInfo) {
+ ControlMessage.ConnectWriteStatus connectWriteStatus = participantInfo.getWriteStatus();
+ return SerializationUtils.deserialize(connectWriteStatus.getSerializedWriteStatus().toByteArray());
+ }
}
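The two new helpers are inverses across the control topic: the participant wraps its WriteStatus list, the coordinator unwraps it. A minimal usage sketch (the offset value is illustrative):

```java
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.connect.ControlMessage;
import org.apache.hudi.connect.utils.KafkaConnectUtils;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

public final class WriteStatusWrapDemo {
  public static void main(String[] args) throws IOException {
    // Participant side: wrap the WriteStatus list into the protobuf payload.
    List<WriteStatus> statuses = Collections.singletonList(new WriteStatus());
    ControlMessage.ParticipantInfo info = ControlMessage.ParticipantInfo.newBuilder()
        .setWriteStatus(KafkaConnectUtils.buildWriteStatuses(statuses))
        .setKafkaOffset(1234L) // illustrative offset
        .build();

    // Coordinator side: unwrap back into Hudi WriteStatus objects.
    List<WriteStatus> roundTripped = KafkaConnectUtils.getWriteStatuses(info);
    System.out.println("Recovered " + roundTripped.size() + " write status(es)");
  }
}
```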
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
index 9888fd1d5..a579484f6 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
@@ -91,11 +91,11 @@ public abstract class AbstractConnectWriter implements ConnectWriter {
- public List<WriteStatus> close() throws IOException {
+ public List<WriteStatus> close() {
return flushRecords();
}
protected abstract void writeHudiRecord(HoodieRecord<?> record);
- protected abstract List<WriteStatus> flushRecords() throws IOException;
+ protected abstract List<WriteStatus> flushRecords();
}
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/BufferedConnectWriter.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/BufferedConnectWriter.java
index 0449f071d..0e92e674d 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/BufferedConnectWriter.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/BufferedConnectWriter.java
@@ -92,7 +92,7 @@ public class BufferedConnectWriter extends AbstractConnectWriter {
}
@Override
- public List<WriteStatus> flushRecords() throws IOException {
+ public List<WriteStatus> flushRecords() {
try {
LOG.info("Number of entries in MemoryBasedMap => "
+ bufferedRecords.getInMemoryMapNumEntries()
@@ -122,7 +122,7 @@ public class BufferedConnectWriter extends AbstractConnectWriter {
LOG.info("Flushed hudi records and got writeStatuses: " + writeStatuses);
return writeStatuses;
} catch (Exception e) {
- throw new IOException("Write records failed", e);
+ throw new HoodieIOException("Write records failed", new IOException(e));
}
}
}
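Since HoodieIOException is unchecked but its constructor expects an IOException cause, wrapping the original exception keeps flushRecords() free of a throws clause without losing the stack trace. A minimal sketch of the pattern (the helper is hypothetical):

```java
import org.apache.hudi.exception.HoodieIOException;

import java.io.IOException;
import java.util.concurrent.Callable;

public final class FlushExceptionSketch {
  // Hypothetical helper showing the wrapping pattern: any failure surfaces as
  // an unchecked HoodieIOException whose cause is a checked IOException, so
  // callers such as flushRecords() need no 'throws' clause.
  static <T> T runOrWrap(Callable<T> flush) {
    try {
      return flush.call();
    } catch (Exception e) {
      throw new HoodieIOException("Write records failed", new IOException(e));
    }
  }
}
```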
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/ConnectWriter.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/ConnectWriter.java
index a90d72a45..7249d4758 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/ConnectWriter.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/ConnectWriter.java
@@ -27,5 +27,5 @@ public interface ConnectWriter {
void writeRecord(SinkRecord record) throws IOException;
- List<WriteStatus> close() throws IOException;
+ List<WriteStatus> close();
}
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
index c264c3cdb..773ce1e04 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
@@ -39,6 +39,7 @@ import java.util.Properties;
description = "Configurations for Kafka Connect Sink Connector for Hudi.")
public class KafkaConnectConfigs extends HoodieConfig {
+ public static final int CURRENT_PROTOCOL_VERSION = 0;
public static final String KAFKA_VALUE_CONVERTER = "value.converter";
public static final ConfigProperty<String> KAFKA_BOOTSTRAP_SERVERS = ConfigProperty
diff --git a/hudi-kafka-connect/src/main/resources/ControlMessage.proto b/hudi-kafka-connect/src/main/resources/ControlMessage.proto
new file mode 100644
index 000000000..5059897c3
--- /dev/null
+++ b/hudi-kafka-connect/src/main/resources/ControlMessage.proto
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_package = "org.apache.hudi.connect";
+option java_outer_classname = "ConnectControl";
+
+package connect;
+
+message ControlMessage {
+ uint32 protocolVersion = 1;
+ EventType type = 2;
+ string topic_name = 3;
+ EntityType sender_type = 4;
+ uint32 sender_partition = 5;
+ EntityType receiver_type = 6;
+ uint32 receiver_partition = 7;
+ string commitTime = 8;
+ oneof payload {
+ CoordinatorInfo coordinator_info = 9;
+ ParticipantInfo participant_info = 10;
+ }
+
+ message CoordinatorInfo {
+ map<uint32, uint64> globalKafkaCommitOffsets = 1;
+ }
+
+ message ParticipantInfo {
+ ConnectWriteStatus writeStatus = 1;
+ uint64 kafkaOffset = 2;
+ }
+
+ message ConnectWriteStatus {
+ bytes serializedWriteStatus = 1;
+ }
+
+ enum EventType {
+ START_COMMIT = 0;
+ END_COMMIT = 1;
+ ACK_COMMIT = 2;
+ WRITE_STATUS = 3;
+ }
+
+ enum EntityType {
+ COORDINATOR = 0;
+ PARTICIPANT = 1;
+ }
+}
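With java_multiple_files = true and the org.apache.hudi.connect java_package, protoc emits ControlMessage as the top-level class imported throughout the Java changes above; the protoc-jar-maven-plugin execution added to the pom generates it from this file during generate-sources. The protocolVersion field leaves room for forward compatibility; a hedged sketch of a reader-side guard (this check is not part of the patch):

```java
import org.apache.hudi.connect.ControlMessage;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;

import com.google.protobuf.InvalidProtocolBufferException;

public final class ProtocolVersionCheck {
  // Sketch only: protocolVersion makes it possible to detect messages
  // produced by a newer schema revision than this reader understands.
  static ControlMessage parseChecked(byte[] wire) throws InvalidProtocolBufferException {
    ControlMessage msg = ControlMessage.parseFrom(wire);
    if (msg.getProtocolVersion() > KafkaConnectConfigs.CURRENT_PROTOCOL_VERSION) {
      throw new InvalidProtocolBufferException(
          "Unsupported control protocol version: " + msg.getProtocolVersion());
    }
    return msg;
  }
}
```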
diff --git a/hudi-kafka-connect/src/main/resources/log4j.properties b/hudi-kafka-connect/src/main/resources/log4j.properties
new file mode 100644
index 000000000..ff268faf6
--- /dev/null
+++ b/hudi-kafka-connect/src/main/resources/log4j.properties
@@ -0,0 +1,23 @@
+###
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###
+log4j.rootLogger=INFO, A1
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java
index 21940ab43..6e049c611 100644
--- a/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java
+++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java
@@ -22,9 +22,9 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.connect.transaction.ConnectTransactionCoordinator;
-import org.apache.hudi.connect.transaction.ControlEvent;
import org.apache.hudi.connect.transaction.TransactionCoordinator;
import org.apache.hudi.connect.transaction.TransactionParticipant;
+import org.apache.hudi.connect.utils.KafkaConnectUtils;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.helper.MockConnectTransactionServices;
@@ -108,7 +108,7 @@ public class TestConnectTransactionCoordinator {
private final int maxNumberCommitRounds;
private final Map<Integer, Long> kafkaOffsetsCommitted;
- private ControlEvent.MsgType expectedMsgType;
+ private ControlMessage.EventType expectedMsgType;
private int numberCommitRounds;
public MockParticipant(MockKafkaControlAgent kafkaControlAgent,
@@ -121,7 +121,7 @@ public class TestConnectTransactionCoordinator {
this.maxNumberCommitRounds = maxNumberCommitRounds;
this.partition = new TopicPartition(TOPIC_NAME, (NUM_PARTITIONS - 1));
this.kafkaOffsetsCommitted = new HashMap<>();
- expectedMsgType = ControlEvent.MsgType.START_COMMIT;
+ expectedMsgType = ControlMessage.EventType.START_COMMIT;
numberCommitRounds = 0;
}
@@ -149,9 +149,9 @@ public class TestConnectTransactionCoordinator {
}
@Override
- public void processControlEvent(ControlEvent message) {
- assertEquals(message.getSenderType(), ControlEvent.SenderType.COORDINATOR);
- assertEquals(message.senderPartition().topic(), partition.topic());
+ public void processControlEvent(ControlMessage message) {
+ assertEquals(message.getSenderType(), ControlMessage.EntityType.COORDINATOR);
+ assertEquals(message.getTopicName(), partition.topic());
testScenarios(message);
}
@@ -160,24 +160,24 @@ public class TestConnectTransactionCoordinator {
return 0;
}
- private void testScenarios(ControlEvent message) {
- assertEquals(expectedMsgType, message.getMsgType());
+ private void testScenarios(ControlMessage message) {
+ assertEquals(expectedMsgType, message.getType());
- switch (message.getMsgType()) {
+ switch (message.getType()) {
case START_COMMIT:
- expectedMsgType = ControlEvent.MsgType.END_COMMIT;
+ expectedMsgType = ControlMessage.EventType.END_COMMIT;
break;
case END_COMMIT:
- assertEquals(kafkaOffsetsCommitted, message.getCoordinatorInfo().getGlobalKafkaCommitOffsets());
+ assertEquals(kafkaOffsetsCommitted, message.getCoordinatorInfo().getGlobalKafkaCommitOffsetsMap());
int numSuccessPartitions;
Map<Integer, Long> kafkaOffsets = new HashMap<>();
- List<ControlEvent> controlEvents = new ArrayList<>();
+ List<ControlMessage> controlEvents = new ArrayList<>();
// Prepare the WriteStatuses for all partitions
for (int i = 1; i <= NUM_PARTITIONS; i++) {
try {
long kafkaOffset = (long) (Math.random() * 10000);
kafkaOffsets.put(i, kafkaOffset);
- ControlEvent event = successWriteStatus(
+ ControlMessage event = successWriteStatus(
message.getCommitTime(),
new TopicPartition(TOPIC_NAME, i),
kafkaOffset);
@@ -191,11 +191,11 @@ public class TestConnectTransactionCoordinator {
case ALL_CONNECT_TASKS_SUCCESS:
numSuccessPartitions = NUM_PARTITIONS;
kafkaOffsetsCommitted.putAll(kafkaOffsets);
- expectedMsgType = ControlEvent.MsgType.ACK_COMMIT;
+ expectedMsgType = ControlMessage.EventType.ACK_COMMIT;
break;
case SUBSET_CONNECT_TASKS_FAILED:
numSuccessPartitions = NUM_PARTITIONS / 2;
- expectedMsgType = ControlEvent.MsgType.START_COMMIT;
+ expectedMsgType = ControlMessage.EventType.START_COMMIT;
break;
default:
throw new HoodieException("Unknown test scenario " + testScenario);
@@ -210,18 +210,18 @@ public class TestConnectTransactionCoordinator {
if (numberCommitRounds >= maxNumberCommitRounds) {
latch.countDown();
}
- expectedMsgType = ControlEvent.MsgType.START_COMMIT;
+ expectedMsgType = ControlMessage.EventType.START_COMMIT;
break;
default:
- throw new HoodieException("Illegal control message type " + message.getMsgType());
+ throw new HoodieException("Illegal control message type " + message.getType());
}
- if (message.getMsgType().equals(ControlEvent.MsgType.START_COMMIT)) {
+ if (message.getType().equals(ControlMessage.EventType.START_COMMIT)) {
if (numberCommitRounds >= maxNumberCommitRounds) {
latch.countDown();
}
numberCommitRounds++;
- expectedMsgType = ControlEvent.MsgType.END_COMMIT;
+ expectedMsgType = ControlMessage.EventType.END_COMMIT;
}
}
@@ -230,24 +230,29 @@ public class TestConnectTransactionCoordinator {
ALL_CONNECT_TASKS_SUCCESS
}
- private static ControlEvent successWriteStatus(String commitTime,
- TopicPartition partition,
- long kafkaOffset) throws Exception {
+ private static ControlMessage successWriteStatus(String commitTime,
+ TopicPartition partition,
+ long kafkaOffset) throws Exception {
// send WS
WriteStatus writeStatus = new WriteStatus();
WriteStatus status = new WriteStatus(false, 1.0);
for (int i = 0; i < 1000; i++) {
status.markSuccess(mock(HoodieRecord.class), Option.empty());
}
- return new ControlEvent.Builder(ControlEvent.MsgType.WRITE_STATUS,
- ControlEvent.SenderType.PARTICIPANT,
- commitTime,
- partition)
- .setParticipantInfo(new ControlEvent.ParticipantInfo(
- Collections.singletonList(writeStatus),
- kafkaOffset,
- ControlEvent.OutcomeType.WRITE_SUCCESS))
- .build();
+ return ControlMessage.newBuilder()
+ .setType(ControlMessage.EventType.WRITE_STATUS)
+ .setTopicName(partition.topic())
+ .setSenderType(ControlMessage.EntityType.PARTICIPANT)
+ .setSenderPartition(partition.partition())
+ .setReceiverType(ControlMessage.EntityType.COORDINATOR)
+ .setReceiverPartition(ConnectTransactionCoordinator.COORDINATOR_KAFKA_PARTITION)
+ .setCommitTime(commitTime)
+ .setParticipantInfo(
+ ControlMessage.ParticipantInfo.newBuilder()
+ .setWriteStatus(KafkaConnectUtils.buildWriteStatuses(Collections.singletonList(writeStatus)))
+ .setKafkaOffset(kafkaOffset)
+ .build()
+ ).build();
}
}
}
diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionParticipant.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionParticipant.java
index 4e5aaa19b..5d551a79f 100644
--- a/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionParticipant.java
+++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionParticipant.java
@@ -21,7 +21,6 @@ package org.apache.hudi.connect;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.connect.kafka.KafkaControlAgent;
import org.apache.hudi.connect.transaction.ConnectTransactionParticipant;
-import org.apache.hudi.connect.transaction.ControlEvent;
import org.apache.hudi.connect.transaction.TransactionCoordinator;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.exception.HoodieException;
@@ -76,16 +75,16 @@ public class TestConnectTransactionParticipant {
break;
case COORDINATOR_FAILED_AFTER_START_COMMIT:
testKafkaConnect.putRecordsToParticipant();
- coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT);
+ coordinator.sendEventFromCoordinator(ControlMessage.EventType.START_COMMIT);
testKafkaConnect.putRecordsToParticipant();
// Coordinator Failed
initializeCoordinator();
break;
case COORDINATOR_FAILED_AFTER_END_COMMIT:
testKafkaConnect.putRecordsToParticipant();
- coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT);
+ coordinator.sendEventFromCoordinator(ControlMessage.EventType.START_COMMIT);
testKafkaConnect.putRecordsToParticipant();
- coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT);
+ coordinator.sendEventFromCoordinator(ControlMessage.EventType.END_COMMIT);
expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant();
// Coordinator Failed
initializeCoordinator();
@@ -95,13 +94,13 @@ public class TestConnectTransactionParticipant {
}
// Regular Case or Coordinator Recovery Case
- coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT);
+ coordinator.sendEventFromCoordinator(ControlMessage.EventType.START_COMMIT);
expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant();
assertTrue(testKafkaConnect.isResumed());
- coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT);
+ coordinator.sendEventFromCoordinator(ControlMessage.EventType.END_COMMIT);
testKafkaConnect.putRecordsToParticipant();
assertTrue(testKafkaConnect.isPaused());
- coordinator.sendEventFromCoordinator(ControlEvent.MsgType.ACK_COMMIT);
+ coordinator.sendEventFromCoordinator(ControlMessage.EventType.ACK_COMMIT);
testKafkaConnect.putRecordsToParticipant();
assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten);
// Ensure Coordinator and participant are in sync in the kafka offsets
@@ -122,13 +121,13 @@ public class TestConnectTransactionParticipant {
testKafkaConnect.putRecordsToParticipant();
// Participant fails
initializeParticipant();
- coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT);
+ coordinator.sendEventFromCoordinator(ControlMessage.EventType.START_COMMIT);
expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant();
assertTrue(testKafkaConnect.isResumed());
- coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT);
+ coordinator.sendEventFromCoordinator(ControlMessage.EventType.END_COMMIT);
testKafkaConnect.putRecordsToParticipant();
assertTrue(testKafkaConnect.isPaused());
- coordinator.sendEventFromCoordinator(ControlEvent.MsgType.ACK_COMMIT);
+ coordinator.sendEventFromCoordinator(ControlMessage.EventType.ACK_COMMIT);
testKafkaConnect.putRecordsToParticipant();
assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten);
// Ensure Coordinator and participant are in sync in the kafka offsets
@@ -136,15 +135,15 @@ public class TestConnectTransactionParticipant {
break;
case FAILURE_AFTER_START_COMMIT:
testKafkaConnect.putRecordsToParticipant();
- coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT);
+ coordinator.sendEventFromCoordinator(ControlMessage.EventType.START_COMMIT);
testKafkaConnect.putRecordsToParticipant();
// Participant fails
initializeParticipant();
testKafkaConnect.putRecordsToParticipant();
- coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT);
+ coordinator.sendEventFromCoordinator(ControlMessage.EventType.END_COMMIT);
testKafkaConnect.putRecordsToParticipant();
assertTrue(testKafkaConnect.isPaused());
- coordinator.sendEventFromCoordinator(ControlEvent.MsgType.ACK_COMMIT);
+ coordinator.sendEventFromCoordinator(ControlMessage.EventType.ACK_COMMIT);
testKafkaConnect.putRecordsToParticipant();
assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten);
// Ensure Coordinator and participant are in sync in the kafka offsets
@@ -152,17 +151,17 @@ public class TestConnectTransactionParticipant {
break;
case FAILURE_AFTER_END_COMMIT:
testKafkaConnect.putRecordsToParticipant();
- coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT);
+ coordinator.sendEventFromCoordinator(ControlMessage.EventType.START_COMMIT);
testKafkaConnect.putRecordsToParticipant();
- coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT);
+ coordinator.sendEventFromCoordinator(ControlMessage.EventType.END_COMMIT);
testKafkaConnect.putRecordsToParticipant();
// Participant fails
initializeParticipant();
testKafkaConnect.putRecordsToParticipant();
- coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT);
+ coordinator.sendEventFromCoordinator(ControlMessage.EventType.END_COMMIT);
testKafkaConnect.putRecordsToParticipant();
assertTrue(testKafkaConnect.isPaused());
- coordinator.sendEventFromCoordinator(ControlEvent.MsgType.ACK_COMMIT);
+ coordinator.sendEventFromCoordinator(ControlMessage.EventType.ACK_COMMIT);
testKafkaConnect.putRecordsToParticipant();
assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten);
// Ensure Coordinator and participant are in sync in the kafka offsets
@@ -203,7 +202,7 @@ public class TestConnectTransactionParticipant {
private final KafkaControlAgent kafkaControlAgent;
private final TopicPartition partition;
- private Option<ControlEvent> lastReceivedWriteStatusEvent;
+ private Option<ControlMessage> lastReceivedWriteStatusEvent;
private long committedKafkaOffset;
public MockCoordinator(KafkaControlAgent kafkaControlAgent) {
@@ -213,26 +212,30 @@ public class TestConnectTransactionParticipant {
committedKafkaOffset = 0L;
}
- public void sendEventFromCoordinator(
- ControlEvent.MsgType type) {
+ public void sendEventFromCoordinator(ControlMessage.EventType type) {
try {
- if (type.equals(ControlEvent.MsgType.START_COMMIT)) {
+ if (type.equals(ControlMessage.EventType.START_COMMIT)) {
++currentCommitTime;
}
- kafkaControlAgent.publishMessage(new ControlEvent.Builder(
- type,
- ControlEvent.SenderType.COORDINATOR,
- String.valueOf(currentCommitTime),
- partition)
- .setCoordinatorInfo(new ControlEvent.CoordinatorInfo(
- Collections.singletonMap(PARTITION_NUMBER, committedKafkaOffset)))
- .build());
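+ // Coordinator -> participant broadcast: the commit time plus the globally committed Kafka offsets, which a restarted participant uses to resync.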
+ kafkaControlAgent.publishMessage(
+ ControlMessage.newBuilder()
+ .setType(type)
+ .setTopicName(partition.topic())
+ .setSenderType(ControlMessage.EntityType.COORDINATOR)
+ .setSenderPartition(partition.partition())
+ .setReceiverType(ControlMessage.EntityType.PARTICIPANT)
+ .setCommitTime(String.valueOf(currentCommitTime))
+ .setCoordinatorInfo(
+ ControlMessage.CoordinatorInfo.newBuilder()
+ .putAllGlobalKafkaCommitOffsets(Collections.singletonMap(PARTITION_NUMBER, committedKafkaOffset))
+ .build()
+ ).build());
} catch (Exception exception) {
throw new HoodieException("Fatal error sending control event to Participant");
}
}
- public Option<ControlEvent> getLastReceivedWriteStatusEvent() {
+ public Option<ControlMessage> getLastReceivedWriteStatusEvent() {
return lastReceivedWriteStatusEvent;
}
@@ -256,11 +259,11 @@ public class TestConnectTransactionParticipant {
}
@Override
- public void processControlEvent(ControlEvent message) {
- if (message.getMsgType().equals(ControlEvent.MsgType.WRITE_STATUS)) {
+ public void processControlEvent(ControlMessage message) {
+ if (message.getType().equals(ControlMessage.EventType.WRITE_STATUS)) {
lastReceivedWriteStatusEvent = Option.of(message);
- assertTrue(message.getParticipantInfo().getKafkaCommitOffset() >= committedKafkaOffset);
- committedKafkaOffset = message.getParticipantInfo().getKafkaCommitOffset();
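+ // Offsets reported by participants must never regress; track the high-water mark.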
+ assertTrue(message.getParticipantInfo().getKafkaOffset() >= committedKafkaOffset);
+ committedKafkaOffset = message.getParticipantInfo().getKafkaOffset();
}
}
}
diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/MockKafkaControlAgent.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/MockKafkaControlAgent.java
index 529cd75fd..eed79c486 100644
--- a/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/MockKafkaControlAgent.java
+++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/MockKafkaControlAgent.java
@@ -18,8 +18,8 @@
package org.apache.hudi.helper;
+import org.apache.hudi.connect.ControlMessage;
import org.apache.hudi.connect.kafka.KafkaControlAgent;
-import org.apache.hudi.connect.transaction.ControlEvent;
import org.apache.hudi.connect.transaction.TransactionCoordinator;
import org.apache.hudi.connect.transaction.TransactionParticipant;
import org.apache.hudi.exception.HoodieException;
@@ -70,10 +70,10 @@ public class MockKafkaControlAgent implements KafkaControlAgent {
}
@Override
- public void publishMessage(ControlEvent message) {
+ public void publishMessage(ControlMessage message) {
try {
- String topic = message.senderPartition().topic();
- if (message.getSenderType().equals(ControlEvent.SenderType.COORDINATOR)) {
+ String topic = message.getTopicName();
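+ // Coordinator-originated messages fan out to every participant registered on the topic.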
+ if (message.getSenderType().equals(ControlMessage.EntityType.COORDINATOR)) {
for (TransactionParticipant participant : participants.get(topic)) {
participant.processControlEvent(message);
}
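
Aside (not part of the patch): a minimal sketch of the wire round-trip that the mock agent skips, using only the stock protobuf-java APIs that every generated message such as ControlMessage exposes; the helper class name is hypothetical.

  import org.apache.hudi.connect.ControlMessage;

  import com.google.protobuf.InvalidProtocolBufferException;

  // Hypothetical helper: round-trips a ControlMessage through its wire encoding,
  // as a real Kafka control-topic producer/consumer pair would.
  public final class ControlMessageWireFormat {
    static ControlMessage roundTrip(ControlMessage message) throws InvalidProtocolBufferException {
      byte[] payload = message.toByteArray();   // serialize for the control topic
      return ControlMessage.parseFrom(payload); // parseFrom rejects malformed payloads
    }
  }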
diff --git a/packaging/hudi-kafka-connect-bundle/pom.xml b/packaging/hudi-kafka-connect-bundle/pom.xml
index cf81096ee..debbfa785 100644
--- a/packaging/hudi-kafka-connect-bundle/pom.xml
+++ b/packaging/hudi-kafka-connect-bundle/pom.xml
@@ -157,10 +157,19 @@
       <scope>compile</scope>
     </dependency>
+
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>${proto.version}</version>
+      <scope>compile</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
       <scope>compile</scope>
     </dependency>
diff --git a/pom.xml b/pom.xml
index a1beac06a..dcfe6cfa3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -156,6 +156,8 @@
     <zk-curator.version>2.7.1</zk-curator.version>
     <antlr.version>4.7</antlr.version>
     <aws.sdk.version>1.12.22</aws.sdk.version>
+    <proto.version>3.17.3</proto.version>
+    <protoc.version>3.1.0</protoc.version>