Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename config keys to cmak to address issue#713 #726

Merged
merged 1 commit into from
Feb 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ The minimum configuration is the zookeeper hosts which are to be used for CMAK (
This can be found in the application.conf file in conf directory. The same file will be packaged
in the distribution zip file; you may modify settings after unzipping the file on the desired server.

kafka-manager.zkhosts="my.zookeeper.host.com:2181"
cmak.zkhosts="my.zookeeper.host.com:2181"

You can specify multiple zookeeper hosts by comma delimiting them, like so:

kafka-manager.zkhosts="my.zookeeper.host.com:2181,other.zookeeper.host.com:2181"
cmak.zkhosts="my.zookeeper.host.com:2181,other.zookeeper.host.com:2181"

Alternatively, use the environment variable `ZK_HOSTS` if you don't want to hardcode any values.

Expand All @@ -97,22 +97,22 @@ You can optionally enable/disable the following functionality by modifying the d

Consider setting these parameters for larger clusters with jmx enabled :

- kafka-manager.broker-view-thread-pool-size=< 3 * number_of_brokers>
- kafka-manager.broker-view-max-queue-size=< 3 * total # of partitions across all topics>
- kafka-manager.broker-view-update-seconds=< kafka-manager.broker-view-max-queue-size / (10 * number_of_brokers) >
- cmak.broker-view-thread-pool-size=< 3 * number_of_brokers>
- cmak.broker-view-max-queue-size=< 3 * total # of partitions across all topics>
- cmak.broker-view-update-seconds=< cmak.broker-view-max-queue-size / (10 * number_of_brokers) >

Here is an example for a kafka cluster with 10 brokers, 100 topics, with each topic having 10 partitions giving 1000 total partitions with JMX enabled :

- kafka-manager.broker-view-thread-pool-size=30
- kafka-manager.broker-view-max-queue-size=3000
- kafka-manager.broker-view-update-seconds=30
- cmak.broker-view-thread-pool-size=30
- cmak.broker-view-max-queue-size=3000
- cmak.broker-view-update-seconds=30

The follow control consumer offset cache's thread pool and queue :

- kafka-manager.offset-cache-thread-pool-size=< default is # of processors>
- kafka-manager.offset-cache-max-queue-size=< default is 1000>
- kafka-manager.kafka-admin-client-thread-pool-size=< default is # of processors>
- kafka-manager.kafka-admin-client-max-queue-size=< default is 1000>
- cmak.offset-cache-thread-pool-size=< default is # of processors>
- cmak.offset-cache-max-queue-size=< default is 1000>
- cmak.kafka-admin-client-thread-pool-size=< default is # of processors>
- cmak.kafka-admin-client-max-queue-size=< default is 1000>

You should increase the above for large # of consumers with consumer polling enabled. Though it mainly affects ZK based consumer polling.

Expand Down
63 changes: 31 additions & 32 deletions app/kafka/manager/KafkaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,33 +57,37 @@ object ApiError extends Logging {
}
}

object KafkaManager {

val ConsumerPropertiesFile = "kafka-manager.consumer.properties.file"
val BaseZkPath = "kafka-manager.base-zk-path"
val PinnedDispatchName = "kafka-manager.pinned-dispatcher-name"
val ZkHosts = "kafka-manager.zkhosts"
val BrokerViewUpdateSeconds = "kafka-manager.broker-view-update-seconds"
val KafkaManagerUpdateSeconds = "kafka-manager.kafka-manager-update-seconds"
val DeleteClusterUpdateSeconds = "kafka-manager.delete-cluster-update-seconds"
val DeletionBatchSize = "kafka-manager.deletion-batch-size"
val MaxQueueSize = "kafka-manager.max-queue-size"
val ThreadPoolSize = "kafka-manager.thread-pool-size"
val MutexTimeoutMillis = "kafka-manager.mutex-timeout-millis"
val StartDelayMillis = "kafka-manager.start-delay-millis"
val ApiTimeoutMillis = "kafka-manager.api-timeout-millis"
val ClusterActorsAskTimeoutMillis = "kafka-manager.cluster-actors-ask-timeout-millis"
val PartitionOffsetCacheTimeoutSecs = "kafka-manager.partition-offset-cache-timeout-secs"
val SimpleConsumerSocketTimeoutMillis = "kafka-manager.simple-consumer-socket-timeout-millis"
val BrokerViewThreadPoolSize = "kafka-manager.broker-view-thread-pool-size"
val BrokerViewMaxQueueSize = "kafka-manager.broker-view-max-queue-size"
val OffsetCacheThreadPoolSize = "kafka-manager.offset-cache-thread-pool-size"
val OffsetCacheMaxQueueSize = "kafka-manager.offset-cache-max-queue-size"
val KafkaAdminClientThreadPoolSize = "kafka-manager.kafka-admin-client-thread-pool-size"
val KafkaAdminClientMaxQueueSize = "kafka-manager.kafka-admin-client-max-queue-size"
val KafkaManagedOffsetMetadataCheckMillis = "kafka-manager.kafka-managed-offset-metadata-check-millis"
val KafkaManagedOffsetGroupCacheSize = "kafka-manager.kafka-managed-offset-group-cache-size"
val KafkaManagedOffsetGroupExpireDays = "kafka-manager.kafka-managed-offset-group-expire-days"
import akka.pattern._
import scalaz.{-\/, \/, \/-}
class KafkaManager(akkaConfig: Config) extends Logging {

def getPrefixedKey(key: String): String = if (akkaConfig.hasPathOrNull(s"cmak.$key")) s"cmak.$key" else s"kafka-manager.$key"

val ConsumerPropertiesFile = getPrefixedKey("consumer.properties.file")
val BaseZkPath = getPrefixedKey("base-zk-path")
val PinnedDispatchName = getPrefixedKey("pinned-dispatcher-name")
val ZkHosts = getPrefixedKey("zkhosts")
val BrokerViewUpdateSeconds = getPrefixedKey("broker-view-update-seconds")
val KafkaManagerUpdateSeconds = getPrefixedKey("kafka-manager-update-seconds")
val DeleteClusterUpdateSeconds = getPrefixedKey("delete-cluster-update-seconds")
val DeletionBatchSize = getPrefixedKey("deletion-batch-size")
val MaxQueueSize = getPrefixedKey("max-queue-size")
val ThreadPoolSize = getPrefixedKey("thread-pool-size")
val MutexTimeoutMillis = getPrefixedKey("mutex-timeout-millis")
val StartDelayMillis = getPrefixedKey("start-delay-millis")
val ApiTimeoutMillis = getPrefixedKey("api-timeout-millis")
val ClusterActorsAskTimeoutMillis = getPrefixedKey("cluster-actors-ask-timeout-millis")
val PartitionOffsetCacheTimeoutSecs = getPrefixedKey("partition-offset-cache-timeout-secs")
val SimpleConsumerSocketTimeoutMillis = getPrefixedKey("simple-consumer-socket-timeout-millis")
val BrokerViewThreadPoolSize = getPrefixedKey("broker-view-thread-pool-size")
val BrokerViewMaxQueueSize = getPrefixedKey("broker-view-max-queue-size")
val OffsetCacheThreadPoolSize = getPrefixedKey("offset-cache-thread-pool-size")
val OffsetCacheMaxQueueSize = getPrefixedKey("offset-cache-max-queue-size")
val KafkaAdminClientThreadPoolSize = getPrefixedKey("kafka-admin-client-thread-pool-size")
val KafkaAdminClientMaxQueueSize = getPrefixedKey("kafka-admin-client-max-queue-size")
val KafkaManagedOffsetMetadataCheckMillis = getPrefixedKey("kafka-managed-offset-metadata-check-millis")
val KafkaManagedOffsetGroupCacheSize = getPrefixedKey("kafka-managed-offset-group-cache-size")
val KafkaManagedOffsetGroupExpireDays = getPrefixedKey("kafka-managed-offset-group-expire-days")

val DefaultConfig: Config = {
val defaults: Map[String, _ <: AnyRef] = Map(
Expand Down Expand Up @@ -114,12 +118,7 @@ object KafkaManager {
import scala.collection.JavaConverters._
ConfigFactory.parseMap(defaults.asJava)
}
}

import KafkaManager._
import akka.pattern._
import scalaz.{-\/, \/, \/-}
class KafkaManager(akkaConfig: Config) extends Logging {
private[this] val system = ActorSystem("kafka-manager-system", akkaConfig)

private[this] val configWithDefaults = akkaConfig.withFallback(DefaultConfig)
Expand Down
5 changes: 5 additions & 0 deletions conf/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@ play.http.requestHandler = "play.http.DefaultHttpRequestHandler"
play.http.context = "/"
play.application.loader=loader.KafkaManagerLoader

# Settings prefixed with 'kafka-manager.' will be deprecated, use 'cmak.' instead.
# https://github.com/yahoo/CMAK/issues/713
kafka-manager.zkhosts="kafka-manager-zookeeper:2181"
kafka-manager.zkhosts=${?ZK_HOSTS}
cmak.zkhosts="kafka-manager-zookeeper:2181"
cmak.zkhosts=${?ZK_HOSTS}

pinned-dispatcher.type="PinnedDispatcher"
pinned-dispatcher.executor="thread-pool-executor"
application.features=["KMClusterManagerFeature","KMTopicManagerFeature","KMPreferredReplicaElectionFeature","KMReassignPartitionsFeature"]
Expand Down
4 changes: 2 additions & 2 deletions test/controller/api/TestKafkaStateCheck.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ class TestKafkaStateCheck extends CuratorAwareTest with KafkaServerInTest with M
val configMap: Map[String, AnyRef] = Map(
"pinned-dispatcher.type" -> "PinnedDispatcher",
"pinned-dispatcher.executor" -> "thread-pool-executor",
"kafka-manager.zkhosts" -> kafkaServerZkPath,
KafkaManager.ConsumerPropertiesFile -> "conf/consumer.properties"
"cmak.zkhosts" -> kafkaServerZkPath,
"cmak.consumer.properties.file" -> "conf/consumer.properties"
)
val loader = new KafkaManagerLoaderForTests
application = Option(loader.load(ApplicationLoader.createContext(
Expand Down
10 changes: 5 additions & 5 deletions test/kafka/manager/TestKafkaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ class TestKafkaManager extends CuratorAwareTest with BaseTest {
private[this] val akkaConfig: Properties = new Properties()
akkaConfig.setProperty("pinned-dispatcher.type","PinnedDispatcher")
akkaConfig.setProperty("pinned-dispatcher.executor","thread-pool-executor")
akkaConfig.setProperty(KafkaManager.ZkHosts,testServer.getConnectString)
akkaConfig.setProperty(KafkaManager.BrokerViewUpdateSeconds,"1")
akkaConfig.setProperty(KafkaManager.KafkaManagerUpdateSeconds,"1")
akkaConfig.setProperty(KafkaManager.DeleteClusterUpdateSeconds,"1")
akkaConfig.setProperty(KafkaManager.ConsumerPropertiesFile,"conf/consumer.properties")
akkaConfig.setProperty("cmak.zkhosts",testServer.getConnectString)
akkaConfig.setProperty("cmak.broker-view-update-seconds","1")
akkaConfig.setProperty("cmak.kafka-manager-update-seconds","1")
akkaConfig.setProperty("cmak.delete-cluster-update-seconds","1")
akkaConfig.setProperty("cmak.consumer.properties.file","conf/consumer.properties")
private[this] val config : Config = ConfigFactory.parseProperties(akkaConfig)

private[this] val kafkaManager : KafkaManager = new KafkaManager(config)
Expand Down