From f8f49cb195281a98526318a93299ca3ca1c3f580 Mon Sep 17 00:00:00 2001
From: baishen
Date: Mon, 10 Jun 2019 14:49:50 +0800
Subject: [PATCH 1/2] set detached as default mode for pd-ctl (#1567)

* set detached as default mode for pd-ctl
---
 tools/pd-ctl/README.md    |  9 +++++++--
 tools/pd-ctl/main.go      | 10 ++++++----
 tools/pd-ctl/pdctl/ctl.go |  7 +++++--
 3 files changed, 18 insertions(+), 8 deletions(-)

diff --git a/tools/pd-ctl/README.md b/tools/pd-ctl/README.md
index 123c040f4a5..79897ff3efa 100644
--- a/tools/pd-ctl/README.md
+++ b/tools/pd-ctl/README.md
@@ -13,11 +13,11 @@ pd-ctl is a command line tool for PD, pd-ctl obtains the state information of th
 
 Single-command mode:
 
-    ./pd-ctl store -d -u http://127.0.0.1:2379
+    ./pd-ctl store -u http://127.0.0.1:2379
 
 Interactive mode:
 
-    ./pd-ctl -u http://127.0.0.1:2379
+    ./pd-ctl -i -u http://127.0.0.1:2379
 
 Use environment variables:
 
@@ -43,6 +43,11 @@ Use TLS to encrypt:
 ### \-\-detach,-d
 
 + Use single command line mode (not entering readline)
++ Default: true
+
+### \-\-interact,-i
+
++ Use interactive mode (entering readline)
 + Default: false
 
 ### --cacert
diff --git a/tools/pd-ctl/main.go b/tools/pd-ctl/main.go
index 98c6852deac..a544e656ce9 100644
--- a/tools/pd-ctl/main.go
+++ b/tools/pd-ctl/main.go
@@ -32,6 +32,7 @@ import (
 var (
 	url      string
 	detach   bool
+	interact bool
 	version  bool
 	caPath   string
 	certPath string
@@ -40,7 +41,8 @@ var (
 
 func init() {
 	flag.StringVarP(&url, "pd", "u", "http://127.0.0.1:2379", "The pd address")
-	flag.BoolVarP(&detach, "detach", "d", false, "Run pdctl without readline")
+	flag.BoolVarP(&detach, "detach", "d", true, "Run pdctl without readline")
+	flag.BoolVarP(&interact, "interact", "i", false, "Run pdctl with readline")
 	flag.BoolVarP(&version, "version", "V", false, "print version information and exit")
 	flag.StringVar(&caPath, "cacert", "", "path of file that contains list of trusted SSL CAs.")
 	flag.StringVar(&certPath, "cert", "", "path of file that contains X509 certificate in PEM format.")
@@ -88,11 +90,11 @@ func main() {
 		}
 		input = strings.Split(strings.TrimSpace(string(b[:])), " ")
 	}
-	if detach {
-		pdctl.Start(append(os.Args[1:], input...))
+	if interact {
+		loop()
 		return
 	}
-	loop()
+	pdctl.Start(append(os.Args[1:], input...))
 }
 
 func loop() {
diff --git a/tools/pd-ctl/pdctl/ctl.go b/tools/pd-ctl/pdctl/ctl.go
index ba0d1228df6..37af5a7e20c 100644
--- a/tools/pd-ctl/pdctl/ctl.go
+++ b/tools/pd-ctl/pdctl/ctl.go
@@ -67,11 +67,14 @@ func Start(args []string) {
 	rootCmd.SetArgs(args)
 	rootCmd.SilenceErrors = true
 
+	rootCmd.SetUsageTemplate(command.UsageTemplate)
+	rootCmd.SetOutput(os.Stdout)
+
 	if err := rootCmd.ParseFlags(args); err != nil {
 		rootCmd.Println(err)
+		rootCmd.Println(rootCmd.UsageString())
+		return
 	}
-	rootCmd.SetUsageTemplate(command.UsageTemplate)
-	rootCmd.SetOutput(os.Stdout)
 
 	if len(commandFlags.CAPath) != 0 {
 		if err := command.InitHTTPSClient(commandFlags.CAPath, commandFlags.CertPath, commandFlags.KeyPath); err != nil {

From ebf6cf27c1c1fe0f529cec8a0fa40b88277d9fda Mon Sep 17 00:00:00 2001
From: Ryan Leung
Date: Mon, 10 Jun 2019 15:25:30 +0800
Subject: [PATCH 2/2] move mock functions into a separate package (#1562)

Signed-off-by: Ryan Leung
---
 client/client_test.go                      |   5 +-
 server/cluster_info.go                     |   8 +-
 server/cluster_info_test.go                |   9 +-
 server/cluster_test.go                     |   3 +-
 server/coordinator_test.go                 |  56 +-
 server/{schedule => core}/basic_cluster.go |  42 +-
 server/core/test_util.go                   |  16 -
 server/mock/classifier.go                  |  67 ++
 server/mock/cluster.go                     | 459 ++++++++++
 server/mock/heartbeat_stream.go            |  97 +++
server/mock/id.go | 31 + server/mock/schedule.go | 232 ++++++ .../namespace/{classifier.go => namespace.go} | 9 + server/placement/constraint.go | 2 +- server/placement/functions.go | 6 +- server/placement/placement_test.go | 87 +- server/region_statistics_test.go | 44 +- server/schedule/filters_test.go | 5 +- server/schedule/merge_checker_test.go | 9 +- server/schedule/mockcluster.go | 783 ------------------ server/schedule/operator_controller_test.go | 21 +- server/schedule/opts.go | 9 - server/schedule/replica_test.go | 7 +- server/schedule/scheduler.go | 3 +- server/schedule/test_util.go | 104 +++ server/schedulers/balance_test.go | 91 +- server/schedulers/scheduler_test.go | 41 +- table/namespace_classifier_test.go | 5 +- 28 files changed, 1160 insertions(+), 1091 deletions(-) rename server/{schedule => core}/basic_cluster.go (74%) create mode 100644 server/mock/classifier.go create mode 100644 server/mock/cluster.go create mode 100644 server/mock/heartbeat_stream.go create mode 100644 server/mock/id.go create mode 100644 server/mock/schedule.go rename server/namespace/{classifier.go => namespace.go} (91%) delete mode 100644 server/schedule/mockcluster.go create mode 100644 server/schedule/test_util.go diff --git a/client/client_test.go b/client/client_test.go index 2ab45e970dc..175596d2187 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/pd/pkg/testutil" "github.com/pingcap/pd/server" "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/mock" "google.golang.org/grpc" ) @@ -39,7 +40,7 @@ func TestClient(t *testing.T) { var _ = Suite(&testClientSuite{}) type idAllocator struct { - allocator *core.MockIDAllocator + allocator *mock.IDAllocator } func (i *idAllocator) alloc() uint64 { @@ -48,7 +49,7 @@ func (i *idAllocator) alloc() uint64 { } var ( - regionIDAllocator = &idAllocator{allocator: &core.MockIDAllocator{}} + regionIDAllocator = &idAllocator{allocator: &mock.IDAllocator{}} // Note: IDs below are entirely arbitrary. They are only for checking // whether GetRegion/GetStore works. // If we alloc ID in client in the future, these IDs must be updated. 
diff --git a/server/cluster_info.go b/server/cluster_info.go index 0820f12820d..a52fa1c9e08 100644 --- a/server/cluster_info.go +++ b/server/cluster_info.go @@ -24,15 +24,13 @@ import ( log "github.com/pingcap/log" "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/namespace" - "github.com/pingcap/pd/server/schedule" "github.com/pingcap/pd/server/statistics" "go.uber.org/zap" ) type clusterInfo struct { sync.RWMutex - core *schedule.BasicCluster - + core *core.BasicCluster id core.IDAllocator kv *core.KV meta *metapb.Cluster @@ -48,7 +46,7 @@ var defaultChangedRegionsLimit = 10000 func newClusterInfo(id core.IDAllocator, opt *scheduleOption, kv *core.KV) *clusterInfo { return &clusterInfo{ - core: schedule.NewBasicCluster(), + core: core.NewBasicCluster(), id: id, opt: opt, kv: kv, @@ -680,7 +678,7 @@ func (c *clusterInfo) GetRegionStatsByType(typ regionStatisticType) []*core.Regi return c.regionStats.getRegionStatsByType(typ) } -func (c *clusterInfo) GetOpt() schedule.NamespaceOptions { +func (c *clusterInfo) GetOpt() namespace.ScheduleOptions { return c.opt } diff --git a/server/cluster_info_test.go b/server/cluster_info_test.go index d4da3a34825..07b0d685ab5 100644 --- a/server/cluster_info_test.go +++ b/server/cluster_info_test.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/mock" ) var _ = Suite(&testStoresInfoSuite{}) @@ -301,7 +302,7 @@ func (s *testClusterInfoSuite) TestLoadClusterInfo(c *C) { func (s *testClusterInfoSuite) TestStoreHeartbeat(c *C) { _, opt, err := newTestScheduleConfig() c.Assert(err, IsNil) - cluster := newClusterInfo(core.NewMockIDAllocator(), opt, core.NewKV(core.NewMemoryKV())) + cluster := newClusterInfo(mock.NewIDAllocator(), opt, core.NewKV(core.NewMemoryKV())) n, np := uint64(3), uint64(3) stores := newTestStores(n) @@ -347,7 +348,7 @@ func (s *testClusterInfoSuite) TestStoreHeartbeat(c *C) { func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) { _, opt, err := newTestScheduleConfig() c.Assert(err, IsNil) - cluster := newClusterInfo(core.NewMockIDAllocator(), opt, core.NewKV(core.NewMemoryKV())) + cluster := newClusterInfo(mock.NewIDAllocator(), opt, core.NewKV(core.NewMemoryKV())) n, np := uint64(3), uint64(3) @@ -562,7 +563,7 @@ func heartbeatRegions(c *C, cluster *clusterInfo, regions []*metapb.Region) { func (s *testClusterInfoSuite) TestHeartbeatSplit(c *C) { _, opt, err := newTestScheduleConfig() c.Assert(err, IsNil) - cluster := newClusterInfo(core.NewMockIDAllocator(), opt, nil) + cluster := newClusterInfo(mock.NewIDAllocator(), opt, nil) // 1: [nil, nil) region1 := core.NewRegionInfo(&metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 1, ConfVer: 1}}, nil) @@ -601,7 +602,7 @@ func (s *testClusterInfoSuite) TestHeartbeatSplit(c *C) { func (s *testClusterInfoSuite) TestRegionSplitAndMerge(c *C) { _, opt, err := newTestScheduleConfig() c.Assert(err, IsNil) - cluster := newClusterInfo(core.NewMockIDAllocator(), opt, nil) + cluster := newClusterInfo(mock.NewIDAllocator(), opt, nil) regions := []*metapb.Region{ { diff --git a/server/cluster_test.go b/server/cluster_test.go index d0801e72fca..8d158456e98 100644 --- a/server/cluster_test.go +++ b/server/cluster_test.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/mock" 
"github.com/pkg/errors" "google.golang.org/grpc" ) @@ -591,7 +592,7 @@ type testGetStoresSuite struct { func (s *testGetStoresSuite) SetUpSuite(c *C) { _, opt, err := newTestScheduleConfig() c.Assert(err, IsNil) - s.cluster = newClusterInfo(core.NewMockIDAllocator(), opt, core.NewKV(core.NewMemoryKV())) + s.cluster = newClusterInfo(mock.NewIDAllocator(), opt, core.NewKV(core.NewMemoryKV())) stores := newTestStores(200) diff --git a/server/coordinator_test.go b/server/coordinator_test.go index 2b3fa951ab5..3a650559354 100644 --- a/server/coordinator_test.go +++ b/server/coordinator_test.go @@ -25,10 +25,10 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/pd/pkg/testutil" "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/mock" "github.com/pingcap/pd/server/namespace" "github.com/pingcap/pd/server/schedule" "github.com/pingcap/pd/server/schedulers" - "github.com/pkg/errors" ) func newTestOperator(regionID uint64, regionEpoch *metapb.RegionEpoch, kind schedule.OperatorKind) *schedule.Operator { @@ -52,7 +52,7 @@ type testClusterInfo struct { func newTestClusterInfo(opt *scheduleOption) *testClusterInfo { return &testClusterInfo{clusterInfo: newClusterInfo( - core.NewMockIDAllocator(), + mock.NewIDAllocator(), opt, core.NewKV(core.NewMemoryKV()), )} @@ -171,34 +171,6 @@ func (s *testCoordinatorSuite) TestBasic(c *C) { c.Assert(oc.GetOperator(1).RegionID(), Equals, op2.RegionID()) } -type mockHeartbeatStream struct { - ch chan *pdpb.RegionHeartbeatResponse -} - -func (s *mockHeartbeatStream) Send(m *pdpb.RegionHeartbeatResponse) error { - select { - case <-time.After(time.Second): - return errors.New("timeout") - case s.ch <- m: - } - return nil -} - -func (s *mockHeartbeatStream) Recv() *pdpb.RegionHeartbeatResponse { - select { - case <-time.After(time.Millisecond * 10): - return nil - case res := <-s.ch: - return res - } -} - -func newMockHeartbeatStream() *mockHeartbeatStream { - return &mockHeartbeatStream{ - ch: make(chan *pdpb.RegionHeartbeatResponse), - } -} - func (s *testCoordinatorSuite) TestDispatch(c *C) { _, opt, err := newTestScheduleConfig() c.Assert(err, IsNil) @@ -233,7 +205,7 @@ func (s *testCoordinatorSuite) TestDispatch(c *C) { testutil.CheckTransferLeader(c, co.opController.GetOperator(2), schedule.OpBalance, 4, 2) c.Assert(co.removeScheduler("balance-leader-scheduler"), IsNil) - stream := newMockHeartbeatStream() + stream := mock.NewHeartbeatStream() // Transfer peer. region := tc.GetRegion(1).Clone() @@ -255,7 +227,7 @@ func (s *testCoordinatorSuite) TestDispatch(c *C) { waitNoResponse(c, stream) } -func dispatchHeartbeat(c *C, co *coordinator, region *core.RegionInfo, stream *mockHeartbeatStream) error { +func dispatchHeartbeat(c *C, co *coordinator, region *core.RegionInfo, stream mock.HeartbeatStream) error { co.hbStreams.bindStream(region.GetLeader().GetStoreId(), stream) if err := co.cluster.putRegion(region.Clone()); err != nil { return err @@ -365,7 +337,7 @@ func (s *testCoordinatorSuite) TestReplica(c *C) { c.Assert(tc.addRegionStore(3, 3), IsNil) c.Assert(tc.addRegionStore(4, 4), IsNil) - stream := newMockHeartbeatStream() + stream := mock.NewHeartbeatStream() // Add peer to store 1. c.Assert(tc.addLeaderRegion(1, 2, 3), IsNil) @@ -430,7 +402,7 @@ func (s *testCoordinatorSuite) TestPeerState(c *C) { c.Assert(tc.addRegionStore(4, 40), IsNil) c.Assert(tc.addLeaderRegion(1, 2, 3, 4), IsNil) - stream := newMockHeartbeatStream() + stream := mock.NewHeartbeatStream() // Wait for schedule. 
waitOperator(c, co, 1) @@ -578,7 +550,7 @@ func (s *testCoordinatorSuite) TestAddScheduler(c *C) { c.Assert(co.removeScheduler("label-scheduler"), IsNil) c.Assert(co.schedulers, HasLen, 0) - stream := newMockHeartbeatStream() + stream := mock.NewHeartbeatStream() // Add stores 1,2,3 c.Assert(tc.addLeaderStore(1, 1), IsNil) @@ -731,7 +703,7 @@ func (s *testCoordinatorSuite) TestRestart(c *C) { // Add 1 replica on store 2. co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier) co.run() - stream := newMockHeartbeatStream() + stream := mock.NewHeartbeatStream() c.Assert(dispatchHeartbeat(c, co, region, stream), IsNil) region = waitAddLearner(c, stream, region, 2) c.Assert(dispatchHeartbeat(c, co, region, stream), IsNil) @@ -764,7 +736,7 @@ func (s *testOperatorControllerSuite) TestOperatorCount(c *C) { _, opt, err := newTestScheduleConfig() c.Assert(err, IsNil) tc := newTestClusterInfo(opt) - hbStreams := schedule.NewMockHeartbeatStreams(tc.clusterInfo.getClusterID()) + hbStreams := mock.NewHeartbeatStreams(tc.clusterInfo.getClusterID()) oc := schedule.NewOperatorController(tc.clusterInfo, hbStreams) c.Assert(oc.OperatorCount(schedule.OpLeader), Equals, uint64(0)) @@ -909,7 +881,7 @@ func (s *testScheduleControllerSuite) TestInterval(c *C) { } } -func waitAddLearner(c *C, stream *mockHeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo { +func waitAddLearner(c *C, stream mock.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo { var res *pdpb.RegionHeartbeatResponse testutil.WaitUntil(c, func(c *C) bool { if res = stream.Recv(); res != nil { @@ -925,7 +897,7 @@ func waitAddLearner(c *C, stream *mockHeartbeatStream, region *core.RegionInfo, ) } -func waitPromoteLearner(c *C, stream *mockHeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo { +func waitPromoteLearner(c *C, stream mock.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo { var res *pdpb.RegionHeartbeatResponse testutil.WaitUntil(c, func(c *C) bool { if res = stream.Recv(); res != nil { @@ -942,7 +914,7 @@ func waitPromoteLearner(c *C, stream *mockHeartbeatStream, region *core.RegionIn ) } -func waitRemovePeer(c *C, stream *mockHeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo { +func waitRemovePeer(c *C, stream mock.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo { var res *pdpb.RegionHeartbeatResponse testutil.WaitUntil(c, func(c *C) bool { if res = stream.Recv(); res != nil { @@ -958,7 +930,7 @@ func waitRemovePeer(c *C, stream *mockHeartbeatStream, region *core.RegionInfo, ) } -func waitTransferLeader(c *C, stream *mockHeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo { +func waitTransferLeader(c *C, stream mock.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo { var res *pdpb.RegionHeartbeatResponse testutil.WaitUntil(c, func(c *C) bool { if res = stream.Recv(); res != nil { @@ -971,7 +943,7 @@ func waitTransferLeader(c *C, stream *mockHeartbeatStream, region *core.RegionIn ) } -func waitNoResponse(c *C, stream *mockHeartbeatStream) { +func waitNoResponse(c *C, stream mock.HeartbeatStream) { testutil.WaitUntil(c, func(c *C) bool { res := stream.Recv() return res == nil diff --git a/server/schedule/basic_cluster.go b/server/core/basic_cluster.go similarity index 74% rename from server/schedule/basic_cluster.go rename to server/core/basic_cluster.go index 85264d387ea..31c650bf548 100644 --- 
a/server/schedule/basic_cluster.go +++ b/server/core/basic_cluster.go @@ -11,44 +11,40 @@ // See the License for the specific language governing permissions and // limitations under the License. -package schedule - -import ( - "github.com/pingcap/pd/server/core" -) +package core // BasicCluster provides basic data member and interface for a tikv cluster. type BasicCluster struct { - Stores *core.StoresInfo - Regions *core.RegionsInfo + Stores *StoresInfo + Regions *RegionsInfo } // NewBasicCluster creates a BasicCluster. func NewBasicCluster() *BasicCluster { return &BasicCluster{ - Stores: core.NewStoresInfo(), - Regions: core.NewRegionsInfo(), + Stores: NewStoresInfo(), + Regions: NewRegionsInfo(), } } // GetStores returns all Stores in the cluster. -func (bc *BasicCluster) GetStores() []*core.StoreInfo { +func (bc *BasicCluster) GetStores() []*StoreInfo { return bc.Stores.GetStores() } // GetStore searches for a store by ID. -func (bc *BasicCluster) GetStore(storeID uint64) *core.StoreInfo { +func (bc *BasicCluster) GetStore(storeID uint64) *StoreInfo { return bc.Stores.GetStore(storeID) } // GetRegion searches for a region by ID. -func (bc *BasicCluster) GetRegion(regionID uint64) *core.RegionInfo { +func (bc *BasicCluster) GetRegion(regionID uint64) *RegionInfo { return bc.Regions.GetRegion(regionID) } // GetRegionStores returns all Stores that contains the region's peer. -func (bc *BasicCluster) GetRegionStores(region *core.RegionInfo) []*core.StoreInfo { - var Stores []*core.StoreInfo +func (bc *BasicCluster) GetRegionStores(region *RegionInfo) []*StoreInfo { + var Stores []*StoreInfo for id := range region.GetStoreIds() { if store := bc.Stores.GetStore(id); store != nil { Stores = append(Stores, store) @@ -58,8 +54,8 @@ func (bc *BasicCluster) GetRegionStores(region *core.RegionInfo) []*core.StoreIn } // GetFollowerStores returns all Stores that contains the region's follower peer. -func (bc *BasicCluster) GetFollowerStores(region *core.RegionInfo) []*core.StoreInfo { - var Stores []*core.StoreInfo +func (bc *BasicCluster) GetFollowerStores(region *RegionInfo) []*StoreInfo { + var Stores []*StoreInfo for id := range region.GetFollowers() { if store := bc.Stores.GetStore(id); store != nil { Stores = append(Stores, store) @@ -69,12 +65,12 @@ func (bc *BasicCluster) GetFollowerStores(region *core.RegionInfo) []*core.Store } // GetLeaderStore returns all Stores that contains the region's leader peer. -func (bc *BasicCluster) GetLeaderStore(region *core.RegionInfo) *core.StoreInfo { +func (bc *BasicCluster) GetLeaderStore(region *RegionInfo) *StoreInfo { return bc.Stores.GetStore(region.GetLeader().GetStoreId()) } // GetAdjacentRegions returns region's info that is adjacent with specific region -func (bc *BasicCluster) GetAdjacentRegions(region *core.RegionInfo) (*core.RegionInfo, *core.RegionInfo) { +func (bc *BasicCluster) GetAdjacentRegions(region *RegionInfo) (*RegionInfo, *RegionInfo) { return bc.Regions.GetAdjacentRegions(region) } @@ -99,12 +95,12 @@ func (bc *BasicCluster) ResetStoreOverload(storeID uint64) { } // RandFollowerRegion returns a random region that has a follower on the store. -func (bc *BasicCluster) RandFollowerRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo { +func (bc *BasicCluster) RandFollowerRegion(storeID uint64, opts ...RegionOption) *RegionInfo { return bc.Regions.RandFollowerRegion(storeID, opts...) } // RandLeaderRegion returns a random region that has leader on the store. 
-func (bc *BasicCluster) RandLeaderRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo { +func (bc *BasicCluster) RandLeaderRegion(storeID uint64, opts ...RegionOption) *RegionInfo { return bc.Regions.RandLeaderRegion(storeID, opts...) } @@ -114,16 +110,16 @@ func (bc *BasicCluster) GetAverageRegionSize() int64 { } // PutStore put a store -func (bc *BasicCluster) PutStore(store *core.StoreInfo) { +func (bc *BasicCluster) PutStore(store *StoreInfo) { bc.Stores.SetStore(store) } // DeleteStore deletes a store -func (bc *BasicCluster) DeleteStore(store *core.StoreInfo) { +func (bc *BasicCluster) DeleteStore(store *StoreInfo) { bc.Stores.DeleteStore(store) } // PutRegion put a region -func (bc *BasicCluster) PutRegion(region *core.RegionInfo) { +func (bc *BasicCluster) PutRegion(region *RegionInfo) { bc.Regions.SetRegion(region) } diff --git a/server/core/test_util.go b/server/core/test_util.go index 01767352835..a4a475e5b72 100644 --- a/server/core/test_util.go +++ b/server/core/test_util.go @@ -15,7 +15,6 @@ package core import ( "math" - "sync/atomic" "github.com/gogo/protobuf/proto" "github.com/pingcap/kvproto/pkg/metapb" @@ -79,18 +78,3 @@ func NewRegion(start, end []byte) *metapb.Region { RegionEpoch: &metapb.RegionEpoch{}, } } - -// MockIDAllocator mocks IDAllocator and it is only used for test. -type MockIDAllocator struct { - base uint64 -} - -// NewMockIDAllocator create a new MockIDAllocator -func NewMockIDAllocator() *MockIDAllocator { - return &MockIDAllocator{base: 0} -} - -// Alloc return a new id -func (alloc *MockIDAllocator) Alloc() (uint64, error) { - return atomic.AddUint64(&alloc.base, 1), nil -} diff --git a/server/mock/classifier.go b/server/mock/classifier.go new file mode 100644 index 00000000000..b9a9cba1b60 --- /dev/null +++ b/server/mock/classifier.go @@ -0,0 +1,67 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package mock + +import "github.com/pingcap/pd/server/core" + +// Classifier is used for test purpose. +type Classifier struct{} + +// GetAllNamespaces mocks method. +func (c Classifier) GetAllNamespaces() []string { + return []string{"global", "unknown"} +} + +// GetStoreNamespace mocks method. +func (c Classifier) GetStoreNamespace(store *core.StoreInfo) string { + if store.GetID() < 5 { + return "global" + } + return "unknown" +} + +// GetRegionNamespace mocks method. +func (c Classifier) GetRegionNamespace(*core.RegionInfo) string { + return "global" +} + +// IsNamespaceExist mocks method. +func (c Classifier) IsNamespaceExist(name string) bool { + return true +} + +// AllowMerge mocks method. +func (c Classifier) AllowMerge(*core.RegionInfo, *core.RegionInfo) bool { + return true +} + +// ReloadNamespaces mocks method. +func (c Classifier) ReloadNamespaces() error { + return nil +} + +// IsMetaExist mocks method. +func (c Classifier) IsMetaExist() bool { + return false +} + +// IsTableIDExist mocks method. +func (c Classifier) IsTableIDExist(tableID int64) bool { + return false +} + +// IsStoreIDExist mocks method. 
+func (c Classifier) IsStoreIDExist(storeID uint64) bool { + return false +} diff --git a/server/mock/cluster.go b/server/mock/cluster.go new file mode 100644 index 00000000000..255506e205e --- /dev/null +++ b/server/mock/cluster.go @@ -0,0 +1,459 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package mock + +import ( + "fmt" + "time" + + "github.com/gogo/protobuf/proto" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/pdpb" + log "github.com/pingcap/log" + "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/namespace" + "github.com/pingcap/pd/server/statistics" + "go.uber.org/zap" +) + +// Cluster is used to mock clusterInfo for test use. +type Cluster struct { + *core.BasicCluster + *IDAllocator + *ScheduleOptions + *statistics.HotSpotCache + ID uint64 +} + +// NewCluster creates a new Cluster +func NewCluster(opt *ScheduleOptions) *Cluster { + return &Cluster{ + BasicCluster: core.NewBasicCluster(), + IDAllocator: NewIDAllocator(), + ScheduleOptions: opt, + HotSpotCache: statistics.NewHotSpotCache(), + } +} + +func (mc *Cluster) allocID() (uint64, error) { + return mc.Alloc() +} + +// ScanRegions scans region with start key, until number greater than limit. +func (mc *Cluster) ScanRegions(startKey []byte, limit int) []*core.RegionInfo { + return mc.Regions.ScanRange(startKey, limit) +} + +// LoadRegion puts region info without leader +func (mc *Cluster) LoadRegion(regionID uint64, followerIds ...uint64) { + // regions load from etcd will have no leader + r := mc.newMockRegionInfo(regionID, 0, followerIds...).Clone(core.WithLeader(nil)) + mc.PutRegion(r) +} + +// GetStoreRegionCount gets region count with a given store. +func (mc *Cluster) GetStoreRegionCount(storeID uint64) int { + return mc.Regions.GetStoreRegionCount(storeID) +} + +// IsRegionHot checks if the region is hot. +func (mc *Cluster) IsRegionHot(id uint64) bool { + return mc.HotSpotCache.IsRegionHot(id, mc.GetHotRegionCacheHitsThreshold()) +} + +// RegionReadStats returns hot region's read stats. +func (mc *Cluster) RegionReadStats() []*core.RegionStat { + return mc.HotSpotCache.RegionStats(statistics.ReadFlow) +} + +// RegionWriteStats returns hot region's write stats. +func (mc *Cluster) RegionWriteStats() []*core.RegionStat { + return mc.HotSpotCache.RegionStats(statistics.WriteFlow) +} + +// RandHotRegionFromStore random picks a hot region in specify store. +func (mc *Cluster) RandHotRegionFromStore(store uint64, kind statistics.FlowKind) *core.RegionInfo { + r := mc.HotSpotCache.RandHotRegionFromStore(store, kind, mc.GetHotRegionCacheHitsThreshold()) + if r == nil { + return nil + } + return mc.GetRegion(r.RegionID) +} + +// AllocPeer allocs a new peer on a store. +func (mc *Cluster) AllocPeer(storeID uint64) (*metapb.Peer, error) { + peerID, err := mc.allocID() + if err != nil { + log.Error("failed to alloc peer", zap.Error(err)) + return nil, err + } + peer := &metapb.Peer{ + Id: peerID, + StoreId: storeID, + } + return peer, nil +} + +// SetStoreUp sets store state to be up. 
+func (mc *Cluster) SetStoreUp(storeID uint64) { + store := mc.GetStore(storeID) + newStore := store.Clone( + core.SetStoreState(metapb.StoreState_Up), + core.SetLastHeartbeatTS(time.Now()), + ) + mc.PutStore(newStore) +} + +// SetStoreDisconnect changes a store's state to disconnected. +func (mc *Cluster) SetStoreDisconnect(storeID uint64) { + store := mc.GetStore(storeID) + newStore := store.Clone( + core.SetStoreState(metapb.StoreState_Up), + core.SetLastHeartbeatTS(time.Now().Add(-time.Second*30)), + ) + mc.PutStore(newStore) +} + +// SetStoreDown sets store down. +func (mc *Cluster) SetStoreDown(storeID uint64) { + store := mc.GetStore(storeID) + newStore := store.Clone( + core.SetStoreState(metapb.StoreState_Up), + core.SetLastHeartbeatTS(time.Time{}), + ) + mc.PutStore(newStore) +} + +// SetStoreOffline sets store state to be offline. +func (mc *Cluster) SetStoreOffline(storeID uint64) { + store := mc.GetStore(storeID) + newStore := store.Clone(core.SetStoreState(metapb.StoreState_Offline)) + mc.PutStore(newStore) +} + +// SetStoreBusy sets store busy. +func (mc *Cluster) SetStoreBusy(storeID uint64, busy bool) { + store := mc.GetStore(storeID) + newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats) + newStats.IsBusy = busy + newStore := store.Clone( + core.SetStoreStats(newStats), + core.SetLastHeartbeatTS(time.Now()), + ) + mc.PutStore(newStore) +} + +// AddLeaderStore adds store with specified count of leader. +func (mc *Cluster) AddLeaderStore(storeID uint64, leaderCount int) { + stats := &pdpb.StoreStats{} + stats.Capacity = 1000 * (1 << 20) + stats.Available = stats.Capacity - uint64(leaderCount)*10 + store := core.NewStoreInfo( + &metapb.Store{Id: storeID}, + core.SetStoreStats(stats), + core.SetLeaderCount(leaderCount), + core.SetLeaderSize(int64(leaderCount)*10), + core.SetLastHeartbeatTS(time.Now()), + ) + mc.PutStore(store) +} + +// AddRegionStore adds store with specified count of region. +func (mc *Cluster) AddRegionStore(storeID uint64, regionCount int) { + stats := &pdpb.StoreStats{} + stats.Capacity = 1000 * (1 << 20) + stats.Available = stats.Capacity - uint64(regionCount)*10 + store := core.NewStoreInfo( + &metapb.Store{Id: storeID}, + core.SetStoreStats(stats), + core.SetRegionCount(regionCount), + core.SetRegionSize(int64(regionCount)*10), + core.SetLastHeartbeatTS(time.Now()), + ) + mc.PutStore(store) +} + +// AddLabelsStore adds store with specified count of region and labels. +func (mc *Cluster) AddLabelsStore(storeID uint64, regionCount int, labels map[string]string) { + var newLabels []*metapb.StoreLabel + for k, v := range labels { + newLabels = append(newLabels, &metapb.StoreLabel{Key: k, Value: v}) + } + stats := &pdpb.StoreStats{} + stats.Capacity = 1000 * (1 << 20) + stats.Available = stats.Capacity - uint64(regionCount)*10 + store := core.NewStoreInfo( + &metapb.Store{ + Id: storeID, + Labels: newLabels, + }, + core.SetStoreStats(stats), + core.SetRegionCount(regionCount), + core.SetRegionSize(int64(regionCount)*10), + core.SetLastHeartbeatTS(time.Now()), + ) + mc.PutStore(store) +} + +// AddLeaderRegion adds region with specified leader and followers. +func (mc *Cluster) AddLeaderRegion(regionID uint64, leaderID uint64, followerIds ...uint64) { + origin := mc.newMockRegionInfo(regionID, leaderID, followerIds...) + region := origin.Clone(core.SetApproximateSize(10), core.SetApproximateKeys(10)) + mc.PutRegion(region) +} + +// AddLeaderRegionWithRange adds region with specified leader, followers and key range. 
+func (mc *Cluster) AddLeaderRegionWithRange(regionID uint64, startKey string, endKey string, leaderID uint64, followerIds ...uint64) { + o := mc.newMockRegionInfo(regionID, leaderID, followerIds...) + r := o.Clone( + core.WithStartKey([]byte(startKey)), + core.WithEndKey([]byte(endKey)), + ) + mc.PutRegion(r) +} + +// AddLeaderRegionWithReadInfo adds region with specified leader, followers and read info. +func (mc *Cluster) AddLeaderRegionWithReadInfo(regionID uint64, leaderID uint64, readBytes uint64, followerIds ...uint64) { + r := mc.newMockRegionInfo(regionID, leaderID, followerIds...) + r = r.Clone(core.SetReadBytes(readBytes)) + isUpdate, item := mc.HotSpotCache.CheckRead(r, mc.Stores) + if isUpdate { + mc.HotSpotCache.Update(regionID, item, statistics.ReadFlow) + } + mc.PutRegion(r) +} + +// AddLeaderRegionWithWriteInfo adds region with specified leader, followers and write info. +func (mc *Cluster) AddLeaderRegionWithWriteInfo(regionID uint64, leaderID uint64, writtenBytes uint64, followerIds ...uint64) { + r := mc.newMockRegionInfo(regionID, leaderID, followerIds...) + r = r.Clone(core.SetWrittenBytes(writtenBytes)) + isUpdate, item := mc.HotSpotCache.CheckWrite(r, mc.Stores) + if isUpdate { + mc.HotSpotCache.Update(regionID, item, statistics.WriteFlow) + } + mc.PutRegion(r) +} + +// UpdateStoreLeaderWeight updates store leader weight. +func (mc *Cluster) UpdateStoreLeaderWeight(storeID uint64, weight float64) { + store := mc.GetStore(storeID) + newStore := store.Clone(core.SetLeaderWeight(weight)) + mc.PutStore(newStore) +} + +// UpdateStoreRegionWeight updates store region weight. +func (mc *Cluster) UpdateStoreRegionWeight(storeID uint64, weight float64) { + store := mc.GetStore(storeID) + newStore := store.Clone(core.SetRegionWeight(weight)) + mc.PutStore(newStore) +} + +// UpdateStoreLeaderSize updates store leader size. +func (mc *Cluster) UpdateStoreLeaderSize(storeID uint64, size int64) { + store := mc.GetStore(storeID) + newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats) + newStats.Available = newStats.Capacity - uint64(store.GetLeaderSize()) + newStore := store.Clone( + core.SetStoreStats(newStats), + core.SetLeaderSize(size), + ) + mc.PutStore(newStore) +} + +// UpdateStoreRegionSize updates store region size. +func (mc *Cluster) UpdateStoreRegionSize(storeID uint64, size int64) { + store := mc.GetStore(storeID) + newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats) + newStats.Available = newStats.Capacity - uint64(store.GetRegionSize()) + newStore := store.Clone( + core.SetStoreStats(newStats), + core.SetRegionSize(size), + ) + mc.PutStore(newStore) +} + +// UpdateLeaderCount updates store leader count. +func (mc *Cluster) UpdateLeaderCount(storeID uint64, leaderCount int) { + store := mc.GetStore(storeID) + newStore := store.Clone( + core.SetLeaderCount(leaderCount), + core.SetLeaderSize(int64(leaderCount)*10), + ) + mc.PutStore(newStore) +} + +// UpdateRegionCount updates store region count. +func (mc *Cluster) UpdateRegionCount(storeID uint64, regionCount int) { + store := mc.GetStore(storeID) + newStore := store.Clone( + core.SetRegionCount(regionCount), + core.SetRegionSize(int64(regionCount)*10), + ) + mc.PutStore(newStore) +} + +// UpdateSnapshotCount updates store snapshot count. 
+func (mc *Cluster) UpdateSnapshotCount(storeID uint64, snapshotCount int) { + store := mc.GetStore(storeID) + newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats) + newStats.ApplyingSnapCount = uint32(snapshotCount) + newStore := store.Clone(core.SetStoreStats(newStats)) + mc.PutStore(newStore) +} + +// UpdatePendingPeerCount updates store pending peer count. +func (mc *Cluster) UpdatePendingPeerCount(storeID uint64, pendingPeerCount int) { + store := mc.GetStore(storeID) + newStore := store.Clone(core.SetPendingPeerCount(pendingPeerCount)) + mc.PutStore(newStore) +} + +// UpdateStorageRatio updates store storage ratio count. +func (mc *Cluster) UpdateStorageRatio(storeID uint64, usedRatio, availableRatio float64) { + store := mc.GetStore(storeID) + newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats) + newStats.Capacity = 1000 * (1 << 20) + newStats.UsedSize = uint64(float64(newStats.Capacity) * usedRatio) + newStats.Available = uint64(float64(newStats.Capacity) * availableRatio) + newStore := store.Clone(core.SetStoreStats(newStats)) + mc.PutStore(newStore) +} + +// UpdateStorageWrittenBytes updates store written bytes. +func (mc *Cluster) UpdateStorageWrittenBytes(storeID uint64, bytesWritten uint64) { + store := mc.GetStore(storeID) + newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats) + newStats.BytesWritten = bytesWritten + now := time.Now().Second() + interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)} + newStats.Interval = interval + newStore := store.Clone(core.SetStoreStats(newStats)) + mc.PutStore(newStore) +} + +// UpdateStorageReadBytes updates store read bytes. +func (mc *Cluster) UpdateStorageReadBytes(storeID uint64, bytesRead uint64) { + store := mc.GetStore(storeID) + newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats) + newStats.BytesRead = bytesRead + now := time.Now().Second() + interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)} + newStats.Interval = interval + newStore := store.Clone(core.SetStoreStats(newStats)) + mc.PutStore(newStore) +} + +// UpdateStoreStatus updates store status. 
+func (mc *Cluster) UpdateStoreStatus(id uint64) { + leaderCount := mc.Regions.GetStoreLeaderCount(id) + regionCount := mc.Regions.GetStoreRegionCount(id) + pendingPeerCount := mc.Regions.GetStorePendingPeerCount(id) + leaderSize := mc.Regions.GetStoreLeaderRegionSize(id) + regionSize := mc.Regions.GetStoreRegionSize(id) + store := mc.Stores.GetStore(id) + stats := &pdpb.StoreStats{} + stats.Capacity = 1000 * (1 << 20) + stats.Available = stats.Capacity - uint64(store.GetRegionSize()) + stats.UsedSize = uint64(store.GetRegionSize()) + newStore := store.Clone( + core.SetStoreStats(stats), + core.SetLeaderCount(leaderCount), + core.SetRegionCount(regionCount), + core.SetPendingPeerCount(pendingPeerCount), + core.SetLeaderSize(leaderSize), + core.SetRegionSize(regionSize), + ) + mc.PutStore(newStore) +} + +func (mc *Cluster) newMockRegionInfo(regionID uint64, leaderID uint64, followerIds ...uint64) *core.RegionInfo { + region := &metapb.Region{ + Id: regionID, + StartKey: []byte(fmt.Sprintf("%20d", regionID)), + EndKey: []byte(fmt.Sprintf("%20d", regionID+1)), + } + leader, _ := mc.AllocPeer(leaderID) + region.Peers = []*metapb.Peer{leader} + for _, id := range followerIds { + peer, _ := mc.AllocPeer(id) + region.Peers = append(region.Peers, peer) + } + + return core.NewRegionInfo(region, leader) +} + +// GetOpt mocks method. +func (mc *Cluster) GetOpt() namespace.ScheduleOptions { + return mc.ScheduleOptions +} + +// GetLeaderScheduleLimit mocks method. +func (mc *Cluster) GetLeaderScheduleLimit() uint64 { + return mc.ScheduleOptions.GetLeaderScheduleLimit(namespace.DefaultNamespace) +} + +// GetRegionScheduleLimit mocks method. +func (mc *Cluster) GetRegionScheduleLimit() uint64 { + return mc.ScheduleOptions.GetRegionScheduleLimit(namespace.DefaultNamespace) +} + +// GetReplicaScheduleLimit mocks method. +func (mc *Cluster) GetReplicaScheduleLimit() uint64 { + return mc.ScheduleOptions.GetReplicaScheduleLimit(namespace.DefaultNamespace) +} + +// GetMergeScheduleLimit mocks method. +func (mc *Cluster) GetMergeScheduleLimit() uint64 { + return mc.ScheduleOptions.GetMergeScheduleLimit(namespace.DefaultNamespace) +} + +// GetHotRegionScheduleLimit mocks method. +func (mc *Cluster) GetHotRegionScheduleLimit() uint64 { + return mc.ScheduleOptions.GetHotRegionScheduleLimit(namespace.DefaultNamespace) +} + +// GetMaxReplicas mocks method. +func (mc *Cluster) GetMaxReplicas() int { + return mc.ScheduleOptions.GetMaxReplicas(namespace.DefaultNamespace) +} + +// CheckLabelProperty checks label property. +func (mc *Cluster) CheckLabelProperty(typ string, labels []*metapb.StoreLabel) bool { + for _, cfg := range mc.LabelProperties[typ] { + for _, l := range labels { + if l.Key == cfg.Key && l.Value == cfg.Value { + return true + } + } + } + return false +} + +// PutRegionStores mocks method. +func (mc *Cluster) PutRegionStores(id uint64, stores ...uint64) { + meta := &metapb.Region{Id: id} + for _, s := range stores { + meta.Peers = append(meta.Peers, &metapb.Peer{StoreId: s}) + } + mc.PutRegion(core.NewRegionInfo(meta, &metapb.Peer{StoreId: stores[0]})) +} + +// PutStoreWithLabels mocks method. 
+func (mc *Cluster) PutStoreWithLabels(id uint64, labelPairs ...string) { + var labels []*metapb.StoreLabel + for i := 0; i < len(labelPairs); i += 2 { + labels = append(labels, &metapb.StoreLabel{Key: labelPairs[i], Value: labelPairs[i+1]}) + } + mc.PutStore(core.NewStoreInfo(&metapb.Store{Id: id, Labels: labels})) +} diff --git a/server/mock/heartbeat_stream.go b/server/mock/heartbeat_stream.go new file mode 100644 index 00000000000..21ee560e032 --- /dev/null +++ b/server/mock/heartbeat_stream.go @@ -0,0 +1,97 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package mock + +import ( + "context" + "errors" + "time" + + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/pd/server/core" +) + +// HeartbeatStream is used to mock HeartbeatStream for test use. +type HeartbeatStream struct { + ch chan *pdpb.RegionHeartbeatResponse +} + +// NewHeartbeatStream creates a new HeartbeatStream. +func NewHeartbeatStream() HeartbeatStream { + return HeartbeatStream{ + ch: make(chan *pdpb.RegionHeartbeatResponse), + } +} + +// Send mocks method. +func (s HeartbeatStream) Send(m *pdpb.RegionHeartbeatResponse) error { + select { + case <-time.After(time.Second): + return errors.New("timeout") + case s.ch <- m: + } + return nil +} + +// SendMsg is used to send the message. +func (s HeartbeatStream) SendMsg(region *core.RegionInfo, msg *pdpb.RegionHeartbeatResponse) { + return +} + +// Recv mocks method. +func (s HeartbeatStream) Recv() *pdpb.RegionHeartbeatResponse { + select { + case <-time.After(time.Millisecond * 10): + return nil + case res := <-s.ch: + return res + } +} + +// HeartbeatStreams is used to mock heartbeatstreams for test use. +type HeartbeatStreams struct { + ctx context.Context + cancel context.CancelFunc + clusterID uint64 + msgCh chan *pdpb.RegionHeartbeatResponse +} + +// NewHeartbeatStreams creates a new HeartbeatStreams. +func NewHeartbeatStreams(clusterID uint64) *HeartbeatStreams { + ctx, cancel := context.WithCancel(context.Background()) + hs := &HeartbeatStreams{ + ctx: ctx, + cancel: cancel, + clusterID: clusterID, + msgCh: make(chan *pdpb.RegionHeartbeatResponse, 1024), + } + return hs +} + +// SendMsg is used to send the message. +func (mhs *HeartbeatStreams) SendMsg(region *core.RegionInfo, msg *pdpb.RegionHeartbeatResponse) { + if region.GetLeader() == nil { + return + } + + msg.Header = &pdpb.ResponseHeader{ClusterId: mhs.clusterID} + msg.RegionId = region.GetID() + msg.RegionEpoch = region.GetRegionEpoch() + msg.TargetPeer = region.GetLeader() + + select { + case mhs.msgCh <- msg: + case <-mhs.ctx.Done(): + } +} diff --git a/server/mock/id.go b/server/mock/id.go new file mode 100644 index 00000000000..04d5a5efaaa --- /dev/null +++ b/server/mock/id.go @@ -0,0 +1,31 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package mock + +import "sync/atomic" + +// IDAllocator mocks IDAllocator and it is only used for test. +type IDAllocator struct { + base uint64 +} + +// NewIDAllocator create a new IDAllocator +func NewIDAllocator() *IDAllocator { + return &IDAllocator{base: 0} +} + +// Alloc return a new id +func (alloc *IDAllocator) Alloc() (uint64, error) { + return atomic.AddUint64(&alloc.base, 1), nil +} diff --git a/server/mock/schedule.go b/server/mock/schedule.go new file mode 100644 index 00000000000..cc8d1871507 --- /dev/null +++ b/server/mock/schedule.go @@ -0,0 +1,232 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package mock + +import ( + "time" + + "github.com/pingcap/kvproto/pkg/metapb" +) + +const ( + defaultMaxReplicas = 3 + defaultMaxSnapshotCount = 3 + defaultMaxPendingPeerCount = 16 + defaultMaxMergeRegionSize = 0 + defaultMaxMergeRegionKeys = 0 + defaultSplitMergeInterval = 0 + defaultMaxStoreDownTime = 30 * time.Minute + defaultLeaderScheduleLimit = 4 + defaultRegionScheduleLimit = 4 + defaultReplicaScheduleLimit = 8 + defaultMergeScheduleLimit = 8 + defaultHotRegionScheduleLimit = 2 + defaultStoreBalanceRate = 1 + defaultTolerantSizeRatio = 2.5 + defaultLowSpaceRatio = 0.8 + defaultHighSpaceRatio = 0.6 + defaultHotRegionCacheHitsThreshold = 3 + defaultStrictlyMatchLabel = true +) + +// ScheduleOptions is a mock of ScheduleOptions +// which implements Options interface +type ScheduleOptions struct { + RegionScheduleLimit uint64 + LeaderScheduleLimit uint64 + ReplicaScheduleLimit uint64 + MergeScheduleLimit uint64 + HotRegionScheduleLimit uint64 + StoreBalanceRate float64 + MaxSnapshotCount uint64 + MaxPendingPeerCount uint64 + MaxMergeRegionSize uint64 + MaxMergeRegionKeys uint64 + SplitMergeInterval time.Duration + MaxStoreDownTime time.Duration + MaxReplicas int + LocationLabels []string + StrictlyMatchLabel bool + HotRegionCacheHitsThreshold int + TolerantSizeRatio float64 + LowSpaceRatio float64 + HighSpaceRatio float64 + DisableLearner bool + DisableRemoveDownReplica bool + DisableReplaceOfflineReplica bool + DisableMakeUpReplica bool + DisableRemoveExtraReplica bool + DisableLocationReplacement bool + DisableNamespaceRelocation bool + LabelProperties map[string][]*metapb.StoreLabel +} + +// NewScheduleOptions creates a mock schedule option. 
+func NewScheduleOptions() *ScheduleOptions { + mso := &ScheduleOptions{} + mso.RegionScheduleLimit = defaultRegionScheduleLimit + mso.LeaderScheduleLimit = defaultLeaderScheduleLimit + mso.ReplicaScheduleLimit = defaultReplicaScheduleLimit + mso.MergeScheduleLimit = defaultMergeScheduleLimit + mso.HotRegionScheduleLimit = defaultHotRegionScheduleLimit + mso.StoreBalanceRate = defaultStoreBalanceRate + mso.MaxSnapshotCount = defaultMaxSnapshotCount + mso.MaxMergeRegionSize = defaultMaxMergeRegionSize + mso.MaxMergeRegionKeys = defaultMaxMergeRegionKeys + mso.SplitMergeInterval = defaultSplitMergeInterval + mso.MaxStoreDownTime = defaultMaxStoreDownTime + mso.MaxReplicas = defaultMaxReplicas + mso.StrictlyMatchLabel = defaultStrictlyMatchLabel + mso.HotRegionCacheHitsThreshold = defaultHotRegionCacheHitsThreshold + mso.MaxPendingPeerCount = defaultMaxPendingPeerCount + mso.TolerantSizeRatio = defaultTolerantSizeRatio + mso.LowSpaceRatio = defaultLowSpaceRatio + mso.HighSpaceRatio = defaultHighSpaceRatio + return mso +} + +// GetLeaderScheduleLimit mocks method +func (mso *ScheduleOptions) GetLeaderScheduleLimit(name string) uint64 { + return mso.LeaderScheduleLimit +} + +// GetRegionScheduleLimit mocks method +func (mso *ScheduleOptions) GetRegionScheduleLimit(name string) uint64 { + return mso.RegionScheduleLimit +} + +// GetReplicaScheduleLimit mocks method +func (mso *ScheduleOptions) GetReplicaScheduleLimit(name string) uint64 { + return mso.ReplicaScheduleLimit +} + +// GetMergeScheduleLimit mocks method +func (mso *ScheduleOptions) GetMergeScheduleLimit(name string) uint64 { + return mso.MergeScheduleLimit +} + +// GetHotRegionScheduleLimit mocks method +func (mso *ScheduleOptions) GetHotRegionScheduleLimit(name string) uint64 { + return mso.HotRegionScheduleLimit +} + +// GetStoreBalanceRate mocks method +func (mso *ScheduleOptions) GetStoreBalanceRate() float64 { + return mso.StoreBalanceRate +} + +// GetMaxSnapshotCount mocks method +func (mso *ScheduleOptions) GetMaxSnapshotCount() uint64 { + return mso.MaxSnapshotCount +} + +// GetMaxPendingPeerCount mocks method +func (mso *ScheduleOptions) GetMaxPendingPeerCount() uint64 { + return mso.MaxPendingPeerCount +} + +// GetMaxMergeRegionSize mocks method +func (mso *ScheduleOptions) GetMaxMergeRegionSize() uint64 { + return mso.MaxMergeRegionSize +} + +// GetMaxMergeRegionKeys mocks method +func (mso *ScheduleOptions) GetMaxMergeRegionKeys() uint64 { + return mso.MaxMergeRegionKeys +} + +// GetSplitMergeInterval mocks method +func (mso *ScheduleOptions) GetSplitMergeInterval() time.Duration { + return mso.SplitMergeInterval +} + +// GetMaxStoreDownTime mocks method +func (mso *ScheduleOptions) GetMaxStoreDownTime() time.Duration { + return mso.MaxStoreDownTime +} + +// GetMaxReplicas mocks method +func (mso *ScheduleOptions) GetMaxReplicas(name string) int { + return mso.MaxReplicas +} + +// GetLocationLabels mocks method +func (mso *ScheduleOptions) GetLocationLabels() []string { + return mso.LocationLabels +} + +// GetStrictlyMatchLabel mocks method +func (mso *ScheduleOptions) GetStrictlyMatchLabel() bool { + return mso.StrictlyMatchLabel +} + +// GetHotRegionCacheHitsThreshold mocks method +func (mso *ScheduleOptions) GetHotRegionCacheHitsThreshold() int { + return mso.HotRegionCacheHitsThreshold +} + +// GetTolerantSizeRatio mocks method +func (mso *ScheduleOptions) GetTolerantSizeRatio() float64 { + return mso.TolerantSizeRatio +} + +// GetLowSpaceRatio mocks method +func (mso *ScheduleOptions) GetLowSpaceRatio() float64 { + 
return mso.LowSpaceRatio +} + +// GetHighSpaceRatio mocks method +func (mso *ScheduleOptions) GetHighSpaceRatio() float64 { + return mso.HighSpaceRatio +} + +// SetMaxReplicas mocks method +func (mso *ScheduleOptions) SetMaxReplicas(replicas int) { + mso.MaxReplicas = replicas +} + +// IsRaftLearnerEnabled mocks method +func (mso *ScheduleOptions) IsRaftLearnerEnabled() bool { + return !mso.DisableLearner +} + +// IsRemoveDownReplicaEnabled mocks method. +func (mso *ScheduleOptions) IsRemoveDownReplicaEnabled() bool { + return !mso.DisableRemoveDownReplica +} + +// IsReplaceOfflineReplicaEnabled mocks method. +func (mso *ScheduleOptions) IsReplaceOfflineReplicaEnabled() bool { + return !mso.DisableReplaceOfflineReplica +} + +// IsMakeUpReplicaEnabled mocks method. +func (mso *ScheduleOptions) IsMakeUpReplicaEnabled() bool { + return !mso.DisableMakeUpReplica +} + +// IsRemoveExtraReplicaEnabled mocks method. +func (mso *ScheduleOptions) IsRemoveExtraReplicaEnabled() bool { + return !mso.DisableRemoveExtraReplica +} + +// IsLocationReplacementEnabled mocks method. +func (mso *ScheduleOptions) IsLocationReplacementEnabled() bool { + return !mso.DisableLocationReplacement +} + +// IsNamespaceRelocationEnabled mocks method. +func (mso *ScheduleOptions) IsNamespaceRelocationEnabled() bool { + return !mso.DisableNamespaceRelocation +} diff --git a/server/namespace/classifier.go b/server/namespace/namespace.go similarity index 91% rename from server/namespace/classifier.go rename to server/namespace/namespace.go index 2c063c40cf7..07f654e6a1e 100644 --- a/server/namespace/classifier.go +++ b/server/namespace/namespace.go @@ -24,6 +24,15 @@ import ( // default. const DefaultNamespace = "global" +// ScheduleOptions for namespace cluster. +type ScheduleOptions interface { + GetLeaderScheduleLimit(name string) uint64 + GetRegionScheduleLimit(name string) uint64 + GetReplicaScheduleLimit(name string) uint64 + GetMergeScheduleLimit(name string) uint64 + GetMaxReplicas(name string) int +} + // DefaultClassifier is a classifier that classifies all regions and stores to // DefaultNamespace. 
var DefaultClassifier = defaultClassifier{} diff --git a/server/placement/constraint.go b/server/placement/constraint.go index 6aabc468dbe..cf7f089acab 100644 --- a/server/placement/constraint.go +++ b/server/placement/constraint.go @@ -44,5 +44,5 @@ type Cluster interface { GetRegion(id uint64) *core.RegionInfo GetStores() []*core.StoreInfo GetStore(id uint64) *core.StoreInfo - GetRegionStores(id uint64) []*core.StoreInfo + GetRegionStores(region *core.RegionInfo) []*core.StoreInfo } diff --git a/server/placement/functions.go b/server/placement/functions.go index 7fbf5212709..17b419b4322 100644 --- a/server/placement/functions.go +++ b/server/placement/functions.go @@ -53,12 +53,12 @@ func (c Constraint) eval(region *core.RegionInfo, cluster Cluster) int { } func (c Constraint) evalCount(region *core.RegionInfo, cluster Cluster) int { - stores := c.filterStores(cluster.GetRegionStores(region.GetID())) + stores := c.filterStores(cluster.GetRegionStores(region)) return len(stores) } func (c Constraint) evalLabelValues(region *core.RegionInfo, cluster Cluster) int { - stores := c.filterStores(cluster.GetRegionStores(region.GetID())) + stores := c.filterStores(cluster.GetRegionStores(region)) return c.countLabelValues(stores, c.Labels) } @@ -71,7 +71,7 @@ func (c Constraint) evalCountLeader(region *core.RegionInfo, cluster Cluster) in } func (c Constraint) evalIsolationLevel(region *core.RegionInfo, cluster Cluster) int { - stores := c.filterStores(cluster.GetRegionStores(region.GetID())) + stores := c.filterStores(cluster.GetRegionStores(region)) for i := range c.Labels { if c.countLabelValues(stores, c.Labels[:i+1]) == len(stores) { return len(c.Labels) - i diff --git a/server/placement/placement_test.go b/server/placement/placement_test.go index a56f0c3f002..95ed73dbc69 100644 --- a/server/placement/placement_test.go +++ b/server/placement/placement_test.go @@ -16,10 +16,8 @@ package placement import ( "testing" - "github.com/pingcap/kvproto/pkg/metapb" - . "github.com/pingcap/check" - "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/mock" ) func TestPlacement(t *testing.T) { @@ -98,11 +96,12 @@ func (s *testPlacementSuite) config(constraints ...*Constraint) *Config { } func (s *testPlacementSuite) TestFunctions(c *C) { - cluster := newMockCluster() - cluster.PutStore(1, "zone", "z1", "host", "h1", "disk", "ssd") - cluster.PutStore(2, "zone", "z1", "host", "h1", "disk", "ssd") - cluster.PutStore(3, "zone", "z1", "host", "h2", "disk", "hdd") - cluster.PutStore(4, "zone", "z2", "host", "h1", "disk", "ssd") + opt := mock.NewScheduleOptions() + cluster := mock.NewCluster(opt) + cluster.PutStoreWithLabels(1, "zone", "z1", "host", "h1", "disk", "ssd") + cluster.PutStoreWithLabels(2, "zone", "z1", "host", "h1", "disk", "ssd") + cluster.PutStoreWithLabels(3, "zone", "z1", "host", "h2", "disk", "hdd") + cluster.PutStoreWithLabels(4, "zone", "z2", "host", "h1", "disk", "ssd") cases := []struct { config string @@ -141,17 +140,18 @@ func (s *testPlacementSuite) TestFunctions(c *C) { for _, t := range cases { constraint, err := parseConstraint(t.config) c.Assert(err, IsNil) - cluster.PutRegion(1, t.regionStores...) + cluster.PutRegionStores(1, t.regionStores...) 
c.Assert(constraint.Score(cluster.GetRegion(1), cluster), Equals, 0) } } func (s *testPlacementSuite) TestScore(c *C) { - cluster := newMockCluster() - cluster.PutStore(1) - cluster.PutStore(2) - cluster.PutStore(3) - cluster.PutRegion(1, 1, 2, 3) + opt := mock.NewScheduleOptions() + cluster := mock.NewCluster(opt) + cluster.PutStoreWithLabels(1) + cluster.PutStoreWithLabels(2) + cluster.PutStoreWithLabels(3) + cluster.PutRegionStores(1, 1, 2, 3) cases := []struct { config string @@ -194,62 +194,3 @@ func (s *testPlacementSuite) TestScore(c *C) { c.Assert(constraint.Score(cluster.GetRegion(1), cluster), Equals, t.score) } } - -type mockCluster struct { - regions map[uint64]*core.RegionInfo - stores map[uint64]*core.StoreInfo -} - -func newMockCluster() *mockCluster { - return &mockCluster{ - regions: make(map[uint64]*core.RegionInfo), - stores: make(map[uint64]*core.StoreInfo), - } -} - -func (c *mockCluster) GetRegion(id uint64) *core.RegionInfo { - return c.regions[id] -} - -func (c *mockCluster) GetStores() []*core.StoreInfo { - stores := make([]*core.StoreInfo, 0, len(c.stores)) - for _, s := range c.stores { - stores = append(stores, s) - } - return stores -} - -func (c *mockCluster) GetStore(id uint64) *core.StoreInfo { - return c.stores[id] -} - -func (c *mockCluster) GetRegionStores(id uint64) []*core.StoreInfo { - region := c.GetRegion(id) - if region == nil { - return nil - } - stores := make([]*core.StoreInfo, 0, len(region.GetPeers())) - for _, p := range region.GetPeers() { - store := c.GetStore(p.GetStoreId()) - if store != nil { - stores = append(stores, store) - } - } - return stores -} - -func (c *mockCluster) PutRegion(id uint64, stores ...uint64) { - meta := &metapb.Region{Id: id} - for _, s := range stores { - meta.Peers = append(meta.Peers, &metapb.Peer{StoreId: s}) - } - c.regions[id] = core.NewRegionInfo(meta, &metapb.Peer{StoreId: stores[0]}) -} - -func (c *mockCluster) PutStore(id uint64, labelPairs ...string) { - var labels []*metapb.StoreLabel - for i := 0; i < len(labelPairs); i += 2 { - labels = append(labels, &metapb.StoreLabel{Key: labelPairs[i], Value: labelPairs[i+1]}) - } - c.stores[id] = core.NewStoreInfo(&metapb.Store{Id: id, Labels: labels}) -} diff --git a/server/region_statistics_test.go b/server/region_statistics_test.go index 68a3e2fd356..4e99b3e2741 100644 --- a/server/region_statistics_test.go +++ b/server/region_statistics_test.go @@ -18,49 +18,9 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/mock" ) -type mockClassifier struct{} - -func (c mockClassifier) GetAllNamespaces() []string { - return []string{"global", "unknown"} -} - -func (c mockClassifier) GetStoreNamespace(store *core.StoreInfo) string { - if store.GetID() < 5 { - return "global" - } - return "unknown" -} - -func (c mockClassifier) GetRegionNamespace(*core.RegionInfo) string { - return "global" -} - -func (c mockClassifier) IsNamespaceExist(name string) bool { - return true -} - -func (c mockClassifier) AllowMerge(*core.RegionInfo, *core.RegionInfo) bool { - return true -} - -func (c mockClassifier) ReloadNamespaces() error { - return nil -} - -func (c mockClassifier) IsMetaExist() bool { - return false -} - -func (c mockClassifier) IsTableIDExist(tableID int64) bool { - return false -} - -func (c mockClassifier) IsStoreIDExist(storeID uint64) bool { - return false -} - var _ = Suite(&testRegionStatisticsSuite{}) type testRegionStatisticsSuite struct{} @@ -98,7 
+58,7 @@ func (t *testRegionStatisticsSuite) TestRegionStatistics(c *C) { r2 := &metapb.Region{Id: 2, Peers: peers[0:2], StartKey: []byte("cc"), EndKey: []byte("dd")} region1 := core.NewRegionInfo(r1, peers[0]) region2 := core.NewRegionInfo(r2, peers[0]) - regionStats := newRegionStatistics(opt, mockClassifier{}) + regionStats := newRegionStatistics(opt, mock.Classifier{}) regionStats.Observe(region1, stores) c.Assert(len(regionStats.stats[extraPeer]), Equals, 1) c.Assert(len(regionStats.stats[learnerPeer]), Equals, 1) diff --git a/server/schedule/filters_test.go b/server/schedule/filters_test.go index fa47a4baab5..20461746153 100644 --- a/server/schedule/filters_test.go +++ b/server/schedule/filters_test.go @@ -16,6 +16,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/mock" ) var _ = Suite(&testFiltersSuite{}) @@ -24,8 +25,8 @@ type testFiltersSuite struct{} func (s *testReplicationSuite) TestPendingPeerFilter(c *C) { filter := NewPendingPeerCountFilter() - opt := NewMockSchedulerOptions() - tc := NewMockCluster(opt) + opt := mock.NewScheduleOptions() + tc := mock.NewCluster(opt) store := core.NewStoreInfo(&metapb.Store{Id: 1}) c.Assert(filter.FilterSource(tc, store), IsFalse) newStore := store.Clone(core.SetPendingPeerCount(30)) diff --git a/server/schedule/merge_checker_test.go b/server/schedule/merge_checker_test.go index 4cf3439e1d8..82b3f75c4b8 100644 --- a/server/schedule/merge_checker_test.go +++ b/server/schedule/merge_checker_test.go @@ -19,22 +19,23 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/mock" "github.com/pingcap/pd/server/namespace" ) var _ = Suite(&testMergeCheckerSuite{}) type testMergeCheckerSuite struct { - cluster *MockCluster + cluster *mock.Cluster mc *MergeChecker regions []*core.RegionInfo } func (s *testMergeCheckerSuite) SetUpTest(c *C) { - cfg := NewMockSchedulerOptions() + cfg := mock.NewScheduleOptions() cfg.MaxMergeRegionSize = 2 cfg.MaxMergeRegionKeys = 2 - s.cluster = NewMockCluster(cfg) + s.cluster = mock.NewCluster(cfg) s.regions = []*core.RegionInfo{ core.NewRegionInfo( &metapb.Region{ @@ -103,7 +104,7 @@ func (s *testMergeCheckerSuite) SetUpTest(c *C) { } func (s *testMergeCheckerSuite) TestBasic(c *C) { - s.cluster.MockSchedulerOptions.SplitMergeInterval = time.Hour + s.cluster.ScheduleOptions.SplitMergeInterval = time.Hour // should with same peer count ops := s.mc.Check(s.regions[0]) diff --git a/server/schedule/mockcluster.go b/server/schedule/mockcluster.go deleted file mode 100644 index ff0d94f4a8e..00000000000 --- a/server/schedule/mockcluster.go +++ /dev/null @@ -1,783 +0,0 @@ -// Copyright 2017 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package schedule - -import ( - "context" - "fmt" - "time" - - "github.com/gogo/protobuf/proto" - "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/kvproto/pkg/pdpb" - log "github.com/pingcap/log" - "github.com/pingcap/pd/server/core" - "github.com/pingcap/pd/server/namespace" - "github.com/pingcap/pd/server/statistics" - "go.uber.org/zap" -) - -// MockHeadbeatStream is used to mock HeadbeatStream for test use. -type MockHeadbeatStream struct{} - -// SendMsg is used to send the message. -func (m MockHeadbeatStream) SendMsg(region *core.RegionInfo, msg *pdpb.RegionHeartbeatResponse) { - return -} - -// MockCluster is used to mock clusterInfo for test use. -type MockCluster struct { - *BasicCluster - *core.MockIDAllocator - *MockSchedulerOptions - *statistics.HotSpotCache - ID uint64 -} - -// NewMockCluster creates a new MockCluster -func NewMockCluster(opt *MockSchedulerOptions) *MockCluster { - return &MockCluster{ - BasicCluster: NewBasicCluster(), - MockIDAllocator: core.NewMockIDAllocator(), - MockSchedulerOptions: opt, - HotSpotCache: statistics.NewHotSpotCache(), - } -} - -func (mc *MockCluster) allocID() (uint64, error) { - return mc.Alloc() -} - -// ScanRegions scans region with start key, until number greater than limit. -func (mc *MockCluster) ScanRegions(startKey []byte, limit int) []*core.RegionInfo { - return mc.Regions.ScanRange(startKey, limit) -} - -// LoadRegion puts region info without leader -func (mc *MockCluster) LoadRegion(regionID uint64, followerIds ...uint64) { - // regions load from etcd will have no leader - r := mc.newMockRegionInfo(regionID, 0, followerIds...).Clone(core.WithLeader(nil)) - mc.PutRegion(r) -} - -// GetStoreRegionCount gets region count with a given store. -func (mc *MockCluster) GetStoreRegionCount(storeID uint64) int { - return mc.Regions.GetStoreRegionCount(storeID) -} - -// IsRegionHot checks if the region is hot. -func (mc *MockCluster) IsRegionHot(id uint64) bool { - return mc.HotSpotCache.IsRegionHot(id, mc.GetHotRegionCacheHitsThreshold()) -} - -// RegionReadStats returns hot region's read stats. -func (mc *MockCluster) RegionReadStats() []*core.RegionStat { - return mc.HotSpotCache.RegionStats(statistics.ReadFlow) -} - -// RegionWriteStats returns hot region's write stats. -func (mc *MockCluster) RegionWriteStats() []*core.RegionStat { - return mc.HotSpotCache.RegionStats(statistics.WriteFlow) -} - -// RandHotRegionFromStore random picks a hot region in specify store. -func (mc *MockCluster) RandHotRegionFromStore(store uint64, kind statistics.FlowKind) *core.RegionInfo { - r := mc.HotSpotCache.RandHotRegionFromStore(store, kind, mc.GetHotRegionCacheHitsThreshold()) - if r == nil { - return nil - } - return mc.GetRegion(r.RegionID) -} - -// AllocPeer allocs a new peer on a store. -func (mc *MockCluster) AllocPeer(storeID uint64) (*metapb.Peer, error) { - peerID, err := mc.allocID() - if err != nil { - log.Error("failed to alloc peer", zap.Error(err)) - return nil, err - } - peer := &metapb.Peer{ - Id: peerID, - StoreId: storeID, - } - return peer, nil -} - -// SetStoreUp sets store state to be up. -func (mc *MockCluster) SetStoreUp(storeID uint64) { - store := mc.GetStore(storeID) - newStore := store.Clone( - core.SetStoreState(metapb.StoreState_Up), - core.SetLastHeartbeatTS(time.Now()), - ) - mc.PutStore(newStore) -} - -// SetStoreDisconnect changes a store's state to disconnected. 
-func (mc *MockCluster) SetStoreDisconnect(storeID uint64) { - store := mc.GetStore(storeID) - newStore := store.Clone( - core.SetStoreState(metapb.StoreState_Up), - core.SetLastHeartbeatTS(time.Now().Add(-time.Second*30)), - ) - mc.PutStore(newStore) -} - -// SetStoreDown sets store down. -func (mc *MockCluster) SetStoreDown(storeID uint64) { - store := mc.GetStore(storeID) - newStore := store.Clone( - core.SetStoreState(metapb.StoreState_Up), - core.SetLastHeartbeatTS(time.Time{}), - ) - mc.PutStore(newStore) -} - -// SetStoreOffline sets store state to be offline. -func (mc *MockCluster) SetStoreOffline(storeID uint64) { - store := mc.GetStore(storeID) - newStore := store.Clone(core.SetStoreState(metapb.StoreState_Offline)) - mc.PutStore(newStore) -} - -// SetStoreBusy sets store busy. -func (mc *MockCluster) SetStoreBusy(storeID uint64, busy bool) { - store := mc.GetStore(storeID) - newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats) - newStats.IsBusy = busy - newStore := store.Clone( - core.SetStoreStats(newStats), - core.SetLastHeartbeatTS(time.Now()), - ) - mc.PutStore(newStore) -} - -// AddLeaderStore adds store with specified count of leader. -func (mc *MockCluster) AddLeaderStore(storeID uint64, leaderCount int) { - stats := &pdpb.StoreStats{} - stats.Capacity = 1000 * (1 << 20) - stats.Available = stats.Capacity - uint64(leaderCount)*10 - store := core.NewStoreInfo( - &metapb.Store{Id: storeID}, - core.SetStoreStats(stats), - core.SetLeaderCount(leaderCount), - core.SetLeaderSize(int64(leaderCount)*10), - core.SetLastHeartbeatTS(time.Now()), - ) - mc.PutStore(store) -} - -// AddRegionStore adds store with specified count of region. -func (mc *MockCluster) AddRegionStore(storeID uint64, regionCount int) { - stats := &pdpb.StoreStats{} - stats.Capacity = 1000 * (1 << 20) - stats.Available = stats.Capacity - uint64(regionCount)*10 - store := core.NewStoreInfo( - &metapb.Store{Id: storeID}, - core.SetStoreStats(stats), - core.SetRegionCount(regionCount), - core.SetRegionSize(int64(regionCount)*10), - core.SetLastHeartbeatTS(time.Now()), - ) - mc.PutStore(store) -} - -// AddLabelsStore adds store with specified count of region and labels. -func (mc *MockCluster) AddLabelsStore(storeID uint64, regionCount int, labels map[string]string) { - var newLabels []*metapb.StoreLabel - for k, v := range labels { - newLabels = append(newLabels, &metapb.StoreLabel{Key: k, Value: v}) - } - stats := &pdpb.StoreStats{} - stats.Capacity = 1000 * (1 << 20) - stats.Available = stats.Capacity - uint64(regionCount)*10 - store := core.NewStoreInfo( - &metapb.Store{ - Id: storeID, - Labels: newLabels, - }, - core.SetStoreStats(stats), - core.SetRegionCount(regionCount), - core.SetRegionSize(int64(regionCount)*10), - core.SetLastHeartbeatTS(time.Now()), - ) - mc.PutStore(store) -} - -// AddLeaderRegion adds region with specified leader and followers. -func (mc *MockCluster) AddLeaderRegion(regionID uint64, leaderID uint64, followerIds ...uint64) { - origin := mc.newMockRegionInfo(regionID, leaderID, followerIds...) - region := origin.Clone(core.SetApproximateSize(10), core.SetApproximateKeys(10)) - mc.PutRegion(region) -} - -// AddLeaderRegionWithRange adds region with specified leader, followers and key range. -func (mc *MockCluster) AddLeaderRegionWithRange(regionID uint64, startKey string, endKey string, leaderID uint64, followerIds ...uint64) { - o := mc.newMockRegionInfo(regionID, leaderID, followerIds...) 
- r := o.Clone( - core.WithStartKey([]byte(startKey)), - core.WithEndKey([]byte(endKey)), - ) - mc.PutRegion(r) -} - -// AddLeaderRegionWithReadInfo adds region with specified leader, followers and read info. -func (mc *MockCluster) AddLeaderRegionWithReadInfo(regionID uint64, leaderID uint64, readBytes uint64, followerIds ...uint64) { - r := mc.newMockRegionInfo(regionID, leaderID, followerIds...) - r = r.Clone(core.SetReadBytes(readBytes)) - isUpdate, item := mc.HotSpotCache.CheckRead(r, mc.Stores) - if isUpdate { - mc.HotSpotCache.Update(regionID, item, statistics.ReadFlow) - } - mc.PutRegion(r) -} - -// AddLeaderRegionWithWriteInfo adds region with specified leader, followers and write info. -func (mc *MockCluster) AddLeaderRegionWithWriteInfo(regionID uint64, leaderID uint64, writtenBytes uint64, followerIds ...uint64) { - r := mc.newMockRegionInfo(regionID, leaderID, followerIds...) - r = r.Clone(core.SetWrittenBytes(writtenBytes)) - isUpdate, item := mc.HotSpotCache.CheckWrite(r, mc.Stores) - if isUpdate { - mc.HotSpotCache.Update(regionID, item, statistics.WriteFlow) - } - mc.PutRegion(r) -} - -// UpdateStoreLeaderWeight updates store leader weight. -func (mc *MockCluster) UpdateStoreLeaderWeight(storeID uint64, weight float64) { - store := mc.GetStore(storeID) - newStore := store.Clone(core.SetLeaderWeight(weight)) - mc.PutStore(newStore) -} - -// UpdateStoreRegionWeight updates store region weight. -func (mc *MockCluster) UpdateStoreRegionWeight(storeID uint64, weight float64) { - store := mc.GetStore(storeID) - newStore := store.Clone(core.SetRegionWeight(weight)) - mc.PutStore(newStore) -} - -// UpdateStoreLeaderSize updates store leader size. -func (mc *MockCluster) UpdateStoreLeaderSize(storeID uint64, size int64) { - store := mc.GetStore(storeID) - newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats) - newStats.Available = newStats.Capacity - uint64(store.GetLeaderSize()) - newStore := store.Clone( - core.SetStoreStats(newStats), - core.SetLeaderSize(size), - ) - mc.PutStore(newStore) -} - -// UpdateStoreRegionSize updates store region size. -func (mc *MockCluster) UpdateStoreRegionSize(storeID uint64, size int64) { - store := mc.GetStore(storeID) - newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats) - newStats.Available = newStats.Capacity - uint64(store.GetRegionSize()) - newStore := store.Clone( - core.SetStoreStats(newStats), - core.SetRegionSize(size), - ) - mc.PutStore(newStore) -} - -// UpdateLeaderCount updates store leader count. -func (mc *MockCluster) UpdateLeaderCount(storeID uint64, leaderCount int) { - store := mc.GetStore(storeID) - newStore := store.Clone( - core.SetLeaderCount(leaderCount), - core.SetLeaderSize(int64(leaderCount)*10), - ) - mc.PutStore(newStore) -} - -// UpdateRegionCount updates store region count. -func (mc *MockCluster) UpdateRegionCount(storeID uint64, regionCount int) { - store := mc.GetStore(storeID) - newStore := store.Clone( - core.SetRegionCount(regionCount), - core.SetRegionSize(int64(regionCount)*10), - ) - mc.PutStore(newStore) -} - -// UpdateSnapshotCount updates store snapshot count. -func (mc *MockCluster) UpdateSnapshotCount(storeID uint64, snapshotCount int) { - store := mc.GetStore(storeID) - newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats) - newStats.ApplyingSnapCount = uint32(snapshotCount) - newStore := store.Clone(core.SetStoreStats(newStats)) - mc.PutStore(newStore) -} - -// UpdatePendingPeerCount updates store pending peer count. 
-func (mc *MockCluster) UpdatePendingPeerCount(storeID uint64, pendingPeerCount int) { - store := mc.GetStore(storeID) - newStore := store.Clone(core.SetPendingPeerCount(pendingPeerCount)) - mc.PutStore(newStore) -} - -// UpdateStorageRatio updates store storage ratio count. -func (mc *MockCluster) UpdateStorageRatio(storeID uint64, usedRatio, availableRatio float64) { - store := mc.GetStore(storeID) - newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats) - newStats.Capacity = 1000 * (1 << 20) - newStats.UsedSize = uint64(float64(newStats.Capacity) * usedRatio) - newStats.Available = uint64(float64(newStats.Capacity) * availableRatio) - newStore := store.Clone(core.SetStoreStats(newStats)) - mc.PutStore(newStore) -} - -// UpdateStorageWrittenBytes updates store written bytes. -func (mc *MockCluster) UpdateStorageWrittenBytes(storeID uint64, bytesWritten uint64) { - store := mc.GetStore(storeID) - newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats) - newStats.BytesWritten = bytesWritten - now := time.Now().Second() - interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)} - newStats.Interval = interval - newStore := store.Clone(core.SetStoreStats(newStats)) - mc.PutStore(newStore) -} - -// UpdateStorageReadBytes updates store read bytes. -func (mc *MockCluster) UpdateStorageReadBytes(storeID uint64, bytesRead uint64) { - store := mc.GetStore(storeID) - newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats) - newStats.BytesRead = bytesRead - now := time.Now().Second() - interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)} - newStats.Interval = interval - newStore := store.Clone(core.SetStoreStats(newStats)) - mc.PutStore(newStore) -} - -// UpdateStoreStatus updates store status. -func (mc *MockCluster) UpdateStoreStatus(id uint64) { - leaderCount := mc.Regions.GetStoreLeaderCount(id) - regionCount := mc.Regions.GetStoreRegionCount(id) - pendingPeerCount := mc.Regions.GetStorePendingPeerCount(id) - leaderSize := mc.Regions.GetStoreLeaderRegionSize(id) - regionSize := mc.Regions.GetStoreRegionSize(id) - store := mc.Stores.GetStore(id) - stats := &pdpb.StoreStats{} - stats.Capacity = 1000 * (1 << 20) - stats.Available = stats.Capacity - uint64(store.GetRegionSize()) - stats.UsedSize = uint64(store.GetRegionSize()) - newStore := store.Clone( - core.SetStoreStats(stats), - core.SetLeaderCount(leaderCount), - core.SetRegionCount(regionCount), - core.SetPendingPeerCount(pendingPeerCount), - core.SetLeaderSize(leaderSize), - core.SetRegionSize(regionSize), - ) - mc.PutStore(newStore) -} - -func (mc *MockCluster) newMockRegionInfo(regionID uint64, leaderID uint64, followerIds ...uint64) *core.RegionInfo { - region := &metapb.Region{ - Id: regionID, - StartKey: []byte(fmt.Sprintf("%20d", regionID)), - EndKey: []byte(fmt.Sprintf("%20d", regionID+1)), - } - leader, _ := mc.AllocPeer(leaderID) - region.Peers = []*metapb.Peer{leader} - for _, id := range followerIds { - peer, _ := mc.AllocPeer(id) - region.Peers = append(region.Peers, peer) - } - - return core.NewRegionInfo(region, leader) -} - -// ApplyOperatorStep mocks apply operator step. 
-func (mc *MockCluster) ApplyOperatorStep(region *core.RegionInfo, op *Operator) *core.RegionInfo { - if step := op.Check(region); step != nil { - switch s := step.(type) { - case TransferLeader: - region = region.Clone(core.WithLeader(region.GetStorePeer(s.ToStore))) - case AddPeer: - if region.GetStorePeer(s.ToStore) != nil { - panic("Add peer that exists") - } - peer := &metapb.Peer{ - Id: s.PeerID, - StoreId: s.ToStore, - } - region = region.Clone(core.WithAddPeer(peer)) - case AddLightPeer: - if region.GetStorePeer(s.ToStore) != nil { - panic("Add peer that exists") - } - peer := &metapb.Peer{ - Id: s.PeerID, - StoreId: s.ToStore, - } - region = region.Clone(core.WithAddPeer(peer)) - case RemovePeer: - if region.GetStorePeer(s.FromStore) == nil { - panic("Remove peer that doesn't exist") - } - if region.GetLeader().GetStoreId() == s.FromStore { - panic("Cannot remove the leader peer") - } - region = region.Clone(core.WithRemoveStorePeer(s.FromStore)) - case AddLearner: - if region.GetStorePeer(s.ToStore) != nil { - panic("Add learner that exists") - } - peer := &metapb.Peer{ - Id: s.PeerID, - StoreId: s.ToStore, - IsLearner: true, - } - region = region.Clone(core.WithAddPeer(peer)) - case AddLightLearner: - if region.GetStorePeer(s.ToStore) != nil { - panic("Add learner that exists") - } - peer := &metapb.Peer{ - Id: s.PeerID, - StoreId: s.ToStore, - IsLearner: true, - } - region = region.Clone(core.WithAddPeer(peer)) - case PromoteLearner: - if region.GetStoreLearner(s.ToStore) == nil { - panic("Promote peer that doesn't exist") - } - peer := &metapb.Peer{ - Id: s.PeerID, - StoreId: s.ToStore, - } - region = region.Clone(core.WithRemoveStorePeer(s.ToStore), core.WithAddPeer(peer)) - default: - panic("Unknown operator step") - } - } - return region -} - -// ApplyOperator mocks apply operator. -func (mc *MockCluster) ApplyOperator(op *Operator) { - origin := mc.GetRegion(op.RegionID()) - region := origin - for !op.IsFinish() { - region = mc.ApplyOperatorStep(region, op) - } - mc.PutRegion(region) - for id := range region.GetStoreIds() { - mc.UpdateStoreStatus(id) - } - for id := range origin.GetStoreIds() { - mc.UpdateStoreStatus(id) - } -} - -// GetOpt mocks method. -func (mc *MockCluster) GetOpt() NamespaceOptions { - return mc.MockSchedulerOptions -} - -// GetLeaderScheduleLimit mocks method. -func (mc *MockCluster) GetLeaderScheduleLimit() uint64 { - return mc.MockSchedulerOptions.GetLeaderScheduleLimit(namespace.DefaultNamespace) -} - -// GetRegionScheduleLimit mocks method. -func (mc *MockCluster) GetRegionScheduleLimit() uint64 { - return mc.MockSchedulerOptions.GetRegionScheduleLimit(namespace.DefaultNamespace) -} - -// GetReplicaScheduleLimit mocks method. -func (mc *MockCluster) GetReplicaScheduleLimit() uint64 { - return mc.MockSchedulerOptions.GetReplicaScheduleLimit(namespace.DefaultNamespace) -} - -// GetMergeScheduleLimit mocks method. -func (mc *MockCluster) GetMergeScheduleLimit() uint64 { - return mc.MockSchedulerOptions.GetMergeScheduleLimit(namespace.DefaultNamespace) -} - -// GetHotRegionScheduleLimit mocks method. -func (mc *MockCluster) GetHotRegionScheduleLimit() uint64 { - return mc.MockSchedulerOptions.GetHotRegionScheduleLimit(namespace.DefaultNamespace) -} - -// GetMaxReplicas mocks method. -func (mc *MockCluster) GetMaxReplicas() int { - return mc.MockSchedulerOptions.GetMaxReplicas(namespace.DefaultNamespace) -} - -// CheckLabelProperty checks label property. 
-func (mc *MockCluster) CheckLabelProperty(typ string, labels []*metapb.StoreLabel) bool { - for _, cfg := range mc.LabelProperties[typ] { - for _, l := range labels { - if l.Key == cfg.Key && l.Value == cfg.Value { - return true - } - } - } - return false -} - -const ( - defaultMaxReplicas = 3 - defaultMaxSnapshotCount = 3 - defaultMaxPendingPeerCount = 16 - defaultMaxMergeRegionSize = 0 - defaultMaxMergeRegionKeys = 0 - defaultSplitMergeInterval = 0 - defaultMaxStoreDownTime = 30 * time.Minute - defaultLeaderScheduleLimit = 4 - defaultRegionScheduleLimit = 4 - defaultReplicaScheduleLimit = 8 - defaultMergeScheduleLimit = 8 - defaultHotRegionScheduleLimit = 2 - defaultStoreBalanceRate = 1 - defaultTolerantSizeRatio = 2.5 - defaultLowSpaceRatio = 0.8 - defaultHighSpaceRatio = 0.6 - defaultHotRegionCacheHitsThreshold = 3 - defaultStrictlyMatchLabel = true -) - -// MockSchedulerOptions is a mock of SchedulerOptions -// which implements Options interface -type MockSchedulerOptions struct { - RegionScheduleLimit uint64 - LeaderScheduleLimit uint64 - ReplicaScheduleLimit uint64 - MergeScheduleLimit uint64 - HotRegionScheduleLimit uint64 - StoreBalanceRate float64 - MaxSnapshotCount uint64 - MaxPendingPeerCount uint64 - MaxMergeRegionSize uint64 - MaxMergeRegionKeys uint64 - SplitMergeInterval time.Duration - MaxStoreDownTime time.Duration - MaxReplicas int - LocationLabels []string - StrictlyMatchLabel bool - HotRegionCacheHitsThreshold int - TolerantSizeRatio float64 - LowSpaceRatio float64 - HighSpaceRatio float64 - DisableLearner bool - DisableRemoveDownReplica bool - DisableReplaceOfflineReplica bool - DisableMakeUpReplica bool - DisableRemoveExtraReplica bool - DisableLocationReplacement bool - DisableNamespaceRelocation bool - LabelProperties map[string][]*metapb.StoreLabel -} - -// NewMockSchedulerOptions creates a mock schedule option. 
-func NewMockSchedulerOptions() *MockSchedulerOptions { - mso := &MockSchedulerOptions{} - mso.RegionScheduleLimit = defaultRegionScheduleLimit - mso.LeaderScheduleLimit = defaultLeaderScheduleLimit - mso.ReplicaScheduleLimit = defaultReplicaScheduleLimit - mso.MergeScheduleLimit = defaultMergeScheduleLimit - mso.HotRegionScheduleLimit = defaultHotRegionScheduleLimit - mso.StoreBalanceRate = defaultStoreBalanceRate - mso.MaxSnapshotCount = defaultMaxSnapshotCount - mso.MaxMergeRegionSize = defaultMaxMergeRegionSize - mso.MaxMergeRegionKeys = defaultMaxMergeRegionKeys - mso.SplitMergeInterval = defaultSplitMergeInterval - mso.MaxStoreDownTime = defaultMaxStoreDownTime - mso.MaxReplicas = defaultMaxReplicas - mso.StrictlyMatchLabel = defaultStrictlyMatchLabel - mso.HotRegionCacheHitsThreshold = defaultHotRegionCacheHitsThreshold - mso.MaxPendingPeerCount = defaultMaxPendingPeerCount - mso.TolerantSizeRatio = defaultTolerantSizeRatio - mso.LowSpaceRatio = defaultLowSpaceRatio - mso.HighSpaceRatio = defaultHighSpaceRatio - return mso -} - -// GetLeaderScheduleLimit mock method -func (mso *MockSchedulerOptions) GetLeaderScheduleLimit(name string) uint64 { - return mso.LeaderScheduleLimit -} - -// GetRegionScheduleLimit mock method -func (mso *MockSchedulerOptions) GetRegionScheduleLimit(name string) uint64 { - return mso.RegionScheduleLimit -} - -// GetReplicaScheduleLimit mock method -func (mso *MockSchedulerOptions) GetReplicaScheduleLimit(name string) uint64 { - return mso.ReplicaScheduleLimit -} - -// GetMergeScheduleLimit mock method -func (mso *MockSchedulerOptions) GetMergeScheduleLimit(name string) uint64 { - return mso.MergeScheduleLimit -} - -// GetHotRegionScheduleLimit mock method -func (mso *MockSchedulerOptions) GetHotRegionScheduleLimit(name string) uint64 { - return mso.HotRegionScheduleLimit -} - -// GetStoreBalanceRate mock method -func (mso *MockSchedulerOptions) GetStoreBalanceRate() float64 { - return mso.StoreBalanceRate -} - -// GetMaxSnapshotCount mock method -func (mso *MockSchedulerOptions) GetMaxSnapshotCount() uint64 { - return mso.MaxSnapshotCount -} - -// GetMaxPendingPeerCount mock method -func (mso *MockSchedulerOptions) GetMaxPendingPeerCount() uint64 { - return mso.MaxPendingPeerCount -} - -// GetMaxMergeRegionSize mock method -func (mso *MockSchedulerOptions) GetMaxMergeRegionSize() uint64 { - return mso.MaxMergeRegionSize -} - -// GetMaxMergeRegionKeys mock method -func (mso *MockSchedulerOptions) GetMaxMergeRegionKeys() uint64 { - return mso.MaxMergeRegionKeys -} - -// GetSplitMergeInterval mock method -func (mso *MockSchedulerOptions) GetSplitMergeInterval() time.Duration { - return mso.SplitMergeInterval -} - -// GetMaxStoreDownTime mock method -func (mso *MockSchedulerOptions) GetMaxStoreDownTime() time.Duration { - return mso.MaxStoreDownTime -} - -// GetMaxReplicas mock method -func (mso *MockSchedulerOptions) GetMaxReplicas(name string) int { - return mso.MaxReplicas -} - -// GetLocationLabels mock method -func (mso *MockSchedulerOptions) GetLocationLabels() []string { - return mso.LocationLabels -} - -// GetStrictlyMatchLabel mock method -func (mso *MockSchedulerOptions) GetStrictlyMatchLabel() bool { - return mso.StrictlyMatchLabel -} - -// GetHotRegionCacheHitsThreshold mock method -func (mso *MockSchedulerOptions) GetHotRegionCacheHitsThreshold() int { - return mso.HotRegionCacheHitsThreshold -} - -// GetTolerantSizeRatio mock method -func (mso *MockSchedulerOptions) GetTolerantSizeRatio() float64 { - return mso.TolerantSizeRatio -} - -// 
GetLowSpaceRatio mock method -func (mso *MockSchedulerOptions) GetLowSpaceRatio() float64 { - return mso.LowSpaceRatio -} - -// GetHighSpaceRatio mock method -func (mso *MockSchedulerOptions) GetHighSpaceRatio() float64 { - return mso.HighSpaceRatio -} - -// SetMaxReplicas mock method -func (mso *MockSchedulerOptions) SetMaxReplicas(replicas int) { - mso.MaxReplicas = replicas -} - -// IsRaftLearnerEnabled mock method -func (mso *MockSchedulerOptions) IsRaftLearnerEnabled() bool { - return !mso.DisableLearner -} - -// IsRemoveDownReplicaEnabled mock method. -func (mso *MockSchedulerOptions) IsRemoveDownReplicaEnabled() bool { - return !mso.DisableRemoveDownReplica -} - -// IsReplaceOfflineReplicaEnabled mock method. -func (mso *MockSchedulerOptions) IsReplaceOfflineReplicaEnabled() bool { - return !mso.DisableReplaceOfflineReplica -} - -// IsMakeUpReplicaEnabled mock method. -func (mso *MockSchedulerOptions) IsMakeUpReplicaEnabled() bool { - return !mso.DisableMakeUpReplica -} - -// IsRemoveExtraReplicaEnabled mock method. -func (mso *MockSchedulerOptions) IsRemoveExtraReplicaEnabled() bool { - return !mso.DisableRemoveExtraReplica -} - -// IsLocationReplacementEnabled mock method. -func (mso *MockSchedulerOptions) IsLocationReplacementEnabled() bool { - return !mso.DisableLocationReplacement -} - -// IsNamespaceRelocationEnabled mock method. -func (mso *MockSchedulerOptions) IsNamespaceRelocationEnabled() bool { - return !mso.DisableNamespaceRelocation -} - -// MockHeartbeatStreams is used to mock heartbeatstreams for test use. -type MockHeartbeatStreams struct { - ctx context.Context - cancel context.CancelFunc - clusterID uint64 - msgCh chan *pdpb.RegionHeartbeatResponse -} - -// NewMockHeartbeatStreams creates a new MockHeartbeatStreams. -func NewMockHeartbeatStreams(clusterID uint64) *MockHeartbeatStreams { - ctx, cancel := context.WithCancel(context.Background()) - hs := &MockHeartbeatStreams{ - ctx: ctx, - cancel: cancel, - clusterID: clusterID, - msgCh: make(chan *pdpb.RegionHeartbeatResponse, 1024), - } - return hs -} - -// SendMsg is used to send the message. -func (mhs *MockHeartbeatStreams) SendMsg(region *core.RegionInfo, msg *pdpb.RegionHeartbeatResponse) { - if region.GetLeader() == nil { - return - } - - msg.Header = &pdpb.ResponseHeader{ClusterId: mhs.clusterID} - msg.RegionId = region.GetID() - msg.RegionEpoch = region.GetRegionEpoch() - msg.TargetPeer = region.GetLeader() - - select { - case mhs.msgCh <- msg: - case <-mhs.ctx.Done(): - } -} diff --git a/server/schedule/operator_controller_test.go b/server/schedule/operator_controller_test.go index e6fa52211f0..bb695bd2308 100644 --- a/server/schedule/operator_controller_test.go +++ b/server/schedule/operator_controller_test.go @@ -20,6 +20,7 @@ import ( . 
"github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/pd/server/mock" ) var _ = Suite(&testOperatorControllerSuite{}) @@ -28,8 +29,8 @@ type testOperatorControllerSuite struct{} // issue #1338 func (t *testOperatorControllerSuite) TestGetOpInfluence(c *C) { - opt := NewMockSchedulerOptions() - tc := NewMockCluster(opt) + opt := mock.NewScheduleOptions() + tc := mock.NewCluster(opt) oc := NewOperatorController(tc, nil) tc.AddLeaderStore(2, 1) tc.AddLeaderRegion(1, 1, 2) @@ -56,9 +57,9 @@ func (t *testOperatorControllerSuite) TestGetOpInfluence(c *C) { } func (t *testOperatorControllerSuite) TestOperatorStatus(c *C) { - opt := NewMockSchedulerOptions() - tc := NewMockCluster(opt) - oc := NewOperatorController(tc, MockHeadbeatStream{}) + opt := mock.NewScheduleOptions() + tc := mock.NewCluster(opt) + oc := NewOperatorController(tc, mock.NewHeartbeatStream()) tc.AddLeaderStore(1, 2) tc.AddLeaderStore(2, 0) tc.AddLeaderRegion(1, 1, 2) @@ -76,21 +77,21 @@ func (t *testOperatorControllerSuite) TestOperatorStatus(c *C) { c.Assert(oc.GetOperatorStatus(1).Status, Equals, pdpb.OperatorStatus_RUNNING) c.Assert(oc.GetOperatorStatus(2).Status, Equals, pdpb.OperatorStatus_RUNNING) op1.createTime = time.Now().Add(-10 * time.Minute) - region2 = tc.ApplyOperatorStep(region2, op2) + region2 = ApplyOperatorStep(region2, op2) tc.PutRegion(region2) oc.Dispatch(region1, "test") oc.Dispatch(region2, "test") c.Assert(oc.GetOperatorStatus(1).Status, Equals, pdpb.OperatorStatus_TIMEOUT) c.Assert(oc.GetOperatorStatus(2).Status, Equals, pdpb.OperatorStatus_RUNNING) - tc.ApplyOperator(op2) + ApplyOperator(tc, op2) oc.Dispatch(region2, "test") c.Assert(oc.GetOperatorStatus(2).Status, Equals, pdpb.OperatorStatus_SUCCESS) } func (t *testOperatorControllerSuite) TestPollDispatchRegion(c *C) { - opt := NewMockSchedulerOptions() - tc := NewMockCluster(opt) - oc := NewOperatorController(tc, MockHeadbeatStream{}) + opt := mock.NewScheduleOptions() + tc := mock.NewCluster(opt) + oc := NewOperatorController(tc, mock.NewHeartbeatStream()) tc.AddLeaderStore(1, 2) tc.AddLeaderStore(2, 0) tc.AddLeaderRegion(1, 1, 2) diff --git a/server/schedule/opts.go b/server/schedule/opts.go index 7151ad2d917..9ea402c94ca 100644 --- a/server/schedule/opts.go +++ b/server/schedule/opts.go @@ -58,15 +58,6 @@ type Options interface { CheckLabelProperty(typ string, labels []*metapb.StoreLabel) bool } -// NamespaceOptions for namespace cluster. -type NamespaceOptions interface { - GetLeaderScheduleLimit(name string) uint64 - GetRegionScheduleLimit(name string) uint64 - GetReplicaScheduleLimit(name string) uint64 - GetMergeScheduleLimit(name string) uint64 - GetMaxReplicas(name string) int -} - const ( // RejectLeader is the label property type that suggests a store should not // have any region leaders. 
diff --git a/server/schedule/replica_test.go b/server/schedule/replica_test.go index 1976a93292f..d7156b5f288 100644 --- a/server/schedule/replica_test.go +++ b/server/schedule/replica_test.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/mock" ) func TestSchedule(t *testing.T) { @@ -29,12 +30,12 @@ func TestSchedule(t *testing.T) { var _ = Suite(&testReplicationSuite{}) type testReplicationSuite struct { - tc *MockCluster + tc *mock.Cluster } func (s *testReplicationSuite) SetUpSuite(c *C) { - opt := NewMockSchedulerOptions() - s.tc = NewMockCluster(opt) + opt := mock.NewScheduleOptions() + s.tc = mock.NewCluster(opt) } func (s *testReplicationSuite) TestDistinctScore(c *C) { diff --git a/server/schedule/scheduler.go b/server/schedule/scheduler.go index 2101a67c03c..e1e11fce972 100644 --- a/server/schedule/scheduler.go +++ b/server/schedule/scheduler.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" log "github.com/pingcap/log" "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/namespace" "github.com/pingcap/pd/server/statistics" "github.com/pkg/errors" "go.uber.org/zap" @@ -52,7 +53,7 @@ type Cluster interface { RandHotRegionFromStore(store uint64, kind statistics.FlowKind) *core.RegionInfo // get config methods - GetOpt() NamespaceOptions + GetOpt() namespace.ScheduleOptions Options // TODO: it should be removed. Schedulers don't need to know anything diff --git a/server/schedule/test_util.go b/server/schedule/test_util.go new file mode 100644 index 00000000000..0c996b70081 --- /dev/null +++ b/server/schedule/test_util.go @@ -0,0 +1,104 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package schedule + +import ( + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/mock" +) + +// ApplyOperatorStep applies operator step. Only for test purpose. 
+func ApplyOperatorStep(region *core.RegionInfo, op *Operator) *core.RegionInfo { + if step := op.Check(region); step != nil { + switch s := step.(type) { + case TransferLeader: + region = region.Clone(core.WithLeader(region.GetStorePeer(s.ToStore))) + case AddPeer: + if region.GetStorePeer(s.ToStore) != nil { + panic("Add peer that exists") + } + peer := &metapb.Peer{ + Id: s.PeerID, + StoreId: s.ToStore, + } + region = region.Clone(core.WithAddPeer(peer)) + case AddLightPeer: + if region.GetStorePeer(s.ToStore) != nil { + panic("Add peer that exists") + } + peer := &metapb.Peer{ + Id: s.PeerID, + StoreId: s.ToStore, + } + region = region.Clone(core.WithAddPeer(peer)) + case RemovePeer: + if region.GetStorePeer(s.FromStore) == nil { + panic("Remove peer that doesn't exist") + } + if region.GetLeader().GetStoreId() == s.FromStore { + panic("Cannot remove the leader peer") + } + region = region.Clone(core.WithRemoveStorePeer(s.FromStore)) + case AddLearner: + if region.GetStorePeer(s.ToStore) != nil { + panic("Add learner that exists") + } + peer := &metapb.Peer{ + Id: s.PeerID, + StoreId: s.ToStore, + IsLearner: true, + } + region = region.Clone(core.WithAddPeer(peer)) + case AddLightLearner: + if region.GetStorePeer(s.ToStore) != nil { + panic("Add learner that exists") + } + peer := &metapb.Peer{ + Id: s.PeerID, + StoreId: s.ToStore, + IsLearner: true, + } + region = region.Clone(core.WithAddPeer(peer)) + case PromoteLearner: + if region.GetStoreLearner(s.ToStore) == nil { + panic("Promote peer that doesn't exist") + } + peer := &metapb.Peer{ + Id: s.PeerID, + StoreId: s.ToStore, + } + region = region.Clone(core.WithRemoveStorePeer(s.ToStore), core.WithAddPeer(peer)) + default: + panic("Unknown operator step") + } + } + return region +} + +// ApplyOperator applies operator. Only for test purpose. +func ApplyOperator(mc *mock.Cluster, op *Operator) { + origin := mc.GetRegion(op.RegionID()) + region := origin + for !op.IsFinish() { + region = ApplyOperatorStep(region, op) + } + mc.PutRegion(region) + for id := range region.GetStoreIds() { + mc.UpdateStoreStatus(id) + } + for id := range origin.GetStoreIds() { + mc.UpdateStoreStatus(id) + } +} diff --git a/server/schedulers/balance_test.go b/server/schedulers/balance_test.go index d0ab40627ab..1a3ac62e82a 100644 --- a/server/schedulers/balance_test.go +++ b/server/schedulers/balance_test.go @@ -23,12 +23,13 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/pd/pkg/testutil" "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/mock" "github.com/pingcap/pd/server/namespace" "github.com/pingcap/pd/server/schedule" "github.com/pingcap/pd/server/statistics" ) -func newTestReplication(mso *schedule.MockSchedulerOptions, maxReplicas int, locationLabels ...string) { +func newTestReplication(mso *mock.ScheduleOptions, maxReplicas int, locationLabels ...string) { mso.MaxReplicas = maxReplicas mso.LocationLabels = locationLabels } @@ -73,8 +74,8 @@ func (s *testBalanceSpeedSuite) TestShouldBalance(c *C) { {80, 70, 50, false}, } - opt := schedule.NewMockSchedulerOptions() - tc := schedule.NewMockCluster(opt) + opt := mock.NewScheduleOptions() + tc := mock.NewCluster(opt) // create a region to control average region size. 
tc.AddLeaderRegion(1, 1, 2) @@ -100,8 +101,8 @@ func (s *testBalanceSpeedSuite) TestShouldBalance(c *C) { } func (s *testBalanceSpeedSuite) TestBalanceLimit(c *C) { - opt := schedule.NewMockSchedulerOptions() - tc := schedule.NewMockCluster(opt) + opt := mock.NewScheduleOptions() + tc := mock.NewCluster(opt) tc.AddLeaderStore(1, 10) tc.AddLeaderStore(2, 20) tc.AddLeaderStore(3, 30) @@ -117,14 +118,14 @@ func (s *testBalanceSpeedSuite) TestBalanceLimit(c *C) { var _ = Suite(&testBalanceLeaderSchedulerSuite{}) type testBalanceLeaderSchedulerSuite struct { - tc *schedule.MockCluster + tc *mock.Cluster lb schedule.Scheduler oc *schedule.OperatorController } func (s *testBalanceLeaderSchedulerSuite) SetUpTest(c *C) { - opt := schedule.NewMockSchedulerOptions() - s.tc = schedule.NewMockCluster(opt) + opt := mock.NewScheduleOptions() + s.tc = mock.NewCluster(opt) s.oc = schedule.NewOperatorController(nil, nil) lb, err := schedule.CreateScheduler("balance-leader", s.oc) c.Assert(err, IsNil) @@ -319,8 +320,8 @@ var _ = Suite(&testBalanceRegionSchedulerSuite{}) type testBalanceRegionSchedulerSuite struct{} func (s *testBalanceRegionSchedulerSuite) TestBalance(c *C) { - opt := schedule.NewMockSchedulerOptions() - tc := schedule.NewMockCluster(opt) + opt := mock.NewScheduleOptions() + tc := mock.NewCluster(opt) oc := schedule.NewOperatorController(nil, nil) sb, err := schedule.CreateScheduler("balance-region", oc) @@ -354,8 +355,8 @@ func (s *testBalanceRegionSchedulerSuite) TestBalance(c *C) { } func (s *testBalanceRegionSchedulerSuite) TestReplicas3(c *C) { - opt := schedule.NewMockSchedulerOptions() - tc := schedule.NewMockCluster(opt) + opt := mock.NewScheduleOptions() + tc := mock.NewCluster(opt) oc := schedule.NewOperatorController(nil, nil) newTestReplication(opt, 3, "zone", "rack", "host") @@ -423,8 +424,8 @@ func (s *testBalanceRegionSchedulerSuite) TestReplicas3(c *C) { } func (s *testBalanceRegionSchedulerSuite) TestReplicas5(c *C) { - opt := schedule.NewMockSchedulerOptions() - tc := schedule.NewMockCluster(opt) + opt := mock.NewScheduleOptions() + tc := mock.NewCluster(opt) oc := schedule.NewOperatorController(nil, nil) newTestReplication(opt, 5, "zone", "rack", "host") @@ -461,8 +462,8 @@ func (s *testBalanceRegionSchedulerSuite) TestReplicas5(c *C) { } func (s *testBalanceRegionSchedulerSuite) TestStoreWeight(c *C) { - opt := schedule.NewMockSchedulerOptions() - tc := schedule.NewMockCluster(opt) + opt := mock.NewScheduleOptions() + tc := mock.NewCluster(opt) oc := schedule.NewOperatorController(nil, nil) sb, err := schedule.CreateScheduler("balance-region", oc) @@ -490,8 +491,8 @@ var _ = Suite(&testReplicaCheckerSuite{}) type testReplicaCheckerSuite struct{} func (s *testReplicaCheckerSuite) TestBasic(c *C) { - opt := schedule.NewMockSchedulerOptions() - tc := schedule.NewMockCluster(opt) + opt := mock.NewScheduleOptions() + tc := mock.NewCluster(opt) rc := schedule.NewReplicaChecker(tc, namespace.DefaultClassifier) @@ -564,8 +565,8 @@ func (s *testReplicaCheckerSuite) TestBasic(c *C) { } func (s *testReplicaCheckerSuite) TestLostStore(c *C) { - opt := schedule.NewMockSchedulerOptions() - tc := schedule.NewMockCluster(opt) + opt := mock.NewScheduleOptions() + tc := mock.NewCluster(opt) tc.AddRegionStore(1, 1) tc.AddRegionStore(2, 1) @@ -582,8 +583,8 @@ func (s *testReplicaCheckerSuite) TestLostStore(c *C) { } func (s *testReplicaCheckerSuite) TestOffline(c *C) { - opt := schedule.NewMockSchedulerOptions() - tc := schedule.NewMockCluster(opt) + opt := mock.NewScheduleOptions() + tc := 
mock.NewCluster(opt) newTestReplication(opt, 3, "zone", "rack", "host") @@ -639,8 +640,8 @@ func (s *testReplicaCheckerSuite) TestOffline(c *C) { } func (s *testReplicaCheckerSuite) TestDistinctScore(c *C) { - opt := schedule.NewMockSchedulerOptions() - tc := schedule.NewMockCluster(opt) + opt := mock.NewScheduleOptions() + tc := mock.NewCluster(opt) newTestReplication(opt, 3, "zone", "rack", "host") @@ -717,8 +718,8 @@ func (s *testReplicaCheckerSuite) TestDistinctScore(c *C) { } func (s *testReplicaCheckerSuite) TestDistinctScore2(c *C) { - opt := schedule.NewMockSchedulerOptions() - tc := schedule.NewMockCluster(opt) + opt := mock.NewScheduleOptions() + tc := mock.NewCluster(opt) newTestReplication(opt, 5, "zone", "host") @@ -746,9 +747,9 @@ func (s *testReplicaCheckerSuite) TestDistinctScore2(c *C) { } func (s *testReplicaCheckerSuite) TestStorageThreshold(c *C) { - opt := schedule.NewMockSchedulerOptions() + opt := mock.NewScheduleOptions() opt.LocationLabels = []string{"zone"} - tc := schedule.NewMockCluster(opt) + tc := mock.NewCluster(opt) rc := schedule.NewReplicaChecker(tc, namespace.DefaultClassifier) tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1"}) @@ -781,8 +782,8 @@ func (s *testReplicaCheckerSuite) TestStorageThreshold(c *C) { } func (s *testReplicaCheckerSuite) TestOpts(c *C) { - opt := schedule.NewMockSchedulerOptions() - tc := schedule.NewMockCluster(opt) + opt := mock.NewScheduleOptions() + tc := mock.NewCluster(opt) rc := schedule.NewReplicaChecker(tc, namespace.DefaultClassifier) tc.AddRegionStore(1, 100) @@ -814,10 +815,10 @@ var _ = Suite(&testRandomMergeSchedulerSuite{}) type testRandomMergeSchedulerSuite struct{} func (s *testRandomMergeSchedulerSuite) TestMerge(c *C) { - opt := schedule.NewMockSchedulerOptions() + opt := mock.NewScheduleOptions() opt.MergeScheduleLimit = 1 - tc := schedule.NewMockCluster(opt) - hb := schedule.NewMockHeartbeatStreams(tc.ID) + tc := mock.NewCluster(opt) + hb := mock.NewHeartbeatStreams(tc.ID) oc := schedule.NewOperatorController(tc, hb) mb, err := schedule.CreateScheduler("random-merge", oc) @@ -844,9 +845,9 @@ var _ = Suite(&testBalanceHotWriteRegionSchedulerSuite{}) type testBalanceHotWriteRegionSchedulerSuite struct{} func (s *testBalanceHotWriteRegionSchedulerSuite) TestBalance(c *C) { - opt := schedule.NewMockSchedulerOptions() + opt := mock.NewScheduleOptions() newTestReplication(opt, 3, "zone", "host") - tc := schedule.NewMockCluster(opt) + tc := mock.NewCluster(opt) hb, err := schedule.CreateScheduler("hot-write-region", schedule.NewOperatorController(nil, nil)) c.Assert(err, IsNil) @@ -903,7 +904,7 @@ func (s *testBalanceHotWriteRegionSchedulerSuite) TestBalance(c *C) { opt.HotRegionScheduleLimit = 0 c.Assert(hb.Schedule(tc), HasLen, 0) // hot region scheduler is not affect by `balance-region-schedule-limit`. - opt.HotRegionScheduleLimit = schedule.NewMockSchedulerOptions().HotRegionScheduleLimit + opt.HotRegionScheduleLimit = mock.NewScheduleOptions().HotRegionScheduleLimit opt.RegionScheduleLimit = 0 c.Assert(hb.Schedule(tc), HasLen, 1) // Always produce operator @@ -938,7 +939,7 @@ func (s *testBalanceHotWriteRegionSchedulerSuite) TestBalance(c *C) { // hot region scheduler is restricted by schedule limit. opt.LeaderScheduleLimit = 0 c.Assert(hb.Schedule(tc), HasLen, 0) - opt.LeaderScheduleLimit = schedule.NewMockSchedulerOptions().LeaderScheduleLimit + opt.LeaderScheduleLimit = mock.NewScheduleOptions().LeaderScheduleLimit // Should not panic if region not found. 
for i := uint64(1); i <= 3; i++ { @@ -950,8 +951,8 @@ func (s *testBalanceHotWriteRegionSchedulerSuite) TestBalance(c *C) { type testBalanceHotReadRegionSchedulerSuite struct{} func (s *testBalanceHotReadRegionSchedulerSuite) TestBalance(c *C) { - opt := schedule.NewMockSchedulerOptions() - tc := schedule.NewMockCluster(opt) + opt := mock.NewScheduleOptions() + tc := mock.NewCluster(opt) hb, err := schedule.CreateScheduler("hot-read-region", schedule.NewOperatorController(nil, nil)) c.Assert(err, IsNil) @@ -1025,8 +1026,8 @@ var _ = Suite(&testScatterRangeLeaderSuite{}) type testScatterRangeLeaderSuite struct{} func (s *testScatterRangeLeaderSuite) TestBalance(c *C) { - opt := schedule.NewMockSchedulerOptions() - tc := schedule.NewMockCluster(opt) + opt := mock.NewScheduleOptions() + tc := mock.NewCluster(opt) // Add stores 1,2,3,4,5. tc.AddRegionStore(1, 0) tc.AddRegionStore(2, 0) @@ -1084,7 +1085,7 @@ func (s *testScatterRangeLeaderSuite) TestBalance(c *C) { limit++ continue } - tc.ApplyOperator(ops[0]) + schedule.ApplyOperator(tc, ops[0]) } for i := 1; i <= 5; i++ { leaderCount := tc.Regions.GetStoreLeaderCount(uint64(i)) @@ -1095,8 +1096,8 @@ func (s *testScatterRangeLeaderSuite) TestBalance(c *C) { } func (s *testScatterRangeLeaderSuite) TestBalanceWhenRegionNotHeartbeat(c *C) { - opt := schedule.NewMockSchedulerOptions() - tc := schedule.NewMockCluster(opt) + opt := mock.NewScheduleOptions() + tc := mock.NewCluster(opt) // Add stores 1,2,3. tc.AddRegionStore(1, 0) tc.AddRegionStore(2, 0) @@ -1156,6 +1157,6 @@ func (s *testScatterRangeLeaderSuite) TestBalanceWhenRegionNotHeartbeat(c *C) { limit++ continue } - tc.ApplyOperator(ops[0]) + schedule.ApplyOperator(tc, ops[0]) } } diff --git a/server/schedulers/scheduler_test.go b/server/schedulers/scheduler_test.go index 8bdc3f697d7..e169dbca544 100644 --- a/server/schedulers/scheduler_test.go +++ b/server/schedulers/scheduler_test.go @@ -18,6 +18,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/pd/pkg/testutil" "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/mock" "github.com/pingcap/pd/server/namespace" "github.com/pingcap/pd/server/schedule" "github.com/pingcap/pd/server/statistics" @@ -28,8 +29,8 @@ var _ = Suite(&testShuffleLeaderSuite{}) type testShuffleLeaderSuite struct{} func (s *testShuffleLeaderSuite) TestShuffle(c *C) { - opt := schedule.NewMockSchedulerOptions() - tc := schedule.NewMockCluster(opt) + opt := mock.NewScheduleOptions() + tc := mock.NewCluster(opt) sl, err := schedule.CreateScheduler("shuffle-leader", schedule.NewOperatorController(nil, nil)) c.Assert(err, IsNil) @@ -58,8 +59,8 @@ var _ = Suite(&testBalanceAdjacentRegionSuite{}) type testBalanceAdjacentRegionSuite struct{} func (s *testBalanceAdjacentRegionSuite) TestBalance(c *C) { - opt := schedule.NewMockSchedulerOptions() - tc := schedule.NewMockCluster(opt) + opt := mock.NewScheduleOptions() + tc := mock.NewCluster(opt) sc, err := schedule.CreateScheduler("adjacent-region", schedule.NewOperatorController(nil, nil), "32", "2") c.Assert(err, IsNil) @@ -126,8 +127,8 @@ func (s *testBalanceAdjacentRegionSuite) TestBalance(c *C) { } func (s *testBalanceAdjacentRegionSuite) TestNoNeedToBalance(c *C) { - opt := schedule.NewMockSchedulerOptions() - tc := schedule.NewMockCluster(opt) + opt := mock.NewScheduleOptions() + tc := mock.NewCluster(opt) sc, err := schedule.CreateScheduler("adjacent-region", schedule.NewOperatorController(nil, nil)) c.Assert(err, IsNil) @@ -180,8 +181,8 @@ func (s 
*testScatterRegionSuite) checkOperator(op *schedule.Operator, c *C) { } func (s *testScatterRegionSuite) scatter(c *C, numStores, numRegions uint64) { - opt := schedule.NewMockSchedulerOptions() - tc := schedule.NewMockCluster(opt) + opt := mock.NewScheduleOptions() + tc := mock.NewCluster(opt) // Add stores 1~6. for i := uint64(1); i <= numStores; i++ { @@ -202,7 +203,7 @@ func (s *testScatterRegionSuite) scatter(c *C, numStores, numRegions uint64) { region := tc.GetRegion(i) if op, _ := scatterer.Scatter(region); op != nil { s.checkOperator(op, c) - tc.ApplyOperator(op) + schedule.ApplyOperator(tc, op) } } @@ -221,9 +222,9 @@ func (s *testScatterRegionSuite) scatter(c *C, numStores, numRegions uint64) { } func (s *testScatterRegionSuite) TestStorelimit(c *C) { - opt := schedule.NewMockSchedulerOptions() - tc := schedule.NewMockCluster(opt) - oc := schedule.NewOperatorController(tc, schedule.MockHeadbeatStream{}) + opt := mock.NewScheduleOptions() + tc := mock.NewCluster(opt) + oc := schedule.NewOperatorController(tc, mock.NewHeartbeatStream()) // Add stores 1~6. for i := uint64(1); i <= 5; i++ { @@ -253,11 +254,11 @@ var _ = Suite(&testRejectLeaderSuite{}) type testRejectLeaderSuite struct{} func (s *testRejectLeaderSuite) TestRejectLeader(c *C) { - opt := schedule.NewMockSchedulerOptions() + opt := mock.NewScheduleOptions() opt.LabelProperties = map[string][]*metapb.StoreLabel{ schedule.RejectLeader: {{Key: "noleader", Value: "true"}}, } - tc := schedule.NewMockCluster(opt) + tc := mock.NewCluster(opt) // Add 3 stores 1,2,3. tc.AddLabelsStore(1, 1, map[string]string{"noleader": "true"}) @@ -312,9 +313,9 @@ var _ = Suite(&testShuffleHotRegionSchedulerSuite{}) type testShuffleHotRegionSchedulerSuite struct{} func (s *testShuffleHotRegionSchedulerSuite) TestBalance(c *C) { - opt := schedule.NewMockSchedulerOptions() + opt := mock.NewScheduleOptions() newTestReplication(opt, 3, "zone", "host") - tc := schedule.NewMockCluster(opt) + tc := mock.NewCluster(opt) hb, err := schedule.CreateScheduler("shuffle-hot-region", schedule.NewOperatorController(nil, nil)) c.Assert(err, IsNil) @@ -363,8 +364,8 @@ var _ = Suite(&testEvictLeaderSuite{}) type testEvictLeaderSuite struct{} func (s *testEvictLeaderSuite) TestEvictLeader(c *C) { - opt := schedule.NewMockSchedulerOptions() - tc := schedule.NewMockCluster(opt) + opt := mock.NewScheduleOptions() + tc := mock.NewCluster(opt) // Add stores 1, 2, 3 tc.AddLeaderStore(1, 0) @@ -387,8 +388,8 @@ var _ = Suite(&testShuffleRegionSuite{}) type testShuffleRegionSuite struct{} func (s *testShuffleRegionSuite) TestShuffle(c *C) { - opt := schedule.NewMockSchedulerOptions() - tc := schedule.NewMockCluster(opt) + opt := mock.NewScheduleOptions() + tc := mock.NewCluster(opt) sl, err := schedule.CreateScheduler("shuffle-region", schedule.NewOperatorController(nil, nil)) c.Assert(err, IsNil) diff --git a/table/namespace_classifier_test.go b/table/namespace_classifier_test.go index 2f51643b030..4759325360d 100644 --- a/table/namespace_classifier_test.go +++ b/table/namespace_classifier_test.go @@ -19,6 +19,7 @@ import ( . 
"github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/mock" ) var _ = Suite(&testTableNamespaceSuite{}) @@ -40,7 +41,7 @@ type testTableNamespaceSuite struct { func (s *testTableNamespaceSuite) newClassifier(c *C) *tableNamespaceClassifier { kv := core.NewKV(core.NewMemoryKV()) - classifier, err := NewTableNamespaceClassifier(kv, core.NewMockIDAllocator()) + classifier, err := NewTableNamespaceClassifier(kv, mock.NewIDAllocator()) c.Assert(err, IsNil) tableClassifier := classifier.(*tableNamespaceClassifier) testNamespace1 := Namespace{ @@ -135,7 +136,7 @@ func (s *testTableNamespaceSuite) TestTableNameSpaceGetRegionNamespace(c *C) { func (s *testTableNamespaceSuite) TestNamespaceOperation(c *C) { kv := core.NewKV(core.NewMemoryKV()) - classifier, err := NewTableNamespaceClassifier(kv, core.NewMockIDAllocator()) + classifier, err := NewTableNamespaceClassifier(kv, mock.NewIDAllocator()) c.Assert(err, IsNil) tableClassifier := classifier.(*tableNamespaceClassifier) nsInfo := tableClassifier.nsInfo