From c7b1330ba47379c0cad1b80423ef8713c10770e0 Mon Sep 17 00:00:00 2001 From: nolouch Date: Thu, 30 Aug 2018 12:05:20 +0800 Subject: [PATCH 1/2] server/coordinator: startup schedulers with considering the proportion of regions activated on each store --- server/cluster_info.go | 48 +++++++++++++++++++++++++++++++++++--- server/coordinator.go | 1 + server/coordinator_test.go | 19 +++++++++++---- server/core/region.go | 4 ---- 4 files changed, 61 insertions(+), 11 deletions(-) diff --git a/server/cluster_info.go b/server/cluster_info.go index 8ed864cc7a9..d6771b40a43 100644 --- a/server/cluster_info.go +++ b/server/cluster_info.go @@ -35,10 +35,10 @@ type clusterInfo struct { id core.IDAllocator kv *core.KV meta *metapb.Cluster - activeRegions int opt *scheduleOption regionStats *regionStatistics labelLevelStats *labelLevelStatistics + prepareChecker *prepareChecker } func newClusterInfo(id core.IDAllocator, opt *scheduleOption, kv *core.KV) *clusterInfo { @@ -48,6 +48,7 @@ func newClusterInfo(id core.IDAllocator, opt *scheduleOption, kv *core.KV) *clus opt: opt, kv: kv, labelLevelStats: newLabelLevelStatistics(), + prepareChecker: newPrepareChecker(), } } @@ -416,7 +417,7 @@ func (c *clusterInfo) GetFollowerStores(region *core.RegionInfo) []*core.StoreIn func (c *clusterInfo) isPrepared() bool { c.RLock() defer c.RUnlock() - return float64(c.core.Regions.Length())*collectFactor <= float64(c.activeRegions) + return c.prepareChecker.check(c) } // handleStoreHeartbeat updates the store status. @@ -514,7 +515,7 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error { c.Lock() defer c.Unlock() if isNew { - c.activeRegions++ + c.prepareChecker.collect(region) } if saveCache { @@ -705,3 +706,44 @@ func (c *clusterInfo) RegionWriteStats() []*core.RegionStat { // RegionStats is a thread-safe method return c.core.HotCache.RegionStats(schedule.WriteFlow) } + +type prepareChecker struct { + reactiveRegions map[uint64]int + start time.Time + sum int + isPrepared bool +} + +func newPrepareChecker() *prepareChecker { + return &prepareChecker{ + start: time.Now(), + reactiveRegions: make(map[uint64]int), + } +} + +func (checker *prepareChecker) check(c *clusterInfo) bool { + if checker.isPrepared || time.Since(checker.start) > collectTimeout { + return true + } + if float64(c.core.Regions.Length())*collectFactor > float64(checker.sum) { + return false + } + for _, store := range c.core.GetStores() { + if !store.IsUp() { + continue + } + storeID := store.GetId() + if float64(c.core.Regions.GetStoreRegionCount(storeID))*collectFactor > float64(checker.reactiveRegions[storeID]) { + return false + } + } + checker.isPrepared = true + return true +} + +func (checker *prepareChecker) collect(region *core.RegionInfo) { + for _, p := range region.GetPeers() { + checker.reactiveRegions[p.GetStoreId()]++ + } + checker.sum++ +} diff --git a/server/coordinator.go b/server/coordinator.go index 1cab4b4bd84..c17fc125016 100644 --- a/server/coordinator.go +++ b/server/coordinator.go @@ -34,6 +34,7 @@ import ( const ( runSchedulerCheckInterval = 3 * time.Second collectFactor = 0.8 + collectTimeout = 5 * time.Minute historyKeepTime = 5 * time.Minute maxScheduleRetries = 10 diff --git a/server/coordinator_test.go b/server/coordinator_test.go index 7757b60eaff..28530915ec8 100644 --- a/server/coordinator_test.go +++ b/server/coordinator_test.go @@ -444,12 +444,19 @@ func (s *testCoordinatorSuite) TestShouldRun(c *C) { co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier) + tc.addLeaderStore(1, 5) + tc.addLeaderStore(2, 2) + tc.addLeaderStore(3, 0) + tc.addLeaderStore(4, 0) tc.LoadRegion(1, 1, 2, 3) tc.LoadRegion(2, 1, 2, 3) tc.LoadRegion(3, 1, 2, 3) tc.LoadRegion(4, 1, 2, 3) tc.LoadRegion(5, 1, 2, 3) + tc.LoadRegion(6, 2, 1, 4) + tc.LoadRegion(7, 2, 1, 4) c.Assert(co.shouldRun(), IsFalse) + c.Assert(tc.core.Regions.GetStoreRegionCount(4), Equals, 2) tbl := []struct { regionID uint64 @@ -458,8 +465,11 @@ func (s *testCoordinatorSuite) TestShouldRun(c *C) { {1, false}, {2, false}, {3, false}, - {4, true}, - {5, true}, + {4, false}, + {5, false}, + // store4 needs collect two region + {6, false}, + {7, true}, } for _, t := range tbl { @@ -471,7 +481,7 @@ func (s *testCoordinatorSuite) TestShouldRun(c *C) { nr := &metapb.Region{Id: 6, Peers: []*metapb.Peer{}} newRegion := core.NewRegionInfo(nr, nil) tc.handleRegionHeartbeat(newRegion) - c.Assert(co.cluster.activeRegions, Equals, 6) + c.Assert(co.cluster.prepareChecker.sum, Equals, 7) } @@ -629,8 +639,9 @@ func (s *testCoordinatorSuite) TestRestart(c *C) { tc.addRegionStore(2, 2) tc.addRegionStore(3, 3) tc.addLeaderRegion(1, 1) - tc.activeRegions = 1 + //tc.activeRegions = 1 region := tc.GetRegion(1) + tc.prepareChecker.collect(region) // Add 1 replica on store 2. co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier) diff --git a/server/core/region.go b/server/core/region.go index 2aa8d8da1f3..25916e2d212 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -486,10 +486,6 @@ func (r *RegionsInfo) AddRegion(region *RegionInfo) []*metapb.Region { r.regions.Put(region) - if region.Leader == nil { - return overlaps - } - // Add to leaders and followers. for _, peer := range region.GetVoters() { storeID := peer.GetStoreId() From 87f1d2c3bc7df15d02d4192a9d2448dc087f4d8f Mon Sep 17 00:00:00 2001 From: nolouch Date: Thu, 30 Aug 2018 14:28:20 +0800 Subject: [PATCH 2/2] tiny clean --- server/coordinator_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/server/coordinator_test.go b/server/coordinator_test.go index 28530915ec8..5dd7deeb5c0 100644 --- a/server/coordinator_test.go +++ b/server/coordinator_test.go @@ -639,7 +639,6 @@ func (s *testCoordinatorSuite) TestRestart(c *C) { tc.addRegionStore(2, 2) tc.addRegionStore(3, 3) tc.addLeaderRegion(1, 1) - //tc.activeRegions = 1 region := tc.GetRegion(1) tc.prepareChecker.collect(region)