Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sessionctx/variable: set log bin #9310

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions ddl/column_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,13 @@ func (s *testColumnChangeSuite) TearDownSuite(c *C) {
}

func (s *testColumnChangeSuite) TestColumnChange(c *C) {
d := testNewDDL(context.Background(), nil, s.store, nil, nil, testLease)
d, err := testNewDDL(context.Background(), nil, s.store, nil, nil, testLease)
defer d.Stop()
c.Assert(err, IsNil)
// create table t (c1 int, c2 int);
tblInfo := testTableInfo(c, d, "t", 2)
ctx := testNewContext(d)
err := ctx.NewTxn(context.Background())
err = ctx.NewTxn(context.Background())
c.Assert(err, IsNil)
testCreateTable(c, ctx, d, s.dbInfo, tblInfo)
// insert t values (1, 2);
Expand Down
18 changes: 11 additions & 7 deletions ddl/column_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ type testColumnSuite struct {

func (s *testColumnSuite) SetUpSuite(c *C) {
s.store = testCreateStore(c, "test_column")
s.d = testNewDDL(context.Background(), nil, s.store, nil, nil, testLease)

var err error
s.d, err = testNewDDL(context.Background(), nil, s.store, nil, nil, testLease)
c.Assert(err, IsNil)
s.dbInfo = testSchemaInfo(c, s.d, "test_column")
testCreateSchema(c, testNewContext(s.d), s.d, s.dbInfo)
}
Expand Down Expand Up @@ -753,11 +754,12 @@ func (s *testColumnSuite) testGetColumn(t table.Table, name string, isExist bool
}

func (s *testColumnSuite) TestAddColumn(c *C) {
d := testNewDDL(context.Background(), nil, s.store, nil, nil, testLease)
d, err := testNewDDL(context.Background(), nil, s.store, nil, nil, testLease)
c.Assert(err, IsNil)
tblInfo := testTableInfo(c, d, "t", 3)
ctx := testNewContext(d)

err := ctx.NewTxn(context.Background())
err = ctx.NewTxn(context.Background())
c.Assert(err, IsNil)

testCreateTable(c, ctx, d, s.dbInfo, tblInfo)
Expand Down Expand Up @@ -842,11 +844,12 @@ func (s *testColumnSuite) TestAddColumn(c *C) {
}

func (s *testColumnSuite) TestDropColumn(c *C) {
d := testNewDDL(context.Background(), nil, s.store, nil, nil, testLease)
d, err := testNewDDL(context.Background(), nil, s.store, nil, nil, testLease)
c.Assert(err, IsNil)
tblInfo := testTableInfo(c, d, "t", 4)
ctx := testNewContext(d)

err := ctx.NewTxn(context.Background())
err = ctx.NewTxn(context.Background())
c.Assert(err, IsNil)

testCreateTable(c, ctx, d, s.dbInfo, tblInfo)
Expand Down Expand Up @@ -919,8 +922,9 @@ func (s *testColumnSuite) TestDropColumn(c *C) {
}

func (s *testColumnSuite) TestModifyColumn(c *C) {
d := testNewDDL(context.Background(), nil, s.store, nil, nil, testLease)
d, err := testNewDDL(context.Background(), nil, s.store, nil, nil, testLease)
defer d.Stop()
c.Assert(err, IsNil)
tests := []struct {
origin string
to string
Expand Down
19 changes: 14 additions & 5 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"sync"
"time"

"github.com/pingcap/tidb/sessionctx/binloginfo"

"github.com/coreos/etcd/clientv3"
"github.com/ngaut/pools"
"github.com/pingcap/errors"
Expand All @@ -38,7 +40,6 @@ import (
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/owner"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
tidbutil "github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -329,12 +330,12 @@ func asyncNotifyEvent(d *ddlCtx, e *util.Event) {

// NewDDL creates a new DDL.
func NewDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage,
infoHandle *infoschema.Handle, hook Callback, lease time.Duration, ctxPool *pools.ResourcePool) DDL {
infoHandle *infoschema.Handle, hook Callback, lease time.Duration, ctxPool *pools.ResourcePool) (DDL, error) {
return newDDL(ctx, etcdCli, store, infoHandle, hook, lease, ctxPool)
}

func newDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage,
infoHandle *infoschema.Handle, hook Callback, lease time.Duration, ctxPool *pools.ResourcePool) *ddl {
infoHandle *infoschema.Handle, hook Callback, lease time.Duration, ctxPool *pools.ResourcePool) (*ddl, error) {
if hook == nil {
hook = &BaseCallback{}
}
Expand All @@ -359,20 +360,28 @@ func newDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage,
ddlJobDoneCh: make(chan struct{}, 1),
ownerManager: manager,
schemaSyncer: syncer,
binlogCli: binloginfo.GetPumpsClient(),
}

ddlCtx.mu.hook = hook
ddlCtx.mu.interceptor = &BaseInterceptor{}
d := &ddl{
infoHandle: infoHandle,
ddlCtx: ddlCtx,
}

binlogCli, err := binloginfo.GetPumpsClientByLogBin()
if err != nil {
return nil, errors.Errorf("[ddl] get pumps client by log_bin failed %v", err)
}
if binlogCli != nil {
d.ddlCtx.binlogCli = binlogCli
}

d.start(ctx, ctxPool)
variable.RegisterStatistics(d)

metrics.DDLCounter.WithLabelValues(metrics.CreateDDLInstance).Inc()
return d
return d, nil
}

// Stop implements DDL.Stop interface.
Expand Down
2 changes: 1 addition & 1 deletion ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func testNewContext(d *ddl) sessionctx.Context {
}

func testNewDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage,
infoHandle *infoschema.Handle, hook Callback, lease time.Duration) *ddl {
infoHandle *infoschema.Handle, hook Callback, lease time.Duration) (*ddl, error) {
return newDDL(ctx, etcdCli, store, infoHandle, hook, lease, nil)
}

Expand Down
47 changes: 30 additions & 17 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ func (s *testDDLSuite) TestCheckOwner(c *C) {
store := testCreateStore(c, "test_owner")
defer store.Close()

d1 := testNewDDL(context.Background(), nil, store, nil, nil, testLease)
d1, err := testNewDDL(context.Background(), nil, store, nil, nil, testLease)
defer d1.Stop()
c.Assert(err, IsNil)
time.Sleep(testLease)
testCheckOwner(c, d1, true)

Expand All @@ -67,7 +68,8 @@ func (s *testDDLSuite) testRunWorker(c *C) {
defer store.Close()

RunWorker = false
d := testNewDDL(context.Background(), nil, store, nil, nil, testLease)
d, err := testNewDDL(context.Background(), nil, store, nil, nil, testLease)
c.Assert(err, IsNil)
testCheckOwner(c, d, false)
defer d.Stop()

Expand All @@ -76,7 +78,8 @@ func (s *testDDLSuite) testRunWorker(c *C) {
c.Assert(worker, IsNil)
// Make sure the DDL job can be done and exit that goroutine.
RunWorker = true
d1 := testNewDDL(context.Background(), nil, store, nil, nil, testLease)
d1, err := testNewDDL(context.Background(), nil, store, nil, nil, testLease)
c.Assert(err, IsNil)
testCheckOwner(c, d1, true)
defer d1.Stop()
worker = d1.generalWorker()
Expand All @@ -87,8 +90,9 @@ func (s *testDDLSuite) TestSchemaError(c *C) {
store := testCreateStore(c, "test_schema_error")
defer store.Close()

d := testNewDDL(context.Background(), nil, store, nil, nil, testLease)
d, err := testNewDDL(context.Background(), nil, store, nil, nil, testLease)
defer d.Stop()
c.Assert(err, IsNil)
ctx := testNewContext(d)

doDDLJobErr(c, 1, 0, model.ActionCreateSchema, []interface{}{1}, ctx, d)
Expand All @@ -98,8 +102,9 @@ func (s *testDDLSuite) TestTableError(c *C) {
store := testCreateStore(c, "test_table_error")
defer store.Close()

d := testNewDDL(context.Background(), nil, store, nil, nil, testLease)
d, err := testNewDDL(context.Background(), nil, store, nil, nil, testLease)
defer d.Stop()
c.Assert(err, IsNil)
ctx := testNewContext(d)

// Schema ID is wrong, so dropping table is failed.
Expand All @@ -112,7 +117,7 @@ func (s *testDDLSuite) TestTableError(c *C) {
// Table ID or schema ID is wrong, so getting table is failed.
tblInfo := testTableInfo(c, d, "t", 3)
testCreateTable(c, ctx, d, dbInfo, tblInfo)
err := kv.RunInNewTxn(store, false, func(txn kv.Transaction) error {
err = kv.RunInNewTxn(store, false, func(txn kv.Transaction) error {
job.SchemaID = -1
job.TableID = -1
t := meta.NewMeta(txn)
Expand All @@ -139,8 +144,9 @@ func (s *testDDLSuite) TestViewError(c *C) {
store := testCreateStore(c, "test_view_error")
defer store.Close()

d := testNewDDL(context.Background(), nil, store, nil, nil, testLease)
d, err := testNewDDL(context.Background(), nil, store, nil, nil, testLease)
defer d.Stop()
c.Assert(err, IsNil)
ctx := testNewContext(d)
dbInfo := testSchemaInfo(c, d, "test")
testCreateSchema(c, testNewContext(d), d, dbInfo)
Expand All @@ -162,8 +168,9 @@ func (s *testDDLSuite) TestViewError(c *C) {
func (s *testDDLSuite) TestInvalidDDLJob(c *C) {
store := testCreateStore(c, "test_invalid_ddl_job_type_error")
defer store.Close()
d := testNewDDL(context.Background(), nil, store, nil, nil, testLease)
d, err := testNewDDL(context.Background(), nil, store, nil, nil, testLease)
defer d.Stop()
c.Assert(err, IsNil)
ctx := testNewContext(d)

job := &model.Job{
Expand All @@ -173,16 +180,17 @@ func (s *testDDLSuite) TestInvalidDDLJob(c *C) {
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{},
}
err := d.doDDLJob(ctx, job)
err = d.doDDLJob(ctx, job)
c.Assert(err.Error(), Equals, "[ddl:3]invalid ddl job type: none")
}

func (s *testDDLSuite) TestForeignKeyError(c *C) {
store := testCreateStore(c, "test_foreign_key_error")
defer store.Close()

d := testNewDDL(context.Background(), nil, store, nil, nil, testLease)
d, err := testNewDDL(context.Background(), nil, store, nil, nil, testLease)
defer d.Stop()
c.Assert(err, IsNil)
ctx := testNewContext(d)

doDDLJobErr(c, -1, 1, model.ActionAddForeignKey, nil, ctx, d)
Expand All @@ -199,8 +207,9 @@ func (s *testDDLSuite) TestIndexError(c *C) {
store := testCreateStore(c, "test_index_error")
defer store.Close()

d := testNewDDL(context.Background(), nil, store, nil, nil, testLease)
d, err := testNewDDL(context.Background(), nil, store, nil, nil, testLease)
defer d.Stop()
c.Assert(err, IsNil)
ctx := testNewContext(d)

// Schema ID is wrong.
Expand Down Expand Up @@ -234,8 +243,9 @@ func (s *testDDLSuite) TestIndexError(c *C) {
func (s *testDDLSuite) TestColumnError(c *C) {
store := testCreateStore(c, "test_column_error")
defer store.Close()
d := testNewDDL(context.Background(), nil, store, nil, nil, testLease)
d, err := testNewDDL(context.Background(), nil, store, nil, nil, testLease)
defer d.Stop()
c.Assert(err, IsNil)
ctx := testNewContext(d)

dbInfo := testSchemaInfo(c, d, "test")
Expand Down Expand Up @@ -442,16 +452,17 @@ func (s *testDDLSuite) checkCancelDropColumn(c *C, d *ddl, schemaID int64, table
func (s *testDDLSuite) TestCancelJob(c *C) {
store := testCreateStore(c, "test_cancel_job")
defer store.Close()
d := testNewDDL(context.Background(), nil, store, nil, nil, testLease)
d, err := testNewDDL(context.Background(), nil, store, nil, nil, testLease)
defer d.Stop()
c.Assert(err, IsNil)
dbInfo := testSchemaInfo(c, d, "test_cancel_job")
testCreateSchema(c, testNewContext(d), d, dbInfo)
// create a partition table.
partitionTblInfo := testTableInfoWithPartition(c, d, "t_partition", 5)
// create table t (c1 int, c2 int, c3 int, c4 int, c5 int);
tblInfo := testTableInfo(c, d, "t", 5)
ctx := testNewContext(d)
err := ctx.NewTxn(context.Background())
err = ctx.NewTxn(context.Background())
c.Assert(err, IsNil)
testCreateTable(c, ctx, d, dbInfo, partitionTblInfo)
tableAutoID := int64(100)
Expand Down Expand Up @@ -830,10 +841,11 @@ func (s *testDDLSuite) TestBuildJobDependence(c *C) {
func (s *testDDLSuite) TestParallelDDL(c *C) {
store := testCreateStore(c, "test_parallel_ddl")
defer store.Close()
d := testNewDDL(context.Background(), nil, store, nil, nil, testLease)
d, err := testNewDDL(context.Background(), nil, store, nil, nil, testLease)
defer d.Stop()
c.Assert(err, IsNil)
ctx := testNewContext(d)
err := ctx.NewTxn(context.Background())
err = ctx.NewTxn(context.Background())
c.Assert(err, IsNil)

/*
Expand Down Expand Up @@ -1021,9 +1033,10 @@ func (s *testDDLSuite) TestDDLPackageExecuteSQL(c *C) {
store := testCreateStore(c, "test_run_sql")
defer store.Close()

d := testNewDDL(context.Background(), nil, store, nil, nil, testLease)
d, err := testNewDDL(context.Background(), nil, store, nil, nil, testLease)
testCheckOwner(c, d, true)
defer d.Stop()
c.Assert(err, IsNil)
worker := d.generalWorker()
c.Assert(worker, NotNil)

Expand Down
5 changes: 3 additions & 2 deletions ddl/fail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@ import (
)

func (s *testColumnChangeSuite) TestFailBeforeDecodeArgs(c *C) {
d := testNewDDL(context.Background(), nil, s.store, nil, nil, testLease)
d, err := testNewDDL(context.Background(), nil, s.store, nil, nil, testLease)
defer d.Stop()
c.Assert(err, IsNil)
// create table t_fail (c1 int, c2 int);
tblInfo := testTableInfo(c, d, "t_fail", 2)
ctx := testNewContext(d)
err := ctx.NewTxn(context.Background())
err = ctx.NewTxn(context.Background())
c.Assert(err, IsNil)
testCreateTable(c, ctx, d, s.dbInfo, tblInfo)
// insert t_fail values (1, 2);
Expand Down
5 changes: 3 additions & 2 deletions ddl/foreign_key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,16 +111,17 @@ func getForeignKey(t table.Table, name string) *model.FKInfo {
}

func (s *testForeighKeySuite) TestForeignKey(c *C) {
d := testNewDDL(context.Background(), nil, s.store, nil, nil, testLease)
d, err := testNewDDL(context.Background(), nil, s.store, nil, nil, testLease)
defer d.Stop()
c.Assert(err, IsNil)
s.d = d
s.dbInfo = testSchemaInfo(c, d, "test_foreign")
ctx := testNewContext(d)
s.ctx = ctx
testCreateSchema(c, ctx, d, s.dbInfo)
tblInfo := testTableInfo(c, d, "t", 3)

err := ctx.NewTxn(context.Background())
err = ctx.NewTxn(context.Background())
c.Assert(err, IsNil)

testCreateTable(c, ctx, d, s.dbInfo, tblInfo)
Expand Down
5 changes: 3 additions & 2 deletions ddl/index_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,15 @@ func (s *testIndexChangeSuite) TearDownSuite(c *C) {
}

func (s *testIndexChangeSuite) TestIndexChange(c *C) {
d := testNewDDL(context.Background(), nil, s.store, nil, nil, testLease)
d, err := testNewDDL(context.Background(), nil, s.store, nil, nil, testLease)
defer d.Stop()
c.Assert(err, IsNil)
// create table t (c1 int primary key, c2 int);
tblInfo := testTableInfo(c, d, "t", 2)
tblInfo.Columns[0].Flag = mysql.PriKeyFlag | mysql.NotNullFlag
tblInfo.PKIsHandle = true
ctx := testNewContext(d)
err := ctx.NewTxn(context.Background())
err = ctx.NewTxn(context.Background())
c.Assert(err, IsNil)
testCreateTable(c, ctx, d, s.dbInfo, tblInfo)
originTable := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)
Expand Down
Loading