Skip to content

Commit

Permalink
limiter count group by store
Browse files Browse the repository at this point in the history
  • Loading branch information
Connor1996 committed May 7, 2018
1 parent a288840 commit a251c22
Show file tree
Hide file tree
Showing 8 changed files with 325 additions and 138 deletions.
22 changes: 10 additions & 12 deletions server/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,12 +417,8 @@ func (c *coordinator) runScheduler(s *scheduleController) {
continue
}
opInfluence := schedule.NewOpInfluence(c.getOperators(), c.cluster)
if op := s.Schedule(c.cluster, opInfluence); op != nil {
if len(op) == 1 {
c.addOperator(op[0])
} else {
c.addOperators(op...)
}
if ops := s.Schedule(c.cluster, opInfluence); ops != nil {
c.addOperators(ops...)
}

case <-s.Ctx().Done():
Expand Down Expand Up @@ -450,17 +446,19 @@ func (c *coordinator) addOperatorLocked(op *schedule.Operator) bool {
c.removeOperatorLocked(old)
}

c.operators[regionID] = op
c.limiter.UpdateCounts(c.operators)

if region := c.cluster.GetRegion(op.RegionID()); region != nil {
c.operators[regionID] = op
c.limiter.AddOperator(op, region)
if step := op.Check(region); step != nil {
c.sendScheduleCommand(region, step)
}
operatorCounter.WithLabelValues(op.Desc(), "create").Inc()
return true
}

operatorCounter.WithLabelValues(op.Desc(), "create").Inc()
return true
log.Warnf("add operator %v on nonexistent region %d", op, regionID)
operatorCounter.WithLabelValues(op.Desc(), "no_region").Inc()
return false
}

func (c *coordinator) addOperator(op *schedule.Operator) bool {
Expand Down Expand Up @@ -520,7 +518,7 @@ func (c *coordinator) removeOperator(op *schedule.Operator) {
func (c *coordinator) removeOperatorLocked(op *schedule.Operator) {
regionID := op.RegionID()
delete(c.operators, regionID)
c.limiter.UpdateCounts(c.operators)
c.limiter.RemoveOperator(op)
operatorCounter.WithLabelValues(op.Desc(), "remove").Inc()
}

Expand Down
68 changes: 20 additions & 48 deletions server/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
"github.com/pingcap/pd/server/schedulers"
)

func newTestOperator(regionID uint64, kind schedule.OperatorKind) *schedule.Operator {
return schedule.NewOperator("test", regionID, kind)
func newTestOperator(regionID uint64, kind schedule.OperatorKind, steps ...schedule.OperatorStep) *schedule.Operator {
return schedule.NewOperator("test", regionID, kind, steps...)
}

func newTestScheduleConfig() (*ScheduleConfig, *scheduleOption) {
Expand Down Expand Up @@ -132,22 +132,20 @@ func (s *testCoordinatorSuite) TestBasic(c *C) {
defer hbStreams.Close()

co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
l := co.limiter

op1 := newTestOperator(1, schedule.OpLeader)
co.addOperator(op1)
c.Assert(l.OperatorCount(op1.Kind()), Equals, uint64(1))
tc.addLeaderRegion(1, 1, 2)

op1 := newTestOperator(1, schedule.OpLeader, schedule.TransferLeader{FromStore: 1, ToStore: 2})
c.Assert(co.addOperator(op1), IsTrue)
c.Assert(co.getOperator(1).RegionID(), Equals, op1.RegionID())

// Region 1 already has an operator, cannot add another one.
op2 := newTestOperator(1, schedule.OpRegion)
co.addOperator(op2)
c.Assert(l.OperatorCount(op2.Kind()), Equals, uint64(0))
op2 := newTestOperator(1, schedule.OpLeader, schedule.TransferLeader{FromStore: 1, ToStore: 3})
c.Assert(co.addOperator(op2), IsFalse)

// Remove the operator manually, then we can add a new operator.
co.removeOperator(op1)
co.addOperator(op2)
c.Assert(l.OperatorCount(op2.Kind()), Equals, uint64(1))
c.Assert(co.addOperator(op2), IsTrue)
c.Assert(co.getOperator(1).RegionID(), Equals, op2.RegionID())
}

Expand Down Expand Up @@ -553,37 +551,6 @@ func waitOperator(c *C, co *coordinator, regionID uint64) {
})
}

var _ = Suite(&testScheduleLimiterSuite{})

type testScheduleLimiterSuite struct{}

func (s *testScheduleLimiterSuite) TestOperatorCount(c *C) {
l := schedule.NewLimiter()
c.Assert(l.OperatorCount(schedule.OpLeader), Equals, uint64(0))
c.Assert(l.OperatorCount(schedule.OpRegion), Equals, uint64(0))

operators := make(map[uint64]*schedule.Operator)

operators[1] = newTestOperator(1, schedule.OpLeader)
l.UpdateCounts(operators)
c.Assert(l.OperatorCount(schedule.OpLeader), Equals, uint64(1)) // 1:leader
operators[2] = newTestOperator(2, schedule.OpLeader)
l.UpdateCounts(operators)
c.Assert(l.OperatorCount(schedule.OpLeader), Equals, uint64(2)) // 1:leader, 2:leader
delete(operators, 1)
l.UpdateCounts(operators)
c.Assert(l.OperatorCount(schedule.OpLeader), Equals, uint64(1)) // 2:leader

operators[1] = newTestOperator(1, schedule.OpRegion)
l.UpdateCounts(operators)
c.Assert(l.OperatorCount(schedule.OpRegion), Equals, uint64(1)) // 1:region 2:leader
c.Assert(l.OperatorCount(schedule.OpLeader), Equals, uint64(1))
operators[2] = newTestOperator(2, schedule.OpRegion)
l.UpdateCounts(operators)
c.Assert(l.OperatorCount(schedule.OpRegion), Equals, uint64(2)) // 1:region 2:region
c.Assert(l.OperatorCount(schedule.OpLeader), Equals, uint64(0))
}

var _ = Suite(&testScheduleControllerSuite{})

type testScheduleControllerSuite struct{}
Expand All @@ -596,6 +563,10 @@ type mockLimitScheduler struct {
kind schedule.OperatorKind
}

func (s *mockLimitScheduler) SetUpSuite(c *C) {

}

func (s *mockLimitScheduler) IsScheduleAllowed(cluster schedule.Cluster) bool {
return s.counter.OperatorCount(s.kind) < s.limit
}
Expand Down Expand Up @@ -623,25 +594,26 @@ func (s *testScheduleControllerSuite) TestController(c *C) {
}
// limit = 2
lb.limit = 2

// put region info
tc.addLeaderRegion(1, 1, 2)
tc.addLeaderRegion(2, 2, 1)

// count = 0
c.Assert(sc.AllowSchedule(), IsTrue)
op1 := newTestOperator(1, schedule.OpLeader)
op1 := newTestOperator(1, schedule.OpLeader, schedule.TransferLeader{FromStore: 1, ToStore: 2})
c.Assert(co.addOperator(op1), IsTrue)
// count = 1
c.Assert(sc.AllowSchedule(), IsTrue)
op2 := newTestOperator(2, schedule.OpLeader)
op2 := newTestOperator(2, schedule.OpLeader, schedule.TransferLeader{FromStore: 2, ToStore: 1})
c.Assert(co.addOperator(op2), IsTrue)
// count = 2
c.Assert(sc.AllowSchedule(), IsFalse)
co.removeOperator(op1)
// count = 1
c.Assert(sc.AllowSchedule(), IsTrue)

// add a PriorityKind operator will remove old operator
op3 := newTestOperator(2, schedule.OpHotRegion)
op3.SetPriorityLevel(core.HighPriority)
c.Assert(co.addOperator(op1), IsTrue)
c.Assert(sc.AllowSchedule(), IsFalse)
c.Assert(co.addOperator(op3), IsTrue)
c.Assert(sc.AllowSchedule(), IsTrue)
co.removeOperator(op3)
Expand Down
102 changes: 102 additions & 0 deletions server/schedule/limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package schedule

import (
"sync"

"github.com/pingcap/pd/server/core"
log "github.com/sirupsen/logrus"
)

// Limiter is a counter that limits the number of operators.
type Limiter struct {
sync.RWMutex
counts map[OperatorKind]map[uint64]uint64
}

// NewLimiter creates a schedule limiter.
func NewLimiter() *Limiter {
return &Limiter{
counts: make(map[OperatorKind]map[uint64]uint64),
}
}

// AddOperator increases the count by kind.
func (l *Limiter) AddOperator(op *Operator, region *core.RegionInfo) {
l.Lock()
defer l.Unlock()

if _, ok := l.counts[op.Kind()]; !ok {
l.counts[op.Kind()] = make(map[uint64]uint64)
}

for _, store := range op.InvolvedStores(region) {
l.counts[op.Kind()][store]++
}
}

// RemoveOperator decreases the count by kind.
func (l *Limiter) RemoveOperator(op *Operator) {
l.Lock()
defer l.Unlock()

for _, store := range op.InvolvedStores(nil) {
_, ok := l.counts[op.Kind()][store]
if ok {
if l.counts[op.Kind()][store] == 0 {
log.Fatal("the limiter is already 0, no operators need to remove")
}
l.counts[op.Kind()][store]--
} else {
log.Fatalf("operator count decrease on nonexisted store %d", store)
}
}
}

// OperatorCount gets the max count of operators of all stores filtered by mask.
func (l *Limiter) OperatorCount(mask OperatorKind) uint64 {
l.RLock()
defer l.RUnlock()

var max uint64
counts := make(map[uint64]uint64)
for k, stores := range l.counts {
if k&mask != 0 {
for storeID, count := range stores {
counts[storeID] += count
if max < counts[storeID] {
max = counts[storeID]
}
}
}
}
return max
}

// StoreOperatorCount gets the count of operators for specific store filtered by mask.
func (l *Limiter) StoreOperatorCount(mask OperatorKind, storeID uint64) uint64 {
l.RLock()
defer l.RUnlock()

var total uint64
for k, stores := range l.counts {
if k&mask != 0 {
if count, ok := stores[storeID]; ok {
total += count
}
}
}
return total
}
65 changes: 65 additions & 0 deletions server/schedule/limiter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package schedule

import (
. "github.com/pingcap/check"
)

var _ = Suite(&testLimiterSuite{})

type testLimiterSuite struct{}

func (s *testLimiterSuite) TestOperatorCount(c *C) {
l := NewLimiter()
c.Assert(l.OperatorCount(OpLeader), Equals, uint64(0))
c.Assert(l.OperatorCount(OpRegion), Equals, uint64(0))

ops := []*Operator{}

// init region and operator
for i := uint64(1); i <= 3; i++ {
op := newTestOperator(i, OpLeader|OpRegion, TransferLeader{FromStore: i + 1, ToStore: 1})
region := newTestRegion(i, i+1, [2]uint64{1, 1}, [2]uint64{i + 1, i + 1})
l.AddOperator(op, region)
ops = append(ops, op)
}

c.Assert(l.OperatorCount(OpLeader), Equals, uint64(3))
c.Assert(l.StoreOperatorCount(OpLeader, 1), Equals, uint64(3))
c.Assert(l.StoreOperatorCount(OpLeader, 2), Equals, uint64(1))
c.Assert(l.StoreOperatorCount(OpLeader, 3), Equals, uint64(1))
c.Assert(l.StoreOperatorCount(OpLeader, 4), Equals, uint64(1))

l.RemoveOperator(ops[0])
c.Assert(l.OperatorCount(OpLeader|OpRegion), Equals, uint64(2))
c.Assert(l.StoreOperatorCount(OpLeader|OpRegion, 1), Equals, uint64(2))
c.Assert(l.StoreOperatorCount(OpLeader|OpRegion, 2), Equals, uint64(0))
c.Assert(l.StoreOperatorCount(OpLeader|OpRegion, 3), Equals, uint64(1))
c.Assert(l.StoreOperatorCount(OpLeader|OpRegion, 4), Equals, uint64(1))

l.RemoveOperator(ops[1])
c.Assert(l.OperatorCount(OpRegion), Equals, uint64(1))
c.Assert(l.StoreOperatorCount(OpRegion, 1), Equals, uint64(1))
c.Assert(l.StoreOperatorCount(OpRegion, 2), Equals, uint64(0))
c.Assert(l.StoreOperatorCount(OpRegion, 3), Equals, uint64(0))
c.Assert(l.StoreOperatorCount(OpRegion, 4), Equals, uint64(1))

l.RemoveOperator(ops[2])
c.Assert(l.OperatorCount(OpLeader), Equals, uint64(0))
c.Assert(l.StoreOperatorCount(OpLeader, 1), Equals, uint64(0))
c.Assert(l.StoreOperatorCount(OpLeader, 2), Equals, uint64(0))
c.Assert(l.StoreOperatorCount(OpLeader, 3), Equals, uint64(0))
c.Assert(l.StoreOperatorCount(OpLeader, 4), Equals, uint64(0))
}
Loading

0 comments on commit a251c22

Please sign in to comment.