Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schedule: fix a thread-safe bug and improve code (#1719) #1734

Merged
merged 15 commits into from
Sep 11, 2019
23 changes: 0 additions & 23 deletions server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,28 +634,6 @@ func (c *RaftCluster) RemoveTombStoneRecords() error {
return nil
}

func (c *RaftCluster) checkOperators() {
opController := c.coordinator.opController
for _, op := range opController.GetOperators() {
// after region is merged, it will not heartbeat anymore
// the operator of merged region will not timeout actively
if c.cachedCluster.GetRegion(op.RegionID()) == nil {
log.Debug("remove operator cause region is merged",
zap.Uint64("region-id", op.RegionID()),
zap.Stringer("operator", op))
opController.RemoveOperator(op)
continue
}

if op.IsTimeout() {
log.Info("operator timeout",
zap.Uint64("region-id", op.RegionID()),
zap.Stringer("operator", op))
opController.RemoveTimeoutOperator(op)
}
}
}

func (c *RaftCluster) collectMetrics() {
cluster := c.cachedCluster
statsMap := statistics.NewStoreStatisticsMap(c.cachedCluster.opt, c.GetNamespaceClassifier())
Expand Down Expand Up @@ -699,7 +677,6 @@ func (c *RaftCluster) runBackgroundJobs(interval time.Duration) {
log.Info("background jobs has been stopped")
return
case <-ticker.C:
c.checkOperators()
c.checkStores()
c.collectMetrics()
c.coordinator.opController.PruneHistory()
Expand Down
16 changes: 8 additions & 8 deletions server/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (s *testCoordinatorSuite) TestBasic(c *C) {
c.Assert(oc.OperatorCount(op2.Kind()), Equals, uint64(0))

// Remove the operator manually, then we can add a new operator.
oc.RemoveOperator(op1)
c.Assert(oc.RemoveOperator(op1), IsTrue)
oc.AddWaitingOperator(op2)
c.Assert(oc.OperatorCount(op2.Kind()), Equals, uint64(1))
c.Assert(oc.GetOperator(1).RegionID(), Equals, op2.RegionID())
Expand Down Expand Up @@ -752,7 +752,7 @@ func (s *testOperatorControllerSuite) TestOperatorCount(c *C) {
op2 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), schedule.OpLeader)
oc.AddWaitingOperator(op2)
c.Assert(oc.OperatorCount(schedule.OpLeader), Equals, uint64(2)) // 1:leader, 2:leader
oc.RemoveOperator(op1)
c.Assert(oc.RemoveOperator(op1), IsTrue)
c.Assert(oc.OperatorCount(schedule.OpLeader), Equals, uint64(1)) // 2:leader

op1 = newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), schedule.OpRegion)
Expand Down Expand Up @@ -787,7 +787,7 @@ func (s *testOperatorControllerSuite) TestStoreOverloaded(c *C) {
for i := 0; i < 10; i++ {
c.Assert(lb.Schedule(tc), IsNil)
}
oc.RemoveOperator(op1)
c.Assert(oc.RemoveOperator(op1), IsTrue)
time.Sleep(1 * time.Second)
for i := 0; i < 100; i++ {
c.Assert(lb.Schedule(tc), NotNil)
Expand Down Expand Up @@ -876,7 +876,7 @@ func (s *testScheduleControllerSuite) TestController(c *C) {
c.Assert(oc.AddWaitingOperator(op2), IsTrue)
// count = 2
c.Assert(sc.AllowSchedule(), IsFalse)
oc.RemoveOperator(op1)
c.Assert(oc.RemoveOperator(op1), IsTrue)
// count = 1
c.Assert(sc.AllowSchedule(), IsTrue)

Expand All @@ -887,7 +887,7 @@ func (s *testScheduleControllerSuite) TestController(c *C) {
c.Assert(sc.AllowSchedule(), IsFalse)
c.Assert(oc.AddWaitingOperator(op3), IsTrue)
c.Assert(sc.AllowSchedule(), IsTrue)
oc.RemoveOperator(op3)
c.Assert(oc.RemoveOperator(op3), IsTrue)

// add a admin operator will remove old operator
c.Assert(oc.AddWaitingOperator(op2), IsTrue)
Expand All @@ -896,14 +896,14 @@ func (s *testScheduleControllerSuite) TestController(c *C) {
op4.SetPriorityLevel(core.HighPriority)
c.Assert(oc.AddWaitingOperator(op4), IsTrue)
c.Assert(sc.AllowSchedule(), IsTrue)
oc.RemoveOperator(op4)
c.Assert(oc.RemoveOperator(op4), IsTrue)

// test wrong region id.
op5 := newTestOperator(3, &metapb.RegionEpoch{}, schedule.OpHotRegion)
c.Assert(oc.AddWaitingOperator(op5), IsFalse)

// test wrong region epoch.
oc.RemoveOperator(op1)
c.Assert(oc.RemoveOperator(op1), IsTrue)
epoch := &metapb.RegionEpoch{
Version: tc.GetRegion(1).GetRegionEpoch().GetVersion() + 1,
ConfVer: tc.GetRegion(1).GetRegionEpoch().GetConfVer(),
Expand All @@ -913,7 +913,7 @@ func (s *testScheduleControllerSuite) TestController(c *C) {
epoch.Version--
op6 = newTestOperator(1, epoch, schedule.OpLeader)
c.Assert(oc.AddWaitingOperator(op6), IsTrue)
oc.RemoveOperator(op6)
c.Assert(oc.RemoveOperator(op6), IsTrue)
}

func (s *testScheduleControllerSuite) TestInterval(c *C) {
Expand Down
2 changes: 1 addition & 1 deletion server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func (h *Handler) RemoveOperator(regionID uint64) error {
return ErrOperatorNotFound
}

c.opController.RemoveOperator(op)
_ = c.opController.RemoveOperator(op)
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions server/schedule/merge_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func (s *testMergeCheckerSuite) TestStorelimit(c *C) {
for i := 0; i < 50; i++ {
c.Assert(oc.AddOperator(ops...), IsTrue)
for _, op := range ops {
oc.RemoveOperator(op)
c.Assert(oc.RemoveOperator(op), IsTrue)
}
}
s.regions[2] = s.regions[2].Clone(
Expand All @@ -309,7 +309,7 @@ func (s *testMergeCheckerSuite) TestStorelimit(c *C) {
for i := 0; i < 5; i++ {
c.Assert(oc.AddOperator(ops...), IsTrue)
for _, op := range ops {
oc.RemoveOperator(op)
c.Assert(oc.RemoveOperator(op), IsTrue)
}
}
c.Assert(oc.AddOperator(ops...), IsFalse)
Expand Down
49 changes: 27 additions & 22 deletions server/schedule/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/juju/ratelimit"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/eraftpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
Expand Down Expand Up @@ -89,23 +90,25 @@ func NewOperatorController(cluster Cluster, hbStreams HeartbeatStreams) *Operato
func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) {
// Check existed operator.
if op := oc.GetOperator(region.GetID()); op != nil {
failpoint.Inject("concurrentRemoveOperator", func() {
time.Sleep(500 * time.Millisecond)
})
timeout := op.IsTimeout()
if step := op.Check(region); step != nil && !timeout {
operatorCounter.WithLabelValues(op.Desc(), "check").Inc()
oc.SendScheduleCommand(region, step, source)
return
}
if op.IsFinish() {
if op.IsFinish() && oc.RemoveOperator(op) {
log.Info("operator finish", zap.Uint64("region-id", region.GetID()), zap.Reflect("operator", op))
operatorCounter.WithLabelValues(op.Desc(), "finish").Inc()
operatorDuration.WithLabelValues(op.Desc()).Observe(op.RunningTime().Seconds())
oc.pushHistory(op)
oc.opRecords.Put(op, pdpb.OperatorStatus_SUCCESS)
oc.RemoveOperator(op)
oc.PromoteWaitingOperator()
} else if timeout {
} else if timeout && oc.RemoveOperator(op) {
log.Info("operator timeout", zap.Uint64("region-id", region.GetID()), zap.Reflect("operator", op))
oc.RemoveTimeoutOperator(op)
operatorCounter.WithLabelValues(op.Desc(), "timeout").Inc()
oc.opRecords.Put(op, pdpb.OperatorStatus_TIMEOUT)
oc.PromoteWaitingOperator()
}
Expand Down Expand Up @@ -138,11 +141,17 @@ func (oc *OperatorController) pollNeedDispatchRegion() (r *core.RegionInfo, next
}
r = oc.cluster.GetRegion(regionID)
if r == nil {
_ = oc.removeOperatorLocked(op)
log.Debug("remove operator because region disappeared",
zap.Uint64("region-id", op.RegionID()),
zap.Stringer("operator", op))
operatorCounter.WithLabelValues(op.Desc(), "disappear").Inc()
oc.opRecords.Put(op, pdpb.OperatorStatus_CANCEL)
return nil, true
}
step := op.Check(r)
if step == nil {
return nil, true
return r, true
}
now := time.Now()
if now.Before(item.time) {
Expand Down Expand Up @@ -210,7 +219,7 @@ func (oc *OperatorController) AddOperator(ops ...*Operator) bool {

if oc.exceedStoreLimit(ops...) || !oc.checkAddOperator(ops...) {
for _, op := range ops {
operatorCounter.WithLabelValues(op.Desc(), "canceled").Inc()
operatorCounter.WithLabelValues(op.Desc(), "cancel").Inc()
oc.opRecords.Put(op, pdpb.OperatorStatus_CANCEL)
}
return false
Expand Down Expand Up @@ -286,10 +295,10 @@ func (oc *OperatorController) addOperatorLocked(op *Operator) bool {
// If there is an old operator, replace it. The priority should be checked
// already.
if old, ok := oc.operators[regionID]; ok {
_ = oc.removeOperatorLocked(old)
log.Info("replace old operator", zap.Uint64("region-id", regionID), zap.Reflect("operator", old))
operatorCounter.WithLabelValues(old.Desc(), "replaced").Inc()
operatorCounter.WithLabelValues(old.Desc(), "replace").Inc()
oc.opRecords.Put(old, pdpb.OperatorStatus_REPLACE)
oc.removeOperatorLocked(old)
}

oc.operators[regionID] = op
Expand Down Expand Up @@ -320,18 +329,10 @@ func (oc *OperatorController) addOperatorLocked(op *Operator) bool {
}

// RemoveOperator removes a operator from the running operators.
func (oc *OperatorController) RemoveOperator(op *Operator) {
func (oc *OperatorController) RemoveOperator(op *Operator) (found bool) {
oc.Lock()
defer oc.Unlock()
oc.removeOperatorLocked(op)
}

// RemoveTimeoutOperator removes a operator which is timeout from the running operators.
func (oc *OperatorController) RemoveTimeoutOperator(op *Operator) {
oc.Lock()
defer oc.Unlock()
operatorCounter.WithLabelValues(op.Desc(), "timeout").Inc()
oc.removeOperatorLocked(op)
return oc.removeOperatorLocked(op)
}

// GetOperatorStatus gets the operator and its status with the specify id.
Expand All @@ -347,11 +348,15 @@ func (oc *OperatorController) GetOperatorStatus(id uint64) *OperatorWithStatus {
return oc.opRecords.Get(id)
}

func (oc *OperatorController) removeOperatorLocked(op *Operator) {
func (oc *OperatorController) removeOperatorLocked(op *Operator) bool {
regionID := op.RegionID()
delete(oc.operators, regionID)
oc.updateCounts(oc.operators)
operatorCounter.WithLabelValues(op.Desc(), "remove").Inc()
if cur := oc.operators[regionID]; cur == op {
delete(oc.operators, regionID)
oc.updateCounts(oc.operators)
operatorCounter.WithLabelValues(op.Desc(), "remove").Inc()
return true
}
return false
}

// GetOperator gets a operator from the given region.
Expand Down
72 changes: 69 additions & 3 deletions server/schedule/operator_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@ package schedule

import (
"container/heap"
"sync"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/pd/pkg/mock/mockcluster"
"github.com/pingcap/pd/pkg/mock/mockhbstream"
"github.com/pingcap/pd/pkg/mock/mockoption"
"github.com/pingcap/pd/server/core"
)

var _ = Suite(&testOperatorControllerSuite{})
Expand All @@ -45,8 +48,9 @@ func (t *testOperatorControllerSuite) TestGetOpInfluence(c *C) {
oc.SetOperator(op1)
oc.SetOperator(op2)
go func() {
c.Assert(oc.RemoveOperator(op1), IsTrue)
for {
oc.RemoveOperator(op1)
c.Assert(oc.RemoveOperator(op1), IsFalse)
}
}()
go func() {
Expand Down Expand Up @@ -92,27 +96,76 @@ func (t *testOperatorControllerSuite) TestOperatorStatus(c *C) {
c.Assert(oc.GetOperatorStatus(2).Status, Equals, pdpb.OperatorStatus_SUCCESS)
}

// issue #1716
func (t *testOperatorControllerSuite) TestConcurrentRemoveOperator(c *C) {
opt := mockoption.NewScheduleOptions()
tc := mockcluster.NewCluster(opt)
oc := NewOperatorController(tc, mockhbstream.NewHeartbeatStream())
tc.AddLeaderStore(1, 0)
tc.AddLeaderStore(2, 1)
tc.AddLeaderRegion(1, 2, 1)
region1 := tc.GetRegion(1)
steps := []OperatorStep{
RemovePeer{FromStore: 1},
AddPeer{ToStore: 1, PeerID: 4},
}
// finished op with normal priority
op1 := NewOperator("test", 1, &metapb.RegionEpoch{}, OpRegion, TransferLeader{ToStore: 2})
// unfinished op with high priority
op2 := NewOperator("test", 1, &metapb.RegionEpoch{}, OpRegion|OpAdmin, steps...)
op2.SetPriorityLevel(core.HighPriority)

oc.SetOperator(op1)

c.Assert(failpoint.Enable("github.com/pingcap/pd/server/schedule/concurrentRemoveOperator", "return(true)"), IsNil)

var wg sync.WaitGroup
wg.Add(2)
go func() {
oc.Dispatch(region1, "test")
wg.Done()
}()
go func() {
time.Sleep(50 * time.Millisecond)
success := oc.AddOperator(op2)
// If the assert failed before wg.Done, the test will be blocked.
defer c.Assert(success, IsTrue)
wg.Done()
}()
wg.Wait()

c.Assert(oc.GetOperator(1), Equals, op2)
}

func (t *testOperatorControllerSuite) TestPollDispatchRegion(c *C) {
opt := mockoption.NewScheduleOptions()
tc := mockcluster.NewCluster(opt)
oc := NewOperatorController(tc, mockhbstream.NewHeartbeatStream())
tc.AddLeaderStore(1, 2)
tc.AddLeaderStore(2, 0)
tc.AddLeaderStore(2, 1)
tc.AddLeaderRegion(1, 1, 2)
tc.AddLeaderRegion(2, 1, 2)
tc.AddLeaderRegion(4, 2, 1)
steps := []OperatorStep{
RemovePeer{FromStore: 2},
AddPeer{ToStore: 2, PeerID: 4},
}
op1 := NewOperator("test", 1, &metapb.RegionEpoch{}, OpRegion, TransferLeader{ToStore: 2})
op2 := NewOperator("test", 2, &metapb.RegionEpoch{}, OpRegion, steps...)
op3 := NewOperator("test", 3, &metapb.RegionEpoch{}, OpRegion, steps...)
op4 := NewOperator("test", 4, &metapb.RegionEpoch{}, OpRegion, TransferLeader{ToStore: 2})
region1 := tc.GetRegion(1)
region2 := tc.GetRegion(2)
region4 := tc.GetRegion(4)
// Adds operator and pushes to the notifier queue.
{
oc.SetOperator(op1)
oc.SetOperator(op3)
oc.SetOperator(op4)
oc.SetOperator(op2)
heap.Push(&oc.opNotifierQueue, &operatorWithTime{op: op1, time: time.Now().Add(100 * time.Millisecond)})
heap.Push(&oc.opNotifierQueue, &operatorWithTime{op: op3, time: time.Now().Add(300 * time.Millisecond)})
heap.Push(&oc.opNotifierQueue, &operatorWithTime{op: op4, time: time.Now().Add(499 * time.Millisecond)})
heap.Push(&oc.opNotifierQueue, &operatorWithTime{op: op2, time: time.Now().Add(500 * time.Millisecond)})
}
// fisrt poll got nil
Expand All @@ -126,14 +179,27 @@ func (t *testOperatorControllerSuite) TestPollDispatchRegion(c *C) {
c.Assert(r, NotNil)
c.Assert(next, IsTrue)
c.Assert(r.GetID(), Equals, region1.GetID())

// find op3 with nil region, remove it
c.Assert(oc.GetOperator(3), NotNil)
r, next = oc.pollNeedDispatchRegion()
c.Assert(r, IsNil)
c.Assert(next, IsFalse)
c.Assert(next, IsTrue)
c.Assert(oc.GetOperator(3), IsNil)

// find op4 finished
r, next = oc.pollNeedDispatchRegion()
c.Assert(r, NotNil)
c.Assert(next, IsTrue)
c.Assert(r.GetID(), Equals, region4.GetID())

// after waiting 500 millseconds, the region2 need to dispatch
time.Sleep(400 * time.Millisecond)
r, next = oc.pollNeedDispatchRegion()
c.Assert(r, NotNil)
c.Assert(next, IsTrue)
c.Assert(r.GetID(), Equals, region2.GetID())
r, next = oc.pollNeedDispatchRegion()
c.Assert(r, IsNil)
c.Assert(next, IsFalse)
}