[HUDI-2428] Fix protocol and other issues after stress testing Hudi Kafka Connect (#3656)
* Fixes based on tests and some improvements
* Fix the issues after running stress tests
* Fixing checkstyle issues and updating README

Co-authored-by: Rajesh Mahindra <rmahindra@Rajeshs-MacBook-Pro.local>
Co-authored-by: Vinoth Chandar <vinoth@apache.org>
This commit is contained in:
@@ -30,11 +30,13 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.config.ConfigException;
|
||||
import org.apache.kafka.connect.errors.ConnectException;
|
||||
import org.apache.kafka.connect.errors.RetriableException;
|
||||
import org.apache.kafka.connect.sink.SinkRecord;
|
||||
import org.apache.kafka.connect.sink.SinkTask;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
@@ -52,7 +54,7 @@ public class HoodieSinkTask extends SinkTask {
|
||||
private static final int COORDINATOR_KAFKA_PARTITION = 0;
|
||||
|
||||
private final Map<TopicPartition, TransactionCoordinator> transactionCoordinators;
|
||||
private final Map<TopicPartition, TransactionParticipant> hudiTransactionParticipants;
|
||||
private final Map<TopicPartition, TransactionParticipant> transactionParticipants;
|
||||
private KafkaConnectControlAgent controlKafkaClient;
|
||||
private KafkaConnectConfigs connectConfigs;
|
||||
|
||||
@@ -60,8 +62,8 @@ public class HoodieSinkTask extends SinkTask {
|
||||
private String connectorName;
|
||||
|
||||
public HoodieSinkTask() {
|
||||
transactionCoordinators = new HashMap();
|
||||
hudiTransactionParticipants = new HashMap<>();
|
||||
transactionCoordinators = new HashMap<>();
|
||||
transactionParticipants = new HashMap<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -80,7 +82,6 @@ public class HoodieSinkTask extends SinkTask {
|
||||
controlKafkaClient = KafkaConnectControlAgent.createKafkaControlManager(
|
||||
connectConfigs.getBootstrapServers(),
|
||||
connectConfigs.getControlTopicName());
|
||||
bootstrap(context.assignment());
|
||||
} catch (ConfigException e) {
|
||||
throw new ConnectException("Couldn't start HdfsSinkConnector due to configuration error.", e);
|
||||
} catch (ConnectException e) {
|
||||
@@ -98,11 +99,25 @@ public class HoodieSinkTask extends SinkTask {
|
||||
String topic = record.topic();
|
||||
int partition = record.kafkaPartition();
|
||||
TopicPartition tp = new TopicPartition(topic, partition);
|
||||
hudiTransactionParticipants.get(tp).buffer(record);
|
||||
|
||||
TransactionParticipant transactionParticipant = transactionParticipants.get(tp);
|
||||
if (transactionParticipant != null) {
|
||||
transactionParticipant.buffer(record);
|
||||
}
|
||||
}
|
||||
|
||||
for (TopicPartition partition : context.assignment()) {
|
||||
hudiTransactionParticipants.get(partition).processRecords();
|
||||
if (transactionParticipants.get(partition) == null) {
|
||||
throw new RetriableException("TransactionParticipant should be created for each assigned partition, "
|
||||
+ "but has not been created for the topic/partition: " + partition.topic() + ":" + partition.partition());
|
||||
}
|
||||
try {
|
||||
transactionParticipants.get(partition).processRecords();
|
||||
} catch (IOException exception) {
|
||||
throw new RetriableException("Intermittent write errors for Hudi "
|
||||
+ " for the topic/partition: " + partition.topic() + ":" + partition.partition()
|
||||
+ " , ensuring kafka connect will retry ", exception);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -123,12 +138,9 @@ public class HoodieSinkTask extends SinkTask {
|
||||
// committed to Hudi.
|
||||
Map<TopicPartition, OffsetAndMetadata> result = new HashMap<>();
|
||||
for (TopicPartition partition : context.assignment()) {
|
||||
TransactionParticipant worker = hudiTransactionParticipants.get(partition);
|
||||
if (worker != null) {
|
||||
worker.processRecords();
|
||||
if (worker.getLastKafkaCommittedOffset() >= 0) {
|
||||
result.put(partition, new OffsetAndMetadata(worker.getLastKafkaCommittedOffset()));
|
||||
}
|
||||
TransactionParticipant worker = transactionParticipants.get(partition);
|
||||
if (worker != null && worker.getLastKafkaCommittedOffset() >= 0) {
|
||||
result.put(partition, new OffsetAndMetadata(worker.getLastKafkaCommittedOffset()));
|
||||
}
|
||||
}
|
||||
return result;
|
||||
@@ -158,7 +170,7 @@ public class HoodieSinkTask extends SinkTask {
|
||||
transactionCoordinators.remove(partition);
|
||||
}
|
||||
}
|
||||
TransactionParticipant worker = hudiTransactionParticipants.remove(partition);
|
||||
TransactionParticipant worker = transactionParticipants.remove(partition);
|
||||
if (worker != null) {
|
||||
try {
|
||||
LOG.debug("Closing data writer due to task start failure.");
|
||||
@@ -185,7 +197,7 @@ public class HoodieSinkTask extends SinkTask {
|
||||
transactionCoordinators.put(partition, coordinator);
|
||||
}
|
||||
ConnectTransactionParticipant worker = new ConnectTransactionParticipant(connectConfigs, partition, controlKafkaClient, context);
|
||||
hudiTransactionParticipants.put(partition, worker);
|
||||
transactionParticipants.put(partition, worker);
|
||||
worker.start();
|
||||
} catch (HoodieException exception) {
|
||||
LOG.error(String.format("Fatal error initializing task %s for partition %s", taskId, partition.partition()), exception);
|
||||
@@ -195,7 +207,7 @@ public class HoodieSinkTask extends SinkTask {
|
||||
|
||||
private void cleanup() {
|
||||
for (TopicPartition partition : context.assignment()) {
|
||||
TransactionParticipant worker = hudiTransactionParticipants.get(partition);
|
||||
TransactionParticipant worker = transactionParticipants.get(partition);
|
||||
if (worker != null) {
|
||||
try {
|
||||
LOG.debug("Closing data writer due to task start failure.");
|
||||
@@ -205,7 +217,7 @@ public class HoodieSinkTask extends SinkTask {
|
||||
}
|
||||
}
|
||||
}
|
||||
hudiTransactionParticipants.clear();
|
||||
transactionParticipants.clear();
|
||||
transactionCoordinators.forEach((topic, transactionCoordinator) -> transactionCoordinator.stop());
|
||||
transactionCoordinators.clear();
|
||||
}
|
||||
|
||||
@@ -131,6 +131,7 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru
|
||||
@Override
|
||||
public void stop() {
|
||||
kafkaControlClient.deregisterTransactionCoordinator(this);
|
||||
scheduler.shutdownNow();
|
||||
hasStarted.set(false);
|
||||
if (executorService != null) {
|
||||
boolean terminated = false;
|
||||
|
||||
@@ -32,7 +32,6 @@ import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
@@ -111,7 +110,7 @@ public class ConnectTransactionParticipant implements TransactionParticipant {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processRecords() {
|
||||
public void processRecords() throws IOException {
|
||||
while (!controlEvents.isEmpty()) {
|
||||
ControlEvent message = controlEvents.poll();
|
||||
switch (message.getMsgType()) {
|
||||
@@ -153,7 +152,7 @@ public class ConnectTransactionParticipant implements TransactionParticipant {
|
||||
}
|
||||
}
|
||||
|
||||
private void handleEndCommit(ControlEvent message) {
|
||||
private void handleEndCommit(ControlEvent message) throws IOException {
|
||||
if (ongoingTransactionInfo == null) {
|
||||
LOG.warn(String.format("END_COMMIT %s is received while we were NOT in active transaction", message.getCommitTime()));
|
||||
return;
|
||||
@@ -167,28 +166,23 @@ public class ConnectTransactionParticipant implements TransactionParticipant {
|
||||
return;
|
||||
}
|
||||
|
||||
context.pause(partition);
|
||||
ongoingTransactionInfo.commitInitiated();
|
||||
// send Writer Status Message and wait for ACK_COMMIT in async fashion
|
||||
try {
|
||||
context.pause(partition);
|
||||
ongoingTransactionInfo.commitInitiated();
|
||||
//sendWriterStatus
|
||||
List<WriteStatus> writeStatuses = new ArrayList<>();
|
||||
try {
|
||||
writeStatuses = ongoingTransactionInfo.getWriter().close();
|
||||
} catch (IOException exception) {
|
||||
LOG.warn("Error closing the Hudi Writer", exception);
|
||||
}
|
||||
|
||||
ControlEvent writeStatus = new ControlEvent.Builder(ControlEvent.MsgType.WRITE_STATUS,
|
||||
List<WriteStatus> writeStatuses = ongoingTransactionInfo.getWriter().close();
|
||||
ControlEvent writeStatusEvent = new ControlEvent.Builder(ControlEvent.MsgType.WRITE_STATUS,
|
||||
ControlEvent.SenderType.PARTICIPANT, ongoingTransactionInfo.getCommitTime(), partition)
|
||||
.setParticipantInfo(new ControlEvent.ParticipantInfo(
|
||||
writeStatuses,
|
||||
ongoingTransactionInfo.getLastWrittenKafkaOffset(),
|
||||
ControlEvent.OutcomeType.WRITE_SUCCESS))
|
||||
.build();
|
||||
kafkaControlAgent.publishMessage(writeStatus);
|
||||
kafkaControlAgent.publishMessage(writeStatusEvent);
|
||||
} catch (Exception exception) {
|
||||
LOG.warn(String.format("Error ending commit %s for partition %s", message.getCommitTime(), partition.partition()), exception);
|
||||
LOG.error(String.format("Error writing records and ending commit %s for partition %s", message.getCommitTime(), partition.partition()), exception);
|
||||
throw new IOException(String.format("Error writing records and ending commit %s for partition %s", message.getCommitTime(), partition.partition()), exception);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -32,6 +32,7 @@ import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* The events sent over the Kafka Control Topic between the
|
||||
@@ -108,7 +109,9 @@ public class ControlEvent implements Serializable {
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("%s %s %s %s %s %s", version, msgType.name(), commitTime,
|
||||
Arrays.toString(senderPartition), coordinatorInfo.toString(), participantInfo.toString());
|
||||
Arrays.toString(senderPartition),
|
||||
(coordinatorInfo == null) ? "" : coordinatorInfo.toString(),
|
||||
(participantInfo == null) ? "" : participantInfo.toString());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -163,6 +166,13 @@ public class ControlEvent implements Serializable {
|
||||
public Map<Integer, Long> getGlobalKafkaCommitOffsets() {
|
||||
return (globalKafkaCommitOffsets == null) ? new HashMap<>() : globalKafkaCommitOffsets;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("%s", globalKafkaCommitOffsets.keySet().stream()
|
||||
.map(key -> key + "=" + globalKafkaCommitOffsets.get(key))
|
||||
.collect(Collectors.joining(", ", "{", "}")));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -199,6 +209,11 @@ public class ControlEvent implements Serializable {
|
||||
public OutcomeType getOutcomeType() {
|
||||
return outcomeType;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("%s %s %s", Arrays.toString(writeStatusList), kafkaCommitOffset, outcomeType.name());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -21,6 +21,8 @@ package org.apache.hudi.connect.transaction;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.connect.sink.SinkRecord;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Interface for the Participant that
|
||||
* manages Writes for a
|
||||
@@ -35,7 +37,7 @@ public interface TransactionParticipant {
|
||||
|
||||
void buffer(SinkRecord record);
|
||||
|
||||
void processRecords();
|
||||
void processRecords() throws IOException;
|
||||
|
||||
TopicPartition getPartition();
|
||||
|
||||
|
||||
@@ -81,11 +81,11 @@ public abstract class AbstractConnectWriter implements ConnectWriter<WriteStatus
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<WriteStatus> close() {
|
||||
public List<WriteStatus> close() throws IOException {
|
||||
return flushHudiRecords();
|
||||
}
|
||||
|
||||
protected abstract void writeHudiRecord(HoodieRecord<HoodieAvroPayload> record);
|
||||
|
||||
protected abstract List<WriteStatus> flushHudiRecords();
|
||||
protected abstract List<WriteStatus> flushHudiRecords() throws IOException;
|
||||
}
|
||||
|
||||
@@ -28,7 +28,6 @@ import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.exception.HoodieIOException;
|
||||
import org.apache.hudi.io.IOUtils;
|
||||
import org.apache.hudi.keygen.KeyGenerator;
|
||||
@@ -94,7 +93,7 @@ public class BufferedConnectWriter extends AbstractConnectWriter {
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<WriteStatus> flushHudiRecords() {
|
||||
public List<WriteStatus> flushHudiRecords() throws IOException {
|
||||
try {
|
||||
LOG.info("Number of entries in MemoryBasedMap => "
|
||||
+ bufferedRecords.getInMemoryMapNumEntries()
|
||||
@@ -114,7 +113,7 @@ public class BufferedConnectWriter extends AbstractConnectWriter {
|
||||
+ writeStatuses);
|
||||
return writeStatuses;
|
||||
} catch (Exception e) {
|
||||
throw new HoodieException("Write records failed", e);
|
||||
throw new IOException("Write records failed", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user