-
Notifications
You must be signed in to change notification settings - Fork 14.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-18311: Configuring repartition topics (3/N) #18395
base: trunk
Are you sure you want to change the base?
Conversation
@cadonna PTAL! |
PTAL @aliehsaeedii |
4fe6d67
to
d7c6dc0
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, @lucasbru !
Here my feedback!
} | ||
|
||
/** | ||
* Returns the set the number of partitions for each repartition topic. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* Returns the set the number of partitions for each repartition topic. | |
* Returns the set of the number of partitions for each repartition topic. |
|
||
private Integer computePartitionCount(final Map<String, Integer> repartitionSourceTopicPartitionCounts, | ||
final String repartitionSourceTopic, | ||
Map<String, Set<String>> repartitionSinkTopics) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Map<String, Set<String>> repartitionSinkTopics) { | |
final Map<String, Set<String>> repartitionSinkTopics) { |
final TopicConfigurationException exception = assertThrows(TopicConfigurationException.class, | ||
repartitionTopics::setup); | ||
|
||
assertNotNull(exception); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't this implied by assertThrows()
?
.setRepartitionSourceTopics(List.of( | ||
REPARTITION_TOPIC_INFO1, | ||
REPARTITION_TOPIC_INFO2 | ||
)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should those two repartition topics not also be part of the source topics on line 70?
It does not change the test but I was confused.
REPARTITION_TOPIC_INFO2, | ||
REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test is a bit confusing.
REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT
and REPARTITION_TOPIC_INFO2
in RepartitionSourceTopics
have no reason, haven't they?
Additionally, REPARTITION_TOPIC_INFO1
makes the two subtopology be a cycle.
The test is correct also this way, but it makes the test hard to read and understand.
I propose to something like this:
@Test
public void shouldSetRepartitionTopicPartitionCountFromUpstreamInternalRepartitionSourceTopic() {
final Subtopology subtopology = new Subtopology()
.setSubtopologyId("SUBTOPOLOGY0")
.setSourceTopics(List.of(SOURCE_TOPIC_NAME1, REPARTITION_TOPIC_NAME3))
.setRepartitionSinkTopics(List.of(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT))
.setRepartitionSourceTopics(List.of(
REPARTITION_TOPIC_INFO3
))
.setStateChangelogTopics(Collections.emptyList());
List<Subtopology> subtopologyToSubtopology = List.of(subtopology, SUBTOPOLOGY_WITHOUT_PARTITION_COUNT);
Function<String, OptionalInt> topicPartitionCountProvider = s -> TOPICS.contains(s) ? OptionalInt.of(2) : OptionalInt.empty();
final RepartitionTopics repartitionTopics = new RepartitionTopics(
LOG_CONTEXT,
subtopologyToSubtopology,
topicPartitionCountProvider
);
Map<String, Integer> setup = repartitionTopics.setup();
assertEquals(
mkMap(
mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_INFO1.partitions()),
mkEntry(REPARTITION_TOPIC_NAME3, REPARTITION_TOPIC_INFO3.partitions()),
mkEntry(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT, REPARTITION_TOPIC_INFO3.partitions())
),
setup
);
}
where REPARTITION_TOPIC_INFO3
is:
private static final TopicInfo REPARTITION_TOPIC_INFO3 = new TopicInfo()
.setName(REPARTITION_TOPIC_NAME3)
.setPartitions(3)
.setTopicConfigs(List.of(TOPIC_CONFIG2));
REPARTITION_TOPIC_INFO1, | ||
REPARTITION_TOPIC_INFO2, | ||
REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
REPARTITION_TOPIC_INFO1, | |
REPARTITION_TOPIC_INFO2, | |
REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT | |
REPARTITION_TOPIC_INFO2 |
subtopology, | ||
SUBTOPOLOGY_WITHOUT_PARTITION_COUNT | ||
); | ||
Function<String, OptionalInt> topicPartitionCountProvider = s -> TOPICS.contains(s) ? OptionalInt.of(3) : OptionalInt.empty(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Function<String, OptionalInt> topicPartitionCountProvider = s -> TOPICS.contains(s) ? OptionalInt.of(3) : OptionalInt.empty(); | |
int expectedPartitionCount = 3 | |
Function<String, OptionalInt> topicPartitionCountProvider = s -> TOPICS.contains(s) ? OptionalInt.of(expectedPartitionCount) : OptionalInt.empty(); |
and use expectedPartitionCount
in the verification on line 187.
} | ||
|
||
@Test | ||
public void shouldSetRepartitionTopicPartitionCountFromUpstreamSourceTopicMultipleSubtopologies() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How is this different from shouldSetRepartitionTopicPartitionCountFromUpstreamExternalSourceTopic()
?
A simplified port of "RepartitionTopics" from the client-side to the group coordinator.
Compared to the client-side version, the implementation uses immutable data structures, and returns the computed number of partitions instead of modifying mutable data structures and calling the admin client.
Committer Checklist (excluded from commit message)