Skip to content

Commit

Permalink
[improve][broker] PIP-192 Added --extensions option in BrokerMonitor (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
heesung-sn authored Mar 1, 2023
1 parent a12f554 commit 579f22c
Showing 1 changed file with 88 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.testclient;

import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.BROKER_LOAD_DATA_STORE_TOPIC;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
Expand All @@ -31,7 +32,13 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
Expand All @@ -54,7 +61,7 @@ public class BrokerMonitor {
private static final String BROKER_ROOT = "/loadbalance/brokers";
private static final int ZOOKEEPER_TIMEOUT_MILLIS = 30000;
private static final int GLOBAL_STATS_PRINT_PERIOD_MILLIS = 60000;
private final ZooKeeper zkClient;
private ZooKeeper zkClient;
private static final Gson gson = new Gson();

// Fields common for message rows.
Expand All @@ -77,7 +84,7 @@ public class BrokerMonitor {
private static final Object[] ALLOC_MESSAGE_ROW = makeMessageRow("ALLOC MSG");
private static final Object[] GLOBAL_HEADER = { "BROKER", "BUNDLE", "MSG/S", "LONG/S", "KB/S", "MAX %" };

private final Map<String, Object> loadData;
private Map<String, Object> loadData;

private static final FixedColumnLengthTableMaker localTableMaker = new FixedColumnLengthTableMaker();
static {
Expand Down Expand Up @@ -434,8 +441,11 @@ private static class Arguments {
@Parameter(names = { "-h", "--help" }, description = "Help message", help = true)
boolean help;

@Parameter(names = { "--connect-string" }, description = "Zookeeper connect string", required = true)
@Parameter(names = { "--connect-string" }, description = "Zookeeper or broker connect string", required = true)
public String connectString = null;

@Parameter(names = { "--extensions" }, description = "true to monitor Load Balance Extensions.")
boolean extensions = false;
}

/**
Expand Down Expand Up @@ -464,6 +474,71 @@ public void start() {
}
}

private TableView<BrokerLoadData> brokerLoadDataTableView;

private BrokerMonitor(String brokerServiceUrl) {
try {
PulsarClient client = PulsarClient.builder()
.memoryLimit(0, SizeUnit.BYTES)
.serviceUrl(brokerServiceUrl)
.connectionsPerBroker(4)
.ioThreads(Runtime.getRuntime().availableProcessors())
.statsInterval(0, TimeUnit.SECONDS)
.build();
this.brokerLoadDataTableView = client
.newTableView(Schema.JSON(BrokerLoadData.class))
.topic(BROKER_LOAD_DATA_STORE_TOPIC).create();
} catch (Throwable e) {
log.info("Failed to start BrokerMonitor", e);
throw new RuntimeException(e);
}
}

private synchronized void printBrokerLoadData(final String broker, final BrokerLoadData brokerLoadData) {

// Initialize the constant rows.
final Object[][] rows = new Object[6][];
rows[0] = SYSTEM_ROW;
rows[2] = COUNT_ROW;
rows[4] = LATEST_ROW;

// First column is a label, so start at the second column at index 1.
// System row.
rows[1] = new Object[SYSTEM_ROW.length];
initRow(rows[1], brokerLoadData.getCpu().percentUsage(), brokerLoadData.getMemory().percentUsage(),
brokerLoadData.getDirectMemory().percentUsage(), brokerLoadData.getBandwidthIn().percentUsage(),
brokerLoadData.getBandwidthOut().percentUsage(), brokerLoadData.getMaxResourceUsage() * 100);

// Count row.
rows[3] = new Object[COUNT_ROW.length];
initRow(rows[3], null, brokerLoadData.getBundleCount(),
null, null,
null, null);

// Latest message data row.
rows[5] = new Object[LATEST_ROW.length];
initMessageRow(rows[5], brokerLoadData.getMsgRateIn(), brokerLoadData.getMsgRateOut(),
brokerLoadData.getMsgThroughputIn(), brokerLoadData.getMsgThroughputOut());

final String table = localTableMaker.make(rows);
log.info("\nBroker Data for {}:\n{}\n", broker, table);
}

private synchronized void printBrokerLoadDataStore() {
brokerLoadDataTableView.forEach(this::printBrokerLoadData);
}

private void startBrokerLoadDataStoreMonitor() {
try {
while (true) {
Thread.sleep(GLOBAL_STATS_PRINT_PERIOD_MILLIS);
printBrokerLoadDataStore();
}
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}

/**
* Run a monitor from command line arguments.
*
Expand All @@ -481,8 +556,15 @@ public static void main(String[] args) throws Exception {
jc.usage();
PerfClientUtils.exit(1);
}
final ZooKeeper zkClient = new ZooKeeper(arguments.connectString, ZOOKEEPER_TIMEOUT_MILLIS, null);
final BrokerMonitor monitor = new BrokerMonitor(zkClient);
monitor.start();


if (arguments.extensions) {
final BrokerMonitor monitor = new BrokerMonitor(arguments.connectString);
monitor.startBrokerLoadDataStoreMonitor();
} else {
final ZooKeeper zkClient = new ZooKeeper(arguments.connectString, ZOOKEEPER_TIMEOUT_MILLIS, null);
final BrokerMonitor monitor = new BrokerMonitor(zkClient);
monitor.start();
}
}
}

0 comments on commit 579f22c

Please sign in to comment.