Skip to content

Commit

Permalink
executor: support 'SET CONFIG' syntax to change configs of TiKV/PD in…
Browse files Browse the repository at this point in the history
…stances (#16480) (#16853)
  • Loading branch information
qw4990 authored Apr 27, 2020
1 parent bbe6475 commit 3107607
Show file tree
Hide file tree
Showing 5 changed files with 256 additions and 0 deletions.
9 changes: 9 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor {
return b.buildSimple(v)
case *plannercore.Set:
return b.buildSet(v)
case *plannercore.SetConfig:
return b.buildSetConfig(v)
case *plannercore.PhysicalSort:
return b.buildSort(v)
case *plannercore.PhysicalTopN:
Expand Down Expand Up @@ -690,6 +692,13 @@ func (b *executorBuilder) buildSet(v *plannercore.Set) Executor {
return e
}

func (b *executorBuilder) buildSetConfig(v *plannercore.SetConfig) Executor {
return &SetConfigExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
p: v,
}
}

func (b *executorBuilder) buildInsert(v *plannercore.Insert) Executor {
if v.SelectPlan != nil {
// Try to update the forUpdateTS for insert/replace into select statements.
Expand Down
171 changes: 171 additions & 0 deletions executor/set_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"bytes"
"context"
"fmt"
"io/ioutil"
"net"
"net/http"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/pdapi"
"github.com/pingcap/tidb/util/set"
"github.com/pingcap/tidb/util/stringutil"
)

// SetConfigExec executes 'SET CONFIG' statement.
type SetConfigExec struct {
baseExecutor
p *core.SetConfig
v string
}

// Open implements the Executor Open interface.
func (s *SetConfigExec) Open(ctx context.Context) error {
// TODO: create a new privilege for this operation instead of using the SuperPriv
checker := privilege.GetPrivilegeManager(s.ctx)
if checker != nil && !checker.RequestVerification(s.ctx.GetSessionVars().ActiveRoles, "", "", "", mysql.SuperPriv) {
return core.ErrSpecificAccessDenied.GenWithStackByArgs("SET CONFIG")
}

if s.p.Type != "" {
s.p.Type = strings.ToLower(s.p.Type)
if s.p.Type != "tikv" && s.p.Type != "tidb" && s.p.Type != "pd" {
return errors.Errorf("unknown type %v", s.p.Type)
}
if s.p.Type == "tidb" {
return errors.Errorf("TiDB doesn't support to change configs online, please use SQL variables")
}
}
if s.p.Instance != "" {
s.p.Instance = strings.ToLower(s.p.Instance)
if !isValidInstance(s.p.Instance) {
return errors.Errorf("invalid instance %v", s.p.Instance)
}
}
s.p.Name = strings.ToLower(s.p.Name)

val, isNull, err := s.p.Value.EvalString(s.ctx, chunk.Row{})
if err != nil {
return err
}
if isNull {
return errors.Errorf("can't set config to null")
}
s.v = val
return nil
}

// TestSetConfigServerInfoKey is used as the key to store 'TestSetConfigServerInfoFunc' in the context.
var TestSetConfigServerInfoKey stringutil.StringerStr = "TestSetConfigServerInfoKey"

// TestSetConfigHTTPHandlerKey is used as the key to store 'TestSetConfigDoRequestFunc' in the context.
var TestSetConfigHTTPHandlerKey stringutil.StringerStr = "TestSetConfigHTTPHandlerKey"

// Next implements the Executor Next interface.
func (s *SetConfigExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.Reset()
getServerFunc := infoschema.GetClusterServerInfo
if v := s.ctx.Value(TestSetConfigServerInfoKey); v != nil {
getServerFunc = v.(func(sessionctx.Context) ([]infoschema.ServerInfo, error))
}

serversInfo, err := getServerFunc(s.ctx)
if err != nil {
return err
}
nodeTypes := set.NewStringSet()
nodeAddrs := set.NewStringSet()
if s.p.Type != "" {
nodeTypes.Insert(s.p.Type)
}
if s.p.Instance != "" {
nodeAddrs.Insert(s.p.Instance)
}
serversInfo = filterClusterServerInfo(serversInfo, nodeTypes, nodeAddrs)

for _, serverInfo := range serversInfo {
var url string
switch serverInfo.ServerType {
case "pd":
url = fmt.Sprintf("%s://%s%s", util.InternalHTTPSchema(), serverInfo.StatusAddr, pdapi.Config)
case "tikv":
url = fmt.Sprintf("%s://%s/config", util.InternalHTTPSchema(), serverInfo.StatusAddr)
default:
continue
}
if err := s.doRequest(url); err != nil {
s.ctx.GetSessionVars().StmtCtx.AppendWarning(err)
}
}
return nil
}

func (s *SetConfigExec) doRequest(url string) (retErr error) {
body := bytes.NewBufferString(fmt.Sprintf(`{"%s":"%s"}`, s.p.Name, s.v))
req, err := http.NewRequest(http.MethodPost, url, body)
if err != nil {
return err
}
var httpHandler func(req *http.Request) (*http.Response, error)
if v := s.ctx.Value(TestSetConfigHTTPHandlerKey); v != nil {
httpHandler = v.(func(*http.Request) (*http.Response, error))
} else {
httpHandler = util.InternalHTTPClient().Do
}
resp, err := httpHandler(req)
if err != nil {
return err
}
defer func() {
if err := resp.Body.Close(); err != nil {
if retErr == nil {
retErr = err
}
}
}()
if resp.StatusCode == http.StatusOK {
return nil
} else if resp.StatusCode >= 400 && resp.StatusCode < 600 {
message, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
return errors.Errorf("bad request to %s: %s", url, message)
}
return errors.Errorf("request %s failed: %s", url, resp.Status)
}

func isValidInstance(instance string) bool {
ip, port, err := net.SplitHostPort(instance)
if err != nil {
return false
}
if port == "" {
return false
}
v := net.ParseIP(ip)
return v != nil
}
57 changes: 57 additions & 0 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,19 @@
package executor_test

import (
"bytes"
"context"
"errors"
"io/ioutil"
"net/http"
"strconv"

. "github.com/pingcap/check"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/testkit"
Expand Down Expand Up @@ -926,3 +932,54 @@ func (s *testSuite5) TestEnableNoopFunctionsVar(c *C) {
tk.MustExec(`set tidb_enable_noop_functions=0;`)
tk.MustQuery(`select @@tidb_enable_noop_functions;`).Check(testkit.Rows("0"))
}

func (s *testSuite5) TestSetClusterConfig(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")

serversInfo := []infoschema.ServerInfo{
{ServerType: "tidb", Address: "127.0.0.1:1111", StatusAddr: "127.0.0.1:1111"},
{ServerType: "tidb", Address: "127.0.0.1:2222", StatusAddr: "127.0.0.1:2222"},
{ServerType: "pd", Address: "127.0.0.1:3333", StatusAddr: "127.0.0.1:3333"},
{ServerType: "pd", Address: "127.0.0.1:4444", StatusAddr: "127.0.0.1:4444"},
{ServerType: "tikv", Address: "127.0.0.1:5555", StatusAddr: "127.0.0.1:5555"},
{ServerType: "tikv", Address: "127.0.0.1:6666", StatusAddr: "127.0.0.1:6666"},
}
var serverInfoErr error
serverInfoFunc := func(sessionctx.Context) ([]infoschema.ServerInfo, error) {
return serversInfo, serverInfoErr
}
tk.Se.SetValue(executor.TestSetConfigServerInfoKey, serverInfoFunc)

c.Assert(tk.ExecToErr("set config xxx log.level='info'"), ErrorMatches, "unknown type xxx")
c.Assert(tk.ExecToErr("set config tidb log.level='info'"), ErrorMatches, "TiDB doesn't support to change configs online, please use SQL variables")
c.Assert(tk.ExecToErr("set config '127.a.b.c:1234' log.level='info'"), ErrorMatches, "invalid instance 127.a.b.c:1234")
c.Assert(tk.ExecToErr("set config tikv log.level=null"), ErrorMatches, "can't set config to null")

httpCnt := 0
tk.Se.SetValue(executor.TestSetConfigHTTPHandlerKey, func(*http.Request) (*http.Response, error) {
httpCnt++
return &http.Response{StatusCode: http.StatusOK, Body: ioutil.NopCloser(nil)}, nil
})
tk.MustExec("set config tikv log.level='info'")
c.Assert(httpCnt, Equals, 2)

httpCnt = 0
tk.MustExec("set config '127.0.0.1:5555' log.level='info'")
c.Assert(httpCnt, Equals, 1)

httpCnt = 0
tk.Se.SetValue(executor.TestSetConfigHTTPHandlerKey, func(*http.Request) (*http.Response, error) {
return nil, errors.New("something wrong")
})
tk.MustExec("set config tikv log.level='info'")
tk.MustQuery("show warnings").Check(testkit.Rows(
"Warning 1105 something wrong", "Warning 1105 something wrong"))

tk.Se.SetValue(executor.TestSetConfigHTTPHandlerKey, func(*http.Request) (*http.Response, error) {
return &http.Response{StatusCode: http.StatusBadRequest, Body: ioutil.NopCloser(bytes.NewBufferString("WRONG"))}, nil
})
tk.MustExec("set config tikv log.level='info'")
tk.MustQuery("show warnings").Check(testkit.Rows(
"Warning 1105 bad request to http://127.0.0.1:5555/config: WRONG", "Warning 1105 bad request to http://127.0.0.1:6666/config: WRONG"))
}
10 changes: 10 additions & 0 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,16 @@ type Set struct {
VarAssigns []*expression.VarAssignment
}

// SetConfig represents a plan for set config stmt.
type SetConfig struct {
baseSchemaProducer

Type string
Instance string
Name string
Value expression.Expression
}

// SQLBindOpType repreents the SQL bind type
type SQLBindOpType int

Expand Down
9 changes: 9 additions & 0 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,8 @@ func (b *PlanBuilder) Build(ctx context.Context, node ast.Node) (Plan, error) {
return b.buildDo(ctx, x)
case *ast.SetStmt:
return b.buildSet(ctx, x)
case *ast.SetConfigStmt:
return b.buildSetConfig(ctx, x)
case *ast.AnalyzeTableStmt:
return b.buildAnalyze(x)
case *ast.BinlogStmt, *ast.FlushStmt, *ast.UseStmt,
Expand All @@ -522,6 +524,13 @@ func (b *PlanBuilder) Build(ctx context.Context, node ast.Node) (Plan, error) {
return nil, ErrUnsupportedType.GenWithStack("Unsupported type %T", node)
}

func (b *PlanBuilder) buildSetConfig(ctx context.Context, v *ast.SetConfigStmt) (Plan, error) {
mockTablePlan := LogicalTableDual{}.Init(b.ctx, b.getSelectOffset())
expr, _, err := b.rewrite(ctx, v.Value, mockTablePlan, nil, true)
expr = expression.WrapWithCastAsString(b.ctx, expr)
return &SetConfig{Name: v.Name, Type: v.Type, Instance: v.Instance, Value: expr}, err
}

func (b *PlanBuilder) buildChange(v *ast.ChangeStmt) (Plan, error) {
exe := &Change{
ChangeStmt: v,
Expand Down

0 comments on commit 3107607

Please sign in to comment.