From fb848e9f57ddf68ae4ebf8941a47d8b1573ed8a4 Mon Sep 17 00:00:00 2001 From: John-Lin Date: Fri, 17 Aug 2018 17:47:12 +0800 Subject: [PATCH 1/3] container exec --- src/server/handler_shell.go | 54 +++++++ src/server/handler_terminal.go | 280 +++++++++++++++++++++++++++++++++ src/server/route.go | 10 ++ vendor/vendor.json | 72 ++++++++- 4 files changed, 413 insertions(+), 3 deletions(-) create mode 100644 src/server/handler_shell.go create mode 100644 src/server/handler_terminal.go diff --git a/src/server/handler_shell.go b/src/server/handler_shell.go new file mode 100644 index 00000000..7d81327e --- /dev/null +++ b/src/server/handler_shell.go @@ -0,0 +1,54 @@ +// Copyright 2017 The Kubernetes Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "net/http" + "os" + "path/filepath" + + response "github.com/linkernetworks/vortex/src/net/http" + "github.com/linkernetworks/vortex/src/web" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/tools/remotecommand" +) + +// TerminalResponse is sent by handleExecShell. The Id is a random session id that binds the original REST request and the SockJS connection. +// Any clientapi in possession of this Id can hijack the terminal session. +type TerminalResponse struct { + Id string `json:"id"` +} + +// Handles execute shell API call +// func handleExecShell(request *restful.Request, response *restful.Response) { +func handleExecShell(ctx *web.Context) { + sp, req, resp := ctx.ServiceProvider, ctx.Request, ctx.Response + sessionId, err := genTerminalSessionId() + if err != nil { + response.InternalServerError(req.Request, resp.ResponseWriter, err) + return + } + + kubeconfigPath := filepath.Join(os.Getenv("HOME"), ".kube", "config") + cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath) + + terminalSessions[sessionId] = TerminalSession{ + id: sessionId, + bound: make(chan error), + sizeChan: make(chan remotecommand.TerminalSize), + } + go WaitForTerminal(sp.KubeCtl.Clientset, cfg, req, sessionId) + resp.WriteHeaderAndEntity(http.StatusOK, TerminalResponse{Id: sessionId}) +} diff --git a/src/server/handler_terminal.go b/src/server/handler_terminal.go new file mode 100644 index 00000000..6b698129 --- /dev/null +++ b/src/server/handler_terminal.go @@ -0,0 +1,280 @@ +// Copyright 2017 The Kubernetes Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "crypto/rand" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "net/http" + + restful "github.com/emicklei/go-restful" + "github.com/linkernetworks/logger" + "gopkg.in/igm/sockjs-go.v2/sockjs" + "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/remotecommand" +) + +// PtyHandler is what remotecommand expects from a pty +type PtyHandler interface { + io.Reader + io.Writer + remotecommand.TerminalSizeQueue +} + +// TerminalSession implements PtyHandler (using a SockJS connection) +type TerminalSession struct { + id string + bound chan error + sockJSSession sockjs.Session + sizeChan chan remotecommand.TerminalSize +} + +// TerminalMessage is the messaging protocol between ShellController and TerminalSession. +// +// OP DIRECTION FIELD(S) USED DESCRIPTION +// --------------------------------------------------------------------- +// bind fe->be SessionID Id sent back from TerminalResponse +// stdin fe->be Data Keystrokes/paste buffer +// resize fe->be Rows, Cols New terminal size +// stdout be->fe Data Output from the process +// toast be->fe Data OOB message to be shown to the user +type TerminalMessage struct { + Op, Data, SessionID string + Rows, Cols uint16 +} + +// TerminalSize handles pty->process resize events +// Called in a loop from remotecommand as long as the process is running +func (t TerminalSession) Next() *remotecommand.TerminalSize { + select { + case size := <-t.sizeChan: + return &size + } +} + +// Read handles pty->process messages (stdin, resize) +// Called in a loop from remotecommand as long as the process is running +func (t TerminalSession) Read(p []byte) (int, error) { + m, err := t.sockJSSession.Recv() + if err != nil { + return 0, err + } + + var msg TerminalMessage + if err := json.Unmarshal([]byte(m), &msg); err != nil { + return 0, err + } + + switch msg.Op { + case "stdin": + return copy(p, msg.Data), nil + case "resize": + t.sizeChan <- remotecommand.TerminalSize{Width: msg.Cols, Height: msg.Rows} + return 0, nil + default: + return 0, fmt.Errorf("unknown message type '%s'", msg.Op) + } +} + +// Write handles process->pty stdout +// Called from remotecommand whenever there is any output +func (t TerminalSession) Write(p []byte) (int, error) { + msg, err := json.Marshal(TerminalMessage{ + Op: "stdout", + Data: string(p), + }) + if err != nil { + return 0, err + } + + if err = t.sockJSSession.Send(string(msg)); err != nil { + return 0, err + } + return len(p), nil +} + +// Toast can be used to send the user any OOB messages +// hterm puts these in the center of the terminal +func (t TerminalSession) Toast(p string) error { + msg, err := json.Marshal(TerminalMessage{ + Op: "toast", + Data: p, + }) + if err != nil { + return err + } + + if err = t.sockJSSession.Send(string(msg)); err != nil { + return err + } + return nil +} + +// Close shuts down the SockJS connection and sends the status code and reason to the client +// Can happen if the process exits or if there is an error starting up the process +// For now the status code is unused and reason is shown to the user (unless "") +func (t TerminalSession) Close(status uint32, reason string) { + t.sockJSSession.Close(status, reason) +} + +// terminalSessions stores a map of all TerminalSession objects +// FIXME: this structure needs locking +var terminalSessions = make(map[string]TerminalSession) + +// handleTerminalSession is Called by net/http for any new /api/sockjs connections +func handleTerminalSession(session sockjs.Session) { + var ( + buf string + err error + msg TerminalMessage + terminalSession TerminalSession + ok bool + ) + + if buf, err = session.Recv(); err != nil { + logger.Infof("handleTerminalSession: can't Recv: %v", err) + return + } + + if err = json.Unmarshal([]byte(buf), &msg); err != nil { + logger.Infof("handleTerminalSession: can't UnMarshal (%v): %s", err, buf) + return + } + + if msg.Op != "bind" { + logger.Infof("handleTerminalSession: expected 'bind' message, got: %s", buf) + return + } + + if terminalSession, ok = terminalSessions[msg.SessionID]; !ok { + logger.Infof("handleTerminalSession: can't find session '%s'", msg.SessionID) + return + } + + terminalSession.sockJSSession = session + terminalSessions[msg.SessionID] = terminalSession + terminalSession.bound <- nil +} + +// CreateAttachHandler is called from main for /api/sockjs +func CreateAttachHandler(path string) http.Handler { + return sockjs.NewHandler(path, sockjs.DefaultOptions, handleTerminalSession) +} + +// startProcess is called by handleAttach +// Executed cmd in the container specified in request and connects it up with the ptyHandler (a session) +func startProcess(k8sClient kubernetes.Interface, cfg *rest.Config, request *restful.Request, cmd []string, ptyHandler PtyHandler) error { + namespace := request.PathParameter("namespace") + podName := request.PathParameter("pod") + containerName := request.PathParameter("container") + + req := k8sClient.CoreV1().RESTClient().Post(). + Resource("pods"). + Name(podName). + Namespace(namespace). + SubResource("exec") + + req.VersionedParams(&v1.PodExecOptions{ + Container: containerName, + Command: cmd, + Stdin: true, + Stdout: true, + Stderr: true, + TTY: true, + }, scheme.ParameterCodec) + + exec, err := remotecommand.NewSPDYExecutor(cfg, "POST", req.URL()) + if err != nil { + return err + } + + err = exec.Stream(remotecommand.StreamOptions{ + Stdin: ptyHandler, + Stdout: ptyHandler, + Stderr: ptyHandler, + TerminalSizeQueue: ptyHandler, + Tty: true, + }) + if err != nil { + return err + } + + return nil +} + +// genTerminalSessionId generates a random session ID string. The format is not really interesting. +// This ID is used to identify the session when the client opens the SockJS connection. +// Not the same as the SockJS session id! We can't use that as that is generated +// on the client side and we don't have it yet at this point. +func genTerminalSessionId() (string, error) { + bytes := make([]byte, 16) + if _, err := rand.Read(bytes); err != nil { + return "", err + } + id := make([]byte, hex.EncodedLen(len(bytes))) + hex.Encode(id, bytes) + return string(id), nil +} + +// isValidShell checks if the shell is an allowed one +func isValidShell(validShells []string, shell string) bool { + for _, validShell := range validShells { + if validShell == shell { + return true + } + } + return false +} + +// WaitForTerminal is called from apihandler.handleAttach as a goroutine +// Waits for the SockJS connection to be opened by the client the session to be bound in handleTerminalSession +func WaitForTerminal(k8sClient kubernetes.Interface, cfg *rest.Config, request *restful.Request, sessionId string) { + shell := request.QueryParameter("shell") + + select { + case <-terminalSessions[sessionId].bound: + close(terminalSessions[sessionId].bound) + + var err error + validShells := []string{"bash", "sh", "powershell", "cmd"} + + if isValidShell(validShells, shell) { + cmd := []string{shell} + err = startProcess(k8sClient, cfg, request, cmd, terminalSessions[sessionId]) + } else { + // No shell given or it was not valid: try some shells until one succeeds or all fail + // FIXME: if the first shell fails then the first keyboard event is lost + for _, testShell := range validShells { + cmd := []string{testShell} + if err = startProcess(k8sClient, cfg, request, cmd, terminalSessions[sessionId]); err == nil { + break + } + } + } + + if err != nil { + terminalSessions[sessionId].Close(2, err.Error()) + return + } + + terminalSessions[sessionId].Close(1, "Process exited") + } +} diff --git a/src/server/route.go b/src/server/route.go index 27a28962..67c09937 100644 --- a/src/server/route.go +++ b/src/server/route.go @@ -29,8 +29,11 @@ func (a *App) AppRoute() *mux.Router { container.Add(newMonitoringService(a.ServiceProvider)) container.Add(newAppService(a.ServiceProvider)) container.Add(newOVSService(a.ServiceProvider)) + container.Add(newShellService(a.ServiceProvider)) + router.PathPrefix("/v1/sockjs").Handler(CreateAttachHandler("/v1/sockjs")) router.PathPrefix("/v1/").Handler(container) + return router } @@ -186,3 +189,10 @@ func newOVSService(sp *serviceprovider.Container) *restful.WebService { webService.Route(webService.GET("/portinfos").To(handler.RESTfulServiceHandler(sp, getOVSPortInfoHandler))) return webService } + +func newShellService(sp *serviceprovider.Container) *restful.WebService { + webService := new(restful.WebService) + webService.Path("/v1/exec").Consumes(restful.MIME_JSON, restful.MIME_JSON).Produces(restful.MIME_JSON, restful.MIME_JSON) + webService.Route(webService.GET("/pod/{namespace}/{pod}/shell/{container}").To(handler.RESTfulServiceHandler(sp, handleExecShell))) + return webService +} diff --git a/vendor/vendor.json b/vendor/vendor.json index e16c65a6..5cc6b67e 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -68,6 +68,18 @@ "revision": "0b96aaa707760d6ab28d9b9d1913ff5993328bae", "revisionTime": "2018-07-19T21:18:23Z" }, + { + "checksumSHA1": "5Z0J6IIV39bPOf0NOVHLolGwcfo=", + "path": "github.com/docker/spdystream", + "revision": "bc6354cbbc295e925e4c611ffe90c1f287ee54db", + "revisionTime": "2017-09-12T18:36:27Z" + }, + { + "checksumSHA1": "Gaf5SDyC0xV32LdjPCyUGvH5BhI=", + "path": "github.com/docker/spdystream/spdy", + "revision": "bc6354cbbc295e925e4c611ffe90c1f287ee54db", + "revisionTime": "2017-09-12T18:36:27Z" + }, { "checksumSHA1": "wPbKObbGzS/43nrskRaJVFVEW/A=", "path": "github.com/ema/qdisc", @@ -206,6 +218,12 @@ "revision": "cb4698366aa625048f3b815af6a0dea8aef9280a", "revisionTime": "2018-06-05T21:15:56Z" }, + { + "checksumSHA1": "EqroMGIdQ3z+qyzaYmrOJ7fNi6o=", + "path": "github.com/gorilla/websocket", + "revision": "3ff3320c2a1756a3691521efc290b4701575147c", + "revisionTime": "2018-08-16T22:18:03Z" + }, { "checksumSHA1": "T1uAtgdoQP82t1GaT1eZjhYmFmM=", "path": "github.com/gregjones/httpcache", @@ -862,6 +880,12 @@ "revision": "ab2a8a99087e827c9af87ed6777ba897348fb178", "revisionTime": "2018-07-09T15:35:10Z" }, + { + "checksumSHA1": "ay9WnRs0b3RGtKVTEGvKvfvfojs=", + "path": "gopkg.in/igm/sockjs-go.v2/sockjs", + "revision": "d276e9ffe5cc5c271b81198cc77a2adf6c4482d2", + "revisionTime": "2016-11-15T19:38:03Z" + }, { "checksumSHA1": "COfXAfInbcFT/YRsvLUQnNKHzF0=", "path": "gopkg.in/inf.v0", @@ -1220,6 +1244,18 @@ "revision": "5a8013207d0d28c7fe98193e5b6cdbf92e98a000", "revisionTime": "2018-06-14T22:41:26Z" }, + { + "checksumSHA1": "FR8QEDIjGsSdUX4rXVlGP18uSyA=", + "path": "k8s.io/apimachinery/pkg/util/httpstream", + "revision": "ac4f7e0decfc7dbc613748c40f1bc938da71853b", + "revisionTime": "2018-08-17T04:17:15Z" + }, + { + "checksumSHA1": "IMjQA9VgWXGfadze+u8jNUUZZlw=", + "path": "k8s.io/apimachinery/pkg/util/httpstream/spdy", + "revision": "ac4f7e0decfc7dbc613748c40f1bc938da71853b", + "revisionTime": "2018-08-17T04:17:15Z" + }, { "checksumSHA1": "9nShqMfXfw5Ct0Rr7P2QhQObVhw=", "path": "k8s.io/apimachinery/pkg/util/intstr", @@ -1244,6 +1280,12 @@ "revision": "5a8013207d0d28c7fe98193e5b6cdbf92e98a000", "revisionTime": "2018-06-14T22:41:26Z" }, + { + "checksumSHA1": "PvefNiZTc/uwwPAOkBKZ3ST25Tc=", + "path": "k8s.io/apimachinery/pkg/util/remotecommand", + "revision": "ac4f7e0decfc7dbc613748c40f1bc938da71853b", + "revisionTime": "2018-08-17T04:17:15Z" + }, { "checksumSHA1": "keWEyQHGKGkDo2SARgphbGgN608=", "path": "k8s.io/apimachinery/pkg/util/runtime", @@ -1304,6 +1346,12 @@ "revision": "da954875f3efabca13c924dd99264f7fb2cfa422", "revisionTime": "2018-06-20T21:21:21Z" }, + { + "checksumSHA1": "OGbsdDRP+y4F++eGe/dh3FIWThg=", + "path": "k8s.io/apimachinery/third_party/forked/golang/netutil", + "revision": "ac4f7e0decfc7dbc613748c40f1bc938da71853b", + "revisionTime": "2018-08-17T04:17:15Z" + }, { "checksumSHA1": "9sFA+EjKrjpmK4OofQH0p0Rowfg=", "path": "k8s.io/apimachinery/third_party/forked/golang/reflect", @@ -1719,10 +1767,10 @@ "revisionTime": "2018-06-14T22:41:26Z" }, { - "checksumSHA1": "WAtl5P1bup5IS4is2kEc5WBN3YU=", + "checksumSHA1": "o+2OSOou88LEeEXIswG0IOlq2TQ=", "path": "k8s.io/client-go/rest", - "revision": "8d6e3480fc03b7337a24f349d35733190655e2ad", - "revisionTime": "2018-06-14T22:41:26Z" + "revision": "b95fa2e4f33e05873e8429097cd1d451439538ad", + "revisionTime": "2018-08-15T11:23:08Z" }, { "checksumSHA1": "+wYCwQaVc1GvMWwOWc3wXd2GT5s=", @@ -1778,12 +1826,24 @@ "revision": "8d6e3480fc03b7337a24f349d35733190655e2ad", "revisionTime": "2018-06-14T22:41:26Z" }, + { + "checksumSHA1": "jyMWw2kAEVH7/brD2gLVVf3vxjM=", + "path": "k8s.io/client-go/tools/remotecommand", + "revision": "b95fa2e4f33e05873e8429097cd1d451439538ad", + "revisionTime": "2018-08-15T11:23:08Z" + }, { "checksumSHA1": "xjOr+rKimhz7M8LySdtXK0xX7cs=", "path": "k8s.io/client-go/transport", "revision": "8d6e3480fc03b7337a24f349d35733190655e2ad", "revisionTime": "2018-06-14T22:41:26Z" }, + { + "checksumSHA1": "z6ktalStv2wJypDrcFFaSpfWqNg=", + "path": "k8s.io/client-go/transport/spdy", + "revision": "b95fa2e4f33e05873e8429097cd1d451439538ad", + "revisionTime": "2018-08-15T11:23:08Z" + }, { "checksumSHA1": "rIxDMrsqcf1cjy3dgMaEZw/TvJs=", "path": "k8s.io/client-go/util/cert", @@ -1796,6 +1856,12 @@ "revision": "8d6e3480fc03b7337a24f349d35733190655e2ad", "revisionTime": "2018-06-14T22:41:26Z" }, + { + "checksumSHA1": "vOdL2QO08zL5IS+UBOVJMGwgbFI=", + "path": "k8s.io/client-go/util/exec", + "revision": "b95fa2e4f33e05873e8429097cd1d451439538ad", + "revisionTime": "2018-08-15T11:23:08Z" + }, { "checksumSHA1": "yXKT7cJNCv5vjwGKhpc/DUsQg+A=", "path": "k8s.io/client-go/util/flowcontrol", From 7e74f88f70ffa219de29019d55972ae1a066142e Mon Sep 17 00:00:00 2001 From: Pei Hsuan Tsai Date: Mon, 10 Sep 2018 16:30:40 +0800 Subject: [PATCH 2/3] read the cluster config object --- src/server/handler_shell.go | 9 +++++++++ src/server/handler_terminal.go | 2 +- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/server/handler_shell.go b/src/server/handler_shell.go index 7d81327e..cf6fbbe8 100644 --- a/src/server/handler_shell.go +++ b/src/server/handler_shell.go @@ -15,12 +15,14 @@ package server import ( + "fmt" "net/http" "os" "path/filepath" response "github.com/linkernetworks/vortex/src/net/http" "github.com/linkernetworks/vortex/src/web" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/remotecommand" ) @@ -44,6 +46,13 @@ func handleExecShell(ctx *web.Context) { kubeconfigPath := filepath.Join(os.Getenv("HOME"), ".kube", "config") cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath) + if err != nil { + cfg, err = rest.InClusterConfig() + if err != nil { + panic(fmt.Errorf("Load the kubernetes config fail")) + } + } + terminalSessions[sessionId] = TerminalSession{ id: sessionId, bound: make(chan error), diff --git a/src/server/handler_terminal.go b/src/server/handler_terminal.go index 6b698129..4fc6dbdc 100644 --- a/src/server/handler_terminal.go +++ b/src/server/handler_terminal.go @@ -174,7 +174,7 @@ func handleTerminalSession(session sockjs.Session) { terminalSession.bound <- nil } -// CreateAttachHandler is called from main for /api/sockjs +// CreateAttachHandler is called from main for /v1/sockjs func CreateAttachHandler(path string) http.Handler { return sockjs.NewHandler(path, sockjs.DefaultOptions, handleTerminalSession) } From c1d8a3b0c636793578ba1abd60d3573b607d0952 Mon Sep 17 00:00:00 2001 From: Pei Hsuan Tsai Date: Mon, 10 Sep 2018 17:34:17 +0800 Subject: [PATCH 3/3] put clusterConfig into serviceProvider container object --- src/server/handler_shell.go | 17 +---------------- src/serviceprovider/serviceprovider.go | 22 ++++++++++++---------- 2 files changed, 13 insertions(+), 26 deletions(-) diff --git a/src/server/handler_shell.go b/src/server/handler_shell.go index cf6fbbe8..9ae55665 100644 --- a/src/server/handler_shell.go +++ b/src/server/handler_shell.go @@ -15,15 +15,10 @@ package server import ( - "fmt" "net/http" - "os" - "path/filepath" response "github.com/linkernetworks/vortex/src/net/http" "github.com/linkernetworks/vortex/src/web" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/remotecommand" ) @@ -43,21 +38,11 @@ func handleExecShell(ctx *web.Context) { return } - kubeconfigPath := filepath.Join(os.Getenv("HOME"), ".kube", "config") - cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath) - - if err != nil { - cfg, err = rest.InClusterConfig() - if err != nil { - panic(fmt.Errorf("Load the kubernetes config fail")) - } - } - terminalSessions[sessionId] = TerminalSession{ id: sessionId, bound: make(chan error), sizeChan: make(chan remotecommand.TerminalSize), } - go WaitForTerminal(sp.KubeCtl.Clientset, cfg, req, sessionId) + go WaitForTerminal(sp.KubeCtl.Clientset, sp.ClusterConfig, req, sessionId) resp.WriteHeaderAndEntity(http.StatusOK, TerminalResponse{Id: sessionId}) } diff --git a/src/serviceprovider/serviceprovider.go b/src/serviceprovider/serviceprovider.go index a6ef2a22..d370118d 100644 --- a/src/serviceprovider/serviceprovider.go +++ b/src/serviceprovider/serviceprovider.go @@ -21,11 +21,12 @@ import ( // Container is the structure for container type Container struct { - Config config.Config - Mongo *mongo.Service - Prometheus *prometheusprovider.Service - KubeCtl *kubeCtl.KubeCtl - Validator *validator.Validate + Config config.Config + ClusterConfig *rest.Config + Mongo *mongo.Service + Prometheus *prometheusprovider.Service + KubeCtl *kubeCtl.KubeCtl + Validator *validator.Validate } // ServiceDiscoverResponse is the structure for Service Discover Response @@ -63,11 +64,12 @@ func New(cf config.Config) *Container { validate.RegisterValidation("k8sname", checkNameValidation) sp := &Container{ - Config: cf, - Mongo: mongo, - Prometheus: prometheus, - KubeCtl: kubeCtl.New(clientset), - Validator: validate, + Config: cf, + ClusterConfig: k8s, + Mongo: mongo, + Prometheus: prometheus, + KubeCtl: kubeCtl.New(clientset), + Validator: validate, } if err := createDefaultUser(sp.Mongo); err != nil {