Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17631: Convert SaslApiVersionsRequestTest to kraft #18330

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,82 +16,46 @@
*/
package kafka.server

import kafka.api.{KafkaSasl, SaslSetup}
import kafka.security.JaasTestUtils
import kafka.server.SaslApiVersionsRequestTest.{kafkaClientSaslMechanism, kafkaServerSaslMechanisms}
import org.apache.kafka.common.test.api.{ClusterTemplate, Type, ClusterTestExtensions, ClusterConfig, ClusterInstance}
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.test.api.{ClusterInstance, ClusterTest, ClusterTestExtensions, Type}
import org.apache.kafka.common.message.SaslHandshakeRequestData
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, SaslHandshakeRequest, SaslHandshakeResponse}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.network.SocketServerConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled}

import java.net.Socket
import java.util.Collections
import scala.jdk.CollectionConverters._

object SaslApiVersionsRequestTest {
val kafkaClientSaslMechanism = "PLAIN"
val kafkaServerSaslMechanisms: Seq[String] = List("PLAIN")
val controlPlaneListenerName = "CONTROL_PLANE"
val securityProtocol = SecurityProtocol.SASL_PLAINTEXT

def saslApiVersionsRequestClusterConfig(): java.util.List[ClusterConfig] = {
val saslServerProperties = new java.util.HashMap[String, String]()
saslServerProperties.put(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG, kafkaClientSaslMechanism)
saslServerProperties.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, kafkaServerSaslMechanisms.mkString(","))

val saslClientProperties = new java.util.HashMap[String, String]()
saslClientProperties.put(SaslConfigs.SASL_MECHANISM, kafkaClientSaslMechanism)

// Configure control plane listener to make sure we have separate listeners for testing.
val serverProperties = new java.util.HashMap[String, String]()
serverProperties.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, s"$controlPlaneListenerName:$securityProtocol,$securityProtocol:$securityProtocol")
serverProperties.put("listeners", s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0")
serverProperties.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0")

List(ClusterConfig.defaultBuilder
.setBrokerSecurityProtocol(securityProtocol)
.setTypes(Set(Type.KRAFT).asJava)
.setSaslServerProperties(saslServerProperties)
.setSaslClientProperties(saslClientProperties)
.setServerProperties(serverProperties)
.build()).asJava
}
}

@Disabled("TODO: KAFKA-17631 - Convert SaslApiVersionsRequestTest to kraft")
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) {
private var sasl: SaslSetup = _

@BeforeEach
def setupSasl(): Unit = {
sasl = new SaslSetup() {}
sasl.startSasl(sasl.jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME))
}

@ClusterTemplate("saslApiVersionsRequestClusterConfig")
@ClusterTest(types = Array(Type.KRAFT),
brokerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT,
controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT
)
def testApiVersionsRequestBeforeSaslHandshakeRequest(): Unit = {
val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener())
try {
val apiVersionsResponse = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse](
new ApiVersionsRequest.Builder().build(0), socket)
validateApiVersionsResponse(apiVersionsResponse,
validateApiVersionsResponse(
apiVersionsResponse,
enableUnstableLastVersion = !"false".equals(
cluster.config().serverProperties().get("unstable.api.versions.enable")))
cluster.config().serverProperties().get("unstable.api.versions.enable")),
apiVersion = 0.toShort
)
sendSaslHandshakeRequestValidateResponse(socket)
} finally {
socket.close()
}
}

@ClusterTemplate("saslApiVersionsRequestClusterConfig")
@ClusterTest(types = Array(Type.KRAFT),
brokerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT,
controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT
)
def testApiVersionsRequestAfterSaslHandshakeRequest(): Unit = {
val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener())
try {
Expand All @@ -104,7 +68,10 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVe
}
}

@ClusterTemplate("saslApiVersionsRequestClusterConfig")
@ClusterTest(types = Array(Type.KRAFT),
brokerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT,
controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT
)
def testApiVersionsRequestWithUnsupportedVersion(): Unit = {
val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener())
try {
Expand All @@ -113,20 +80,18 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVe
assertEquals(Errors.UNSUPPORTED_VERSION.code, apiVersionsResponse.data.errorCode)
val apiVersionsResponse2 = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse](
new ApiVersionsRequest.Builder().build(0), socket)
validateApiVersionsResponse(apiVersionsResponse2,
validateApiVersionsResponse(
apiVersionsResponse2,
enableUnstableLastVersion = !"false".equals(
cluster.config().serverProperties().get("unstable.api.versions.enable")))
cluster.config().serverProperties().get("unstable.api.versions.enable")),
apiVersion = 0.toShort
)
sendSaslHandshakeRequestValidateResponse(socket)
} finally {
socket.close()
}
}

@AfterEach
def closeSasl(): Unit = {
sasl.closeSasl()
}

private def sendSaslHandshakeRequestValidateResponse(socket: Socket): Unit = {
val request = new SaslHandshakeRequest(new SaslHandshakeRequestData().setMechanism("PLAIN"),
ApiKeys.SASL_HANDSHAKE.latestVersion)
Expand Down
Loading