Skip to content

Commit

Permalink
fix intermittent race on closing tail/watch channels
Browse files Browse the repository at this point in the history
  • Loading branch information
gabemontero committed Jul 15, 2021
1 parent cb12d29 commit c4ab769
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 7 deletions.
20 changes: 15 additions & 5 deletions pkg/shp/reactor/pod_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package reactor

import (
"context"
"sync"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -13,9 +14,11 @@ import (
// state modifications, should work as a helper to build business logic based on the build POD
// changes.
type PodWatcher struct {
ctx context.Context
stopCh chan bool // stops the event loop execution
watcher watch.Interface // client watch instance
ctx context.Context
stopCh chan bool // stops the event loop execution
stopLock sync.Mutex
stopped bool
watcher watch.Interface // client watch instance

skipPodFn SkipPodFn
onPodAddedFn OnPodEventFn
Expand Down Expand Up @@ -116,7 +119,14 @@ func (p *PodWatcher) Start() (*corev1.Pod, error) {

// Stop closes the stop channel, and stops the execution loop.
func (p *PodWatcher) Stop() {
close(p.stopCh)
// employ sync because of observed 'panic: close of closed channel' when running build run log following
// along with canceling of builds
p.stopLock.Lock()
defer p.stopLock.Unlock()
if !p.stopped {
close(p.stopCh)
p.stopped = true
}
}

// NewPodWatcher instantiate PodWatcher event-loop.
Expand All @@ -130,5 +140,5 @@ func NewPodWatcher(
if err != nil {
return nil, err
}
return &PodWatcher{ctx: ctx, watcher: w, stopCh: make(chan bool)}, nil
return &PodWatcher{ctx: ctx, watcher: w, stopCh: make(chan bool), stopLock: sync.Mutex{}}, nil
}
15 changes: 13 additions & 2 deletions pkg/shp/tail/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"os"
"strings"
"sync"

corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
Expand All @@ -18,6 +19,8 @@ type Tail struct {
ctx context.Context // global context
clientset kubernetes.Interface // kubernetes client instance
stopCh chan bool // stop channel
stopLock sync.Mutex
stopped bool

stdout io.Writer
stderr io.Writer
Expand Down Expand Up @@ -60,13 +63,20 @@ func (t *Tail) Start(ns, podName, container string) {
}()
go func() {
<-t.ctx.Done()
close(t.stopCh)
t.Stop()
}()
}

// Stop closes stop channel to stop log streaming.
func (t *Tail) Stop() {
close(t.stopCh)
// employ sync because of observed 'panic: close of closed channel' when running build run log following
// along with canceling of builds
t.stopLock.Lock()
defer t.stopLock.Unlock()
if !t.stopped {
close(t.stopCh)
t.stopped = true
}
}

// NewTail instantiate Tail, using by default regular stdout and stderr.
Expand All @@ -75,6 +85,7 @@ func NewTail(ctx context.Context, clientset kubernetes.Interface) *Tail {
ctx: ctx,
clientset: clientset,
stopCh: make(chan bool, 1),
stopLock: sync.Mutex{},
stdout: os.Stdout,
stderr: os.Stderr,
}
Expand Down

0 comments on commit c4ab769

Please sign in to comment.