Skip to content

Commit

Permalink
[improve][broker] Implemented ExtensibleLoadManagerWrapper.getLoadBal…
Browse files Browse the repository at this point in the history
…ancingMetrics()
  • Loading branch information
heesung-sn committed Feb 7, 2023
1 parent fd3ce8b commit 6a01f9d
Show file tree
Hide file tree
Showing 11 changed files with 810 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarServerException;
Expand All @@ -39,6 +40,11 @@
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerVersionFilter;
import org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreFactory;
Expand All @@ -48,6 +54,7 @@
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;

@Slf4j
Expand Down Expand Up @@ -86,6 +93,17 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {

private boolean started = false;

// Counters backing the load-balancing metrics returned by getMetrics().
// assignCounter is incremented directly on the assign() path; unloadCounter and splitCounter are
// fed through updateUnloadMetrics(...) and updateSplitMetrics(...).
private final AssignCounter assignCounter = new AssignCounter();
private final UnloadCounter unloadCounter = new UnloadCounter();
private final SplitCounter splitCounter = new SplitCounter();

// Latest published metric snapshots. Each reference holds null until its first update, which is why
// getMetrics() null-checks them. The references themselves are never reassigned, only set(...).

// record load metrics
private final AtomicReference<List<Metrics>> brokerLoadMetrics = new AtomicReference<>();
// record unload metrics
private final AtomicReference<List<Metrics>> unloadMetrics = new AtomicReference<>();
// record split metrics
private final AtomicReference<List<Metrics>> splitMetrics = new AtomicReference<>();

private final ConcurrentOpenHashMap<String, CompletableFuture<Optional<BrokerLookupData>>>
lookupRequests = ConcurrentOpenHashMap.<String,
CompletableFuture<Optional<BrokerLookupData>>>newBuilder()
Expand Down Expand Up @@ -158,15 +176,18 @@ public CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnit
if (broker.isEmpty()) {
return this.selectAsync(serviceUnit).thenCompose(brokerOpt -> {
if (brokerOpt.isPresent()) {
assignCounter.incrementSuccess();
log.info("Selected new owner broker: {} for bundle: {}.", brokerOpt.get(), bundle);
return serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get())
.thenApply(Optional::of);
} else {
assignCounter.incrementEmpty();
throw new IllegalStateException(
"Failed to select the new owner broker for bundle: " + bundle);
}
});
}
assignCounter.incrementSkip();
// Already assigned, return it.
return CompletableFuture.completedFuture(broker);
});
Expand Down Expand Up @@ -265,4 +286,40 @@ private boolean isInternalTopic(String topic) {
|| topic.startsWith(BROKER_LOAD_DATA_STORE_TOPIC)
|| topic.startsWith(TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
}

/**
 * Replaces the stored broker-load metrics with a snapshot derived from the given load data.
 *
 * @param loadData the broker load data to convert into metrics, labelled with this broker's
 *                 advertised address
 */
void updateBrokerLoadMetrics(BrokerLoadData loadData) {
    final String advertisedAddress = pulsar.getAdvertisedAddress();
    final List<Metrics> snapshot = loadData.toMetrics(advertisedAddress);
    brokerLoadMetrics.set(snapshot);
}

/**
 * Feeds an unload decision into the unload counter and republishes the counter's metrics.
 *
 * @param decision the unload decision to record
 */
private void updateUnloadMetrics(UnloadDecision decision) {
    unloadCounter.update(decision);
    final List<Metrics> snapshot = unloadCounter.toMetrics();
    unloadMetrics.set(snapshot);
}

/**
 * Feeds every split decision into the split counter, in list order, and then republishes the
 * counter's metrics once.
 *
 * @param decisions the split decisions to record
 */
private void updateSplitMetrics(List<SplitDecision> decisions) {
    decisions.forEach(splitDecision -> splitCounter.update(splitDecision));
    final List<Metrics> snapshot = splitCounter.toMetrics();
    splitMetrics.set(snapshot);
}

/**
 * Collects the load-balancing metrics currently available on this broker.
 *
 * <p>The result is a fresh list containing, in order: the last published broker-load, unload and
 * split snapshots (each skipped if it has not been published yet), the assign counter metrics, and
 * the service unit state channel metrics.
 *
 * @return a new mutable list of metrics; never {@code null}
 */
public List<Metrics> getMetrics() {
    List<Metrics> metricsCollection = new ArrayList<>();

    // Read each reference exactly once so the null check and the use see the same snapshot.
    final List<Metrics> brokerLoadSnapshot = this.brokerLoadMetrics.get();
    if (brokerLoadSnapshot != null) {
        metricsCollection.addAll(brokerLoadSnapshot);
    }

    final List<Metrics> unloadSnapshot = this.unloadMetrics.get();
    if (unloadSnapshot != null) {
        metricsCollection.addAll(unloadSnapshot);
    }

    final List<Metrics> splitSnapshot = this.splitMetrics.get();
    if (splitSnapshot != null) {
        metricsCollection.addAll(splitSnapshot);
    }

    metricsCollection.addAll(this.assignCounter.toMetrics());

    metricsCollection.addAll(this.serviceUnitStateChannel.getMetrics());

    return metricsCollection;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,7 @@ public void writeResourceQuotasToZooKeeper() throws Exception {

/**
 * Returns the load-balancing metrics by delegating to the wrapped load manager's
 * {@code getMetrics()}.
 *
 * @return the list of metrics produced by the wrapped load manager
 */
@Override
public List<Metrics> getLoadBalancingMetrics() {
    return loadManager.getMetrics();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
package org.apache.pulsar.broker.loadbalance.extensions.channel;

import java.io.Closeable;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.extended.SessionEvent;

Expand Down Expand Up @@ -148,4 +150,10 @@ public interface ServiceUnitStateChannel extends Closeable {
*/
CompletableFuture<Void> publishSplitEventAsync(Split split);

/**
 * Generates the metrics to monitor.
 *
 * <p>NOTE(review): which metrics are included is decided by the implementation; this declaration
 * only fixes the return type.
 *
 * @return a list of the metrics
 */
List<Metrics> getMetrics();

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Unstable;
import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost;
import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionReestablished;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -68,6 +70,7 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
Expand Down Expand Up @@ -112,15 +115,16 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
private long totalCleanupCancelledCnt = 0;
private volatile ChannelState channelState;

/**
 * Kinds of events counted by this channel. The constant names are used verbatim as the
 * {@code event} dimension value when metrics are generated, so renaming one changes the
 * emitted metric labels.
 */
public enum EventType {
    Assign,
    Split,
    Unload
}

@Getter
@AllArgsConstructor
static class Counters {
public static class Counters {
private AtomicLong total;
private AtomicLong failure;
}
Expand Down Expand Up @@ -863,4 +867,111 @@ private String printCleanupMetrics() {
);
}


/**
 * Builds the metrics describing this channel's activity.
 *
 * <p>Every metric carries the dimension {@code metric=sunitStateChn}. The list contains, in order:
 * <ol>
 *   <li>one owner-lookup metric per entry of {@code ownerLookUpCounters}, tagged with {@code state};</li>
 *   <li>for each entry of {@code eventCounters}, a {@code Total} and a {@code Failure} publish metric,
 *       tagged with {@code event} and {@code result};</li>
 *   <li>for each entry of {@code handlerCounters}, a {@code Total} and a {@code Failure} subscribe
 *       metric, tagged with {@code event} and {@code result};</li>
 *   <li>five cleanup metrics tagged with {@code result} = Total, Failure, Skip, Cancel, Schedule;</li>
 *   <li>one metric holding the broker and service-unit cleanup tombstone counts.</li>
 * </ol>
 *
 * @return a new list of metrics; never {@code null}
 */
@Override
public List<Metrics> getMetrics() {
    var metrics = new ArrayList<Metrics>();
    var dimensions = new HashMap<String, String>();
    dimensions.put("metric", "sunitStateChn");

    for (var etr : ownerLookUpCounters.entrySet()) {
        // NOTE(review): the map value is published as-is, whereas the other counters below
        // publish the numeric result of .get() -- confirm this is intended.
        metrics.add(newMetric(dimensions, "brk_sunit_state_chn_owner_lookup_total", etr.getValue(),
                "state", etr.getKey().toString()));
    }

    for (var etr : eventCounters.entrySet()) {
        var event = etr.getKey().toString();
        var counters = etr.getValue();
        metrics.add(newMetric(dimensions, "brk_sunit_state_chn_event_publish_ops_total",
                counters.getTotal().get(), "event", event, "result", "Total"));
        metrics.add(newMetric(dimensions, "brk_sunit_state_chn_event_publish_ops_total",
                counters.getFailure().get(), "event", event, "result", "Failure"));
    }

    for (var etr : handlerCounters.entrySet()) {
        var event = etr.getKey().toString();
        var counters = etr.getValue();
        metrics.add(newMetric(dimensions, "brk_sunit_state_chn_subscribe_ops_total",
                counters.getTotal().get(), "event", event, "result", "Total"));
        metrics.add(newMetric(dimensions, "brk_sunit_state_chn_subscribe_ops_total",
                counters.getFailure().get(), "event", event, "result", "Failure"));
    }

    metrics.add(newMetric(dimensions, "brk_sunit_state_chn_cleanup_ops_total",
            totalCleanupCnt, "result", "Total"));
    metrics.add(newMetric(dimensions, "brk_sunit_state_chn_cleanup_ops_total",
            totalCleanupErrorCnt.get(), "result", "Failure"));
    metrics.add(newMetric(dimensions, "brk_sunit_state_chn_cleanup_ops_total",
            totalCleanupIgnoredCnt, "result", "Skip"));
    metrics.add(newMetric(dimensions, "brk_sunit_state_chn_cleanup_ops_total",
            totalCleanupCancelledCnt, "result", "Cancel"));
    metrics.add(newMetric(dimensions, "brk_sunit_state_chn_cleanup_ops_total",
            totalCleanupScheduledCnt, "result", "Schedule"));

    // The tombstone counts share one metric that carries only the base dimensions.
    var metric = Metrics.create(dimensions);
    metric.put("brk_sunit_state_chn_broker_cleanup_ops_total", totalBrokerCleanupTombstoneCnt);
    metric.put("brk_sunit_state_chn_su_cleanup_ops_total", totalServiceUnitCleanupTombstoneCnt);
    metrics.add(metric);

    return metrics;
}

/**
 * Creates a single-valued metric whose dimensions are a copy of {@code baseDimensions} plus the
 * given extra key/value pairs. {@code baseDimensions} itself is not modified.
 *
 * @param baseDimensions  dimensions shared by all metrics of this channel
 * @param name            metric name
 * @param value           metric value
 * @param extraDimensions alternating dimension keys and values (key1, value1, key2, value2, ...)
 * @return the newly created metric
 * @throws IllegalArgumentException if {@code extraDimensions} has an odd number of elements
 */
private static Metrics newMetric(Map<String, String> baseDimensions, String name, Object value,
                                 String... extraDimensions) {
    if (extraDimensions.length % 2 != 0) {
        throw new IllegalArgumentException(
                "extraDimensions must contain key/value pairs, got length " + extraDimensions.length);
    }
    var dim = new HashMap<>(baseDimensions);
    for (int i = 0; i < extraDimensions.length; i += 2) {
        dim.put(extraDimensions[i], extraDimensions[i + 1]);
    }
    var metric = Metrics.create(dim);
    metric.put(name, value);
    return metric;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions.data;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
Expand Down Expand Up @@ -187,4 +191,35 @@ public String toString(ServiceConfiguration conf) {
);
}

/**
 * Converts this load data into metrics.
 *
 * <p>Three metrics are produced, all tagged with {@code metric=loadBalancing} and
 * {@code broker=<advertisedBrokerAddress>}:
 * <ol>
 *   <li>the percent usage of cpu, memory, direct memory, bandwidth-in and bandwidth-out;</li>
 *   <li>{@code brk_lb_resource_usage} with {@code feature=max_ema}, holding {@code weightedMaxEMA};</li>
 *   <li>{@code brk_lb_resource_usage} with {@code feature=max}, holding {@code maxResourceUsage}.</li>
 * </ol>
 *
 * @param advertisedBrokerAddress value used for the {@code broker} dimension
 * @return a new list with the three metrics in the order above
 */
public List<Metrics> toMetrics(String advertisedBrokerAddress) {
    final var baseDimensions = new HashMap<String, String>();
    baseDimensions.put("metric", "loadBalancing");
    baseDimensions.put("broker", advertisedBrokerAddress);

    // Per-resource percent usage, carrying only the base dimensions.
    final var usageMetric = Metrics.create(baseDimensions);
    usageMetric.put("brk_lb_cpu_usage", getCpu().percentUsage());
    usageMetric.put("brk_lb_memory_usage", getMemory().percentUsage());
    usageMetric.put("brk_lb_directMemory_usage", getDirectMemory().percentUsage());
    usageMetric.put("brk_lb_bandwidth_in_usage", getBandwidthIn().percentUsage());
    usageMetric.put("brk_lb_bandwidth_out_usage", getBandwidthOut().percentUsage());

    // The aggregated usage values share a metric name and differ only in the "feature" dimension.
    final var maxEmaDimensions = new HashMap<>(baseDimensions);
    maxEmaDimensions.put("feature", "max_ema");
    final var maxEmaMetric = Metrics.create(maxEmaDimensions);
    maxEmaMetric.put("brk_lb_resource_usage", weightedMaxEMA);

    final var maxDimensions = new HashMap<>(baseDimensions);
    maxDimensions.put("feature", "max");
    final var maxMetric = Metrics.create(maxDimensions);
    maxMetric.put("brk_lb_resource_usage", maxResourceUsage);

    final var result = new ArrayList<Metrics>();
    result.add(usageMetric);
    result.add(maxEmaMetric);
    result.add(maxMetric);
    return result;
}

}
Loading

0 comments on commit 6a01f9d

Please sign in to comment.