From 1bd189adc654a7e4f559f3ff39acb53c51a1add4 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Thu, 13 Sep 2018 16:57:50 +0800 Subject: [PATCH 1/4] domain: add recent slow queries queue and ShowSlowQuery() function --- ast/misc.go | 20 ++++++ domain/domain.go | 26 +++++++- domain/domain_test.go | 27 ++++++++ domain/topn_slow_query.go | 116 ++++++++++++++++++++++++++++++--- domain/topn_slow_query_test.go | 37 ++++++++++- executor/adapter.go | 2 +- 6 files changed, 214 insertions(+), 14 deletions(-) diff --git a/ast/misc.go b/ast/misc.go index 2e89eeaec6945..bbe4cf93da3d1 100644 --- a/ast/misc.go +++ b/ast/misc.go @@ -626,6 +626,26 @@ type HandleRange struct { End int64 } +// ShowLogType defines the type for SlowLog. +type ShowLogType int + +const ( + // ShowLogTop is a ShowLogType constant. + ShowLogTop ShowLogType = iota + // ShowLogRecent is a ShowLogType constant. + ShowLogRecent +) + +// ShowLog is used for the following command: +// admin show log top [user | internal | all] N +// admin show log recent N +type ShowLog struct { + Tp ShowLogType + Count uint64 + // "user" | "internal" | "all", default is user + Kind string +} + // AdminStmt is the struct for Admin statement. type AdminStmt struct { stmtNode diff --git a/domain/domain.go b/domain/domain.go index 99fc4c546bb44..f0ab9fd459072 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -24,6 +24,7 @@ import ( "github.com/coreos/etcd/clientv3" "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/ngaut/pools" + "github.com/pingcap/tidb/ast" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" @@ -330,14 +331,25 @@ func (do *Domain) Reload() error { return nil } -// LogTopNSlowQuery keeps topN recent slow queries in domain. -func (do *Domain) LogTopNSlowQuery(query *SlowQueryInfo) { +// LogSlowQuery keeps topN recent slow queries in domain. 
+func (do *Domain) LogSlowQuery(query *SlowQueryInfo) { select { case do.slowQuery.ch <- query: default: } } +// ShowSlowQuery returns the slow queries. +func (do *Domain) ShowSlowQuery(showLog *ast.ShowLog) []*SlowQueryInfo { + msg := &showLogMessage{ + request: showLog, + } + msg.Add(1) + do.slowQuery.msgCh <- msg + msg.Wait() + return msg.result +} + func (do *Domain) topNSlowQueryLoop() { defer recoverInDomain("topNSlowQueryLoop", false) defer do.wg.Done() @@ -352,6 +364,14 @@ func (do *Domain) topNSlowQueryLoop() { return } do.slowQuery.Append(info) + case msg := <-do.slowQuery.msgCh: + req := msg.request + if req.Tp == ast.ShowLogTop { + msg.result = do.slowQuery.QueryTop(int(req.Count), req.Kind) + } else if req.Tp == ast.ShowLogRecent { + msg.result = do.slowQuery.QueryRecent(int(req.Count)) + } + msg.Done() } } } @@ -499,7 +519,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio sysSessionPool: pools.NewResourcePool(factory, capacity, capacity, resourceIdleTimeout), statsLease: statsLease, infoHandle: infoschema.NewHandle(store), - slowQuery: newTopNSlowQueries(30, time.Hour*24*7), + slowQuery: newTopNSlowQueries(30, time.Hour*24*7, 500), } } diff --git a/domain/domain_test.go b/domain/domain_test.go index 2b0856ec0ff29..3d1785bb741ab 100644 --- a/domain/domain_test.go +++ b/domain/domain_test.go @@ -100,6 +100,33 @@ func (*testSuite) TestT(c *C) { succ = dom.SchemaValidator.Check(ts, schemaVer, nil) c.Assert(succ, Equals, ResultSucc) + // For slow query. + dom.LogSlowQuery(&SlowQueryInfo{SQL: "aaa", Duration: time.Second, Internal: true}) + dom.LogSlowQuery(&SlowQueryInfo{SQL: "bbb", Duration: 3 * time.Second}) + dom.LogSlowQuery(&SlowQueryInfo{SQL: "ccc", Duration: 2 * time.Second}) + // Collecting slow queries is asynchronous, wait a while to ensure it's done. 
+ time.Sleep(5 * time.Millisecond) + + res := dom.ShowSlowQuery(&ast.ShowLog{Tp: ast.ShowLogTop, Count: 2}) + c.Assert(res, HasLen, 2) + c.Assert(*res[0], Equals, SlowQueryInfo{SQL: "bbb", Duration: 3 * time.Second}) + c.Assert(*res[1], Equals, SlowQueryInfo{SQL: "ccc", Duration: 2 * time.Second}) + + res = dom.ShowSlowQuery(&ast.ShowLog{Tp: ast.ShowLogTop, Count: 2, Kind: "internal"}) + c.Assert(res, HasLen, 1) + c.Assert(*res[0], Equals, SlowQueryInfo{SQL: "aaa", Duration: time.Second, Internal: true}) + + res = dom.ShowSlowQuery(&ast.ShowLog{Tp: ast.ShowLogTop, Count: 4, Kind: "all"}) + c.Assert(res, HasLen, 3) + c.Assert(*res[0], Equals, SlowQueryInfo{SQL: "bbb", Duration: 3 * time.Second}) + c.Assert(*res[1], Equals, SlowQueryInfo{SQL: "ccc", Duration: 2 * time.Second}) + c.Assert(*res[2], Equals, SlowQueryInfo{SQL: "aaa", Duration: time.Second, Internal: true}) + + res = dom.ShowSlowQuery(&ast.ShowLog{Tp: ast.ShowLogRecent, Count: 2}) + c.Assert(res, HasLen, 2) + c.Assert(*res[0], Equals, SlowQueryInfo{SQL: "ccc", Duration: 2 * time.Second}) + c.Assert(*res[1], Equals, SlowQueryInfo{SQL: "bbb", Duration: 3 * time.Second}) + err = store.Close() c.Assert(err, IsNil) } diff --git a/domain/topn_slow_query.go b/domain/topn_slow_query.go index b97a9181663ba..7ef132c5d3781 100644 --- a/domain/topn_slow_query.go +++ b/domain/topn_slow_query.go @@ -15,8 +15,11 @@ package domain import ( "container/heap" + "sort" + "sync" "time" + "github.com/pingcap/tidb/ast" "github.com/pingcap/tidb/util/execdetails" ) @@ -40,11 +43,11 @@ func (h *slowQueryHeap) Pop() interface{} { return x } -func (h *slowQueryHeap) Refresh(now time.Time, recent time.Duration) { +func (h *slowQueryHeap) Refresh(now time.Time, period time.Duration) { // Remove outdated slow query element. 
idx := 0 for i := 0; i < len(h.data); i++ { - outdateTime := h.data[i].Start.Add(recent) + outdateTime := h.data[i].Start.Add(period) if outdateTime.After(now) { h.data[idx] = h.data[i] idx++ @@ -59,28 +62,87 @@ func (h *slowQueryHeap) Refresh(now time.Time, recent time.Duration) { heap.Init(h) } +func (h *slowQueryHeap) Query(count int) []*SlowQueryInfo { + save := make([]*SlowQueryInfo, len(h.data)) + copy(save, h.data) + + sort.Sort(h) + + ret := make([]*SlowQueryInfo, 0, count) + for i := len(h.data) - 1; i >= 0 && len(ret) <= count; i-- { + ret = append(ret, h.data[i]) + } + + // Sort breaks the data, recover it here. + h.data = save + return ret +} + +type slowQueryQueue struct { + data []*SlowQueryInfo + head int + tail int +} + +func (q *slowQueryQueue) Enqueue(info *SlowQueryInfo) { + q.data[q.tail] = info + q.tail = (q.tail + 1) % len(q.data) + if q.tail == q.head { + q.head = (q.head + 1) % len(q.data) + } +} + +func (q *slowQueryQueue) Query(count int) []*SlowQueryInfo { + // Queue is empty. + if q.tail == q.head { + return nil + } + + if count > len(q.data) { + count = len(q.data) + } + ret := make([]*SlowQueryInfo, 0, count) + tail := (q.tail - 1 + len(q.data)) % len(q.data) + for len(ret) < count { + ret = append(ret, q.data[tail]) + if tail == q.head { + break + } + tail = (tail - 1 + len(q.data)) % len(q.data) + } + return ret +} + // topNSlowQueries maintains two heaps to store recent slow queries: one for user's and one for internal. -// N = 30, recent = 7 days by default. +// N = 30, period = 7 days by default. +// It also maintains a recent queue, in a FIFO manner. 
type topNSlowQueries struct { + recent slowQueryQueue user slowQueryHeap internal slowQueryHeap topN int - recent time.Duration + period time.Duration ch chan *SlowQueryInfo + msgCh chan *showLogMessage } -func newTopNSlowQueries(topN int, recent time.Duration) *topNSlowQueries { +func newTopNSlowQueries(topN int, period time.Duration, queueSize int) *topNSlowQueries { ret := &topNSlowQueries{ topN: topN, - recent: recent, + period: period, ch: make(chan *SlowQueryInfo, 1000), + msgCh: make(chan *showLogMessage, 10), } ret.user.data = make([]*SlowQueryInfo, 0, topN) ret.internal.data = make([]*SlowQueryInfo, 0, topN) + ret.recent.data = make([]*SlowQueryInfo, queueSize+1) return ret } func (q *topNSlowQueries) Append(info *SlowQueryInfo) { + // Put into the recent queue. + q.recent.Enqueue(info) + var h *slowQueryHeap if info.Internal { h = &q.internal @@ -102,8 +164,46 @@ func (q *topNSlowQueries) Append(info *SlowQueryInfo) { } func (q *topNSlowQueries) Refresh(now time.Time) { - q.user.Refresh(now, q.recent) - q.internal.Refresh(now, q.recent) + q.user.Refresh(now, q.period) + q.internal.Refresh(now, q.period) +} + +type showLogMessage struct { + request *ast.ShowLog + result []*SlowQueryInfo + sync.WaitGroup +} + +type queryType int + +const ( + queryTypeTop queryType = iota + queryTypeRecent +) + +func (q *topNSlowQueries) QueryRecent(count int) []*SlowQueryInfo { + return q.recent.Query(count) +} + +func (q *topNSlowQueries) QueryTop(count int, kind string) []*SlowQueryInfo { + var ret []*SlowQueryInfo + switch kind { + case "user", "": + ret = q.user.Query(count) + case "internal": + ret = q.internal.Query(count) + case "all": + tmp := make([]*SlowQueryInfo, 0, len(q.user.data)+len(q.internal.data)) + tmp = append(tmp, q.user.data...) + tmp = append(tmp, q.internal.data...) 
+ tmp1 := slowQueryHeap{tmp} + sort.Sort(&tmp1) + ret = make([]*SlowQueryInfo, 0, count) + for i := len(tmp) - 1; i >= 0 && len(ret) <= count; i-- { + ret = append(ret, tmp[i]) + } + } + return ret } func (q *topNSlowQueries) Close() { diff --git a/domain/topn_slow_query_test.go b/domain/topn_slow_query_test.go index c804e8ba75b17..940bc7f5af1e4 100644 --- a/domain/topn_slow_query_test.go +++ b/domain/topn_slow_query_test.go @@ -24,7 +24,7 @@ var _ = Suite(&testTopNSlowQuerySuite{}) type testTopNSlowQuerySuite struct{} func (t *testTopNSlowQuerySuite) TestPush(c *C) { - slowQuery := newTopNSlowQueries(10, 0) + slowQuery := newTopNSlowQueries(10, 0, 10) // Insert data into the heap. slowQuery.Append(&SlowQueryInfo{Duration: 300 * time.Millisecond}) slowQuery.Append(&SlowQueryInfo{Duration: 400 * time.Millisecond}) @@ -71,7 +71,7 @@ func (t *testTopNSlowQuerySuite) TestPush(c *C) { func (t *testTopNSlowQuerySuite) TestRefresh(c *C) { now := time.Now() - slowQuery := newTopNSlowQueries(6, 3*time.Second) + slowQuery := newTopNSlowQueries(6, 3*time.Second, 10) slowQuery.Append(&SlowQueryInfo{Start: now, Duration: 6}) slowQuery.Append(&SlowQueryInfo{Start: now.Add(1 * time.Second), Duration: 5}) @@ -108,3 +108,36 @@ func checkHeap(q *slowQueryHeap, c *C) { } } } + +func (t *testTopNSlowQuerySuite) TestQueue(c *C) { + q := newTopNSlowQueries(10, time.Minute, 5) + q.Append(&SlowQueryInfo{SQL: "aaa"}) + q.Append(&SlowQueryInfo{SQL: "bbb"}) + q.Append(&SlowQueryInfo{SQL: "ccc"}) + + query := q.recent.Query(1) + c.Assert(*query[0], Equals, SlowQueryInfo{SQL: "ccc"}) + query = q.recent.Query(2) + c.Assert(*query[0], Equals, SlowQueryInfo{SQL: "ccc"}) + c.Assert(*query[1], Equals, SlowQueryInfo{SQL: "bbb"}) + query = q.recent.Query(6) + c.Assert(*query[0], Equals, SlowQueryInfo{SQL: "ccc"}) + c.Assert(*query[1], Equals, SlowQueryInfo{SQL: "bbb"}) + c.Assert(*query[2], Equals, SlowQueryInfo{SQL: "aaa"}) + + q.Append(&SlowQueryInfo{SQL: "ddd"}) + q.Append(&SlowQueryInfo{SQL: 
"eee"}) + q.Append(&SlowQueryInfo{SQL: "fff"}) + q.Append(&SlowQueryInfo{SQL: "ggg"}) + + query = q.recent.Query(3) + c.Assert(*query[0], Equals, SlowQueryInfo{SQL: "ggg"}) + c.Assert(*query[1], Equals, SlowQueryInfo{SQL: "fff"}) + c.Assert(*query[2], Equals, SlowQueryInfo{SQL: "eee"}) + query = q.recent.Query(6) + c.Assert(*query[0], Equals, SlowQueryInfo{SQL: "ggg"}) + c.Assert(*query[1], Equals, SlowQueryInfo{SQL: "fff"}) + c.Assert(*query[2], Equals, SlowQueryInfo{SQL: "eee"}) + c.Assert(*query[3], Equals, SlowQueryInfo{SQL: "ddd"}) + c.Assert(*query[4], Equals, SlowQueryInfo{SQL: "ccc"}) +} diff --git a/executor/adapter.go b/executor/adapter.go index 379039dc36a39..c08cf30471e4d 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -376,7 +376,7 @@ func (a *ExecStmt) logSlowQuery(txnTS uint64, succ bool) { if user != nil { userString = user.String() } - domain.GetDomain(a.Ctx).LogTopNSlowQuery(&domain.SlowQueryInfo{ + domain.GetDomain(a.Ctx).LogSlowQuery(&domain.SlowQueryInfo{ SQL: sql, Start: a.startTime, Duration: costTime, From b7a6db6603b1c6fb4b8a2a65ee7f025ce855db72 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Fri, 21 Sep 2018 14:00:29 +0800 Subject: [PATCH 2/4] address comment --- domain/topn_slow_query.go | 48 ++++++++++++++++----------------------- 1 file changed, 20 insertions(+), 28 deletions(-) diff --git a/domain/topn_slow_query.go b/domain/topn_slow_query.go index 7ef132c5d3781..76f754b4a4066 100644 --- a/domain/topn_slow_query.go +++ b/domain/topn_slow_query.go @@ -63,52 +63,43 @@ func (h *slowQueryHeap) Refresh(now time.Time, period time.Duration) { } func (h *slowQueryHeap) Query(count int) []*SlowQueryInfo { - save := make([]*SlowQueryInfo, len(h.data)) - copy(save, h.data) - + // The sorted array still maintains the heap property. sort.Sort(h) - ret := make([]*SlowQueryInfo, 0, count) - for i := len(h.data) - 1; i >= 0 && len(ret) <= count; i-- { - ret = append(ret, h.data[i]) - } - - // Sort breaks the data, recover it here. 
- h.data = save - return ret + // The result should be in decreasing order. + return takeLastN(h.data, count) } type slowQueryQueue struct { data []*SlowQueryInfo - head int - tail int + size int } func (q *slowQueryQueue) Enqueue(info *SlowQueryInfo) { - q.data[q.tail] = info - q.tail = (q.tail + 1) % len(q.data) - if q.tail == q.head { - q.head = (q.head + 1) % len(q.data) + if len(q.data) < q.size { + q.data = append(q.data, info) + return } + + q.data = append(q.data, info)[1:] + return } func (q *slowQueryQueue) Query(count int) []*SlowQueryInfo { // Queue is empty. - if q.tail == q.head { + if len(q.data) == 0 { return nil } + return takeLastN(q.data, count) +} - if count > len(q.data) { - count = len(q.data) +func takeLastN(data []*SlowQueryInfo, count int) []*SlowQueryInfo { + if count > len(data) { + count = len(data) } ret := make([]*SlowQueryInfo, 0, count) - tail := (q.tail - 1 + len(q.data)) % len(q.data) - for len(ret) < count { - ret = append(ret, q.data[tail]) - if tail == q.head { - break - } - tail = (tail - 1 + len(q.data)) % len(q.data) + for i := len(data) - 1; i >= 0 && len(ret) < count; i-- { + ret = append(ret, data[i]) } return ret } @@ -135,7 +126,8 @@ func newTopNSlowQueries(topN int, period time.Duration, queueSize int) *topNSlow } ret.user.data = make([]*SlowQueryInfo, 0, topN) ret.internal.data = make([]*SlowQueryInfo, 0, topN) - ret.recent.data = make([]*SlowQueryInfo, queueSize+1) + ret.recent.size = queueSize + ret.recent.data = make([]*SlowQueryInfo, 0, queueSize) return ret } From 1a8ebdf1a0cbd48e3d9229a8db1e4b4f5903904c Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 25 Sep 2018 14:32:48 +0800 Subject: [PATCH 3/4] address comment --- domain/domain.go | 2 +- domain/topn_slow_query.go | 8 ++++---- domain/topn_slow_query_test.go | 6 +++--- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index f0ab9fd459072..5c66ce850ab04 100644 --- a/domain/domain.go +++ b/domain/domain.go @@
-358,7 +358,7 @@ func (do *Domain) topNSlowQueryLoop() { for { select { case now := <-ticker.C: - do.slowQuery.Refresh(now) + do.slowQuery.RemoveExpired(now) case info, ok := <-do.slowQuery.ch: if !ok { return diff --git a/domain/topn_slow_query.go b/domain/topn_slow_query.go index 76f754b4a4066..e66cf685026b8 100644 --- a/domain/topn_slow_query.go +++ b/domain/topn_slow_query.go @@ -43,7 +43,7 @@ func (h *slowQueryHeap) Pop() interface{} { return x } -func (h *slowQueryHeap) Refresh(now time.Time, period time.Duration) { +func (h *slowQueryHeap) RemoveExpired(now time.Time, period time.Duration) { // Remove outdated slow query element. idx := 0 for i := 0; i < len(h.data); i++ { @@ -155,9 +155,9 @@ func (q *topNSlowQueries) Append(info *SlowQueryInfo) { } } -func (q *topNSlowQueries) Refresh(now time.Time) { - q.user.Refresh(now, q.period) - q.internal.Refresh(now, q.period) +func (q *topNSlowQueries) RemoveExpired(now time.Time) { + q.user.RemoveExpired(now, q.period) + q.internal.RemoveExpired(now, q.period) } type showLogMessage struct { diff --git a/domain/topn_slow_query_test.go b/domain/topn_slow_query_test.go index 940bc7f5af1e4..dbf7ea0a8c6fb 100644 --- a/domain/topn_slow_query_test.go +++ b/domain/topn_slow_query_test.go @@ -69,7 +69,7 @@ func (t *testTopNSlowQuerySuite) TestPush(c *C) { c.Assert(slowQuery.user.data[0].Duration, Equals, 1300*time.Millisecond) } -func (t *testTopNSlowQuerySuite) TestRefresh(c *C) { +func (t *testTopNSlowQuerySuite) TestRemoveExpired(c *C) { now := time.Now() slowQuery := newTopNSlowQueries(6, 3*time.Second, 10) @@ -80,7 +80,7 @@ func (t *testTopNSlowQuerySuite) TestRefresh(c *C) { slowQuery.Append(&SlowQueryInfo{Start: now.Add(4 * time.Second), Duration: 2}) c.Assert(slowQuery.user.data[0].Duration, Equals, 2*time.Nanosecond) - slowQuery.Refresh(now.Add(5 * time.Second)) + slowQuery.RemoveExpired(now.Add(5 * time.Second)) c.Assert(len(slowQuery.user.data), Equals, 2) c.Assert(slowQuery.user.data[0].Duration, Equals, 
2*time.Nanosecond) @@ -91,7 +91,7 @@ func (t *testTopNSlowQuerySuite) TestRefresh(c *C) { c.Assert(len(slowQuery.user.data), Equals, 6) c.Assert(slowQuery.user.data[0].Duration, Equals, 0*time.Nanosecond) - slowQuery.Refresh(now.Add(6 * time.Second)) + slowQuery.RemoveExpired(now.Add(6 * time.Second)) c.Assert(len(slowQuery.user.data), Equals, 4) c.Assert(slowQuery.user.data[0].Duration, Equals, 0*time.Nanosecond) } From cf10de239570f58a7082e674ddf46d1b2af6fbda Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 26 Sep 2018 14:27:54 +0800 Subject: [PATCH 4/4] tiny clean up --- domain/topn_slow_query.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/domain/topn_slow_query.go b/domain/topn_slow_query.go index e41c8973b38dd..52251d34475f2 100644 --- a/domain/topn_slow_query.go +++ b/domain/topn_slow_query.go @@ -190,10 +190,7 @@ func (q *topNSlowQueries) QueryTop(count int, kind ast.ShowSlowKind) []*SlowQuer tmp = append(tmp, q.internal.data...) tmp1 := slowQueryHeap{tmp} sort.Sort(&tmp1) - ret = make([]*SlowQueryInfo, 0, count) - for i := len(tmp) - 1; i >= 0 && len(ret) < count; i-- { - ret = append(ret, tmp[i]) - } + ret = takeLastN(tmp, count) } return ret }