From f40d3daf41741582533b9427acd4d0c9cf27e384 Mon Sep 17 00:00:00 2001 From: AlexStocks Date: Fri, 26 Jul 2019 00:12:48 +0800 Subject: [PATCH 1/5] Imp: use bytes pool in session.go; Fix: taskPoll -> taskPool; Add: http pprof --- client.go | 7 ++++++- demo/hello/README.md | 23 +++++++++++++++++------ demo/hello/tcp/client/client.go | 23 +++++++++++++++++++++++ demo/hello/tcp/config.go | 9 +++++++-- demo/hello/tcp/server/server.go | 25 ++++++++++++++----------- demo/util/pprof.go | 20 ++++++++++++++++++++ session.go | 24 ++++++++++++++++++------ 7 files changed, 105 insertions(+), 26 deletions(-) create mode 100644 demo/util/pprof.go diff --git a/client.go b/client.go index 3e903e2e..eff49913 100644 --- a/client.go +++ b/client.go @@ -23,6 +23,7 @@ import ( ) import ( + gxbytes "github.com/divebomb/gost/bytes" "github.com/gorilla/websocket" perrors "github.com/pkg/errors" ) @@ -163,10 +164,14 @@ func (c *client) dialUDP() Session { localAddr *net.UDPAddr peerAddr *net.UDPAddr length int + bufp *[]byte buf []byte ) - buf = make([]byte, 128) + // buf = make([]byte, 128) + bufp = gxbytes.GetBytes(128) + defer gxbytes.PutBytes(bufp) + buf = *bufp localAddr = &net.UDPAddr{IP: net.IPv4zero, Port: 0} peerAddr, _ = net.ResolveUDPAddr("udp", c.addr) for { diff --git a/demo/hello/README.md b/demo/hello/README.md index 1b323af1..af9ab471 100644 --- a/demo/hello/README.md +++ b/demo/hello/README.md @@ -1,6 +1,6 @@ # Run Hello Demo -## 1. prepare +## 1. prepare ```bash git clone https://github.com/dubbogo/getty.git @@ -10,19 +10,30 @@ cd getty/demo/hello ## 2. run server -run server: +run server: `go run tcp/server/server.go` Or run server in task pool mode: ```bash go run tcp/server/server.go -taskPool=true \ - -task_queue_length=100 \ - -task_queue_number=4 \ - -task_pool_size=2000 + -task_queue_length=128 \ + -task_queue_number=16 \ + -task_pool_size=2000 \ + -pprof_port=60000 ``` ## 3. run client ```bash go run tcp/client/client.go -``` \ No newline at end of file +``` + +Or run client in task pool mode: +```bash +go run tcp/client/client.go -taskPool=true \ + -task_queue_length=100 \ + -task_queue_number=4 \ + -task_pool_size=50 \ + -pprof_port=60001 +``` + diff --git a/demo/hello/tcp/client/client.go b/demo/hello/tcp/client/client.go index 02d2b4e9..ec58b14a 100644 --- a/demo/hello/tcp/client/client.go +++ b/demo/hello/tcp/client/client.go @@ -13,6 +13,7 @@ import ( import ( "github.com/dubbogo/getty" + "github.com/dubbogo/gost/sync" ) import ( @@ -24,12 +25,33 @@ import ( var ( ip = flag.String("ip", "127.0.0.1", "server IP") connections = flag.Int("conn", 1, "number of tcp connections") + + taskPoolMode = flag.Bool("taskPool", false, "task pool mode") + taskPoolQueueLength = flag.Int("task_queue_length", 100, "task queue length") + taskPoolQueueNumber = flag.Int("task_queue_number", 4, "task queue number") + taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size") + pprofPort = flag.Int("pprof_port", 65431, "pprof http port") +) + +var ( + taskPool *gxsync.TaskPool ) func main() { flag.Parse() + util.SetLimit() + util.Profiling(*pprofPort) + + if *taskPoolMode { + taskPool = gxsync.NewTaskPool( + gxsync.WithTaskPoolTaskQueueLength(*taskPoolQueueLength), + gxsync.WithTaskPoolTaskQueueNumber(*taskPoolQueueNumber), + gxsync.WithTaskPoolTaskPoolSize(*taskPoolSize), + ) + } + client := getty.NewTCPClient( getty.WithServerAddress(*ip+":8090"), getty.WithConnectionNumber(*connections), @@ -41,3 +63,4 @@ func main() { util.WaitCloseSignals(client) } + diff --git a/demo/hello/tcp/config.go b/demo/hello/tcp/config.go index b5137d46..2c2f59ff 100644 --- a/demo/hello/tcp/config.go +++ b/demo/hello/tcp/config.go @@ -26,11 +26,16 @@ var ( eventListener = &hello.MessageHandler{} ) -func NewHelloClientSession(session getty.Session) (err error) { +func NewHelloClientSession(session getty.Session, taskPool *gxsync.TaskPool) (err error) { eventListener.SessionOnOpen = func(session getty.Session) { hello.Sessions = append(hello.Sessions, session) } - return InitialSession(session) + err = InitialSession(session) + if err != nil { + return + } + session.SetTaskPool(taskPool) + return } func InitialSession(session getty.Session) (err error) { diff --git a/demo/hello/tcp/server/server.go b/demo/hello/tcp/server/server.go index a4c0fd0e..410656ef 100644 --- a/demo/hello/tcp/server/server.go +++ b/demo/hello/tcp/server/server.go @@ -22,14 +22,15 @@ import ( ) var ( - taskPollMode = flag.Bool("taskPool", false, "task pool mode") - taskPollQueueLength = flag.Int("task_queue_length", 100, "task queue length") - taskPollQueueNumber = flag.Int("task_queue_number", 4, "task queue number") - taskPollSize = flag.Int("task_pool_size", 2000, "task poll size") + taskPoolMode = flag.Bool("taskPool", false, "task pool mode") + taskPoolQueueLength = flag.Int("task_queue_length", 100, "task queue length") + taskPoolQueueNumber = flag.Int("task_queue_number", 4, "task queue number") + taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size") + pprofPort = flag.Int("pprof_port", 65432, "pprof http port") ) var ( - taskPoll *gxsync.TaskPool + taskPool *gxsync.TaskPool ) func main() { @@ -37,13 +38,15 @@ func main() { util.SetLimit() + util.Profiling(*pprofPort) + options := []getty.ServerOption{getty.WithLocalAddress(":8090")} - if *taskPollMode { - taskPoll = gxsync.NewTaskPool( - gxsync.WithTaskPoolTaskQueueLength(*taskPollQueueLength), - gxsync.WithTaskPoolTaskQueueNumber(*taskPollQueueNumber), - gxsync.WithTaskPoolTaskPoolSize(*taskPollSize), + if *taskPoolMode { + taskPool = gxsync.NewTaskPool( + gxsync.WithTaskPoolTaskQueueLength(*taskPoolQueueLength), + gxsync.WithTaskPoolTaskQueueNumber(*taskPoolQueueNumber), + gxsync.WithTaskPoolTaskPoolSize(*taskPoolSize), ) } @@ -59,6 +62,6 @@ func NewHelloServerSession(session getty.Session) (err error) { if err != nil { return } - session.SetTaskPool(taskPoll) + session.SetTaskPool(taskPool) return } diff --git a/demo/util/pprof.go b/demo/util/pprof.go new file mode 100644 index 00000000..7b8b9109 --- /dev/null +++ b/demo/util/pprof.go @@ -0,0 +1,20 @@ +/****************************************************** +# MAINTAINER : Alex Stocks +# LICENCE : Apache License 2.0 +# EMAIL : alexstocks@foxmail.com +# MOD : 2019-07-25 +******************************************************/ + +package util + +import ( + "fmt" + "net/http" + _ "net/http/pprof" +) + +func Profiling(port int) { + go func() { + http.ListenAndServe(fmt.Sprintf(":%d", port), nil) + }() +} diff --git a/session.go b/session.go index f64f108e..70e785ff 100644 --- a/session.go +++ b/session.go @@ -20,6 +20,7 @@ import ( ) import ( + gxbytes "github.com/dubbogo/gost/bytes" gxsync "github.com/dubbogo/gost/sync" gxtime "github.com/dubbogo/gost/time" "github.com/gorilla/websocket" @@ -413,16 +414,20 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error { var ( l int err error - length uint32 + length int + arrp *[]byte arr []byte ) - length = 0 + length = 64 for i := 0; i < len(pkgs); i++ { - length += uint32(len(pkgs[i])) + length += len(pkgs[i]) } // merge the pkgs - arr = make([]byte, length) + // arr = make([]byte, length) + arrp = gxbytes.GetBytes(length) + defer gxbytes.PutBytes(arrp) + arr = *arrp l = 0 for i := 0; i < len(pkgs); i++ { copy(arr[l:], pkgs[i]) @@ -608,12 +613,16 @@ func (s *session) handleTCPPackage() error { exit bool bufLen int pkgLen int + bufp *[]byte buf []byte pktBuf *bytes.Buffer pkg interface{} ) - buf = make([]byte, maxReadBufLen) + // buf = make([]byte, maxReadBufLen) + bufp = gxbytes.GetBytes(maxReadBufLen) + defer gxbytes.PutBytes(bufp) + buf = *bufp pktBuf = new(bytes.Buffer) conn = s.Connection.(*gettyTCPConn) for { @@ -689,6 +698,7 @@ func (s *session) handleUDPPackage() error { conn *gettyUDPConn bufLen int maxBufLen int + bufp *[]byte buf []byte addr *net.UDPAddr pkgLen int @@ -700,7 +710,9 @@ func (s *session) handleUDPPackage() error { if int(s.maxMsgLen<<1) < bufLen { maxBufLen = int(s.maxMsgLen << 1) } - buf = make([]byte, maxBufLen) + bufp = gxbytes.GetBytes(maxBufLen) //make([]byte, maxBufLen) + defer gxbytes.PutBytes(bufp) + buf = *bufp for { if s.IsClosed() { break From 7f03717a3c109d8e34df148b2e382eb626c7f749 Mon Sep 17 00:00:00 2001 From: AlexStocks Date: Fri, 26 Jul 2019 11:04:41 +0800 Subject: [PATCH 2/5] Imp: use bytes.Bytes from memory pool --- session.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/session.go b/session.go index 70e785ff..9bd72d57 100644 --- a/session.go +++ b/session.go @@ -621,9 +621,16 @@ func (s *session) handleTCPPackage() error { // buf = make([]byte, maxReadBufLen) bufp = gxbytes.GetBytes(maxReadBufLen) - defer gxbytes.PutBytes(bufp) buf = *bufp - pktBuf = new(bytes.Buffer) + + // pktBuf = new(bytes.Buffer) + pktBuf = gxbytes.GetBytesBuffer() + + defer func() { + gxbytes.PutBytes(bufp) + gxbytes.PutBytesBuffer(pktBuf) + }() + conn = s.Connection.(*gettyTCPConn) for { if s.IsClosed() { From 2da786c4a21900c99e71d1498be7f5985768e5ba Mon Sep 17 00:00:00 2001 From: AlexStocks Date: Sat, 27 Jul 2019 16:02:36 +0800 Subject: [PATCH 3/5] Imp: rollback length 64 -> 0 --- session.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/session.go b/session.go index 9bd72d57..1bbc5ad8 100644 --- a/session.go +++ b/session.go @@ -418,7 +418,7 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error { arrp *[]byte arr []byte ) - length = 64 + length = 0 for i := 0; i < len(pkgs); i++ { length += len(pkgs[i]) } From ab558d1e64bec26d4c69b3c62085313d7eb14ba4 Mon Sep 17 00:00:00 2001 From: AlexStocks Date: Sat, 27 Jul 2019 23:41:32 +0800 Subject: [PATCH 4/5] Rem: gxbytes --- client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client.go b/client.go index eff49913..f15283f7 100644 --- a/client.go +++ b/client.go @@ -23,7 +23,7 @@ import ( ) import ( - gxbytes "github.com/divebomb/gost/bytes" + "github.com/divebomb/gost/bytes" "github.com/gorilla/websocket" perrors "github.com/pkg/errors" ) From 1d7c20c94172bf6d1623f98d65477f5c0ae41a55 Mon Sep 17 00:00:00 2001 From: AlexStocks Date: Mon, 29 Jul 2019 11:30:05 +0800 Subject: [PATCH 5/5] Fix: divebomb -> dubbogo/gost --- client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client.go b/client.go index f15283f7..a6af574d 100644 --- a/client.go +++ b/client.go @@ -23,7 +23,7 @@ import ( ) import ( - "github.com/divebomb/gost/bytes" + "github.com/dubbogo/gost/bytes" "github.com/gorilla/websocket" perrors "github.com/pkg/errors" )