1
0

[HUDI-2394] Implement Kafka Sink Protocol for Hudi for Ingesting Immutable Data (#3592)

- Fixing packaging, naming of classes
- Use of log4j over slf4j for uniformity
- More follow-on fixes
- Added a version to control/coordinator events.
- Eliminated the config added to write config
- Fixed fetching of checkpoints based on table type
- Clean up of naming, code placement

Co-authored-by: Rajesh Mahindra <rmahindra@Rajeshs-MacBook-Pro.local>
Co-authored-by: Vinoth Chandar <vinoth@apache.org>
This commit is contained in:
rmahindra123
2021-09-10 18:20:26 -07:00
committed by GitHub
parent bd1d2d4952
commit e528dd798a
51 changed files with 4710 additions and 22 deletions

View File

@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.connect;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * HoodieSinkConnector is a Kafka Connect Connector implementation
 * that ingests data from Kafka into Hudi.
 */
public class HoodieSinkConnector extends SinkConnector {

  public static final String VERSION = "0.1.0";
  private static final Logger LOG = LogManager.getLogger(HoodieSinkConnector.class);

  // Connector-level configuration, captured verbatim at start() and fanned out to tasks.
  private Map<String, String> configProps;

  /**
   * No-arg constructor. It is instantiated by Connect framework.
   */
  public HoodieSinkConnector() {
  }

  @Override
  public String version() {
    return VERSION;
  }

  @Override
  public void start(Map<String, String> props) {
    // Defensive copy so later mutations of the caller's map cannot leak in.
    this.configProps = new HashMap<>(props);
  }

  @Override
  public Class<? extends Task> taskClass() {
    return HoodieSinkTask.class;
  }

  @Override
  public List<Map<String, String>> taskConfigs(int maxTasks) {
    // Every task receives the same connector-level configuration.
    List<Map<String, String>> taskConfigs = new ArrayList<>(maxTasks);
    Map<String, String> sharedProps = new HashMap<>(configProps);
    int remaining = maxTasks;
    while (remaining-- > 0) {
      taskConfigs.add(sharedProps);
    }
    return taskConfigs;
  }

  @Override
  public void stop() {
    LOG.info("Shutting down Hudi Sink connector " + configProps.get("name"));
  }

  @Override
  public ConfigDef config() {
    // we use Hudi configs instead
    return new ConfigDef();
  }
}

View File

@@ -0,0 +1,212 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.connect;
import org.apache.hudi.connect.kafka.KafkaConnectControlAgent;
import org.apache.hudi.connect.transaction.ConnectTransactionCoordinator;
import org.apache.hudi.connect.transaction.ConnectTransactionParticipant;
import org.apache.hudi.connect.transaction.TransactionCoordinator;
import org.apache.hudi.connect.transaction.TransactionParticipant;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.exception.HoodieException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
/**
* Implementation of the {@link SinkTask} interface provided by
* Kafka Connect. Implements methods to receive the Kafka records
* from the assigned partitions and commit the Kafka offsets.
* Also, handles re-assignments of partitions.
*/
public class HoodieSinkTask extends SinkTask {
public static final String TASK_ID_CONFIG_NAME = "task.id";
private static final Logger LOG = LogManager.getLogger(HoodieSinkTask.class);
private static final int COORDINATOR_KAFKA_PARTITION = 0;
private final Map<TopicPartition, TransactionCoordinator> transactionCoordinators;
private final Map<TopicPartition, TransactionParticipant> hudiTransactionParticipants;
private KafkaConnectControlAgent controlKafkaClient;
private KafkaConnectConfigs connectConfigs;
private String taskId;
private String connectorName;
public HoodieSinkTask() {
transactionCoordinators = new HashMap();
hudiTransactionParticipants = new HashMap<>();
}
@Override
public String version() {
return HoodieSinkConnector.VERSION;
}
@Override
public void start(Map<String, String> props) {
connectorName = props.get("name");
taskId = props.get(TASK_ID_CONFIG_NAME);
LOG.info(String.format("Starting Hudi Sink Task for %s connector %s with id %s with assignments %s",
props, connectorName, taskId, context.assignment()));
try {
connectConfigs = KafkaConnectConfigs.newBuilder().withProperties(props).build();
controlKafkaClient = KafkaConnectControlAgent.createKafkaControlManager(
connectConfigs.getBootstrapServers(),
connectConfigs.getControlTopicName());
bootstrap(context.assignment());
} catch (ConfigException e) {
throw new ConnectException("Couldn't start HdfsSinkConnector due to configuration error.", e);
} catch (ConnectException e) {
LOG.error("Couldn't start HudiSinkConnector:", e);
LOG.info("Shutting down HudiSinkConnector.");
cleanup();
// Always throw the original exception that prevent us from starting
throw e;
}
}
@Override
public void put(Collection<SinkRecord> records) {
for (SinkRecord record : records) {
String topic = record.topic();
int partition = record.kafkaPartition();
TopicPartition tp = new TopicPartition(topic, partition);
hudiTransactionParticipants.get(tp).buffer(record);
}
for (TopicPartition partition : context.assignment()) {
hudiTransactionParticipants.get(partition).processRecords();
}
}
@Override
public void stop() {
cleanup();
}
@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
// No-op. The connector is managing the offsets.
}
@Override
public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
// Although the connector manages offsets via commit files in Hudi, we still want to have Connect
// commit the consumer offsets for records this task has consumed from its topic partitions and
// committed to Hudi.
Map<TopicPartition, OffsetAndMetadata> result = new HashMap<>();
for (TopicPartition partition : context.assignment()) {
TransactionParticipant worker = hudiTransactionParticipants.get(partition);
if (worker != null) {
worker.processRecords();
if (worker.getLastKafkaCommittedOffset() >= 0) {
result.put(partition, new OffsetAndMetadata(worker.getLastKafkaCommittedOffset()));
}
}
}
return result;
}
@Override
public void open(Collection<TopicPartition> partitions) {
LOG.info("New partitions added " + partitions.toString());
bootstrap(partitions);
}
@Override
public void close(Collection<TopicPartition> partitions) {
LOG.info("Existing partitions deleted " + partitions.toString());
// Close any writers we have. We may get assigned the same partitions and end up duplicating
// some effort since we'll have to reprocess those messages. It may be possible to hold on to
// the TopicPartitionWriter and continue to use the temp file, but this can get significantly
// more complex due to potential failures and network partitions. For example, we may get
// this close, then miss a few generations of group membership, during which
// data may have continued to be processed and we'd have to restart from the recovery stage,
// make sure we apply the WAL, and only reuse the temp file if the starting offset is still
// valid. For now, we prefer the simpler solution that may result in a bit of wasted effort.
for (TopicPartition partition : partitions) {
if (partition.partition() == COORDINATOR_KAFKA_PARTITION) {
if (transactionCoordinators.containsKey(partition)) {
transactionCoordinators.get(partition).stop();
transactionCoordinators.remove(partition);
}
}
TransactionParticipant worker = hudiTransactionParticipants.remove(partition);
if (worker != null) {
try {
LOG.debug("Closing data writer due to task start failure.");
worker.stop();
} catch (Throwable t) {
LOG.debug(String.format("Error closing and stopping data writer: %s", t.getMessage()), t);
}
}
}
}
private void bootstrap(Collection<TopicPartition> partitions) {
LOG.info(String.format("Bootstrap task for connector %s with id %s with assignments %s part %s",
connectorName, taskId, context.assignment(), partitions));
for (TopicPartition partition : partitions) {
try {
// If the partition is 0, instantiate the Leader
if (partition.partition() == COORDINATOR_KAFKA_PARTITION) {
ConnectTransactionCoordinator coordinator = new ConnectTransactionCoordinator(
connectConfigs,
partition,
controlKafkaClient);
coordinator.start();
transactionCoordinators.put(partition, coordinator);
}
ConnectTransactionParticipant worker = new ConnectTransactionParticipant(connectConfigs, partition, controlKafkaClient, context);
hudiTransactionParticipants.put(partition, worker);
worker.start();
} catch (HoodieException exception) {
LOG.error(String.format("Fatal error initializing task %s for partition %s", taskId, partition.partition()), exception);
}
}
}
private void cleanup() {
for (TopicPartition partition : context.assignment()) {
TransactionParticipant worker = hudiTransactionParticipants.get(partition);
if (worker != null) {
try {
LOG.debug("Closing data writer due to task start failure.");
worker.stop();
} catch (Throwable t) {
LOG.debug("Error closing and stopping data writer", t);
}
}
}
hudiTransactionParticipants.clear();
transactionCoordinators.forEach((topic, transactionCoordinator) -> transactionCoordinator.stop());
transactionCoordinators.clear();
}
}

View File

@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.connect;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.FileIdPrefixProvider;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Objects;
import java.util.Properties;
/**
 * {@link FileIdPrefixProvider} that derives a stable, fixed-size file id prefix
 * from the Kafka partition id and the Hudi partition path.
 */
public class KafkaConnectFileIdPrefixProvider extends FileIdPrefixProvider {

  public static final String KAFKA_CONNECT_PARTITION_ID = "hudi.kafka.connect.partition";
  private static final Logger LOG = LogManager.getLogger(KafkaConnectFileIdPrefixProvider.class);

  private final String kafkaPartition;

  /**
   * @param props must contain {@link #KAFKA_CONNECT_PARTITION_ID}
   * @throws HoodieException if the Kafka partition id property is missing
   */
  public KafkaConnectFileIdPrefixProvider(Properties props) {
    super(props);
    if (!props.containsKey(KAFKA_CONNECT_PARTITION_ID)) {
      LOG.error("Fatal error due to Kafka Connect Partition Id is not set");
      throw new HoodieException("Kafka Connect Partition Key " + KAFKA_CONNECT_PARTITION_ID + " not provided");
    }
    this.kafkaPartition = props.getProperty(KAFKA_CONNECT_PARTITION_ID);
  }

  @Override
  public String createFilePrefix(String partitionPath) {
    // We use a combination of kafka partition and partition path as the file id, and then hash it
    // to generate a fixed sized hash.
    String rawFileIdPrefix = kafkaPartition + partitionPath;
    MessageDigest md;
    try {
      // MD5 is acceptable here: the digest is only a stable identifier, not a security primitive.
      md = MessageDigest.getInstance("MD5");
    } catch (NoSuchAlgorithmException e) {
      LOG.error("Fatal error selecting hash algorithm", e);
      throw new HoodieException(e);
    }
    // md cannot be null here (either assigned above or the exception was thrown),
    // so the original Objects.requireNonNull was redundant.
    byte[] digest = md.digest(rawFileIdPrefix.getBytes(StandardCharsets.UTF_8));
    // Fixed: compute the hex string once instead of twice (log + return).
    String fileIdPrefix = StringUtils.toHexString(digest).toUpperCase();
    LOG.info("CreateFileId for Kafka Partition " + kafkaPartition + " : " + partitionPath + " = " + rawFileIdPrefix
        + " === " + fileIdPrefix);
    return fileIdPrefix;
  }
}

View File

@@ -0,0 +1,230 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.connect.kafka;
import org.apache.hudi.connect.transaction.ControlEvent;
import org.apache.hudi.connect.transaction.TransactionCoordinator;
import org.apache.hudi.connect.transaction.TransactionParticipant;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* Class that manages the Kafka consumer and producer for
* the Kafka Control Topic that ensures coordination across the
* {@link TransactionCoordinator} and {@link TransactionParticipant}s.
* Use a single instance per worker (single-threaded),
* and register multiple tasks that can receive the control messages.
*/
public class KafkaConnectControlAgent implements KafkaControlAgent {

  private static final Logger LOG = LogManager.getLogger(KafkaConnectControlAgent.class);
  private static final Object LOCK = new Object();
  private static final long KAFKA_POLL_TIMEOUT_MS = 100;
  private static final int EXEC_SHUTDOWN_TIMEOUT_MS = 5000;

  // Fixed: must be volatile — double-checked locking in createKafkaControlManager is broken
  // without it (another thread may observe a partially constructed instance, per the JMM).
  private static volatile KafkaConnectControlAgent agent;

  private final String bootstrapServers;
  private final String controlTopicName;
  private final ExecutorService executorService;
  // One TransactionCoordinator per Kafka topic.
  // NOTE(review): these maps are written by task threads (register/deregister) while the
  // consumer-loop thread reads them; consider ConcurrentHashMap — confirm the threading model.
  private final Map<String, TransactionCoordinator> topicCoordinators;
  // List of TransactionParticipants per Kafka Topic
  private final Map<String, ConcurrentLinkedQueue<TransactionParticipant>> partitionWorkers;
  private final KafkaControlProducer producer;
  private KafkaConsumer<String, ControlEvent> consumer;

  /**
   * Prefer {@link #createKafkaControlManager(String, String)}, which maintains a single
   * shared instance per worker. This constructor starts the control-topic consumer loop
   * immediately.
   */
  public KafkaConnectControlAgent(String bootstrapServers,
                                  String controlTopicName) {
    this.bootstrapServers = bootstrapServers;
    this.controlTopicName = controlTopicName;
    this.executorService = Executors.newSingleThreadExecutor();
    this.topicCoordinators = new HashMap<>();
    this.partitionWorkers = new HashMap<>();
    this.producer = new KafkaControlProducer(bootstrapServers, controlTopicName);
    start();
  }

  /**
   * Returns the per-worker singleton agent, creating it on first use (double-checked locking).
   */
  public static KafkaConnectControlAgent createKafkaControlManager(String bootstrapServers,
                                                                   String controlTopicName) {
    if (agent == null) {
      synchronized (LOCK) {
        if (agent == null) {
          agent = new KafkaConnectControlAgent(bootstrapServers, controlTopicName);
        }
      }
    }
    return agent;
  }

  @Override
  public void registerTransactionParticipant(TransactionParticipant worker) {
    // computeIfAbsent replaces the original containsKey/put sequence with a single lookup.
    partitionWorkers
        .computeIfAbsent(worker.getPartition().topic(), topic -> new ConcurrentLinkedQueue<>())
        .add(worker);
  }

  @Override
  public void deregisterTransactionParticipant(TransactionParticipant worker) {
    ConcurrentLinkedQueue<TransactionParticipant> workers = partitionWorkers.get(worker.getPartition().topic());
    if (workers != null) {
      workers.remove(worker);
    }
  }

  @Override
  public void registerTransactionCoordinator(TransactionCoordinator coordinator) {
    // putIfAbsent preserves the original "first registration wins" semantics.
    topicCoordinators.putIfAbsent(coordinator.getPartition().topic(), coordinator);
  }

  // Fixed: @Override was missing on this interface method.
  @Override
  public void deregisterTransactionCoordinator(TransactionCoordinator coordinator) {
    topicCoordinators.remove(coordinator.getPartition().topic());
  }

  @Override
  public void publishMessage(ControlEvent message) {
    producer.publishMessage(message);
  }

  /**
   * Subscribes to the control topic and launches the consumer loop that dispatches
   * control events to registered coordinators and participants.
   */
  private void start() {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    // Todo fetch the worker id or name instead of a uuid.
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "hudi-control-group" + UUID.randomUUID().toString());
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaJsonDeserializer.class);

    // Since we are using Kafka Control Topic as a RPC like interface,
    // we want consumers to only process messages that are sent after they come online
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

    consumer = new KafkaConsumer<>(props, new StringDeserializer(),
        new KafkaJsonDeserializer<>(ControlEvent.class));
    consumer.subscribe(Collections.singletonList(controlTopicName));

    executorService.submit(() -> {
      // Runs until the executor is shut down via stop().
      while (true) {
        ConsumerRecords<String, ControlEvent> records;
        records = consumer.poll(Duration.ofMillis(KAFKA_POLL_TIMEOUT_MS));
        for (ConsumerRecord<String, ControlEvent> record : records) {
          try {
            // Fixed: the original debug line labelled the key/value as "customer"/"country",
            // a copy-paste leftover from example code.
            LOG.debug(String.format("Kafka consumerGroupId = %s topic = %s, partition = %s, offset = %s, key = %s, value = %s",
                "", record.topic(), record.partition(), record.offset(), record.key(), record.value()));
            ControlEvent message = record.value();
            String senderTopic = message.senderPartition().topic();
            if (message.getSenderType().equals(ControlEvent.SenderType.COORDINATOR)) {
              // Coordinator messages fan out to every participant of the topic.
              ConcurrentLinkedQueue<TransactionParticipant> workers = partitionWorkers.get(senderTopic);
              if (workers != null) {
                for (TransactionParticipant partitionWorker : workers) {
                  partitionWorker.processControlEvent(message);
                }
              } else {
                LOG.warn(String.format("Failed to send message for unregistered participants for topic %s", senderTopic));
              }
            } else if (message.getSenderType().equals(ControlEvent.SenderType.PARTICIPANT)) {
              // Participant messages route to the single coordinator of the topic.
              TransactionCoordinator coordinator = topicCoordinators.get(senderTopic);
              if (coordinator != null) {
                coordinator.processControlEvent(message);
              } else {
                LOG.warn(String.format("Failed to send message for unregistered coordinator for topic %s", senderTopic));
              }
            } else {
              LOG.warn(String.format("Sender type of Control Message unknown %s", message.getSenderType().name()));
            }
          } catch (Exception e) {
            LOG.error(String.format("Fatal error while consuming a kafka record for topic = %s partition = %s", record.topic(), record.partition()), e);
          }
        }
        try {
          consumer.commitSync();
        } catch (CommitFailedException exception) {
          // Fixed: include the exception so the failure cause is not lost.
          LOG.error("Fatal error while committing kafka control topic", exception);
        }
      }
    });
  }

  /**
   * Stops the producer, closes the consumer and shuts down the consumer-loop executor,
   * waiting up to {@link #EXEC_SHUTDOWN_TIMEOUT_MS} for a clean termination.
   */
  public void stop() {
    producer.stop();
    consumer.close();
    if (executorService != null) {
      boolean terminated = false;
      try {
        LOG.info("Shutting down executor service.");
        executorService.shutdown();
        LOG.info("Awaiting termination.");
        terminated = executorService.awaitTermination(EXEC_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        // ignored
      }

      if (!terminated) {
        LOG.warn(
            "Unclean Kafka Control Manager executor service shutdown ");
        executorService.shutdownNow();
      }
    }
  }

  /**
   * Deserializes the incoming Kafka records for the Control Topic.
   *
   * @param <T> represents the object that is sent over the Control Topic.
   */
  public static class KafkaJsonDeserializer<T> implements Deserializer<T> {

    private static final Logger LOG = LogManager.getLogger(KafkaJsonDeserializer.class);
    private final Class<T> type;

    KafkaJsonDeserializer(Class<T> type) {
      this.type = type;
    }

    /**
     * Returns {@code null} when the payload cannot be parsed; callers must tolerate nulls.
     */
    @Override
    public T deserialize(String s, byte[] bytes) {
      ObjectMapper mapper = new ObjectMapper();
      try {
        return mapper.readValue(bytes, type);
      } catch (Exception e) {
        // Fixed: log the full exception rather than only e.getMessage(), which dropped the stack trace.
        LOG.error("Fatal error during deserialization of Kafka Control Message", e);
        return null;
      }
    }
  }
}

View File

@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.connect.kafka;
import org.apache.hudi.connect.transaction.ControlEvent;
import org.apache.hudi.connect.transaction.TransactionCoordinator;
import org.apache.hudi.connect.transaction.TransactionParticipant;
/**
* Manages the Kafka consumer and producer for
* the Kafka Control Topic that ensures coordination across the
* {@link TransactionCoordinator} and {@link TransactionParticipant}s.
*/
public interface KafkaControlAgent {

  /** Registers a participant to receive control events for its topic partition. */
  void registerTransactionParticipant(TransactionParticipant worker);

  /** Removes a previously registered participant. */
  void deregisterTransactionParticipant(TransactionParticipant worker);

  /** Registers the coordinator that should receive participant control events for its topic. */
  void registerTransactionCoordinator(TransactionCoordinator coordinator);

  /** Removes a previously registered coordinator. */
  void deregisterTransactionCoordinator(TransactionCoordinator coordinator);

  /** Publishes a control event to the Kafka control topic. */
  void publishMessage(ControlEvent message);
}

View File

@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.connect.kafka;
import org.apache.hudi.connect.transaction.ControlEvent;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.util.Properties;
/**
* Kafka producer to send events to the
* Control Topic that coordinates transactions
* across Participants.
*/
public class KafkaControlProducer {

  private static final Logger LOG = LogManager.getLogger(KafkaControlProducer.class);

  private final String bootstrapServers;
  private final String controlTopicName;
  private Producer<String, ControlEvent> producer;

  /**
   * Creates and immediately starts a producer for the given control topic.
   */
  public KafkaControlProducer(String bootstrapServers, String controlTopicName) {
    this.bootstrapServers = bootstrapServers;
    this.controlTopicName = controlTopicName;
    start();
  }

  private void start() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaJsonSerializer.class);

    producer = new KafkaProducer<>(
        props,
        new StringSerializer(),
        new KafkaJsonSerializer()
    );
  }

  public void stop() {
    producer.close();
  }

  /**
   * Sends the event asynchronously, keyed by {@code message.key()}.
   */
  public void publishMessage(ControlEvent message) {
    ProducerRecord<String, ControlEvent> record
        = new ProducerRecord<>(controlTopicName, message.key(), message);
    producer.send(record);
  }

  /**
   * Serializes outgoing {@link ControlEvent}s to JSON for the Control Topic.
   */
  public static class KafkaJsonSerializer implements Serializer<ControlEvent> {

    private static final Logger LOG = LogManager.getLogger(KafkaJsonSerializer.class);
    // Fixed: the original created and configured a new ObjectMapper on every serialize() call.
    // ObjectMapper is thread-safe once configured, so build it exactly once.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
        .setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);

    /**
     * Returns the JSON bytes for {@code data}, or {@code null} if serialization fails.
     */
    @Override
    public byte[] serialize(String topic, ControlEvent data) {
      try {
        return OBJECT_MAPPER.writeValueAsBytes(data);
      } catch (Exception e) {
        LOG.error("Fatal error during serialization of Kafka Control Message ", e);
        return null;
      }
    }
  }
}

View File

@@ -0,0 +1,399 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.connect.transaction;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.connect.kafka.KafkaControlAgent;
import org.apache.hudi.connect.utils.KafkaConnectUtils;
import org.apache.hudi.connect.writers.ConnectTransactionServices;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.connect.writers.KafkaConnectTransactionServices;
import org.apache.hudi.exception.HoodieException;
import org.apache.kafka.common.TopicPartition;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
/**
* Implementation of the Coordinator that
* coordinates the Hudi write transactions
* across all the Kafka partitions for a single Kafka Topic.
*/
public class ConnectTransactionCoordinator implements TransactionCoordinator, Runnable {
private static final Logger LOG = LogManager.getLogger(ConnectTransactionCoordinator.class);
private static final String BOOTSTRAP_SERVERS_CFG = "bootstrap.servers";
private static final String KAFKA_OFFSET_KEY = "kafka.commit.offsets";
private static final String KAFKA_OFFSET_DELIMITER = ",";
private static final String KAFKA_OFFSET_KV_DELIMITER = "=";
private static final Long START_COMMIT_INIT_DELAY_MS = 100L;
private static final Long RESTART_COMMIT_DELAY_MS = 500L;
private static final int COORDINATOR_EVENT_LOOP_TIMEOUT_MS = 1000;
private final KafkaConnectConfigs configs;
private final TopicPartition partition;
private final KafkaControlAgent kafkaControlClient;
private final ConnectTransactionServices transactionServices;
private final KafkaPartitionProvider partitionProvider;
private final Map<Integer, List<WriteStatus>> partitionsWriteStatusReceived;
private final Map<Integer, Long> currentConsumedKafkaOffsets;
private final AtomicBoolean hasStarted = new AtomicBoolean(false);
private final BlockingQueue<CoordinatorEvent> events;
private final ExecutorService executorService;
private final ScheduledExecutorService scheduler;
private String currentCommitTime;
private Map<Integer, Long> globalCommittedKafkaOffsets;
private State currentState;
private int numPartitions;
/**
 * Creates a coordinator wired to the default Hudi-backed transaction services
 * and the live Kafka partition-count provider.
 *
 * @throws HoodieException if the underlying transaction services fail to initialize
 */
public ConnectTransactionCoordinator(KafkaConnectConfigs configs,
                                     TopicPartition partition,
                                     KafkaControlAgent kafkaControlClient) throws HoodieException {
  this(configs,
      partition,
      kafkaControlClient,
      new KafkaConnectTransactionServices(configs),
      KafkaConnectUtils::getLatestNumPartitions);
}
/**
 * Full constructor, used directly in tests to inject transaction services
 * and the partition-count provider. Initializes empty bookkeeping state and
 * sets the coordinator state machine to INIT; call {@code start()} to begin.
 */
public ConnectTransactionCoordinator(KafkaConnectConfigs configs,
                                     TopicPartition partition,
                                     KafkaControlAgent kafkaControlClient,
                                     ConnectTransactionServices transactionServices,
                                     KafkaPartitionProvider partitionProvider) {
  this.configs = configs;
  this.partition = partition;
  this.kafkaControlClient = kafkaControlClient;
  this.transactionServices = transactionServices;
  this.partitionProvider = partitionProvider;
  this.events = new LinkedBlockingQueue<>();
  // Scheduler delays event submission; executorService runs the event loop (run()).
  scheduler = Executors.newSingleThreadScheduledExecutor();
  executorService = Executors.newSingleThreadExecutor();
  this.currentCommitTime = StringUtils.EMPTY_STRING;
  this.partitionsWriteStatusReceived = new HashMap<>();
  this.globalCommittedKafkaOffsets = new HashMap<>();
  this.currentConsumedKafkaOffsets = new HashMap<>();
  this.currentState = State.INIT;
}
/**
 * Starts the coordinator: launches the single-threaded event loop (at most once,
 * guarded by {@code hasStarted}), registers with the control-topic agent, restores
 * previously committed Kafka offsets, and schedules the first START_COMMIT event.
 */
@Override
public void start() {
  if (hasStarted.compareAndSet(false, true)) {
    executorService.submit(this);
  }
  kafkaControlClient.registerTransactionCoordinator(this);
  LOG.info(String.format("Start Transaction Coordinator for topic %s partition %s",
      partition.topic(), partition.partition()));
  initializeGlobalCommittedKafkaOffsets();
  // Submit the first start commit
  submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.START_COMMIT,
      partition.topic(),
      StringUtils.EMPTY_STRING),
      START_COMMIT_INIT_DELAY_MS, TimeUnit.MILLISECONDS);
}
@Override
public void stop() {
kafkaControlClient.deregisterTransactionCoordinator(this);
hasStarted.set(false);
if (executorService != null) {
boolean terminated = false;
try {
LOG.info("Shutting down executor service.");
executorService.shutdown();
LOG.info("Awaiting termination.");
terminated = executorService.awaitTermination(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// ignored
}
if (!terminated) {
LOG.warn(
"Unclean Kafka Control Manager executor service shutdown ");
executorService.shutdownNow();
}
}
}
  /** Returns the kafka topic-partition this coordinator is bound to. */
  @Override
  public TopicPartition getPartition() {
    return partition;
  }
@Override
public void processControlEvent(ControlEvent message) {
CoordinatorEvent.CoordinatorEventType type;
if (message.getMsgType().equals(ControlEvent.MsgType.WRITE_STATUS)) {
type = CoordinatorEvent.CoordinatorEventType.WRITE_STATUS;
} else {
LOG.warn(String.format("The Coordinator should not be receiving messages of type %s", message.getMsgType().name()));
return;
}
CoordinatorEvent event = new CoordinatorEvent(type,
message.senderPartition().topic(),
message.getCommitTime());
event.setMessage(message);
submitEvent(event);
}
@Override
public void run() {
while (true) {
try {
CoordinatorEvent event = events.poll(COORDINATOR_EVENT_LOOP_TIMEOUT_MS, TimeUnit.MILLISECONDS);
if (event != null) {
processCoordinatorEvent(event);
}
} catch (InterruptedException exception) {
LOG.warn("Error received while polling the event loop in Partition Coordinator", exception);
}
}
}
  /** Enqueues an event for immediate processing by the event loop. */
  private void submitEvent(CoordinatorEvent event) {
    this.submitEvent(event, 0, TimeUnit.SECONDS);
  }
private void submitEvent(CoordinatorEvent event, long delay, TimeUnit unit) {
scheduler.schedule(() -> {
events.add(event);
}, delay, unit);
}
private void processCoordinatorEvent(CoordinatorEvent event) {
try {
// Ignore NULL and STALE events, unless its one to start a new COMMIT
if (event == null
|| (!event.getEventType().equals(CoordinatorEvent.CoordinatorEventType.START_COMMIT)
&& (!event.getCommitTime().equals(currentCommitTime)))) {
return;
}
switch (event.getEventType()) {
case START_COMMIT:
startNewCommit();
break;
case END_COMMIT:
endExistingCommit();
break;
case WRITE_STATUS:
// Ignore stale write_status messages sent after
if (event.getMessage() != null
&& currentState.equals(State.ENDED_COMMIT)) {
onReceiveWriteStatus(event.getMessage());
} else {
LOG.warn("Could not process WRITE_STATUS due to missing message");
}
break;
case ACK_COMMIT:
submitAckCommit();
break;
case WRITE_STATUS_TIMEOUT:
handleWriteStatusTimeout();
break;
default:
throw new IllegalStateException("Partition Coordinator has received an illegal event type " + event.getEventType().name());
}
} catch (Exception exception) {
LOG.warn("Error received while polling the event loop in Partition Coordinator", exception);
}
}
  /**
   * Begins a new commit cycle: refreshes the topic's partition count, starts a
   * Hudi commit, and broadcasts START_COMMIT (carrying the globally committed
   * kafka offsets) to all participants. On failure, schedules a delayed retry.
   */
  private void startNewCommit() {
    // Re-read the partition count each cycle, since the topic may have been repartitioned.
    numPartitions = partitionProvider.getLatestNumPartitions(configs.getString(BOOTSTRAP_SERVERS_CFG), partition.topic());
    partitionsWriteStatusReceived.clear();
    try {
      currentCommitTime = transactionServices.startCommit();
      ControlEvent message = new ControlEvent.Builder(
          ControlEvent.MsgType.START_COMMIT,
          ControlEvent.SenderType.COORDINATOR,
          currentCommitTime,
          partition)
          .setCoordinatorInfo(
              new ControlEvent.CoordinatorInfo(globalCommittedKafkaOffsets))
          .build();
      kafkaControlClient.publishMessage(message);
      currentState = State.STARTED_COMMIT;
      // schedule a timeout for ending the current commit
      submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.END_COMMIT,
          partition.topic(),
          currentCommitTime),
          configs.getCommitIntervalSecs(), TimeUnit.SECONDS);
    } catch (Exception exception) {
      // Retry starting the commit after a delay; the START_COMMIT event carries an
      // empty commit time, so it is never considered stale by processCoordinatorEvent.
      LOG.error(String.format("Failed to start a new commit %s, will retry", currentCommitTime), exception);
      submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.START_COMMIT,
          partition.topic(),
          StringUtils.EMPTY_STRING),
          RESTART_COMMIT_DELAY_MS, TimeUnit.MILLISECONDS);
    }
  }
  /**
   * Ends the current commit window by broadcasting END_COMMIT to all
   * participants, then transitions to ENDED_COMMIT and arms a timeout for
   * collecting the participants' write statuses.
   */
  private void endExistingCommit() {
    try {
      ControlEvent message = new ControlEvent.Builder(
          ControlEvent.MsgType.END_COMMIT,
          ControlEvent.SenderType.COORDINATOR,
          currentCommitTime,
          partition)
          .setCoordinatorInfo(new ControlEvent.CoordinatorInfo(globalCommittedKafkaOffsets))
          .build();
      kafkaControlClient.publishMessage(message);
    } catch (Exception exception) {
      LOG.warn(String.format("Could not send END_COMMIT message for partition %s and commitTime %s", partition, currentCommitTime), exception);
    }
    // Reset the per-commit consumed offsets; they are repopulated from the
    // participants' WRITE_STATUS responses.
    currentConsumedKafkaOffsets.clear();
    currentState = State.ENDED_COMMIT;

    // schedule a timeout for receiving all write statuses
    submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.WRITE_STATUS_TIMEOUT,
        partition.topic(),
        currentCommitTime),
        configs.getCoordinatorWriteTimeoutSecs(), TimeUnit.SECONDS);
  }
private void onReceiveWriteStatus(ControlEvent message) {
ControlEvent.ParticipantInfo participantInfo = message.getParticipantInfo();
if (participantInfo.getOutcomeType().equals(ControlEvent.OutcomeType.WRITE_SUCCESS)) {
int partition = message.senderPartition().partition();
partitionsWriteStatusReceived.put(partition, participantInfo.writeStatuses());
currentConsumedKafkaOffsets.put(partition, participantInfo.getKafkaCommitOffset());
}
if (partitionsWriteStatusReceived.size() >= numPartitions
&& currentState.equals(State.ENDED_COMMIT)) {
// Commit the kafka offsets to the commit file
try {
List<WriteStatus> allWriteStatuses = new ArrayList<>();
partitionsWriteStatusReceived.forEach((key, value) -> allWriteStatuses.addAll(value));
// Commit the last write in Hudi, along with the latest kafka offset
if (!allWriteStatuses.isEmpty()) {
transactionServices.endCommit(currentCommitTime,
allWriteStatuses,
transformKafkaOffsets(currentConsumedKafkaOffsets));
}
currentState = State.WRITE_STATUS_RCVD;
globalCommittedKafkaOffsets.putAll(currentConsumedKafkaOffsets);
submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.ACK_COMMIT,
partition.topic(),
currentCommitTime));
} catch (Exception exception) {
LOG.error("Fatal error while committing file", exception);
}
}
}
private void handleWriteStatusTimeout() {
// If we are still stuck in ENDED_STATE
if (currentState.equals(State.ENDED_COMMIT)) {
currentState = State.WRITE_STATUS_TIMEDOUT;
LOG.warn("Did not receive the Write Status from all partitions");
// Submit the next start commit
submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.START_COMMIT,
partition.topic(),
StringUtils.EMPTY_STRING),
RESTART_COMMIT_DELAY_MS, TimeUnit.MILLISECONDS);
}
}
  /**
   * Broadcasts ACK_COMMIT (with the updated globally committed kafka offsets) to
   * all participants, then transitions to ACKED_COMMIT and schedules the next
   * commit cycle.
   */
  private void submitAckCommit() {
    try {
      ControlEvent message = new ControlEvent.Builder(
          ControlEvent.MsgType.ACK_COMMIT,
          ControlEvent.SenderType.COORDINATOR,
          currentCommitTime,
          partition)
          .setCoordinatorInfo(
              new ControlEvent.CoordinatorInfo(globalCommittedKafkaOffsets))
          .build();
      kafkaControlClient.publishMessage(message);
    } catch (Exception exception) {
      LOG.warn(String.format("Could not send ACK_COMMIT message for partition %s and commitTime %s", partition, currentCommitTime), exception);
    }
    currentState = State.ACKED_COMMIT;

    // Submit the next start commit
    submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.START_COMMIT,
        partition.topic(),
        StringUtils.EMPTY_STRING),
        START_COMMIT_INIT_DELAY_MS, TimeUnit.MILLISECONDS);
  }
  /**
   * Bootstraps {@code globalCommittedKafkaOffsets} from the extra metadata of the
   * latest Hudi commit, where the offsets are serialized as
   * "partition=offset,partition=offset,..." (see transformKafkaOffsets).
   *
   * @throws HoodieException if the stored offsets cannot be parsed.
   */
  private void initializeGlobalCommittedKafkaOffsets() {
    try {
      Map<String, String> commitMetadata = transactionServices.fetchLatestExtraCommitMetadata();
      String latestKafkaOffsets = commitMetadata.get(KAFKA_OFFSET_KEY);
      // Absent offsets (e.g. a fresh table) leave the map empty.
      if (!StringUtils.isNullOrEmpty(latestKafkaOffsets)) {
        LOG.info("Retrieved Raw Kafka offsets from Hudi Commit File " + latestKafkaOffsets);
        globalCommittedKafkaOffsets = Arrays.stream(latestKafkaOffsets.split(KAFKA_OFFSET_DELIMITER))
            .map(entry -> entry.split(KAFKA_OFFSET_KV_DELIMITER))
            .collect(Collectors.toMap(entry -> Integer.parseInt(entry[0]), entry -> Long.parseLong(entry[1])));
        LOG.info("Initialized the kafka offset commits " + globalCommittedKafkaOffsets);
      }
    } catch (Exception exception) {
      throw new HoodieException("Could not deserialize the kafka commit offsets", exception);
    }
  }
private Map<String, String> transformKafkaOffsets(Map<Integer, Long> kafkaOffsets) {
try {
String kafkaOffsetValue = kafkaOffsets.keySet().stream()
.map(key -> key + KAFKA_OFFSET_KV_DELIMITER + kafkaOffsets.get(key))
.collect(Collectors.joining(KAFKA_OFFSET_DELIMITER));
return Collections.singletonMap(KAFKA_OFFSET_KEY, kafkaOffsetValue);
} catch (Exception exception) {
throw new HoodieException("Could not serialize the kafka commit offsets", exception);
}
}
  /**
   * States of the coordinator's commit state machine, in the order they are
   * normally traversed in a commit cycle; WRITE_STATUS_TIMEDOUT is the
   * failure exit from ENDED_COMMIT.
   */
  private enum State {
    INIT,
    STARTED_COMMIT,
    ENDED_COMMIT,
    WRITE_STATUS_RCVD,
    WRITE_STATUS_TIMEDOUT,
    ACKED_COMMIT,
  }
/**
* Provides the current partitions of a Kafka Topic dynamically.
*/
public interface KafkaPartitionProvider {
int getLatestNumPartitions(String bootstrapServers, String topicName);
}
}

View File

@@ -0,0 +1,254 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.connect.transaction;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.connect.kafka.KafkaControlAgent;
import org.apache.hudi.connect.writers.ConnectWriterProvider;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.connect.writers.KafkaConnectWriterProvider;
import org.apache.hudi.exception.HoodieException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
 * Implementation of the {@link TransactionParticipant} that coordinates the Hudi write transactions
 * based on events from the {@link TransactionCoordinator} and manages the Hudi Writes for a specific Kafka Partition.
 *
 * <p>Fix: the warn log in {@code writeRecords} printed the transaction's last
 * written offset while its guard compares against {@code committedKafkaOffset};
 * it now logs the value actually used in the comparison.
 */
public class ConnectTransactionParticipant implements TransactionParticipant {

  private static final Logger LOG = LogManager.getLogger(ConnectTransactionParticipant.class);

  // Kafka records buffered from the connect framework until written to Hudi.
  private final LinkedList<SinkRecord> buffer;
  // Control events from the coordinator; drained on every processRecords() call.
  private final BlockingQueue<ControlEvent> controlEvents;
  private final TopicPartition partition;
  private final SinkTaskContext context;
  private final KafkaControlAgent kafkaControlAgent;
  private final ConnectWriterProvider<WriteStatus> writerProvider;

  // State of the transaction currently in flight; null when none is active.
  private TransactionInfo<WriteStatus> ongoingTransactionInfo;
  // Highest kafka offset known to have been durably committed by the coordinator.
  private long committedKafkaOffset;

  /**
   * Creates a participant backed by the default Hudi writer provider.
   */
  public ConnectTransactionParticipant(KafkaConnectConfigs configs,
                                       TopicPartition partition,
                                       KafkaControlAgent kafkaControlAgent,
                                       SinkTaskContext context) throws HoodieException {
    this(partition, kafkaControlAgent, context, new KafkaConnectWriterProvider(configs, partition));
  }

  /**
   * Creates a participant with an explicit writer provider (useful for testing).
   */
  public ConnectTransactionParticipant(TopicPartition partition,
                                       KafkaControlAgent kafkaControlAgent,
                                       SinkTaskContext context,
                                       ConnectWriterProvider<WriteStatus> writerProvider) throws HoodieException {
    this.buffer = new LinkedList<>();
    this.controlEvents = new LinkedBlockingQueue<>();
    this.partition = partition;
    this.context = context;
    this.writerProvider = writerProvider;
    this.kafkaControlAgent = kafkaControlAgent;
    this.ongoingTransactionInfo = null;
    this.committedKafkaOffset = 0;
  }

  @Override
  public void start() {
    LOG.info("Start Hudi Transaction Participant for partition " + partition.partition());
    this.kafkaControlAgent.registerTransactionParticipant(this);
    // Stay paused until the coordinator sends START_COMMIT (see handleStartCommit).
    context.pause(partition);
  }

  @Override
  public void stop() {
    this.kafkaControlAgent.deregisterTransactionParticipant(this);
    cleanupOngoingTransaction();
  }

  /** Buffers a record from the connect framework until the next processRecords(). */
  @Override
  public void buffer(SinkRecord record) {
    buffer.add(record);
  }

  /** Queues a control event for processing on the task thread in processRecords(). */
  @Override
  public void processControlEvent(ControlEvent message) {
    controlEvents.add(message);
  }

  @Override
  public long getLastKafkaCommittedOffset() {
    return committedKafkaOffset;
  }

  @Override
  public TopicPartition getPartition() {
    return partition;
  }

  /**
   * Applies all pending control events from the coordinator, then writes any
   * buffered records into the ongoing transaction.
   */
  @Override
  public void processRecords() {
    while (!controlEvents.isEmpty()) {
      ControlEvent message = controlEvents.poll();
      switch (message.getMsgType()) {
        case START_COMMIT:
          handleStartCommit(message);
          break;
        case END_COMMIT:
          handleEndCommit(message);
          break;
        case ACK_COMMIT:
          handleAckCommit(message);
          break;
        case WRITE_STATUS:
          // ignore write status since its only processed by leader
          break;
        default:
          throw new IllegalStateException("HudiTransactionParticipant received incorrect state " + message.getMsgType());
      }
    }
    writeRecords();
  }

  /**
   * Starts a new local transaction in response to the coordinator's START_COMMIT,
   * after tearing down any stale one and resyncing the committed kafka offset.
   */
  private void handleStartCommit(ControlEvent message) {
    // If there is an existing/ongoing transaction locally
    // but it failed globally since we received another START_COMMIT instead of an END_COMMIT or ACK_COMMIT,
    // so close it and start new transaction
    cleanupOngoingTransaction();
    // Resync the last committed Kafka offset from the leader
    syncKafkaOffsetWithLeader(message);
    context.resume(partition);
    String currentCommitTime = message.getCommitTime();
    LOG.info("Started a new transaction after receiving START_COMMIT for commit " + currentCommitTime);
    try {
      ongoingTransactionInfo = new TransactionInfo<>(currentCommitTime, writerProvider.getWriter(currentCommitTime));
      ongoingTransactionInfo.setLastWrittenKafkaOffset(committedKafkaOffset);
    } catch (Exception exception) {
      LOG.warn("Error received while starting a new transaction", exception);
    }
  }

  /**
   * Closes the local writer in response to END_COMMIT and reports the write
   * statuses back to the coordinator. Mismatched or unexpected END_COMMITs
   * trigger recovery by resyncing offsets with the leader.
   */
  private void handleEndCommit(ControlEvent message) {
    if (ongoingTransactionInfo == null) {
      LOG.warn(String.format("END_COMMIT %s is received while we were NOT in active transaction", message.getCommitTime()));
      return;
    } else if (!ongoingTransactionInfo.getCommitTime().equals(message.getCommitTime())) {
      LOG.error(String.format("Fatal error received END_COMMIT with commit time %s while local transaction commit time %s",
          message.getCommitTime(), ongoingTransactionInfo.getCommitTime()));
      // Recovery: A new END_COMMIT from leader caused interruption to an existing transaction,
      // explicitly reset Kafka commit offset to ensure no data loss
      cleanupOngoingTransaction();
      syncKafkaOffsetWithLeader(message);
      return;
    }

    // send Writer Status Message and wait for ACK_COMMIT in async fashion
    try {
      // Pause consumption while the commit is being finalized by the coordinator.
      context.pause(partition);
      ongoingTransactionInfo.commitInitiated();
      //sendWriterStatus
      List<WriteStatus> writeStatuses = new ArrayList<>();
      try {
        writeStatuses = ongoingTransactionInfo.getWriter().close();
      } catch (IOException exception) {
        LOG.warn("Error closing the Hudi Writer", exception);
      }

      ControlEvent writeStatus = new ControlEvent.Builder(ControlEvent.MsgType.WRITE_STATUS,
          ControlEvent.SenderType.PARTICIPANT, ongoingTransactionInfo.getCommitTime(), partition)
          .setParticipantInfo(new ControlEvent.ParticipantInfo(
              writeStatuses,
              ongoingTransactionInfo.getLastWrittenKafkaOffset(),
              ControlEvent.OutcomeType.WRITE_SUCCESS))
          .build();
      kafkaControlAgent.publishMessage(writeStatus);
    } catch (Exception exception) {
      LOG.warn(String.format("Error ending commit %s for partition %s", message.getCommitTime(), partition.partition()), exception);
    }
  }

  /**
   * Finalizes the local view of the commit in response to ACK_COMMIT: advances
   * the committed kafka offset and tears down the completed transaction.
   */
  private void handleAckCommit(ControlEvent message) {
    // Update the locally committed kafka offset from the just-acked transaction.
    if (ongoingTransactionInfo != null && committedKafkaOffset < ongoingTransactionInfo.getLastWrittenKafkaOffset()) {
      committedKafkaOffset = ongoingTransactionInfo.getLastWrittenKafkaOffset();
    }
    syncKafkaOffsetWithLeader(message);
    cleanupOngoingTransaction();
  }

  /**
   * Writes buffered records through the ongoing transaction's writer, skipping
   * records older than the committed offset; no-op when no transaction is active
   * or its commit has already been initiated.
   */
  private void writeRecords() {
    if (ongoingTransactionInfo != null && !ongoingTransactionInfo.isCommitInitiated()) {
      while (!buffer.isEmpty()) {
        try {
          SinkRecord record = buffer.peek();
          if (record != null
              && record.kafkaOffset() >= ongoingTransactionInfo.getLastWrittenKafkaOffset()) {
            ongoingTransactionInfo.getWriter().writeRecord(record);
            ongoingTransactionInfo.setLastWrittenKafkaOffset(record.kafkaOffset() + 1);
          } else if (record != null && record.kafkaOffset() < committedKafkaOffset) {
            // Log the offset the guard actually compares against.
            LOG.warn(String.format("Received a kafka record with offset %s prior to last committed offset %s for partition %s",
                record.kafkaOffset(), committedKafkaOffset,
                partition));
          }
          // NOTE(review): records with committedKafkaOffset <= offset < lastWrittenKafkaOffset
          // are dropped silently here; confirm this is the intended dedup behavior.
          buffer.poll();
        } catch (Exception exception) {
          LOG.warn(String.format("Error received while writing records for transaction %s in partition %s",
              ongoingTransactionInfo.getCommitTime(), partition.partition()),
              exception);
        }
      }
    }
  }

  /** Closes and discards the ongoing transaction's writer, if any. */
  private void cleanupOngoingTransaction() {
    if (ongoingTransactionInfo != null) {
      try {
        ongoingTransactionInfo.getWriter().close();
        ongoingTransactionInfo = null;
      } catch (IOException exception) {
        LOG.warn("Error received while trying to cleanup existing transaction", exception);
      }
    }
  }

  /**
   * Adopts the coordinator's view of the committed kafka offset for this
   * partition, rewinding the connect framework's consumption position if it
   * disagrees with the local view.
   */
  private void syncKafkaOffsetWithLeader(ControlEvent message) {
    if (message.getCoordinatorInfo() != null) {
      Long coordinatorCommittedKafkaOffset = message.getCoordinatorInfo().getGlobalKafkaCommitOffsets().get(partition.partition());
      // Recover kafka committed offsets, treating the commit offset from the coordinator
      // as the source of truth
      if (coordinatorCommittedKafkaOffset != null && coordinatorCommittedKafkaOffset >= 0) {
        if (coordinatorCommittedKafkaOffset != committedKafkaOffset) {
          LOG.warn(String.format("Recovering the kafka offset for partition %s to offset %s instead of local offset %s",
              partition.partition(), coordinatorCommittedKafkaOffset, committedKafkaOffset));
          context.offset(partition, coordinatorCommittedKafkaOffset);
        }
        committedKafkaOffset = coordinatorCommittedKafkaOffset;
      }
    }
  }
}

View File

@@ -0,0 +1,222 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.connect.transaction;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.util.SerializationUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
/**
 * The events sent over the Kafka Control Topic between the
 * coordinator and the followers, in order to ensure
 * coordination across all the writes.
 *
 * <p>Fix: {@code toString()} previously dereferenced {@code msgType},
 * {@code coordinatorInfo} and {@code participantInfo} unconditionally and threw
 * NPE for events built without the optional info sections (the Builder sets at
 * most one of them) or created via the no-arg constructor.
 */
@SuppressWarnings("checkstyle:VisibilityModifier")
public class ControlEvent implements Serializable {

  private static final Logger LOG = LogManager.getLogger(ControlEvent.class);
  private static final int CURRENT_VERSION = 0;

  // Protocol version of this event, for forward compatibility.
  private final int version = CURRENT_VERSION;
  private MsgType msgType;
  private SenderType senderType;
  private String commitTime;
  // Serialized TopicPartition of the sender (kept as bytes for serializability).
  private byte[] senderPartition;
  // At most one of the two info sections is set, depending on the sender type.
  private CoordinatorInfo coordinatorInfo;
  private ParticipantInfo participantInfo;

  // No-arg constructor required by serialization frameworks.
  public ControlEvent() {
  }

  public ControlEvent(MsgType msgType,
                      SenderType senderType,
                      String commitTime,
                      byte[] senderPartition,
                      CoordinatorInfo coordinatorInfo,
                      ParticipantInfo participantInfo) {
    this.msgType = msgType;
    this.senderType = senderType;
    this.commitTime = commitTime;
    this.senderPartition = senderPartition;
    this.coordinatorInfo = coordinatorInfo;
    this.participantInfo = participantInfo;
  }

  /** Returns the key under which this event is published to the control topic. */
  public String key() {
    return msgType.name().toLowerCase(Locale.ROOT);
  }

  public MsgType getMsgType() {
    return msgType;
  }

  public SenderType getSenderType() {
    return senderType;
  }

  public String getCommitTime() {
    return commitTime;
  }

  public byte[] getSenderPartition() {
    return senderPartition;
  }

  /** Deserializes and returns the sender's {@link TopicPartition}. */
  public TopicPartition senderPartition() {
    return SerializationUtils.deserialize(senderPartition);
  }

  public CoordinatorInfo getCoordinatorInfo() {
    return coordinatorInfo;
  }

  public ParticipantInfo getParticipantInfo() {
    return participantInfo;
  }

  public int getVersion() {
    return version;
  }

  @Override
  public String toString() {
    // Pass the fields directly to %s so that null msgType / coordinatorInfo /
    // participantInfo print as "null" instead of throwing NPE (for a non-null
    // enum, %s prints the same text as name()).
    return String.format("%s %s %s %s %s %s", version, msgType, commitTime,
        Arrays.toString(senderPartition), coordinatorInfo, participantInfo);
  }

  /**
   * Builder that helps build {@link ControlEvent}.
   */
  public static class Builder {

    private final MsgType msgType;
    private SenderType senderType;
    private final String commitTime;
    private final byte[] senderPartition;
    private CoordinatorInfo coordinatorInfo;
    private ParticipantInfo participantInfo;

    public Builder(MsgType msgType, SenderType senderType, String commitTime, TopicPartition senderPartition) throws IOException {
      this.msgType = msgType;
      this.senderType = senderType;
      this.commitTime = commitTime;
      this.senderPartition = SerializationUtils.serialize(senderPartition);
    }

    public Builder setCoordinatorInfo(CoordinatorInfo coordinatorInfo) {
      this.coordinatorInfo = coordinatorInfo;
      return this;
    }

    public Builder setParticipantInfo(ParticipantInfo participantInfo) {
      this.participantInfo = participantInfo;
      return this;
    }

    public ControlEvent build() {
      return new ControlEvent(msgType, senderType, commitTime, senderPartition, coordinatorInfo, participantInfo);
    }
  }

  /**
   * The info sent by the {@link TransactionCoordinator} to one or more
   * {@link TransactionParticipant}s.
   */
  public static class CoordinatorInfo implements Serializable {

    // Kafka partition id -> globally committed kafka offset.
    private Map<Integer, Long> globalKafkaCommitOffsets;

    // No-arg constructor required by serialization frameworks.
    public CoordinatorInfo() {
    }

    public CoordinatorInfo(Map<Integer, Long> globalKafkaCommitOffsets) {
      this.globalKafkaCommitOffsets = globalKafkaCommitOffsets;
    }

    /** Returns the committed offsets, or an empty map if none were set. */
    public Map<Integer, Long> getGlobalKafkaCommitOffsets() {
      return (globalKafkaCommitOffsets == null) ? new HashMap<>() : globalKafkaCommitOffsets;
    }
  }

  /**
   * The info sent by a {@link TransactionParticipant} instances to the
   * {@link TransactionCoordinator}.
   */
  public static class ParticipantInfo implements Serializable {

    // Serialized List<WriteStatus> (kept as bytes for serializability).
    private byte[] writeStatusList;
    private long kafkaCommitOffset;
    private OutcomeType outcomeType;

    // No-arg constructor required by serialization frameworks.
    public ParticipantInfo() {
    }

    public ParticipantInfo(List<WriteStatus> writeStatuses, long kafkaCommitOffset, OutcomeType outcomeType) throws IOException {
      this.writeStatusList = SerializationUtils.serialize(writeStatuses);
      this.kafkaCommitOffset = kafkaCommitOffset;
      this.outcomeType = outcomeType;
    }

    public byte[] getWriteStatusList() {
      return writeStatusList;
    }

    /** Deserializes and returns the write statuses. */
    public List<WriteStatus> writeStatuses() {
      return SerializationUtils.deserialize(writeStatusList);
    }

    public long getKafkaCommitOffset() {
      return kafkaCommitOffset;
    }

    public OutcomeType getOutcomeType() {
      return outcomeType;
    }
  }

  /**
   * Type of Control Event.
   */
  public enum MsgType {
    START_COMMIT,
    END_COMMIT,
    ACK_COMMIT,
    WRITE_STATUS,
  }

  public enum SenderType {
    COORDINATOR,
    PARTICIPANT
  }

  public enum OutcomeType {
    WRITE_SUCCESS,
  }
}

View File

@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.connect.transaction;
/**
 * An internal event of the Coordinator's event loop; each event drives one
 * transition of the Coordinator's commit state machine.
 */
public class CoordinatorEvent {

  private final CoordinatorEventType eventType;
  private final String topicName;
  private final String commitTime;
  // Optional payload; only populated for WRITE_STATUS events.
  private ControlEvent message;

  public CoordinatorEvent(CoordinatorEventType eventType,
                          String topicName,
                          String commitTime) {
    this.eventType = eventType;
    this.topicName = topicName;
    this.commitTime = commitTime;
  }

  public ControlEvent getMessage() {
    return message;
  }

  public void setMessage(ControlEvent message) {
    this.message = message;
  }

  public CoordinatorEventType getEventType() {
    return eventType;
  }

  public String getTopicName() {
    return topicName;
  }

  public String getCommitTime() {
    return commitTime;
  }

  /**
   * The type of Coordinator Event.
   */
  public enum CoordinatorEventType {
    START_COMMIT,
    END_COMMIT,
    WRITE_STATUS,
    ACK_COMMIT,
    WRITE_STATUS_TIMEOUT
  }
}

View File

@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.connect.transaction;
import org.apache.kafka.common.TopicPartition;
/**
 * The Base Coordinator that
 * coordinates the write transactions
 * across all the Kafka partitions, that
 * are managed by the {@link TransactionParticipant}.
 */
public interface TransactionCoordinator {

  /** Starts the coordinator. */
  void start();

  /** Stops the coordinator and releases its resources. */
  void stop();

  /* Kafka Topic that this Coordinator belongs to */
  TopicPartition getPartition();

  /* Called when a control event is received from the Kafka control topic */
  void processControlEvent(ControlEvent message);
}

View File

@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.connect.transaction;
import org.apache.hudi.connect.writers.ConnectWriter;
/**
 * Holds the state of the transaction currently in flight within a
 * {@link TransactionParticipant}: the commit time, the writer instance, the
 * write progress (next kafka offset to write) and whether the commit phase
 * has begun.
 *
 * @param <T> The type of status returned by the underlying writer.
 */
public class TransactionInfo<T> {

  private final String commitTime;
  private final ConnectWriter<T> writer;
  // Kafka offset of the next record to write (one past the last written record).
  private long lastWrittenKafkaOffset;
  // True once the commit phase has started; no further records are written after this.
  private boolean commitInitiated;

  public TransactionInfo(String commitTime, ConnectWriter<T> writer) {
    this.commitTime = commitTime;
    this.writer = writer;
    this.lastWrittenKafkaOffset = 0L;
    this.commitInitiated = false;
  }

  public String getCommitTime() {
    return commitTime;
  }

  public ConnectWriter<T> getWriter() {
    return writer;
  }

  public long getLastWrittenKafkaOffset() {
    return lastWrittenKafkaOffset;
  }

  public void setLastWrittenKafkaOffset(long lastWrittenKafkaOffset) {
    this.lastWrittenKafkaOffset = lastWrittenKafkaOffset;
  }

  public boolean isCommitInitiated() {
    return commitInitiated;
  }

  /** Marks the commit phase of this transaction as started. */
  public void commitInitiated() {
    this.commitInitiated = true;
  }
}

View File

@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.connect.transaction;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
/**
 * Interface for the Participant that
 * manages Writes for a
 * single Kafka partition, based on
 * coordination signals from the {@link TransactionCoordinator}.
 */
public interface TransactionParticipant {

  /** Starts the participant and registers it for control events. */
  void start();

  /** Stops the participant and cleans up any ongoing transaction. */
  void stop();

  /** Buffers a single incoming Kafka record until the next {@link #processRecords()}. */
  void buffer(SinkRecord record);

  /** Applies pending control events, then writes buffered records. */
  void processRecords();

  /** Returns the Kafka partition this participant is bound to. */
  TopicPartition getPartition();

  /** Called when a control event is received from the Kafka control topic. */
  void processControlEvent(ControlEvent message);

  /** Returns the last Kafka offset known to be committed for this partition. */
  long getLastKafkaCommittedOffset();
}

View File

@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.connect.utils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.CustomAvroKeyGenerator;
import org.apache.hudi.keygen.CustomKeyGenerator;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
/**
 * Helper methods for the Kafka Connect Sink for Hudi.
 */
public class KafkaConnectUtils {

  private static final Logger LOG = LogManager.getLogger(KafkaConnectUtils.class);

  /**
   * Fetches the current number of partitions for a Kafka topic.
   *
   * @param bootstrapServers Comma-separated list of Kafka bootstrap servers.
   * @param topicName        Name of the topic to describe.
   * @return The latest number of partitions of the topic.
   * @throws HoodieException if the topic metadata cannot be fetched.
   */
  public static int getLatestNumPartitions(String bootstrapServers, String topicName) {
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    // AdminClient is AutoCloseable and holds network connections; close it via
    // try-with-resources to avoid leaking sockets/threads on every call.
    try (AdminClient client = AdminClient.create(props)) {
      DescribeTopicsResult result = client.describeTopics(Arrays.asList(topicName));
      Map<String, KafkaFuture<TopicDescription>> values = result.values();
      KafkaFuture<TopicDescription> topicDescription = values.get(topicName);
      int numPartitions = topicDescription.get().partitions().size();
      LOG.info(String.format("Latest number of partitions for topic %s is %s", topicName, numPartitions));
      return numPartitions;
    } catch (Exception exception) {
      throw new HoodieException("Fatal error fetching the latest partition of kafka topic name" + topicName, exception);
    }
  }

  /**
   * Returns the default Hadoop Configuration, forcing the local file system
   * implementation for {@code file://} paths.
   *
   * @return a new Hadoop {@link Configuration}.
   */
  public static Configuration getDefaultHadoopConf() {
    Configuration hadoopConf = new Configuration();
    hadoopConf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
    return hadoopConf;
  }

  /**
   * Extract the record key fields.
   *
   * @param keyGenerator key generator instance.
   * @return the record key columns separated by comma.
   */
  public static String getRecordKeyColumns(KeyGenerator keyGenerator) {
    return String.join(",", keyGenerator.getRecordKeyFieldNames());
  }

  /**
   * Extract partition columns directly if an instance of class {@link BaseKeyGenerator},
   * else extract partition columns from the properties.
   *
   * @param keyGenerator    key generator instance.
   * @param typedProperties properties from the config.
   * @return the partition columns separated by comma.
   */
  public static String getPartitionColumns(KeyGenerator keyGenerator, TypedProperties typedProperties) {
    if (keyGenerator instanceof CustomKeyGenerator || keyGenerator instanceof CustomAvroKeyGenerator) {
      // Custom key generators encode "field:type" pairs; keep only the field name.
      return ((BaseKeyGenerator) keyGenerator).getPartitionPathFields().stream()
          .map(pathField -> Arrays.stream(pathField.split(CustomAvroKeyGenerator.SPLIT_REGEX))
              .findFirst()
              // Was a non-interpolating Scala-style template string; use String.format
              // so the fallback message actually contains the offending field.
              .orElse(String.format("Illegal partition path field format: '%s' for %s",
                  pathField, keyGenerator.getClass().getSimpleName())))
          .collect(Collectors.joining(","));
    }
    if (keyGenerator instanceof BaseKeyGenerator) {
      return String.join(",", ((BaseKeyGenerator) keyGenerator).getPartitionPathFields());
    }
    return typedProperties.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key());
  }

  /**
   * Get the Metadata from the latest commit file.
   *
   * @param metaClient The {@link HoodieTableMetaClient} to get access to the meta data.
   * @return An Optional {@link HoodieCommitMetadata} containing the meta data from the latest commit file.
   * @throws HoodieException if the commit metadata cannot be deserialized.
   */
  public static Option<HoodieCommitMetadata> getCommitMetadataForLatestInstant(HoodieTableMetaClient metaClient) {
    // COW tables complete with COMMIT actions, MOR tables with DELTA_COMMIT actions;
    // filter for the action matching the table type.
    HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline()
        .filterCompletedInstants()
        .filter(instant -> (metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE && instant.getAction().equals(HoodieActiveTimeline.COMMIT_ACTION))
            || (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ && instant.getAction().equals(HoodieActiveTimeline.DELTA_COMMIT_ACTION))
        );
    Option<HoodieInstant> latestInstant = timeline.lastInstant();
    if (latestInstant.isPresent()) {
      try {
        byte[] data = timeline.getInstantDetails(latestInstant.get()).get();
        return Option.of(HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class));
      } catch (Exception e) {
        throw new HoodieException("Failed to read schema from commit metadata", e);
      }
    } else {
      return Option.empty();
    }
  }
}

View File

@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.connect.writers;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.List;
/**
 * Base Hudi Writer that manages reading the raw Kafka records and
 * converting them to {@link HoodieRecord}s that can be written to Hudi by
 * the derived implementations of this class.
 */
public abstract class AbstractConnectWriter implements ConnectWriter<WriteStatus> {

  public static final String KAFKA_AVRO_CONVERTER = "io.confluent.connect.avro.AvroConverter";
  public static final String KAFKA_JSON_CONVERTER = "org.apache.kafka.connect.json.JsonConverter";
  public static final String KAFKA_STRING_CONVERTER = "org.apache.kafka.connect.storage.StringConverter";
  private static final Logger LOG = LogManager.getLogger(AbstractConnectWriter.class);

  private final KafkaConnectConfigs connectConfigs;
  private final KeyGenerator keyGenerator;
  private final SchemaProvider schemaProvider;

  public AbstractConnectWriter(KafkaConnectConfigs connectConfigs,
                               KeyGenerator keyGenerator,
                               SchemaProvider schemaProvider) {
    this.connectConfigs = connectConfigs;
    this.keyGenerator = keyGenerator;
    this.schemaProvider = schemaProvider;
  }

  /**
   * Converts a raw Kafka record into an Avro-backed {@link HoodieRecord},
   * based on the configured Kafka value converter, and hands it to the
   * derived writer implementation.
   *
   * @param record the incoming Kafka sink record.
   * @throws IOException if the configured converter type is unsupported.
   */
  @Override
  public void writeRecord(SinkRecord record) throws IOException {
    AvroConvertor avroConvertor = new AvroConvertor(schemaProvider.getSourceSchema());
    String valueConverter = connectConfigs.getKafkaValueConverter();
    Option<GenericRecord> avroRecord;
    if (KAFKA_AVRO_CONVERTER.equals(valueConverter)) {
      // Value is already an Avro GenericRecord.
      avroRecord = Option.of((GenericRecord) record.value());
    } else if (KAFKA_STRING_CONVERTER.equals(valueConverter)) {
      // Value is a JSON string; convert it to Avro using the source schema.
      avroRecord = Option.of(avroConvertor.fromJson((String) record.value()));
    } else if (KAFKA_JSON_CONVERTER.equals(valueConverter)) {
      throw new UnsupportedEncodingException("Currently JSON objects are not supported");
    } else {
      throw new IOException("Unsupported Kafka Format type (" + valueConverter + ")");
    }
    HoodieRecord hoodieRecord = new HoodieRecord<>(keyGenerator.getKey(avroRecord.get()), new HoodieAvroPayload(avroRecord));
    writeHudiRecord(hoodieRecord);
  }

  /**
   * Flushes all buffered records on close and returns their write statuses.
   */
  @Override
  public List<WriteStatus> close() {
    return flushHudiRecords();
  }

  /** Writes a single converted Hudi record. */
  protected abstract void writeHudiRecord(HoodieRecord<HoodieAvroPayload> record);

  /** Flushes all pending records and returns the resulting write statuses. */
  protected abstract List<WriteStatus> flushHudiRecords();
}

View File

@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.connect.writers;
import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.schema.SchemaProvider;
import org.apache.avro.Schema;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
 * Specific implementation of a Hudi Writer that buffers all incoming records,
 * and writes them to Hudi files on the end of a transaction using Bulk Insert.
 */
public class BufferedConnectWriter extends AbstractConnectWriter {

  private static final Logger LOG = LogManager.getLogger(BufferedConnectWriter.class);

  private final HoodieEngineContext context;
  private final HoodieJavaWriteClient writeClient;
  private final String instantTime;
  private final HoodieWriteConfig config;
  // Buffer holding all records of the current transaction keyed by record key;
  // spills to disk when the in-memory budget is exceeded.
  private ExternalSpillableMap<String, HoodieRecord<HoodieAvroPayload>> bufferedRecords;

  public BufferedConnectWriter(HoodieEngineContext context,
                               HoodieJavaWriteClient writeClient,
                               String instantTime,
                               KafkaConnectConfigs connectConfigs,
                               HoodieWriteConfig config,
                               KeyGenerator keyGenerator,
                               SchemaProvider schemaProvider) {
    super(connectConfigs, keyGenerator, schemaProvider);
    this.context = context;
    this.writeClient = writeClient;
    this.instantTime = instantTime;
    this.config = config;
    init();
  }

  /**
   * Initializes the spillable record buffer, sized by the configured
   * max memory per partition merge.
   *
   * @throws HoodieIOException if the spillable map cannot be created.
   */
  private void init() {
    try {
      // Load and batch all incoming records in a map
      long memoryForMerge = IOUtils.getMaxMemoryPerPartitionMerge(context.getTaskContextSupplier(), config);
      LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge);
      this.bufferedRecords = new ExternalSpillableMap<>(memoryForMerge,
          config.getSpillableMapBasePath(),
          new DefaultSizeEstimator(),
          new HoodieRecordSizeEstimator(new Schema.Parser().parse(config.getSchema())),
          config.getCommonConfig().getSpillableDiskMapType(),
          config.getCommonConfig().isBitCaskDiskMapCompressionEnabled());
    } catch (IOException io) {
      throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io);
    }
  }

  @Override
  public void writeHudiRecord(HoodieRecord<HoodieAvroPayload> record) {
    bufferedRecords.put(record.getRecordKey(), record);
  }

  /**
   * Bulk-inserts all buffered records for this transaction's instant time
   * and returns the resulting write statuses.
   *
   * @throws HoodieException if the write fails.
   */
  @Override
  public List<WriteStatus> flushHudiRecords() {
    try {
      // Log buffer statistics with proper field separators (the previous message
      // concatenated all values into one unreadable string).
      LOG.info(String.format("Number of entries in MemoryBasedMap => %d, "
              + "Total size in bytes of MemoryBasedMap => %d, "
              + "Number of entries in BitCaskDiskMap => %d, "
              + "Size of file spilled to disk => %d",
          bufferedRecords.getInMemoryMapNumEntries(),
          bufferedRecords.getCurrentInMemoryMapSize(),
          bufferedRecords.getDiskBasedMapNumEntries(),
          bufferedRecords.getSizeOfFileOnDiskInBytes()));
      List<WriteStatus> writeStatuses = new ArrayList<>();
      // Write out all records if non-empty
      if (!bufferedRecords.isEmpty()) {
        writeStatuses = writeClient.bulkInsertPreppedRecords(
            new ArrayList<>(bufferedRecords.values()),
            instantTime, Option.empty());
      }
      bufferedRecords.close();
      LOG.info("Flushed hudi records and got writeStatuses: " + writeStatuses);
      return writeStatuses;
    } catch (Exception e) {
      throw new HoodieException("Write records failed", e);
    }
  }
}

View File

@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.connect.writers;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.connect.transaction.TransactionCoordinator;
import java.util.List;
import java.util.Map;
/**
* Transaction service APIs used by
* {@link TransactionCoordinator}.
*/
public interface ConnectTransactionServices {

  /**
   * Starts a new Hudi commit and returns its commit (instant) time.
   */
  String startCommit();

  /**
   * Completes the commit identified by {@code commitTime}, attaching the
   * given write statuses and extra metadata to the commit.
   */
  void endCommit(String commitTime, List<WriteStatus> writeStatuses, Map<String, String> extraMetadata);

  /**
   * Returns the extra metadata stored with the latest completed commit.
   */
  Map<String, String> fetchLatestExtraCommitMetadata();
}

View File

@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.connect.writers;
import org.apache.kafka.connect.sink.SinkRecord;
import java.io.IOException;
import java.util.List;
/**
 * Writer that ingests incoming Kafka {@link SinkRecord}s for a single
 * transaction and reports the write results when closed.
 *
 * @param <T> the type of the per-write status returned on close.
 */
public interface ConnectWriter<T> {

  /**
   * Writes a single Kafka record.
   *
   * @throws IOException if the record cannot be converted or written.
   */
  void writeRecord(SinkRecord record) throws IOException;

  /**
   * Flushes any pending records and returns the write statuses.
   */
  List<T> close() throws IOException;
}

View File

@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.connect.writers;
/**
 * Provider of {@link ConnectWriter} instances, one per commit.
 *
 * @param <T> the per-write status type of the provided writers.
 */
public interface ConnectWriterProvider<T> {

  /**
   * Returns a writer that writes records under the commit identified by
   * {@code commitTime}.
   */
  ConnectWriter<T> getWriter(String commitTime);
}

View File

@@ -0,0 +1,177 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.connect.writers;
import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.schema.FilebasedSchemaProvider;
import javax.annotation.concurrent.Immutable;
import java.util.Map;
import java.util.Properties;
/**
 * Class storing configs for the HoodieWriteClient used by the Kafka Connect Sink.
 */
@Immutable
@ConfigClassProperty(name = "Kafka Sink Connect Configurations",
    groupName = ConfigGroups.Names.KAFKA_CONNECT,
    description = "Configurations for Kakfa Connect Sink Connector for Hudi.")
public class KafkaConnectConfigs extends HoodieConfig {

  // Standard Kafka Connect property naming the value converter class.
  public static final String KAFKA_VALUE_CONVERTER = "value.converter";

  public static final ConfigProperty<String> KAFKA_BOOTSTRAP_SERVERS = ConfigProperty
      .key("bootstrap.servers")
      .defaultValue("localhost:9092")
      .withDocumentation("The bootstrap servers for the Kafka Cluster.");

  public static final ConfigProperty<String> CONTROL_TOPIC_NAME = ConfigProperty
      .key("hoodie.kafka.control.topic")
      .defaultValue("hudi-control-topic")
      .withDocumentation("Kafka topic name used by the Hudi Sink Connector for "
          + "sending and receiving control messages. Not used for data records.");

  public static final ConfigProperty<String> SCHEMA_PROVIDER_CLASS = ConfigProperty
      .key("hoodie.schemaprovider.class")
      .defaultValue(FilebasedSchemaProvider.class.getName())
      .withDocumentation("subclass of org.apache.hudi.schema.SchemaProvider "
          + "to attach schemas to input & target table data, built in options: "
          + "org.apache.hudi.schema.FilebasedSchemaProvider.");

  public static final ConfigProperty<String> COMMIT_INTERVAL_SECS = ConfigProperty
      .key("hoodie.kafka.commit.interval.secs")
      .defaultValue("60")
      .withDocumentation("The interval at which Hudi will commit the records written "
          + "to the files, making them consumable on the read-side.");

  public static final ConfigProperty<String> COORDINATOR_WRITE_TIMEOUT_SECS = ConfigProperty
      .key("hoodie.kafka.coordinator.write.timeout.secs")
      .defaultValue("60")
      .withDocumentation("The timeout after sending an END_COMMIT until when "
          + "the coordinator will wait for the write statuses from all the partitions"
          + "to ignore the current commit and start a new commit.");

  public static final ConfigProperty<String> META_SYNC_ENABLE = ConfigProperty
      .key("hoodie.meta.sync.enable")
      .defaultValue("false")
      .withDocumentation("Enable Meta Sync such as Hive");

  public static final ConfigProperty<String> META_SYNC_CLASSES = ConfigProperty
      .key("hoodie.meta.sync.classes")
      .defaultValue(HiveSyncTool.class.getName())
      .withDocumentation("Meta sync client tool, using comma to separate multi tools");

  protected KafkaConnectConfigs() {
    super();
  }

  protected KafkaConnectConfigs(Properties props) {
    // The parent HoodieConfig stores the properties; the previous local copy
    // (newProps) was created and immediately discarded, so it has been removed.
    super(props);
  }

  public static KafkaConnectConfigs.Builder newBuilder() {
    return new KafkaConnectConfigs.Builder();
  }

  public String getBootstrapServers() {
    return getString(KAFKA_BOOTSTRAP_SERVERS);
  }

  public String getControlTopicName() {
    return getString(CONTROL_TOPIC_NAME);
  }

  public String getSchemaProviderClass() {
    return getString(SCHEMA_PROVIDER_CLASS);
  }

  public Long getCommitIntervalSecs() {
    return getLong(COMMIT_INTERVAL_SECS);
  }

  public Long getCoordinatorWriteTimeoutSecs() {
    return getLong(COORDINATOR_WRITE_TIMEOUT_SECS);
  }

  public String getKafkaValueConverter() {
    return getString(KAFKA_VALUE_CONVERTER);
  }

  public Boolean isMetaSyncEnabled() {
    return getBoolean(META_SYNC_ENABLE);
  }

  public String getMetaSyncClasses() {
    return getString(META_SYNC_CLASSES);
  }

  /**
   * Builder for {@link KafkaConnectConfigs}.
   */
  public static class Builder {

    protected final KafkaConnectConfigs connectConfigs = new KafkaConnectConfigs();

    public Builder withBootstrapServers(String bootstrapServers) {
      connectConfigs.setValue(KAFKA_BOOTSTRAP_SERVERS, bootstrapServers);
      return this;
    }

    public Builder withControlTopicName(String controlTopicName) {
      connectConfigs.setValue(CONTROL_TOPIC_NAME, controlTopicName);
      return this;
    }

    public Builder withCommitIntervalSecs(Long commitIntervalSecs) {
      connectConfigs.setValue(COMMIT_INTERVAL_SECS, String.valueOf(commitIntervalSecs));
      return this;
    }

    public Builder withCoordinatorWriteTimeoutSecs(Long coordinatorWriteTimeoutSecs) {
      connectConfigs.setValue(COORDINATOR_WRITE_TIMEOUT_SECS, String.valueOf(coordinatorWriteTimeoutSecs));
      return this;
    }

    // Kafka connect task are passed with props with type Map<>
    public Builder withProperties(Map<?, ?> properties) {
      connectConfigs.getProps().putAll(properties);
      return this;
    }

    public Builder withProperties(Properties properties) {
      connectConfigs.getProps().putAll(properties);
      return this;
    }

    protected void setDefaults() {
      // Check for mandatory properties
      connectConfigs.setDefaults(KafkaConnectConfigs.class.getName());
    }

    public KafkaConnectConfigs build() {
      setDefaults();
      // Build HudiConnectConfigs at the end
      return new KafkaConnectConfigs(connectConfigs.getProps());
    }
  }
}

View File

@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.connect.writers;
import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.connect.transaction.TransactionCoordinator;
import org.apache.hudi.connect.utils.KafkaConnectUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* Implementation of Transaction service APIs used by
* {@link TransactionCoordinator}
* using {@link HoodieJavaWriteClient}.
*/
public class KafkaConnectTransactionServices implements ConnectTransactionServices {
private static final Logger LOG = LogManager.getLogger(KafkaConnectTransactionServices.class);
private static final String TABLE_FORMAT = "PARQUET";
private final Option<HoodieTableMetaClient> tableMetaClient;
private final Configuration hadoopConf;
private final FileSystem fs;
private final String tableBasePath;
private final String tableName;
private final HoodieEngineContext context;
private final HoodieJavaWriteClient<HoodieAvroPayload> javaClient;
public KafkaConnectTransactionServices(
KafkaConnectConfigs connectConfigs) throws HoodieException {
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
.withProperties(connectConfigs.getProps()).build();
tableBasePath = writeConfig.getBasePath();
tableName = writeConfig.getTableName();
hadoopConf = KafkaConnectUtils.getDefaultHadoopConf();
context = new HoodieJavaEngineContext(hadoopConf);
fs = FSUtils.getFs(tableBasePath, hadoopConf);
try {
KeyGenerator keyGenerator = HoodieAvroKeyGeneratorFactory.createKeyGenerator(
new TypedProperties(connectConfigs.getProps()));
String recordKeyFields = KafkaConnectUtils.getRecordKeyColumns(keyGenerator);
String partitionColumns = KafkaConnectUtils.getPartitionColumns(keyGenerator,
new TypedProperties(connectConfigs.getProps()));
LOG.info(String.format("Setting record key %s and partitionfields %s for table %s",
recordKeyFields,
partitionColumns,
tableBasePath + tableName));
tableMetaClient = Option.of(HoodieTableMetaClient.withPropertyBuilder()
.setTableType(HoodieTableType.COPY_ON_WRITE.name())
.setTableName(tableName)
.setPayloadClassName(HoodieAvroPayload.class.getName())
.setBaseFileFormat(TABLE_FORMAT)
.setRecordKeyFields(recordKeyFields)
.setPartitionFields(partitionColumns)
.setKeyGeneratorClassProp(writeConfig.getKeyGeneratorClass())
.initTable(hadoopConf, tableBasePath));
javaClient = new HoodieJavaWriteClient<>(context, writeConfig);
} catch (Exception exception) {
throw new HoodieException("Fatal error instantiating Hudi Transaction Services ", exception);
}
}
public String startCommit() {
String newCommitTime = javaClient.startCommit();
javaClient.transitionInflight(newCommitTime);
LOG.info("Starting Hudi commit " + newCommitTime);
return newCommitTime;
}
public void endCommit(String commitTime, List<WriteStatus> writeStatuses, Map<String, String> extraMetadata) {
javaClient.commit(commitTime, writeStatuses, Option.of(extraMetadata),
HoodieActiveTimeline.COMMIT_ACTION, Collections.emptyMap());
LOG.info("Ending Hudi commit " + commitTime);
}
public Map<String, String> fetchLatestExtraCommitMetadata() {
if (tableMetaClient.isPresent()) {
Option<HoodieCommitMetadata> metadata = KafkaConnectUtils.getCommitMetadataForLatestInstant(tableMetaClient.get());
if (metadata.isPresent()) {
return metadata.get().getExtraMetadata();
} else {
LOG.info("Hoodie Extra Metadata from latest commit is absent");
return Collections.emptyMap();
}
}
throw new HoodieException("Fatal error retrieving Hoodie Extra Metadata since Table Meta Client is absent");
}
}

View File

@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.connect.writers;
import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.connect.KafkaConnectFileIdPrefixProvider;
import org.apache.hudi.connect.utils.KafkaConnectUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
import org.apache.hudi.schema.SchemaProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.common.TopicPartition;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.util.Collections;
/**
* Provides the Hudi Writer for the {@link org.apache.hudi.connect.transaction.TransactionParticipant}
* to write the incoming records to Hudi.
*/
public class KafkaConnectWriterProvider implements ConnectWriterProvider<WriteStatus> {
private static final Logger LOG = LogManager.getLogger(KafkaConnectWriterProvider.class);
private final KafkaConnectConfigs connectConfigs;
private final HoodieEngineContext context;
private final HoodieWriteConfig writeConfig;
private final HoodieJavaWriteClient<HoodieAvroPayload> hudiJavaClient;
private final KeyGenerator keyGenerator;
private final SchemaProvider schemaProvider;
public KafkaConnectWriterProvider(
KafkaConnectConfigs connectConfigs,
TopicPartition partition) throws HoodieException {
this.connectConfigs = connectConfigs;
Configuration hadoopConf = KafkaConnectUtils.getDefaultHadoopConf();
try {
this.schemaProvider = StringUtils.isNullOrEmpty(connectConfigs.getSchemaProviderClass()) ? null
: (SchemaProvider) ReflectionUtils.loadClass(connectConfigs.getSchemaProviderClass(),
new TypedProperties(connectConfigs.getProps()));
this.keyGenerator = HoodieAvroKeyGeneratorFactory.createKeyGenerator(
new TypedProperties(connectConfigs.getProps()));
// Create the write client to write some records in
writeConfig = HoodieWriteConfig.newBuilder()
.withProperties(connectConfigs.getProps())
.withFileIdPrefixProviderClassName(KafkaConnectFileIdPrefixProvider.class.getName())
.withProps(Collections.singletonMap(
KafkaConnectFileIdPrefixProvider.KAFKA_CONNECT_PARTITION_ID,
String.valueOf(partition)))
.withSchema(schemaProvider.getSourceSchema().toString())
.withAutoCommit(false)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
.build();
context = new HoodieJavaEngineContext(hadoopConf);
hudiJavaClient = new HoodieJavaWriteClient<>(context, writeConfig);
} catch (Throwable e) {
throw new HoodieException("Fatal error instantiating Hudi Write Provider ", e);
}
}
public AbstractConnectWriter getWriter(String commitTime) {
return new BufferedConnectWriter(
context,
hudiJavaClient,
commitTime,
connectConfigs,
writeConfig,
keyGenerator,
schemaProvider);
}
}