executor: support table reader executor read from tiflash. (pingcap#1…
lzmhhh123 committed Jan 17, 2020
1 parent 07b1b29 commit 7fc76d6
Showing 10 changed files with 67 additions and 48 deletions.
6 changes: 6 additions & 0 deletions distsql/request_builder.go
@@ -132,6 +132,12 @@ func (builder *RequestBuilder) SetKeepOrder(order bool) *RequestBuilder {
	return builder
}

// SetStoreType sets "StoreType" for "kv.Request".
func (builder *RequestBuilder) SetStoreType(storeType kv.StoreType) *RequestBuilder {
	builder.Request.StoreType = storeType
	return builder
}

func (builder *RequestBuilder) getIsolationLevel() kv.IsoLevel {
	switch builder.Tp {
	case kv.ReqTypeAnalyze:
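The new setter slots into the existing builder chain; the table reader's buildResp (shown below in executor/table_reader.go) now calls it alongside the other Set* methods. A minimal sketch of such a chain, using only builder methods visible in this diff; the *variable.SessionVars and *memory.Tracker parameters and the (*kv.Request, error) return of Build are assumptions based on how the chain is used here:

    // Sketch only: assemble a kv.Request that will be routed to TiFlash.
    func buildTiFlashRequest(sessVars *variable.SessionVars, tracker *memory.Tracker) (*kv.Request, error) {
        var builder distsql.RequestBuilder
        return builder.
            SetKeepOrder(false).
            SetStreaming(false).
            SetFromSessionVars(sessVars).
            SetMemTracker(tracker).
            SetStoreType(kv.TiFlash). // new in this commit: send the request to TiFlash stores
            Build()
    }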
1 change: 1 addition & 0 deletions executor/builder.go
@@ -1896,6 +1896,7 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
		corColInFilter: b.corColInDistPlan(v.TablePlans),
		corColInAccess: b.corColInAccess(v.TablePlans[0]),
		plans: v.TablePlans,
		storeType: v.StoreType,
	}
	if containsLimit(dagReq.Executors) {
		e.feedback = statistics.NewQueryFeedback(0, nil, 0, ts.Desc)
3 changes: 2 additions & 1 deletion executor/table_reader.go
@@ -67,7 +67,7 @@ type TableReaderExecutor struct {
	resultHandler *tableResultHandler
	streaming bool
	feedback *statistics.QueryFeedback

	storeType kv.StoreType
	// corColInFilter tells whether there's correlated column in filter.
	corColInFilter bool
	// corColInAccess tells whether there's correlated column in access conditions.
@@ -174,6 +174,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
		SetStreaming(e.streaming).
		SetFromSessionVars(e.ctx.GetSessionVars()).
		SetMemTracker(e.memTracker).
		SetStoreType(e.storeType).
		Build()
	if err != nil {
		return nil, err
12 changes: 12 additions & 0 deletions kv/kv.go
@@ -219,6 +219,16 @@ const (
	ReqSubTypeAnalyzeCol = 10005
)

// StoreType represents the type of a store.
type StoreType uint8

const (
	// TiKV means the type of a store is TiKV.
	TiKV StoreType = iota
	// TiFlash means the type of a store is TiFlash.
	TiFlash
)

// Request represents a kv request.
type Request struct {
	// Tp is the request type.
@@ -249,6 +259,8 @@ type Request struct {
	MemTracker *memory.Tracker
	// ReplicaRead is used for reading data from replicas, only follower is supported at this time.
	ReplicaRead ReplicaReadType
	// StoreType represents the type of store to which this request is sent.
	StoreType StoreType
}

// ResultSubset represents a result subset from a single storage unit.
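The new constants give callers a typed way to branch on the target engine. An illustrative helper — not part of this commit — showing how the type is meant to be used:

    // storeTypeName is illustrative only: it maps the new kv.StoreType
    // constants to the engine names used in store labels.
    func storeTypeName(t kv.StoreType) string {
        switch t {
        case kv.TiKV:
            return "tikv"
        case kv.TiFlash:
            return "tiflash"
        default:
            return "unknown"
        }
    }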
4 changes: 4 additions & 0 deletions planner/core/physical_plans.go
@@ -18,6 +18,7 @@ import (
	"github.com/pingcap/parser/model"
	"github.com/pingcap/tidb/expression"
	"github.com/pingcap/tidb/expression/aggregation"
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/planner/property"
	"github.com/pingcap/tidb/sessionctx/stmtctx"
	"github.com/pingcap/tidb/statistics"
@@ -58,6 +59,9 @@ type PhysicalTableReader struct {
	// TablePlans flattens the tablePlan to construct executor pb.
	TablePlans []PhysicalPlan
	tablePlan PhysicalPlan

	// StoreType indicates which type of store the table is read from.
	StoreType kv.StoreType
}

// PhysicalIndexReader is the index reader in tidb.
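This hunk only adds the field; the planner logic that decides when a table reader should target TiFlash is not part of these hunks. A purely hypothetical sketch of how planner-side construction might populate it — everything except StoreType, tablePlan and TablePlans is assumed:

    // Hypothetical only: mark a table reader as TiFlash-bound once the chosen
    // table scan is known to live on a TiFlash replica.
    reader := &PhysicalTableReader{
        tablePlan:  tableScan,                 // assumed *PhysicalTableScan built earlier
        TablePlans: []PhysicalPlan{tableScan}, // flattened form of tablePlan
        StoreType:  kv.TiFlash,
    }
    _ = reader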
19 changes: 10 additions & 9 deletions store/tikv/coprocessor.go
@@ -89,7 +89,7 @@ func (c *CopClient) supportExpr(exprType tipb.ExprType) bool {
func (c *CopClient) Send(ctx context.Context, req *kv.Request, vars *kv.Variables) kv.Response {
	ctx = context.WithValue(ctx, txnStartKey, req.StartTs)
	bo := NewBackoffer(ctx, copBuildTaskMaxBackoff).WithVars(vars)
	tasks, err := buildCopTasks(bo, c.store.regionCache, &copRanges{mid: req.KeyRanges}, req.Desc, req.Streaming)
	tasks, err := buildCopTasks(bo, c.store.regionCache, &copRanges{mid: req.KeyRanges}, req)
	if err != nil {
		return copErrorResponse{err}
	}
@@ -127,7 +127,7 @@ type copTask struct {
	respChan chan *copResponse
	storeAddr string
	cmdType tikvrpc.CmdType
	storeType StoreType
	storeType kv.StoreType
}

func (r *copTask) String() string {
@@ -248,11 +248,11 @@ func (r *copRanges) split(key []byte) (*copRanges, *copRanges) {
// rangesPerTask limits the length of the ranges slice sent in one copTask.
const rangesPerTask = 25000

func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, desc bool, streaming bool) ([]*copTask, error) {
func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, req *kv.Request) ([]*copTask, error) {
	start := time.Now()
	rangesLen := ranges.len()
	cmdType := tikvrpc.CmdCop
	if streaming {
	if req.Streaming {
		cmdType = tikvrpc.CmdCopStream
	}

@@ -268,8 +268,9 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, desc bo
				ranges: ranges.slice(i, nextI),
				// Channel buffer is 2 for handling region split.
				// In a common case, two region split tasks will not be blocked.
				respChan: make(chan *copResponse, 2),
				cmdType: cmdType,
				respChan: make(chan *copResponse, 2),
				cmdType: cmdType,
				storeType: req.StoreType,
			})
			i = nextI
		}
@@ -280,7 +281,7 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, desc bo
		return nil, errors.Trace(err)
	}

	if desc {
	if req.Desc {
		reverseTasks(tasks)
	}
	if elapsed := time.Since(start); elapsed > time.Millisecond*500 {
@@ -816,7 +817,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon
			return nil, errors.Trace(err)
		}
		// We may meet RegionError at the first packet, but not while visiting the stream.
		return buildCopTasks(bo, worker.store.regionCache, task.ranges, worker.req.Desc, worker.req.Streaming)
		return buildCopTasks(bo, worker.store.regionCache, task.ranges, worker.req)
	}
	if lockErr := resp.pbResp.GetLocked(); lockErr != nil {
		logutil.Logger(context.Background()).Debug("coprocessor encounters",
@@ -876,7 +877,7 @@ func (worker *copIteratorWorker) buildCopTasksFromRemain(bo *Backoffer, lastRang
	if worker.req.Streaming && lastRange != nil {
		remainedRanges = worker.calculateRemain(task.ranges, lastRange, worker.req.Desc)
	}
	return buildCopTasks(bo, worker.store.regionCache, remainedRanges, worker.req.Desc, worker.req.Streaming)
	return buildCopTasks(bo, worker.store.regionCache, remainedRanges, worker.req)
}

// calculateRemain splits the input ranges into two, and takes one of them according to the desc flag.
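Because buildCopTasks now receives the whole *kv.Request, the desc and streaming flags travel together with the new store type, and every copTask records where it should be sent. A hedged sketch of a call site, with bo, regionCache and ranges assumed to exist as in the surrounding code:

    // Sketch only: build coprocessor tasks for a TiFlash-bound request.
    req := &kv.Request{
        Desc:      false,
        Streaming: false,
        StoreType: kv.TiFlash,
    }
    tasks, err := buildCopTasks(bo, regionCache, ranges, req)
    if err != nil {
        // handle the error
    }
    for _, task := range tasks {
        _ = task.storeType // each task now carries the store type from the request
    }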
25 changes: 14 additions & 11 deletions store/tikv/coprocessor_test.go
@@ -39,52 +39,53 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {

	bo := NewBackoffer(context.Background(), 3000)

	tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "c"), false, false)
	req := &kv.Request{}
	tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "c"), req)
	c.Assert(err, IsNil)
	c.Assert(tasks, HasLen, 1)
	s.taskEqual(c, tasks[0], regionIDs[0], "a", "c")

	tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), false, false)
	tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), req)
	c.Assert(err, IsNil)
	c.Assert(tasks, HasLen, 1)
	s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")

	tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), false, false)
	tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), req)
	c.Assert(err, IsNil)
	c.Assert(tasks, HasLen, 1)
	s.taskEqual(c, tasks[0], regionIDs[1], "m", "n")

	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), false, false)
	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), req)
	c.Assert(err, IsNil)
	c.Assert(tasks, HasLen, 2)
	s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
	s.taskEqual(c, tasks[1], regionIDs[1], "g", "k")

	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), false, false)
	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), req)
	c.Assert(err, IsNil)
	c.Assert(tasks, HasLen, 4)
	s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
	s.taskEqual(c, tasks[1], regionIDs[1], "g", "n")
	s.taskEqual(c, tasks[2], regionIDs[2], "n", "t")
	s.taskEqual(c, tasks[3], regionIDs[3], "t", "x")

	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), false, false)
	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), req)
	c.Assert(err, IsNil)
	c.Assert(tasks, HasLen, 1)
	s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "b", "c")

	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), false, false)
	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), req)
	c.Assert(err, IsNil)
	c.Assert(tasks, HasLen, 1)
	s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "e", "f")

	tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), false, false)
	tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), req)
	c.Assert(err, IsNil)
	c.Assert(tasks, HasLen, 2)
	s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
	s.taskEqual(c, tasks[1], regionIDs[2], "o", "p")

	tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), false, false)
	tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), req)
	c.Assert(err, IsNil)
	c.Assert(tasks, HasLen, 2)
	s.taskEqual(c, tasks[0], regionIDs[1], "h", "k", "m", "n")
@@ -154,7 +155,8 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) {
	defer cache.Close()
	bo := NewBackoffer(context.Background(), 3000)

	tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "z"), false, false)
	req := &kv.Request{}
	tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "z"), req)
	c.Assert(err, IsNil)
	c.Assert(tasks, HasLen, 2)
	s.taskEqual(c, tasks[0], regionIDs[0], "a", "m")
@@ -167,7 +169,8 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) {
	cluster.Split(regionIDs[1], regionIDs[2], []byte("q"), []uint64{peerIDs[2]}, storeID)
	cache.InvalidateCachedRegion(tasks[1].region)

	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "z"), true, false)
	req.Desc = true
	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "z"), req)
	c.Assert(err, IsNil)
	c.Assert(tasks, HasLen, 3)
	s.taskEqual(c, tasks[2], regionIDs[0], "a", "m")
30 changes: 10 additions & 20 deletions store/tikv/region_cache.go
@@ -103,7 +103,7 @@ func (r *RegionStore) follower(seed uint32) int32 {
		if followerIdx >= r.workTiKVIdx {
			followerIdx++
		}
		if r.stores[followerIdx].storeType != TiKV {
		if r.stores[followerIdx].storeType != kv.TiKV {
			continue
		}
		if r.storeFails[followerIdx] == atomic.LoadUint32(&r.stores[followerIdx].fail) {
@@ -375,7 +375,7 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *Backoffer, id RegionVerID) (*RPCC
		if store.getResolveState() == needCheck {
			store.reResolve(c)
		}
		if store.storeType != TiFlash {
		if store.storeType != kv.TiFlash {
			tikvCnt++
			continue
		}
@@ -491,7 +491,7 @@ func (c *RegionCache) OnSendFail(bo *Backoffer, ctx *RPCContext, scheduleReload
	tikvRegionCacheCounterWithSendFail.Inc()
	r := c.getCachedRegionWithRLock(ctx.Region)
	if r != nil {
		if ctx.Store.storeType == TiKV {
		if ctx.Store.storeType == kv.TiKV {
			c.switchNextPeer(r, ctx.PeerIdx, err)
		} else {
			c.switchNextFlashPeer(r, ctx.PeerIdx, err)
@@ -956,7 +956,7 @@ func (r *Region) WorkStorePeer(rs *RegionStore) (store *Store, peer *metapb.Peer
}

// FollowerStorePeer returns a follower store with follower peer.
func (r *Region) FollowerStorePeer(rs *RegionStore, followerStoreSeed uint32) (store *Store, peer *metapb.Peer, idx int) {
func (r *Region) FollowerStorePeer(rs *RegionStore, followerStoreSeed uint32) (*Store, *metapb.Peer, int) {
	return r.getStorePeer(rs, rs.follower(followerStoreSeed))
}

@@ -1035,7 +1035,7 @@ func (c *RegionCache) switchNextPeer(r *Region, currentPeerIdx int, err error) {
	}

	nextIdx := (currentPeerIdx + 1) % len(rs.stores)
	for rs.stores[nextIdx].storeType == TiFlash {
	for rs.stores[nextIdx].storeType == kv.TiFlash {
		nextIdx = (nextIdx + 1) % len(rs.stores)
	}
	newRegionStore := rs.clone()
@@ -1087,16 +1087,6 @@ func (r *Region) ContainsByEnd(key []byte) bool {
		(bytes.Compare(key, r.meta.GetEndKey()) <= 0 || len(r.meta.GetEndKey()) == 0)
}

// StoreType represents the type of a store.
type StoreType uint8

const (
	// TiKV means the type of a store is TiKV.
	TiKV StoreType = iota
	// TiFlash means the type of a store is TiFlash.
	TiFlash
)

// Store contains a kv process's address.
type Store struct {
	addr string // loaded store address
@@ -1105,7 +1095,7 @@ type Store struct {
	resolveMutex sync.Mutex // protect pd from concurrent init requests
	fail uint32 // store fail count, see RegionStore.storeFails
	tokenCount atomic2.Int64 // used store token count
	storeType StoreType // type of the store
	storeType kv.StoreType // type of the store
}

type resolveState uint64
@@ -1150,11 +1140,11 @@ func (s *Store) initResolve(bo *Backoffer, c *RegionCache) (addr string, err err
	}
	addr = store.GetAddress()
	s.addr = addr
	s.storeType = TiKV
	s.storeType = kv.TiKV
	for _, label := range store.Labels {
		if label.Key == "engine" {
			if label.Value == "tiflash" {
				s.storeType = TiFlash
				s.storeType = kv.TiFlash
			}
			break
		}
@@ -1195,11 +1185,11 @@ func (s *Store) reResolve(c *RegionCache) {
		return
	}

	storeType := TiKV
	storeType := kv.TiKV
	for _, label := range store.Labels {
		if label.Key == "engine" {
			if label.Value == "tiflash" {
				storeType = TiFlash
				storeType = kv.TiFlash
			}
			break
		}
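Store types are derived from the "engine" label that PD reports for each store, as initResolve and reResolve do above. An illustrative standalone helper — not in this commit — mirroring that check (metapb is github.com/pingcap/kvproto/pkg/metapb):

    // storeTypeFromLabels is illustrative only: a store labeled engine=tiflash
    // is treated as TiFlash; every other store defaults to TiKV.
    func storeTypeFromLabels(labels []*metapb.StoreLabel) kv.StoreType {
        for _, label := range labels {
            if label.Key == "engine" {
                if label.Value == "tiflash" {
                    return kv.TiFlash
                }
                break
            }
        }
        return kv.TiKV
    }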
10 changes: 5 additions & 5 deletions store/tikv/region_request.go
@@ -71,7 +71,7 @@ func NewRegionRequestSender(regionCache *RegionCache, client Client) *RegionRequ

// SendReq sends a request to tikv server.
func (s *RegionRequestSender) SendReq(bo *Backoffer, req *tikvrpc.Request, regionID RegionVerID, timeout time.Duration) (*tikvrpc.Response, error) {
	resp, _, err := s.SendReqCtx(bo, req, regionID, timeout, TiKV)
	resp, _, err := s.SendReqCtx(bo, req, regionID, timeout, kv.TiKV)
	return resp, err
}

@@ -81,7 +81,7 @@ func (s *RegionRequestSender) SendReqCtx(
	req *tikvrpc.Request,
	regionID RegionVerID,
	timeout time.Duration,
	sType StoreType,
	sType kv.StoreType,
) (
	resp *tikvrpc.Response,
	rpcCtx *RPCContext,
@@ -117,9 +117,9 @@ func (s *RegionRequestSender) SendReqCtx(
	seed := req.ReplicaReadSeed
	for {
		switch sType {
		case TiKV:
			rpcCtx, err = s.regionCache.GetTiKVRPCContext(bo, regionID, replicaRead, seed)
		case TiFlash:
		case kv.TiKV:
			rpcCtx, err = s.regionCache.GetTiKVRPCContext(bo, regionID, replicaRead, req.ReplicaReadSeed)
		case kv.TiFlash:
			rpcCtx, err = s.regionCache.GetTiFlashRPCContext(bo, regionID)
		default:
			err = errors.Errorf("unsupported storage type: %v", sType)
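SendReqCtx now dispatches on the caller-supplied store type: TiFlash requests bypass the TiKV leader/follower peer selection and go through GetTiFlashRPCContext. A hedged sketch of a caller, mirroring the test below; sender, bo, req and region are assumed to be set up as in that test:

    // Sketch only: send the same request to a TiFlash store instead of TiKV.
    resp, rpcCtx, err := sender.SendReqCtx(bo, req, region.Region, time.Second, kv.TiFlash)
    if err != nil {
        // handle the error
    }
    _, _ = resp, rpcCtx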
5 changes: 3 additions & 2 deletions store/tikv/region_request_test.go
@@ -26,6 +26,7 @@ import (
	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/pingcap/kvproto/pkg/tikvpb"
	"github.com/pingcap/tidb/config"
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/store/mockstore/mocktikv"
	"github.com/pingcap/tidb/store/tikv/tikvrpc"
	"github.com/pingcap/tidb/util/storeutil"
@@ -185,12 +186,12 @@ func (s *testRegionRequestSuite) TestSendReqCtx(c *C) {
	region, err := s.cache.LocateRegionByID(s.bo, s.region)
	c.Assert(err, IsNil)
	c.Assert(region, NotNil)
	resp, ctx, err := s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, TiKV)
	resp, ctx, err := s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, kv.TiKV)
	c.Assert(err, IsNil)
	c.Assert(resp.RawPut, NotNil)
	c.Assert(ctx, NotNil)
	req.ReplicaRead = true
	resp, ctx, err = s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, TiKV)
	resp, ctx, err = s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, kv.TiKV)
	c.Assert(err, IsNil)
	c.Assert(resp.RawPut, NotNil)
	c.Assert(ctx, NotNil)