tests: testify the TSO tests (tikv#5169)
ref tikv#4813

Testify the TSO tests: migrate them from pingcap/check suites to plain testing.T functions with testify assertions.

Signed-off-by: JmPotato <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
2 people authored and CabinfeverB committed Jul 14, 2022
1 parent 5f563fa commit 134baae
Showing 9 changed files with 351 additions and 458 deletions.
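For context, the conversion applied throughout this commit follows the usual pingcap/check-to-testify pattern: suite methods taking `c *check.C` become plain `testing.T` functions, and `c.Assert(...)` calls become `require` assertions. A minimal, self-contained sketch of that pattern (the `sum` helper and `TestSum` below are hypothetical illustrations, not code from this commit):

package example_test

import (
	"testing"

	"github.com/stretchr/testify/require"
)

// sum is a hypothetical stand-in for the code under test.
func sum(xs ...int) int {
	total := 0
	for _, x := range xs {
		total += x
	}
	return total
}

// Before (pingcap/check):
//
//	func (s *testSuite) TestSum(c *check.C) {
//		c.Assert(sum(1, 2, 3), check.Equals, 6)
//	}
//
// After (testify): a plain test function with a shared require.Assertions.
func TestSum(t *testing.T) {
	re := require.New(t)
	re.Equal(6, sum(1, 2, 3)) // was: c.Assert(sum(1, 2, 3), check.Equals, 6)
}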
21 changes: 14 additions & 7 deletions pkg/testutil/testutil.go
@@ -26,8 +26,9 @@ import (
 )
 
 const (
-	waitMaxRetry   = 200
-	waitRetrySleep = time.Millisecond * 100
+	defaultWaitRetryTimes = 200
+	defaultSleepInterval  = time.Millisecond * 100
+	defaultWaitFor        = time.Second * 20
 )
 
 // CheckFunc is a condition checker that passed to WaitUntil. Its implementation
@@ -38,6 +39,7 @@ type CheckFunc func() bool
 type WaitOp struct {
 	retryTimes    int
 	sleepInterval time.Duration
+	waitFor       time.Duration
 }
 
 // WaitOption configures WaitOp
@@ -53,13 +55,18 @@ func WithSleepInterval(sleep time.Duration) WaitOption {
 	return func(op *WaitOp) { op.sleepInterval = sleep }
 }
 
+// WithWaitFor specify the max wait for duration
+func WithWaitFor(waitFor time.Duration) WaitOption {
+	return func(op *WaitOp) { op.waitFor = waitFor }
+}
+
 // WaitUntil repeatedly evaluates f() for a period of time, util it returns true.
 // NOTICE: this function will be removed soon, please use `Eventually` instead.
 func WaitUntil(c *check.C, f CheckFunc, opts ...WaitOption) {
 	c.Log("wait start")
 	option := &WaitOp{
-		retryTimes:    waitMaxRetry,
-		sleepInterval: waitRetrySleep,
+		retryTimes:    defaultWaitRetryTimes,
+		sleepInterval: defaultSleepInterval,
 	}
 	for _, opt := range opts {
 		opt(option)
@@ -76,15 +83,15 @@ func WaitUntil(c *check.C, f CheckFunc, opts ...WaitOption) {
 // Eventually asserts that given condition will be met in a period of time.
 func Eventually(re *require.Assertions, condition func() bool, opts ...WaitOption) {
 	option := &WaitOp{
-		retryTimes:    waitMaxRetry,
-		sleepInterval: waitRetrySleep,
+		waitFor:       defaultWaitFor,
+		sleepInterval: defaultSleepInterval,
 	}
 	for _, opt := range opts {
 		opt(option)
 	}
 	re.Eventually(
 		condition,
-		option.sleepInterval*time.Duration(option.retryTimes),
+		option.waitFor,
 		option.sleepInterval,
 	)
 }
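The behavioral change worth noting above: `Eventually` now bounds waiting with an explicit `waitFor` deadline (20s by default) instead of deriving it from `retryTimes * sleepInterval`, so `WithRetryTimes` no longer affects it. A usage sketch under that reading (the timing condition is a toy example, not code from this commit):

package example_test

import (
	"testing"
	"time"

	"github.com/stretchr/testify/require"
	"github.com/tikv/pd/pkg/testutil"
)

func TestEventuallyDeadline(t *testing.T) {
	re := require.New(t)
	start := time.Now()
	// Poll the condition every 100ms; fail the test if it still does not
	// hold once the 5s waitFor deadline expires.
	testutil.Eventually(re, func() bool {
		return time.Since(start) > time.Second
	}, testutil.WithWaitFor(time.Second*5), testutil.WithSleepInterval(time.Millisecond*100))
}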
4 changes: 2 additions & 2 deletions tests/client/client_test.go
@@ -235,7 +235,7 @@ func TestTSOAllocatorLeader(t *testing.T) {
 
 	err = cluster.RunInitialServers()
 	re.NoError(err)
-	cluster.WaitAllLeadersWithTestify(re, dcLocationConfig)
+	cluster.WaitAllLeaders(re, dcLocationConfig)
 
 	var (
 		testServers = cluster.GetServers()
@@ -347,7 +347,7 @@ func TestGlobalAndLocalTSO(t *testing.T) {
 	re.NoError(err)
 	dcLocationConfig["pd4"] = "dc-4"
 	cluster.CheckClusterDCLocation()
-	cluster.WaitAllLeadersWithTestify(re, dcLocationConfig)
+	cluster.WaitAllLeaders(re, dcLocationConfig)
 
 	// Test a nonexistent dc-location for Local TSO
 	p, l, err := cli.GetLocalTS(context.TODO(), "nonexistent-dc")
26 changes: 2 additions & 24 deletions tests/cluster.go
@@ -22,7 +22,6 @@ import (
 	"time"
 
 	"github.com/coreos/go-semver/semver"
-	"github.com/pingcap/check"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/kvproto/pkg/pdpb"
@@ -605,27 +604,7 @@ func (c *TestCluster) WaitAllocatorLeader(dcLocation string, ops ...WaitOption) {
 }
 
 // WaitAllLeaders will block and wait for the election of PD leader and all Local TSO Allocator leaders.
-func (c *TestCluster) WaitAllLeaders(testC *check.C, dcLocations map[string]string) {
-	c.WaitLeader()
-	c.CheckClusterDCLocation()
-	// Wait for each DC's Local TSO Allocator leader
-	wg := sync.WaitGroup{}
-	for _, dcLocation := range dcLocations {
-		wg.Add(1)
-		go func(dc string) {
-			testutil.WaitUntil(testC, func() bool {
-				leaderName := c.WaitAllocatorLeader(dc)
-				return leaderName != ""
-			})
-			wg.Done()
-		}(dcLocation)
-	}
-	wg.Wait()
-}
-
-// WaitAllLeadersWithTestify will block and wait for the election of PD leader and all Local TSO Allocator leaders.
-// NOTICE: this is a temporary function that we will be used to replace `WaitAllLeaders` later.
-func (c *TestCluster) WaitAllLeadersWithTestify(re *require.Assertions, dcLocations map[string]string) {
+func (c *TestCluster) WaitAllLeaders(re *require.Assertions, dcLocations map[string]string) {
 	c.WaitLeader()
 	c.CheckClusterDCLocation()
 	// Wait for each DC's Local TSO Allocator leader
@@ -634,8 +613,7 @@ func (c *TestCluster) WaitAllLeadersWithTestify(re *require.Assertions, dcLocations map[string]string) {
 		wg.Add(1)
 		go func(dc string) {
 			testutil.Eventually(re, func() bool {
-				leaderName := c.WaitAllocatorLeader(dc)
-				return leaderName != ""
+				return c.WaitAllocatorLeader(dc) != ""
 			})
 			wg.Done()
 		}(dcLocation)
115 changes: 50 additions & 65 deletions tests/server/tso/allocator_test.go
@@ -21,10 +21,11 @@ import (
 	"context"
 	"strconv"
 	"sync"
+	"testing"
 	"time"
 
-	. "github.com/pingcap/check"
 	"github.com/pingcap/kvproto/pkg/pdpb"
+	"github.com/stretchr/testify/require"
 	"github.com/tikv/pd/pkg/etcdutil"
 	"github.com/tikv/pd/pkg/slice"
 	"github.com/tikv/pd/pkg/testutil"
@@ -33,43 +34,27 @@ import (
 	"github.com/tikv/pd/tests"
 )
 
-var _ = Suite(&testAllocatorSuite{})
-
-type testAllocatorSuite struct {
-	ctx    context.Context
-	cancel context.CancelFunc
-}
-
-func (s *testAllocatorSuite) SetUpSuite(c *C) {
-	s.ctx, s.cancel = context.WithCancel(context.Background())
-}
-
-func (s *testAllocatorSuite) TearDownSuite(c *C) {
-	s.cancel()
-}
-
 // Make sure we have the correct number of Local TSO Allocator leaders.
-func (s *testAllocatorSuite) TestAllocatorLeader(c *C) {
+func TestAllocatorLeader(t *testing.T) {
+	re := require.New(t)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
 	// There will be three Local TSO Allocator leaders elected
 	dcLocationConfig := map[string]string{
 		"pd2": "dc-1",
 		"pd4": "dc-2",
 		"pd6": "leader", /* Test dc-location name is same as the special key */
 	}
 	dcLocationNum := len(dcLocationConfig)
-	cluster, err := tests.NewTestCluster(s.ctx, dcLocationNum*2, func(conf *config.Config, serverName string) {
+	cluster, err := tests.NewTestCluster(ctx, dcLocationNum*2, func(conf *config.Config, serverName string) {
 		if zoneLabel, ok := dcLocationConfig[serverName]; ok {
 			conf.EnableLocalTSO = true
 			conf.Labels[config.ZoneLabel] = zoneLabel
 		}
 	})
+	re.NoError(err)
 	defer cluster.Destroy()
-	c.Assert(err, IsNil)
 
-	err = cluster.RunInitialServers()
-	c.Assert(err, IsNil)
-
-	cluster.WaitAllLeaders(c, dcLocationConfig)
+	re.NoError(cluster.RunInitialServers())
+	cluster.WaitAllLeaders(re, dcLocationConfig)
 	// To check whether we have enough Local TSO Allocator leaders
 	allAllocatorLeaders := make([]tso.Allocator, 0, dcLocationNum)
 	for _, server := range cluster.GetServers() {
@@ -80,7 +65,7 @@ func (s *testAllocatorSuite) TestAllocatorLeader(c *C) {
 			tso.FilterUninitialized())
 		// One PD server will have at most three initialized Local TSO Allocators,
 		// which also means three allocator leaders
-		c.Assert(len(allocators), LessEqual, dcLocationNum)
+		re.LessOrEqual(len(allocators), dcLocationNum)
 		if len(allocators) == 0 {
 			continue
 		}
@@ -96,7 +81,7 @@ func (s *testAllocatorSuite) TestAllocatorLeader(c *C) {
 	}
 	// At the end, we should have three initialized Local TSO Allocator,
 	// i.e., the Local TSO Allocator leaders for all dc-locations in testDCLocations
-	c.Assert(allAllocatorLeaders, HasLen, dcLocationNum)
+	re.Len(allAllocatorLeaders, dcLocationNum)
 	allocatorLeaderMemberIDs := make([]uint64, 0, dcLocationNum)
 	for _, allocator := range allAllocatorLeaders {
 		allocatorLeader, _ := allocator.(*tso.LocalTSOAllocator)
@@ -106,62 +91,63 @@ func (s *testAllocatorSuite) TestAllocatorLeader(c *C) {
 		// Filter out Global TSO Allocator
 		allocators := server.GetTSOAllocatorManager().GetAllocators(tso.FilterDCLocation(tso.GlobalDCLocation))
 		if _, ok := dcLocationConfig[server.GetServer().Name()]; !ok {
-			c.Assert(allocators, HasLen, 0)
+			re.Empty(allocators)
 			continue
 		}
-		c.Assert(allocators, HasLen, dcLocationNum)
+		re.Len(allocators, dcLocationNum)
 		for _, allocator := range allocators {
 			allocatorFollower, _ := allocator.(*tso.LocalTSOAllocator)
 			allocatorFollowerMemberID := allocatorFollower.GetAllocatorLeader().GetMemberId()
-			c.Assert(
+			re.True(
 				slice.AnyOf(
 					allocatorLeaderMemberIDs,
-					func(i int) bool { return allocatorLeaderMemberIDs[i] == allocatorFollowerMemberID }),
-				IsTrue)
+					func(i int) bool { return allocatorLeaderMemberIDs[i] == allocatorFollowerMemberID },
+				),
+			)
 		}
 	}
 }
 
-func (s *testAllocatorSuite) TestPriorityAndDifferentLocalTSO(c *C) {
+func TestPriorityAndDifferentLocalTSO(t *testing.T) {
+	re := require.New(t)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
 	dcLocationConfig := map[string]string{
 		"pd1": "dc-1",
 		"pd2": "dc-2",
 		"pd3": "dc-3",
 	}
 	dcLocationNum := len(dcLocationConfig)
-	cluster, err := tests.NewTestCluster(s.ctx, dcLocationNum, func(conf *config.Config, serverName string) {
+	cluster, err := tests.NewTestCluster(ctx, dcLocationNum, func(conf *config.Config, serverName string) {
 		conf.EnableLocalTSO = true
 		conf.Labels[config.ZoneLabel] = dcLocationConfig[serverName]
 	})
 	defer cluster.Destroy()
-	c.Assert(err, IsNil)
+	re.NoError(err)
+	re.NoError(cluster.RunInitialServers())
 
-	err = cluster.RunInitialServers()
-	c.Assert(err, IsNil)
-
-	cluster.WaitAllLeaders(c, dcLocationConfig)
+	cluster.WaitAllLeaders(re, dcLocationConfig)
 
 	// Wait for all nodes becoming healthy.
 	time.Sleep(time.Second * 5)
 
 	// Join a new dc-location
-	pd4, err := cluster.Join(s.ctx, func(conf *config.Config, serverName string) {
+	pd4, err := cluster.Join(ctx, func(conf *config.Config, serverName string) {
 		conf.EnableLocalTSO = true
 		conf.Labels[config.ZoneLabel] = "dc-4"
 	})
-	c.Assert(err, IsNil)
-	err = pd4.Run()
-	c.Assert(err, IsNil)
+	re.NoError(err)
+	re.NoError(pd4.Run())
 	dcLocationConfig["pd4"] = "dc-4"
 	cluster.CheckClusterDCLocation()
-	testutil.WaitUntil(c, func() bool {
-		leaderName := cluster.WaitAllocatorLeader("dc-4")
-		return leaderName != ""
-	})
+	re.NotEqual("", cluster.WaitAllocatorLeader(
+		"dc-4",
+		tests.WithRetryTimes(90), tests.WithWaitInterval(time.Second),
+	))
 
 	// Scatter the Local TSO Allocators to different servers
 	waitAllocatorPriorityCheck(cluster)
-	cluster.WaitAllLeaders(c, dcLocationConfig)
+	cluster.WaitAllLeaders(re, dcLocationConfig)
 
 	// Before the priority is checked, we may have allocators typology like this:
 	// pd1: dc-1, dc-2 and dc-3 allocator leader
@@ -178,23 +164,22 @@ func (s *testAllocatorSuite) TestPriorityAndDifferentLocalTSO(c *C) {
 	for serverName, dcLocation := range dcLocationConfig {
 		go func(serName, dc string) {
 			defer wg.Done()
-			testutil.WaitUntil(c, func() bool {
-				leaderName := cluster.WaitAllocatorLeader(dc)
-				return leaderName == serName
-			}, testutil.WithRetryTimes(12), testutil.WithSleepInterval(5*time.Second))
+			testutil.Eventually(re, func() bool {
+				return cluster.WaitAllocatorLeader(dc) == serName
+			}, testutil.WithWaitFor(time.Second*90), testutil.WithSleepInterval(time.Second))
 		}(serverName, dcLocation)
 	}
 	wg.Wait()
 
 	for serverName, server := range cluster.GetServers() {
 		tsoAllocatorManager := server.GetTSOAllocatorManager()
 		localAllocatorLeaders, err := tsoAllocatorManager.GetHoldingLocalAllocatorLeaders()
-		c.Assert(err, IsNil)
+		re.NoError(err)
 		for _, localAllocatorLeader := range localAllocatorLeaders {
-			s.testTSOSuffix(c, cluster, tsoAllocatorManager, localAllocatorLeader.GetDCLocation())
+			testTSOSuffix(re, cluster, tsoAllocatorManager, localAllocatorLeader.GetDCLocation())
 		}
 		if serverName == cluster.GetLeader() {
-			s.testTSOSuffix(c, cluster, tsoAllocatorManager, tso.GlobalDCLocation)
+			testTSOSuffix(re, cluster, tsoAllocatorManager, tso.GlobalDCLocation)
 		}
 	}
 }
@@ -211,29 +196,29 @@ func waitAllocatorPriorityCheck(cluster *tests.TestCluster) {
 	wg.Wait()
 }
 
-func (s *testAllocatorSuite) testTSOSuffix(c *C, cluster *tests.TestCluster, am *tso.AllocatorManager, dcLocation string) {
+func testTSOSuffix(re *require.Assertions, cluster *tests.TestCluster, am *tso.AllocatorManager, dcLocation string) {
 	suffixBits := am.GetSuffixBits()
-	c.Assert(suffixBits, Greater, 0)
+	re.Greater(suffixBits, 0)
 	var suffix int64
 	// The suffix of a Global TSO will always be 0
 	if dcLocation != tso.GlobalDCLocation {
 		suffixResp, err := etcdutil.EtcdKVGet(
 			cluster.GetEtcdClient(),
 			am.GetLocalTSOSuffixPath(dcLocation))
-		c.Assert(err, IsNil)
-		c.Assert(suffixResp.Kvs, HasLen, 1)
+		re.NoError(err)
+		re.Len(suffixResp.Kvs, 1)
 		suffix, err = strconv.ParseInt(string(suffixResp.Kvs[0].Value), 10, 64)
-		c.Assert(err, IsNil)
-		c.Assert(suffixBits, GreaterEqual, tso.CalSuffixBits(int32(suffix)))
+		re.NoError(err)
+		re.GreaterOrEqual(suffixBits, tso.CalSuffixBits(int32(suffix)))
	}
 	allocator, err := am.GetAllocator(dcLocation)
-	c.Assert(err, IsNil)
+	re.NoError(err)
 	var tso pdpb.Timestamp
-	testutil.WaitUntil(c, func() bool {
+	testutil.Eventually(re, func() bool {
 		tso, err = allocator.GenerateTSO(1)
-		c.Assert(err, IsNil)
+		re.NoError(err)
 		return tso.GetPhysical() != 0
 	})
 	// Test whether the TSO has the right suffix
-	c.Assert(suffix, Equals, tso.Logical&((1<<suffixBits)-1))
+	re.Equal(suffix, tso.Logical&((1<<suffixBits)-1))
 }