Skip to content

Commit

Permalink
KAFKA-18411 Remove ZkProducerIdManager (#18413)
Browse files Browse the repository at this point in the history
Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
m1a2st authored Jan 7, 2025
1 parent c40cc57 commit 6aef94e
Show file tree
Hide file tree
Showing 3 changed files with 1 addition and 196 deletions.
14 changes: 1 addition & 13 deletions core/src/main/scala/kafka/controller/KafkaController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit
import kafka.common._
import kafka.cluster.Broker
import kafka.controller.KafkaController.{ActiveBrokerCountMetricName, ActiveControllerCountMetricName, AlterReassignmentsCallback, ControllerStateMetricName, ElectLeadersCallback, FencedBrokerCountMetricName, GlobalPartitionCountMetricName, GlobalTopicCountMetricName, ListReassignmentsCallback, OfflinePartitionsCountMetricName, PreferredReplicaImbalanceCountMetricName, ReplicasIneligibleToDeleteCountMetricName, ReplicasToDeleteCountMetricName, TopicsIneligibleToDeleteCountMetricName, TopicsToDeleteCountMetricName, UpdateFeaturesCallback}
import kafka.coordinator.transaction.ZkProducerIdManager
import kafka.server._
import kafka.server.metadata.ZkFinalizedFeatureCache
import kafka.utils._
Expand Down Expand Up @@ -52,7 +51,7 @@ import org.apache.zookeeper.KeeperException.Code
import scala.collection.{Map, Seq, Set, immutable, mutable}
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}
import scala.util.{Failure, Try}

sealed trait ElectionTrigger
case object AutoTriggered extends ElectionTrigger
Expand Down Expand Up @@ -2545,17 +2544,6 @@ class KafkaController(val config: KafkaConfig,
callback.apply(Left(Errors.STALE_BROKER_EPOCH))
return
}

val maybeNewProducerIdsBlock = try {
Try(ZkProducerIdManager.getNewProducerIdBlock(brokerId, zkClient, this))
} catch {
case ke: KafkaException => Failure(ke)
}

maybeNewProducerIdsBlock match {
case Failure(exception) => callback.apply(Left(Errors.forException(exception)))
case Success(newProducerIdBlock) => callback.apply(Right(newProducerIdBlock))
}
}

private def processControllerChange(): Unit = {
Expand Down

This file was deleted.

This file was deleted.

0 comments on commit 6aef94e

Please sign in to comment.