From bc95571caade9ceef00abf24e3e6e82fc9643c05 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Mon, 10 Jan 2022 12:31:25 -0800 Subject: [PATCH] [HUDI-2735] Allow empty commits in Kafka Connect Sink for Hudi (#4544) --- .../ConnectTransactionCoordinator.java | 4 +- .../TestConnectTransactionCoordinator.java | 42 ++++++++++++++----- 2 files changed, 32 insertions(+), 14 deletions(-) diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java index 14fd880b1..1157b2165 100644 --- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java +++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java @@ -294,7 +294,7 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru long totalRecords = (long) allWriteStatuses.stream().mapToDouble(WriteStatus::getTotalRecords).sum(); boolean hasErrors = totalErrorRecords > 0; - if ((!hasErrors || configs.allowCommitOnErrors()) && !allWriteStatuses.isEmpty()) { + if (!hasErrors || configs.allowCommitOnErrors()) { boolean success = transactionServices.endCommit(currentCommitTime, allWriteStatuses, transformKafkaOffsets(currentConsumedKafkaOffsets)); @@ -319,8 +319,6 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" + key + " is " + value)); } }); - } else { - LOG.warn("Empty write statuses were received from all Participants"); } // Submit the next start commit, that will rollback the current commit. diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java index f003fe9fa..d939351a5 100644 --- a/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java +++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java @@ -178,27 +178,39 @@ public class TestConnectTransactionCoordinator { List controlEvents = new ArrayList<>(); switch (testScenario) { case ALL_CONNECT_TASKS_SUCCESS: - composeControlEvent(message.getCommitTime(), false, kafkaOffsets, controlEvents); + composeControlEvent( + message.getCommitTime(), false, false, kafkaOffsets, controlEvents); + numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS; + // This commit round should succeed, and the kafka offsets getting committed + kafkaOffsetsCommitted.putAll(kafkaOffsets); + expectedMsgType = ControlMessage.EventType.ACK_COMMIT; + break; + case ALL_CONNECT_TASKS_WITH_EMPTY_WRITE_STATUS: + composeControlEvent( + message.getCommitTime(), false, true, kafkaOffsets, controlEvents); numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS; // This commit round should succeed, and the kafka offsets getting committed kafkaOffsetsCommitted.putAll(kafkaOffsets); expectedMsgType = ControlMessage.EventType.ACK_COMMIT; break; case SUBSET_WRITE_STATUS_FAILED_BUT_IGNORED: - composeControlEvent(message.getCommitTime(), true, kafkaOffsets, controlEvents); + composeControlEvent( + message.getCommitTime(), true, false, kafkaOffsets, controlEvents); numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS; // Despite error records, this commit round should succeed, and the kafka offsets getting committed kafkaOffsetsCommitted.putAll(kafkaOffsets); expectedMsgType = ControlMessage.EventType.ACK_COMMIT; break; case SUBSET_WRITE_STATUS_FAILED: - composeControlEvent(message.getCommitTime(), true, kafkaOffsets, controlEvents); + composeControlEvent( + message.getCommitTime(), true, false, kafkaOffsets, controlEvents); numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS; // This commit round should fail, and a new commit round should start without kafka offsets getting committed expectedMsgType = ControlMessage.EventType.START_COMMIT; break; case SUBSET_CONNECT_TASKS_FAILED: - composeControlEvent(message.getCommitTime(), false, kafkaOffsets, controlEvents); + composeControlEvent( + message.getCommitTime(), false, false, kafkaOffsets, controlEvents); numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS / 2; // This commit round should fail, and a new commit round should start without kafka offsets getting committed expectedMsgType = ControlMessage.EventType.START_COMMIT; @@ -235,10 +247,13 @@ public class TestConnectTransactionCoordinator { SUBSET_CONNECT_TASKS_FAILED, SUBSET_WRITE_STATUS_FAILED, SUBSET_WRITE_STATUS_FAILED_BUT_IGNORED, - ALL_CONNECT_TASKS_SUCCESS + ALL_CONNECT_TASKS_SUCCESS, + ALL_CONNECT_TASKS_WITH_EMPTY_WRITE_STATUS } - private static void composeControlEvent(String commitTime, boolean shouldIncludeFailedRecords, Map kafkaOffsets, List controlEvents) { + private static void composeControlEvent( + String commitTime, boolean shouldIncludeFailedRecords, boolean useEmptyWriteStatus, + Map kafkaOffsets, List controlEvents) { // Prepare the WriteStatuses for all partitions for (int i = 1; i <= TOTAL_KAFKA_PARTITIONS; i++) { try { @@ -248,7 +263,8 @@ public class TestConnectTransactionCoordinator { commitTime, new TopicPartition(TOPIC_NAME, i), kafkaOffset, - shouldIncludeFailedRecords); + shouldIncludeFailedRecords, + useEmptyWriteStatus); controlEvents.add(event); } catch (Exception exception) { throw new HoodieException("Fatal error sending control event to Coordinator"); @@ -259,9 +275,13 @@ public class TestConnectTransactionCoordinator { private static ControlMessage composeWriteStatusResponse(String commitTime, TopicPartition partition, long kafkaOffset, - boolean includeFailedRecords) throws Exception { - // send WS - WriteStatus writeStatus = includeFailedRecords ? getSubsetFailedRecordsWriteStatus() : getAllSuccessfulRecordsWriteStatus(); + boolean includeFailedRecords, + boolean useEmptyWriteStatus) throws Exception { + List writeStatusList = useEmptyWriteStatus ? Collections.emptyList() + : Collections.singletonList( + includeFailedRecords + ? getSubsetFailedRecordsWriteStatus() + : getAllSuccessfulRecordsWriteStatus()); return ControlMessage.newBuilder() .setType(ControlMessage.EventType.WRITE_STATUS) @@ -273,7 +293,7 @@ public class TestConnectTransactionCoordinator { .setCommitTime(commitTime) .setParticipantInfo( ControlMessage.ParticipantInfo.newBuilder() - .setWriteStatus(KafkaConnectUtils.buildWriteStatuses(Collections.singletonList(writeStatus))) + .setWriteStatus(KafkaConnectUtils.buildWriteStatuses(writeStatusList)) .setKafkaOffset(kafkaOffset) .build() ).build();