Skip to content

Commit

Permalink
feat(webserver): add UnaryHandler which returns a unary server handle…
Browse files Browse the repository at this point in the history
…r for [http handler] --> [grpc server handler] without grpc client dials to grpc server.

gRPC-Gateway does not support gRPC interceptors when call gRPC's service handler in process.
See: grpc-ecosystem/grpc-gateway#1043
  • Loading branch information
searKing committed Oct 10, 2024
1 parent 10246b8 commit 9560961
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 53 deletions.
8 changes: 3 additions & 5 deletions pkg/webserver/pkg/recovery/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
grpcrecovery "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
slog_ "github.com/searKing/golang/go/log/slog"
"github.com/searKing/golang/pkg/webserver/pkg/logging"
grpc_ "github.com/searKing/golang/third_party/github.com/grpc-ecosystem/grpc-gateway-v2/grpc"
)

// UnaryServerInterceptor returns a new unary server interceptor that performs recovering from a panic.
Expand All @@ -31,15 +30,14 @@ func StreamServerInterceptor() grpc.StreamServerInterceptor {
return grpcrecovery.StreamServerInterceptor(grpcrecovery.WithRecoveryHandlerContext(recoveryLogHandler))
}

// WrapRecovery returns a new unary server interceptor that performs recovering from a panic.
func WrapRecovery[REQ any, RESP any](handler grpc_.UnaryHandler[REQ, RESP]) grpc_.UnaryHandler[REQ, RESP] {
return func(ctx context.Context, req REQ) (_ RESP, err error) {
// UnaryHandler returns a new unary server handler that performs recovering from a panic.
func UnaryHandler(handler grpc.UnaryHandler) grpc.UnaryHandler {
return func(ctx context.Context, req any) (_ any, err error) {
defer func() {
if r := recover(); r != nil {
err = recoveryLogHandler(ctx, r)
}
}()

resp, err := handler(ctx, req)
return resp, err
}
Expand Down
44 changes: 44 additions & 0 deletions pkg/webserver/webserver.connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2024 The searKing Author. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package webserver

import (
"github.com/searKing/golang/pkg/webserver/pkg/stats"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

func (f *Factory) ServerOptions(opts ...grpc.ServerOption) []grpc.ServerOption {
if f.fc.MaxReceiveMessageSizeInBytes > 0 {
opts = append(opts, grpc.MaxRecvMsgSize(f.fc.MaxReceiveMessageSizeInBytes))
} else {
opts = append(opts, grpc.MaxRecvMsgSize(defaultMaxReceiveMessageSize))
}
if f.fc.StatsHandling {
// log for the related stats handling (e.g., RPCs, connections).
opts = append(opts, grpc.StatsHandler(&stats.ServerHandler{}))
}
return opts
}

func (f *Factory) DialOptions(opts ...grpc.DialOption) []grpc.DialOption {
if f.fc.NoGrpcProxy {
opts = append(opts, grpc.WithNoProxy())
}
if !f.fc.ForceDisableTls {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
if f.fc.MaxReceiveMessageSizeInBytes > 0 {
opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(f.fc.MaxReceiveMessageSizeInBytes), grpc.MaxCallSendMsgSize(f.fc.MaxReceiveMessageSizeInBytes)))
} else {
opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultMaxReceiveMessageSize), grpc.MaxCallSendMsgSize(defaultMaxSendMessageSize)))
}
if f.fc.StatsHandling {
// log for the related stats handling (e.g., RPCs, connections).
opts = append(opts, grpc.WithStatsHandler(&stats.ClientHandler{}))
}

return opts
}
62 changes: 14 additions & 48 deletions pkg/webserver/webserver.factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,17 @@ import (

"github.com/gin-gonic/gin"
"github.com/rs/cors"
"github.com/searKing/golang/pkg/webserver/pkg/stats"
"google.golang.org/grpc"

slog_ "github.com/searKing/golang/go/log/slog"
"github.com/searKing/golang/pkg/webserver/healthz"
"github.com/searKing/golang/pkg/webserver/pkg/recovery"
gin_ "github.com/searKing/golang/third_party/github.com/gin-gonic/gin"
grpc_ "github.com/searKing/golang/third_party/github.com/grpc-ecosystem/grpc-gateway-v2/grpc"
"github.com/searKing/golang/third_party/google.golang.org/grpc/interceptors/burstlimit"
"github.com/searKing/golang/third_party/google.golang.org/grpc/interceptors/timeoutlimit"
)

// ClientMaxReceiveMessageSize use 4GB as the default message size limit.
// grpc library default is 4MB
var defaultMaxReceiveMessageSize = math.MaxInt32 // 1024 * 1024 * 1024 * 4
var defaultMaxSendMessageSize = math.MaxInt32

// FactoryConfigFunc is an alias for a function that will take in a pointer to an FactoryConfig and modify it
type FactoryConfigFunc func(os *FactoryConfig) error

Expand Down Expand Up @@ -147,52 +146,24 @@ func (f *Factory) New() (*WebServer, error) {
}

opts := grpc_.WithDefault()
if f.fc.NoGrpcProxy {
opts = append(opts, grpc_.WithGrpcDialOption(grpc.WithNoProxy()))
}
{
// 设置GRPC最大消息大小
// connection options
// http -> grpc client -> grpc server
if f.fc.MaxReceiveMessageSizeInBytes > 0 {
opts = append(opts, grpc_.WithGrpcServerOption(grpc.MaxRecvMsgSize(f.fc.MaxReceiveMessageSizeInBytes)))
opts = append(opts, grpc_.WithGrpcDialOption(grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(f.fc.MaxReceiveMessageSizeInBytes))))
} else {
opts = append(opts, grpc_.WithGrpcServerOption(grpc.MaxRecvMsgSize(defaultMaxReceiveMessageSize)))
opts = append(opts, grpc_.WithGrpcDialOption(grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(defaultMaxReceiveMessageSize))))
}
// http <- grpc client <- grpc server
if f.fc.MaxSendMessageSizeInBytes > 0 {
opts = append(opts, grpc_.WithGrpcServerOption(grpc.MaxSendMsgSize(f.fc.MaxSendMessageSizeInBytes)))
opts = append(opts, grpc_.WithGrpcDialOption(grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(f.fc.MaxSendMessageSizeInBytes))))
} else {
opts = append(opts, grpc_.WithGrpcServerOption(grpc.MaxSendMsgSize(defaultMaxSendMessageSize)))
opts = append(opts, grpc_.WithGrpcDialOption(grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaultMaxSendMessageSize))))
}
opts = append(opts, grpc_.WithGrpcServerOption(f.ServerOptions()...))
opts = append(opts, grpc_.WithGrpcDialOption(f.DialOptions()...))
}
{
// recover
opts = append(opts, grpc_.WithGrpcUnaryServerChain(recovery.UnaryServerInterceptor()))
opts = append(opts, grpc_.WithGrpcStreamServerChain(recovery.StreamServerInterceptor()))
// grpc interceptors
opts = append(opts, grpc_.WithGrpcUnaryServerChain(f.UnaryServerInterceptors()...))
opts = append(opts, grpc_.WithGrpcStreamServerChain(f.StreamServerInterceptors()...))
}
{
// handle request timeout
opts = append(opts, grpc_.WithGrpcUnaryServerChain(timeoutlimit.UnaryServerInterceptor(f.fc.HandledTimeoutUnary)))
opts = append(opts, grpc_.WithGrpcStreamServerChain(timeoutlimit.StreamServerInterceptor(f.fc.HandledTimeoutStream)))
}
{
// burst limit
opts = append(opts, grpc_.WithGrpcUnaryServerChain(burstlimit.UnaryServerInterceptor(f.fc.MaxConcurrencyUnary, f.fc.BurstLimitTimeoutUnary)))
opts = append(opts, grpc_.WithGrpcStreamServerChain(burstlimit.StreamServerInterceptor(f.fc.MaxConcurrencyStream, f.fc.BurstLimitTimeoutStream)))
}
if f.fc.StatsHandling {
// log for the related stats handling (e.g., RPCs, connections).
opts = append(opts, grpc_.WithGrpcDialOption(grpc.WithStatsHandler(&stats.ClientHandler{})))
opts = append(opts, grpc_.WithGrpcServerOption(grpc.StatsHandler(&stats.ServerHandler{})))
// http interceptors
opts = append(opts, grpc_.WithHttpHandlerDecorators(f.HttpServerInterceptors()...))
}

// cors
opts = append(opts, grpc_.WithHttpWrapper(cors.New(f.fc.Cors).Handler))
opts = append(opts, f.fc.GatewayOptions...)
// log
opts = append(opts, grpc_.WithSlogLoggerConfig(slog.Default().Handler(), grpc_.ExtractLoggingOptions(opts...))...)
grpcBackend := grpc_.NewGatewayTLS(f.fc.BindAddress, f.fc.TlsConfig, opts...)
{
Expand Down Expand Up @@ -262,8 +233,3 @@ func (f *Factory) New() (*WebServer, error) {

return s, nil
}

// ClientMaxReceiveMessageSize use 4GB as the default message size limit.
// grpc library default is 4MB
var defaultMaxReceiveMessageSize = math.MaxInt32 // 1024 * 1024 * 4
var defaultMaxSendMessageSize = math.MaxInt32
52 changes: 52 additions & 0 deletions pkg/webserver/webserver.interceptors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2024 The searKing Author. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package webserver

import (
"github.com/rs/cors"
"google.golang.org/grpc"

http_ "github.com/searKing/golang/go/net/http"
"github.com/searKing/golang/pkg/webserver/pkg/recovery"
grpc_ "github.com/searKing/golang/third_party/google.golang.org/grpc"
"github.com/searKing/golang/third_party/google.golang.org/grpc/interceptors/burstlimit"
"github.com/searKing/golang/third_party/google.golang.org/grpc/interceptors/timeoutlimit"
)

// UnaryHandler returns a new unary server handler.
//
// gRPC-Gateway does not support gRPC interceptors when call gRPC's service handler in process.
// See: https://github.com/grpc-ecosystem/grpc-gateway/issues/1043
func (f *Factory) UnaryHandler(handlers ...grpc_.UnaryHandlerDecorator) []grpc_.UnaryHandlerDecorator {
// recover
handlers = append(handlers, grpc_.UnaryHandlerDecoratorFunc(recovery.UnaryHandler))
return handlers
}

func (f *Factory) UnaryServerInterceptors(interceptors ...grpc.UnaryServerInterceptor) []grpc.UnaryServerInterceptor {
// recover
interceptors = append(interceptors, recovery.UnaryServerInterceptor())
// handle request timeout
interceptors = append(interceptors, timeoutlimit.UnaryServerInterceptor(f.fc.HandledTimeoutUnary))
// burst limit
interceptors = append(interceptors, burstlimit.UnaryServerInterceptor(f.fc.MaxConcurrencyUnary, f.fc.BurstLimitTimeoutUnary))
return interceptors
}

func (f *Factory) StreamServerInterceptors(interceptors ...grpc.StreamServerInterceptor) []grpc.StreamServerInterceptor {
// recover
interceptors = append(interceptors, recovery.StreamServerInterceptor())
// handle request timeout
interceptors = append(interceptors, timeoutlimit.StreamServerInterceptor(f.fc.HandledTimeoutUnary))
// burst limit
interceptors = append(interceptors, burstlimit.StreamServerInterceptor(f.fc.MaxConcurrencyUnary, f.fc.BurstLimitTimeoutUnary))
return interceptors
}

func (f *Factory) HttpServerInterceptors(decorators ...http_.HandlerDecorator) []http_.HandlerDecorator {
// cors
decorators = append(decorators, http_.HandlerDecoratorFunc(cors.New(f.fc.Cors).Handler))
return decorators
}

0 comments on commit 9560961

Please sign in to comment.