Skip to content

Commit

Permalink
KAFKA-18433: Add BatchSize to ShareFetch request (1/N) (#18439)
Browse files Browse the repository at this point in the history
Reviewers: Apoorv Mittal <[email protected]>, Manikumar Reddy <[email protected]>
  • Loading branch information
AndrewJSchofield authored Jan 8, 2025
1 parent b51b31e commit 3f9d2c2
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public ShareFetchRequest.Builder newShareFetchBuilder(String groupId, FetchConfi

return ShareFetchRequest.Builder.forConsumer(
groupId, nextMetadata, fetchConfig.maxWaitMs,
fetchConfig.minBytes, fetchConfig.maxBytes, fetchConfig.fetchSize,
fetchConfig.minBytes, fetchConfig.maxBytes, fetchConfig.fetchSize, fetchConfig.maxPollRecords,
added, removed, acknowledgementBatches);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public Builder(ShareFetchRequestData data, boolean enableUnstableLastVersion) {
}

public static Builder forConsumer(String groupId, ShareRequestMetadata metadata,
int maxWait, int minBytes, int maxBytes, int fetchSize,
int maxWait, int minBytes, int maxBytes, int fetchSize, int batchSize,
List<TopicIdPartition> send, List<TopicIdPartition> forget,
Map<TopicIdPartition, List<ShareFetchRequestData.AcknowledgementBatch>> acknowledgementsMap) {
ShareFetchRequestData data = new ShareFetchRequestData();
Expand All @@ -67,6 +67,7 @@ public static Builder forConsumer(String groupId, ShareRequestMetadata metadata,
data.setMaxWaitMs(maxWait);
data.setMinBytes(minBytes);
data.setMaxBytes(maxBytes);
data.setBatchSize(batchSize);

// Build a map of topics to fetch keyed by topic ID, and within each a map of partitions keyed by index
Map<Uuid, Map<Integer, ShareFetchRequestData.FetchPartition>> fetchMap = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
"about": "The minimum bytes to accumulate in the response." },
{ "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff",
"about": "The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored." },
{ "name": "BatchSize", "type": "int32", "versions": "0+",
"about": "The optimal number of records for batches of acquired records and acknowledgements." },
{ "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
"about": "The topics to fetch.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID."},
Expand All @@ -45,7 +47,7 @@
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
"about": "The maximum bytes to fetch from this partition. 0 when only acknowledgement with no fetching is required. See KIP-74 for cases where this limit may not be honored." },
"about": "TO BE REMOVED. The maximum bytes to fetch from this partition. 0 when only acknowledgement with no fetching is required. See KIP-74 for cases where this limit may not be honored." },
{ "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
"about": "Record batches to acknowledge.", "fields": [
{ "name": "FirstOffset", "type": "int64", "versions": "0+",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2366,8 +2366,9 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]],
maxWaitMs: Int = MAX_WAIT_MS,
minBytes: Int = 0,
maxBytes: Int = Int.MaxValue): ShareFetchRequest = {
ShareFetchRequest.Builder.forConsumer(groupId, metadata, maxWaitMs, minBytes, maxBytes, maxPartitionBytes, send.asJava, forget.asJava, acknowledgementsMap.asJava)
maxBytes: Int = Int.MaxValue,
batchSize: Int = 500): ShareFetchRequest = {
ShareFetchRequest.Builder.forConsumer(groupId, metadata, maxWaitMs, minBytes, maxBytes, maxPartitionBytes, batchSize, send.asJava, forget.asJava, acknowledgementsMap.asJava)
.build()
}

Expand Down

0 comments on commit 3f9d2c2

Please sign in to comment.