Skip to content

Commit

Permalink
feat(spanner): report unclosed tracked sessions
Browse files Browse the repository at this point in the history
Any session that hasn't been returned to the pool when the pool is being
closed is most likely a leak. Report all such sessions so that tests can
capture the issue and fail as necessary.
  • Loading branch information
egonelbre committed Feb 21, 2025
1 parent e59ac00 commit bbf1fd5
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 25 deletions.
64 changes: 39 additions & 25 deletions spanner/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,16 +842,7 @@ func (p *sessionPool) getLongRunningSessionsLocked() []*sessionHandle {
usedSessionsRatio := p.getRatioOfSessionsInUseLocked()
var longRunningSessions []*sessionHandle
if usedSessionsRatio > p.usedSessionsRatioThreshold {
element := p.trackedSessionHandles.Front()
for element != nil {
sh := element.Value.(*sessionHandle)
sh.mu.Lock()
if sh.session == nil {
// sessionHandle has already been recycled/destroyed.
sh.mu.Unlock()
element = element.Next()
continue
}
p.iterateTrackedSessionHandlesLocked(func(sh *sessionHandle) {
diff := time.Since(sh.lastUseTime)
if !sh.eligibleForLongRunning && diff.Seconds() >= p.idleTimeThreshold.Seconds() {
if (p.ActionOnInactiveTransaction == Warn || p.ActionOnInactiveTransaction == WarnAndClose) && !sh.isSessionLeakLogged {
Expand All @@ -874,13 +865,30 @@ func (p *sessionPool) getLongRunningSessionsLocked() []*sessionHandle {
longRunningSessions = append(longRunningSessions, sh)
}
}
sh.mu.Unlock()
element = element.Next()
}
})
}
return longRunningSessions
}

// iterateTrackedSessionHandlesLocked iterates over all tracked session handles and locking each for processing.
// Requires p.mu to be held.
func (p *sessionPool) iterateTrackedSessionHandlesLocked(fn func(*sessionHandle)) {
for element := p.trackedSessionHandles.Front(); element != nil; element = element.Next() {
func() {
sh := element.Value.(*sessionHandle)
sh.mu.Lock()
defer sh.mu.Unlock()

if sh.session == nil {
// sessionHandle has already been recycled/destroyed.
return
}

fn(sh)
}()
}
}

// removes or logs sessions that are unexpectedly long-running.
func (p *sessionPool) removeLongRunningSessions() {
p.mu.Lock()
Expand Down Expand Up @@ -1063,6 +1071,17 @@ func (p *sessionPool) close(ctx context.Context) {
logf(p.sc.logger, "Failed to unregister callback from the OpenTelemetry meter, error : %v", err)
}
}

if p.TrackSessionHandles || p.ActionOnInactiveTransaction == Warn || p.ActionOnInactiveTransaction == WarnAndClose {
p.iterateTrackedSessionHandlesLocked(func(sh *sessionHandle) {
if sh.stack != nil {
logf(p.sc.logger, "session %s checked out of pool at %s and wasn't returned for closing of the pool: \n%s", sh.session.getID(), sh.checkoutTime.Format(time.RFC3339), sh.stack)
} else {
logf(p.sc.logger, "session %s checked out of pool at %s and wasn't returned for closing of the pool: \nEnable SessionPoolConfig.TrackSessionHandles to get stack trace associated with the session", sh.session.getID(), sh.checkoutTime.Format(time.RFC3339))
}
})
}

p.mu.Unlock()
p.hc.close()
// destroy all the sessions
Expand Down Expand Up @@ -1171,20 +1190,15 @@ func (p *sessionPool) errGetSessionTimeoutWithTrackedSessionHandles(code codes.C
func (p *sessionPool) getTrackedSessionHandleStacksLocked() string {
p.mu.Lock()
defer p.mu.Unlock()
stackTraces := ""

var stackTraces strings.Builder
i := 1
element := p.trackedSessionHandles.Front()
for element != nil {
sh := element.Value.(*sessionHandle)
sh.mu.Lock()
if sh.stack != nil {
stackTraces = fmt.Sprintf("%s\n\nSession %d checked out of pool at %s by goroutine:\n%s", stackTraces, i, sh.checkoutTime.Format(time.RFC3339), sh.stack)
}
sh.mu.Unlock()
element = element.Next()
p.iterateTrackedSessionHandlesLocked(func(sh *sessionHandle) {
fmt.Fprintf(&stackTraces, "\n\nSession %d checked out of pool at %s by goroutine:\n%s", i, sh.checkoutTime.Format(time.RFC3339), sh.stack)
i++
}
return stackTraces
})

return stackTraces.String()
}

func (p *sessionPool) isHealthy(s *session) bool {
Expand Down
45 changes: 45 additions & 0 deletions spanner/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,51 @@ func TestSessionLeak_WhenInactiveTransactions_RemoveSessionsFromPool(t *testing.
iter.Stop()
}

func TestSessionLeak_Close(t *testing.T) {
t.Parallel()

ctx := context.Background()
var logoutput bytes.Buffer
logger := log.Default()
logger.SetOutput(&logoutput)

_, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
DisableNativeMetrics: true,
SessionPoolConfig: SessionPoolConfig{
MinOpened: 0,
MaxOpened: 1,
InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{
ActionOnInactiveTransaction: WarnAndClose,
},
TrackSessionHandles: true,
},
Logger: logger,
})

// intentionally leak the session
single := client.Single()
iter := single.Query(ctx, NewStatement(SelectFooFromBar))
for {
_, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
teardown()
t.Fatalf("Got unexpected error while iterating results: %v\n", err)
}
}

// teardown should now close the session and report it in the log.
teardown()

leaked := strings.Count(logoutput.String(), "wasn't returned for closing of the pool")
if leaked != 1 {
t.Log(logoutput.String())
t.Fatalf("expected 1 session to be logged, but got %d", leaked)
}
}

func TestMaintainer_LongRunningTransactionsCleanup_IfClose_VerifyInactiveSessionsClosed(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit bbf1fd5

Please sign in to comment.