Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: add some memory tables to observe memory usage #38452

Merged
merged 23 commits into from
Oct 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -1940,6 +1940,7 @@ func (a *AggSpillDiskAction) Action(t *memory.Tracker) {
zap.Int64("consumed", t.BytesConsumed()),
zap.Int64("quota", t.GetBytesLimit()))
atomic.StoreUint32(&a.e.inSpillMode, 1)
memory.QueryForceDisk.Add(1)
return
}
if fallback := a.GetFallback(); fallback != nil {
Expand Down
6 changes: 5 additions & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1870,7 +1870,11 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
strings.ToLower(infoschema.TableTrxSummary),
strings.ToLower(infoschema.TableVariablesInfo),
strings.ToLower(infoschema.TableUserAttributes),
strings.ToLower(infoschema.ClusterTableTrxSummary):
strings.ToLower(infoschema.ClusterTableTrxSummary),
strings.ToLower(infoschema.TableMemoryUsage),
strings.ToLower(infoschema.TableMemoryUsageOpsHistory),
strings.ToLower(infoschema.ClusterTableMemoryUsage),
strings.ToLower(infoschema.ClusterTableMemoryUsageOpsHistory):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
table: v.Table,
Expand Down
70 changes: 70 additions & 0 deletions executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,11 @@ import (
"github.com/pingcap/tidb/util/keydecoder"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/pdapi"
"github.com/pingcap/tidb/util/resourcegrouptag"
"github.com/pingcap/tidb/util/sem"
"github.com/pingcap/tidb/util/servermemorylimit"
"github.com/pingcap/tidb/util/set"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/stmtsummary"
Expand Down Expand Up @@ -174,6 +176,14 @@ func (e *memtableRetriever) retrieve(ctx context.Context, sctx sessionctx.Contex
err = e.setDataForVariablesInfo(sctx)
case infoschema.TableUserAttributes:
err = e.setDataForUserAttributes(ctx, sctx)
case infoschema.TableMemoryUsage:
err = e.setDataForMemoryUsage(sctx)
case infoschema.ClusterTableMemoryUsage:
err = e.setDataForClusterMemoryUsage(sctx)
case infoschema.TableMemoryUsageOpsHistory:
err = e.setDataForMemoryUsageOpsHistory(sctx)
case infoschema.ClusterTableMemoryUsageOpsHistory:
err = e.setDataForClusterMemoryUsageOpsHistory(sctx)
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -2398,6 +2408,66 @@ func (e *memtableRetriever) setDataForClusterTrxSummary(ctx sessionctx.Context)
return nil
}

func (e *memtableRetriever) setDataForMemoryUsage(ctx sessionctx.Context) error {
r := memory.ReadMemStats()
currentOps, sessionKillLastDatum := types.NewDatum(nil), types.NewDatum(nil)
if memory.TriggerMemoryLimitGC.Load() || servermemorylimit.IsKilling.Load() {
currentOps.SetString("shrink", mysql.DefaultCollationName)
}
sessionKillLast := servermemorylimit.SessionKillLast.Load()
if !sessionKillLast.IsZero() {
sessionKillLastDatum.SetMysqlTime(types.NewTime(types.FromGoTime(sessionKillLast), mysql.TypeDatetime, 0))
}
gcLast := types.NewTime(types.FromGoTime(memory.MemoryLimitGCLast.Load()), mysql.TypeDatetime, 0)

row := []types.Datum{
types.NewIntDatum(int64(memory.GetMemTotalIgnoreErr())), // MEMORY_TOTAL
types.NewIntDatum(int64(memory.ServerMemoryLimit.Load())), // MEMORY_LIMIT
types.NewIntDatum(int64(r.HeapInuse)), // MEMORY_CURRENT
types.NewIntDatum(int64(servermemorylimit.MemoryMaxUsed.Load())), // MEMORY_MAX_USED
currentOps, // CURRENT_OPS
sessionKillLastDatum, // SESSION_KILL_LAST
types.NewIntDatum(servermemorylimit.SessionKillTotal.Load()), // SESSION_KILL_TOTAL
types.NewTimeDatum(gcLast), // GC_LAST
types.NewIntDatum(memory.MemoryLimitGCTotal.Load()), // GC_TOTAL
types.NewDatum(GlobalDiskUsageTracker.BytesConsumed()), // DISK_USAGE
types.NewDatum(memory.QueryForceDisk.Load()), // QUERY_FORCE_DISK
}
e.rows = append(e.rows, row)
return nil
}

func (e *memtableRetriever) setDataForClusterMemoryUsage(ctx sessionctx.Context) error {
err := e.setDataForMemoryUsage(ctx)
if err != nil {
return err
}
rows, err := infoschema.AppendHostInfoToRows(ctx, e.rows)
if err != nil {
return err
}
e.rows = rows
return nil
}

func (e *memtableRetriever) setDataForMemoryUsageOpsHistory(ctx sessionctx.Context) error {
e.rows = servermemorylimit.GlobalMemoryOpsHistoryManager.GetRows()
return nil
}

func (e *memtableRetriever) setDataForClusterMemoryUsageOpsHistory(ctx sessionctx.Context) error {
err := e.setDataForMemoryUsageOpsHistory(ctx)
if err != nil {
return err
}
rows, err := infoschema.AppendHostInfoToRows(ctx, e.rows)
if err != nil {
return err
}
e.rows = rows
return nil
}

type stmtSummaryTableRetriever struct {
dummyCloser
table *model.TableInfo
Expand Down
6 changes: 6 additions & 0 deletions infoschema/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ const (
ClusterTableDeadlocks = "CLUSTER_DEADLOCKS"
// ClusterTableDeadlocks is the string constant of cluster transaction summary table.
ClusterTableTrxSummary = "CLUSTER_TRX_SUMMARY"
// ClusterTableMemoryUsage is the memory usage status of tidb cluster.
ClusterTableMemoryUsage = "CLUSTER_MEMORY_USAGE"
// ClusterTableMemoryUsageOpsHistory is the memory control operators history of tidb cluster.
ClusterTableMemoryUsageOpsHistory = "CLUSTER_MEMORY_USAGE_OPS_HISTORY"
)

// memTableToClusterTables means add memory table to cluster table.
Expand All @@ -58,6 +62,8 @@ var memTableToClusterTables = map[string]string{
TableTiDBTrx: ClusterTableTiDBTrx,
TableDeadlocks: ClusterTableDeadlocks,
TableTrxSummary: ClusterTableTrxSummary,
TableMemoryUsage: ClusterTableMemoryUsage,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a test to cover the basic
"select * from MEMORY_USAGE" or from TableMemoryUsageOpsHistory?

ClusterTableMemoryXXX might be harder, but I think we can cover the table with non-cluster-perfix

TableMemoryUsageOpsHistory: ClusterTableMemoryUsageOpsHistory,
}

func init() {
Expand Down
39 changes: 39 additions & 0 deletions infoschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ const (
TableVariablesInfo = "VARIABLES_INFO"
// TableUserAttributes is the string constant of user_attributes view.
TableUserAttributes = "USER_ATTRIBUTES"
// TableMemoryUsage is the memory usage status of tidb instance.
TableMemoryUsage = "MEMORY_USAGE"
// TableMemoryUsageOpsHistory is the memory control operators history.
TableMemoryUsageOpsHistory = "MEMORY_USAGE_OPS_HISTORY"
)

const (
Expand Down Expand Up @@ -288,6 +292,10 @@ var tableIDMap = map[string]int64{
ClusterTableTrxSummary: autoid.InformationSchemaDBID + 81,
TableVariablesInfo: autoid.InformationSchemaDBID + 82,
TableUserAttributes: autoid.InformationSchemaDBID + 83,
TableMemoryUsage: autoid.InformationSchemaDBID + 84,
TableMemoryUsageOpsHistory: autoid.InformationSchemaDBID + 85,
ClusterTableMemoryUsage: autoid.InformationSchemaDBID + 86,
ClusterTableMemoryUsageOpsHistory: autoid.InformationSchemaDBID + 87,
}

// columnInfo represents the basic column information of all kinds of INFORMATION_SCHEMA tables
Expand Down Expand Up @@ -1545,6 +1553,35 @@ var tableUserAttributesCols = []columnInfo{
{name: "ATTRIBUTE", tp: mysql.TypeLongBlob, size: types.UnspecifiedLength},
}

var tableMemoryUsageCols = []columnInfo{
{name: "MEMORY_TOTAL", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag},
{name: "MEMORY_LIMIT", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag},
{name: "MEMORY_CURRENT", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag},
{name: "MEMORY_MAX_USED", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag},
{name: "CURRENT_OPS", tp: mysql.TypeVarchar, size: 50},
{name: "SESSION_KILL_LAST", tp: mysql.TypeDatetime},
{name: "SESSION_KILL_TOTAL", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag},
{name: "GC_LAST", tp: mysql.TypeDatetime},
{name: "GC_TOTAL", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag},
{name: "DISK_USAGE", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag},
{name: "QUERY_FORCE_DISK", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag},
}

var tableMemoryUsageOpsHistoryCols = []columnInfo{
{name: "TIME", tp: mysql.TypeDatetime, size: 64, flag: mysql.NotNullFlag},
{name: "OPS", tp: mysql.TypeVarchar, size: 20, flag: mysql.NotNullFlag},
{name: "MEMORY_LIMIT", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag},
{name: "MEMORY_CURRENT", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag},
{name: "PROCESSID", tp: mysql.TypeLonglong, size: 21, flag: mysql.UnsignedFlag},
{name: "MEM", tp: mysql.TypeLonglong, size: 21, flag: mysql.UnsignedFlag},
{name: "DISK", tp: mysql.TypeLonglong, size: 21, flag: mysql.UnsignedFlag},
{name: "CLIENT", tp: mysql.TypeVarchar, size: 64},
{name: "DB", tp: mysql.TypeVarchar, size: 64},
{name: "USER", tp: mysql.TypeVarchar, size: 16},
{name: "SQL_DIGEST", tp: mysql.TypeVarchar, size: 64},
{name: "SQL_TEXT", tp: mysql.TypeVarchar, size: 256},
}

// GetShardingInfo returns a nil or description string for the sharding information of given TableInfo.
// The returned description string may be:
// - "NOT_SHARDED": for tables that SHARD_ROW_ID_BITS is not specified.
Expand Down Expand Up @@ -2005,6 +2042,8 @@ var tableNameToColumns = map[string][]columnInfo{
TableTrxSummary: tableTrxSummaryCols,
TableVariablesInfo: tableVariablesInfoCols,
TableUserAttributes: tableUserAttributesCols,
TableMemoryUsage: tableMemoryUsageCols,
TableMemoryUsageOpsHistory: tableMemoryUsageOpsHistoryCols,
}

func createInfoSchemaTable(_ autoid.Allocators, meta *model.TableInfo) (table.Table, error) {
Expand Down
87 changes: 87 additions & 0 deletions infoschema/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"math"
"os"
"strconv"
"strings"
"testing"
"time"
Expand All @@ -39,7 +40,10 @@ import (
"github.com/pingcap/tidb/session/txninfo"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/gctuner"
"github.com/pingcap/tidb/util/memory"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -1560,3 +1564,86 @@ func TestTableConstraintsContainForeignKeys(t *testing.T) {
tk.MustQuery("SELECT * FROM INFORMATION_SCHEMA.table_constraints WHERE constraint_schema = 'tableconstraints' AND table_name = 't2'").Sort().Check(testkit.Rows("def tableconstraints PRIMARY tableconstraints t2 PRIMARY KEY", "def tableconstraints fk_t2_t1 tableconstraints t2 FOREIGN KEY"))
tk.MustQuery("SELECT * FROM INFORMATION_SCHEMA.table_constraints WHERE constraint_schema = 'tableconstraints' AND table_name = 't1'").Sort().Check(testkit.Rows("def tableconstraints PRIMARY tableconstraints t1 PRIMARY KEY"))
}

func TestMemoryUsageAndOpsHistory(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/gctuner/testMemoryLimitTuner", "return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/gctuner/testMemoryLimitTuner"))
}()
gctuner.GlobalMemoryLimitTuner.Start()
defer func() {
time.Sleep(1 * time.Second) // Wait tuning finished.
}()
tk.MustExec("set global tidb_mem_oom_action = 'CANCEL'")
tk.MustExec("set global tidb_server_memory_limit=512<<20")
tk.MustExec("set global tidb_enable_tmp_storage_on_oom=off")
dom, err := session.GetDomain(store)
require.Nil(t, err)
go dom.ServerMemoryLimitHandle().SetSessionManager(tk.Session().GetSessionManager()).Run()
// OOM
tk.MustExec("use test")
tk.MustExec("create table t(a int)")
tk.MustExec("insert into t values(1)")
for i := 0; i < 9; i++ {
tk.MustExec("insert into t select * from t;")
}

var tmp string
var ok bool
var beginTime = time.Now().Format(types.TimeFormat)
err = tk.QueryToErr("explain analyze select * from t t1 join t t2 join t t3 on t1.a=t2.a and t1.a=t3.a order by t1.a")
var endTime = time.Now().Format(types.TimeFormat)
require.NotNil(t, err)
// Check Memory Table
rows := tk.MustQuery("select * from INFORMATION_SCHEMA.MEMORY_USAGE").Rows()
require.Len(t, rows, 1)
row := rows[0]
require.Len(t, row, 11)
require.Equal(t, row[0], strconv.FormatUint(memory.GetMemTotalIgnoreErr(), 10)) // MEMORY_TOTAL
require.Equal(t, row[1], "536870912") // MEMORY_LIMIT
require.Greater(t, row[2], "0") // MEMORY_CURRENT
tmp, ok = row[3].(string) // MEMORY_MAX_USED
require.Equal(t, ok, true)
val, err := strconv.ParseUint(tmp, 10, 64)
require.Nil(t, err)
require.Greater(t, val, uint64(536870912))

tmp, ok = row[4].(string) // CURRENT_OPS
require.Equal(t, ok, true)
if tmp != "null" && tmp != "shrink" {
require.Fail(t, "CURRENT_OPS get wrong value")
}
require.GreaterOrEqual(t, row[5], beginTime) // SESSION_KILL_LAST
require.LessOrEqual(t, row[5], endTime)
require.Greater(t, row[6], "0") // SESSION_KILL_TOTAL
require.GreaterOrEqual(t, row[7], beginTime) // GC_LAST
require.LessOrEqual(t, row[7], endTime)
require.Greater(t, row[8], "0") // GC_TOTAL
require.Equal(t, row[9], "0") // DISK_USAGE
require.Equal(t, row[10], "0") // QUERY_FORCE_DISK

rows = tk.MustQuery("select * from INFORMATION_SCHEMA.MEMORY_USAGE_OPS_HISTORY").Rows()
require.Greater(t, len(rows), 0)
row = rows[len(rows)-1]
require.Len(t, row, 12)
require.GreaterOrEqual(t, row[0], beginTime) // TIME
require.LessOrEqual(t, row[0], endTime)
require.Equal(t, row[1], "SessionKill") // OPS
require.Equal(t, row[2], "536870912") // MEMORY_LIMIT
tmp, ok = row[3].(string) // MEMORY_CURRENT
require.Equal(t, ok, true)
val, err = strconv.ParseUint(tmp, 10, 64)
require.Nil(t, err)
require.Greater(t, val, uint64(536870912))

require.Greater(t, row[4], "0") // PROCESSID
require.Greater(t, row[5], "0") // MEM
require.Equal(t, row[6], "0") // DISK
require.Equal(t, row[7], "") // CLIENT
require.Equal(t, row[8], "test") // DB
require.Equal(t, row[9], "") // USER
require.Equal(t, row[10], "e3237ec256015a3566757e0c2742507cd30ae04e4cac2fbc14d269eafe7b067b") // SQL_DIGEST
require.Equal(t, row[11], "explain analyze select * from t t1 join t t2 join t t3 on t1.a=t2.a and t1.a=t3.a order by t1.a") // SQL_TEXT
}
5 changes: 5 additions & 0 deletions testkit/mocksessionmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ func (msm *MockSessionManager) GetProcessInfo(id uint64) (*util.ProcessInfo, boo
return item, true
}
}
msm.mu.Lock()
defer msm.mu.Unlock()
if sess := msm.conn[id]; sess != nil {
return sess.ShowProcess(), true
}
return &util.ProcessInfo{}, false
}

Expand Down
1 change: 1 addition & 0 deletions util/chunk/row_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func (c *RowContainer) SpillToDisk() {
defer c.actionSpill.setStatus(spilledYet)
}
var err error
memory.QueryForceDisk.Add(1)
n := c.m.records.inMemory.NumChunks()
c.m.records.inDisk = NewListInDisk(c.m.records.inMemory.FieldTypes())
c.m.records.inDisk.diskTracker.AttachTo(c.diskTracker)
Expand Down
4 changes: 4 additions & 0 deletions util/gctuner/memory_limit_tuner.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ func (t *memoryLimitTuner) tuning() {
if float64(r.HeapInuse)*ratio > float64(debug.SetMemoryLimit(-1)) {
if t.nextGCTriggeredByMemoryLimit.Load() && t.waitingReset.CompareAndSwap(false, true) {
go func() {
memory.MemoryLimitGCLast.Store(time.Now())
memory.MemoryLimitGCTotal.Add(1)
debug.SetMemoryLimit(math.MaxInt64)
resetInterval := 1 * time.Minute // Wait 1 minute and set back, to avoid frequent GC
failpoint.Inject("testMemoryLimitTuner", func(val failpoint.Value) {
Expand All @@ -72,10 +74,12 @@ func (t *memoryLimitTuner) tuning() {
continue
}
}()
memory.TriggerMemoryLimitGC.Store(true)
}
t.nextGCTriggeredByMemoryLimit.Store(true)
} else {
t.nextGCTriggeredByMemoryLimit.Store(false)
memory.TriggerMemoryLimitGC.Store(false)
}
}

Expand Down
6 changes: 6 additions & 0 deletions util/memory/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/tidb/metrics"
atomicutil "go.uber.org/atomic"
Expand All @@ -34,6 +35,11 @@ const TrackMemWhenExceeds = 104857600 // 100MB
var (
ServerMemoryLimit = atomicutil.NewUint64(0)
ServerMemoryLimitSessMinSize = atomicutil.NewUint64(128 << 20)

QueryForceDisk = atomicutil.NewInt64(0)
TriggerMemoryLimitGC = atomicutil.NewBool(false)
MemoryLimitGCLast = atomicutil.NewTime(time.Time{})
MemoryLimitGCTotal = atomicutil.NewInt64(0)
)

// Tracker is used to track the memory usage during query execution.
Expand Down
Loading