executor: support table reader executor read from tiflash. #12371

Merged: 7 commits, Sep 27, 2019
Changes from 4 commits
6 changes: 6 additions & 0 deletions distsql/request_builder.go
@@ -127,6 +127,12 @@ func (builder *RequestBuilder) SetKeepOrder(order bool) *RequestBuilder {
return builder
}

// SetStoreType sets "StoreType" for "kv.Request".
func (builder *RequestBuilder) SetStoreType(storeType kv.StoreType) *RequestBuilder {
builder.Request.StoreType = storeType
return builder
}

func (builder *RequestBuilder) getIsolationLevel() kv.IsoLevel {
switch builder.Tp {
case kv.ReqTypeAnalyze:
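For readers following the wiring, here is a minimal sketch (not part of this patch) of how the new setter slots into the existing builder chain. The wrapper function name and the particular setters chosen are illustrative assumptions; SetKeepOrder, SetStoreType, and Build come from the distsql.RequestBuilder API at this revision.

```go
package sketch

import (
	"github.com/pingcap/tidb/distsql"
	"github.com/pingcap/tidb/kv"
)

// buildTiFlashRequest is a hypothetical helper: the chain is the same as for a
// TiKV read, except that SetStoreType(kv.TiFlash) marks the resulting
// kv.Request as one to be routed to TiFlash stores.
func buildTiFlashRequest(builder *distsql.RequestBuilder, keepOrder bool) (*kv.Request, error) {
	return builder.
		SetKeepOrder(keepOrder).
		SetStoreType(kv.TiFlash).
		Build()
}
```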
1 change: 1 addition & 0 deletions executor/builder.go
@@ -1861,6 +1861,7 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
corColInFilter: b.corColInDistPlan(v.TablePlans),
corColInAccess: b.corColInAccess(v.TablePlans[0]),
plans: v.TablePlans,
storeType: v.StoreType,
}
if containsLimit(dagReq.Executors) {
e.feedback = statistics.NewQueryFeedback(0, nil, 0, ts.Desc)
2 changes: 2 additions & 0 deletions executor/table_reader.go
@@ -75,6 +75,7 @@ type TableReaderExecutor struct {
keepOrder bool
desc bool
streaming bool
storeType kv.StoreType
// corColInFilter tells whether there's correlated column in filter.
corColInFilter bool
// corColInAccess tells whether there's correlated column in access conditions.
@@ -184,6 +185,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
SetStreaming(e.streaming).
SetFromSessionVars(e.ctx.GetSessionVars()).
SetMemTracker(e.memTracker).
SetStoreType(e.storeType).
Build()
if err != nil {
return nil, err
12 changes: 12 additions & 0 deletions kv/kv.go
@@ -221,6 +221,16 @@ const (
ReqSubTypeAnalyzeCol = 10005
)

// StoreType represents the type of a store.
type StoreType uint8

const (
// TiKV means the type of a store is TiKV.
TiKV StoreType = iota
// TiFlash means the type of a store is TiFlash.
TiFlash
)

// Request represents a kv request.
type Request struct {
// Tp is the request type.
@@ -252,6 +262,8 @@ type Request struct {
Streaming bool
// ReplicaRead is used for reading data from replicas, only follower is supported at this time.
ReplicaRead ReplicaReadType
// StoreType represents the type of store this request is sent to.
StoreType StoreType
}

// ResultSubset represents a result subset from a single storage unit.
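The two constants above are the only store types at this point, with TiKV as the zero value, so an unset StoreType still means a TiKV-bound request. Below is a small hedged sketch of inspecting the new field; the helper name is invented for illustration.

```go
package sketch

import (
	"fmt"

	"github.com/pingcap/tidb/kv"
)

// storeTypeName is a hypothetical helper that switches on the new field.
// Because TiKV is the zero value of kv.StoreType, a Request built without
// SetStoreType still reports "tikv" here.
func storeTypeName(req *kv.Request) string {
	switch req.StoreType {
	case kv.TiFlash:
		return "tiflash"
	case kv.TiKV:
		return "tikv"
	default:
		return fmt.Sprintf("unknown(%d)", req.StoreType)
	}
}
```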
4 changes: 4 additions & 0 deletions planner/core/physical_plans.go
@@ -18,6 +18,7 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/planner/property"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
@@ -60,6 +61,9 @@ type PhysicalTableReader struct {
// TablePlans flats the tablePlan to construct executor pb.
TablePlans []PhysicalPlan
tablePlan PhysicalPlan

// StoreType indicates which type of store the table is read from.
StoreType kv.StoreType
}

// GetPhysicalReader returns PhysicalTableReader for logical TableGather.
15 changes: 8 additions & 7 deletions store/tikv/coprocessor.go
@@ -52,7 +52,7 @@ type CopClient struct {
func (c *CopClient) Send(ctx context.Context, req *kv.Request, vars *kv.Variables) kv.Response {
ctx = context.WithValue(ctx, txnStartKey, req.StartTs)
bo := NewBackoffer(ctx, copBuildTaskMaxBackoff).WithVars(vars)
tasks, err := buildCopTasks(bo, c.store.regionCache, &copRanges{mid: req.KeyRanges}, req.Desc, req.Streaming)
tasks, err := buildCopTasks(bo, c.store.regionCache, &copRanges{mid: req.KeyRanges}, req.Desc, req.Streaming, req.StoreType)
if err != nil {
return copErrorResponse{err}
}
@@ -90,7 +90,7 @@ type copTask struct {
respChan chan *copResponse
storeAddr string
cmdType tikvrpc.CmdType
storeType StoreType
storeType kv.StoreType
}

func (r *copTask) String() string {
@@ -211,7 +211,7 @@ func (r *copRanges) split(key []byte) (*copRanges, *copRanges) {
// rangesPerTask limits the length of the ranges slice sent in one copTask.
const rangesPerTask = 25000

func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, desc bool, streaming bool) ([]*copTask, error) {
func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, desc bool, streaming bool, storeType kv.StoreType) ([]*copTask, error) {
start := time.Now()
rangesLen := ranges.len()
cmdType := tikvrpc.CmdCop
@@ -231,8 +231,9 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, desc bo
ranges: ranges.slice(i, nextI),
// Channel buffer is 2 for handling region split.
// In a common case, two region split tasks will not be blocked.
respChan: make(chan *copResponse, 2),
cmdType: cmdType,
respChan: make(chan *copResponse, 2),
cmdType: cmdType,
storeType: storeType,
})
i = nextI
}
@@ -778,7 +779,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon
return nil, errors.Trace(err)
}
// We may meet RegionError at the first packet, but not during visiting the stream.
return buildCopTasks(bo, worker.store.regionCache, task.ranges, worker.req.Desc, worker.req.Streaming)
return buildCopTasks(bo, worker.store.regionCache, task.ranges, worker.req.Desc, worker.req.Streaming, worker.req.StoreType)
}
if lockErr := resp.pbResp.GetLocked(); lockErr != nil {
logutil.BgLogger().Debug("coprocessor encounters",
@@ -838,7 +839,7 @@ func (worker *copIteratorWorker) buildCopTasksFromRemain(bo *Backoffer, lastRang
if worker.req.Streaming && lastRange != nil {
remainedRanges = worker.calculateRemain(task.ranges, lastRange, worker.req.Desc)
}
return buildCopTasks(bo, worker.store.regionCache, remainedRanges, worker.req.Desc, worker.req.Streaming)
return buildCopTasks(bo, worker.store.regionCache, remainedRanges, worker.req.Desc, worker.req.Streaming, worker.req.StoreType)
}

// calculateRemain splits the input ranges into two, and take one of them according to desc flag.
22 changes: 11 additions & 11 deletions store/tikv/coprocessor_test.go
@@ -39,52 +39,52 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {

bo := NewBackoffer(context.Background(), 3000)

tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "c"), false, false)
tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "c"), false, false, kv.TiKV)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "c")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), false, false)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), false, false, kv.TiKV)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), false, false)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), false, false, kv.TiKV)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[1], "m", "n")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), false, false)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), false, false, kv.TiKV)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
s.taskEqual(c, tasks[1], regionIDs[1], "g", "k")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), false, false)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), false, false, kv.TiKV)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 4)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
s.taskEqual(c, tasks[1], regionIDs[1], "g", "n")
s.taskEqual(c, tasks[2], regionIDs[2], "n", "t")
s.taskEqual(c, tasks[3], regionIDs[3], "t", "x")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), false, false)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), false, false, kv.TiKV)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "b", "c")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), false, false)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), false, false, kv.TiKV)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "e", "f")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), false, false)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), false, false, kv.TiKV)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
s.taskEqual(c, tasks[1], regionIDs[2], "o", "p")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), false, false)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), false, false, kv.TiKV)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[1], "h", "k", "m", "n")
@@ -154,7 +154,7 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) {
defer cache.Close()
bo := NewBackoffer(context.Background(), 3000)

tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "z"), false, false)
tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "z"), false, false, kv.TiKV)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "m")
@@ -167,7 +167,7 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) {
cluster.Split(regionIDs[1], regionIDs[2], []byte("q"), []uint64{peerIDs[2]}, storeID)
cache.InvalidateCachedRegion(tasks[1].region)

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "z"), true, false)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "z"), true, false, kv.TiKV)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 3)
s.taskEqual(c, tasks[2], regionIDs[0], "a", "m")
40 changes: 15 additions & 25 deletions store/tikv/region_cache.go
@@ -101,7 +101,7 @@ func (r *RegionStore) follower(seed uint32) int32 {
if followerIdx >= r.workTiKVIdx {
followerIdx++
}
if r.stores[followerIdx].storeType != TiKV {
if r.stores[followerIdx].storeType != kv.TiKV {
continue
}
if r.storeFails[followerIdx] == atomic.LoadUint32(&r.stores[followerIdx].fail) {
@@ -367,7 +367,7 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *Backoffer, id RegionVerID) (*RPCC
if store.getResolveState() == needCheck {
store.reResolve(c)
}
if store.storeType != TiFlash {
if store.storeType != kv.TiFlash {
tikvCnt++
continue
}
@@ -485,7 +485,7 @@ func (c *RegionCache) OnSendFail(bo *Backoffer, ctx *RPCContext, scheduleReload
tikvRegionCacheCounterWithSendFail.Inc()
r := c.getCachedRegionWithRLock(ctx.Region)
if r != nil {
if ctx.Store.storeType == TiKV {
if ctx.Store.storeType == kv.TiKV {
c.switchNextPeer(r, ctx.PeerIdx, err)
} else {
c.switchNextFlashPeer(r, ctx.PeerIdx, err)
@@ -1030,7 +1030,7 @@ func (r *Region) WorkStorePeer(rs *RegionStore) (store *Store, peer *metapb.Peer
}

// FollowerStorePeer returns a follower store with follower peer.
func (r *Region) FollowerStorePeer(rs *RegionStore, followerStoreSeed uint32) (store *Store, peer *metapb.Peer, idx int) {
func (r *Region) FollowerStorePeer(rs *RegionStore, followerStoreSeed uint32) (*Store, *metapb.Peer, int) {
return r.getStorePeer(rs, rs.follower(followerStoreSeed))
}

@@ -1108,7 +1108,7 @@ func (c *RegionCache) switchNextPeer(r *Region, currentPeerIdx int, err error) {
}

nextIdx := (currentPeerIdx + 1) % len(rs.stores)
for rs.stores[nextIdx].storeType == TiFlash {
for rs.stores[nextIdx].storeType == kv.TiFlash {
nextIdx = (nextIdx + 1) % len(rs.stores)
}
newRegionStore := rs.clone()
@@ -1160,24 +1160,14 @@ func (r *Region) ContainsByEnd(key []byte) bool {
(bytes.Compare(key, r.meta.GetEndKey()) <= 0 || len(r.meta.GetEndKey()) == 0)
}

// StoreType represents the type of a store.
type StoreType uint8

const (
// TiKV means the type of a store is TiKV.
TiKV StoreType = iota
// TiFlash means the type of a store is TiFlash.
TiFlash
)

// Store contains a kv process's address.
type Store struct {
addr string // loaded store address
storeID uint64 // store's id
state uint64 // unsafe store storeState
resolveMutex sync.Mutex // protect pd from concurrent init requests
fail uint32 // store fail count, see RegionStore.storeFails
storeType StoreType // type of the store
addr string // loaded store address
storeID uint64 // store's id
state uint64 // unsafe store storeState
resolveMutex sync.Mutex // protect pd from concurrent init requests
fail uint32 // store fail count, see RegionStore.storeFails
storeType kv.StoreType // type of the store
}

type resolveState uint64
@@ -1222,11 +1212,11 @@ func (s *Store) initResolve(bo *Backoffer, c *RegionCache) (addr string, err err
}
addr = store.GetAddress()
s.addr = addr
s.storeType = TiKV
s.storeType = kv.TiKV
for _, label := range store.Labels {
if label.Key == "engine" {
if label.Value == "tiflash" {
s.storeType = TiFlash
s.storeType = kv.TiFlash
}
break
}
@@ -1267,11 +1257,11 @@ func (s *Store) reResolve(c *RegionCache) {
return
}

storeType := TiKV
storeType := kv.TiKV
for _, label := range store.Labels {
if label.Key == "engine" {
if label.Value == "tiflash" {
storeType = TiFlash
storeType = kv.TiFlash
}
break
}
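The store type is decided from PD store labels in both initResolve and reResolve above. The sketch below restates that check as a standalone helper, purely to make the convention explicit: a store counts as TiFlash only when it carries the label engine=tiflash, and everything else falls back to TiKV. The helper itself is not part of the patch.

```go
package sketch

import (
	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/pingcap/tidb/kv"
)

// storeTypeFromLabels mirrors the label check in initResolve/reResolve: the
// first "engine" label decides the type, and any value other than "tiflash"
// (or no "engine" label at all) means the store is treated as TiKV.
func storeTypeFromLabels(labels []*metapb.StoreLabel) kv.StoreType {
	for _, label := range labels {
		if label.Key == "engine" {
			if label.Value == "tiflash" {
				return kv.TiFlash
			}
			break
		}
	}
	return kv.TiKV
}
```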
8 changes: 4 additions & 4 deletions store/tikv/region_request.go
@@ -70,7 +70,7 @@ func NewRegionRequestSender(regionCache *RegionCache, client Client) *RegionRequ

// SendReq sends a request to tikv server.
func (s *RegionRequestSender) SendReq(bo *Backoffer, req *tikvrpc.Request, regionID RegionVerID, timeout time.Duration) (*tikvrpc.Response, error) {
resp, _, err := s.SendReqCtx(bo, req, regionID, timeout, TiKV)
resp, _, err := s.SendReqCtx(bo, req, regionID, timeout, kv.TiKV)
return resp, err
}

@@ -80,7 +80,7 @@ func (s *RegionRequestSender) SendReqCtx(
req *tikvrpc.Request,
regionID RegionVerID,
timeout time.Duration,
sType StoreType,
sType kv.StoreType,
) (
resp *tikvrpc.Response,
rpcCtx *RPCContext,
@@ -113,9 +113,9 @@
}
for {
switch sType {
case TiKV:
case kv.TiKV:
rpcCtx, err = s.regionCache.GetTiKVRPCContext(bo, regionID, replicaRead, req.ReplicaReadSeed)
case TiFlash:
case kv.TiFlash:
rpcCtx, err = s.regionCache.GetTiFlashRPCContext(bo, regionID)
default:
err = errors.Errorf("unsupported storage type: %v", sType)
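For completeness, a hedged sketch of a caller that uses the widened SendReqCtx signature to target TiFlash; the wrapper function is hypothetical, and the sender, backoffer, request, and region ID are assumed to be prepared the same way as on the existing TiKV path.

```go
package sketch

import (
	"time"

	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/store/tikv"
	"github.com/pingcap/tidb/store/tikv/tikvrpc"
)

// sendToTiFlash forwards one request to a TiFlash peer of the given region by
// passing kv.TiFlash, so SendReqCtx resolves the RPC context through
// GetTiFlashRPCContext instead of the TiKV leader/follower path.
func sendToTiFlash(s *tikv.RegionRequestSender, bo *tikv.Backoffer, req *tikvrpc.Request,
	regionID tikv.RegionVerID) (*tikvrpc.Response, error) {
	resp, _, err := s.SendReqCtx(bo, req, regionID, time.Second, kv.TiFlash)
	return resp, err
}
```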
5 changes: 3 additions & 2 deletions store/tikv/region_request_test.go
@@ -26,6 +26,7 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/tikvpb"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"google.golang.org/grpc"
@@ -131,12 +132,12 @@ func (s *testRegionRequestSuite) TestSendReqCtx(c *C) {
region, err := s.cache.LocateRegionByID(s.bo, s.region)
c.Assert(err, IsNil)
c.Assert(region, NotNil)
resp, ctx, err := s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, TiKV)
resp, ctx, err := s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, kv.TiKV)
c.Assert(err, IsNil)
c.Assert(resp.Resp, NotNil)
c.Assert(ctx, NotNil)
req.ReplicaRead = true
resp, ctx, err = s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, TiKV)
resp, ctx, err = s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, kv.TiKV)
c.Assert(err, IsNil)
c.Assert(resp.Resp, NotNil)
c.Assert(ctx, NotNil)