From 9ffa098670a9e908c10e089c0de0fb1319315406 Mon Sep 17 00:00:00 2001 From: disksing Date: Mon, 25 Dec 2017 20:57:49 +0800 Subject: [PATCH 1/3] scheduler: check both add leader and remove leader when balance leader. --- server/balancer_test.go | 9 ++++++++ server/schedulers/utils.go | 45 +++++++++++++++++++++++++------------- 2 files changed, 39 insertions(+), 15 deletions(-) diff --git a/server/balancer_test.go b/server/balancer_test.go index 634f7061674..c5c5e6752f4 100644 --- a/server/balancer_test.go +++ b/server/balancer_test.go @@ -386,6 +386,15 @@ func (s *testBalanceLeaderSchedulerSuite) TestBalanceSelector(c *C) { s.tc.updateLeaderCount(3, 9) // Average leader is 7. Select store 1 as target. checkTransferLeader(c, s.schedule(), 3, 1) + + // Stores: 1 2 3 4 + // Leaders: 1 2 15 16 + // Region1: - F F L + // Region2: - F L F + s.tc.addLeaderRegion(2, 3, 2, 4) + s.tc.addLeaderStore(2, 2) + // Unable to find a region in store 1. Transfer a leader out of store 4 instead. + checkTransferLeader(c, s.schedule(), 4, 2) } var _ = Suite(&testBalanceRegionSchedulerSuite{}) diff --git a/server/schedulers/utils.go b/server/schedulers/utils.go index 8a8e3773182..12e9c6fae01 100644 --- a/server/schedulers/utils.go +++ b/server/schedulers/utils.go @@ -25,7 +25,7 @@ import ( ) // scheduleTransferLeader schedules a region to transfer leader to the peer. 
-func scheduleTransferLeader(cluster schedule.Cluster, schedulerName string, s schedule.Selector, filters ...schedule.Filter) (*core.RegionInfo, *metapb.Peer) { +func scheduleTransferLeader(cluster schedule.Cluster, schedulerName string, s schedule.Selector, filters ...schedule.Filter) (region *core.RegionInfo, peer *metapb.Peer) { stores := cluster.GetStores() if len(stores) == 0 { schedulerCounter.WithLabelValues(schedulerName, "no_store").Inc() @@ -53,29 +53,44 @@ func scheduleTransferLeader(cluster schedule.Cluster, schedulerName string, s sc } if mostLeaderDistance > leastLeaderDistance { - // Transfer a leader out of mostLeaderStore. - region := cluster.RandLeaderRegion(mostLeaderStore.GetId()) + region, peer = scheduleRemoveLeader(cluster, schedulerName, mostLeaderStore.GetId(), s) if region == nil { - schedulerCounter.WithLabelValues(schedulerName, "no_leader_region").Inc() - return nil, nil + region, peer = scheduleAddLeader(cluster, schedulerName, leastLeaderStore.GetId()) } - targetStores := cluster.GetFollowerStores(region) - target := s.SelectTarget(targetStores) - if target == nil { - schedulerCounter.WithLabelValues(schedulerName, "no_target_store").Inc() - return nil, nil + } else { + region, peer = scheduleAddLeader(cluster, schedulerName, leastLeaderStore.GetId()) + if region == nil { + region, peer = scheduleRemoveLeader(cluster, schedulerName, mostLeaderStore.GetId(), s) } - - return region, region.GetStorePeer(target.GetId()) } + return region, peer +} - // Transfer a leader into leastLeaderStore. - region := cluster.RandFollowerRegion(leastLeaderStore.GetId()) +// scheduleAddLeader transfers a leader into the store. 
+func scheduleAddLeader(cluster schedule.Cluster, schedulerName string, storeID uint64) (*core.RegionInfo, *metapb.Peer) { + region := cluster.RandFollowerRegion(storeID) if region == nil { schedulerCounter.WithLabelValues(schedulerName, "no_target_peer").Inc() return nil, nil } - return region, region.GetStorePeer(leastLeaderStore.GetId()) + return region, region.GetStorePeer(storeID) +} + +// scheduleRemoveLeader transfers a leader out of the store. +func scheduleRemoveLeader(cluster schedule.Cluster, schedulerName string, storeID uint64, s schedule.Selector) (*core.RegionInfo, *metapb.Peer) { + region := cluster.RandLeaderRegion(storeID) + if region == nil { + schedulerCounter.WithLabelValues(schedulerName, "no_leader_region").Inc() + return nil, nil + } + targetStores := cluster.GetFollowerStores(region) + target := s.SelectTarget(targetStores) + if target == nil { + schedulerCounter.WithLabelValues(schedulerName, "no_target_store").Inc() + return nil, nil + } + + return region, region.GetStorePeer(target.GetId()) } // scheduleRemovePeer schedules a region to remove the peer. 
From bd9c52cfedaf485a6a608a370399bea0a7fe230c Mon Sep 17 00:00:00 2001 From: disksing Date: Mon, 25 Dec 2017 21:54:04 +0800 Subject: [PATCH 2/3] server: update limit using all operators --- server/coordinator.go | 20 ++++++++++---------- server/coordinator_test.go | 33 +++++++++++++++++++-------------- 2 files changed, 29 insertions(+), 24 deletions(-) diff --git a/server/coordinator.go b/server/coordinator.go index f81f485d15a..24f372d1349 100644 --- a/server/coordinator.go +++ b/server/coordinator.go @@ -379,8 +379,8 @@ func (c *coordinator) addOperator(op *schedule.Operator) bool { } c.histories.Put(regionID, op) - c.limiter.addOperator(op) c.operators[regionID] = op + c.limiter.updateCounts(c.operators) if region := c.cluster.GetRegion(op.RegionID()); region != nil { if step := op.Check(region); step != nil { @@ -410,9 +410,8 @@ func (c *coordinator) removeOperator(op *schedule.Operator) { func (c *coordinator) removeOperatorLocked(op *schedule.Operator) { regionID := op.RegionID() - c.limiter.removeOperator(op) delete(c.operators, regionID) - + c.limiter.updateCounts(c.operators) c.histories.Put(regionID, op) operatorCounter.WithLabelValues(op.Desc(), "remove").Inc() } @@ -511,16 +510,17 @@ func newScheduleLimiter() *scheduleLimiter { } } -func (l *scheduleLimiter) addOperator(op *schedule.Operator) { +// updateCounts updates resource counts using current pending operators. 
+func (l *scheduleLimiter) updateCounts(operators map[uint64]*schedule.Operator) { l.Lock() defer l.Unlock() - l.counts[op.ResourceKind()]++ -} -func (l *scheduleLimiter) removeOperator(op *schedule.Operator) { - l.Lock() - defer l.Unlock() - l.counts[op.ResourceKind()]-- + for k := range l.counts { + l.counts[k] = 0 + } + for _, op := range operators { + l.counts[op.ResourceKind()]++ + } } func (l *scheduleLimiter) operatorCount(kind core.ResourceKind) uint64 { diff --git a/server/coordinator_test.go b/server/coordinator_test.go index 4301e27ea4e..791f75c8909 100644 --- a/server/coordinator_test.go +++ b/server/coordinator_test.go @@ -482,21 +482,26 @@ func (s *testScheduleLimiterSuite) TestOperatorCount(c *C) { c.Assert(l.operatorCount(core.LeaderKind), Equals, uint64(0)) c.Assert(l.operatorCount(core.RegionKind), Equals, uint64(0)) - leaderOP := newTestOperator(1, core.LeaderKind) - l.addOperator(leaderOP) + operators := make(map[uint64]*schedule.Operator) + + operators[1] = newTestOperator(1, core.LeaderKind) + l.updateCounts(operators) + c.Assert(l.operatorCount(core.LeaderKind), Equals, uint64(1)) // 1:leader + operators[2] = newTestOperator(2, core.LeaderKind) + l.updateCounts(operators) + c.Assert(l.operatorCount(core.LeaderKind), Equals, uint64(2)) // 1:leader, 2:leader + delete(operators, 1) + l.updateCounts(operators) + c.Assert(l.operatorCount(core.LeaderKind), Equals, uint64(1)) // 2:leader + + operators[1] = newTestOperator(1, core.RegionKind) + l.updateCounts(operators) + c.Assert(l.operatorCount(core.RegionKind), Equals, uint64(1)) // 1:region 2:leader c.Assert(l.operatorCount(core.LeaderKind), Equals, uint64(1)) - l.addOperator(leaderOP) - c.Assert(l.operatorCount(core.LeaderKind), Equals, uint64(2)) - l.removeOperator(leaderOP) - c.Assert(l.operatorCount(core.LeaderKind), Equals, uint64(1)) - - regionOP := newTestOperator(1, core.RegionKind) - l.addOperator(regionOP) - c.Assert(l.operatorCount(core.RegionKind), Equals, uint64(1)) - 
l.addOperator(regionOP) - c.Assert(l.operatorCount(core.RegionKind), Equals, uint64(2)) - l.removeOperator(regionOP) - c.Assert(l.operatorCount(core.RegionKind), Equals, uint64(1)) + operators[2] = newTestOperator(2, core.RegionKind) + l.updateCounts(operators) + c.Assert(l.operatorCount(core.RegionKind), Equals, uint64(2)) // 1:region 2:region + c.Assert(l.operatorCount(core.LeaderKind), Equals, uint64(0)) } var _ = Suite(&testScheduleControllerSuite{}) From 8ea591cdc8daa4990d1e6bbddbb9bef1a0e0574f Mon Sep 17 00:00:00 2001 From: disksing Date: Mon, 25 Dec 2017 22:07:57 +0800 Subject: [PATCH 3/3] server: fix panic that cluster status is nil --- server/cluster.go | 23 ++++++----------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/server/cluster.go b/server/cluster.go index fa58dade425..49ef1234730 100644 --- a/server/cluster.go +++ b/server/cluster.go @@ -62,8 +62,6 @@ type RaftCluster struct { wg sync.WaitGroup quit chan struct{} - - status *ClusterStatus } // ClusterStatus saves some state information @@ -80,22 +78,19 @@ func newRaftCluster(s *Server, clusterID uint64) *RaftCluster { } } -func (c *RaftCluster) loadClusterStatus() error { +func (c *RaftCluster) loadClusterStatus() (*ClusterStatus, error) { data, err := c.s.kv.Load((c.s.kv.ClusterStatePath("raft_bootstrap_time"))) if err != nil { - return errors.Trace(err) + return nil, errors.Trace(err) } if len(data) == 0 { - return nil + return &ClusterStatus{}, nil } t, err := parseTimestamp([]byte(data)) if err != nil { - return errors.Trace(err) - } - c.status = &ClusterStatus{ - RaftBootstrapTime: t, + return nil, errors.Trace(err) } - return nil + return &ClusterStatus{RaftBootstrapTime: t}, nil } func (c *RaftCluster) start() error { @@ -217,13 +212,7 @@ func (s *Server) GetCluster() *metapb.Cluster { func (s *Server) GetClusterStatus() (*ClusterStatus, error) { s.cluster.Lock() defer s.cluster.Unlock() - err := s.cluster.loadClusterStatus() - if err != nil { - return nil, 
errors.Trace(err) - } - clone := &ClusterStatus{} - *clone = *s.cluster.status - return clone, nil + return s.cluster.loadClusterStatus() } func (s *Server) createRaftCluster() error {