Skip to content

Commit

Permalink
Merge branch 'master' into truncate-partition
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao committed Dec 11, 2018
2 parents cb20ade + ef0ad26 commit 7c06dbb
Show file tree
Hide file tree
Showing 20 changed files with 212 additions and 93 deletions.
5 changes: 3 additions & 2 deletions cmd/benchdb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/util/logutil"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -58,7 +59,7 @@ func main() {
Level: *logLevel,
})
terror.MustNil(err)
err = session.RegisterStore("tikv", tikv.Driver{})
err = store.Register("tikv", tikv.Driver{})
terror.MustNil(err)
ut := newBenchDB()
works := strings.Split(*runJobs, "|")
Expand Down Expand Up @@ -94,7 +95,7 @@ type benchDB struct {

func newBenchDB() *benchDB {
// Create TiKV store and disable GC as we will trigger GC manually.
store, err := session.NewStore("tikv://" + *addr + "?disableGC=true")
store, err := store.New("tikv://" + *addr + "?disableGC=true")
terror.MustNil(err)
_, err = session.BootstrapSession(store)
terror.MustNil(err)
Expand Down
2 changes: 2 additions & 0 deletions config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,10 @@ metrics-interval = 15
[performance]
# Max CPUs to use, 0 use number of CPUs in the machine.
max-procs = 0

# Max memory size to use, 0 use the total usable memory in the machine.
max-memory = 0

# StmtCountLimit limits the max count of statement inside a transaction.
stmt-count-limit = 5000

Expand Down
4 changes: 2 additions & 2 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (e *InsertExec) batchUpdateDupRows(newRows [][]types.Datum) error {
return nil
}

// Next implements Exec Next interface.
// Next implements the Executor Next interface.
func (e *InsertExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("insert.Next", opentracing.ChildOf(span.Context()))
Expand All @@ -151,7 +151,7 @@ func (e *InsertExec) Close() error {
return nil
}

// Open implements the Executor Close interface.
// Open implements the Executor Open interface.
func (e *InsertExec) Open(ctx context.Context) error {
if e.SelectExec != nil {
return e.SelectExec.Open(ctx)
Expand Down
9 changes: 9 additions & 0 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,15 @@ func (s *testSuite) TestSetVar(c *C) {
tk.MustQuery("select @@session.tidb_query_log_max_len;").Check(testkit.Rows("20"))
_, err = tk.Exec("set global tidb_query_log_max_len = 20")
c.Assert(err, NotNil)

tk.MustExec("set tidb_batch_commit = 0")
tk.MustQuery("select @@session.tidb_batch_commit;").Check(testkit.Rows("0"))
tk.MustExec("set tidb_batch_commit = 1")
tk.MustQuery("select @@session.tidb_batch_commit;").Check(testkit.Rows("1"))
_, err = tk.Exec("set global tidb_batch_commit = 0")
c.Assert(err, NotNil)
_, err = tk.Exec("set global tidb_batch_commit = 2")
c.Assert(err, NotNil)
}

func (s *testSuite) TestSetCharset(c *C) {
Expand Down
6 changes: 1 addition & 5 deletions expression/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,7 @@ func (c *Constant) Eval(_ chunk.Row) (types.Datum, error) {
c.Value.SetNull()
return c.Value, nil
}
retType := types.NewFieldType(c.RetType.Tp)
if retType.Tp == mysql.TypeUnspecified {
retType.Tp = mysql.TypeVarString
}
val, err := dt.ConvertTo(sf.GetCtx().GetSessionVars().StmtCtx, retType)
val, err := dt.ConvertTo(sf.GetCtx().GetSessionVars().StmtCtx, c.RetType)
if err != nil {
return dt, err
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ require (
github.com/pingcap/errors v0.11.0
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20181105061835-1b5d69cd1d26
github.com/pingcap/parser v0.0.0-20181210080325-aa430765015c
github.com/pingcap/parser v0.0.0-20181211024540-4e6d047fcaae
github.com/pingcap/pd v2.1.0-rc.4+incompatible
github.com/pingcap/tidb-tools v0.0.0-20181112132202-4860a0d5de03
github.com/pingcap/tipb v0.0.0-20181012112600-11e33c750323
Expand Down
3 changes: 1 addition & 2 deletions planner/core/expression_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1242,8 +1242,7 @@ func (er *expressionRewriter) funcCallToExpression(v *ast.FuncCallExpr) {
er.ctxStack = er.ctxStack[:stackLen-len(v.Args)]
if _, ok := expression.DeferredFunctions[v.FnName.L]; er.useCache() && ok {
function, er.err = expression.NewFunctionBase(er.ctx, v.FnName.L, &v.Type, args...)
c := &expression.Constant{Value: types.NewDatum(nil), RetType: &v.Type, DeferredExpr: function}
c.GetType().Tp = function.GetType().Tp
c := &expression.Constant{Value: types.NewDatum(nil), RetType: types.CloneFieldType(function.GetType()), DeferredExpr: function}
er.ctxStack = append(er.ctxStack, c)
} else {
function, er.err = expression.NewFunction(er.ctx, v.FnName.L, &v.Type, args...)
Expand Down
4 changes: 2 additions & 2 deletions planner/core/prepare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (s *testPlanSuite) TestPrepareCacheDeferredFunction(c *C) {
tk.MustExec("prepare sel1 from 'select id, c1 from t1 where c1 < now(3)'")

sql1 := "execute sel1"
expectedPattern := `IndexReader\(Index\(t1.idx1\)\[\[-inf,[0-9]{4}-(0[1-9]|1[0-2])-(0[1-9]|[1-2][0-9]|3[0-1]) (2[0-3]|[01][0-9]):[0-5][0-9]:[0-5][0-9].000\)\]\)`
expectedPattern := `IndexReader\(Index\(t1.idx1\)\[\[-inf,[0-9]{4}-(0[1-9]|1[0-2])-(0[1-9]|[1-2][0-9]|3[0-1]) (2[0-3]|[01][0-9]):[0-5][0-9]:[0-5][0-9].[0-9][0-9][0-9]\)\]\)`

var cnt [2]float64
var planStr [2]string
Expand All @@ -176,7 +176,7 @@ func (s *testPlanSuite) TestPrepareCacheDeferredFunction(c *C) {
counter.Write(pb)
cnt[i] = pb.GetCounter().GetValue()
c.Check(cnt[i], Equals, float64(i))
time.Sleep(time.Second * 1)
time.Sleep(time.Millisecond * 10)
}
c.Assert(planStr[0] < planStr[1], IsTrue)
}
Expand Down
57 changes: 57 additions & 0 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2030,6 +2030,63 @@ func (s *testSessionSuite) TestStatementCountLimit(c *C) {
c.Assert(err, NotNil)
}

func (s *testSessionSuite) TestBatchCommit(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("set tidb_batch_commit = 1")
tk.MustExec("create table t (id int)")
saved := config.GetGlobalConfig().Performance
config.GetGlobalConfig().Performance.StmtCountLimit = 3
defer func() {
config.GetGlobalConfig().Performance = saved
}()
tk1 := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("SET SESSION autocommit = 1")
tk.MustExec("begin")
tk.MustExec("insert into t values (1)")
tk1.MustQuery("select * from t").Check(testkit.Rows())
tk.MustExec("insert into t values (2)")
tk1.MustQuery("select * from t").Check(testkit.Rows())
tk.MustExec("rollback")
tk1.MustQuery("select * from t").Check(testkit.Rows())

// The above rollback will not make the session in transaction.
tk.MustExec("insert into t values (1)")
tk1.MustQuery("select * from t").Check(testkit.Rows("1"))
tk.MustExec("delete from t")

tk.MustExec("begin")
tk.MustExec("insert into t values (5)")
tk1.MustQuery("select * from t").Check(testkit.Rows())
tk.MustExec("insert into t values (6)")
tk1.MustQuery("select * from t").Check(testkit.Rows())
tk.MustExec("insert into t values (7)")
tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7"))

// The session is still in transaction.
tk.MustExec("insert into t values (8)")
tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7"))
tk.MustExec("insert into t values (9)")
tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7"))
tk.MustExec("insert into t values (10)")
tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7"))
tk.MustExec("commit")
tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7", "8", "9", "10"))

// The above commit will not make the session in transaction.
tk.MustExec("insert into t values (11)")
tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7", "8", "9", "10", "11"))

tk.MustExec("delete from t")
tk.MustExec("SET SESSION autocommit = 0")
tk.MustExec("insert into t values (1)")
tk.MustExec("insert into t values (2)")
tk.MustExec("insert into t values (3)")
tk.MustExec("rollback")
tk1.MustExec("insert into t values (4)")
tk1.MustExec("insert into t values (5)")
tk.MustQuery("select * from t").Check(testkit.Rows("4", "5"))
}

func (s *testSessionSuite) TestCastTimeToDate(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("set time_zone = '-8:00'")
Expand Down
69 changes: 15 additions & 54 deletions session/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package session

import (
"context"
"net/url"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -90,7 +89,6 @@ var (
domap = &domainMap{
domains: map[string]*domain.Domain{},
}
stores = make(map[string]kv.Driver)
// store.UUID()-> IfBootstrapped
storeBootstrapped = make(map[string]bool)
storeBootstrappedLock sync.Mutex
Expand Down Expand Up @@ -153,8 +151,9 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement)
var rs sqlexec.RecordSet
se := sctx.(*session)
rs, err = s.Exec(ctx)
sessVars := se.GetSessionVars()
// All the history should be added here.
se.GetSessionVars().TxnCtx.StatementCount++
sessVars.TxnCtx.StatementCount++
if !s.IsReadOnly() {
if err == nil {
GetHistory(sctx).Add(0, s, se.sessionVars.StmtCtx)
Expand All @@ -167,7 +166,7 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement)
}
}
}
if !se.sessionVars.InTxn() {
if !sessVars.InTxn() {
if err != nil {
log.Info("RollbackTxn for ddl/autocommit error.")
err1 := se.RollbackTxn(ctx)
Expand All @@ -180,10 +179,18 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement)
// So we limit the statement count in a transaction here.
history := GetHistory(sctx)
if history.Count() > int(config.GetGlobalConfig().Performance.StmtCountLimit) {
err1 := se.RollbackTxn(ctx)
terror.Log(errors.Trace(err1))
return rs, errors.Errorf("statement count %d exceeds the transaction limitation, autocommit = %t",
history.Count(), sctx.GetSessionVars().IsAutocommit())
if !sessVars.BatchCommit {
err1 := se.RollbackTxn(ctx)
terror.Log(errors.Trace(err1))
return rs, errors.Errorf("statement count %d exceeds the transaction limitation, autocommit = %t",
history.Count(), sctx.GetSessionVars().IsAutocommit())
}
err = se.NewTxn(ctx)
// The transaction does not committed yet, we need to keep it in transaction.
// The last history could not be "commit"/"rollback" statement.
// It means it is impossible to start a new transaction at the end of the transaction.
// Because after the server executed "commit"/"rollback" statement, the session is out of the transaction.
se.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, true)
}
}
if se.txn.pending() {
Expand Down Expand Up @@ -238,52 +245,6 @@ func GetRows4Test(ctx context.Context, sctx sessionctx.Context, rs sqlexec.Recor
return rows, nil
}

// RegisterStore registers a kv storage with unique name and its associated Driver.
func RegisterStore(name string, driver kv.Driver) error {
name = strings.ToLower(name)

if _, ok := stores[name]; ok {
return errors.Errorf("%s is already registered", name)
}

stores[name] = driver
return nil
}

// NewStore creates a kv Storage with path.
//
// The path must be a URL format 'engine://path?params' like the one for
// session.Open() but with the dbname cut off.
// Examples:
// goleveldb://relative/path
// boltdb:///absolute/path
//
// The engine should be registered before creating storage.
func NewStore(path string) (kv.Storage, error) {
return newStoreWithRetry(path, util.DefaultMaxRetries)
}

func newStoreWithRetry(path string, maxRetries int) (kv.Storage, error) {
storeURL, err := url.Parse(path)
if err != nil {
return nil, errors.Trace(err)
}

name := strings.ToLower(storeURL.Scheme)
d, ok := stores[name]
if !ok {
return nil, errors.Errorf("invalid uri format, storage %s is not registered", name)
}

var s kv.Storage
err = util.RunWithRetry(maxRetries, util.RetryInterval, func() (bool, error) {
log.Infof("new store")
s, err = d.Open(path)
return kv.IsRetryableError(err), err
})
return s, errors.Trace(err)
}

var queryStmtTable = []string{"explain", "select", "show", "execute", "describe", "desc", "admin"}

func trimSQL(sql string) string {
Expand Down
19 changes: 0 additions & 19 deletions session/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/parser/auth"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -51,12 +50,6 @@ type testMainSuite struct {
dom *domain.Domain
}

type brokenStore struct{}

func (s *brokenStore) Open(schema string) (kv.Storage, error) {
return nil, errors.New("try again later")
}

func (s *testMainSuite) SetUpSuite(c *C) {
testleak.BeforeTest()
s.dbName = "test_main_db"
Expand Down Expand Up @@ -111,18 +104,6 @@ func (s *testMainSuite) TestTrimSQL(c *C) {
}
}

func (s *testMainSuite) TestRetryOpenStore(c *C) {
begin := time.Now()
RegisterStore("dummy", &brokenStore{})
store, err := newStoreWithRetry("dummy://dummy-store", 3)
if store != nil {
defer store.Close()
}
c.Assert(err, NotNil)
elapse := time.Since(begin)
c.Assert(uint64(elapse), GreaterEqual, uint64(3*time.Second))
}

func (s *testMainSuite) TestSysSessionPoolGoroutineLeak(c *C) {
store, dom := newStoreWithBootstrap(c, s.dbName+"goroutine_leak")
defer dom.Close()
Expand Down
5 changes: 5 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,9 @@ type SessionVars struct {
// BatchDelete indicates if we should split delete data into multiple batches.
BatchDelete bool

// BatchCommit indicates if we should split the transaction into multiple batches.
BatchCommit bool

// IDAllocator is provided by kvEncoder, if it is provided, we will use it to alloc auto id instead of using
// Table.alloc.
IDAllocator autoid.Allocator
Expand Down Expand Up @@ -633,6 +636,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.BatchInsert = TiDBOptOn(val)
case TiDBBatchDelete:
s.BatchDelete = TiDBOptOn(val)
case TiDBBatchCommit:
s.BatchCommit = TiDBOptOn(val)
case TiDBDMLBatchSize:
s.DMLBatchSize = tidbOptPositiveInt32(val, DefDMLBatchSize)
case TiDBCurrentTS, TiDBConfig:
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,7 @@ var defaultSysVars = []*SysVar{
{ScopeGlobal | ScopeSession, TiDBSkipUTF8Check, boolToIntStr(DefSkipUTF8Check)},
{ScopeSession, TiDBBatchInsert, boolToIntStr(DefBatchInsert)},
{ScopeSession, TiDBBatchDelete, boolToIntStr(DefBatchDelete)},
{ScopeSession, TiDBBatchCommit, boolToIntStr(DefBatchCommit)},
{ScopeSession, TiDBDMLBatchSize, strconv.Itoa(DefDMLBatchSize)},
{ScopeSession, TiDBCurrentTS, strconv.Itoa(DefCurretTS)},
{ScopeGlobal | ScopeSession, TiDBMaxChunkSize, strconv.Itoa(DefMaxChunkSize)},
Expand Down
5 changes: 5 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ const (
// split data into multiple batches and use a single txn for each batch. This will be helpful when deleting large data.
TiDBBatchDelete = "tidb_batch_delete"

// tidb_batch_commit is used to enable/disable auto-split the transaction.
// If set this option on, the transaction will be committed when it reaches stmt-count-limit and starts a new transaction.
TiDBBatchCommit = "tidb_batch_commit"

// tidb_dml_batch_size is used to split the insert/delete data into small batches.
// It only takes effort when tidb_batch_insert/tidb_batch_delete is on.
// Its default value is 20000. When the row size is large, 20k rows could be larger than 100MB.
Expand Down Expand Up @@ -243,6 +247,7 @@ const (
DefOptInSubqToJoinAndAgg = true
DefBatchInsert = false
DefBatchDelete = false
DefBatchCommit = false
DefCurretTS = 0
DefMaxChunkSize = 32
DefDMLBatchSize = 20000
Expand Down
2 changes: 1 addition & 1 deletion sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
case AutocommitVar, TiDBSkipUTF8Check, TiDBOptAggPushDown,
TiDBOptInSubqToJoinAndAgg,
TiDBBatchInsert, TiDBDisableTxnAutoRetry, TiDBEnableStreaming,
TiDBBatchDelete, TiDBEnableCascadesPlanner, TiDBEnableWindowFunction:
TiDBBatchDelete, TiDBBatchCommit, TiDBEnableCascadesPlanner, TiDBEnableWindowFunction:
if strings.EqualFold(value, "ON") || value == "1" || strings.EqualFold(value, "OFF") || value == "0" {
return value, nil
}
Expand Down
Loading

0 comments on commit 7c06dbb

Please sign in to comment.