Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge branch 'master' into new-hotspot-scheduler #1709

Merged
merged 10 commits from master into new-hotspot-scheduler
Aug 27, 2019
5 changes: 3 additions & 2 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type Client interface {
// Limit limits the maximum number of regions returned.
// If a region has no leader, corresponding leader will be placed by a peer
// with empty value (PeerID is 0).
ScanRegions(ctx context.Context, key []byte, limit int) ([]*metapb.Region, []*metapb.Peer, error)
ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*metapb.Region, []*metapb.Peer, error)
// GetStore gets a store from PD by store id.
// The store may expire later. Caller is responsible for caching and taking care
// of store change.
Expand Down Expand Up @@ -689,7 +689,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64) (*metapb.Re
return resp.GetRegion(), resp.GetLeader(), nil
}

func (c *client) ScanRegions(ctx context.Context, key []byte, limit int) ([]*metapb.Region, []*metapb.Peer, error) {
func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*metapb.Region, []*metapb.Peer, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.ScanRegions", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand All @@ -700,6 +700,7 @@ func (c *client) ScanRegions(ctx context.Context, key []byte, limit int) ([]*met
resp, err := c.leaderClient().ScanRegions(ctx, &pdpb.ScanRegionsRequest{
Header: c.requestHeader(),
StartKey: key,
EndKey: endKey,
Limit: int32(limit),
})
cancel()
Expand Down
14 changes: 8 additions & 6 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,16 +279,16 @@ func (s *testClientSuite) TestScanRegions(c *C) {

// Wait for region heartbeats.
testutil.WaitUntil(c, func(c *C) bool {
scanRegions, _, err := s.client.ScanRegions(context.Background(), []byte{0}, 10)
scanRegions, _, err := s.client.ScanRegions(context.Background(), []byte{0}, nil, 10)
return err == nil && len(scanRegions) == 10
})

// Set leader of region3 to nil.
region3 := core.NewRegionInfo(regions[3], nil)
s.srv.GetRaftCluster().HandleRegionHeartbeat(region3)

check := func(start []byte, limit int, expect []*metapb.Region) {
scanRegions, leaders, err := s.client.ScanRegions(context.Background(), start, limit)
check := func(start, end []byte, limit int, expect []*metapb.Region) {
scanRegions, leaders, err := s.client.ScanRegions(context.Background(), start, end, limit)
c.Assert(err, IsNil)
c.Assert(scanRegions, HasLen, len(expect))
c.Assert(leaders, HasLen, len(expect))
Expand All @@ -305,9 +305,11 @@ func (s *testClientSuite) TestScanRegions(c *C) {
}
}

check([]byte{0}, 10, regions)
check([]byte{1}, 5, regions[1:6])
check([]byte{100}, 1, nil)
check([]byte{0}, nil, 10, regions)
check([]byte{1}, nil, 5, regions[1:6])
check([]byte{100}, nil, 1, nil)
check([]byte{1}, []byte{6}, 0, regions[1:6])
check([]byte{1}, []byte{6}, 2, regions[1:3])
}

func (s *testClientSuite) TestGetRegionByID(c *C) {
Expand Down
6 changes: 6 additions & 0 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ func main() {
default:
log.Fatal("parse cmd flags error", zap.Error(err))
}

if cfg.ConfigCheck {
server.PrintConfigCheckMsg(cfg)
exit(0)
}

// New zap logger
err = cfg.SetupLogger()
if err == nil {
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ require (
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8
github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c
github.com/pingcap/kvproto v0.0.0-20190822090350-11ea838aedf7
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v0.8.0
Expand All @@ -48,4 +48,4 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.0.0
)

replace github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c => github.com/jiyingtk/kvproto v0.0.0-20190808054623-6ca6e1aa3cb9
replace github.com/pingcap/kvproto v0.0.0-20190822090350-11ea838aedf7 => github.com/jiyingtk/kvproto v0.0.0-20190827063906-9454ffd9a2a5
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jiyingtk/kvproto v0.0.0-20190808054623-6ca6e1aa3cb9 h1:YreQ2hgIVHQSDmbiaOMmioYcjrFNTrrvGJsUDf7wECc=
github.com/jiyingtk/kvproto v0.0.0-20190808054623-6ca6e1aa3cb9/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/jiyingtk/kvproto v0.0.0-20190827063906-9454ffd9a2a5 h1:idJpFdBzXHy70CNVbHgR08dVU8BazB15RSN3uEQIdaw=
github.com/jiyingtk/kvproto v0.0.0-20190827063906-9454ffd9a2a5/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/juju/ratelimit v1.0.1 h1:+7AIFJVQ0EQgq/K9+0Krm7m530Du7tIz0METWzN0RgY=
Expand Down
39 changes: 23 additions & 16 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ func (mc *Cluster) allocID() (uint64, error) {
}

// ScanRegions scans region with start key, until number greater than limit.
func (mc *Cluster) ScanRegions(startKey []byte, limit int) []*core.RegionInfo {
return mc.Regions.ScanRange(startKey, limit)
func (mc *Cluster) ScanRegions(startKey, endKey []byte, limit int) []*core.RegionInfo {
return mc.Regions.ScanRange(startKey, endKey, limit)
}

// LoadRegion puts region info without leader
Expand Down Expand Up @@ -393,20 +393,8 @@ func (mc *Cluster) UpdateStoreStatus(id uint64) {
mc.PutStore(newStore)
}

func (mc *Cluster) newMockRegionInfo(regionID uint64, leaderID uint64, followerIds ...uint64) *core.RegionInfo {
region := &metapb.Region{
Id: regionID,
StartKey: []byte(fmt.Sprintf("%20d", regionID)),
EndKey: []byte(fmt.Sprintf("%20d", regionID+1)),
}
leader, _ := mc.AllocPeer(leaderID)
region.Peers = []*metapb.Peer{leader}
for _, id := range followerIds {
peer, _ := mc.AllocPeer(id)
region.Peers = append(region.Peers, peer)
}

return core.NewRegionInfo(region, leader)
func (mc *Cluster) newMockRegionInfo(regionID uint64, leaderID uint64, followerIDs ...uint64) *core.RegionInfo {
return mc.MockRegionInfo(regionID, leaderID, followerIDs, nil)
}

// GetOpt mocks method.
Expand Down Expand Up @@ -473,3 +461,22 @@ func (mc *Cluster) PutStoreWithLabels(id uint64, labelPairs ...string) {
}
mc.PutStore(core.NewStoreInfo(&metapb.Store{Id: id, Labels: labels}))
}

// MockRegionInfo returns a mock region
func (mc *Cluster) MockRegionInfo(regionID uint64, leaderID uint64,
followerIDs []uint64, epoch *metapb.RegionEpoch) *core.RegionInfo {

region := &metapb.Region{
Id: regionID,
StartKey: []byte(fmt.Sprintf("%20d", regionID)),
EndKey: []byte(fmt.Sprintf("%20d", regionID+1)),
RegionEpoch: epoch,
}
leader, _ := mc.AllocPeer(leaderID)
region.Peers = []*metapb.Peer{leader}
for _, id := range followerIDs {
peer, _ := mc.AllocPeer(id)
region.Peers = append(region.Peers, peer)
}
return core.NewRegionInfo(region, leader)
}
6 changes: 6 additions & 0 deletions pkg/mock/mockhbstream/mockhbstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,9 @@ func (mhs *HeartbeatStreams) SendMsg(region *core.RegionInfo, msg *pdpb.RegionHe
case <-mhs.ctx.Done():
}
}

// MsgCh returns the internal channel which contains the heartbeat responses
// from PD. It can be used to inspect the content of a PD response
func (mhs *HeartbeatStreams) MsgCh() chan *pdpb.RegionHeartbeatResponse {
return mhs.msgCh
}
4 changes: 2 additions & 2 deletions server/api/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (h *regionsHandler) GetAll(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusOK, regionsInfo)
}

func (h *regionsHandler) ScanRegionsByKey(w http.ResponseWriter, r *http.Request) {
func (h *regionsHandler) ScanRegions(w http.ResponseWriter, r *http.Request) {
cluster := h.svr.GetRaftCluster()
if cluster == nil {
h.rd.JSON(w, http.StatusInternalServerError, server.ErrNotBootstrapped.Error())
Expand All @@ -170,7 +170,7 @@ func (h *regionsHandler) ScanRegionsByKey(w http.ResponseWriter, r *http.Request
if limit > maxRegionLimit {
limit = maxRegionLimit
}
regions := cluster.ScanRegionsByKey([]byte(startKey), limit)
regions := cluster.ScanRegions([]byte(startKey), nil, limit)
regionsInfo := convertToAPIRegions(regions)
h.rd.JSON(w, http.StatusOK, regionsInfo)
}
Expand Down
2 changes: 1 addition & 1 deletion server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {

regionsHandler := newRegionsHandler(svr, rd)
router.HandleFunc("/api/v1/regions", regionsHandler.GetAll).Methods("GET")
router.HandleFunc("/api/v1/regions/key", regionsHandler.ScanRegionsByKey).Methods("GET")
router.HandleFunc("/api/v1/regions/key", regionsHandler.ScanRegions).Methods("GET")
router.HandleFunc("/api/v1/regions/store/{id}", regionsHandler.GetStoreRegions).Methods("GET")
router.HandleFunc("/api/v1/regions/writeflow", regionsHandler.GetTopWriteFlow).Methods("GET")
router.HandleFunc("/api/v1/regions/readflow", regionsHandler.GetTopReadFlow).Methods("GET")
Expand Down
1 change: 1 addition & 0 deletions server/checker/merge_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator {
log.Debug("try to merge region", zap.Stringer("from", core.RegionToHexMeta(region.GetMeta())), zap.Stringer("to", core.RegionToHexMeta(target.GetMeta())))
ops, err := operator.CreateMergeRegionOperator("merge-region", m.cluster, region, target, operator.OpMerge)
if err != nil {
log.Warn("create merge region operator failed", zap.Error(err))
return nil
}
checkerCounter.WithLabelValues("merge_checker", "new_operator").Inc()
Expand Down
128 changes: 119 additions & 9 deletions server/checker/merge_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/pd/server/namespace"
"github.com/pingcap/pd/server/schedule"
"github.com/pingcap/pd/server/schedule/operator"
"github.com/pingcap/pd/server/schedule/opt"
)

func TestChecker(t *testing.T) {
Expand All @@ -44,7 +45,18 @@ func (s *testMergeCheckerSuite) SetUpTest(c *C) {
cfg := mockoption.NewScheduleOptions()
cfg.MaxMergeRegionSize = 2
cfg.MaxMergeRegionKeys = 2
cfg.LabelProperties = map[string][]*metapb.StoreLabel{
opt.RejectLeader: {{Key: "reject", Value: "leader"}},
}
s.cluster = mockcluster.NewCluster(cfg)
stores := map[uint64][]string{
1: {}, 2: {}, 3: {}, 4: {}, 5: {}, 6: {},
7: {"reject", "leader"},
8: {"reject", "leader"},
}
for storeID, labels := range stores {
s.cluster.PutStoreWithLabels(storeID, labels...)
}
s.regions = []*core.RegionInfo{
core.NewRegionInfo(
&metapb.Region{
Expand Down Expand Up @@ -157,14 +169,19 @@ func (s *testMergeCheckerSuite) checkSteps(c *C, op *operator.Operator, steps []
func (s *testMergeCheckerSuite) TestMatchPeers(c *C) {
// partial store overlap not including leader
ops := s.mc.Check(s.regions[2])
c.Assert(ops, NotNil)
s.checkSteps(c, ops[0], []operator.OpStep{
operator.TransferLeader{FromStore: 6, ToStore: 5},
operator.AddLearner{ToStore: 1, PeerID: 1},
operator.PromoteLearner{ToStore: 1, PeerID: 1},
operator.AddLearner{ToStore: 4, PeerID: 1},
operator.PromoteLearner{ToStore: 4, PeerID: 1},

operator.RemovePeer{FromStore: 2},
operator.AddLearner{ToStore: 4, PeerID: 2},
operator.PromoteLearner{ToStore: 4, PeerID: 2},

operator.AddLearner{ToStore: 1, PeerID: 2},
operator.PromoteLearner{ToStore: 1, PeerID: 2},

operator.TransferLeader{FromStore: 6, ToStore: 5},
operator.RemovePeer{FromStore: 6},

operator.MergeRegion{
FromRegion: s.regions[2].GetMeta(),
ToRegion: s.regions[1].GetMeta(),
Expand Down Expand Up @@ -241,16 +258,109 @@ func (s *testMergeCheckerSuite) TestMatchPeers(c *C) {
s.cluster.PutRegion(s.regions[2])
ops = s.mc.Check(s.regions[2])
s.checkSteps(c, ops[0], []operator.OpStep{
operator.AddLearner{ToStore: 1, PeerID: 4},
operator.PromoteLearner{ToStore: 1, PeerID: 4},
operator.AddLearner{ToStore: 4, PeerID: 4},
operator.PromoteLearner{ToStore: 4, PeerID: 4},

operator.RemovePeer{FromStore: 3},
operator.AddLearner{ToStore: 4, PeerID: 5},
operator.PromoteLearner{ToStore: 4, PeerID: 5},

operator.AddLearner{ToStore: 1, PeerID: 5},
operator.PromoteLearner{ToStore: 1, PeerID: 5},

operator.RemovePeer{FromStore: 6},

operator.AddLearner{ToStore: 5, PeerID: 6},
operator.PromoteLearner{ToStore: 5, PeerID: 6},

operator.TransferLeader{FromStore: 2, ToStore: 4},
operator.RemovePeer{FromStore: 2},

operator.MergeRegion{
FromRegion: s.regions[2].GetMeta(),
ToRegion: s.regions[1].GetMeta(),
IsPassive: false,
},
})
s.checkSteps(c, ops[1], []operator.OpStep{
operator.MergeRegion{
FromRegion: s.regions[2].GetMeta(),
ToRegion: s.regions[1].GetMeta(),
IsPassive: true,
},
})

// no overlap with reject leader label
s.regions[1] = s.regions[1].Clone(
core.SetPeers([]*metapb.Peer{
{Id: 112, StoreId: 7},
{Id: 113, StoreId: 8},
{Id: 114, StoreId: 1},
}),
core.WithLeader(&metapb.Peer{Id: 114, StoreId: 1}),
)
s.cluster.PutRegion(s.regions[1])
ops = s.mc.Check(s.regions[2])
s.checkSteps(c, ops[0], []operator.OpStep{
operator.AddLearner{ToStore: 1, PeerID: 7},
operator.PromoteLearner{ToStore: 1, PeerID: 7},

operator.RemovePeer{FromStore: 3},

operator.AddLearner{ToStore: 7, PeerID: 8},
operator.PromoteLearner{ToStore: 7, PeerID: 8},

operator.RemovePeer{FromStore: 6},

operator.AddLearner{ToStore: 8, PeerID: 9},
operator.PromoteLearner{ToStore: 8, PeerID: 9},

operator.TransferLeader{FromStore: 2, ToStore: 1},
operator.RemovePeer{FromStore: 2},

operator.MergeRegion{
FromRegion: s.regions[2].GetMeta(),
ToRegion: s.regions[1].GetMeta(),
IsPassive: false,
},
})
s.checkSteps(c, ops[1], []operator.OpStep{
operator.MergeRegion{
FromRegion: s.regions[2].GetMeta(),
ToRegion: s.regions[1].GetMeta(),
IsPassive: true,
},
})

// overlap with reject leader label
s.regions[1] = s.regions[1].Clone(
core.SetPeers([]*metapb.Peer{
{Id: 115, StoreId: 7},
{Id: 116, StoreId: 8},
{Id: 117, StoreId: 1},
}),
core.WithLeader(&metapb.Peer{Id: 117, StoreId: 1}),
)
s.regions[2] = s.regions[2].Clone(
core.SetPeers([]*metapb.Peer{
{Id: 118, StoreId: 7},
{Id: 119, StoreId: 3},
{Id: 120, StoreId: 2},
}),
core.WithLeader(&metapb.Peer{Id: 120, StoreId: 2}),
)
s.cluster.PutRegion(s.regions[1])
ops = s.mc.Check(s.regions[2])
s.checkSteps(c, ops[0], []operator.OpStep{
operator.AddLearner{ToStore: 1, PeerID: 10},
operator.PromoteLearner{ToStore: 1, PeerID: 10},

operator.RemovePeer{FromStore: 3},

operator.AddLearner{ToStore: 8, PeerID: 11},
operator.PromoteLearner{ToStore: 8, PeerID: 11},

operator.TransferLeader{FromStore: 2, ToStore: 1},
operator.RemovePeer{FromStore: 2},

operator.MergeRegion{
FromRegion: s.regions[2].GetMeta(),
ToRegion: s.regions[1].GetMeta(),
Expand Down
Loading