[Metrics] Add streaming support for metrics
Address #178
JeffLuoo committed Feb 20, 2025
1 parent 2577f63 commit 4249311
Showing 5 changed files with 188 additions and 22 deletions.
29 changes: 29 additions & 0 deletions pkg/epp/handlers/request.go
@@ -61,6 +61,35 @@ func (s *Server) HandleRequestBody(
loggerVerbose.Info("Model requested", "model", model)
modelName := model

// Resolve streaming options

streaming, ok := rb["stream"].(bool)
if !ok {
// streaming not set, no-op
} else {
reqCtx.Streaming = streaming
}

type Usage struct {
IncludeUsage string `json:"include_usage,omitempty"`
}
if streamOption, ok := rb["stream_options"]; ok {
includeUsage := Usage{}
optionJson, err := json.Marshal(streamOption)
if err != nil {
return nil, errutil.Error{Code: errutil.BadRequest, Msg: fmt.Sprintf("marshalling stream_options: %v", err)}
}
if err := json.Unmarshal(optionJson, &includeUsage); err != nil {
return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling stream_options: %v", err)}
}
usageEnabled, err := strconv.ParseBool(includeUsage.IncludeUsage)
if err != nil {
return nil, errutil.Error{Code: errutil.BadRequest, Msg: fmt.Sprintf("invalid include_usage: %v", includeUsage.IncludeUsage)}
}

reqCtx.StreamingIncludeUsage = usageEnabled
}

// NOTE: The nil checking for the modelObject means that we DO allow passthrough currently.
// This might be a security risk in the future where adapters not registered in the InferenceModel
// are able to be requested by using their distinct name.
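
For reference, a request body that exercises the new fields above would look roughly like the following (values are illustrative, mirroring the curl example in the metrics README; note that the handler expects `include_usage` as a string because it is parsed with `strconv.ParseBool`):

```
{
  "model": "tweet-summary",
  "prompt": "whats your fav movie?",
  "max_tokens": 10,
  "stream": true,
  "stream_options": {"include_usage": "true"}
}
```
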
75 changes: 62 additions & 13 deletions pkg/epp/handlers/response.go
@@ -20,6 +20,8 @@ import (
"context"
"encoding/json"
"fmt"
"regexp"
"strings"

configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
@@ -76,6 +78,10 @@ func (s *Server) HandleResponseHeaders(
}
}

if h.ResponseHeaders.EndOfStream {
reqCtx.StreamingCompleted = true
loggerVerbose.Info("Header indicates streaming complete")
}
resp := &extProcPb.ProcessingResponse{
Response: &extProcPb.ProcessingResponse_ResponseHeaders{
ResponseHeaders: &extProcPb.HeadersResponse{
@@ -132,22 +138,65 @@ func (s *Server) HandleResponseBody(
) (*extProcPb.ProcessingResponse, error) {
logger := log.FromContext(ctx)
loggerVerbose := logger.V(logutil.VERBOSE)
loggerVerbose.Info("Processing HandleResponseBody")
body := req.Request.(*extProcPb.ProcessingRequest_ResponseBody)

if reqCtx.Streaming {
logger.V(logutil.DEBUG).Info("Processing HandleResponseBody")

responseText := string(body.ResponseBody.Body)
// Example message if "stream_options": {"include_usage": "true"} is included in the request:
// data: {"id":"...","object":"text_completion","created":1739400043,"model":"tweet-summary-0","choices":[],
// "usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
//
// data: [DONE]
// Note that vLLM returns two entries in one response here.
// We need to strip the `data:` prefix and the trailing `data: [DONE]` from the message to fetch the response data.
//
// If include_usage is not included in the request, `data: [DONE]` is returned separately, which
// indicates end of streaming.
if strings.Contains(responseText, "data: [DONE]") {
response := Response{}

if reqCtx.StreamingIncludeUsage {

re := regexp.MustCompile(`\{.*(?:\{.*\}|[^\{]*)\}`) // match for JSON object
match := re.FindString(responseText)
if match == "" {
return nil, errutil.Error{Code: errutil.ModelServerError, Msg: fmt.Sprintf("model server returned invalid response: %v", responseText)}
}
byteSlice := []byte(match)
if err := json.Unmarshal(byteSlice, &response); err != nil {
return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling response body: %v", err)}
}
} else {
// ResponseBody.EndOfStream is only set if include_usage is set to true.
reqCtx.ResponseComplete = true
loggerVerbose.Info("Streaming is completed")
}

reqCtx.Response = response
}

if body.ResponseBody.EndOfStream {
loggerVerbose.Info("Streaming is completed")
reqCtx.ResponseComplete = true
} else {
reqCtx.ResponseSize += len(body.ResponseBody.Body)
}

} else {
loggerVerbose.Info("Processing HandleResponseBody")

res := Response{}
if err := json.Unmarshal(body.ResponseBody.Body, &res); err != nil {
return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling response body: %v", err)}
}
reqCtx.Response = res
reqCtx.ResponseSize = len(body.ResponseBody.Body)
reqCtx.ResponseComplete = true

loggerVerbose.Info("Response generated", "response", res)
}

resp := &extProcPb.ProcessingResponse{
Response: &extProcPb.ProcessingResponse_ResponseBody{
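
For illustration, here is a minimal, self-contained sketch (not part of this commit) of the extraction technique used above: it applies the same regular expression to a final streamed chunk and decodes the usage object. The `Response` and `Usage` types are simplified stand-ins for the handler's own types:

```
package main

import (
	"encoding/json"
	"fmt"
	"regexp"
)

// Simplified stand-ins for the handler's Response/Usage types.
type Usage struct {
	PromptTokens     int `json:"prompt_tokens"`
	CompletionTokens int `json:"completion_tokens"`
	TotalTokens      int `json:"total_tokens"`
}

type Response struct {
	Usage Usage `json:"usage"`
}

func main() {
	// Final chunk produced when "stream_options": {"include_usage": "true"} is set.
	chunk := `data: {"id":"cmpl-41764c93","object":"text_completion","created":1740002445,"model":"tweet-summary-0","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}

data: [DONE]
`
	// Same pattern as HandleResponseBody: pull the JSON object out of the SSE line.
	re := regexp.MustCompile(`\{.*(?:\{.*\}|[^\{]*)\}`)
	match := re.FindString(chunk)
	if match == "" {
		panic("no JSON object found in streamed chunk")
	}

	var resp Response
	if err := json.Unmarshal([]byte(match), &resp); err != nil {
		panic(err)
	}
	// Prints: prompt=7 completion=10 total=17
	fmt.Printf("prompt=%d completion=%d total=%d\n",
		resp.Usage.PromptTokens, resp.Usage.CompletionTokens, resp.Usage.TotalTokens)
}
```
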
50 changes: 49 additions & 1 deletion pkg/epp/handlers/response_test.go
@@ -49,6 +49,16 @@ const (
}
}
`

streamingBodyWithoutUsage = `
data: {"id":"cmpl-41764c93-f9d2-4f31-be08-3ba04fa25394","object":"text_completion","created":1740002445,"model":"tweet-summary-0","choices":[],"usage":null}
`

streamingBodyWithUsage = `
data: {"id":"cmpl-41764c93-f9d2-4f31-be08-3ba04fa25394","object":"text_completion","created":1740002445,"model":"tweet-summary-0","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
data: [DONE]
`
)

func TestHandleResponseBody(t *testing.T) {
@@ -57,6 +67,7 @@ func TestHandleResponseBody(t *testing.T) {
tests := []struct {
name string
req *extProcPb.ProcessingRequest_ResponseBody
reqCtx *RequestContext
want Response
wantErr bool
}{
@@ -84,12 +95,49 @@
},
wantErr: true,
},
{
name: "streaming request without usage",
req: &extProcPb.ProcessingRequest_ResponseBody{
ResponseBody: &extProcPb.HttpBody{
Body: []byte(streamingBodyWithoutUsage),
},
},
reqCtx: &RequestContext{
Streaming: true,
StreamingIncludeUsage: false,
},
wantErr: false,
// In the middle of a streaming response, so the request context response is not set yet.
},
{
name: "streaming request with usage",
req: &extProcPb.ProcessingRequest_ResponseBody{
ResponseBody: &extProcPb.HttpBody{
Body: []byte(streamingBodyWithUsage),
},
},
reqCtx: &RequestContext{
Streaming: true,
StreamingIncludeUsage: true,
},
wantErr: false,
want: Response{
Usage: Usage{
PromptTokens: 7,
TotalTokens: 17,
CompletionTokens: 10,
},
},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
server := &Server{}
reqCtx := test.reqCtx
if reqCtx == nil {
reqCtx = &RequestContext{}
}
_, err := server.HandleResponseBody(ctx, reqCtx, &extProcPb.ProcessingRequest{Request: test.req})
if err != nil {
if !test.wantErr {
15 changes: 13 additions & 2 deletions pkg/epp/handlers/server.go
@@ -121,7 +121,11 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
metrics.RecordInputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.PromptTokens)
metrics.RecordOutputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.CompletionTokens)
}
loggerVerbose.Info("Request context after HandleResponseBody", "context", reqCtx)
if reqCtx.Streaming {
logger.V(logutil.DEBUG).Info("Request context after HandleResponseBody", "context", reqCtx)
} else {
loggerVerbose.Info("Request context after HandleResponseBody", "context", reqCtx)
}
default:
logger.V(logutil.DEFAULT).Error(nil, "Unknown Request type", "request", v)
return status.Error(codes.Unknown, "unknown request type")
@@ -179,7 +183,11 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
}
}

loggerVerbose.Info("Response generated", "response", resp)
if !reqCtx.Streaming {
loggerVerbose.Info("Response generated", "response", resp)
} else {
logger.V(logutil.DEBUG).Info("Response generated", "response", resp)
}
if err := srv.Send(resp); err != nil {
logger.V(logutil.DEFAULT).Error(err, "Send failed")
return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
@@ -200,4 +208,7 @@ type RequestContext struct {
ResponseSize int
ResponseComplete bool
ResponseStatusCode string
Streaming bool
StreamingCompleted bool
StreamingIncludeUsage bool
}
41 changes: 35 additions & 6 deletions pkg/epp/metrics/README.md
@@ -8,13 +8,29 @@ This documentation is the current state of exposed metrics.

## Requirements

For non-streaming requests, enable `Buffered` for the response in `EnvoyExtensionPolicy`. This buffers the response body at the proxy and forwards it to the endpoint picker, which allows the endpoint picker to report response metrics:
```
apiVersion: gateway.envoyproxy.io/v1alpha1
kind: EnvoyExtensionPolicy
metadata:
  name: ext-proc-policy
  namespace: default
spec:
  extProc:
    - backendRefs:
        - group: ""
          kind: Service
          name: inference-gateway-ext-proc
          port: 9002
      processingMode:
        request:
          body: Buffered
        response:
          body: Buffered
```

For streaming requests, enable `Streamed` for the response in `EnvoyExtensionPolicy`:

```
apiVersion: gateway.envoyproxy.io/v1alpha1
kind: EnvoyExtensionPolicy
metadata:
  name: ext-proc-policy
  namespace: default
spec:
  extProc:
    - backendRefs:
        - group: ""
          kind: Service
          name: inference-gateway-ext-proc
          port: 9002
      processingMode:
        request:
          body: Buffered
        response:
          body: Streamed
```

To get usage metrics from the vLLM model server, send the request with `include_usage` set in `stream_options`:

```
curl -i ${IP}:${PORT}/v1/completions -H 'Content-Type: application/json' -d '{
"model": "tweet-summary",
"prompt": "whats your fav movie?",
"max_tokens": 10,
"temperature": 0,
"stream": true,
"stream_options": {"include_usage": "true"}
}'
```
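
With `include_usage` enabled, the usage object arrives on the final data chunk before `data: [DONE]`, and the endpoint picker reads the token counts from that chunk. The tail of such a stream looks roughly like this (token counts illustrative):

```
data: {"id":"cmpl-41764c93","object":"text_completion","created":1740002445,"model":"tweet-summary-0","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}

data: [DONE]
```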

## Exposed metrics
