Skip to content

Commit

Permalink
executor: change the evaluation order of columns in Update and `Ins…
Browse files Browse the repository at this point in the history
…ert` statements (#57123) (#58494)

ref #56829
  • Loading branch information
ti-chi-bot authored Feb 27, 2025
1 parent 3fbf496 commit e34e953
Show file tree
Hide file tree
Showing 5 changed files with 376 additions and 123 deletions.
97 changes: 72 additions & 25 deletions pkg/executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/tablecodec"
Expand Down Expand Up @@ -186,7 +187,15 @@ func (e *InsertValues) prefetchDataCache(ctx context.Context, txn kv.Transaction
}

// updateDupRow updates a duplicate row to a new row.
func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, txn kv.Transaction, row toBeCheckedRow, handle kv.Handle, _ []*expression.Assignment, autoColIdx int) error {
func (e *InsertExec) updateDupRow(
ctx context.Context,
idxInBatch int,
txn kv.Transaction,
row toBeCheckedRow,
handle kv.Handle,
_ []*expression.Assignment,
autoColIdx int,
) error {
oldRow, err := getOldRow(ctx, e.Ctx(), txn, row.t, handle, e.GenExprs)
if err != nil {
return err
Expand Down Expand Up @@ -385,8 +394,14 @@ func (e *InsertExec) initEvalBuffer4Dup() {
}

// doDupRowUpdate updates the duplicate row.
func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle kv.Handle, oldRow []types.Datum, newRow []types.Datum,
extraCols []types.Datum, cols []*expression.Assignment, idxInBatch int, autoColIdx int) error {
func (e *InsertExec) doDupRowUpdate(
ctx context.Context,
handle kv.Handle,
oldRow, newRow, extraCols []types.Datum,
assigns []*expression.Assignment,
idxInBatch int,
autoColIdx int,
) error {
assignFlag := make([]bool, len(e.Table.WritableCols()))
// See http://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_values
e.curInsertVals.SetDatums(newRow...)
Expand All @@ -400,40 +415,72 @@ func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle kv.Handle, oldRo
e.row4Update = append(e.row4Update, extraCols...)
e.row4Update = append(e.row4Update, newRow...)

// Update old row when the key is duplicated.
e.evalBuffer4Dup.SetDatums(e.row4Update...)
sc := e.Ctx().GetSessionVars().StmtCtx
warnCnt := int(sc.WarningCount())
for _, col := range cols {
if col.LazyErr != nil {
return col.LazyErr
}
val, err1 := col.Expr.Eval(e.evalBuffer4Dup.ToRow())
if err1 != nil {
return err1
}
c := col.Col.ToInfo()
c.Name = col.ColName
e.row4Update[col.Col.Index], err1 = table.CastValue(e.Ctx(), val, c, false, false)
if err1 != nil {
return err1
// Only evaluate non-generated columns here,
// other fields will be evaluated in updateRecord.
var generated, nonGenerated []*expression.Assignment
cols := e.Table.Cols()
for _, assign := range assigns {
if cols[assign.Col.Index].IsGenerated() {
generated = append(generated, assign)
} else {
nonGenerated = append(nonGenerated, assign)
}
}

warnCnt := int(e.Ctx().GetSessionVars().StmtCtx.WarningCount())
errorHandler := func(sctx sessionctx.Context, assign *expression.Assignment, val *types.Datum, err error) error {
c := assign.Col.ToInfo()
c.Name = assign.ColName
sc := sctx.GetSessionVars().StmtCtx

if newWarnings := sc.TruncateWarnings(warnCnt); len(newWarnings) > 0 {
for k := range newWarnings {
// Use `idxInBatch` here for simplicity, since the offset of the batch is unknown under the current context.
newWarnings[k].Err = completeInsertErr(c, &val, idxInBatch, newWarnings[k].Err)
newWarnings[k].Err = completeInsertErr(c, val, idxInBatch, newWarnings[k].Err)
}
sc.AppendWarnings(newWarnings)
warnCnt += len(newWarnings)
}
e.evalBuffer4Dup.SetDatum(col.Col.Index, e.row4Update[col.Col.Index])
assignFlag[col.Col.Index] = true
return err
}

// Update old row when the key is duplicated.
e.evalBuffer4Dup.SetDatums(e.row4Update...)
sctx := e.Ctx()
for _, assign := range nonGenerated {
var val types.Datum
if assign.LazyErr != nil {
return assign.LazyErr
}
val, err := assign.Expr.Eval(e.evalBuffer4Dup.ToRow())
if err != nil {
return err
}

c := assign.Col.ToInfo()
idx := assign.Col.Index
c.Name = assign.ColName
val, err = table.CastValue(sctx, val, c, false, false)
if err != nil {
return err
}

_ = errorHandler(sctx, assign, &val, nil)
e.evalBuffer4Dup.SetDatum(idx, val)
e.row4Update[assign.Col.Index] = val
assignFlag[assign.Col.Index] = true
}

newData := e.row4Update[:len(oldRow)]
_, err := updateRecord(ctx, e.Ctx(), handle, oldRow, newData, assignFlag, e.Table, true, e.memTracker, e.fkChecks, e.fkCascades)
_, err := updateRecord(
ctx, e.Ctx(),
handle, oldRow, newData,
0, generated, e.evalBuffer4Dup, errorHandler,
assignFlag, e.Table,
true, e.memTracker, e.fkChecks, e.fkCascades)

if err != nil {
return err
return errors.Trace(err)
}

if autoColIdx >= 0 {
Expand Down
18 changes: 18 additions & 0 deletions pkg/executor/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,24 @@ func testInsertOnDuplicateKey(t *testing.T, tk *testkit.TestKit) {
"<nil>/<nil>/x/1.2",
"<nil>/<nil>/x/1.2"))

// Test issue 56829
tk.MustExec(`
CREATE TABLE cache (
cache_key varchar(512) NOT NULL,
updated_at datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
expired_at datetime GENERATED ALWAYS AS (if(expires > 0, date_add(updated_at, interval expires second), date_add(updated_at, interval 99 year))) VIRTUAL,
expires int(11),
PRIMARY KEY (cache_key) /*T![clustered_index] CLUSTERED */,
KEY idx_c_on_expired_at (expired_at)
)`)
tk.MustExec("INSERT INTO cache(cache_key, expires) VALUES ('2001-01-01 11:11:11', 60) ON DUPLICATE KEY UPDATE expires = expires + 1")
tk.MustExec("select sleep(1)")
tk.MustExec("INSERT INTO cache(cache_key, expires) VALUES ('2001-01-01 11:11:11', 60) ON DUPLICATE KEY UPDATE expires = expires + 1")
tk.MustExec("admin check table cache")
rs1 := tk.MustQuery("select cache_key, expired_at from cache use index() order by cache_key")
rs2 := tk.MustQuery("select cache_key, expired_at from cache use index(idx_c_on_expired_at) order by cache_key")
require.True(t, rs1.Equal(rs2.Rows()))

// reproduce insert on duplicate key update bug under new row format.
tk.MustExec(`drop table if exists t1`)
tk.MustExec(`create table t1(c1 decimal(6,4), primary key(c1))`)
Expand Down
Loading

0 comments on commit e34e953

Please sign in to comment.