1
0

[HUDI-2671] Fix kafka offset handling in Kafka Connect protocol (#4021)

Co-authored-by: Rajesh Mahindra <rmahindra@Rajeshs-MacBook-Pro.local>
This commit is contained in:
rmahindra123
2021-11-24 10:03:58 -08:00
committed by GitHub
parent 9af219b7c1
commit 90f2ea2f12
4 changed files with 147 additions and 118 deletions

View File

@@ -149,7 +149,7 @@ public class ConnectTransactionParticipant implements TransactionParticipant {
LOG.info("Started a new transaction after receiving START_COMMIT for commit " + currentCommitTime); LOG.info("Started a new transaction after receiving START_COMMIT for commit " + currentCommitTime);
try { try {
ongoingTransactionInfo = new TransactionInfo<>(currentCommitTime, writerProvider.getWriter(currentCommitTime)); ongoingTransactionInfo = new TransactionInfo<>(currentCommitTime, writerProvider.getWriter(currentCommitTime));
ongoingTransactionInfo.setLastWrittenKafkaOffset(committedKafkaOffset); ongoingTransactionInfo.setExpectedKafkaOffset(committedKafkaOffset);
} catch (Exception exception) { } catch (Exception exception) {
LOG.warn("Error received while starting a new transaction", exception); LOG.warn("Error received while starting a new transaction", exception);
} }
@@ -188,7 +188,7 @@ public class ConnectTransactionParticipant implements TransactionParticipant {
.setParticipantInfo( .setParticipantInfo(
ControlMessage.ParticipantInfo.newBuilder() ControlMessage.ParticipantInfo.newBuilder()
.setWriteStatus(KafkaConnectUtils.buildWriteStatuses(writeStatuses)) .setWriteStatus(KafkaConnectUtils.buildWriteStatuses(writeStatuses))
.setKafkaOffset(ongoingTransactionInfo.getLastWrittenKafkaOffset()) .setKafkaOffset(ongoingTransactionInfo.getExpectedKafkaOffset())
.build() .build()
).build(); ).build();
@@ -201,9 +201,9 @@ public class ConnectTransactionParticipant implements TransactionParticipant {
} }
private void handleAckCommit(ControlMessage message) { private void handleAckCommit(ControlMessage message) {
// Update lastKafkCommitedOffset locally. // Update committedKafkaOffset that tracks the last committed kafka offset locally.
if (ongoingTransactionInfo != null && committedKafkaOffset < ongoingTransactionInfo.getLastWrittenKafkaOffset()) { if (ongoingTransactionInfo != null && committedKafkaOffset < ongoingTransactionInfo.getExpectedKafkaOffset()) {
committedKafkaOffset = ongoingTransactionInfo.getLastWrittenKafkaOffset(); committedKafkaOffset = ongoingTransactionInfo.getExpectedKafkaOffset();
} }
syncKafkaOffsetWithLeader(message); syncKafkaOffsetWithLeader(message);
cleanupOngoingTransaction(); cleanupOngoingTransaction();
@@ -215,12 +215,22 @@ public class ConnectTransactionParticipant implements TransactionParticipant {
try { try {
SinkRecord record = buffer.peek(); SinkRecord record = buffer.peek();
if (record != null if (record != null
&& record.kafkaOffset() >= ongoingTransactionInfo.getLastWrittenKafkaOffset()) { && record.kafkaOffset() == ongoingTransactionInfo.getExpectedKafkaOffset()) {
ongoingTransactionInfo.getWriter().writeRecord(record); ongoingTransactionInfo.getWriter().writeRecord(record);
ongoingTransactionInfo.setLastWrittenKafkaOffset(record.kafkaOffset() + 1); ongoingTransactionInfo.setExpectedKafkaOffset(record.kafkaOffset() + 1);
} else if (record != null && record.kafkaOffset() < committedKafkaOffset) { } else if (record != null && record.kafkaOffset() > ongoingTransactionInfo.getExpectedKafkaOffset()) {
LOG.warn(String.format("Received a kafka record with offset %s prior to last committed offset %s for partition %s", LOG.warn(String.format("Received a kafka record with offset %s above the next expected kafka offset %s for partition %s, "
record.kafkaOffset(), ongoingTransactionInfo.getLastWrittenKafkaOffset(), + "hence resetting the kafka offset to %s",
record.kafkaOffset(),
ongoingTransactionInfo.getExpectedKafkaOffset(),
partition,
ongoingTransactionInfo.getExpectedKafkaOffset()));
context.offset(partition, ongoingTransactionInfo.getExpectedKafkaOffset());
} else if (record != null && record.kafkaOffset() < ongoingTransactionInfo.getExpectedKafkaOffset()) {
LOG.warn(String.format("Received a kafka record with offset %s below the next expected kafka offset %s for partition %s, "
+ "no action will be taken but this record will be ignored since its already written",
record.kafkaOffset(),
ongoingTransactionInfo.getExpectedKafkaOffset(),
partition)); partition));
} }
buffer.poll(); buffer.poll();
@@ -250,13 +260,24 @@ public class ConnectTransactionParticipant implements TransactionParticipant {
// Recover kafka committed offsets, treating the commit offset from the coordinator // Recover kafka committed offsets, treating the commit offset from the coordinator
// as the source of truth // as the source of truth
if (coordinatorCommittedKafkaOffset != null && coordinatorCommittedKafkaOffset >= 0) { if (coordinatorCommittedKafkaOffset != null && coordinatorCommittedKafkaOffset >= 0) {
// Debug only messages
if (coordinatorCommittedKafkaOffset != committedKafkaOffset) { if (coordinatorCommittedKafkaOffset != committedKafkaOffset) {
LOG.warn(String.format("Recovering the kafka offset for partition %s to offset %s instead of local offset %s", LOG.warn(String.format("The coordinator offset for kafka partition %s is %d while the locally committed offset is %d, "
partition.partition(), coordinatorCommittedKafkaOffset, committedKafkaOffset)); + "hence resetting the local committed offset to the coordinator provided one to ensure consistency",
context.offset(partition, coordinatorCommittedKafkaOffset); partition,
coordinatorCommittedKafkaOffset,
committedKafkaOffset));
} }
committedKafkaOffset = coordinatorCommittedKafkaOffset; committedKafkaOffset = coordinatorCommittedKafkaOffset;
return;
} }
} else {
LOG.warn(String.format("The coordinator offset for kafka partition %s is not present while the locally committed offset is %d, "
+ "hence resetting the local committed offset to 0 to avoid data loss",
partition,
committedKafkaOffset));
} }
// If the coordinator does not have a committed offset for this partition, reset to zero offset.
committedKafkaOffset = 0;
} }
} }

View File

@@ -29,13 +29,13 @@ public class TransactionInfo<T> {
private final String commitTime; private final String commitTime;
private final ConnectWriter<T> writer; private final ConnectWriter<T> writer;
private long lastWrittenKafkaOffset; private long expectedKafkaOffset;
private boolean commitInitiated; private boolean commitInitiated;
public TransactionInfo(String commitTime, ConnectWriter<T> writer) { public TransactionInfo(String commitTime, ConnectWriter<T> writer) {
this.commitTime = commitTime; this.commitTime = commitTime;
this.writer = writer; this.writer = writer;
this.lastWrittenKafkaOffset = 0; this.expectedKafkaOffset = 0;
this.commitInitiated = false; this.commitInitiated = false;
} }
@@ -47,16 +47,16 @@ public class TransactionInfo<T> {
return writer; return writer;
} }
public long getLastWrittenKafkaOffset() { public long getExpectedKafkaOffset() {
return lastWrittenKafkaOffset; return expectedKafkaOffset;
} }
public boolean isCommitInitiated() { public boolean isCommitInitiated() {
return commitInitiated; return commitInitiated;
} }
public void setLastWrittenKafkaOffset(long lastWrittenKafkaOffset) { public void setExpectedKafkaOffset(long expectedKafkaOffset) {
this.lastWrittenKafkaOffset = lastWrittenKafkaOffset; this.expectedKafkaOffset = expectedKafkaOffset;
} }
public void commitInitiated() { public void commitInitiated() {

View File

@@ -26,7 +26,7 @@ import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.helper.MockKafkaControlAgent; import org.apache.hudi.helper.MockKafkaControlAgent;
import org.apache.hudi.helper.TestHudiWriterProvider; import org.apache.hudi.helper.TestHudiWriterProvider;
import org.apache.hudi.helper.TestKafkaConnect; import org.apache.hudi.helper.MockKafkaConnect;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
@@ -41,23 +41,24 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestConnectTransactionParticipant { public class TestConnectTransactionParticipant {
private static final String TOPIC_NAME = "kafka-connect-test-topic"; private static final String TOPIC_NAME = "kafka-connect-test-topic";
private static final int NUM_RECORDS_BATCH = 5;
private static final int PARTITION_NUMBER = 4; private static final int PARTITION_NUMBER = 4;
private ConnectTransactionParticipant participant; private ConnectTransactionParticipant participant;
private MockCoordinator coordinator; private MockCoordinator mockCoordinator;
private TopicPartition partition; private TopicPartition partition;
private KafkaConnectConfigs configs; private KafkaConnectConfigs configs;
private KafkaControlAgent kafkaControlAgent; private KafkaControlAgent kafkaControlAgent;
private TestHudiWriterProvider testHudiWriterProvider; private TestHudiWriterProvider testHudiWriterProvider;
private TestKafkaConnect testKafkaConnect; private MockKafkaConnect mockKafkaConnect;
@BeforeEach @BeforeEach
public void setUp() throws Exception { public void setUp() throws Exception {
partition = new TopicPartition(TOPIC_NAME, PARTITION_NUMBER); partition = new TopicPartition(TOPIC_NAME, PARTITION_NUMBER);
kafkaControlAgent = new MockKafkaControlAgent(); kafkaControlAgent = new MockKafkaControlAgent();
testKafkaConnect = new TestKafkaConnect(partition); mockKafkaConnect = new MockKafkaConnect(partition);
coordinator = new MockCoordinator(kafkaControlAgent); mockCoordinator = new MockCoordinator(kafkaControlAgent);
coordinator.start(); mockCoordinator.start();
configs = KafkaConnectConfigs.newBuilder() configs = KafkaConnectConfigs.newBuilder()
.build(); .build();
initializeParticipant(); initializeParticipant();
@@ -66,26 +67,19 @@ public class TestConnectTransactionParticipant {
@ParameterizedTest @ParameterizedTest
@EnumSource(value = CoordinatorFailureTestScenarios.class) @EnumSource(value = CoordinatorFailureTestScenarios.class)
public void testAllCoordinatorFailureScenarios(CoordinatorFailureTestScenarios testScenario) { public void testAllCoordinatorFailureScenarios(CoordinatorFailureTestScenarios testScenario) {
int expectedRecordsWritten = 0;
try { try {
assertTrue(mockKafkaConnect.isPaused());
switch (testScenario) { switch (testScenario) {
case REGULAR_SCENARIO: case REGULAR_SCENARIO:
expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant();
assertTrue(testKafkaConnect.isPaused());
break; break;
case COORDINATOR_FAILED_AFTER_START_COMMIT: case COORDINATOR_FAILED_AFTER_START_COMMIT:
testKafkaConnect.putRecordsToParticipant(); triggerAndProcessStartCommit();
coordinator.sendEventFromCoordinator(ControlMessage.EventType.START_COMMIT);
testKafkaConnect.putRecordsToParticipant();
// Coordinator Failed // Coordinator Failed
initializeCoordinator(); initializeCoordinator();
break; break;
case COORDINATOR_FAILED_AFTER_END_COMMIT: case COORDINATOR_FAILED_AFTER_END_COMMIT:
testKafkaConnect.putRecordsToParticipant(); triggerAndProcessStartCommit();
coordinator.sendEventFromCoordinator(ControlMessage.EventType.START_COMMIT); triggerAndProcessEndCommit();
testKafkaConnect.putRecordsToParticipant();
coordinator.sendEventFromCoordinator(ControlMessage.EventType.END_COMMIT);
expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant();
// Coordinator Failed // Coordinator Failed
initializeCoordinator(); initializeCoordinator();
break; break;
@@ -93,18 +87,8 @@ public class TestConnectTransactionParticipant {
throw new HoodieException("Unknown test scenario " + testScenario); throw new HoodieException("Unknown test scenario " + testScenario);
} }
// Regular Case or Coordinator Recovery Case // Despite failures in the previous commit, a fresh 2-phase commit should PASS.
coordinator.sendEventFromCoordinator(ControlMessage.EventType.START_COMMIT); testTwoPhaseCommit(0);
expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant();
assertTrue(testKafkaConnect.isResumed());
coordinator.sendEventFromCoordinator(ControlMessage.EventType.END_COMMIT);
testKafkaConnect.putRecordsToParticipant();
assertTrue(testKafkaConnect.isPaused());
coordinator.sendEventFromCoordinator(ControlMessage.EventType.ACK_COMMIT);
testKafkaConnect.putRecordsToParticipant();
assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten);
// Ensure Coordinator and participant are in sync in the kafka offsets
assertEquals(participant.getLastKafkaCommittedOffset(), coordinator.getCommittedKafkaOffset());
} catch (Exception exception) { } catch (Exception exception) {
throw new HoodieException("Unexpected test failure ", exception); throw new HoodieException("Unexpected test failure ", exception);
} }
@@ -114,62 +98,38 @@ public class TestConnectTransactionParticipant {
@ParameterizedTest @ParameterizedTest
@EnumSource(value = ParticipantFailureTestScenarios.class) @EnumSource(value = ParticipantFailureTestScenarios.class)
public void testAllParticipantFailureScenarios(ParticipantFailureTestScenarios testScenario) { public void testAllParticipantFailureScenarios(ParticipantFailureTestScenarios testScenario) {
int expectedRecordsWritten = 0;
try { try {
int currentKafkaOffset = 0;
switch (testScenario) { switch (testScenario) {
case FAILURE_BEFORE_START_COMMIT: case FAILURE_BEFORE_START_COMMIT:
testKafkaConnect.putRecordsToParticipant(); // Participant failing after START_COMMIT will not write any data in this commit cycle.
// Participant fails
initializeParticipant(); initializeParticipant();
coordinator.sendEventFromCoordinator(ControlMessage.EventType.START_COMMIT);
expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant();
assertTrue(testKafkaConnect.isResumed());
coordinator.sendEventFromCoordinator(ControlMessage.EventType.END_COMMIT);
testKafkaConnect.putRecordsToParticipant();
assertTrue(testKafkaConnect.isPaused());
coordinator.sendEventFromCoordinator(ControlMessage.EventType.ACK_COMMIT);
testKafkaConnect.putRecordsToParticipant();
assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten);
// Ensure Coordinator and participant are in sync in the kafka offsets
assertEquals(participant.getLastKafkaCommittedOffset(), coordinator.getCommittedKafkaOffset());
break; break;
case FAILURE_AFTER_START_COMMIT: case FAILURE_AFTER_START_COMMIT:
testKafkaConnect.putRecordsToParticipant(); triggerAndProcessStartCommit();
coordinator.sendEventFromCoordinator(ControlMessage.EventType.START_COMMIT); // Participant failing after START_COMMIT will not write any data in this commit cycle.
testKafkaConnect.putRecordsToParticipant();
// Participant fails
initializeParticipant(); initializeParticipant();
testKafkaConnect.putRecordsToParticipant(); triggerAndProcessEndCommit();
coordinator.sendEventFromCoordinator(ControlMessage.EventType.END_COMMIT); triggerAndProcessAckCommit();
testKafkaConnect.putRecordsToParticipant();
assertTrue(testKafkaConnect.isPaused());
coordinator.sendEventFromCoordinator(ControlMessage.EventType.ACK_COMMIT);
testKafkaConnect.putRecordsToParticipant();
assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten);
// Ensure Coordinator and participant are in sync in the kafka offsets
assertEquals(participant.getLastKafkaCommittedOffset(), coordinator.getCommittedKafkaOffset());
break; break;
case FAILURE_AFTER_END_COMMIT: case FAILURE_AFTER_END_COMMIT:
testKafkaConnect.putRecordsToParticipant(); // Regular Case or Coordinator Recovery Case
coordinator.sendEventFromCoordinator(ControlMessage.EventType.START_COMMIT); triggerAndProcessStartCommit();
testKafkaConnect.putRecordsToParticipant(); triggerAndProcessEndCommit();
coordinator.sendEventFromCoordinator(ControlMessage.EventType.END_COMMIT);
testKafkaConnect.putRecordsToParticipant();
// Participant fails
initializeParticipant(); initializeParticipant();
testKafkaConnect.putRecordsToParticipant(); triggerAndProcessAckCommit();
coordinator.sendEventFromCoordinator(ControlMessage.EventType.END_COMMIT);
testKafkaConnect.putRecordsToParticipant(); // Participant failing after an END_COMMIT should not cause issues with the present commit,
assertTrue(testKafkaConnect.isPaused()); // since the data would have been written by previous participant before failing
coordinator.sendEventFromCoordinator(ControlMessage.EventType.ACK_COMMIT); // and hence moved the kafka offset.
testKafkaConnect.putRecordsToParticipant(); currentKafkaOffset = NUM_RECORDS_BATCH;
assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten);
// Ensure Coordinator and participant are in sync in the kafka offsets
assertEquals(participant.getLastKafkaCommittedOffset(), coordinator.getCommittedKafkaOffset());
break; break;
default: default:
throw new HoodieException("Unknown test scenario " + testScenario); throw new HoodieException("Unknown test scenario " + testScenario);
} }
// Despite failures in the previous commit, a fresh 2-phase commit should PASS.
testTwoPhaseCommit(currentKafkaOffset);
} catch (Exception exception) { } catch (Exception exception) {
throw new HoodieException("Unexpected test failure ", exception); throw new HoodieException("Unexpected test failure ", exception);
} }
@@ -180,15 +140,49 @@ public class TestConnectTransactionParticipant {
participant = new ConnectTransactionParticipant( participant = new ConnectTransactionParticipant(
partition, partition,
kafkaControlAgent, kafkaControlAgent,
testKafkaConnect, mockKafkaConnect,
testHudiWriterProvider); testHudiWriterProvider);
testKafkaConnect.setParticipant(participant); mockKafkaConnect.setParticipant(participant);
participant.start(); participant.start();
} }
private void initializeCoordinator() { private void initializeCoordinator() {
coordinator = new MockCoordinator(kafkaControlAgent); mockCoordinator = new MockCoordinator(kafkaControlAgent);
coordinator.start(); mockCoordinator.start();
}
// Test and validate result of a single 2 Phase commit from START_COMMIT to ACK_COMMIT.
// Validates that NUM_RECORDS_BATCH number of kafka records are written,
// and the kafka offset only increments by NUM_RECORDS_BATCH.
private void testTwoPhaseCommit(long currentKafkaOffset) {
triggerAndProcessStartCommit();
triggerAndProcessEndCommit();
triggerAndProcessAckCommit();
// Validate records written, current kafka offset and kafka offsets committed across
// coordinator and participant are in sync despite failure scenarios.
assertEquals(NUM_RECORDS_BATCH, testHudiWriterProvider.getLatestNumberWrites());
assertEquals((currentKafkaOffset + NUM_RECORDS_BATCH), mockKafkaConnect.getCurrentKafkaOffset());
// Ensure Coordinator and participant are in sync in the kafka offsets
assertEquals(participant.getLastKafkaCommittedOffset(), mockCoordinator.getCommittedKafkaOffset());
}
private void triggerAndProcessStartCommit() {
mockCoordinator.sendEventFromCoordinator(ControlMessage.EventType.START_COMMIT);
mockKafkaConnect.publishBatchRecordsToParticipant(NUM_RECORDS_BATCH);
assertTrue(mockKafkaConnect.isResumed());
}
private void triggerAndProcessEndCommit() {
mockCoordinator.sendEventFromCoordinator(ControlMessage.EventType.END_COMMIT);
mockKafkaConnect.publishBatchRecordsToParticipant(0);
assertTrue(mockKafkaConnect.isPaused());
}
private void triggerAndProcessAckCommit() {
mockCoordinator.sendEventFromCoordinator(ControlMessage.EventType.ACK_COMMIT);
mockKafkaConnect.publishBatchRecordsToParticipant(0);
assertTrue(mockKafkaConnect.isPaused());
} }
private static class MockCoordinator implements TransactionCoordinator { private static class MockCoordinator implements TransactionCoordinator {
@@ -279,5 +273,4 @@ public class TestConnectTransactionParticipant {
FAILURE_AFTER_START_COMMIT, FAILURE_AFTER_START_COMMIT,
FAILURE_AFTER_END_COMMIT, FAILURE_AFTER_END_COMMIT,
} }
} }

View File

@@ -25,7 +25,6 @@ import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext; import org.apache.kafka.connect.sink.SinkTaskContext;
import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@@ -33,20 +32,26 @@ import java.util.Set;
/** /**
* Helper class that emulates the Kafka Connect f/w and additionally * Helper class that emulates the Kafka Connect f/w and additionally
* implements {@link SinkTaskContext} for testing purposes. * implements {@link SinkTaskContext} for testing purposes.
*
* Every time the consumer (Participant) calls resume, a fixed
* batch of kafka records from the current offset are pushed. If
* the consumer resets the offsets, then a fresh batch of records
* are sent from the new offset.
*/ */
public class TestKafkaConnect implements SinkTaskContext { public class MockKafkaConnect implements SinkTaskContext {
private static final int NUM_RECORDS_BATCH = 5;
private final TopicPartition testPartition; private final TopicPartition testPartition;
private TransactionParticipant participant; private TransactionParticipant participant;
private long currentKafkaOffset; private long currentKafkaOffset;
private boolean isPaused; private boolean isPaused;
private boolean isResetOffset;
public TestKafkaConnect(TopicPartition testPartition) { public MockKafkaConnect(TopicPartition testPartition) {
this.testPartition = testPartition; this.testPartition = testPartition;
isPaused = false; isPaused = false;
currentKafkaOffset = 0L; currentKafkaOffset = 0L;
isResetOffset = false;
} }
public void setParticipant(TransactionParticipant participant) { public void setParticipant(TransactionParticipant participant) {
@@ -61,23 +66,6 @@ public class TestKafkaConnect implements SinkTaskContext {
return !isPaused; return !isPaused;
} }
public int putRecordsToParticipant() throws IOException {
for (int i = 1; i <= NUM_RECORDS_BATCH; i++) {
participant.buffer(getNextKafkaRecord());
}
participant.processRecords();
return NUM_RECORDS_BATCH;
}
public SinkRecord getNextKafkaRecord() {
return new SinkRecord(testPartition.topic(),
testPartition.partition(),
Schema.OPTIONAL_BYTES_SCHEMA,
("key-" + currentKafkaOffset).getBytes(),
Schema.OPTIONAL_BYTES_SCHEMA,
"value".getBytes(), currentKafkaOffset++);
}
public long getCurrentKafkaOffset() { public long getCurrentKafkaOffset() {
return currentKafkaOffset; return currentKafkaOffset;
} }
@@ -100,7 +88,7 @@ public class TestKafkaConnect implements SinkTaskContext {
public void offset(Map<TopicPartition, Long> offsets) { public void offset(Map<TopicPartition, Long> offsets) {
for (TopicPartition tp : offsets.keySet()) { for (TopicPartition tp : offsets.keySet()) {
if (tp.equals(testPartition)) { if (tp.equals(testPartition)) {
currentKafkaOffset = offsets.get(tp); resetOffset(offsets.get(tp));
} }
} }
} }
@@ -108,7 +96,7 @@ public class TestKafkaConnect implements SinkTaskContext {
@Override @Override
public void offset(TopicPartition tp, long offset) { public void offset(TopicPartition tp, long offset) {
if (tp.equals(testPartition)) { if (tp.equals(testPartition)) {
currentKafkaOffset = offset; resetOffset(offset);
} }
} }
@@ -129,6 +117,33 @@ public class TestKafkaConnect implements SinkTaskContext {
@Override @Override
public void requestCommit() { public void requestCommit() {
}
public int publishBatchRecordsToParticipant(int numRecords) {
// Send NUM_RECORDS_BATCH to participant
// If client resets offset, send another batch starting
// from the new reset offset value
do {
isResetOffset = false;
for (int i = 1; i <= numRecords; i++) {
participant.buffer(getNextKafkaRecord());
}
participant.processRecords();
} while (isResetOffset);
return numRecords;
}
private SinkRecord getNextKafkaRecord() {
return new SinkRecord(testPartition.topic(),
testPartition.partition(),
Schema.OPTIONAL_BYTES_SCHEMA,
("key-" + currentKafkaOffset).getBytes(),
Schema.OPTIONAL_BYTES_SCHEMA,
"value".getBytes(), currentKafkaOffset++);
}
private void resetOffset(long newOffset) {
currentKafkaOffset = newOffset;
isResetOffset = true;
} }
} }