Skip to content

Commit

Permalink
KAFKA-18397: Added null check before sending background event from Sh…
Browse files Browse the repository at this point in the history
…areConsumeRequestManager. (#18419)

Reviewers: Andrew Schofield <[email protected]>
  • Loading branch information
ShivsundarR authored Jan 8, 2025
1 parent 7436159 commit 3c7ed33
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1169,14 +1169,16 @@ class ResultHandler {
* signal the completion when all results are known.
*/
public void complete(TopicIdPartition partition, Acknowledgements acknowledgements, boolean isCommitAsync) {
if (acknowledgements != null) {
if (!isCommitAsync && acknowledgements != null) {
result.put(partition, acknowledgements);
}
// For commitAsync, we do not wait for other results to complete, we prepare a background event
// for every ShareAcknowledgeResponse.
// For commitAsync, we send out a background event for every TopicIdPartition, so we use a singletonMap each time.
if (isCommitAsync) {
maybeSendShareAcknowledgeCommitCallbackEvent(Collections.singletonMap(partition, acknowledgements));
if (acknowledgements != null) {
maybeSendShareAcknowledgeCommitCallbackEvent(Collections.singletonMap(partition, acknowledgements));
}
} else if (remainingResults != null && remainingResults.decrementAndGet() == 0) {
maybeSendShareAcknowledgeCommitCallbackEvent(result);
future.ifPresent(future -> future.complete(result));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,10 @@
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -548,6 +550,89 @@ public void testAcknowledgeOnCloseWithPendingCommitSync() {
completedAcknowledgements.clear();
}

@Test
public void testResultHandlerOnCommitAsync() {
buildRequestManager();
// Enabling the config so that background event is sent when the acknowledgement response is received.
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);

Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
acknowledgements.add(3L, AcknowledgeType.REJECT);

ShareConsumeRequestManager.ResultHandler resultHandler = shareConsumeRequestManager.buildResultHandler(null, Optional.empty());

// Passing null acknowledgements should mean we do not send the background event at all.
resultHandler.complete(tip0, null, true);
assertEquals(0, completedAcknowledgements.size());

// Setting isCommitAsync to false should still not send any background event
// as we have initialized remainingResults to null.
resultHandler.complete(tip0, acknowledgements, false);
assertEquals(0, completedAcknowledgements.size());

// Sending non-null acknowledgements means we do send the background event
resultHandler.complete(tip0, acknowledgements, true);
assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
}

@Test
public void testResultHandlerOnCommitSync() {
buildRequestManager();
// Enabling the config so that background event is sent when the acknowledgement response is received.
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);

Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
acknowledgements.add(3L, AcknowledgeType.REJECT);

final CompletableFuture<Map<TopicIdPartition, Acknowledgements>> future = new CompletableFuture<>();

// Initializing resultCount to 3.
AtomicInteger resultCount = new AtomicInteger(3);

ShareConsumeRequestManager.ResultHandler resultHandler = shareConsumeRequestManager.buildResultHandler(resultCount, Optional.of(future));

// We only send the background event after all results have been completed.
resultHandler.complete(tip0, acknowledgements, false);
assertEquals(0, completedAcknowledgements.size());
assertFalse(future.isDone());

resultHandler.complete(t2ip0, null, false);
assertEquals(0, completedAcknowledgements.size());
assertFalse(future.isDone());

// After third response is received, we send the background event.
resultHandler.complete(tip1, acknowledgements, false);
assertEquals(1, completedAcknowledgements.size());
assertEquals(2, completedAcknowledgements.get(0).size());
assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
assertEquals(3, completedAcknowledgements.get(0).get(tip1).size());
assertTrue(future.isDone());
}

@Test
public void testResultHandlerCompleteIfEmpty() {
buildRequestManager();

final CompletableFuture<Map<TopicIdPartition, Acknowledgements>> future = new CompletableFuture<>();

// Initializing resultCount to 1.
AtomicInteger resultCount = new AtomicInteger(1);

ShareConsumeRequestManager.ResultHandler resultHandler = shareConsumeRequestManager.buildResultHandler(resultCount, Optional.of(future));

resultHandler.completeIfEmpty();
assertFalse(future.isDone());

resultCount.decrementAndGet();

resultHandler.completeIfEmpty();
assertTrue(future.isDone());
}

@Test
public void testBatchingAcknowledgeRequestStates() {
buildRequestManager();
Expand Down Expand Up @@ -1730,6 +1815,11 @@ private int sendAcknowledgements() {
return pollResult.unsentRequests.size();
}

public ResultHandler buildResultHandler(final AtomicInteger remainingResults,
final Optional<CompletableFuture<Map<TopicIdPartition, Acknowledgements>>> future) {
return new ResultHandler(remainingResults, future);
}

public Tuple<AcknowledgeRequestState> requestStates(int nodeId) {
return super.requestStates(nodeId);
}
Expand Down

0 comments on commit 3c7ed33

Please sign in to comment.