[HUDI-2735] Allow empty commits in Kafka Connect Sink for Hudi (#4544)
This commit is contained in:
@@ -294,7 +294,7 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru
|
|||||||
long totalRecords = (long) allWriteStatuses.stream().mapToDouble(WriteStatus::getTotalRecords).sum();
|
long totalRecords = (long) allWriteStatuses.stream().mapToDouble(WriteStatus::getTotalRecords).sum();
|
||||||
boolean hasErrors = totalErrorRecords > 0;
|
boolean hasErrors = totalErrorRecords > 0;
|
||||||
|
|
||||||
if ((!hasErrors || configs.allowCommitOnErrors()) && !allWriteStatuses.isEmpty()) {
|
if (!hasErrors || configs.allowCommitOnErrors()) {
|
||||||
boolean success = transactionServices.endCommit(currentCommitTime,
|
boolean success = transactionServices.endCommit(currentCommitTime,
|
||||||
allWriteStatuses,
|
allWriteStatuses,
|
||||||
transformKafkaOffsets(currentConsumedKafkaOffsets));
|
transformKafkaOffsets(currentConsumedKafkaOffsets));
|
||||||
@@ -319,8 +319,6 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru
|
|||||||
ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" + key + " is " + value));
|
ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" + key + " is " + value));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} else {
|
|
||||||
LOG.warn("Empty write statuses were received from all Participants");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Submit the next start commit, that will rollback the current commit.
|
// Submit the next start commit, that will rollback the current commit.
|
||||||
|
|||||||
@@ -178,27 +178,39 @@ public class TestConnectTransactionCoordinator {
|
|||||||
List<ControlMessage> controlEvents = new ArrayList<>();
|
List<ControlMessage> controlEvents = new ArrayList<>();
|
||||||
switch (testScenario) {
|
switch (testScenario) {
|
||||||
case ALL_CONNECT_TASKS_SUCCESS:
|
case ALL_CONNECT_TASKS_SUCCESS:
|
||||||
composeControlEvent(message.getCommitTime(), false, kafkaOffsets, controlEvents);
|
composeControlEvent(
|
||||||
|
message.getCommitTime(), false, false, kafkaOffsets, controlEvents);
|
||||||
|
numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS;
|
||||||
|
// This commit round should succeed, and the kafka offsets getting committed
|
||||||
|
kafkaOffsetsCommitted.putAll(kafkaOffsets);
|
||||||
|
expectedMsgType = ControlMessage.EventType.ACK_COMMIT;
|
||||||
|
break;
|
||||||
|
case ALL_CONNECT_TASKS_WITH_EMPTY_WRITE_STATUS:
|
||||||
|
composeControlEvent(
|
||||||
|
message.getCommitTime(), false, true, kafkaOffsets, controlEvents);
|
||||||
numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS;
|
numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS;
|
||||||
// This commit round should succeed, and the kafka offsets getting committed
|
// This commit round should succeed, and the kafka offsets getting committed
|
||||||
kafkaOffsetsCommitted.putAll(kafkaOffsets);
|
kafkaOffsetsCommitted.putAll(kafkaOffsets);
|
||||||
expectedMsgType = ControlMessage.EventType.ACK_COMMIT;
|
expectedMsgType = ControlMessage.EventType.ACK_COMMIT;
|
||||||
break;
|
break;
|
||||||
case SUBSET_WRITE_STATUS_FAILED_BUT_IGNORED:
|
case SUBSET_WRITE_STATUS_FAILED_BUT_IGNORED:
|
||||||
composeControlEvent(message.getCommitTime(), true, kafkaOffsets, controlEvents);
|
composeControlEvent(
|
||||||
|
message.getCommitTime(), true, false, kafkaOffsets, controlEvents);
|
||||||
numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS;
|
numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS;
|
||||||
// Despite error records, this commit round should succeed, and the kafka offsets getting committed
|
// Despite error records, this commit round should succeed, and the kafka offsets getting committed
|
||||||
kafkaOffsetsCommitted.putAll(kafkaOffsets);
|
kafkaOffsetsCommitted.putAll(kafkaOffsets);
|
||||||
expectedMsgType = ControlMessage.EventType.ACK_COMMIT;
|
expectedMsgType = ControlMessage.EventType.ACK_COMMIT;
|
||||||
break;
|
break;
|
||||||
case SUBSET_WRITE_STATUS_FAILED:
|
case SUBSET_WRITE_STATUS_FAILED:
|
||||||
composeControlEvent(message.getCommitTime(), true, kafkaOffsets, controlEvents);
|
composeControlEvent(
|
||||||
|
message.getCommitTime(), true, false, kafkaOffsets, controlEvents);
|
||||||
numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS;
|
numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS;
|
||||||
// This commit round should fail, and a new commit round should start without kafka offsets getting committed
|
// This commit round should fail, and a new commit round should start without kafka offsets getting committed
|
||||||
expectedMsgType = ControlMessage.EventType.START_COMMIT;
|
expectedMsgType = ControlMessage.EventType.START_COMMIT;
|
||||||
break;
|
break;
|
||||||
case SUBSET_CONNECT_TASKS_FAILED:
|
case SUBSET_CONNECT_TASKS_FAILED:
|
||||||
composeControlEvent(message.getCommitTime(), false, kafkaOffsets, controlEvents);
|
composeControlEvent(
|
||||||
|
message.getCommitTime(), false, false, kafkaOffsets, controlEvents);
|
||||||
numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS / 2;
|
numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS / 2;
|
||||||
// This commit round should fail, and a new commit round should start without kafka offsets getting committed
|
// This commit round should fail, and a new commit round should start without kafka offsets getting committed
|
||||||
expectedMsgType = ControlMessage.EventType.START_COMMIT;
|
expectedMsgType = ControlMessage.EventType.START_COMMIT;
|
||||||
@@ -235,10 +247,13 @@ public class TestConnectTransactionCoordinator {
|
|||||||
SUBSET_CONNECT_TASKS_FAILED,
|
SUBSET_CONNECT_TASKS_FAILED,
|
||||||
SUBSET_WRITE_STATUS_FAILED,
|
SUBSET_WRITE_STATUS_FAILED,
|
||||||
SUBSET_WRITE_STATUS_FAILED_BUT_IGNORED,
|
SUBSET_WRITE_STATUS_FAILED_BUT_IGNORED,
|
||||||
ALL_CONNECT_TASKS_SUCCESS
|
ALL_CONNECT_TASKS_SUCCESS,
|
||||||
|
ALL_CONNECT_TASKS_WITH_EMPTY_WRITE_STATUS
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void composeControlEvent(String commitTime, boolean shouldIncludeFailedRecords, Map<Integer, Long> kafkaOffsets, List<ControlMessage> controlEvents) {
|
private static void composeControlEvent(
|
||||||
|
String commitTime, boolean shouldIncludeFailedRecords, boolean useEmptyWriteStatus,
|
||||||
|
Map<Integer, Long> kafkaOffsets, List<ControlMessage> controlEvents) {
|
||||||
// Prepare the WriteStatuses for all partitions
|
// Prepare the WriteStatuses for all partitions
|
||||||
for (int i = 1; i <= TOTAL_KAFKA_PARTITIONS; i++) {
|
for (int i = 1; i <= TOTAL_KAFKA_PARTITIONS; i++) {
|
||||||
try {
|
try {
|
||||||
@@ -248,7 +263,8 @@ public class TestConnectTransactionCoordinator {
|
|||||||
commitTime,
|
commitTime,
|
||||||
new TopicPartition(TOPIC_NAME, i),
|
new TopicPartition(TOPIC_NAME, i),
|
||||||
kafkaOffset,
|
kafkaOffset,
|
||||||
shouldIncludeFailedRecords);
|
shouldIncludeFailedRecords,
|
||||||
|
useEmptyWriteStatus);
|
||||||
controlEvents.add(event);
|
controlEvents.add(event);
|
||||||
} catch (Exception exception) {
|
} catch (Exception exception) {
|
||||||
throw new HoodieException("Fatal error sending control event to Coordinator");
|
throw new HoodieException("Fatal error sending control event to Coordinator");
|
||||||
@@ -259,9 +275,13 @@ public class TestConnectTransactionCoordinator {
|
|||||||
private static ControlMessage composeWriteStatusResponse(String commitTime,
|
private static ControlMessage composeWriteStatusResponse(String commitTime,
|
||||||
TopicPartition partition,
|
TopicPartition partition,
|
||||||
long kafkaOffset,
|
long kafkaOffset,
|
||||||
boolean includeFailedRecords) throws Exception {
|
boolean includeFailedRecords,
|
||||||
// send WS
|
boolean useEmptyWriteStatus) throws Exception {
|
||||||
WriteStatus writeStatus = includeFailedRecords ? getSubsetFailedRecordsWriteStatus() : getAllSuccessfulRecordsWriteStatus();
|
List<WriteStatus> writeStatusList = useEmptyWriteStatus ? Collections.emptyList()
|
||||||
|
: Collections.singletonList(
|
||||||
|
includeFailedRecords
|
||||||
|
? getSubsetFailedRecordsWriteStatus()
|
||||||
|
: getAllSuccessfulRecordsWriteStatus());
|
||||||
|
|
||||||
return ControlMessage.newBuilder()
|
return ControlMessage.newBuilder()
|
||||||
.setType(ControlMessage.EventType.WRITE_STATUS)
|
.setType(ControlMessage.EventType.WRITE_STATUS)
|
||||||
@@ -273,7 +293,7 @@ public class TestConnectTransactionCoordinator {
|
|||||||
.setCommitTime(commitTime)
|
.setCommitTime(commitTime)
|
||||||
.setParticipantInfo(
|
.setParticipantInfo(
|
||||||
ControlMessage.ParticipantInfo.newBuilder()
|
ControlMessage.ParticipantInfo.newBuilder()
|
||||||
.setWriteStatus(KafkaConnectUtils.buildWriteStatuses(Collections.singletonList(writeStatus)))
|
.setWriteStatus(KafkaConnectUtils.buildWriteStatuses(writeStatusList))
|
||||||
.setKafkaOffset(kafkaOffset)
|
.setKafkaOffset(kafkaOffset)
|
||||||
.build()
|
.build()
|
||||||
).build();
|
).build();
|
||||||
|
|||||||
Reference in New Issue
Block a user