diff --git a/core/src/main/scala/kafka/server/AlterPartitionManager.scala b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
index bd754f497292c..7744931f6a54c 100644
--- a/core/src/main/scala/kafka/server/AlterPartitionManager.scala
+++ b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
@@ -20,7 +20,6 @@ import java.util
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
 import kafka.utils.Logging
-import kafka.zk.KafkaZkClient
 import org.apache.kafka.clients.ClientResponse
 import org.apache.kafka.common.TopicIdPartition
 import org.apache.kafka.common.TopicPartition
@@ -100,17 +99,6 @@ object AlterPartitionManager {
       metadataVersionSupplier = () => metadataCache.metadataVersion()
     )
   }
-
-  /**
-   * Factory for ZK based implementation, used when IBP < 2.7-IV2
-   */
-  def apply(
-    scheduler: Scheduler,
-    time: Time,
-    zkClient: KafkaZkClient
-  ): AlterPartitionManager = {
-    new ZkAlterPartitionManager(scheduler, time, zkClient)
-  }
 }
 
 class DefaultAlterPartitionManager(
diff --git a/core/src/main/scala/kafka/server/ZkAlterPartitionManager.scala b/core/src/main/scala/kafka/server/ZkAlterPartitionManager.scala
deleted file mode 100644
index 942ead0d6d0dc..0000000000000
--- a/core/src/main/scala/kafka/server/ZkAlterPartitionManager.scala
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import kafka.utils.{Logging, ReplicationUtils}
-import kafka.zk.KafkaZkClient
-import org.apache.kafka.common.TopicPartition
-
-import java.util.concurrent.atomic.AtomicLong
-import java.util.concurrent.CompletableFuture
-import org.apache.kafka.common.TopicIdPartition
-import org.apache.kafka.common.errors.InvalidUpdateVersionException
-import org.apache.kafka.common.utils.Time
-import org.apache.kafka.metadata.LeaderAndIsr
-import org.apache.kafka.server.util.Scheduler
-
-import scala.collection.mutable
-
-/**
- * @param checkIntervalMs How often to check for ISR
- * @param maxDelayMs Maximum time that an ISR change may be delayed before sending the notification
- * @param lingerMs Maximum time to await additional changes before sending the notification
- */
-case class IsrChangePropagationConfig(checkIntervalMs: Long, maxDelayMs: Long, lingerMs: Long)
-
-object ZkAlterPartitionManager {
-  // This field is mutable to allow overriding change notification behavior in test cases
-  @volatile var DefaultIsrPropagationConfig: IsrChangePropagationConfig = IsrChangePropagationConfig(
-    checkIntervalMs = 2500,
-    lingerMs = 5000,
-    maxDelayMs = 60000,
-  )
-}
-
-class ZkAlterPartitionManager(scheduler: Scheduler, time: Time, zkClient: KafkaZkClient) extends AlterPartitionManager with Logging {
-
-  private val isrChangeNotificationConfig = ZkAlterPartitionManager.DefaultIsrPropagationConfig
-  // Visible for testing
-  private[server] val isrChangeSet: mutable.Set[TopicPartition] = new mutable.HashSet[TopicPartition]()
-  private val lastIsrChangeMs = new AtomicLong(time.milliseconds())
-  private val lastIsrPropagationMs = new AtomicLong(time.milliseconds())
-
-  override def start(): Unit = {
-    scheduler.schedule("isr-change-propagation", () => maybePropagateIsrChanges(), 0L,
-      isrChangeNotificationConfig.checkIntervalMs)
-  }
-
-  override def submit(
-    topicIdPartition: TopicIdPartition,
-    leaderAndIsr: LeaderAndIsr,
-    controllerEpoch: Int
-  ): CompletableFuture[LeaderAndIsr]= {
-    debug(s"Writing new ISR ${leaderAndIsr.isr} to ZooKeeper with version " +
-      s"${leaderAndIsr.partitionEpoch} for partition $topicIdPartition")
-
-    val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topicIdPartition.topicPartition,
-      leaderAndIsr, controllerEpoch)
-
-    val future = new CompletableFuture[LeaderAndIsr]()
-    if (updateSucceeded) {
-      // Track which partitions need to be propagated to the controller
-      isrChangeSet synchronized {
-        isrChangeSet += topicIdPartition.topicPartition
-        lastIsrChangeMs.set(time.milliseconds())
-      }
-
-      // We rely on Partition#isrState being properly set to the pending ISR at this point since we are synchronously
-      // applying the callback
-      future.complete(leaderAndIsr.withPartitionEpoch(newVersion))
-    } else {
-      future.completeExceptionally(new InvalidUpdateVersionException(
-        s"ISR update $leaderAndIsr for partition $topicIdPartition with controller epoch $controllerEpoch " +
-          "failed with an invalid version error"))
-    }
-    future
-  }
-
-  /**
-   * This function periodically runs to see if ISR needs to be propagated. It propagates ISR when:
-   * 1. There is ISR change not propagated yet.
-   * 2. There is no ISR Change in the last five seconds, or it has been more than 60 seconds since the last ISR propagation.
-   * This allows an occasional ISR change to be propagated within a few seconds, and avoids overwhelming controller and
-   * other brokers when large amount of ISR change occurs.
-   */
-  private[server] def maybePropagateIsrChanges(): Unit = {
-    val now = time.milliseconds()
-    isrChangeSet synchronized {
-      if (isrChangeSet.nonEmpty &&
-        (lastIsrChangeMs.get() + isrChangeNotificationConfig.lingerMs < now ||
-          lastIsrPropagationMs.get() + isrChangeNotificationConfig.maxDelayMs < now)) {
-        zkClient.propagateIsrChanges(isrChangeSet)
-        isrChangeSet.clear()
-        lastIsrPropagationMs.set(now)
-      }
-    }
-  }
-}
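Editor's note: the doc comment on the deleted `maybePropagateIsrChanges` summarizes the batching rule that ZK mode used for ISR-change notifications: propagate once pending changes have gone `lingerMs` (5 seconds by default) without a new change, or unconditionally once `maxDelayMs` (60 seconds) has elapsed since the last propagation. A condensed restatement of that predicate, using the defaults from `DefaultIsrPropagationConfig` — the function name and signature here are illustrative, not Kafka API:

```scala
// Illustrative restatement of the throttle in the deleted maybePropagateIsrChanges.
// Propagate when changes are pending AND either no further change arrived within
// lingerMs (the batch has quiesced), or maxDelayMs has passed since the last
// propagation (an upper bound on staleness under a constant stream of changes).
def shouldPropagate(
  hasPendingChanges: Boolean,
  lastChangeMs: Long,       // timestamp of the most recent ISR change
  lastPropagationMs: Long,  // timestamp of the last notification written to ZK
  nowMs: Long,
  lingerMs: Long = 5000L,   // DefaultIsrPropagationConfig.lingerMs
  maxDelayMs: Long = 60000L // DefaultIsrPropagationConfig.maxDelayMs
): Boolean =
  hasPendingChanges &&
    (lastChangeMs + lingerMs < nowMs || lastPropagationMs + maxDelayMs < nowMs)
```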
diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
deleted file mode 100644
index 8cb03f4553312..0000000000000
--- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import kafka.controller.LeaderIsrAndControllerEpoch
-import kafka.zk._
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.metadata.LeaderAndIsr
-
-import scala.jdk.CollectionConverters._
-
-object ReplicationUtils extends Logging {
-
-  def updateLeaderAndIsr(zkClient: KafkaZkClient, partition: TopicPartition, newLeaderAndIsr: LeaderAndIsr,
-                         controllerEpoch: Int): (Boolean, Int) = {
-    debug(s"Updated ISR for $partition to ${newLeaderAndIsr.isr.asScala.mkString(",")}")
-    val path = TopicPartitionStateZNode.path(partition)
-    val newLeaderData = TopicPartitionStateZNode.encode(LeaderIsrAndControllerEpoch(newLeaderAndIsr, controllerEpoch))
-    // use the epoch of the controller that made the leadership decision, instead of the current controller epoch
-    val updatePersistentPath: (Boolean, Int) = zkClient.conditionalUpdatePath(path, newLeaderData,
-      newLeaderAndIsr.partitionEpoch, Some(checkLeaderAndIsrZkData))
-    updatePersistentPath
-  }
-
-  private def checkLeaderAndIsrZkData(zkClient: KafkaZkClient, path: String, expectedLeaderAndIsrInfo: Array[Byte]): (Boolean, Int) = {
-    try {
-      val (writtenLeaderOpt, writtenStat) = zkClient.getDataAndStat(path)
-      val expectedLeaderOpt = TopicPartitionStateZNode.decode(expectedLeaderAndIsrInfo, writtenStat)
-      val succeeded = writtenLeaderOpt.exists { writtenData =>
-        val writtenLeaderOpt = TopicPartitionStateZNode.decode(writtenData, writtenStat)
-        (expectedLeaderOpt, writtenLeaderOpt) match {
-          case (Some(expectedLeader), Some(writtenLeader)) if expectedLeader == writtenLeader => true
-          case _ => false
-        }
-      }
-      if (succeeded) (true, writtenStat.getVersion)
-      else (false, -1)
-    } catch {
-      case _: Exception => (false, -1)
-    }
-  }
-
-}
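Editor's note: the deleted `updateLeaderAndIsr` was built on `KafkaZkClient.conditionalUpdatePath`, an optimistic concurrency primitive: the write succeeds only if the znode still carries the expected version (here the partition epoch), returning the bumped version on success and `(false, -1)` on a conflict. A toy sketch of that contract with an in-memory stand-in for the znode — none of these names are real Kafka or ZooKeeper API:

```scala
// Toy model of the compare-and-set contract the deleted updateLeaderAndIsr relied on.
// Real ZooKeeper rejects setData(path, data, expectedVersion) with BADVERSION when
// another writer has already bumped the znode's version.
final class FakeZNode(private var version: Int, private var data: Array[Byte]) {
  def conditionalUpdate(newData: Array[Byte], expectedVersion: Int): (Boolean, Int) =
    synchronized {
      if (version == expectedVersion) {
        data = newData
        version += 1
        (true, version) // the caller adopts the new version as its partition epoch
      } else {
        (false, -1)     // stale epoch: the caller fails with an invalid-version error
      }
    }
}
```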
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index f21522067ca29..b8ddaae026af1 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -21,7 +21,6 @@ import com.yammer.metrics.core.Metric
 import kafka.log._
 import kafka.server._
 import kafka.utils._
-import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.errors.{ApiException, FencedLeaderEpochException, InconsistentTopicIdException, InvalidTxnStateException, NotLeaderOrFollowerException, OffsetNotAvailableException, OffsetOutOfRangeException, UnknownLeaderEpochException}
 import org.apache.kafka.common.message.{AlterPartitionResponseData, FetchResponseData}
 import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
@@ -36,7 +35,7 @@ import org.apache.kafka.metadata.LeaderRecoveryState
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 import org.mockito.ArgumentMatchers
-import org.mockito.ArgumentMatchers.{any, anyBoolean, anyInt, anyLong, anyString}
+import org.mockito.ArgumentMatchers.{any, anyBoolean, anyInt, anyLong}
 import org.mockito.Mockito._
 import org.mockito.invocation.InvocationOnMock
 
@@ -44,7 +43,7 @@ import java.lang.{Long => JLong}
 import java.nio.ByteBuffer
 import java.util.Optional
 import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Semaphore}
-import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
+import kafka.server.metadata.KRaftMetadataCache
 import kafka.server.share.DelayedShareFetch
 import org.apache.kafka.clients.ClientResponse
 import org.apache.kafka.common.compress.Compression
@@ -55,7 +54,6 @@ import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, MetadataVersion, NodeToControllerChannelManager, RequestLocal}
-import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey}
 import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey
@@ -1503,7 +1501,7 @@ class PartitionTest extends AbstractPartitionTest {
     val isrItem = alterPartitionManager.isrUpdates.head
     assertEquals(isrItem.leaderAndIsr.isr, List(brokerId, remoteBrokerId).map(Int.box).asJava)
     isrItem.leaderAndIsr.isrWithBrokerEpoch.asScala.foreach { brokerState =>
-      // In ZK mode, the broker epochs in the leaderAndIsr should be -1.
+      // the broker epochs in the leaderAndIsr should be -1.
       assertEquals(-1, brokerState.brokerEpoch())
     }
     assertEquals(Set(brokerId), partition.partitionState.isr)
@@ -1682,8 +1680,6 @@ class PartitionTest extends AbstractPartitionTest {
   @ParameterizedTest
   @ValueSource(strings = Array("kraft"))
   def testIsrNotExpandedIfReplicaIsFencedOrShutdown(quorum: String): Unit = {
-    val kraft = quorum == "kraft"
-
     val log = logManager.getOrCreateLog(topicPartition, topicId = None)
     seedLogData(log, numRecords = 10, leaderEpoch = 4)
 
@@ -1693,20 +1689,13 @@
     val replicas = List(brokerId, remoteBrokerId)
     val isr = Set(brokerId)
 
-    val metadataCache: MetadataCache = if (kraft) mock(classOf[KRaftMetadataCache]) else mock(classOf[ZkMetadataCache])
-    if (kraft) {
-      addBrokerEpochToMockMetadataCache(metadataCache.asInstanceOf[KRaftMetadataCache], replicas)
-    }
+    val metadataCache = mock(classOf[KRaftMetadataCache])
+    addBrokerEpochToMockMetadataCache(metadataCache, replicas)
 
     // Mark the remote broker as eligible or ineligible in the metadata cache of the leader.
     // When using kraft, we can make the broker ineligible by fencing it.
-    // In ZK mode, we must mark the broker as alive for it to be eligible.
     def markRemoteReplicaEligible(eligible: Boolean): Unit = {
-      if (kraft) {
-        when(metadataCache.asInstanceOf[KRaftMetadataCache].isBrokerFenced(remoteBrokerId)).thenReturn(!eligible)
-      } else {
-        when(metadataCache.hasAliveBroker(remoteBrokerId)).thenReturn(eligible)
-      }
+      when(metadataCache.isBrokerFenced(remoteBrokerId)).thenReturn(!eligible)
     }
 
     val partition = new Partition(
@@ -1845,7 +1834,7 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(isr, partition.partitionState.maximalIsr)
 
     // Fetch to let the follower catch up to the log end offset, but using a wrong broker epoch. The expansion should fail.
-    addBrokerEpochToMockMetadataCache(metadataCache.asInstanceOf[KRaftMetadataCache], List(brokerId, remoteBrokerId2))
+    addBrokerEpochToMockMetadataCache(metadataCache, List(brokerId, remoteBrokerId2))
     // Create a race case where the replica epoch get bumped right after the previous fetch succeeded.
     val wrongReplicaEpoch = defaultBrokerEpoch(remoteBrokerId1) - 1
     when(metadataCache.getAliveBrokerEpoch(remoteBrokerId1)).thenReturn(Option(wrongReplicaEpoch), Option(defaultBrokerEpoch(remoteBrokerId1)))
@@ -1905,8 +1894,8 @@ class PartitionTest extends AbstractPartitionTest {
     val replicas = List(brokerId, remoteBrokerId1)
     val isr = Set(brokerId, remoteBrokerId1)
 
-    val metadataCache: MetadataCache = mock(classOf[KRaftMetadataCache])
-    addBrokerEpochToMockMetadataCache(metadataCache.asInstanceOf[KRaftMetadataCache], replicas)
+    val metadataCache = mock(classOf[KRaftMetadataCache])
+    addBrokerEpochToMockMetadataCache(metadataCache, replicas)
 
     val partition = new Partition(
       topicPartition,
@@ -2698,71 +2687,6 @@
     assertEquals(alterPartitionManager.isrUpdates.size, 1)
   }
 
-  @Test
-  def testZkIsrManagerAsyncCallback(): Unit = {
-    // We need a real scheduler here so that the ISR write lock works properly
-    val scheduler = new KafkaScheduler(1, true, "zk-isr-test")
-    scheduler.startup()
-    val kafkaZkClient = mock(classOf[KafkaZkClient])
-
-    doAnswer(_ => (true, 2))
-      .when(kafkaZkClient)
-      .conditionalUpdatePath(anyString(), any(), ArgumentMatchers.eq(1), any())
-
-    val zkIsrManager = AlterPartitionManager(scheduler, time, kafkaZkClient)
-    zkIsrManager.start()
-
-    val partition = new Partition(topicPartition,
-      replicaLagTimeMaxMs = ReplicationConfigs.REPLICA_LAG_TIME_MAX_MS_DEFAULT,
-      interBrokerProtocolVersion = IBP_2_6_IV0, // shouldn't matter, but set this to a ZK isr version
-      localBrokerId = brokerId,
-      () => defaultBrokerEpoch(brokerId),
-      time,
-      alterPartitionListener,
-      delayedOperations,
-      metadataCache,
-      logManager,
-      zkIsrManager)
-
-    val log = logManager.getOrCreateLog(topicPartition, topicId = None)
-    seedLogData(log, numRecords = 10, leaderEpoch = 4)
-
-    val controllerEpoch = 0
-    val leaderEpoch = 5
-    val follower1 = brokerId + 1
-    val follower2 = brokerId + 2
-    val follower3 = brokerId + 3
-    val replicas = Seq(brokerId, follower1, follower2, follower3)
-    val isr = Seq(brokerId, follower1, follower2)
-
-    doNothing().when(delayedOperations).checkAndCompleteAll()
-
-    assertTrue(makeLeader(
-      partition = partition,
-      topicId = None,
-      controllerEpoch = controllerEpoch,
-      leaderEpoch = leaderEpoch,
-      isr = isr,
-      replicas = replicas,
-      partitionEpoch = 1,
-      isNew = true
-    ))
-    assertEquals(0L, partition.localLogOrException.highWatermark)
-
-    // Expand ISR
-    fetchFollower(partition, replicaId = follower3, fetchOffset = 10L)
-
-    // Try avoiding a race
-    TestUtils.waitUntilTrue(() => !partition.partitionState.isInflight, "Expected ISR state to be committed", 100)
-
-    partition.partitionState match {
-      case CommittedPartitionState(isr, _) => assertEquals(Set(brokerId, follower1, follower2, follower3), isr)
-      case _ => fail("Expected a committed ISR following Zk expansion")
-    }
-
-    scheduler.shutdown()
-  }
-
   @Test
   def testUseCheckpointToInitializeHighWatermark(): Unit = {
     val log = logManager.getOrCreateLog(topicPartition, topicId = None)
     seedLogData(log, numRecords = 10, leaderEpoch = 4)
@@ -2936,7 +2860,7 @@ class PartitionTest extends AbstractPartitionTest {
     val partition = new Partition(
       topicPartition, 1000, MetadataVersion.latestTesting, 0, () => defaultBrokerEpoch(0), Time.SYSTEM,
       mock(classOf[AlterPartitionListener]), mock(classOf[DelayedOperations]),
-      mock(classOf[MetadataCache]), mock(classOf[LogManager]), mock(classOf[AlterPartitionManager]))
+      mock(classOf[KRaftMetadataCache]), mock(classOf[LogManager]), mock(classOf[AlterPartitionManager]))
 
     val replicas = Seq(0, 1, 2, 3)
     val followers = Seq(1, 2, 3)
@@ -3200,7 +3124,7 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(Some(0L), partition.leaderEpochStartOffsetOpt)
 
     val leaderLog = partition.localLogOrException
-    assertEquals(Optional.of(new EpochEntry(leaderEpoch, 0L)), leaderLog.leaderEpochCache.asJava.flatMap(_.latestEntry))
+    assertEquals(Optional.of(new EpochEntry(leaderEpoch, 0L)), leaderLog.leaderEpochCache.toJava.flatMap(_.latestEntry))
 
     // Write to the log to increment the log end offset.
     leaderLog.appendAsLeader(MemoryRecords.withRecords(0L, Compression.NONE, 0,
@@ -3224,7 +3148,7 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(leaderEpoch, partition.getLeaderEpoch)
     assertEquals(Set(leaderId), partition.partitionState.isr)
     assertEquals(Some(0L), partition.leaderEpochStartOffsetOpt)
-    assertEquals(Optional.of(new EpochEntry(leaderEpoch, 0L)), leaderLog.leaderEpochCache.asJava.flatMap(_.latestEntry))
+    assertEquals(Optional.of(new EpochEntry(leaderEpoch, 0L)), leaderLog.leaderEpochCache.toJava.flatMap(_.latestEntry))
   }
 
   @Test
@@ -3773,8 +3697,8 @@ class PartitionTest extends AbstractPartitionTest {
       fetchOffset,
       FetchRequest.INVALID_LOG_START_OFFSET,
       maxBytes,
-      leaderEpoch.map(Int.box).asJava,
-      lastFetchedEpoch.map(Int.box).asJava
+      leaderEpoch.map(Int.box).toJava,
+      lastFetchedEpoch.map(Int.box).toJava
     )
 
     partition.fetchRecords(
@@ -3810,8 +3734,8 @@ class PartitionTest extends AbstractPartitionTest {
      fetchOffset,
      logStartOffset,
      maxBytes,
-      leaderEpoch.map(Int.box).asJava,
-      lastFetchedEpoch.map(Int.box).asJava
+      leaderEpoch.map(Int.box).toJava,
+      lastFetchedEpoch.map(Int.box).toJava
     )
 
     partition.fetchRecords(
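Editor's note: the `asJava` → `toJava` switches in the assertions above are consistent with moving these call sites onto the Scala 2.13 standard library's `scala.jdk.OptionConverters`, whose `toJava` extension method turns an `Option` into a `java.util.Optional`. A minimal sketch of that conversion, assuming Scala 2.13+:

```scala
import java.util.Optional
import scala.jdk.OptionConverters._ // provides Option#toJava and Optional#toScala

val leaderEpoch: Option[Integer] = Some(5).map(Int.box)
val asOptional: Optional[Integer] = leaderEpoch.toJava // Option -> java.util.Optional
assert(asOptional == Optional.of(Int.box(5)))
```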
diff --git a/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala b/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
index 38825ef417409..6dbfaefc42c52 100644
--- a/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
@@ -19,11 +19,10 @@ package kafka.server
 
 import java.util.Collections
 import java.util.stream.{Stream => JStream}
-import kafka.zk.KafkaZkClient
 import org.apache.kafka.clients.ClientResponse
 import org.apache.kafka.common.TopicIdPartition
 import org.apache.kafka.common.Uuid
-import org.apache.kafka.common.errors.{AuthenticationException, InvalidUpdateVersionException, OperationNotAttemptedException, UnknownServerException, UnsupportedVersionException}
+import org.apache.kafka.common.errors.{AuthenticationException, OperationNotAttemptedException, UnknownServerException, UnsupportedVersionException}
 import org.apache.kafka.common.message.AlterPartitionRequestData.BrokerState
 import org.apache.kafka.common.message.{AlterPartitionRequestData, AlterPartitionResponseData}
 import org.apache.kafka.common.metrics.Metrics
@@ -43,7 +42,7 @@ import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.Arguments
 import org.junit.jupiter.params.provider.MethodSource
 import org.mockito.ArgumentMatcher
-import org.mockito.ArgumentMatchers.{any, anyString}
+import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.{mock, reset, times, verify}
 import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
 
@@ -629,33 +628,6 @@ class AlterPartitionManagerTest {
         .setErrorCode(error.code)))
   }
 
-  @Test
-  def testZkBasic(): Unit = {
-    val scheduler = new MockScheduler(time)
-    scheduler.startup()
-
-    val kafkaZkClient = Mockito.mock(classOf[KafkaZkClient])
-    Mockito.doAnswer(_ => (true, 2))
-      .when(kafkaZkClient)
-      .conditionalUpdatePath(anyString(), any(), ArgumentMatchers.eq(1), any())
-    Mockito.doAnswer(_ => (false, 2))
-      .when(kafkaZkClient)
-      .conditionalUpdatePath(anyString(), any(), ArgumentMatchers.eq(3), any())
-
-    val zkIsrManager = new ZkAlterPartitionManager(scheduler, time, kafkaZkClient)
-    zkIsrManager.start()
-
-    // Correct ZK version
-    val future1 = zkIsrManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 1), 0)
-    assertTrue(future1.isDone)
-    assertEquals(new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 2), future1.get)
-
-    // Wrong ZK version
-    val future2 = zkIsrManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 3), 0)
-    assertTrue(future2.isCompletedExceptionally)
-    assertFutureThrows(future2, classOf[InvalidUpdateVersionException])
-  }
-
   private def partitionResponse(
     tp: TopicIdPartition = tp0,
     error: Errors = Errors.NONE,