Skip to content

Commit

Permalink
container exec
Browse files Browse the repository at this point in the history
  • Loading branch information
John-Lin committed Sep 7, 2018
1 parent 8d2f93c commit 55af670
Show file tree
Hide file tree
Showing 4 changed files with 413 additions and 3 deletions.
54 changes: 54 additions & 0 deletions src/server/handler_shell.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2017 The Kubernetes Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"net/http"
"os"
"path/filepath"

response "github.com/linkernetworks/vortex/src/net/http"
"github.com/linkernetworks/vortex/src/web"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/remotecommand"
)

// TerminalResponse is sent by handleExecShell. The Id is a random session id that binds the original REST request and the SockJS connection.
// Any clientapi in possession of this Id can hijack the terminal session.
type TerminalResponse struct {
Id string `json:"id"`
}

// Handles execute shell API call
// func handleExecShell(request *restful.Request, response *restful.Response) {
func handleExecShell(ctx *web.Context) {
sp, req, resp := ctx.ServiceProvider, ctx.Request, ctx.Response
sessionId, err := genTerminalSessionId()
if err != nil {
response.InternalServerError(req.Request, resp.ResponseWriter, err)
return
}

kubeconfigPath := filepath.Join(os.Getenv("HOME"), ".kube", "config")
cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)

terminalSessions[sessionId] = TerminalSession{
id: sessionId,
bound: make(chan error),
sizeChan: make(chan remotecommand.TerminalSize),
}
go WaitForTerminal(sp.KubeCtl.Clientset, cfg, req, sessionId)
resp.WriteHeaderAndEntity(http.StatusOK, TerminalResponse{Id: sessionId})
}
280 changes: 280 additions & 0 deletions src/server/handler_terminal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
// Copyright 2017 The Kubernetes Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"

restful "github.com/emicklei/go-restful"
"github.com/linkernetworks/logger"
"gopkg.in/igm/sockjs-go.v2/sockjs"
"k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
)

// PtyHandler is what remotecommand expects from a pty
type PtyHandler interface {
io.Reader
io.Writer
remotecommand.TerminalSizeQueue
}

// TerminalSession implements PtyHandler (using a SockJS connection)
type TerminalSession struct {
id string
bound chan error
sockJSSession sockjs.Session
sizeChan chan remotecommand.TerminalSize
}

// TerminalMessage is the messaging protocol between ShellController and TerminalSession.
//
// OP DIRECTION FIELD(S) USED DESCRIPTION
// ---------------------------------------------------------------------
// bind fe->be SessionID Id sent back from TerminalResponse
// stdin fe->be Data Keystrokes/paste buffer
// resize fe->be Rows, Cols New terminal size
// stdout be->fe Data Output from the process
// toast be->fe Data OOB message to be shown to the user
type TerminalMessage struct {
Op, Data, SessionID string
Rows, Cols uint16
}

// TerminalSize handles pty->process resize events
// Called in a loop from remotecommand as long as the process is running
func (t TerminalSession) Next() *remotecommand.TerminalSize {
select {
case size := <-t.sizeChan:
return &size
}
}

// Read handles pty->process messages (stdin, resize)
// Called in a loop from remotecommand as long as the process is running
func (t TerminalSession) Read(p []byte) (int, error) {
m, err := t.sockJSSession.Recv()
if err != nil {
return 0, err
}

var msg TerminalMessage
if err := json.Unmarshal([]byte(m), &msg); err != nil {
return 0, err
}

switch msg.Op {
case "stdin":
return copy(p, msg.Data), nil
case "resize":
t.sizeChan <- remotecommand.TerminalSize{Width: msg.Cols, Height: msg.Rows}
return 0, nil
default:
return 0, fmt.Errorf("unknown message type '%s'", msg.Op)
}
}

// Write handles process->pty stdout
// Called from remotecommand whenever there is any output
func (t TerminalSession) Write(p []byte) (int, error) {
msg, err := json.Marshal(TerminalMessage{
Op: "stdout",
Data: string(p),
})
if err != nil {
return 0, err
}

if err = t.sockJSSession.Send(string(msg)); err != nil {
return 0, err
}
return len(p), nil
}

// Toast can be used to send the user any OOB messages
// hterm puts these in the center of the terminal
func (t TerminalSession) Toast(p string) error {
msg, err := json.Marshal(TerminalMessage{
Op: "toast",
Data: p,
})
if err != nil {
return err
}

if err = t.sockJSSession.Send(string(msg)); err != nil {
return err
}
return nil
}

// Close shuts down the SockJS connection and sends the status code and reason to the client
// Can happen if the process exits or if there is an error starting up the process
// For now the status code is unused and reason is shown to the user (unless "")
func (t TerminalSession) Close(status uint32, reason string) {
t.sockJSSession.Close(status, reason)
}

// terminalSessions stores a map of all TerminalSession objects
// FIXME: this structure needs locking
var terminalSessions = make(map[string]TerminalSession)

// handleTerminalSession is Called by net/http for any new /api/sockjs connections
func handleTerminalSession(session sockjs.Session) {
var (
buf string
err error
msg TerminalMessage
terminalSession TerminalSession
ok bool
)

if buf, err = session.Recv(); err != nil {
logger.Infof("handleTerminalSession: can't Recv: %v", err)
return
}

if err = json.Unmarshal([]byte(buf), &msg); err != nil {
logger.Infof("handleTerminalSession: can't UnMarshal (%v): %s", err, buf)
return
}

if msg.Op != "bind" {
logger.Infof("handleTerminalSession: expected 'bind' message, got: %s", buf)
return
}

if terminalSession, ok = terminalSessions[msg.SessionID]; !ok {
logger.Infof("handleTerminalSession: can't find session '%s'", msg.SessionID)
return
}

terminalSession.sockJSSession = session
terminalSessions[msg.SessionID] = terminalSession
terminalSession.bound <- nil
}

// CreateAttachHandler is called from main for /api/sockjs
func CreateAttachHandler(path string) http.Handler {
return sockjs.NewHandler(path, sockjs.DefaultOptions, handleTerminalSession)
}

// startProcess is called by handleAttach
// Executed cmd in the container specified in request and connects it up with the ptyHandler (a session)
func startProcess(k8sClient kubernetes.Interface, cfg *rest.Config, request *restful.Request, cmd []string, ptyHandler PtyHandler) error {
namespace := request.PathParameter("namespace")
podName := request.PathParameter("pod")
containerName := request.PathParameter("container")

req := k8sClient.CoreV1().RESTClient().Post().
Resource("pods").
Name(podName).
Namespace(namespace).
SubResource("exec")

req.VersionedParams(&v1.PodExecOptions{
Container: containerName,
Command: cmd,
Stdin: true,
Stdout: true,
Stderr: true,
TTY: true,
}, scheme.ParameterCodec)

exec, err := remotecommand.NewSPDYExecutor(cfg, "POST", req.URL())
if err != nil {
return err
}

err = exec.Stream(remotecommand.StreamOptions{
Stdin: ptyHandler,
Stdout: ptyHandler,
Stderr: ptyHandler,
TerminalSizeQueue: ptyHandler,
Tty: true,
})
if err != nil {
return err
}

return nil
}

// genTerminalSessionId generates a random session ID string. The format is not really interesting.
// This ID is used to identify the session when the client opens the SockJS connection.
// Not the same as the SockJS session id! We can't use that as that is generated
// on the client side and we don't have it yet at this point.
func genTerminalSessionId() (string, error) {
bytes := make([]byte, 16)
if _, err := rand.Read(bytes); err != nil {
return "", err
}
id := make([]byte, hex.EncodedLen(len(bytes)))
hex.Encode(id, bytes)
return string(id), nil
}

// isValidShell checks if the shell is an allowed one
func isValidShell(validShells []string, shell string) bool {
for _, validShell := range validShells {
if validShell == shell {
return true
}
}
return false
}

// WaitForTerminal is called from apihandler.handleAttach as a goroutine
// Waits for the SockJS connection to be opened by the client the session to be bound in handleTerminalSession
func WaitForTerminal(k8sClient kubernetes.Interface, cfg *rest.Config, request *restful.Request, sessionId string) {
shell := request.QueryParameter("shell")

select {
case <-terminalSessions[sessionId].bound:
close(terminalSessions[sessionId].bound)

var err error
validShells := []string{"bash", "sh", "powershell", "cmd"}

if isValidShell(validShells, shell) {
cmd := []string{shell}
err = startProcess(k8sClient, cfg, request, cmd, terminalSessions[sessionId])
} else {
// No shell given or it was not valid: try some shells until one succeeds or all fail
// FIXME: if the first shell fails then the first keyboard event is lost
for _, testShell := range validShells {
cmd := []string{testShell}
if err = startProcess(k8sClient, cfg, request, cmd, terminalSessions[sessionId]); err == nil {
break
}
}
}

if err != nil {
terminalSessions[sessionId].Close(2, err.Error())
return
}

terminalSessions[sessionId].Close(1, "Process exited")
}
}
10 changes: 10 additions & 0 deletions src/server/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@ func (a *App) AppRoute() *mux.Router {
container.Add(newMonitoringService(a.ServiceProvider))
container.Add(newAppService(a.ServiceProvider))
container.Add(newOVSService(a.ServiceProvider))
container.Add(newShellService(a.ServiceProvider))

router.PathPrefix("/v1/sockjs").Handler(CreateAttachHandler("/v1/sockjs"))
router.PathPrefix("/v1/").Handler(container)

return router
}

Expand Down Expand Up @@ -186,3 +189,10 @@ func newOVSService(sp *serviceprovider.Container) *restful.WebService {
webService.Route(webService.GET("/portinfos").To(handler.RESTfulServiceHandler(sp, getOVSPortInfoHandler)))
return webService
}

func newShellService(sp *serviceprovider.Container) *restful.WebService {
webService := new(restful.WebService)
webService.Path("/v1/exec").Consumes(restful.MIME_JSON, restful.MIME_JSON).Produces(restful.MIME_JSON, restful.MIME_JSON)
webService.Route(webService.GET("/pod/{namespace}/{pod}/shell/{container}").To(handler.RESTfulServiceHandler(sp, handleExecShell)))
return webService
}
Loading

0 comments on commit 55af670

Please sign in to comment.