1
0

[HUDI-2890] Kafka Connect: Fix failed writes and avoid table service concurrent operations (#4211)

* Fix Kafka Connect README

* Fix handling of errors in write records for Kafka Connect

* By default, ensure we skip error records and keep the pipeline alive

* Fix indentation

Co-authored-by: Rajesh Mahindra <rmahindra@Rajeshs-MacBook-Pro.local>
This commit is contained in:
rmahindra123
2021-12-03 21:30:32 -08:00
committed by GitHub
parent 0fd6b2d71e
commit 94f45e928c
11 changed files with 270 additions and 95 deletions

View File

@@ -280,26 +280,56 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru
private void onReceiveWriteStatus(ControlMessage message) {
ControlMessage.ParticipantInfo participantInfo = message.getParticipantInfo();
int partition = message.getSenderPartition();
partitionsWriteStatusReceived.put(partition, KafkaConnectUtils.getWriteStatuses(participantInfo));
currentConsumedKafkaOffsets.put(partition, participantInfo.getKafkaOffset());
int partitionId = message.getSenderPartition();
partitionsWriteStatusReceived.put(partitionId, KafkaConnectUtils.getWriteStatuses(participantInfo));
currentConsumedKafkaOffsets.put(partitionId, participantInfo.getKafkaOffset());
if (partitionsWriteStatusReceived.size() >= numPartitions
&& currentState.equals(State.ENDED_COMMIT)) {
// Commit the kafka offsets to the commit file
try {
List<WriteStatus> allWriteStatuses = new ArrayList<>();
partitionsWriteStatusReceived.forEach((key, value) -> allWriteStatuses.addAll(value));
// Commit the last write in Hudi, along with the latest kafka offset
if (!allWriteStatuses.isEmpty()) {
transactionServices.endCommit(currentCommitTime,
long totalErrorRecords = (long) allWriteStatuses.stream().mapToDouble(WriteStatus::getTotalErrorRecords).sum();
long totalRecords = (long) allWriteStatuses.stream().mapToDouble(WriteStatus::getTotalRecords).sum();
boolean hasErrors = totalErrorRecords > 0;
if ((!hasErrors || configs.allowCommitOnErrors()) && !allWriteStatuses.isEmpty()) {
boolean success = transactionServices.endCommit(currentCommitTime,
allWriteStatuses,
transformKafkaOffsets(currentConsumedKafkaOffsets));
if (success) {
LOG.info("Commit " + currentCommitTime + " successful!");
currentState = State.WRITE_STATUS_RCVD;
globalCommittedKafkaOffsets.putAll(currentConsumedKafkaOffsets);
submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.ACK_COMMIT,
message.getTopicName(),
currentCommitTime));
return;
} else {
LOG.error("Commit " + currentCommitTime + " failed!");
}
} else if (hasErrors) {
LOG.error("Coordinator found errors when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords);
LOG.error("Printing out the top 100 errors");
allWriteStatuses.stream().filter(WriteStatus::hasErrors).limit(100).forEach(ws -> {
LOG.error("Global error :", ws.getGlobalError());
if (ws.getErrors().size() > 0) {
ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" + key + " is " + value));
}
});
} else {
LOG.warn("Empty write statuses were received from all Participants");
}
currentState = State.WRITE_STATUS_RCVD;
globalCommittedKafkaOffsets.putAll(currentConsumedKafkaOffsets);
submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.ACK_COMMIT,
message.getTopicName(),
currentCommitTime));
// Submit the next start commit, that will rollback the current commit.
currentState = State.FAILED_COMMIT;
LOG.warn("Current commit " + currentCommitTime + " failed, so starting a new commit after recovery delay");
submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.START_COMMIT,
partition.topic(),
StringUtils.EMPTY_STRING),
RESTART_COMMIT_DELAY_MS, TimeUnit.MILLISECONDS);
} catch (Exception exception) {
LOG.error("Fatal error while committing file", exception);
}
@@ -310,7 +340,7 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru
// If we are still stuck in ENDED_STATE
if (currentState.equals(State.ENDED_COMMIT)) {
currentState = State.WRITE_STATUS_TIMEDOUT;
LOG.warn("Did not receive the Write Status from all partitions");
LOG.warn("Current commit " + currentCommitTime + " failed after a write status timeout, so starting a new commit after recovery delay");
// Submit the next start commit
submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.START_COMMIT,
partition.topic(),
@@ -365,6 +395,7 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru
INIT,
STARTED_COMMIT,
ENDED_COMMIT,
FAILED_COMMIT,
WRITE_STATUS_RCVD,
WRITE_STATUS_TIMEDOUT,
ACKED_COMMIT,

View File

@@ -32,7 +32,7 @@ public interface ConnectTransactionServices {
String startCommit();
void endCommit(String commitTime, List<WriteStatus> writeStatuses, Map<String, String> extraMetadata);
boolean endCommit(String commitTime, List<WriteStatus> writeStatuses, Map<String, String> extraMetadata);
Map<String, String> fetchLatestExtraCommitMetadata();
}

View File

@@ -88,6 +88,11 @@ public class KafkaConnectConfigs extends HoodieConfig {
.defaultValue(HiveSyncTool.class.getName())
.withDocumentation("Meta sync client tool, using comma to separate multi tools");
public static final ConfigProperty<Boolean> ALLOW_COMMIT_ON_ERRORS = ConfigProperty
.key("hoodie.kafka.allow.commit.on.errors")
.defaultValue(true)
.withDocumentation("Commit even when some records failed to be written");
protected KafkaConnectConfigs() {
super();
}
@@ -136,6 +141,10 @@ public class KafkaConnectConfigs extends HoodieConfig {
return getString(META_SYNC_CLASSES);
}
public Boolean allowCommitOnErrors() {
return getBoolean(ALLOW_COMMIT_ON_ERRORS);
}
public static class Builder {
protected final KafkaConnectConfigs connectConfigs = new KafkaConnectConfigs();
@@ -160,6 +169,11 @@ public class KafkaConnectConfigs extends HoodieConfig {
return this;
}
public Builder withAllowCommitOnErrors(Boolean allowCommitOnErrors) {
connectConfigs.setValue(ALLOW_COMMIT_ON_ERRORS, String.valueOf(allowCommitOnErrors));
return this;
}
// Kafka connect task are passed with props with type Map<>
public Builder withProperties(Map<?, ?> properties) {
connectConfigs.getProps().putAll(properties);

View File

@@ -77,6 +77,7 @@ public class KafkaConnectTransactionServices implements ConnectTransactionServic
public KafkaConnectTransactionServices(KafkaConnectConfigs connectConfigs) throws HoodieException {
this.connectConfigs = connectConfigs;
// This is the writeConfig for the Transaction Coordinator
this.writeConfig = HoodieWriteConfig.newBuilder()
.withEngineType(EngineType.JAVA)
.withProperties(connectConfigs.getProps())
@@ -122,20 +123,23 @@ public class KafkaConnectTransactionServices implements ConnectTransactionServic
}
@Override
public void endCommit(String commitTime, List<WriteStatus> writeStatuses, Map<String, String> extraMetadata) {
javaClient.commit(commitTime, writeStatuses, Option.of(extraMetadata));
LOG.info("Ending Hudi commit " + commitTime);
public boolean endCommit(String commitTime, List<WriteStatus> writeStatuses, Map<String, String> extraMetadata) {
boolean success = javaClient.commit(commitTime, writeStatuses, Option.of(extraMetadata));
if (success) {
LOG.info("Ending Hudi commit " + commitTime);
// Schedule clustering and compaction as needed.
if (writeConfig.isAsyncClusteringEnabled()) {
javaClient.scheduleClustering(Option.empty()).ifPresent(
instantTs -> LOG.info("Scheduled clustering at instant time:" + instantTs));
// Schedule clustering and compaction as needed.
if (writeConfig.isAsyncClusteringEnabled()) {
javaClient.scheduleClustering(Option.empty()).ifPresent(
instantTs -> LOG.info("Scheduled clustering at instant time:" + instantTs));
}
if (isAsyncCompactionEnabled()) {
javaClient.scheduleCompaction(Option.empty()).ifPresent(
instantTs -> LOG.info("Scheduled compaction at instant time:" + instantTs));
}
syncMeta();
}
if (isAsyncCompactionEnabled()) {
javaClient.scheduleCompaction(Option.empty()).ifPresent(
instantTs -> LOG.info("Scheduled compaction at instant time:" + instantTs));
}
syncMeta();
return success;
}
@Override

View File

@@ -27,6 +27,8 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.connect.KafkaConnectFileIdPrefixProvider;
@@ -73,7 +75,7 @@ public class KafkaConnectWriterProvider implements ConnectWriterProvider<WriteSt
this.keyGenerator = HoodieAvroKeyGeneratorFactory.createKeyGenerator(
new TypedProperties(connectConfigs.getProps()));
// Create the write client to write some records in
// This is the writeConfig for the writers for the individual Transaction Coordinators
writeConfig = HoodieWriteConfig.newBuilder()
.withEngineType(EngineType.JAVA)
.withProperties(connectConfigs.getProps())
@@ -84,6 +86,14 @@ public class KafkaConnectWriterProvider implements ConnectWriterProvider<WriteSt
.withSchema(schemaProvider.getSourceSchema().toString())
.withAutoCommit(false)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
// participants should not trigger table services, and leave it to the coordinator
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withAutoClean(false)
.withAutoArchive(false)
.withInlineCompaction(false).build())
.withClusteringConfig(HoodieClusteringConfig.newBuilder()
.withInlineClustering(false)
.build())
.build();
context = new HoodieJavaEngineContext(hadoopConf);

View File

@@ -50,7 +50,7 @@ import static org.mockito.Mockito.mock;
public class TestConnectTransactionCoordinator {
private static final String TOPIC_NAME = "kafka-connect-test-topic";
private static final int NUM_PARTITIONS = 4;
private static final int TOTAL_KAFKA_PARTITIONS = 4;
private static final int MAX_COMMIT_ROUNDS = 5;
private static final int TEST_TIMEOUT_SECS = 60;
@@ -63,10 +63,6 @@ public class TestConnectTransactionCoordinator {
@BeforeEach
public void setUp() throws Exception {
transactionServices = new MockConnectTransactionServices();
configs = KafkaConnectConfigs.newBuilder()
.withCommitIntervalSecs(1L)
.withCoordinatorWriteTimeoutSecs(1L)
.build();
latch = new CountDownLatch(1);
}
@@ -77,13 +73,22 @@ public class TestConnectTransactionCoordinator {
participant = new MockParticipant(kafkaControlAgent, latch, scenario, MAX_COMMIT_ROUNDS);
participant.start();
KafkaConnectConfigs.Builder configBuilder = KafkaConnectConfigs.newBuilder()
.withCommitIntervalSecs(1L)
.withCoordinatorWriteTimeoutSecs(1L);
if (scenario.equals(MockParticipant.TestScenarios.SUBSET_WRITE_STATUS_FAILED)) {
configBuilder.withAllowCommitOnErrors(false);
}
configs = configBuilder.build();
// Test the coordinator using the mock participant
TransactionCoordinator coordinator = new ConnectTransactionCoordinator(
configs,
new TopicPartition(TOPIC_NAME, 0),
kafkaControlAgent,
transactionServices,
(bootstrapServers, topicName) -> NUM_PARTITIONS);
(bootstrapServers, topicName) -> TOTAL_KAFKA_PARTITIONS);
coordinator.start();
latch.await(TEST_TIMEOUT_SECS, TimeUnit.SECONDS);
@@ -119,7 +124,7 @@ public class TestConnectTransactionCoordinator {
this.latch = latch;
this.testScenario = testScenario;
this.maxNumberCommitRounds = maxNumberCommitRounds;
this.partition = new TopicPartition(TOPIC_NAME, (NUM_PARTITIONS - 1));
this.partition = new TopicPartition(TOPIC_NAME, (TOTAL_KAFKA_PARTITIONS - 1));
this.kafkaOffsetsCommitted = new HashMap<>();
expectedMsgType = ControlMessage.EventType.START_COMMIT;
numberCommitRounds = 0;
@@ -162,39 +167,40 @@ public class TestConnectTransactionCoordinator {
private void testScenarios(ControlMessage message) {
assertEquals(expectedMsgType, message.getType());
switch (message.getType()) {
case START_COMMIT:
expectedMsgType = ControlMessage.EventType.END_COMMIT;
break;
case END_COMMIT:
assertEquals(kafkaOffsetsCommitted, message.getCoordinatorInfo().getGlobalKafkaCommitOffsets());
int numSuccessPartitions;
int numPartitionsThatReportWriteStatus;
Map<Integer, Long> kafkaOffsets = new HashMap<>();
List<ControlMessage> controlEvents = new ArrayList<>();
// Prepare the WriteStatuses for all partitions
for (int i = 1; i <= NUM_PARTITIONS; i++) {
try {
long kafkaOffset = (long) (Math.random() * 10000);
kafkaOffsets.put(i, kafkaOffset);
ControlMessage event = successWriteStatus(
message.getCommitTime(),
new TopicPartition(TOPIC_NAME, i),
kafkaOffset);
controlEvents.add(event);
} catch (Exception exception) {
throw new HoodieException("Fatal error sending control event to Coordinator");
}
}
switch (testScenario) {
case ALL_CONNECT_TASKS_SUCCESS:
numSuccessPartitions = NUM_PARTITIONS;
composeControlEvent(message.getCommitTime(), false, kafkaOffsets, controlEvents);
numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS;
// This commit round should succeed, and the kafka offsets getting committed
kafkaOffsetsCommitted.putAll(kafkaOffsets);
expectedMsgType = ControlMessage.EventType.ACK_COMMIT;
break;
case SUBSET_WRITE_STATUS_FAILED_BUT_IGNORED:
composeControlEvent(message.getCommitTime(), true, kafkaOffsets, controlEvents);
numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS;
// Despite error records, this commit round should succeed, and the kafka offsets getting committed
kafkaOffsetsCommitted.putAll(kafkaOffsets);
expectedMsgType = ControlMessage.EventType.ACK_COMMIT;
break;
case SUBSET_WRITE_STATUS_FAILED:
composeControlEvent(message.getCommitTime(), true, kafkaOffsets, controlEvents);
numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS;
// This commit round should fail, and a new commit round should start without kafka offsets getting committed
expectedMsgType = ControlMessage.EventType.START_COMMIT;
break;
case SUBSET_CONNECT_TASKS_FAILED:
numSuccessPartitions = NUM_PARTITIONS / 2;
composeControlEvent(message.getCommitTime(), false, kafkaOffsets, controlEvents);
numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS / 2;
// This commit round should fail, and a new commit round should start without kafka offsets getting committed
expectedMsgType = ControlMessage.EventType.START_COMMIT;
break;
default:
@@ -202,7 +208,7 @@ public class TestConnectTransactionCoordinator {
}
// Send events based on test scenario
for (int i = 0; i < numSuccessPartitions; i++) {
for (int i = 0; i < numPartitionsThatReportWriteStatus; i++) {
kafkaControlAgent.publishMessage(controlEvents.get(i));
}
break;
@@ -227,18 +233,36 @@ public class TestConnectTransactionCoordinator {
public enum TestScenarios {
SUBSET_CONNECT_TASKS_FAILED,
SUBSET_WRITE_STATUS_FAILED,
SUBSET_WRITE_STATUS_FAILED_BUT_IGNORED,
ALL_CONNECT_TASKS_SUCCESS
}
private static ControlMessage successWriteStatus(String commitTime,
TopicPartition partition,
long kafkaOffset) throws Exception {
// send WS
WriteStatus writeStatus = new WriteStatus();
WriteStatus status = new WriteStatus(false, 1.0);
for (int i = 0; i < 1000; i++) {
status.markSuccess(mock(HoodieRecord.class), Option.empty());
private static void composeControlEvent(String commitTime, boolean shouldIncludeFailedRecords, Map<Integer, Long> kafkaOffsets, List<ControlMessage> controlEvents) {
// Prepare the WriteStatuses for all partitions
for (int i = 1; i <= TOTAL_KAFKA_PARTITIONS; i++) {
try {
long kafkaOffset = (long) (Math.random() * 10000);
kafkaOffsets.put(i, kafkaOffset);
ControlMessage event = composeWriteStatusResponse(
commitTime,
new TopicPartition(TOPIC_NAME, i),
kafkaOffset,
shouldIncludeFailedRecords);
controlEvents.add(event);
} catch (Exception exception) {
throw new HoodieException("Fatal error sending control event to Coordinator");
}
}
}
private static ControlMessage composeWriteStatusResponse(String commitTime,
TopicPartition partition,
long kafkaOffset,
boolean includeFailedRecords) throws Exception {
// send WS
WriteStatus writeStatus = includeFailedRecords ? getSubsetFailedRecordsWriteStatus() : getAllSuccessfulRecordsWriteStatus();
return ControlMessage.newBuilder()
.setType(ControlMessage.EventType.WRITE_STATUS)
.setTopicName(partition.topic())
@@ -255,4 +279,27 @@ public class TestConnectTransactionCoordinator {
).build();
}
}
private static WriteStatus getAllSuccessfulRecordsWriteStatus() {
// send WS
WriteStatus status = new WriteStatus(false, 0.0);
for (int i = 0; i < 1000; i++) {
status.markSuccess(mock(HoodieRecord.class), Option.empty());
}
return status;
}
private static WriteStatus getSubsetFailedRecordsWriteStatus() {
// send WS
WriteStatus status = new WriteStatus(false, 0.0);
for (int i = 0; i < 1000; i++) {
if (i % 10 == 0) {
status.markFailure(mock(HoodieRecord.class), new Throwable("Error writing record on disk"), Option.empty());
} else {
status.markSuccess(mock(HoodieRecord.class), Option.empty());
}
}
status.setGlobalError(new Throwable("More than one records failed to be written to storage"));
return status;
}
}

View File

@@ -46,8 +46,9 @@ public class MockConnectTransactionServices implements ConnectTransactionService
}
@Override
public void endCommit(String commitTime, List<WriteStatus> writeStatuses, Map<String, String> extraMetadata) {
public boolean endCommit(String commitTime, List<WriteStatus> writeStatuses, Map<String, String> extraMetadata) {
assertEquals(String.valueOf(this.commitTime), commitTime);
return true;
}
@Override

View File

@@ -0,0 +1,30 @@
###
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###
log4j.rootLogger=WARN, CONSOLE
log4j.logger.org.apache.hudi=DEBUG
log4j.logger.org.apache.hadoop.hbase=ERROR
# CONSOLE is set to be a ConsoleAppender.
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
# CONSOLE uses PatternLayout.
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=[%-5p] %d %c %x - %m%n
log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter
log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true
log4j.appender.CONSOLE.filter.a.LevelMin=WARN
log4j.appender.CONSOLE.filter.a.LevelMax=FATAL

View File

@@ -0,0 +1,31 @@
###
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###
log4j.rootLogger=WARN, CONSOLE
log4j.logger.org.apache=INFO
log4j.logger.org.apache.hudi=DEBUG
log4j.logger.org.apache.hadoop.hbase=ERROR
# A1 is set to be a ConsoleAppender.
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
# A1 uses PatternLayout.
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter
log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true
log4j.appender.CONSOLE.filter.a.LevelMin=WARN
log4j.appender.CONSOLE.filter.a.LevelMax=FATAL