-
Notifications
You must be signed in to change notification settings - Fork 14.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-18428: Measure share consumers performance #18415
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. A few comments to resolve.
@@ -0,0 +1,20 @@ | |||
#!/bin/bash |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please can you also create bin/windows/kafka-share-consumer-perf-test.bat
.
ShareConsumerPerfOptions options = new ShareConsumerPerfOptions(args); | ||
AtomicLong totalMessagesRead = new AtomicLong(0); | ||
AtomicLong totalBytesRead = new AtomicLong(0); | ||
AtomicLong joinTimeMs = new AtomicLong(0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
joinTimeMs
and joinTimeMsInSingleRound
don't make sense for a share group consumer. In the regular consumer perf test, they are used to see how long it took to join the group. A share consumer cannot work this out because there is no onPartitionsAssigned
callback. Please remove these variables throughout the code.
printStats(totalBytesRead.get(), totalMessagesRead.get(), elapsedSec, fetchTimeInMs, startMs, endMs, | ||
options.dateFormat(), -1); | ||
|
||
if (!shareConsumersMetrics.isEmpty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this would be neater as shareConsumerMetrics.forEach
.
totalBytesRead.set(bytesRead.get()); | ||
} | ||
|
||
private static void consumeMessagesForSingleShareConsumer(long currentTimeMs, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could be more neatly initialised by having lastConsumedTimeMs
and currentTimeMs
as local variables, and then initialising with
long currentTimeMs = System.currentTimeMills();
long lastConsumedTimeMs = currentTimeMs;
and then going into the while loop.
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostsAndPorts()); | ||
props.put(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(groupIdOpt)); | ||
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, options.valueOf(socketBufferSizeOpt).toString()); | ||
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, options.valueOf(fetchSizeOpt).toString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would be FETCH_MAX_BYTES_CONFIG
for a share consumer I think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me.
About
Added code to measure performance of share consumer/consumers in a share group. Added the following files -
ShareConsumerPerformance.java
- Code which measures the performance of share consumer/consumers.ShareConsumerPerformanceTest.java
- Contains unit tests for individual functionalities inShareConsumerPerformance.java
kafka-share-consumer-perf-test.sh
- CLI utility to runShareConsumerPerformance.java