Skip to content

Commit

Permalink
feat: trigger controller wait eventbus ready
Browse files Browse the repository at this point in the history
Signed-off-by: xdlbdy <[email protected]>
  • Loading branch information
xdlbdy committed Mar 28, 2023
1 parent ba347ae commit 30814f8
Show file tree
Hide file tree
Showing 10 changed files with 123 additions and 74 deletions.
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,11 @@ require (
go.uber.org/multierr v1.7.0 // indirect
go.uber.org/zap v1.17.0 // indirect
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect
golang.org/x/net v0.4.0 // indirect
golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/oauth2 v0.6.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.3.0 // indirect
golang.org/x/text v0.5.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
)

Expand Down
16 changes: 8 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -535,8 +535,8 @@ golang.org/x/net v0.0.0-20211029224645-99673261e6eb/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.4.0 h1:Q5QPcMlvfxFTAPV0+07Xz/MpK9NTXu2VDUuy0FeMfaU=
golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE=
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand All @@ -545,8 +545,8 @@ golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4Iltr
golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc=
golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 h1:nt+Q6cXKz4MosCSpnbMtqiQ8Oz0pxTef2B4Vca2lvfk=
golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783/go.mod h1:h4gKUeWbJ4rQPri7E0u6Gs4e9Ri2zaLxzw5DI5XGrYg=
golang.org/x/oauth2 v0.6.0 h1:Lh8GPgSKBfWSwFvtuWOfeI3aAAnbXTSutYxJiOJFgIw=
golang.org/x/oauth2 v0.6.0/go.mod h1:ycmewcwgD4Rpr3eZJLSB4Kyyljb3qDh40vJ8STE5HKw=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand Down Expand Up @@ -607,8 +607,8 @@ golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ=
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand All @@ -619,8 +619,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.5.0 h1:OLmvp0KP+FVG99Ct/qFiL/Fhk4zp4QQnZ7b2U+5piUM=
golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down
84 changes: 44 additions & 40 deletions internal/controller/trigger/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@ package trigger

import (
"context"
stdErr "errors"
stderr "errors"
"fmt"
"io"
"os"
"strings"
"sync"
"time"

"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/emptypb"

eb "github.com/vanus-labs/vanus/client"
"github.com/vanus-labs/vanus/internal/controller/member"
"github.com/vanus-labs/vanus/internal/controller/trigger/metadata"
Expand All @@ -42,22 +45,21 @@ import (
"github.com/vanus-labs/vanus/pkg/util"
ctrlpb "github.com/vanus-labs/vanus/proto/pkg/controller"
metapb "github.com/vanus-labs/vanus/proto/pkg/meta"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/emptypb"
)

var _ ctrlpb.TriggerControllerServer = &controller{}

const (
defaultGcSubscriptionInterval = time.Second * 10
waitEventbusReadyTime = time.Minute * 3
waitEventbusCheckPeriod = time.Second * 2
)

func NewController(config Config, mem member.Member) *controller {
ctrl := &controller{
config: config,
member: mem,
needCleanSubscription: map[vanus.ID]string{},
state: primitive.ServerStateCreated,
cl: cluster.NewClusterController(config.ControllerAddr, insecure.NewCredentials()),
ebClient: eb.Connect(config.ControllerAddr),
}
Expand All @@ -79,17 +81,13 @@ type controller struct {
isLeader bool
ctx context.Context
stopFunc context.CancelFunc
state primitive.ServerState
cl cluster.Cluster
ebClient eb.Client
}

func (ctrl *controller) SetDeadLetterEventOffset(
ctx context.Context, request *ctrlpb.SetDeadLetterEventOffsetRequest,
) (*emptypb.Empty, error) {
if ctrl.state != primitive.ServerStateRunning {
return nil, errors.ErrServerNotStart
}
subID := vanus.ID(request.SubscriptionId)
err := ctrl.subscriptionManager.SaveDeadLetterOffset(ctx, subID, request.GetOffset())
if err != nil {
Expand All @@ -101,9 +99,6 @@ func (ctrl *controller) SetDeadLetterEventOffset(
func (ctrl *controller) GetDeadLetterEventOffset(
ctx context.Context, request *ctrlpb.GetDeadLetterEventOffsetRequest,
) (*ctrlpb.GetDeadLetterEventOffsetResponse, error) {
if ctrl.state != primitive.ServerStateRunning {
return nil, errors.ErrServerNotStart
}
subID := vanus.ID(request.SubscriptionId)
offset, err := ctrl.subscriptionManager.GetDeadLetterOffset(ctx, subID)
if err != nil {
Expand All @@ -115,9 +110,6 @@ func (ctrl *controller) GetDeadLetterEventOffset(
func (ctrl *controller) CommitOffset(
ctx context.Context, request *ctrlpb.CommitOffsetRequest,
) (*ctrlpb.CommitOffsetResponse, error) {
if ctrl.state != primitive.ServerStateRunning {
return nil, errors.ErrServerNotStart
}
resp := new(ctrlpb.CommitOffsetResponse)
for _, subInfo := range request.SubscriptionInfo {
if len(subInfo.Offsets) == 0 {
Expand All @@ -139,9 +131,6 @@ func (ctrl *controller) CommitOffset(
func (ctrl *controller) ResetOffsetToTimestamp(
ctx context.Context, request *ctrlpb.ResetOffsetToTimestampRequest,
) (*ctrlpb.ResetOffsetToTimestampResponse, error) {
if ctrl.state != primitive.ServerStateRunning {
return nil, errors.ErrServerNotStart
}
if request.Timestamp == 0 {
return nil, errors.ErrInvalidRequest.WithMessage("timestamp is invalid")
}
Expand All @@ -166,15 +155,20 @@ func (ctrl *controller) ResetOffsetToTimestamp(
func (ctrl *controller) CreateSubscription(
ctx context.Context, request *ctrlpb.CreateSubscriptionRequest,
) (*metapb.Subscription, error) {
if ctrl.state != primitive.ServerStateRunning {
return nil, errors.ErrServerNotStart
}
err := validation.ValidateSubscriptionRequest(ctx, request.Subscription)
if err != nil {
log.Info(ctx).Err(err).Msg("create subscription validate fail")
return nil, err
}
sub := convert.FromPbSubscriptionRequest(request.Subscription)
_, err = ctrl.cl.NamespaceService().GetNamespace(ctx, sub.NamespaceID.Uint64())
if err != nil {
return nil, err
}
_, err = ctrl.cl.EventbusService().GetEventbus(ctx, sub.EventbusID.Uint64())
if err != nil {
return nil, err
}
sub.ID, err = vanus.NewID()
sub.CreatedAt = time.Now()
sub.UpdatedAt = time.Now()
Expand All @@ -200,9 +194,6 @@ func (ctrl *controller) CreateSubscription(
func (ctrl *controller) UpdateSubscription(
ctx context.Context, request *ctrlpb.UpdateSubscriptionRequest,
) (*metapb.Subscription, error) {
if ctrl.state != primitive.ServerStateRunning {
return nil, errors.ErrServerNotStart
}
subID := vanus.ID(request.Id)
sub := ctrl.subscriptionManager.GetSubscription(ctx, subID)
if sub == nil {
Expand Down Expand Up @@ -243,9 +234,6 @@ func (ctrl *controller) UpdateSubscription(
func (ctrl *controller) DeleteSubscription(
ctx context.Context, request *ctrlpb.DeleteSubscriptionRequest,
) (*emptypb.Empty, error) {
if ctrl.state != primitive.ServerStateRunning {
return nil, errors.ErrServerNotStart
}
subID := vanus.ID(request.Id)
sub := ctrl.subscriptionManager.GetSubscription(ctx, subID)
if sub != nil {
Expand All @@ -269,9 +257,6 @@ func (ctrl *controller) DeleteSubscription(
func (ctrl *controller) DisableSubscription(
ctx context.Context, request *ctrlpb.DisableSubscriptionRequest,
) (*emptypb.Empty, error) {
if ctrl.state != primitive.ServerStateRunning {
return nil, errors.ErrServerNotStart
}
subID := vanus.ID(request.Id)
sub := ctrl.subscriptionManager.GetSubscription(ctx, subID)
if sub == nil {
Expand Down Expand Up @@ -302,9 +287,6 @@ func (ctrl *controller) DisableSubscription(
func (ctrl *controller) ResumeSubscription(
ctx context.Context, request *ctrlpb.ResumeSubscriptionRequest,
) (*emptypb.Empty, error) {
if ctrl.state != primitive.ServerStateRunning {
return nil, errors.ErrServerNotStart
}
subID := vanus.ID(request.Id)
sub := ctrl.subscriptionManager.GetSubscription(ctx, subID)
if sub == nil {
Expand All @@ -326,9 +308,6 @@ func (ctrl *controller) ResumeSubscription(
func (ctrl *controller) GetSubscription(
ctx context.Context, request *ctrlpb.GetSubscriptionRequest,
) (*metapb.Subscription, error) {
if ctrl.state != primitive.ServerStateRunning {
return nil, errors.ErrServerNotStart
}
sub := ctrl.subscriptionManager.GetSubscription(ctx, vanus.ID(request.Id))
if sub == nil {
return nil, errors.ErrResourceNotFound.WithMessage("subscription not exist")
Expand All @@ -355,7 +334,7 @@ func (ctrl *controller) TriggerWorkerHeartbeat(
}
req, err := heartbeat.Recv()
if err != nil {
if !stdErr.Is(err, io.EOF) {
if !stderr.Is(err, io.EOF) {
log.Warn(ctx).Err(err).Msg("heartbeat recv error")
}
log.Info(ctx).Msg("heartbeat close")
Expand Down Expand Up @@ -562,7 +541,6 @@ func (ctrl *controller) membershipChangedProcessor(
ctrl.subscriptionManager.Start()
ctrl.scheduler.Run()
go ctrl.gcSubscriptions(ctx)
ctrl.state = primitive.ServerStateRunning
ctrl.isLeader = true
case member.EventBecomeFollower:
if !ctrl.isLeader {
Expand All @@ -579,13 +557,11 @@ func (ctrl *controller) membershipChangedProcessor(

func (ctrl *controller) stop(_ context.Context) error {
ctrl.member.ResignIfLeader()
ctrl.state = primitive.ServerStateStopping
ctrl.stopFunc()
ctrl.scheduler.Stop()
ctrl.workerManager.Stop()
ctrl.subscriptionManager.Stop()
ctrl.storage.Close()
ctrl.state = primitive.ServerStateStopped
return nil
}

Expand Down Expand Up @@ -621,12 +597,40 @@ func (ctrl *controller) initTriggerSystemEventbus() {
go func() {
ctx := context.Background()
log.Info(ctx).Msg("trigger controller is ready to check system eventbus")
if err := ctrl.cl.WaitForControllerReady(false); err != nil {
log.Error().Err(err).Msg("trigger controller check system eventbus, " +
"but Vanus cluster hasn't ready, exit")
os.Exit(-1)
}
ready := util.WaitReady(func() bool {
exist, err := ctrl.cl.EventbusService().IsSystemEventbusExistByName(ctx, primitive.TimerEventbusName)
if err != nil {
log.Error().Err(err).Msg("check TimerEventbus exist has error")
return false
}
return exist
}, waitEventbusReadyTime, waitEventbusCheckPeriod)
if !ready {
log.Error().Msg("check TimerEventbus timeout no exist, will exist")
os.Exit(-1)
}

// wait TimerEventbus
exist, err := ctrl.cl.EventbusService().IsSystemEventbusExistByName(ctx, primitive.RetryEventbusName)
if err != nil {
log.Error().Err(err).Msg("failed to check RetryEventbus exist, exit")
os.Exit(-1)
}
if exist {
log.Info().Msg("trigger controller check RetryEventbus exist")
return
}
log.Info().Msg("trigger controller check RetryEventbus no exist, will create")
if err := ctrl.cl.WaitForControllerReady(true); err != nil {
log.Error(ctx).Err(err).
Msg("trigger controller try to create system eventbus, but Vanus cluster hasn't ready, exit")
os.Exit(-1)
}

if _, err := ctrl.cl.EventbusService().CreateSystemEventbusIfNotExist(ctx, primitive.RetryEventbusName,
"System Eventbus For Trigger Service"); err != nil {
log.Error(ctx).Err(err).Msg("failed to create RetryEventbus, exit")
Expand Down
16 changes: 9 additions & 7 deletions internal/controller/trigger/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/golang/mock/gomock"
. "github.com/smartystreets/goconvey/convey"

"github.com/vanus-labs/vanus/pkg/cluster"
ctrlpb "github.com/vanus-labs/vanus/proto/pkg/controller"
metapb "github.com/vanus-labs/vanus/proto/pkg/meta"

Expand All @@ -47,7 +48,6 @@ func TestController_CommitOffset(t *testing.T) {
ctrl.subscriptionManager = subManager

subID := vanus.NewTestID()
ctrl.state = primitive.ServerStateRunning
request := &ctrlpb.CommitOffsetRequest{
ForceCommit: true,
SubscriptionInfo: []*metapb.SubscriptionInfo{{
Expand Down Expand Up @@ -87,7 +87,6 @@ func TestController_ResetOffsetToTimestamp(t *testing.T) {
ctrl.subscriptionManager = subManager

subID := vanus.NewTestID()
ctrl.state = primitive.ServerStateRunning
Convey("reset offset subscription not exist", func() {
subManager.EXPECT().GetSubscription(gomock.Any(), gomock.Eq(subID)).AnyTimes().Return(nil)
_, err := ctrl.ResetOffsetToTimestamp(ctx, &ctrlpb.ResetOffsetToTimestampRequest{
Expand Down Expand Up @@ -123,11 +122,17 @@ func TestController_CreateSubscription(t *testing.T) {
subManager := subscription.NewMockManager(mockCtrl)
ctrl.subscriptionManager = subManager
ctrl.scheduler = worker.NewSubscriptionScheduler(ctrl.workerManager, ctrl.subscriptionManager)

ctrl.state = primitive.ServerStateRunning
mockCluster := cluster.NewMockCluster(mockCtrl)
ctrl.cl = mockCluster
Convey("create subscription", func() {
subManager.EXPECT().GetSubscriptionByName(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil)
subManager.EXPECT().AddSubscription(gomock.Any(), gomock.Any()).AnyTimes().Return(nil)
mockNsSvc := cluster.NewMockNamespaceService(mockCtrl)
mockEbSvc := cluster.NewMockEventbusService(mockCtrl)
mockCluster.EXPECT().NamespaceService().AnyTimes().Return(mockNsSvc)
mockCluster.EXPECT().EventbusService().AnyTimes().Return(mockEbSvc)
mockNsSvc.EXPECT().GetNamespace(gomock.Any(), gomock.Any()).AnyTimes().Return(nil, nil)
mockEbSvc.EXPECT().GetEventbus(gomock.Any(), gomock.Any()).AnyTimes().Return(nil, nil)
create := &ctrlpb.CreateSubscriptionRequest{
Subscription: &ctrlpb.SubscriptionRequest{
NamespaceId: vanus.NewTestID().Uint64(),
Expand Down Expand Up @@ -168,7 +173,6 @@ func TestController_UpdateSubscription(t *testing.T) {
subID := vanus.NewTestID()
eventbusID := vanus.NewTestID()
namespaceID := vanus.NewTestID()
ctrl.state = primitive.ServerStateRunning
Convey("update subscription not exist", func() {
subManager.EXPECT().GetSubscription(gomock.Any(), gomock.Eq(subID)).Return(nil)
request := &ctrlpb.UpdateSubscriptionRequest{
Expand Down Expand Up @@ -387,7 +391,6 @@ func TestController_DeleteSubscription(t *testing.T) {
request := &ctrlpb.DeleteSubscriptionRequest{
Id: subID.Uint64(),
}
ctrl.state = primitive.ServerStateRunning
Convey("delete subscription no exist", func() {
subManager.EXPECT().GetSubscription(gomock.Any(), gomock.Eq(subID)).Return(nil)
_, err := ctrl.DeleteSubscription(ctx, request)
Expand Down Expand Up @@ -447,7 +450,6 @@ func TestController_GetSubscription(t *testing.T) {
request := &ctrlpb.GetSubscriptionRequest{
Id: subID.Uint64(),
}
ctrl.state = primitive.ServerStateRunning
Convey("get subscription no exist", func() {
subManager.EXPECT().GetSubscription(gomock.Any(), gomock.Eq(subID)).Return(nil)
_, err := ctrl.GetSubscription(ctx, request)
Expand Down
1 change: 1 addition & 0 deletions pkg/cluster/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type NamespaceService interface {
}

type EventbusService interface {
IsSystemEventbusExistByName(ctx context.Context, name string) (bool, error)
CreateSystemEventbusIfNotExist(ctx context.Context, name string, desc string) (*metapb.Eventbus, error)
Delete(ctx context.Context, id uint64) error
GetSystemEventbusByName(ctx context.Context, name string) (*metapb.Eventbus, error)
Expand Down
Loading

0 comments on commit 30814f8

Please sign in to comment.