From e008de9466f452dd92997a11621df3e30b024ece Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 23 May 2023 22:11:30 +0300 Subject: [PATCH] [fix][broker] Change some static fields referencing mutable objects to ordinary instance fields in BrokerService (#20372) --- .../apache/pulsar/broker/admin/impl/BrokersBase.java | 9 ++++----- .../apache/pulsar/broker/service/BrokerService.java | 12 ++++++------ 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index 3328fa9715b99..b367ce7aad955 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -53,7 +53,6 @@ import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.loadbalance.LeaderBroker; import org.apache.pulsar.broker.namespace.NamespaceService; -import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.web.RestException; @@ -254,7 +253,7 @@ public void getAllDynamicConfigurations(@Suspended AsyncResponse asyncResponse) @ApiResponse(code = 403, message = "You don't have admin permission to get configuration")}) public void getDynamicConfigurationName(@Suspended AsyncResponse asyncResponse) { validateSuperUserAccessAsync() - .thenAccept(__ -> asyncResponse.resume(BrokerService.getDynamicConfiguration())) + .thenAccept(__ -> asyncResponse.resume(pulsar().getBrokerService().getDynamicConfiguration())) .exceptionally(ex -> { LOG.error("[{}] Failed to get all dynamic configuration names.", clientAppId(), ex); resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -287,11 +286,11 @@ public void getRuntimeConfiguration(@Suspended AsyncResponse asyncResponse) { */ private synchronized CompletableFuture persistDynamicConfigurationAsync( String configName, String configValue) { - if (!BrokerService.validateDynamicConfiguration(configName, configValue)) { + if (!pulsar().getBrokerService().validateDynamicConfiguration(configName, configValue)) { return FutureUtil .failedFuture(new RestException(Status.PRECONDITION_FAILED, " Invalid dynamic-config value")); } - if (BrokerService.isDynamicConfiguration(configName)) { + if (pulsar().getBrokerService().isDynamicConfiguration(configName)) { return dynamicConfigurationResources().setDynamicConfigurationWithCreateAsync(old -> { Map configurationMap = old.orElseGet(Maps::newHashMap); configurationMap.put(configName, configValue); @@ -512,7 +511,7 @@ private CompletableFuture healthCheckRecursiveReadNext(Reader read } private CompletableFuture internalDeleteDynamicConfigurationOnMetadataAsync(String configName) { - if (!BrokerService.isDynamicConfiguration(configName)) { + if (!pulsar().getBrokerService().isDynamicConfiguration(configName)) { throw new RestException(Status.PRECONDITION_FAILED, " Can't update non-dynamic configuration"); } else { return dynamicConfigurationResources().setDynamicConfigurationAsync(old -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 33e5500d623d2..663d013dc7439 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -214,7 +214,7 @@ public class BrokerService implements Closeable { private final OrderedExecutor topicOrderedExecutor; // offline topic backlog cache private final ConcurrentOpenHashMap offlineTopicStatCache; - private static final ConcurrentOpenHashMap dynamicConfigurationMap = + private final ConcurrentOpenHashMap dynamicConfigurationMap = prepareDynamicConfigurationMap(); private final ConcurrentOpenHashMap> configRegisteredListeners; @@ -253,10 +253,10 @@ public class BrokerService implements Closeable { public static final String MANAGED_LEDGER_PATH_ZNODE = "/managed-ledgers"; - private static final LongAdder totalUnackedMessages = new LongAdder(); + private final LongAdder totalUnackedMessages = new LongAdder(); private final int maxUnackedMessages; public final int maxUnackedMsgsPerDispatcher; - private static final AtomicBoolean blockedDispatcherOnHighUnackedMsgs = new AtomicBoolean(false); + private final AtomicBoolean blockedDispatcherOnHighUnackedMsgs = new AtomicBoolean(false); private final ConcurrentOpenHashSet blockedDispatchers; private final ReadWriteLock lock = new ReentrantReadWriteLock(); @@ -2938,7 +2938,7 @@ public DelayedDeliveryTrackerFactory getDelayedDeliveryTrackerFactory() { return delayedDeliveryTrackerFactory; } - public static List getDynamicConfiguration() { + public List getDynamicConfiguration() { return dynamicConfigurationMap.keys(); } @@ -2951,11 +2951,11 @@ public Map getRuntimeConfiguration() { return configMap; } - public static boolean isDynamicConfiguration(String key) { + public boolean isDynamicConfiguration(String key) { return dynamicConfigurationMap.containsKey(key); } - public static boolean validateDynamicConfiguration(String key, String value) { + public boolean validateDynamicConfiguration(String key, String value) { if (dynamicConfigurationMap.containsKey(key) && dynamicConfigurationMap.get(key).validator != null) { return dynamicConfigurationMap.get(key).validator.test(value); }