Skip to content

Commit

Permalink
Merge pull request #56 from gabemontero/fix-follow-no-pod
Browse files Browse the repository at this point in the history
various run build -F/--follow fixes around timeout, lack of pods, and data races
  • Loading branch information
openshift-merge-robot authored Oct 17, 2021
2 parents 4831482 + 71ba9a0 commit c91c9b7
Show file tree
Hide file tree
Showing 17 changed files with 374 additions and 101 deletions.
2 changes: 1 addition & 1 deletion pkg/shp/cmd/build/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (c *CreateCommand) Cmd() *cobra.Command {
}

// Complete fills internal subcommand structure for future work with user input
func (c *CreateCommand) Complete(params *params.Params, args []string) error {
func (c *CreateCommand) Complete(params *params.Params, io *genericclioptions.IOStreams, args []string) error {
switch len(args) {
case 1:
c.name = args[0]
Expand Down
2 changes: 1 addition & 1 deletion pkg/shp/cmd/build/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (c *DeleteCommand) Cmd() *cobra.Command {
}

// Complete fills DeleteSubCommand structure with data obtained from cobra command
func (c *DeleteCommand) Complete(params *params.Params, args []string) error {
func (c *DeleteCommand) Complete(params *params.Params, io *genericclioptions.IOStreams, args []string) error {
c.name = args[0]

return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/shp/cmd/build/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (c *ListCommand) Cmd() *cobra.Command {
}

// Complete fills object with user input data
func (c *ListCommand) Complete(params *params.Params, args []string) error {
func (c *ListCommand) Complete(params *params.Params, io *genericclioptions.IOStreams, args []string) error {
return nil
}

Expand Down
118 changes: 85 additions & 33 deletions pkg/shp/cmd/build/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ type RunCommand struct {
logTail *tail.Tail // follow container logs
tailLogsStarted map[string]bool // controls tail instance per container

buildName string // build name
logLock sync.Mutex

buildName string
buildRunName string
namespace string
buildRunSpec *buildv1alpha1.BuildRunSpec // stores command-line flags
shpClientset buildclientset.Interface
follow bool // flag to tail pod logs
watchLock sync.Mutex
}

const buildRunLongDesc = `
Expand All @@ -53,7 +55,7 @@ func (r *RunCommand) Cmd() *cobra.Command {
}

// Complete picks the build resource name from arguments, and instantiates additional components.
func (r *RunCommand) Complete(params *params.Params, args []string) error {
func (r *RunCommand) Complete(params *params.Params, io *genericclioptions.IOStreams, args []string) error {
switch len(args) {
case 1:
r.buildName = args[0]
Expand All @@ -66,6 +68,31 @@ func (r *RunCommand) Complete(params *params.Params, args []string) error {
return err
}
r.logTail = tail.NewTail(r.Cmd().Context(), clientset)
r.ioStreams = io
r.namespace = params.Namespace()
if r.follow {
if r.shpClientset, err = params.ShipwrightClientSet(); err != nil {
return err
}

kclientset, err := params.ClientSet()
if err != nil {
return err
}
to, err := params.RequestTimeout()
if err != nil {
return err
}
r.pw, err = reactor.NewPodWatcher(r.Cmd().Context(), to, kclientset, params.Namespace())
if err != nil {
return err
}

r.pw.WithOnPodModifiedFn(r.onEvent)
r.pw.WithTimeoutPodFn(r.onTimeout)
r.pw.WithNoPodEventsYetFn(r.onNoPodEventsYet)

}

// overwriting build-ref name to use what's on arguments
return r.Cmd().Flags().Set(flags.BuildrefNameFlag, r.buildName)
Expand All @@ -92,11 +119,49 @@ func (r *RunCommand) tailLogs(pod *corev1.Pod) {
}
}

// onNoPodEventsYet reacts to the pod watcher telling us it has not received any pod events for our
// build run. It fetches the BuildRun named r.buildRunName and inspects its state:
//   - when the Succeeded condition is True or False, or the BuildRun is canceled or being deleted,
//     the reason is logged and r.stop() is called, since there is nothing left to wait for;
//   - when the BuildRun has simply not started yet, that fact is logged and following continues.
//
// Errors retrieving the BuildRun are logged and otherwise ignored, so a later notification can retry.
func (r *RunCommand) onNoPodEventsYet() {
	r.Log(fmt.Sprintf("BuildRun %q log following has not observed any pod events yet.\n", r.buildRunName))
	br, err := r.shpClientset.ShipwrightV1alpha1().BuildRuns(r.namespace).Get(r.cmd.Context(), r.buildRunName, metav1.GetOptions{})
	if err != nil {
		r.Log(fmt.Sprintf("error accessing BuildRun %q: %s\n", r.buildRunName, err.Error()))
		return
	}

	c := br.Status.GetCondition(buildv1alpha1.Succeeded)
	giveUp := false
	msg := ""
	switch {
	case c != nil && c.Status == corev1.ConditionTrue:
		giveUp = true
		msg = fmt.Sprintf("BuildRun '%s' has been marked as successful.\n", br.Name)
	case c != nil && c.Status == corev1.ConditionFalse:
		giveUp = true
		msg = fmt.Sprintf("BuildRun '%s' has been marked as failed.\n", br.Name)
	case br.IsCanceled():
		giveUp = true
		msg = fmt.Sprintf("BuildRun '%s' has been canceled.\n", br.Name)
	case br.DeletionTimestamp != nil:
		giveUp = true
		msg = fmt.Sprintf("BuildRun '%s' has been deleted.\n", br.Name)
	case !br.HasStarted():
		// not a terminal state: report it accurately and keep waiting for pod events
		r.Log(fmt.Sprintf("BuildRun '%s' has not started yet.\n", br.Name))
	}
	if giveUp {
		r.Log(msg)
		r.Log(fmt.Sprintf("exiting 'shp build run --follow' for BuildRun %q\n", br.Name))
		r.stop()
	}
}

// onTimeout is the pod watcher callback invoked when either the context or the request timeout
// causes the watcher to exit; it reports the supplied reason through Log.
func (r *RunCommand) onTimeout(msg string) {
	reason := fmt.Sprintf(
		"BuildRun %q log following has stopped because: %q\n",
		r.buildRunName,
		msg,
	)
	r.Log(reason)
}

// onEvent reacts on pod state changes, to start and stop tailing container logs.
func (r *RunCommand) onEvent(pod *corev1.Pod) error {
// found more data races during unit testing with concurrent events coming in
r.watchLock.Lock()
defer r.watchLock.Unlock()
switch pod.Status.Phase {
case corev1.PodRunning:
// graceful time to wait for container start
Expand All @@ -118,14 +183,14 @@ func (r *RunCommand) onEvent(pod *corev1.Pod) error {
err = fmt.Errorf("build pod '%s' has failed", pod.GetName())
}
// see if because of deletion or cancelation
fmt.Fprintf(r.ioStreams.Out, msg)
r.Log(msg)
r.stop()
return err
case corev1.PodSucceeded:
fmt.Fprintf(r.ioStreams.Out, "Pod '%s' has succeeded!\n", pod.GetName())
r.Log(fmt.Sprintf("Pod '%s' has succeeded!\n", pod.GetName()))
r.stop()
default:
fmt.Fprintf(r.ioStreams.Out, "Pod '%s' is in state %q...\n", pod.GetName(), string(pod.Status.Phase))
r.Log(fmt.Sprintf("Pod '%s' is in state %q...\n", pod.GetName(), string(pod.Status.Phase)))
// handle any issues with pulling images that may fail
for _, c := range pod.Status.Conditions {
if c.Type == corev1.PodInitialized || c.Type == corev1.ContainersReady {
Expand All @@ -146,10 +211,7 @@ func (r *RunCommand) stop() {

// Run creates a BuildRun resource based on Build's name informed on arguments.
func (r *RunCommand) Run(params *params.Params, ioStreams *genericclioptions.IOStreams) error {
// ran into some data race conditions during unit test with this starting up, but pod events
// coming in before we completed initialization below
r.watchLock.Lock()
// resource using GenerateName, which will provice a unique instance
// resource using GenerateName, which will provide a unique instance
br := &buildv1alpha1.BuildRun{
ObjectMeta: metav1.ObjectMeta{
GenerateName: fmt.Sprintf("%s-", r.buildName),
Expand All @@ -162,7 +224,7 @@ func (r *RunCommand) Run(params *params.Params, ioStreams *genericclioptions.IOS
if err != nil {
return err
}
br, err = clientset.ShipwrightV1alpha1().BuildRuns(params.Namespace()).Create(r.cmd.Context(), br, metav1.CreateOptions{})
br, err = clientset.ShipwrightV1alpha1().BuildRuns(r.namespace).Create(r.cmd.Context(), br, metav1.CreateOptions{})
if err != nil {
return err
}
Expand All @@ -172,15 +234,7 @@ func (r *RunCommand) Run(params *params.Params, ioStreams *genericclioptions.IOS
return nil
}

r.ioStreams = ioStreams
kclientset, err := params.ClientSet()
if err != nil {
return err
}
r.buildRunName = br.Name
if r.shpClientset, err = params.ShipwrightClientSet(); err != nil {
return err
}

// instantiating a pod watcher with a specific label-selector to find the intended pod where the
// actual build started by this subcommand is being executed, including the randomized buildrun
Expand All @@ -190,19 +244,17 @@ func (r *RunCommand) Run(params *params.Params, ioStreams *genericclioptions.IOS
r.buildName,
br.GetName(),
)}
r.pw, err = reactor.NewPodWatcher(r.Cmd().Context(), kclientset, listOpts, params.Namespace())
if err != nil {
return err
}

r.pw.WithOnPodModifiedFn(r.onEvent)
// cannot defer with unlock up top because r.pw.Start() blocks; but the erroring out above kills the
// cli invocation, so it does not matter
r.watchLock.Unlock()
_, err = r.pw.Start()
_, err = r.pw.Start(listOpts)
return err
}

// Log writes msg verbatim to the command's output stream (r.ioStreams.Out).
//
// Writes are serialized with r.logLock because several callbacks may log concurrently to the
// same stream, which would otherwise be a data race.
func (r *RunCommand) Log(msg string) {
	r.logLock.Lock()
	defer r.logLock.Unlock()
	// msg is already fully formatted by the caller; use Fprint rather than Fprintf so that any
	// '%' contained in it (pod names, error text, ...) is not interpreted as a formatting verb.
	fmt.Fprint(r.ioStreams.Out, msg)
}

// runCmd instantiates the "build run" sub-command using common BuildRun flags.
func runCmd() runner.SubCommand {
cmd := &cobra.Command{
Expand All @@ -214,7 +266,7 @@ func runCmd() runner.SubCommand {
cmd: cmd,
buildRunSpec: flags.BuildRunSpecFromFlags(cmd.Flags()),
tailLogsStarted: make(map[string]bool),
watchLock: sync.Mutex{},
logLock: sync.Mutex{},
}
cmd.Flags().BoolVarP(&runCommand.follow, "follow", "F", runCommand.follow, "Start a build and watch its log until it completes or fails.")
return runCommand
Expand Down
83 changes: 48 additions & 35 deletions pkg/shp/cmd/build/run_test.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
package build

import (
"runtime"
"bytes"
"strings"
"sync"
"testing"
"time"

buildv1alpha1 "github.com/shipwright-io/build/pkg/apis/build/v1alpha1"
shpfake "github.com/shipwright-io/build/pkg/client/clientset/versioned/fake"
"github.com/shipwright-io/cli/pkg/shp/flags"
"github.com/shipwright-io/cli/pkg/shp/params"
"github.com/shipwright-io/cli/pkg/shp/reactor"
"github.com/spf13/cobra"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/kubernetes/fake"
fakekubetesting "k8s.io/client-go/testing"
Expand All @@ -26,6 +26,8 @@ func TestStartBuildRunFollowLog(t *testing.T) {
name string
phase corev1.PodPhase
logText string
to string
noPodYet bool
cancelled bool
brDeleted bool
podDeleted bool
Expand Down Expand Up @@ -76,6 +78,16 @@ func TestStartBuildRunFollowLog(t *testing.T) {
// k8s folks to "be careful" with it; fortunately, what we do for tail and pod_watcher so far is within
// the realm of reliable.
},
{
name: "timeout",
to: "1s",
logText: reactor.RequestTimeoutMessage,
},
{
name: "no pod yet",
noPodYet: true,
logText: "has not observed any pod events yet",
},
}

for _, test := range tests {
Expand Down Expand Up @@ -103,6 +115,7 @@ func TestStartBuildRunFollowLog(t *testing.T) {
},
}
shpclientset := shpfake.NewSimpleClientset()

// need this reactor since the Run method uses the ObjectMeta.GenerateName k8s feature to generate the random
// name for the BuildRun. However, for our purposes with unit testing, we want to control the name of the BuildRun
// to facilitate the list/selector via labels that is also employed by the Run method.
Expand All @@ -116,7 +129,10 @@ func TestStartBuildRunFollowLog(t *testing.T) {
return true, br, nil
}
shpclientset.PrependReactor("get", "buildruns", getReactorFunc)
kclientset := fake.NewSimpleClientset(pod)
kclientset := fake.NewSimpleClientset()
if !test.noPodYet {
kclientset = fake.NewSimpleClientset(pod)
}
ccmd := &cobra.Command{}
cmd := &RunCommand{
cmd: ccmd,
Expand All @@ -125,12 +141,16 @@ func TestStartBuildRunFollowLog(t *testing.T) {
follow: true,
shpClientset: shpclientset,
tailLogsStarted: make(map[string]bool),
watchLock: sync.Mutex{},
logLock: sync.Mutex{},
}

// set up context
cmd.Cmd().ExecuteC()
param := params.NewParamsForTest(kclientset, shpclientset, nil, metav1.NamespaceDefault)
pm := genericclioptions.NewConfigFlags(true)
if len(test.to) > 0 {
pm.Timeout = &test.to
}
param := params.NewParamsForTest(kclientset, shpclientset, pm, metav1.NamespaceDefault)

ioStreams, _, out, _ := genericclioptions.NewTestIOStreams()

Expand All @@ -143,7 +163,13 @@ func TestStartBuildRunFollowLog(t *testing.T) {
pod.DeletionTimestamp = &metav1.Time{}
}

cmd.Complete(param, []string{name})
cmd.Complete(param, &ioStreams, []string{name})
if len(test.to) > 0 {
cmd.Run(param, &ioStreams)
checkLog(test.name, test.logText, cmd, out, t)
continue
}

go func() {
err := cmd.Run(param, &ioStreams)
if err != nil {
Expand All @@ -152,35 +178,22 @@ func TestStartBuildRunFollowLog(t *testing.T) {

}()

// yield the processor, so the initialization in Run can occur; afterward, the watchLock should allow
// coordination between Run and onEvent
runtime.Gosched()

// even with our release of the context above with Gosched(), repeated runs in CI have surfaced occasional timing issues between
// cmd.Run() finishing initialization and cmd.onEvent trying to used struct variables, resulting in panics; so we employ the lock here
// to insure the required initializations have run; this is still better than a generic "sleep log enough for
// the init to occur.
cmd.watchLock.Lock()
err := wait.PollImmediate(1*time.Second, 3*time.Second, func() (done bool, err error) {
// check any of the vars on RunCommand that are used in onEvent and make sure they are set;
// we are verifying the initialization done in Run() on RunCommand is complete
if cmd.pw != nil && cmd.ioStreams != nil && cmd.shpClientset != nil {
cmd.watchLock.Unlock()
return true, nil
}
return false, nil
})
if err != nil {
cmd.watchLock.Unlock()
t.Errorf("Run initialization did not complete in time")
}

// mimic watch events, bypassing k8s fake client watch hoopla whose plug points are not always useful;
pod.Status.Phase = test.phase
cmd.onEvent(pod)
if !strings.Contains(out.String(), test.logText) {
t.Errorf("test %s: unexpected output: %s", test.name, out.String())
if !test.noPodYet {
// mimic watch events, bypassing k8s fake client watch hoopla whose plug points are not always useful;
pod.Status.Phase = test.phase
cmd.onEvent(pod)
} else {
cmd.onNoPodEventsYet()
}
checkLog(test.name, test.logText, cmd, out, t)
}
}

// checkLog fails test case `name` when the captured output `out` does not contain `text`.
// It takes cmd.logLock while reading the buffer because the same stream is written to by the
// RunCommand under test, possibly from another goroutine.
func checkLog(name, text string, cmd *RunCommand, out *bytes.Buffer, t *testing.T) {
	// report failures at the caller's line rather than inside this helper
	t.Helper()

	cmd.logLock.Lock()
	defer cmd.logLock.Unlock()

	got := out.String()
	if !strings.Contains(got, text) {
		t.Errorf("test %s: unexpected output: %s", name, got)
	}
}
2 changes: 1 addition & 1 deletion pkg/shp/cmd/buildrun/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (c *CancelCommand) Cmd() *cobra.Command {
}

// Complete fills in data provided by user
func (c *CancelCommand) Complete(params *params.Params, args []string) error {
func (c *CancelCommand) Complete(params *params.Params, io *genericclioptions.IOStreams, args []string) error {
c.name = args[0]

return nil
Expand Down
Loading

0 comments on commit c91c9b7

Please sign in to comment.