Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18275: Restarting broker in testing should use the same port #18381

Open
wants to merge 8 commits into
base: trunk
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.apache.kafka.connect.util.SinkUtils;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.WorkerHandle;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.test.TestUtils;

import org.junit.jupiter.api.AfterEach;
Expand All @@ -57,8 +56,6 @@

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.ServerSocket;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -247,8 +244,6 @@ public void testBrokerCoordinator() throws Exception {
ConnectorHandle connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
workerProps.put(DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, String.valueOf(5000));

useFixedBrokerPort();

Comment on lines -250 to -251
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

// start the clusters
connect = connectBuilder.build();
connect.start();
Expand Down Expand Up @@ -813,8 +808,6 @@ public void testRequestTimeouts() throws Exception {
workerProps.put(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, "0");
workerProps.put(METADATA_RECOVERY_STRATEGY_CONFIG, MetadataRecoveryStrategy.NONE.name);

useFixedBrokerPort();

Comment on lines -816 to -817
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

connect = connectBuilder
.numWorkers(1)
.build();
Expand Down Expand Up @@ -1431,23 +1424,6 @@ private Map<String, String> defaultSourceConnectorProps(String topic) {
return props;
}

/**
 * Pins the embedded Kafka broker's EXTERNAL listener to a concrete free port instead of
 * letting it bind port 0.
 *
 * <p>Port 0 cannot be used here because this test stops the broker and brings it back up,
 * expecting the Connect cluster to re-connect to the same address; a second startup with
 * port 0 would almost certainly pick a different ephemeral port. Pinning a single static
 * port is only viable because the test runs exactly one broker.
 *
 * <p>NOTE(review): probing with a throwaway {@link ServerSocket} and then reusing the port
 * is inherently racy — another process could grab the port between close and broker
 * startup — but is acceptable for a test helper.
 *
 * @throws IOException if no free port could be probed
 */
private void useFixedBrokerPort() throws IOException {
    final int fixedPort;
    // Bind to port 0 to let the OS hand us a currently free port, then release it
    // immediately so the broker can claim it.
    try (ServerSocket probe = new ServerSocket(0)) {
        fixedPort = probe.getLocalPort();
    }
    String listeners = String.format("EXTERNAL://localhost:%d,CONTROLLER://localhost:0", fixedPort);
    brokerProps.put(SocketServerConfigs.LISTENERS_CONFIG, listeners);
    connectBuilder.numBrokers(1).brokerProps(brokerProps);
}

Comment on lines -1434 to -1450
Copy link
Contributor Author

@peterxcli peterxcli Jan 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this to pass the following two tests.

Module Test Message Time
ConnectWorkerIntegrationTest testBrokerCoordinator() org.opentest4j.AssertionFailedError: The Kafka cluster used in this test was not able to start successfully in time. If no recent changes have altered the behavior of Kafka brokers or clients, and this error is not occurring frequently, it is probably the result of the testing machine being temporarily overloaded and can be safely ignored. 62.84s
ConnectWorkerIntegrationTest testRequestTimeouts() org.opentest4j.AssertionFailedError: The Kafka cluster used in this test was not able to start successfully in time. If no recent changes have altered the behavior of Kafka brokers or clients, and this error is not occurring frequently, it is probably the result of the testing machine being temporarily overloaded and can be safely ignored. 60.51s

https://github.com/apache/kafka/actions/runs/12595111592?pr=18381

But I don't really know why, investigating...

Copy link
Contributor Author

@peterxcli peterxcli Jan 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, EmbeddedConnect uses EmbeddedKafkaCluster, and EmbeddedKafkaCluster uses KafkaClusterTestKit to build the cluster, and since the brokers have already been assigned fixed ports in KafkaClusterTestKit, there is no need to modify the LISTENERS_CONFIG for them

public static class EmptyTaskConfigsConnector extends SinkConnector {
@Override
public String version() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,35 +206,42 @@ private void setSecurityProtocolProps(Map<String, Object> props, String security
}
}

public KafkaClusterTestKit build() throws Exception {
Map<Integer, ControllerServer> controllers = new HashMap<>();
Map<Integer, BrokerServer> brokers = new HashMap<>();
Map<Integer, SharedServer> jointServers = new HashMap<>();
File baseDirectory = null;
File jaasFile = null;

private Optional<File> maybeSetupJaasFile() throws Exception {
if (brokerSecurityProtocol.equals(SecurityProtocol.SASL_PLAINTEXT.name)) {
jaasFile = JaasUtils.writeJaasContextsToFile(Set.of(
File file = JaasUtils.writeJaasContextsToFile(Set.of(
new JaasUtils.JaasSection(JaasUtils.KAFKA_SERVER_CONTEXT_NAME,
List.of(
JaasModule.plainLoginModule(
JaasUtils.KAFKA_PLAIN_ADMIN, JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD,
JaasUtils.KAFKA_PLAIN_ADMIN,
JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD,
true,
Map.of(
JaasUtils.KAFKA_PLAIN_USER1, JaasUtils.KAFKA_PLAIN_USER1_PASSWORD,
JaasUtils.KAFKA_PLAIN_ADMIN, JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD)
)
)
)
)
));
JaasUtils.refreshJavaLoginConfigParam(jaasFile);
JaasUtils.refreshJavaLoginConfigParam(file);
return Optional.of(file);
}
return Optional.empty();
}

public KafkaClusterTestKit build() throws Exception {
Map<Integer, ControllerServer> controllers = new HashMap<>();
Map<Integer, BrokerServer> brokers = new HashMap<>();
Map<Integer, SharedServer> jointServers = new HashMap<>();
File baseDirectory = null;
Optional<File> jaasFile = maybeSetupJaasFile();
try {
baseDirectory = new File(nodes.baseDirectory());
for (TestKitNode node : nodes.controllerNodes().values()) {
socketFactoryManager.getOrCreatePortForListener(node.id(), controllerListenerName);
}
for (TestKitNode node : nodes.brokerNodes().values()) {
socketFactoryManager.getOrCreatePortForListener(node.id(), brokerListenerName);
}
for (TestKitNode node : nodes.controllerNodes().values()) {
setupNodeDirectories(baseDirectory, node.metadataDirectory(), Collections.emptyList());
KafkaConfig config = createNodeConfig(node);
Expand Down Expand Up @@ -308,7 +315,7 @@ public KafkaClusterTestKit build() throws Exception {
baseDirectory,
faultHandlerFactory,
socketFactoryManager,
jaasFile == null ? Optional.empty() : Optional.of(jaasFile));
jaasFile);
}

private String listeners(int node) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,18 @@ public ServerSocketChannel openServerSocket(
ServerSocketChannel socketChannel = getSocketForListenerAndMarkAsUsed(
nodeId,
listenerName);

if (socketChannel != null) {
if (socketChannel.isOpen()) {
return socketChannel;
}
// Re-bind a fresh server socket on the same port so a restarted node keeps its address
socketAddress = new InetSocketAddress(socketAddress.getHostString(), socketChannel.socket().getLocalPort());
socketChannel = ServerSocketFactory.INSTANCE.openServerSocket(
listenerName,
socketAddress,
listenBacklogSize,
recvBufferSize);
Comment on lines +52 to +63
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR depends #18337

return socketChannel;
}
return ServerSocketFactory.INSTANCE.openServerSocket(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.kafka.common.test.api;

import kafka.server.ControllerServer;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
Expand All @@ -28,7 +30,9 @@
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.config.ConfigResource;
Expand Down Expand Up @@ -58,6 +62,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -347,6 +352,44 @@ public void testControllerListenerName(ClusterInstance cluster) throws Execution
}
}

@ClusterTest(types = {Type.KRAFT}, brokers = 1)
public void testBrokerRestart(ClusterInstance cluster) throws ExecutionException, InterruptedException {
    // Verifies that a bounced broker comes back on the same address: a producer created
    // before the restart must still be able to write to the topic afterwards.
    final String topicName = "topic";
    Properties producerProps = new Properties();
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    try (Admin admin = cluster.admin();
         Producer<String, String> producer = cluster.producer(Utils.propsToMap(producerProps))) {
        // Single-partition, single-replica topic on the lone broker.
        admin.createTopics(List.of(new NewTopic(topicName, 1, (short) 1))).all().get();
        cluster.waitForTopic(topicName, 1);

        // Bounce every broker (there is exactly one) through a full stop/start cycle.
        for (var broker : cluster.brokers().values()) {
            broker.shutdown();
            broker.awaitShutdown();
            broker.startup();
        }

        // The pre-restart producer should reconnect and land the first record at offset 0.
        RecordMetadata firstRecord = producer.send(new ProducerRecord<>(topicName, 0, "key 0", "value 0")).get();
        assertEquals(0, firstRecord.offset());
    }
}

@ClusterTest(types = {Type.KRAFT})
public void testControllerRestart(ClusterInstance cluster) throws ExecutionException, InterruptedException {
    // Verifies that a bounced controller rejoins the metadata quorum and that an admin
    // client created before the restart can still query it.
    try (Admin admin = cluster.admin()) {
        ControllerServer controller = cluster.controllers().values().iterator().next();

        // Full stop/start cycle of the sole controller.
        controller.shutdown();
        controller.awaitShutdown();
        controller.startup();

        // The quorum should report exactly the one controller node after the restart.
        int quorumNodeCount = admin.describeMetadataQuorum().quorumInfo().get().nodes().size();
        assertEquals(1, quorumNodeCount);
    }
}

@ClusterTest(
types = {Type.KRAFT, Type.CO_KRAFT},
brokerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT,
Expand Down
Loading