Skip to content

Commit

Permalink
KAFKA-17631: Convert SaslApiVersionsRequestTest to kraft
Browse files Browse the repository at this point in the history
Signed-off-by: PoAn Yang <[email protected]>
  • Loading branch information
FrankYang0529 committed Jan 8, 2025
1 parent 0c435e3 commit 2b5bee7
Showing 1 changed file with 23 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,82 +16,46 @@
*/
package kafka.server

import kafka.api.{KafkaSasl, SaslSetup}
import kafka.security.JaasTestUtils
import kafka.server.SaslApiVersionsRequestTest.{kafkaClientSaslMechanism, kafkaServerSaslMechanisms}
import org.apache.kafka.common.test.api.{ClusterTemplate, Type, ClusterTestExtensions, ClusterConfig, ClusterInstance}
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.test.api.{ClusterInstance, ClusterTest, ClusterTestExtensions, Type}
import org.apache.kafka.common.message.SaslHandshakeRequestData
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, SaslHandshakeRequest, SaslHandshakeResponse}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.network.SocketServerConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled}

import java.net.Socket
import java.util.Collections
import scala.jdk.CollectionConverters._

object SaslApiVersionsRequestTest {
val kafkaClientSaslMechanism = "PLAIN"
val kafkaServerSaslMechanisms: Seq[String] = List("PLAIN")
val controlPlaneListenerName = "CONTROL_PLANE"
val securityProtocol = SecurityProtocol.SASL_PLAINTEXT

def saslApiVersionsRequestClusterConfig(): java.util.List[ClusterConfig] = {
val saslServerProperties = new java.util.HashMap[String, String]()
saslServerProperties.put(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG, kafkaClientSaslMechanism)
saslServerProperties.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, kafkaServerSaslMechanisms.mkString(","))

val saslClientProperties = new java.util.HashMap[String, String]()
saslClientProperties.put(SaslConfigs.SASL_MECHANISM, kafkaClientSaslMechanism)

// Configure control plane listener to make sure we have separate listeners for testing.
val serverProperties = new java.util.HashMap[String, String]()
serverProperties.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, s"$controlPlaneListenerName:$securityProtocol,$securityProtocol:$securityProtocol")
serverProperties.put("listeners", s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0")
serverProperties.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0")

List(ClusterConfig.defaultBuilder
.setBrokerSecurityProtocol(securityProtocol)
.setTypes(Set(Type.KRAFT).asJava)
.setSaslServerProperties(saslServerProperties)
.setSaslClientProperties(saslClientProperties)
.setServerProperties(serverProperties)
.build()).asJava
}
}

@Disabled("TODO: KAFKA-17631 - Convert SaslApiVersionsRequestTest to kraft")
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) {
private var sasl: SaslSetup = _

@BeforeEach
def setupSasl(): Unit = {
sasl = new SaslSetup() {}
sasl.startSasl(sasl.jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME))
}

@ClusterTemplate("saslApiVersionsRequestClusterConfig")
@ClusterTest(types = Array(Type.KRAFT),
brokerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT,
controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT
)
def testApiVersionsRequestBeforeSaslHandshakeRequest(): Unit = {
val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener())
try {
val apiVersionsResponse = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse](
new ApiVersionsRequest.Builder().build(0), socket)
validateApiVersionsResponse(apiVersionsResponse,
validateApiVersionsResponse(
apiVersionsResponse,
enableUnstableLastVersion = !"false".equals(
cluster.config().serverProperties().get("unstable.api.versions.enable")))
cluster.config().serverProperties().get("unstable.api.versions.enable")),
apiVersion = 0.toShort
)
sendSaslHandshakeRequestValidateResponse(socket)
} finally {
socket.close()
}
}

@ClusterTemplate("saslApiVersionsRequestClusterConfig")
@ClusterTest(types = Array(Type.KRAFT),
brokerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT,
controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT
)
def testApiVersionsRequestAfterSaslHandshakeRequest(): Unit = {
val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener())
try {
Expand All @@ -104,7 +68,10 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVe
}
}

@ClusterTemplate("saslApiVersionsRequestClusterConfig")
@ClusterTest(types = Array(Type.KRAFT),
brokerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT,
controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT
)
def testApiVersionsRequestWithUnsupportedVersion(): Unit = {
val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener())
try {
Expand All @@ -113,20 +80,18 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVe
assertEquals(Errors.UNSUPPORTED_VERSION.code, apiVersionsResponse.data.errorCode)
val apiVersionsResponse2 = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse](
new ApiVersionsRequest.Builder().build(0), socket)
validateApiVersionsResponse(apiVersionsResponse2,
validateApiVersionsResponse(
apiVersionsResponse2,
enableUnstableLastVersion = !"false".equals(
cluster.config().serverProperties().get("unstable.api.versions.enable")))
cluster.config().serverProperties().get("unstable.api.versions.enable")),
apiVersion = 0.toShort
)
sendSaslHandshakeRequestValidateResponse(socket)
} finally {
socket.close()
}
}

@AfterEach
def closeSasl(): Unit = {
sasl.closeSasl()
}

private def sendSaslHandshakeRequestValidateResponse(socket: Socket): Unit = {
val request = new SaslHandshakeRequest(new SaslHandshakeRequestData().setMechanism("PLAIN"),
ApiKeys.SASL_HANDSHAKE.latestVersion)
Expand Down

0 comments on commit 2b5bee7

Please sign in to comment.