-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathserver.go
186 lines (160 loc) · 5.25 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
// Copyright 2021-2024 Nokia
// Licensed under the BSD 3-Clause License.
// SPDX-License-Identifier: BSD-3-Clause
package restful
import (
"context"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/nokia/restful/trace/tracer"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)
// Server represents a server instance.
// It wraps an http.Server and carries the settings used when serving:
// TLS files, graceful shutdown options, restart state and monitor hooks.
type Server struct {
	// server is the wrapped HTTP server. listenAndServe replaces it with a new instance on restart.
	server *http.Server

	// serverMutex is held while server is replaced and while Close or Shutdown operate on it.
	serverMutex sync.Mutex

	// certFile and keyFile are handed to ListenAndServeTLS when both are non-empty.
	certFile, keyFile string

	// graceful is set by Graceful() and makes ListenAndServe await TERM/INT signals.
	// restarting is set by Restart() and makes listenAndServe serve again after the server got closed.
	graceful, restarting bool

	// gracePeriod is how long ListenAndServe keeps serving before calling Shutdown in graceful mode.
	gracePeriod time.Duration

	// monitors holds pre/post hooks that are not yet wrapped around a handler.
	monitors monitors
}
// Read timeouts applied to each http.Server created by NewServer or on restart.
var (
	// ServerReadHeaderTimeout is the amount of time allowed to read request headers.
	ServerReadHeaderTimeout = 5 * time.Second

	// ServerReadTimeout is the amount of time allowed to read request body.
	// Default 60s is quite liberal.
	ServerReadTimeout = 60 * time.Second
)
// NewServer creates a new Server instance.
// The wrapped http.Server is created with the current values of
// ServerReadHeaderTimeout and ServerReadTimeout; everything else is left at zero value.
func NewServer() *Server {
	httpServer := &http.Server{
		ReadHeaderTimeout: ServerReadHeaderTimeout,
		ReadTimeout:       ServerReadTimeout,
	}
	return &Server{server: httpServer}
}
// Graceful enables graceful shutdown.
// With it ListenAndServe awaits TERM/INT signals and returns once http shutdown completed, i.e. clients are served.
// gracePeriod is the time to wait before shutting down the listening point; new connections are still accepted meanwhile.
// Client connection shutdown is awaited indefinitely.
// Returns the receiver, so that calls can be chained.
func (s *Server) Graceful(gracePeriod time.Duration) *Server {
	s.gracePeriod, s.graceful = gracePeriod, true
	return s
}
// Addr sets address to listen on. E.g. ":8080".
// If not set then transport specific port (80/443) is listened on any interface.
// Returns the receiver, so that calls can be chained.
func (s *Server) Addr(addr string) *Server {
	httpServer := s.server
	httpServer.Addr = addr
	return s
}
// Monitor sets monitor functions for the server.
// These functions are called pre / post serving each request.
//
// If no handler is set yet, the functions are only remembered; Handler() wraps them later.
// If a handler is already set, it is wrapped right away and the remembered list is cleared.
// Returns the receiver, so that calls can be chained.
func (s *Server) Monitor(pre MonitorFuncPre, post MonitorFuncPost) *Server {
	s.monitors.append(pre, post)

	current := s.server.Handler
	if current == nil {
		return s
	}

	s.server.Handler = s.monitors.wrap(current)
	s.monitors = nil
	return s
}
// Handler defines handlers for server.
// Logs, except for automatically served LivenessProbePath and HealthCheckPath.
//
// A nil handler selects DefaultServeMux, with http.DefaultServeMux attached as catch-all.
// The handler is wrapped by the monitors set so far, then by Logger, and finally by the
// OpenTelemetry HTTP handler if tracing is active. The monitor list is cleared afterwards.
// Returns the receiver, so that calls can be chained.
func (s *Server) Handler(handler http.Handler) *Server {
	if handler == nil {
		// In case http.HandleFunc() was used.
		DefaultServeMux.PathPrefix("/").HandlerFunc(http.DefaultServeMux.ServeHTTP)
		handler = DefaultServeMux
	}

	var chain http.Handler = Logger(s.monitors.wrap(handler))
	if isTraced && tracer.GetOTel() {
		chain = otelhttp.NewHandler(chain, "", otelhttp.WithSpanNameFormatter(spanNameFormatter))
	}

	s.server.Handler = chain
	s.monitors = nil
	return s
}
// ListenAndServe starts listening and serves requests, blocking the caller.
// Uses HTTPS if server key+cert is set, otherwise HTTP.
// Port is set according to scheme, if listening address is not set.
//
// Without Graceful() the error of the serving routine is returned as is.
// With Graceful() serving runs in the background and the call returns
//   - the serving error, if serving stopped for any reason other than http.ErrServerClosed, or
//   - the result of http.Server.Shutdown, after a TERM/INT signal arrived or the server got closed,
//     and the grace period elapsed. That result may be nil.
func (s *Server) ListenAndServe() error {
	if !s.graceful {
		return s.listenAndServe()
	}

	// Two goroutines report on this channel, but only the first report is read.
	// The buffer lets the later one complete its send and exit instead of blocking forever.
	stopErrCh := make(chan error, 2)
	go func() {
		err := s.listenAndServe()
		if err == http.ErrServerClosed {
			err = nil // Regular stop, not a failure.
		}
		stopErrCh <- err
	}()
	go waitForSignal(stopErrCh)

	if err := <-stopErrCh; err != nil {
		return err
	}

	if s.gracePeriod > 0 {
		log.Debug("Waiting grace period: ", s.gracePeriod)
		time.Sleep(s.gracePeriod) // Still accept new connections.
		log.Debug("Grace period over")
	} else {
		time.Sleep(10 * time.Millisecond) // Clients just connected to be served. E.g. K8s endpoint just deleted.
	}

	log.Debug("Waiting client connections to shut down")

	// listenAndServe replaces s.server under serverMutex on restart, so take the pointer under the same lock.
	// The lock is released before Shutdown, which waits for clients indefinitely.
	s.serverMutex.Lock()
	httpServer := s.server
	s.serverMutex.Unlock()

	err := httpServer.Shutdown(context.Background())
	log.Debug("Shutdown completed")
	return err
}
// waitForSignal blocks until SIGTERM or SIGINT is delivered to the process,
// logs the signal and then reports nil on c.
func waitForSignal(c chan error) {
	received := make(chan os.Signal, 1)
	signal.Notify(received, syscall.SIGTERM, syscall.SIGINT)

	sig := <-received
	log.Info("Signal received: ", sig)
	c <- nil
}
// listenAndServe serves requests until the server stops for a reason other than Restart().
// If no handler is set, http.DefaultServeMux is installed via Handler().
// HTTPS is used if both key and certificate files are set, otherwise HTTP.
// Each time serving stops, the restarting flag is checked: if it is not set the serving error
// is returned; otherwise the flag is cleared and serving continues on a new http.Server
// that takes over handler and address of the previous one.
func (s *Server) listenAndServe() error {
	if s.server.Handler == nil {
		s.Handler(http.DefaultServeMux)
	}

	serveOnce := func() error {
		if s.keyFile == "" || s.certFile == "" {
			return s.server.ListenAndServe()
		}
		return s.server.ListenAndServeTLS(s.certFile, s.keyFile)
	}

	for {
		err := serveOnce()
		if !s.restarting {
			return err
		}
		s.restarting = false

		s.serverMutex.Lock() // ListenAndServe routines and Close are executed in parallel.
		previous := s.server
		s.server = &http.Server{
			Handler:           previous.Handler,
			Addr:              previous.Addr,
			ReadHeaderTimeout: ServerReadHeaderTimeout,
			ReadTimeout:       ServerReadTimeout,
		}
		s.serverMutex.Unlock()
	}
}
// Restart restarts the server abruptly.
// During restart active connections are dropped and there may be an outage.
//
// The restart flag is raised and the current http.Server is closed while holding serverMutex.
// That is the lock Close and Shutdown take, and the one under which listenAndServe replaces
// the http.Server, so the instance being closed cannot be swapped out in the middle of the call.
// An error of closing is logged only.
func (s *Server) Restart() {
	s.serverMutex.Lock()
	defer s.serverMutex.Unlock()

	s.restarting = true // Makes listenAndServe serve again on a new http.Server instead of returning.
	if err := s.server.Close(); err != nil {
		log.Errorf("restart close incomplete: %v", err)
	}
}
// Close immediately closes all connections.
// Holds serverMutex for the duration of the call and returns the error of http.Server.Close.
func (s *Server) Close() error {
	s.serverMutex.Lock()
	defer s.serverMutex.Unlock()

	current := s.server
	return current.Close()
}
// Shutdown closes all connections gracefully.
// E.g. server.Shutdown(context.Background())
// Holds serverMutex for the duration of the call and returns the error of http.Server.Shutdown.
func (s *Server) Shutdown(ctx context.Context) error {
	s.serverMutex.Lock()
	defer s.serverMutex.Unlock()

	current := s.server
	return current.Shutdown(ctx)
}
// ListenAndServe acts like standard http.ListenAndServe().
// Logs, except for automatically served LivenessProbePath and HealthCheckPath.
// A new Server is created for the call, configured with addr and handler, and served on, blocking the caller.
func ListenAndServe(addr string, handler http.Handler) error {
	srv := NewServer()
	srv.Addr(addr)
	srv.Handler(handler)
	return srv.ListenAndServe()
}