Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner, session: add isolation read with engine type #12997

Merged
merged 15 commits into from
Nov 1, 2019
1 change: 1 addition & 0 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2438,6 +2438,7 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as
if err != nil {
return nil, err
}
possiblePaths = b.filterPathByIsolationRead(possiblePaths)

var columns []*table.Column
if b.inUpdateStmt {
Expand Down
21 changes: 21 additions & 0 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,27 @@ func (b *PlanBuilder) getPossibleAccessPaths(indexHints []*ast.IndexHint, tblInf
return available, nil
}

func (b *PlanBuilder) filterPathByIsolationRead(paths []*accessPath) []*accessPath {
// TODO: filter paths with isolation read locations.
isolationReadEngines := b.ctx.GetSessionVars().GetIsolationReadEngines()
if len(isolationReadEngines) == 0 {
return paths
}
for i := len(paths) - 1; i >= 0; i-- {
matchEngineType := false
for _, engine := range isolationReadEngines {
if engine == paths[i].storeType {
matchEngineType = true
break
}
}
if !matchEngineType {
paths = append(paths[:i], paths[i+1:]...)
}
}
return paths
}

func removeIgnoredPaths(paths, ignoredPaths []*accessPath, tblInfo *model.TableInfo) []*accessPath {
if len(ignoredPaths) == 0 {
return paths
Expand Down
1 change: 1 addition & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1837,6 +1837,7 @@ var builtinGlobalVariable = []string{
variable.TiDBMaxDeltaSchemaCount,
variable.TiDBCapturePlanBaseline,
variable.TiDBUsePlanBaselines,
variable.TiDBIsolationReadEngines,
}

var (
Expand Down
13 changes: 13 additions & 0 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2815,6 +2815,19 @@ func (s *testSessionSuite) TestReplicaRead(c *C) {
c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadLeader)
}

func (s *testSessionSuite) TestIsolationRead(c *C) {
var err error
tk := testkit.NewTestKit(c, s.store)
tk.Se, err = session.CreateSession4Test(s.store)
c.Assert(err, IsNil)
c.Assert(len(tk.Se.GetSessionVars().GetIsolationReadEngines()), Equals, 0)
tk.MustExec("set @@tidb_isolation_read_engines = 'tikv,tiflash';")
engines := tk.Se.GetSessionVars().GetIsolationReadEngines()
c.Assert(len(engines), Equals, 2)
c.Assert(engines[0], Equals, kv.TiKV)
c.Assert(engines[1], Equals, kv.TiFlash)
}

func (s *testSessionSuite) TestStmtHints(c *C) {
var err error
tk := testkit.NewTestKit(c, s.store)
Expand Down
21 changes: 21 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,9 @@ type SessionVars struct {
// replicaRead is used for reading data from replicas, only follower is supported at this time.
replicaRead kv.ReplicaReadType

// isolationReadEngines is used to isolation read, tidb only read from the stores whose engine type is in the engines.
isolationReadEngines []kv.StoreType

PlannerSelectBlockAsName []ast.HintTable
}

Expand Down Expand Up @@ -616,6 +619,11 @@ func (s *SessionVars) GetSplitRegionTimeout() time.Duration {
return time.Duration(s.WaitSplitRegionTimeout) * time.Second
}

// GetIsolationReadEngines gets isolation read engines.
func (s *SessionVars) GetIsolationReadEngines() []kv.StoreType {
return s.isolationReadEngines
}

// CleanBuffers cleans the temporary bufs
func (s *SessionVars) CleanBuffers() {
s.GetWriteStmtBufs().clean()
Expand Down Expand Up @@ -956,6 +964,19 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
SetMaxDeltaSchemaCount(tidbOptInt64(val, DefTiDBMaxDeltaSchemaCount))
case TiDBUsePlanBaselines:
s.UsePlanBaselines = TiDBOptOn(val)
case TiDBIsolationReadEngines:
s.isolationReadEngines = make([]kv.StoreType, 0, 2)
if len(val) == 0 {
break
}
for _, engine := range strings.Split(val, ",") {
switch engine {
case "TiFlash":
s.isolationReadEngines = append(s.isolationReadEngines, kv.TiFlash)
case "TiKV":
s.isolationReadEngines = append(s.isolationReadEngines, kv.TiKV)
}
}
}
s.systems[name] = val
return nil
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,7 @@ var defaultSysVars = []*SysVar{
{ScopeGlobal | ScopeSession, TiDBEnableStmtSummary, "0"},
{ScopeGlobal | ScopeSession, TiDBCapturePlanBaseline, "0"},
{ScopeGlobal | ScopeSession, TiDBUsePlanBaselines, BoolToIntStr(DefTiDBUsePlanBaselines)},
{ScopeGlobal | ScopeSession, TiDBIsolationReadEngines, ""},
}

// SynonymsSysVariables is synonyms of system variables.
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,10 @@ const (

// TiDBUsePlanBaselines indicates whether the use of plan baselines is enabled.
TiDBUsePlanBaselines = "tidb_use_plan_baselines"

// TiDBIsolationReadEngines indicates the tidb only read from the stores whose engine type is involved in IsolationReadEngines.
// Now, only support TiKV and TiFlash.
TiDBIsolationReadEngines = "tidb_isolation_read_engines"
)

// Default TiDB system variable values.
Expand Down
20 changes: 20 additions & 0 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,26 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
return "", nil
}
return value, ErrWrongValueForVar.GenWithStackByArgs(name, value)
case TiDBIsolationReadEngines:
if value == "" {
return "", nil
}
engines := strings.Split(value, ",")
var formatVal string
for i, engine := range engines {
if i != 0 {
formatVal += ","
}
switch {
case strings.EqualFold(engine, "TiKV"):
formatVal += "TiKV"
case strings.EqualFold(engine, "TiFlash"):
formatVal += "TiFlash"
default:
return value, ErrWrongValueForVar.GenWithStackByArgs(name, value)
}
}
return formatVal, nil
}
return value, nil
}
Expand Down