Skip to content

Commit

Permalink
updated monitor logic
Browse files Browse the repository at this point in the history
  • Loading branch information
heesung-sn committed Feb 22, 2023
1 parent 6d7316d commit e2290b3
Show file tree
Hide file tree
Showing 9 changed files with 574 additions and 220 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ public enum ServiceUnitState {

Deleted; // deleted in the system (semi-terminal state)

private static Map<ServiceUnitState, Set<ServiceUnitState>> validTransitions = Map.of(
private static final Map<ServiceUnitState, Set<ServiceUnitState>> validTransitions = Map.of(
// (Init -> all states) transitions are required
// when the topic is compacted in the middle of assign, transfer or split.
Init, Set.of(Free, Owned, Assigned, Released, Splitting, Deleted, Init),
Init, Set.of(Free, Owned, Assigned, Released, Splitting, Deleted),
Free, Set.of(Assigned, Init),
Owned, Set.of(Assigned, Splitting, Released, Init),
Assigned, Set.of(Owned, Released, Init),
Released, Set.of(Owned, Free, Init),
Splitting, Set.of(Deleted, Init),
Owned, Set.of(Assigned, Splitting, Released),
Assigned, Set.of(Owned, Released),
Released, Set.of(Owned, Free),
Splitting, Set.of(Deleted),
Deleted, Set.of(Init)
);

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Init;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Schema;
Expand Down Expand Up @@ -48,8 +48,16 @@ public void checkBrokers(boolean check) {

@Override
public boolean shouldKeepLeft(ServiceUnitStateData from, ServiceUnitStateData to) {
ServiceUnitState prevState = from == null ? Init : from.state();
ServiceUnitState state = to == null ? Init : to.state();
if (to == null) {
return false;
} else if (to.force()) {
return false;
}


ServiceUnitState prevState = state(from);
ServiceUnitState state = state(to);

if (!ServiceUnitState.isValidTransition(prevState, state)) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
* This data will be broadcast in ServiceUnitStateChannel.
*/

public record ServiceUnitStateData(ServiceUnitState state, String broker, String sourceBroker, long timestamp) {
public record ServiceUnitStateData(
ServiceUnitState state, String broker, String sourceBroker, boolean force, long timestamp) {

public ServiceUnitStateData {
Objects.requireNonNull(state);
Expand All @@ -37,10 +38,18 @@ public record ServiceUnitStateData(ServiceUnitState state, String broker, String
}

public ServiceUnitStateData(ServiceUnitState state, String broker, String sourceBroker) {
this(state, broker, sourceBroker, System.currentTimeMillis());
this(state, broker, sourceBroker, false, System.currentTimeMillis());
}

public ServiceUnitStateData(ServiceUnitState state, String broker) {
this(state, broker, null, System.currentTimeMillis());
this(state, broker, null, false, System.currentTimeMillis());
}

public ServiceUnitStateData(ServiceUnitState state, String broker, boolean force) {
this(state, broker, null, force, System.currentTimeMillis());
}

public static ServiceUnitState state(ServiceUnitStateData data) {
return data == null ? ServiceUnitState.Init : data.state();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -392,13 +392,15 @@ SplitDecision.Reason.Balanced, new MutableLong(6)
}

{
FieldUtils.writeDeclaredField(channel1, "totalCleanupCnt", 1, true);
FieldUtils.writeDeclaredField(channel1, "totalBrokerCleanupTombstoneCnt", 2, true);
FieldUtils.writeDeclaredField(channel1, "totalServiceUnitCleanupTombstoneCnt", 3, true);

FieldUtils.writeDeclaredField(channel1, "totalInactiveBrokerCleanupCnt", 1, true);
FieldUtils.writeDeclaredField(channel1, "totalServiceUnitTombstoneCleanupCnt", 2, true);
FieldUtils.writeDeclaredField(channel1, "totalOrphanServiceUnitCleanupCnt", 3, true);
FieldUtils.writeDeclaredField(channel1, "totalCleanupErrorCnt", new AtomicLong(4), true);
FieldUtils.writeDeclaredField(channel1, "totalCleanupScheduledCnt", 5, true);
FieldUtils.writeDeclaredField(channel1, "totalCleanupIgnoredCnt", 6, true);
FieldUtils.writeDeclaredField(channel1, "totalCleanupCancelledCnt", 7, true);
FieldUtils.writeDeclaredField(channel1, "totalInactiveBrokerCleanupScheduledCnt", 5, true);
FieldUtils.writeDeclaredField(channel1, "totalInactiveBrokerCleanupIgnoredCnt", 6, true);
FieldUtils.writeDeclaredField(channel1, "totalInactiveBrokerCleanupCancelledCnt", 7, true);

Map<ServiceUnitState, AtomicLong> ownerLookUpCounters = new LinkedHashMap<>();
Map<ServiceUnitState, ServiceUnitStateChannelImpl.Counters> handlerCounters = new LinkedHashMap<>();
Map<ServiceUnitStateChannelImpl.EventType, ServiceUnitStateChannelImpl.Counters> eventCounters =
Expand Down Expand Up @@ -431,16 +433,16 @@ SplitDecision.Reason.Balanced, new MutableLong(6)
dimensions=[{broker=localhost, feature=max_ema, metric=loadBalancing}], metrics=[{brk_lb_resource_usage=4.0}]
dimensions=[{broker=localhost, feature=max, metric=loadBalancing}], metrics=[{brk_lb_resource_usage=0.04}]
dimensions=[{broker=localhost, metric=bundleUnloading}], metrics=[{brk_lb_unload_broker_total=2, brk_lb_unload_bundle_total=3}]
dimensions=[{broker=localhost, metric=bundleUnloading, reason=Overloaded, result=Success}], metrics=[{brk_lb_unload_broker_breakdown_total=1}]
dimensions=[{broker=localhost, metric=bundleUnloading, reason=Underloaded, result=Success}], metrics=[{brk_lb_unload_broker_breakdown_total=2}]
dimensions=[{broker=localhost, metric=bundleUnloading, reason=Unknown, result=Failure}], metrics=[{brk_lb_unload_broker_breakdown_total=10}]
dimensions=[{broker=localhost, metric=bundleUnloading, reason=Balanced, result=Skip}], metrics=[{brk_lb_unload_broker_breakdown_total=3}]
dimensions=[{broker=localhost, metric=bundleUnloading, reason=NoBundles, result=Skip}], metrics=[{brk_lb_unload_broker_breakdown_total=4}]
dimensions=[{broker=localhost, metric=bundleUnloading, reason=CoolDown, result=Skip}], metrics=[{brk_lb_unload_broker_breakdown_total=5}]
dimensions=[{broker=localhost, metric=bundleUnloading, reason=OutDatedData, result=Skip}], metrics=[{brk_lb_unload_broker_breakdown_total=6}]
dimensions=[{broker=localhost, metric=bundleUnloading, reason=NoLoadData, result=Skip}], metrics=[{brk_lb_unload_broker_breakdown_total=7}]
dimensions=[{broker=localhost, metric=bundleUnloading, reason=NoBrokers, result=Skip}], metrics=[{brk_lb_unload_broker_breakdown_total=8}]
dimensions=[{broker=localhost, metric=bundleUnloading, reason=Unknown, result=Skip}], metrics=[{brk_lb_unload_broker_breakdown_total=9}]
dimensions=[{broker=localhost, metric=bundleUnloading, reason=Overloaded, result=Success}], metrics=[{brk_lb_unload_broker_breakdown_total=1}]
dimensions=[{broker=localhost, metric=bundleUnloading, reason=Underloaded, result=Success}], metrics=[{brk_lb_unload_broker_breakdown_total=2}]
dimensions=[{broker=localhost, metric=bundleUnloading, reason=Unknown, result=Failure}], metrics=[{brk_lb_unload_broker_breakdown_total=10}]
dimensions=[{broker=localhost, feature=max_ema, metric=bundleUnloading, stat=avg}], metrics=[{brk_lb_resource_usage_stats=1.5}]
dimensions=[{broker=localhost, feature=max_ema, metric=bundleUnloading, stat=std}], metrics=[{brk_lb_resource_usage_stats=0.3}]
dimensions=[{broker=localhost, metric=bundlesSplit}], metrics=[{brk_lb_bundles_split_total=35}]
Expand All @@ -451,9 +453,9 @@ SplitDecision.Reason.Balanced, new MutableLong(6)
dimensions=[{broker=localhost, metric=bundlesSplit, reason=Admin, result=Success}], metrics=[{brk_lb_bundles_split_breakdown_total=5}]
dimensions=[{broker=localhost, metric=bundlesSplit, reason=Balanced, result=Skip}], metrics=[{brk_lb_bundles_split_breakdown_total=6}]
dimensions=[{broker=localhost, metric=bundlesSplit, reason=Unknown, result=Failure}], metrics=[{brk_lb_bundles_split_breakdown_total=7}]
dimensions=[{broker=localhost, metric=assign, result=Success}], metrics=[{brk_lb_assign_broker_breakdown_total=1}]
dimensions=[{broker=localhost, metric=assign, result=Skip}], metrics=[{brk_lb_assign_broker_breakdown_total=3}]
dimensions=[{broker=localhost, metric=assign, result=Empty}], metrics=[{brk_lb_assign_broker_breakdown_total=2}]
dimensions=[{broker=localhost, metric=assign, result=Success}], metrics=[{brk_lb_assign_broker_breakdown_total=1}]
dimensions=[{broker=localhost, metric=sunitStateChn, state=Init}], metrics=[{brk_sunit_state_chn_owner_lookup_total=1}]
dimensions=[{broker=localhost, metric=sunitStateChn, state=Free}], metrics=[{brk_sunit_state_chn_owner_lookup_total=2}]
dimensions=[{broker=localhost, metric=sunitStateChn, state=Owned}], metrics=[{brk_sunit_state_chn_owner_lookup_total=3}]
Expand Down Expand Up @@ -481,12 +483,11 @@ SplitDecision.Reason.Balanced, new MutableLong(6)
dimensions=[{broker=localhost, event=Splitting, metric=sunitStateChn, result=Failure}], metrics=[{brk_sunit_state_chn_subscribe_ops_total=12}]
dimensions=[{broker=localhost, event=Deleted, metric=sunitStateChn, result=Total}], metrics=[{brk_sunit_state_chn_subscribe_ops_total=13}]
dimensions=[{broker=localhost, event=Deleted, metric=sunitStateChn, result=Failure}], metrics=[{brk_sunit_state_chn_subscribe_ops_total=14}]
dimensions=[{broker=localhost, metric=sunitStateChn, result=Total}], metrics=[{brk_sunit_state_chn_cleanup_ops_total=1}]
dimensions=[{broker=localhost, metric=sunitStateChn, result=Failure}], metrics=[{brk_sunit_state_chn_cleanup_ops_total=4}]
dimensions=[{broker=localhost, metric=sunitStateChn, result=Skip}], metrics=[{brk_sunit_state_chn_cleanup_ops_total=6}]
dimensions=[{broker=localhost, metric=sunitStateChn, result=Cancel}], metrics=[{brk_sunit_state_chn_cleanup_ops_total=7}]
dimensions=[{broker=localhost, metric=sunitStateChn, result=Schedule}], metrics=[{brk_sunit_state_chn_cleanup_ops_total=5}]
dimensions=[{broker=localhost, metric=sunitStateChn}], metrics=[{brk_sunit_state_chn_broker_cleanup_ops_total=2, brk_sunit_state_chn_su_cleanup_ops_total=3}]
dimensions=[{broker=localhost, metric=sunitStateChn, result=Skip}], metrics=[{brk_sunit_state_chn_inactive_broker_cleanup_ops_total=6}]
dimensions=[{broker=localhost, metric=sunitStateChn, result=Cancel}], metrics=[{brk_sunit_state_chn_inactive_broker_cleanup_ops_total=7}]
dimensions=[{broker=localhost, metric=sunitStateChn, result=Schedule}], metrics=[{brk_sunit_state_chn_inactive_broker_cleanup_ops_total=5}]
dimensions=[{broker=localhost, metric=sunitStateChn}], metrics=[{brk_sunit_state_chn_inactive_broker_cleanup_ops_total=1, brk_sunit_state_chn_orphan_su_cleanup_ops_total=3, brk_sunit_state_chn_su_tombstone_cleanup_ops_total=2}]
""".split("\n"));
var actual = primaryLoadManager.getMetrics().stream().map(m -> m.toString()).collect(Collectors.toSet());
assertEquals(actual, expected);
Expand Down
Loading

0 comments on commit e2290b3

Please sign in to comment.