Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

table: Add CachedTableSupport and TemporaryTableSupport for MutateContext #54900

Merged
merged 1 commit into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/table/context/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
"//pkg/expression/context",
"//pkg/infoschema/context",
"//pkg/kv",
"//pkg/meta/autoid",
"//pkg/parser/model",
"//pkg/sessionctx/stmtctx",
"//pkg/sessionctx/variable",
Expand Down
72 changes: 66 additions & 6 deletions pkg/table/context/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
exprctx "github.com/pingcap/tidb/pkg/expression/context"
infoschema "github.com/pingcap/tidb/pkg/infoschema/context"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
Expand Down Expand Up @@ -48,6 +49,61 @@ type StatisticsSupport interface {
UpdatePhysicalTableDelta(physicalTableID int64, delta int64, count int64, cols variable.DeltaCols)
}

// CachedTableSupport is used for cached table operations
type CachedTableSupport interface {
// AddCachedTableHandleToTxn adds a cached handle to the current transaction
// to handle cached table when committing txn.
// The handle argument should implement `table.CachedTable` interface, but here is `any` to avoid import cycle.
AddCachedTableHandleToTxn(tableID int64, handle any)
}

// TemporaryTableHandler is used by `table.Table` to handle temporary table.
type TemporaryTableHandler struct {
tblInTxn tableutil.TempTable
data variable.TemporaryTableData
}

// NewTemporaryTableHandler creates a new TemporaryTableHandler
func NewTemporaryTableHandler(tbl tableutil.TempTable, data variable.TemporaryTableData) TemporaryTableHandler {
return TemporaryTableHandler{
tblInTxn: tbl,
data: data,
}
}

// Meta returns the meta
func (h *TemporaryTableHandler) Meta() *model.TableInfo {
return h.tblInTxn.GetMeta()
}

// GetDirtySize returns the size of dirty data in txn of the temporary table
func (h *TemporaryTableHandler) GetDirtySize() int64 {
return h.tblInTxn.GetSize()
}

// GetCommittedSize returns the committed data size of the temporary table
func (h *TemporaryTableHandler) GetCommittedSize() int64 {
if h.data == nil {
return 0
}
return h.data.GetTableSize(h.tblInTxn.GetMeta().ID)
}

// UpdateTxnDeltaSize updates the size of dirty data statistics in txn of the temporary table
func (h *TemporaryTableHandler) UpdateTxnDeltaSize(delta int) {
h.tblInTxn.SetSize(h.tblInTxn.GetSize() + int64(delta))
}

// TemporaryTableSupport is used for temporary table operations
type TemporaryTableSupport interface {
// GetTemporaryTableSizeLimit returns the size limit of a temporary table.
GetTemporaryTableSizeLimit() int64
// AddTemporaryTableToTxn adds a temporary table to txn to mark it is modified
// and txn will handle it when committing.
// It returns a `TemporaryTableHandler` object which provides some extra info for the temporary table.
AddTemporaryTableToTxn(tblInfo *model.TableInfo) (TemporaryTableHandler, bool)
}

// MutateContext is used to when mutating a table.
type MutateContext interface {
AllocatorContext
Expand All @@ -62,9 +118,6 @@ type MutateContext interface {
Txn(active bool) (kv.Transaction, error)
// GetDomainInfoSchema returns the latest information schema in domain
GetDomainInfoSchema() infoschema.MetaOnlyInfoSchema
// TxnRecordTempTable record the temporary table to the current transaction.
// This method will be called when the temporary table is modified or should allocate id in the transaction.
TxnRecordTempTable(tbl *model.TableInfo) tableutil.TempTable
// ConnectionID returns the id of the current connection.
// If the current environment is not in a query from the client, the return value is 0.
ConnectionID() uint64
Expand All @@ -90,11 +143,18 @@ type MutateContext interface {
// GetStatisticsSupport returns a `StatisticsSupport` if the context supports it.
// If the context does not support statistics update, the second return value will be false.
GetStatisticsSupport() (StatisticsSupport, bool)
// GetCachedTableSupport returns a `CachedTableSupport` if the context supports it.
// If the context does not support cached table, the second return value will be false.
GetCachedTableSupport() (CachedTableSupport, bool)
// GetTemporaryTableSupport returns a `TemporaryTableSupport` if the context supports it.
// If the context does not support temporary table, the second return value will be false.
GetTemporaryTableSupport() (TemporaryTableSupport, bool)
}

// AllocatorContext is used to provide context for method `table.Allocators`.
type AllocatorContext interface {
// TxnRecordTempTable record the temporary table to the current transaction.
// This method will be called when the temporary table is modified or should allocate id in the transaction.
TxnRecordTempTable(tbl *model.TableInfo) tableutil.TempTable
// AlternativeAllocators returns an alternative `autoid.Allocators` for the table.
// If the second return value is nil, it means there are no alternative allocators in the context.
// Currently, it provides alternative allocators for temporary tables to alloc IDs in session.
AlternativeAllocators(tbl *model.TableInfo) (autoid.Allocators, bool)
}
4 changes: 3 additions & 1 deletion pkg/table/contextimpl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/expression/context",
"//pkg/meta/autoid",
"//pkg/parser/model",
"//pkg/sessionctx",
"//pkg/sessionctx/stmtctx",
"//pkg/sessionctx/variable",
"//pkg/table/context",
"//pkg/util/intest",
"//pkg/util/tableutil",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_tipb//go-binlog",
],
Expand All @@ -26,8 +26,10 @@ go_test(
flaky = True,
deps = [
":contextimpl",
"//pkg/parser/model",
"//pkg/sessionctx/binloginfo",
"//pkg/sessionctx/variable",
"//pkg/table",
"//pkg/testkit",
"//pkg/util/mock",
"@com_github_pingcap_tipb//go-binlog",
Expand Down
60 changes: 55 additions & 5 deletions pkg/table/contextimpl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ package contextimpl
import (
"github.com/pingcap/failpoint"
exprctx "github.com/pingcap/tidb/pkg/expression/context"
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table/context"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/tableutil"
"github.com/pingcap/tipb/go-binlog"
)

Expand All @@ -47,10 +47,18 @@ func NewTableContextImpl(sctx sessionctx.Context) *TableContextImpl {
}
}

// TxnRecordTempTable record the temporary table to the current transaction.
// This method will be called when the temporary table is modified in the transaction.
func (ctx *TableContextImpl) TxnRecordTempTable(tbl *model.TableInfo) tableutil.TempTable {
return ctx.vars().GetTemporaryTable(tbl)
// AlternativeAllocators implements the AllocatorContext interface
func (ctx *TableContextImpl) AlternativeAllocators(tbl *model.TableInfo) (allocators autoid.Allocators, ok bool) {
// Use an independent allocator for global temporary tables.
if tbl.TempTableType == model.TempTableGlobal {
if tempTbl := ctx.vars().GetTemporaryTable(tbl); tempTbl != nil {
if alloc := tempTbl.GetAutoIDAllocator(); alloc != nil {
return autoid.NewAllocators(false, alloc), true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why autoid.NewAllocators(false? Shouldn't it be autoid.NewAllocators(tbl.SepAutoInc()

What happen for this case, it uses a different autoid key encoding?

create global temporary table t (id int) on commit delete rows AUTO_ID_CACHE=1  

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

B.t.w, why we did not set Allocators for temp table in the past?
This function call increase object allocation by 1 op/s

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code is from https://github.com/pingcap/tidb/pull/39041/files#diff-ccb2771ef1931990c0d0ce3703def7dd53bd21d00a8f6b40b59ca0115f776a60R1550
This PR does not change any code logic.

However, I think it's fine for the current code because the id allocation for memory table is pure in memory and does not share with other sessions.

}
}
// If the session is not in a txn, for example, in "show create table", use the original allocator.
}
return
}

// GetExprCtx returns the ExprContext
Expand Down Expand Up @@ -145,6 +153,48 @@ func (ctx *TableContextImpl) UpdatePhysicalTableDelta(
}
}

// GetCachedTableSupport implements the MutateContext interface.
func (ctx *TableContextImpl) GetCachedTableSupport() (context.CachedTableSupport, bool) {
if ctx.vars().TxnCtx != nil {
return ctx, true
}
return nil, false
}

// AddCachedTableHandleToTxn implements `CachedTableSupport` interface
func (ctx *TableContextImpl) AddCachedTableHandleToTxn(tableID int64, handle any) {
txnCtx := ctx.vars().TxnCtx
if txnCtx.CachedTables == nil {
txnCtx.CachedTables = make(map[int64]any)
}
if _, ok := txnCtx.CachedTables[tableID]; !ok {
txnCtx.CachedTables[tableID] = handle
}
}

// GetTemporaryTableSupport implements the MutateContext interface.
func (ctx *TableContextImpl) GetTemporaryTableSupport() (context.TemporaryTableSupport, bool) {
if ctx.vars().TxnCtx == nil {
return nil, false
}
return ctx, true
}

// GetTemporaryTableSizeLimit implements TemporaryTableSupport interface.
func (ctx *TableContextImpl) GetTemporaryTableSizeLimit() int64 {
return ctx.vars().TMPTableSize
}

// AddTemporaryTableToTxn implements the TemporaryTableSupport interface.
func (ctx *TableContextImpl) AddTemporaryTableToTxn(tblInfo *model.TableInfo) (context.TemporaryTableHandler, bool) {
vars := ctx.vars()
if tbl := vars.GetTemporaryTable(tblInfo); tbl != nil {
tbl.SetModified(true)
return context.NewTemporaryTableHandler(tbl, vars.TemporaryTableData), true
}
return context.TemporaryTableHandler{}, false
}

func (ctx *TableContextImpl) vars() *variable.SessionVars {
return ctx.Context.GetSessionVars()
}
55 changes: 55 additions & 0 deletions pkg/table/contextimpl/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,26 @@ package contextimpl_test
import (
"testing"

"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx/binloginfo"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/contextimpl"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/util/mock"
"github.com/pingcap/tipb/go-binlog"
"github.com/stretchr/testify/require"
)

type mockTemporaryData struct {
variable.TemporaryTableData
size int64
}

func (m *mockTemporaryData) GetTableSize(tableID int64) int64 {
return tableID*1000000 + m.size
}

func TestMutateContextImplFields(t *testing.T) {
sctx := mock.NewContext()
sctx.Mutations = make(map[int64]*binlog.TableMutation)
Expand Down Expand Up @@ -114,4 +125,48 @@ func TestMutateContextImplFields(t *testing.T) {
require.Equal(t, int64(1), deltaMap.Delta)
require.Equal(t, int64(2), deltaMap.Count)
require.Equal(t, map[int64]int64{3: 4, 5: 6}, deltaMap.ColSize)
// cached table support
sctx.GetSessionVars().TxnCtx = nil
cachedTableSupport, ok := ctx.GetCachedTableSupport()
require.False(t, ok)
require.Nil(t, cachedTableSupport)
sctx.GetSessionVars().TxnCtx = txnCtx
cachedTableSupport, ok = ctx.GetCachedTableSupport()
require.True(t, ok)
type mockCachedTable struct {
table.CachedTable
}
handle := &mockCachedTable{}
require.Nil(t, sctx.GetSessionVars().TxnCtx.CachedTables[123])
cachedTableSupport.AddCachedTableHandleToTxn(123, handle)
cached := sctx.GetSessionVars().TxnCtx.CachedTables[123]
require.Same(t, handle, cached)
// temporary table support
sctx.GetSessionVars().TxnCtx = nil
tempTableSupport, ok := ctx.GetTemporaryTableSupport()
require.False(t, ok)
require.Nil(t, tempTableSupport)
sctx.GetSessionVars().TxnCtx = txnCtx
mockTempData := &mockTemporaryData{}
sctx.GetSessionVars().TemporaryTableData = mockTempData
tempTableSupport, ok = ctx.GetTemporaryTableSupport()
require.True(t, ok)
require.Nil(t, txnCtx.TemporaryTables[456])
tmpTblHandler, ok := tempTableSupport.AddTemporaryTableToTxn(&model.TableInfo{
ID: 456,
TempTableType: model.TempTableGlobal,
})
require.True(t, ok)
require.NotNil(t, tmpTblHandler)
tmpTblTable := txnCtx.TemporaryTables[456]
require.NotNil(t, tmpTblTable)
require.True(t, tmpTblTable.GetModified())
require.Equal(t, int64(456000000), tmpTblHandler.GetCommittedSize())
mockTempData.size = 111
require.Equal(t, int64(456000111), tmpTblHandler.GetCommittedSize())
require.Equal(t, int64(0), tmpTblHandler.GetDirtySize())
tmpTblHandler.UpdateTxnDeltaSize(333)
require.Equal(t, int64(333), tmpTblHandler.GetDirtySize())
tmpTblHandler.UpdateTxnDeltaSize(-1)
require.Equal(t, int64(332), tmpTblHandler.GetDirtySize())
}
8 changes: 2 additions & 6 deletions pkg/table/tables/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,12 +248,8 @@ func (c *cachedTable) AddRecord(sctx table.MutateContext, r []types.Datum, opts
}

func txnCtxAddCachedTable(sctx table.MutateContext, tid int64, handle *cachedTable) {
txnCtx := sctx.GetSessionVars().TxnCtx
if txnCtx.CachedTables == nil {
txnCtx.CachedTables = make(map[int64]any)
}
if _, ok := txnCtx.CachedTables[tid]; !ok {
txnCtx.CachedTables[tid] = handle
if s, ok := sctx.GetCachedTableSupport(); ok {
s.AddCachedTableHandleToTxn(tid, handle)
}
}

Expand Down
Loading