Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18418: Use CDL to block the thread termination to avoid flaky tests #18418

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from

Conversation

aoli-al
Copy link
Contributor

@aoli-al aoli-al commented Jan 7, 2025

More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.

This PR fixes KAFKA-18418. KafkaStreamsTest uses Thread.sleep to prevent threads from terminating. This introduces flaky tests if the sleep duration is not long enough. This patch fixes the issue by replacing the Thread.sleep with a CountDownLatch. The CountDownLatch will be released after assertions are validated.

Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.

I tested the proposed fix against the patch: aoli-al@aced4f1 and verified that all tests have passed. I also tested the new code using Fray (the tool that found the bug), and Fray did not report any bug using the POS strategy after 10 minutes.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@github-actions github-actions bot added triage PRs from the community streams tests Test fixes (including flaky tests) small Small PRs labels Jan 7, 2025
@@ -957,6 +960,7 @@ public void shouldThrowOnCleanupWhileShuttingDown() throws Exception {
assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true));
assertThrows(IllegalStateException.class, streams::cleanUp);
assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true));
terminableThreadBlockingLatch.countDown();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we move this into a finally block? Same below.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, because the try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) statement will call streams.close() at the end of the try block but before the final block. So we need to call terminableThreadBlockingLatch.countDown(); inside the try block.

If we want to ensure terminableThreadBlockingLatch.countDown(); is always called. We may write:

        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            streams.start();
            try {
                waitForCondition(
                    () -> streams.state() == KafkaStreams.State.RUNNING,
                    "Streams never started.");
                streams.close(Duration.ZERO);
                assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true));
                assertThrows(IllegalStateException.class, streams::cleanUp);
                assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true));
            } finally {
                terminableThreadBlockingLatch.countDown();
            }
        }

Please let me know if you prefer this.

@github-actions github-actions bot removed the triage PRs from the community label Jan 8, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
small Small PRs streams tests Test fixes (including flaky tests)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants