executor: change the evaluation order of columns in Update and Insert statements #57123

Merged
14 commits merged on Dec 18, 2024
108 changes: 93 additions & 15 deletions pkg/executor/insert.go
@@ -19,6 +19,7 @@ import (
"encoding/hex"
"fmt"
"runtime/trace"
"strings"
"time"

"github.com/pingcap/errors"
@@ -28,6 +29,7 @@ import (
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
@@ -36,8 +38,10 @@ import (
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/collate"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/pingcap/tidb/pkg/util/set"
"github.com/pingcap/tidb/pkg/util/stringutil"
"github.com/pingcap/tidb/pkg/util/tracing"
"go.uber.org/zap"
@@ -193,7 +197,16 @@ func (e *InsertValues) prefetchDataCache(ctx context.Context, txn kv.Transaction
}

// updateDupRow updates a duplicate row to a new row.
func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, txn kv.Transaction, row toBeCheckedRow, handle kv.Handle, _ []*expression.Assignment, dupKeyCheck table.DupKeyCheckMode, autoColIdx int) error {
func (e *InsertExec) updateDupRow(
ctx context.Context,
idxInBatch int,
txn kv.Transaction,
row toBeCheckedRow,
handle kv.Handle,
_ []*expression.Assignment,
dupKeyCheck table.DupKeyCheckMode,
autoColIdx int,
) error {
oldRow, err := getOldRow(ctx, e.Ctx(), txn, row.t, handle, e.GenExprs)
if err != nil {
return err
@@ -430,8 +443,15 @@ func (e *InsertExec) initEvalBuffer4Dup() {
}

// doDupRowUpdate updates the duplicate row.
func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle kv.Handle, oldRow []types.Datum, newRow []types.Datum,
extraCols []types.Datum, cols []*expression.Assignment, idxInBatch int, dupKeyMode table.DupKeyCheckMode, autoColIdx int) error {
func (e *InsertExec) doDupRowUpdate(
Review comment (Member):

This only fixes the INSERT .. ON DUPLICATE KEY UPDATE case. As I have tested, the UPDATE path has the same bug:

CREATE TABLE cache (
  cache_key varchar(512) NOT NULL,
  updated_at datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  expired_at datetime GENERATED ALWAYS AS (if(expires > 0, date_add(updated_at, interval expires second), date_add(updated_at, interval 99 year))) VIRTUAL,
  expires int(11),
  PRIMARY KEY (cache_key) /*T![clustered_index] CLUSTERED */,
  KEY idx_c_on_expired_at (expired_at)
);

INSERT INTO cache(cache_key, expires) VALUES ('2001-01-01 11:11:11', 60) ON DUPLICATE KEY UPDATE expires = expires + 1;
update cache set expires = expires + 1 where cache_key = '2001-01-01 11:11:11';

Then the following two queries return different results:

select /*+ force_index(test.cache, idx_c_on_expired_at) */ cache_key, expired_at from cache order by cache_key;
select /*+ ignore_index(test.cache, idx_c_on_expired_at) */ cache_key, expired_at from cache order by cache_key;
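
As a rough sketch (not part of this PR), the UPDATE-path reproduction above could be turned into a testkit regression test that mirrors the one added to pkg/executor/insert_test.go further down; the function name and the `use test` setup here are assumptions:

func testUpdateRefreshesVirtualColumn(t *testing.T, tk *testkit.TestKit) {
	tk.MustExec("use test")
	tk.MustExec("drop table if exists cache")
	tk.MustExec(`CREATE TABLE cache (
	  cache_key varchar(512) NOT NULL,
	  updated_at datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
	  expired_at datetime GENERATED ALWAYS AS (if(expires > 0, date_add(updated_at, interval expires second), date_add(updated_at, interval 99 year))) VIRTUAL,
	  expires int(11),
	  PRIMARY KEY (cache_key) /*T![clustered_index] CLUSTERED */,
	  KEY idx_c_on_expired_at (expired_at)
	)`)
	tk.MustExec("INSERT INTO cache(cache_key, expires) VALUES ('2001-01-01 11:11:11', 60)")
	tk.MustExec("select sleep(1)")
	// The UPDATE must refresh updated_at (ON UPDATE CURRENT_TIMESTAMP) before expired_at is
	// re-evaluated, otherwise the idx_c_on_expired_at index diverges from the row data.
	tk.MustExec("update cache set expires = expires + 1 where cache_key = '2001-01-01 11:11:11'")
	tk.MustExec("admin check table cache")
	rs1 := tk.MustQuery("select /*+ force_index(test.cache, idx_c_on_expired_at) */ cache_key, expired_at from cache order by cache_key")
	rs2 := tk.MustQuery("select /*+ ignore_index(test.cache, idx_c_on_expired_at) */ cache_key, expired_at from cache order by cache_key")
	require.True(t, rs1.Equal(rs2.Rows()))
}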

ctx context.Context,
handle kv.Handle,
oldRow, newRow, extraCols []types.Datum,
assigns []*expression.Assignment,
idxInBatch int,
dupKeyMode table.DupKeyCheckMode,
autoColIdx int,
) error {
assignFlag := make([]bool, len(e.Table.WritableCols()))
// See http://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_values
e.curInsertVals.SetDatums(newRow...)
@@ -445,23 +465,39 @@ func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle kv.Handle, oldRo
e.row4Update = append(e.row4Update, extraCols...)
e.row4Update = append(e.row4Update, newRow...)

// We need to evaluate columns in the following order:
// 1. non-generated columns
// 2. on-update-now columns if non-generated columns are changed
// 3. generated columns if non-generated columns are changed
var generated, nonGenerated []*expression.Assignment
cols := e.Table.Cols()
for _, assign := range assigns {
if cols[assign.Col.Index].IsGenerated() {
generated = append(generated, assign)
} else {
nonGenerated = append(nonGenerated, assign)
}
}

// Update old row when the key is duplicated.
e.evalBuffer4Dup.SetDatums(e.row4Update...)
sctx := e.Ctx()
evalCtx := sctx.GetExprCtx().GetEvalCtx()
sc := sctx.GetSessionVars().StmtCtx
warnCnt := int(sc.WarningCount())
for _, col := range cols {
if col.LazyErr != nil {
return col.LazyErr

assignFunc := func(assign *expression.Assignment) error {
if assign.LazyErr != nil {
return assign.LazyErr
}
val, err1 := col.Expr.Eval(evalCtx, e.evalBuffer4Dup.ToRow())
val, err1 := assign.Expr.Eval(evalCtx, e.evalBuffer4Dup.ToRow())
if err1 != nil {
return err1
}
c := col.Col.ToInfo()
c.Name = col.ColName
e.row4Update[col.Col.Index], err1 = table.CastValue(sctx, val, c, false, false)
c := assign.Col.ToInfo()
idx := assign.Col.Index
c.Name = assign.ColName
e.row4Update[idx], err1 = table.CastValue(sctx, val, c, false, false)
if err1 != nil {
return err1
}
@@ -473,8 +509,47 @@ func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle kv.Handle, oldRo
sc.AppendWarnings(newWarnings)
warnCnt += len(newWarnings)
}
e.evalBuffer4Dup.SetDatum(col.Col.Index, e.row4Update[col.Col.Index])
assignFlag[col.Col.Index] = true
e.evalBuffer4Dup.SetDatum(idx, e.row4Update[idx])
assignFlag[idx] = true
return nil
}

// Update non-generated columns and check if columns are changed
changed := false
updatedCols := set.NewIntSet()
for _, assign := range nonGenerated {
if err := assignFunc(assign); err != nil {
return errors.Trace(err)
}
idx := assign.Col.Index
cmp, err := e.row4Update[idx].Compare(sc.TypeCtx(), &oldRow[idx], collate.GetBinaryCollator())
if err != nil {
return errors.Trace(err)
}
if cmp != 0 {
changed = true
}
updatedCols.Insert(idx)
}

// Update on-update-now and generated columns if necessary
if changed {
for i, col := range e.Table.Cols() {
if mysql.HasOnUpdateNowFlag(col.GetFlag()) && !updatedCols.Exist(i) {
v, err := expression.GetTimeValue(sctx.GetExprCtx(), strings.ToUpper(ast.CurrentTimestamp), col.GetType(), col.GetDecimal(), nil)
if err != nil {
return errors.Trace(err)
}
e.row4Update[i] = v
e.evalBuffer4Dup.SetDatum(i, e.row4Update[i])
assignFlag[i] = true
}
}
for _, assign := range generated {
if err := assignFunc(assign); err != nil {
return errors.Trace(err)
}
}
}

newData := e.row4Update[:len(oldRow)]
Expand All @@ -489,9 +564,12 @@ func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle kv.Handle, oldRo
return nil
}
}
_, err := updateRecord(ctx, e.Ctx(), handle, oldRow, newData, assignFlag, e.Table, true, e.memTracker, e.fkChecks, e.fkCascades, dupKeyMode, e.ignoreErr)
if err != nil {
return err

if _, err := updateRecord(
Review comment (Member):

Is it possible to move the logic for handling generated columns into updateRecord? It seems that updateRecord also handles the ON UPDATE columns (and since we set all ON UPDATE columns in this function, that logic becomes redundant).

Another possible solution is to remove the code related to ON UPDATE columns from the updateRecord function. However, since the same logic is used in multiple places (INSERT .. ON DUPLICATE KEY UPDATE and a normal UPDATE statement), I prefer to put the generated-column handling in updateRecord to avoid repeating the code.
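
To illustrate the ordering being discussed (and which updateRecord would need to encapsulate if the logic moved there), here is a minimal self-contained sketch in plain Go, deliberately using toy types rather than TiDB's internal APIs: non-generated assignments are evaluated first, ON UPDATE CURRENT_TIMESTAMP columns are refreshed only if the row actually changed and they were not assigned explicitly, and generated columns are re-evaluated last against the refreshed row.

package main

import (
	"fmt"
	"time"
)

// column is a toy stand-in for a table column; it is not TiDB's API.
type column struct {
	name        string
	generated   bool                         // GENERATED ALWAYS AS (...)
	onUpdateNow bool                         // ON UPDATE CURRENT_TIMESTAMP
	gen         func(row map[string]any) any // expression of a generated column
}

// applyUpdate shows the evaluation order this PR establishes:
// 1) non-generated assignments, 2) on-update-now columns (only if the row changed
// and the column was not assigned explicitly), 3) generated columns.
func applyUpdate(cols []column, row map[string]any, assigns map[string]func(map[string]any) any) {
	changed := false
	assigned := map[string]bool{}
	for _, c := range cols { // phase 1: non-generated assignments
		if f, ok := assigns[c.name]; ok && !c.generated {
			v := f(row)
			if v != row[c.name] {
				changed = true
			}
			row[c.name] = v
			assigned[c.name] = true
		}
	}
	if !changed {
		return
	}
	for _, c := range cols { // phase 2: refresh ON UPDATE CURRENT_TIMESTAMP columns
		if c.onUpdateNow && !assigned[c.name] {
			row[c.name] = time.Now().Truncate(time.Second)
		}
	}
	for _, c := range cols { // phase 3: re-evaluate generated columns last
		if c.generated {
			row[c.name] = c.gen(row)
		}
	}
}

func main() {
	cols := []column{
		{name: "expires"},
		{name: "updated_at", onUpdateNow: true},
		{name: "expired_at", generated: true, gen: func(r map[string]any) any {
			return r["updated_at"].(time.Time).Add(time.Duration(r["expires"].(int)) * time.Second)
		}},
	}
	row := map[string]any{
		"expires":    60,
		"updated_at": time.Now().Add(-time.Hour).Truncate(time.Second),
	}
	row["expired_at"] = cols[2].gen(row)
	applyUpdate(cols, row, map[string]func(map[string]any) any{
		"expires": func(r map[string]any) any { return r["expires"].(int) + 1 },
	})
	fmt.Println(row["expires"], row["updated_at"], row["expired_at"]) // expired_at tracks the refreshed updated_at
}

Whether this ordering stays in doDupRowUpdate or is folded into updateRecord is exactly the trade-off raised in the comment above.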

ctx, e.Ctx(), handle,
oldRow, newData, assignFlag, e.Table,
true, e.memTracker, e.fkChecks, e.fkCascades, dupKeyMode, e.ignoreErr); err != nil {
return errors.Trace(err)
}

if autoColIdx >= 0 {
18 changes: 18 additions & 0 deletions pkg/executor/insert_test.go
@@ -239,6 +239,24 @@ func testInsertOnDuplicateKey(t *testing.T, tk *testkit.TestKit) {
"<nil>/<nil>/x/1.2",
"<nil>/<nil>/x/1.2"))

// Test issue 56829
tk.MustExec(`
CREATE TABLE cache (
cache_key varchar(512) NOT NULL,
updated_at datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
expired_at datetime GENERATED ALWAYS AS (if(expires > 0, date_add(updated_at, interval expires second), date_add(updated_at, interval 99 year))) VIRTUAL,
expires int(11),
PRIMARY KEY (cache_key) /*T![clustered_index] CLUSTERED */,
KEY idx_c_on_expired_at (expired_at)
)`)
tk.MustExec("INSERT INTO cache(cache_key, expires) VALUES ('2001-01-01 11:11:11', 60) ON DUPLICATE KEY UPDATE expires = expires + 1")
tk.MustExec("select sleep(1)")
tk.MustExec("INSERT INTO cache(cache_key, expires) VALUES ('2001-01-01 11:11:11', 60) ON DUPLICATE KEY UPDATE expires = expires + 1")
tk.MustExec("admin check table cache")
rs1 := tk.MustQuery("select /*+ force_index(test.cache, idx_c_on_expired_at) */ cache_key, expired_at from cache order by cache_key")
rs2 := tk.MustQuery("select /*+ ignore_index(test.cache, idx_c_on_expired_at) */ cache_key, expired_at from cache order by cache_key")
require.True(t, rs1.Equal(rs2.Rows()))

// reproduce insert on duplicate key update bug under new row format.
tk.MustExec(`drop table if exists t1`)
tk.MustExec(`create table t1(c1 decimal(6,4), primary key(c1))`)
11 changes: 9 additions & 2 deletions pkg/executor/write.go
@@ -54,9 +54,16 @@ var (
// 1. changed (bool) : does the update really change the row values. e.g. update set i = 1 where i = 1;
// 2. err (error) : error in the update.
func updateRecord(
ctx context.Context, sctx sessionctx.Context, h kv.Handle, oldData, newData []types.Datum, modified []bool,
ctx context.Context, sctx sessionctx.Context, h kv.Handle,
oldData, newData []types.Datum,
modified []bool,
t table.Table,
onDup bool, _ *memory.Tracker, fkChecks []*FKCheckExec, fkCascades []*FKCascadeExec, dupKeyMode table.DupKeyCheckMode, ignoreErr bool,
onDup bool,
_ *memory.Tracker,
fkChecks []*FKCheckExec,
fkCascades []*FKCascadeExec,
dupKeyMode table.DupKeyCheckMode,
ignoreErr bool,
) (bool, error) {
r, ctx := tracing.StartRegionEx(ctx, "executor.updateRecord")
defer r.End()