Skip to content

Commit

Permalink
wait for killfunc completion when shutting down current app (#670)
Browse files Browse the repository at this point in the history
* wait for killfunc completion when shutting down current app

---------

Co-authored-by: jingdi.zhu <[email protected]>
  • Loading branch information
istyf and jingdi.zhu authored Jan 19, 2025
1 parent 4be5540 commit ad99709
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 65 deletions.
148 changes: 91 additions & 57 deletions runner/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/gohugoio/hugo/watcher/filenotify"
Expand All @@ -22,16 +23,17 @@ type Engine struct {
watcher filenotify.FileWatcher
debugMode bool
runArgs []string
running bool
running atomic.Bool

eventCh chan string
watcherStopCh chan bool
buildRunCh chan bool
buildRunStopCh chan bool
binStopCh chan bool
exitCh chan bool
// binStopCh is a channel for process termination control
// Type chan<- chan int indicates it's a send-only channel that transmits another channel(chan int)
binStopCh chan<- chan int
exitCh chan bool

procKillWg sync.WaitGroup
mu sync.RWMutex
watchers uint
fileChecksums *checksumMap
Expand All @@ -57,7 +59,6 @@ func NewEngineWithConfig(cfg *Config, debugMode bool) (*Engine, error) {
watcherStopCh: make(chan bool, 10),
buildRunCh: make(chan bool, 1),
buildRunStopCh: make(chan bool, 1),
binStopCh: make(chan bool),
exitCh: make(chan bool),
fileChecksums: &checksumMap{m: make(map[string]string)},
watchers: 0,
Expand Down Expand Up @@ -316,7 +317,7 @@ func (e *Engine) start() {
e.mainLog("Proxy server listening on http://localhost%s", e.proxy.server.Addr)
}

e.running = true
e.running.Store(true)
firstRunCh := make(chan bool, 1)
firstRunCh <- true

Expand Down Expand Up @@ -366,10 +367,8 @@ func (e *Engine) start() {
}

// if current app is running, stop it
e.withLock(func() {
close(e.binStopCh)
e.binStopCh = make(chan bool)
})
e.stopBin()

go e.buildRun()
}
}
Expand Down Expand Up @@ -479,43 +478,64 @@ func (e *Engine) runPostCmd() error {
}

func (e *Engine) runBin() error {
killFunc := func(cmd *exec.Cmd, stdout io.ReadCloser, stderr io.ReadCloser, killCh chan struct{}, processExit chan struct{}) {
select {
// listen to binStopCh
// cleanup() will close binStopCh when engine stop
// start() will close binStopCh when file changed
case <-e.binStopCh:
close(killCh)
break

// the process is exited, return
case <-processExit:
return
}
// killFunc returns a chan of chan of int that should be used to shutdown the bin currently being run
// The chan int that is passed in will be used to signal completion of the shutdown
killFunc := func(cmd *exec.Cmd, stdout io.ReadCloser, stderr io.ReadCloser, killCh chan<- struct{}, processExit <-chan struct{}) chan<- chan int {
shutdown := make(chan chan int)
var closer chan int

go func() {
defer func() {
stdout.Close()
stderr.Close()
}()

e.mainDebug("trying to kill pid %d, cmd %+v", cmd.Process.Pid, cmd.Args)
defer func() {
stdout.Close()
stderr.Close()
}()
pid, err := e.killCmd(cmd)
if err != nil {
e.mainDebug("failed to kill PID %d, error: %s", pid, err.Error())
if cmd.ProcessState != nil && !cmd.ProcessState.Exited() {
os.Exit(1)
}
} else {
e.mainDebug("cmd killed, pid: %d", pid)
}
if e.config.Build.StopOnError {
cmdBinPath := cmdPath(e.config.rel(e.config.binPath()))
if _, err = os.Stat(cmdBinPath); os.IsNotExist(err) {
select {
case closer = <-shutdown:
// stopBin has been called from start or cleanup
// defer the signalling of shutdown completion before attempting to kill further down
defer close(closer)
defer close(killCh)
case <-processExit:
// the process is exited, return
e.withLock(func() {
// Avoid deadlocking any racing shutdown request
select {
case c := <-shutdown:
close(c)
default:
}
e.binStopCh = nil
})
return
}
if err = os.Remove(cmdBinPath); err != nil {
e.mainLog("failed to remove %s, error: %s", e.config.rel(e.config.binPath()), err)

e.mainDebug("trying to kill pid %d, cmd %+v", cmd.Process.Pid, cmd.Args)

pid, err := e.killCmd(cmd)
if err != nil {
e.mainDebug("failed to kill PID %d, error: %s", pid, err.Error())
if cmd.ProcessState != nil && !cmd.ProcessState.Exited() {
// Pass a non zero exit code to the closer to delegate the
// decision wether to os.Exit or not
closer <- 1
}
} else {
e.mainDebug("cmd killed, pid: %d", pid)
}
}

if e.config.Build.StopOnError {
cmdBinPath := cmdPath(e.config.rel(e.config.binPath()))
if _, err = os.Stat(cmdBinPath); os.IsNotExist(err) {
return
}
if err = os.Remove(cmdBinPath); err != nil {
e.mainLog("failed to remove %s, error: %s", e.config.rel(e.config.binPath()), err)
}
}
}()

return shutdown
}

e.runnerLog("running...")
Expand All @@ -536,7 +556,6 @@ func (e *Engine) runBin() error {
case <-killCh:
return
default:
e.procKillWg.Add(1)
command := strings.Join(append([]string{e.config.Build.Bin}, e.runArgs...), " ")
cmd, stdout, stderr, err := e.startCmd(command)
if err != nil {
Expand All @@ -551,10 +570,9 @@ func (e *Engine) runBin() error {
e.proxy.Reload()
}

e.stopBin()
e.withLock(func() {
close(e.binStopCh)
e.binStopCh = make(chan bool)
go killFunc(cmd, stdout, stderr, killCh, processExit)
e.binStopCh = killFunc(cmd, stdout, stderr, killCh, processExit)
})

go func() {
Expand All @@ -568,6 +586,7 @@ func (e *Engine) runBin() error {
}()
state, _ := cmd.Process.Wait()
close(processExit)

switch state.ExitCode() {
case 0:
e.runnerLog("Process Exit with Code 0")
Expand All @@ -576,7 +595,6 @@ func (e *Engine) runBin() error {
default:
e.runnerLog("Process Exit with Code: %v", state.ExitCode())
}
e.procKillWg.Done()

if !e.config.Build.Rerun {
return
Expand All @@ -589,9 +607,31 @@ func (e *Engine) runBin() error {
return nil
}

func (e *Engine) stopBin() {
e.mainDebug("initiating shutdown sequence")
start := time.Now()
e.mainDebug("shutdown completed in %v", time.Since(start))

exitCode := make(chan int)

e.withLock(func() {
if e.binStopCh != nil {
e.mainDebug("sending shutdown command to killfunc")
e.binStopCh <- exitCode
e.binStopCh = nil
} else {
close(exitCode)
}
})
if ret := <-exitCode; ret != 0 {
os.Exit(ret)
}
}

func (e *Engine) cleanup() {
e.mainLog("cleaning...")
defer e.mainLog("see you again~")
defer e.mainDebug("exited")

if e.config.Proxy.Enabled {
e.mainDebug("powering down the proxy...")
Expand All @@ -600,11 +640,8 @@ func (e *Engine) cleanup() {
}
}

e.withLock(func() {
close(e.binStopCh)
e.binStopCh = make(chan bool)
})
e.mainDebug("waiting for close watchers..")
e.stopBin()
e.mainDebug("waiting for close watchers..")

e.withLock(func() {
for i := 0; i < int(e.watchers); i++ {
Expand All @@ -627,10 +664,7 @@ func (e *Engine) cleanup() {
}
}

e.mainDebug("waiting for exit...")
e.procKillWg.Wait()
e.running = false
e.mainDebug("exited")
e.running.Store(false)
}

// Stop the air
Expand Down
18 changes: 10 additions & 8 deletions runner/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ func TestCtrlCWhenHaveKillDelay(t *testing.T) {
t.Fatalf("Should not be fail: %s.", err)
}
time.Sleep(time.Second * 3)
assert.False(t, engine.running)
assert.False(t, engine.running.Load())
}

func TestCtrlCWhenREngineIsRunning(t *testing.T) {
Expand Down Expand Up @@ -377,7 +377,7 @@ func TestCtrlCWhenREngineIsRunning(t *testing.T) {
if err != nil {
t.Fatalf("Should not be fail: %s.", err)
}
assert.False(t, engine.running)
assert.False(t, engine.running.Load())
}

func TestCtrlCWithFailedBin(t *testing.T) {
Expand Down Expand Up @@ -455,7 +455,7 @@ func TestFixCloseOfChannelAfterCtrlC(t *testing.T) {
if err := waitingPortConnectionRefused(t, port, time.Second*10); err != nil {
t.Fatalf("Should not be fail: %s.", err)
}
assert.False(t, engine.running)
assert.False(t, engine.running.Load())
}

func TestFixCloseOfChannelAfterTwoFailedBuild(t *testing.T) {
Expand Down Expand Up @@ -498,7 +498,7 @@ func TestFixCloseOfChannelAfterTwoFailedBuild(t *testing.T) {
// ctrl + c
sigs <- syscall.SIGINT
time.Sleep(time.Second * 1)
assert.False(t, engine.running)
assert.False(t, engine.running.Load())
}

// waitingPortReady waits until the port is ready to be used.
Expand Down Expand Up @@ -867,15 +867,17 @@ func Test(t *testing.T) {
}
// check is MacOS
var cmd *exec.Cmd
toolName := "sed"

if runtime.GOOS == "darwin" {
cmd = exec.Command("gsed", "-i", "s/\"_test.*go\"//g", ".air.toml")
} else {
cmd = exec.Command("sed", "-i", "s/\"_test.*go\"//g", ".air.toml")
toolName = "gsed"
}

cmd = exec.Command(toolName, "-i", "s/\"_test.*go\"//g", ".air.toml")
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
t.Fatal(err)
t.Skipf("unable to run %s, make sure the tool is installed to run this test", toolName)
}

time.Sleep(time.Second * 2)
Expand Down

0 comments on commit ad99709

Please sign in to comment.