Skip to content

Commit

Permalink
KAFKA-18440: Admin does not convert the AuthorizationException to fat…
Browse files Browse the repository at this point in the history
…al error in using bootstrap controllers

Signed-off-by: PoAn Yang <[email protected]>
  • Loading branch information
FrankYang0529 committed Jan 8, 2025
1 parent 0377e80 commit 6ba95ba
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.MismatchedEndpointTypeException;
import org.apache.kafka.common.errors.UnsupportedEndpointTypeException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
Expand Down Expand Up @@ -280,6 +281,9 @@ public void updateFailed(Throwable exception) {
if (exception instanceof AuthenticationException) {
log.warn("Metadata update failed due to authentication error", exception);
this.fatalException = (ApiException) exception;
} else if (exception instanceof AuthorizationException) {
log.warn("Metadata update failed due to authorization error", exception);
this.fatalException = (ApiException) exception;
} else if (exception instanceof MismatchedEndpointTypeException) {
log.warn("Metadata update failed due to mismatched endpoint type error", exception);
this.fatalException = (ApiException) exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;

Expand Down Expand Up @@ -98,6 +99,16 @@ public void testAuthenticationFailure() {
assertTrue(mgr.isReady());
}

@Test
public void testAuthorizationFailure() {
mgr.transitionToUpdatePending(time.milliseconds());
mgr.updateFailed(new AuthorizationException("Authorization failed"));
assertEquals(refreshBackoffMs, mgr.metadataFetchDelayMs(time.milliseconds()));
assertThrows(AuthorizationException.class, mgr::isReady);
mgr.update(mockCluster(), time.milliseconds());
assertTrue(mgr.isReady());
}

@Test
public void testNeedsRebootstrap() {
long rebootstrapTriggerMs = 1000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.DescribeAclsOptions;
import org.apache.kafka.clients.admin.DescribeLogDirsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
Expand All @@ -35,7 +34,6 @@
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;
Expand Down Expand Up @@ -464,12 +462,12 @@ public void testSaslPlaintext(ClusterInstance clusterInstance) throws Cancellati
}
)
public void testSaslPlaintextWithController(ClusterInstance clusterInstance) throws CancellationException, ExecutionException, InterruptedException {
// test with admin
// default ClusterInstance#admin helper with admin credentials
try (Admin admin = clusterInstance.admin(Map.of(), true)) {
admin.describeAcls(AclBindingFilter.ANY).values().get();
}

// test with non-admin
// client with non-admin credentials
Map<String, Object> nonAdminConfig = Map.of(
SaslConfigs.SASL_JAAS_CONFIG,
String.format(
Expand All @@ -480,9 +478,25 @@ public void testSaslPlaintextWithController(ClusterInstance clusterInstance) thr
try (Admin admin = clusterInstance.admin(nonAdminConfig, true)) {
ExecutionException exception = assertThrows(
ExecutionException.class,
() -> admin.describeAcls(AclBindingFilter.ANY, new DescribeAclsOptions().timeoutMs(5000)).values().get()
() -> admin.describeAcls(AclBindingFilter.ANY).values().get()
);
assertInstanceOf(ClusterAuthorizationException.class, exception.getCause());
}

// client with unknown credentials
Map<String, Object> unknownUserConfig = Map.of(
SaslConfigs.SASL_JAAS_CONFIG,
String.format(
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
"unknown", "unknown"
)
);
try (Admin admin = clusterInstance.admin(unknownUserConfig)) {
ExecutionException exception = assertThrows(
ExecutionException.class,
() -> admin.describeAcls(AclBindingFilter.ANY).values().get()
);
assertInstanceOf(TimeoutException.class, exception.getCause());
assertInstanceOf(SaslAuthenticationException.class, exception.getCause());
}
}
}

0 comments on commit 6ba95ba

Please sign in to comment.