Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Imp: use bytes pool in session.go #22

Merged
merged 5 commits into from
Jul 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
)

import (
"github.com/dubbogo/gost/bytes"
"github.com/gorilla/websocket"
perrors "github.com/pkg/errors"
)
Expand Down Expand Up @@ -163,10 +164,14 @@ func (c *client) dialUDP() Session {
localAddr *net.UDPAddr
peerAddr *net.UDPAddr
length int
bufp *[]byte
buf []byte
)

buf = make([]byte, 128)
// buf = make([]byte, 128)
bufp = gxbytes.GetBytes(128)
defer gxbytes.PutBytes(bufp)
buf = *bufp
localAddr = &net.UDPAddr{IP: net.IPv4zero, Port: 0}
peerAddr, _ = net.ResolveUDPAddr("udp", c.addr)
for {
Expand Down
23 changes: 17 additions & 6 deletions demo/hello/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Run Hello Demo

## 1. prepare
## 1. prepare
```bash

git clone https://github.com/dubbogo/getty.git
Expand All @@ -10,19 +10,30 @@ cd getty/demo/hello

## 2. run server

run server:
run server:
`go run tcp/server/server.go`

Or run server in task pool mode:
```bash
go run tcp/server/server.go -taskPool=true \
-task_queue_length=100 \
-task_queue_number=4 \
-task_pool_size=2000
-task_queue_length=128 \
-task_queue_number=16 \
-task_pool_size=2000 \
-pprof_port=60000
```

## 3. run client

```bash
go run tcp/client/client.go
```
```

Or run client in task pool mode:
```bash
go run tcp/client/client.go -taskPool=true \
-task_queue_length=100 \
-task_queue_number=4 \
-task_pool_size=50 \
-pprof_port=60001
```

23 changes: 23 additions & 0 deletions demo/hello/tcp/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

import (
"github.com/dubbogo/getty"
"github.com/dubbogo/gost/sync"
)

import (
Expand All @@ -24,12 +25,33 @@ import (
var (
ip = flag.String("ip", "127.0.0.1", "server IP")
connections = flag.Int("conn", 1, "number of tcp connections")

taskPoolMode = flag.Bool("taskPool", false, "task pool mode")
taskPoolQueueLength = flag.Int("task_queue_length", 100, "task queue length")
taskPoolQueueNumber = flag.Int("task_queue_number", 4, "task queue number")
taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
pprofPort = flag.Int("pprof_port", 65431, "pprof http port")
)

var (
taskPool *gxsync.TaskPool
)

func main() {
flag.Parse()

util.SetLimit()

util.Profiling(*pprofPort)

if *taskPoolMode {
taskPool = gxsync.NewTaskPool(
gxsync.WithTaskPoolTaskQueueLength(*taskPoolQueueLength),
gxsync.WithTaskPoolTaskQueueNumber(*taskPoolQueueNumber),
gxsync.WithTaskPoolTaskPoolSize(*taskPoolSize),
)
}

client := getty.NewTCPClient(
getty.WithServerAddress(*ip+":8090"),
getty.WithConnectionNumber(*connections),
Expand All @@ -41,3 +63,4 @@ func main() {

util.WaitCloseSignals(client)
}

9 changes: 7 additions & 2 deletions demo/hello/tcp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,16 @@ var (
eventListener = &hello.MessageHandler{}
)

func NewHelloClientSession(session getty.Session) (err error) {
func NewHelloClientSession(session getty.Session, taskPool *gxsync.TaskPool) (err error) {
eventListener.SessionOnOpen = func(session getty.Session) {
hello.Sessions = append(hello.Sessions, session)
}
return InitialSession(session)
err = InitialSession(session)
if err != nil {
return
}
session.SetTaskPool(taskPool)
return
}

func InitialSession(session getty.Session) (err error) {
Expand Down
25 changes: 14 additions & 11 deletions demo/hello/tcp/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,31 @@ import (
)

var (
taskPollMode = flag.Bool("taskPool", false, "task pool mode")
taskPollQueueLength = flag.Int("task_queue_length", 100, "task queue length")
taskPollQueueNumber = flag.Int("task_queue_number", 4, "task queue number")
taskPollSize = flag.Int("task_pool_size", 2000, "task poll size")
taskPoolMode = flag.Bool("taskPool", false, "task pool mode")
taskPoolQueueLength = flag.Int("task_queue_length", 100, "task queue length")
taskPoolQueueNumber = flag.Int("task_queue_number", 4, "task queue number")
taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
pprofPort = flag.Int("pprof_port", 65432, "pprof http port")
)

var (
taskPoll *gxsync.TaskPool
taskPool *gxsync.TaskPool
)

func main() {
flag.Parse()

util.SetLimit()

util.Profiling(*pprofPort)

options := []getty.ServerOption{getty.WithLocalAddress(":8090")}

if *taskPollMode {
taskPoll = gxsync.NewTaskPool(
gxsync.WithTaskPoolTaskQueueLength(*taskPollQueueLength),
gxsync.WithTaskPoolTaskQueueNumber(*taskPollQueueNumber),
gxsync.WithTaskPoolTaskPoolSize(*taskPollSize),
if *taskPoolMode {
taskPool = gxsync.NewTaskPool(
gxsync.WithTaskPoolTaskQueueLength(*taskPoolQueueLength),
gxsync.WithTaskPoolTaskQueueNumber(*taskPoolQueueNumber),
gxsync.WithTaskPoolTaskPoolSize(*taskPoolSize),
)
}

Expand All @@ -59,6 +62,6 @@ func NewHelloServerSession(session getty.Session) (err error) {
if err != nil {
return
}
session.SetTaskPool(taskPoll)
session.SetTaskPool(taskPool)
return
}
20 changes: 20 additions & 0 deletions demo/util/pprof.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/******************************************************
# MAINTAINER : Alex Stocks
# LICENCE : Apache License 2.0
# EMAIL : [email protected]
# MOD : 2019-07-25
******************************************************/

package util

import (
"fmt"
"net/http"
_ "net/http/pprof"
)

func Profiling(port int) {
go func() {
http.ListenAndServe(fmt.Sprintf(":%d", port), nil)
}()
}
31 changes: 25 additions & 6 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
)

import (
gxbytes "github.com/dubbogo/gost/bytes"
gxsync "github.com/dubbogo/gost/sync"
gxtime "github.com/dubbogo/gost/time"
"github.com/gorilla/websocket"
Expand Down Expand Up @@ -413,16 +414,20 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error {
var (
l int
err error
length uint32
length int
arrp *[]byte
arr []byte
)
length = 0
for i := 0; i < len(pkgs); i++ {
length += uint32(len(pkgs[i]))
length += len(pkgs[i])
}

// merge the pkgs
arr = make([]byte, length)
// arr = make([]byte, length)
arrp = gxbytes.GetBytes(length)
defer gxbytes.PutBytes(arrp)
arr = *arrp
l = 0
for i := 0; i < len(pkgs); i++ {
copy(arr[l:], pkgs[i])
Expand Down Expand Up @@ -608,13 +613,24 @@ func (s *session) handleTCPPackage() error {
exit bool
bufLen int
pkgLen int
bufp *[]byte
buf []byte
pktBuf *bytes.Buffer
pkg interface{}
)

buf = make([]byte, maxReadBufLen)
pktBuf = new(bytes.Buffer)
// buf = make([]byte, maxReadBufLen)
bufp = gxbytes.GetBytes(maxReadBufLen)
buf = *bufp

// pktBuf = new(bytes.Buffer)
pktBuf = gxbytes.GetBytesBuffer()

defer func() {
gxbytes.PutBytes(bufp)
gxbytes.PutBytesBuffer(pktBuf)
}()

conn = s.Connection.(*gettyTCPConn)
for {
if s.IsClosed() {
Expand Down Expand Up @@ -689,6 +705,7 @@ func (s *session) handleUDPPackage() error {
conn *gettyUDPConn
bufLen int
maxBufLen int
bufp *[]byte
buf []byte
addr *net.UDPAddr
pkgLen int
Expand All @@ -700,7 +717,9 @@ func (s *session) handleUDPPackage() error {
if int(s.maxMsgLen<<1) < bufLen {
maxBufLen = int(s.maxMsgLen << 1)
}
buf = make([]byte, maxBufLen)
bufp = gxbytes.GetBytes(maxBufLen) //make([]byte, maxBufLen)
defer gxbytes.PutBytes(bufp)
buf = *bufp
for {
if s.IsClosed() {
break
Expand Down