From 1a47fab4a62b4f9a165a74a2a35cf46e39db8fbc Mon Sep 17 00:00:00 2001 From: pantianying <601666418@qq.com> Date: Fri, 30 Aug 2019 16:54:54 +0800 Subject: [PATCH 1/6] change grNum to chan --- session.go | 47 ++++++++++++++++++++++++----------------------- 1 file changed, 24 insertions(+), 23 deletions(-) diff --git a/session.go b/session.go index 7dad26e3..9d95f392 100644 --- a/session.go +++ b/session.go @@ -86,9 +86,10 @@ type session struct { // attribute attrs *ValuesContext - // goroutines sync - grNum int32 - lock sync.RWMutex + // goroutines done signal + handleLoopDone chan struct{} + handlePackageDone chan struct{} + lock sync.RWMutex } func newSession(endPoint EndPoint, conn Connection) *session { @@ -102,9 +103,11 @@ func newSession(endPoint EndPoint, conn Connection) *session { period: period, - done: make(chan struct{}), - wait: pendingDuration, - attrs: NewValuesContext(nil), + done: make(chan struct{}), + wait: pendingDuration, + attrs: NewValuesContext(nil), + handleLoopDone: make(chan struct{}), + handlePackageDone: make(chan struct{}), } ss.Connection.setSession(ss) @@ -145,7 +148,8 @@ func (s *session) Reset() { s.period = period s.wait = pendingDuration s.attrs = NewValuesContext(nil) - s.grNum = 0 + s.handleLoopDone = make(chan struct{}) + s.handlePackageDone = make(chan struct{}) s.SetWriteTimeout(netIOTimeout) s.SetReadTimeout(netIOTimeout) @@ -470,7 +474,6 @@ func (s *session) run() { } // start read/write gr - atomic.AddInt32(&(s.grNum), 2) go s.handleLoop() go s.handlePackage() } @@ -496,7 +499,7 @@ func (s *session) handleLoop() { log.Errorf("[session.handleLoop] panic session %s: err=%s\n%s", s.sessionToken(), r, rBuf) } - grNum = atomic.AddInt32(&(s.grNum), -1) + close(s.handleLoopDone) s.listener.OnClose(s) log.Info("%s, [session.handleLoop] goroutine exit now, left gr num %d", s.Stat(), grNum) s.gc() @@ -511,19 +514,17 @@ LOOP: select { case <-s.done: // this case branch assure the (session)handleLoop gr will exit before (session)handlePackage gr. - if atomic.LoadInt32(&(s.grNum)) == 1 { // make sure @(session)handlePackage goroutine has been closed. - if len(s.wQ) == 0 { - log.Infof("%s, [session.handleLoop] got done signal. wQ is nil.", s.Stat()) - break LOOP - } - counter.Start() - // if time.Since(start).Nanoseconds() >= s.wait.Nanoseconds() { - if counter.Count() > s.wait.Nanoseconds() { - log.Infof("%s, [session.handleLoop] got done signal ", s.Stat()) - break LOOP - } + <-s.handlePackageDone + if len(s.wQ) == 0 { + log.Infof("%s, [session.handleLoop] got done signal. wQ is nil.", s.Stat()) + break LOOP + } + counter.Start() + // if time.Since(start).Nanoseconds() >= s.wait.Nanoseconds() { + if counter.Count() > s.wait.Nanoseconds() { + log.Infof("%s, [session.handleLoop] got done signal ", s.Stat()) + break LOOP } - case outPkg = <-s.wQ: if flag { if err = s.writer.Write(s, outPkg); err != nil { @@ -577,7 +578,7 @@ func (s *session) handlePackage() { log.Errorf("[session.handlePackage] panic session %s: err=%s\n%s", s.sessionToken(), r, rBuf) } - grNum = atomic.AddInt32(&(s.grNum), -1) + close(s.handlePackageDone) log.Infof("%s, [session.handlePackage] gr will exit now, left gr num %d", s.sessionToken(), grNum) s.stop() if err != nil { @@ -856,5 +857,5 @@ func (s *session) gc() { func (s *session) Close() { s.stop() log.Info("%s closed now. its current gr num is %d", - s.sessionToken(), atomic.LoadInt32(&(s.grNum))) + s.sessionToken()) } From 7d702cf090f9e384871b733d8de13e20bd15cf9e Mon Sep 17 00:00:00 2001 From: pantianying <601666418@qq.com> Date: Sat, 31 Aug 2019 18:59:06 +0800 Subject: [PATCH 2/6] change log --- session.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/session.go b/session.go index 9d95f392..c292c390 100644 --- a/session.go +++ b/session.go @@ -569,8 +569,6 @@ func (s *session) handlePackage() { ) defer func() { - var grNum int32 - if r := recover(); r != nil { const size = 64 << 10 rBuf := make([]byte, size) @@ -579,7 +577,7 @@ func (s *session) handlePackage() { } close(s.handlePackageDone) - log.Infof("%s, [session.handlePackage] gr will exit now, left gr num %d", s.sessionToken(), grNum) + log.Infof("%s, [session.handlePackage] gr will exit now", s.sessionToken()) s.stop() if err != nil { log.Errorf("%s, [session.handlePackage] error:%+v", s.sessionToken(), err) @@ -856,6 +854,6 @@ func (s *session) gc() { // or (session)handleLoop automatically. It's thread safe. func (s *session) Close() { s.stop() - log.Info("%s closed now. its current gr num is %d", + log.Info("%s closed now.", s.sessionToken()) } From 2d40a6f66ae001795e4202528ccbdf0a964f10b1 Mon Sep 17 00:00:00 2001 From: pantianying <601666418@qq.com> Date: Mon, 2 Sep 2019 09:47:08 +0800 Subject: [PATCH 3/6] delete handleLoopDone --- session.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/session.go b/session.go index c292c390..1d815023 100644 --- a/session.go +++ b/session.go @@ -87,7 +87,6 @@ type session struct { attrs *ValuesContext // goroutines done signal - handleLoopDone chan struct{} handlePackageDone chan struct{} lock sync.RWMutex } @@ -106,7 +105,6 @@ func newSession(endPoint EndPoint, conn Connection) *session { done: make(chan struct{}), wait: pendingDuration, attrs: NewValuesContext(nil), - handleLoopDone: make(chan struct{}), handlePackageDone: make(chan struct{}), } @@ -148,7 +146,6 @@ func (s *session) Reset() { s.period = period s.wait = pendingDuration s.attrs = NewValuesContext(nil) - s.handleLoopDone = make(chan struct{}) s.handlePackageDone = make(chan struct{}) s.SetWriteTimeout(netIOTimeout) @@ -490,8 +487,6 @@ func (s *session) handleLoop() { ) defer func() { - var grNum int32 - if r := recover(); r != nil { const size = 64 << 10 rBuf := make([]byte, size) @@ -499,9 +494,8 @@ func (s *session) handleLoop() { log.Errorf("[session.handleLoop] panic session %s: err=%s\n%s", s.sessionToken(), r, rBuf) } - close(s.handleLoopDone) s.listener.OnClose(s) - log.Info("%s, [session.handleLoop] goroutine exit now, left gr num %d", s.Stat(), grNum) + log.Info("%s, [session.handleLoop] goroutine exit now", s.Stat()) s.gc() }() From abef1dd90b4385d3ddadc85862dc076ed0b2cc73 Mon Sep 17 00:00:00 2001 From: pantianying <601666418@qq.com> Date: Tue, 3 Sep 2019 19:17:03 +0800 Subject: [PATCH 4/6] change variable name --- session.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/session.go b/session.go index 1d815023..0e6aed6b 100644 --- a/session.go +++ b/session.go @@ -86,9 +86,9 @@ type session struct { // attribute attrs *ValuesContext - // goroutines done signal - handlePackageDone chan struct{} - lock sync.RWMutex + // read goroutines done signal + rDone chan struct{} + lock sync.RWMutex } func newSession(endPoint EndPoint, conn Connection) *session { @@ -102,10 +102,10 @@ func newSession(endPoint EndPoint, conn Connection) *session { period: period, - done: make(chan struct{}), - wait: pendingDuration, - attrs: NewValuesContext(nil), - handlePackageDone: make(chan struct{}), + done: make(chan struct{}), + wait: pendingDuration, + attrs: NewValuesContext(nil), + rDone: make(chan struct{}), } ss.Connection.setSession(ss) @@ -146,7 +146,7 @@ func (s *session) Reset() { s.period = period s.wait = pendingDuration s.attrs = NewValuesContext(nil) - s.handlePackageDone = make(chan struct{}) + s.rDone = make(chan struct{}) s.SetWriteTimeout(netIOTimeout) s.SetReadTimeout(netIOTimeout) @@ -508,7 +508,7 @@ LOOP: select { case <-s.done: // this case branch assure the (session)handleLoop gr will exit before (session)handlePackage gr. - <-s.handlePackageDone + <-s.rDone if len(s.wQ) == 0 { log.Infof("%s, [session.handleLoop] got done signal. wQ is nil.", s.Stat()) break LOOP @@ -570,7 +570,7 @@ func (s *session) handlePackage() { log.Errorf("[session.handlePackage] panic session %s: err=%s\n%s", s.sessionToken(), r, rBuf) } - close(s.handlePackageDone) + close(s.rDone) log.Infof("%s, [session.handlePackage] gr will exit now", s.sessionToken()) s.stop() if err != nil { From 0fd80e405c31ab41841216c034e9288273480141 Mon Sep 17 00:00:00 2001 From: pantianying <601666418@qq.com> Date: Tue, 3 Sep 2019 20:33:06 +0800 Subject: [PATCH 5/6] add s.grNum for debug --- session.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/session.go b/session.go index 0e6aed6b..145c3b66 100644 --- a/session.go +++ b/session.go @@ -86,6 +86,8 @@ type session struct { // attribute attrs *ValuesContext + // goroutines sync + grNum int32 // read goroutines done signal rDone chan struct{} lock sync.RWMutex @@ -106,6 +108,7 @@ func newSession(endPoint EndPoint, conn Connection) *session { wait: pendingDuration, attrs: NewValuesContext(nil), rDone: make(chan struct{}), + grNum: 0, } ss.Connection.setSession(ss) @@ -471,6 +474,7 @@ func (s *session) run() { } // start read/write gr + atomic.AddInt32(&(s.grNum), 2) go s.handleLoop() go s.handlePackage() } @@ -494,8 +498,9 @@ func (s *session) handleLoop() { log.Errorf("[session.handleLoop] panic session %s: err=%s\n%s", s.sessionToken(), r, rBuf) } + grNum := atomic.AddInt32(&(s.grNum), -1) s.listener.OnClose(s) - log.Info("%s, [session.handleLoop] goroutine exit now", s.Stat()) + log.Info("%s, [session.handleLoop] goroutine exit now, left gr num %d", s.Stat(), grNum) s.gc() }() @@ -571,7 +576,8 @@ func (s *session) handlePackage() { } close(s.rDone) - log.Infof("%s, [session.handlePackage] gr will exit now", s.sessionToken()) + grNum := atomic.AddInt32(&(s.grNum), -1) + log.Infof("%s, [session.handlePackage] gr will exit now, left gr num %d", s.sessionToken(), grNum) s.stop() if err != nil { log.Errorf("%s, [session.handlePackage] error:%+v", s.sessionToken(), err) @@ -848,6 +854,6 @@ func (s *session) gc() { // or (session)handleLoop automatically. It's thread safe. func (s *session) Close() { s.stop() - log.Info("%s closed now.", - s.sessionToken()) + log.Info("%s closed now. its current gr num is %d", + s.sessionToken(), atomic.LoadInt32(&(s.grNum))) } From 82a97b4ac747ca18857dbacc08a7ea98a2250281 Mon Sep 17 00:00:00 2001 From: pantianying <601666418@qq.com> Date: Tue, 3 Sep 2019 20:46:38 +0800 Subject: [PATCH 6/6] fix --- session.go | 1 + 1 file changed, 1 insertion(+) diff --git a/session.go b/session.go index 145c3b66..e92d8131 100644 --- a/session.go +++ b/session.go @@ -150,6 +150,7 @@ func (s *session) Reset() { s.wait = pendingDuration s.attrs = NewValuesContext(nil) s.rDone = make(chan struct{}) + s.grNum = 0 s.SetWriteTimeout(netIOTimeout) s.SetReadTimeout(netIOTimeout)