diff --git a/client/caller/caller.go b/client/caller/caller.go new file mode 100644 index 00000000000..3df2297b0ac --- /dev/null +++ b/client/caller/caller.go @@ -0,0 +1,67 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package caller + +import ( + "os" + "path/filepath" + "runtime" + "strings" +) + +type ( + // Caller ID can be understood as a binary file; it is a process. + ID string + // Caller component refers to the components within the process. + Component string +) + +const ( + // TestID is used for test. + TestID ID = "test" + // TestComponent is used for test. + TestComponent Component = "test" +) + +var processName ID + +func init() { + processName = ID(filepath.Base(os.Args[0])) +} + +// GetCallerID returns the name of the currently running process +func GetCallerID() ID { + return processName +} + +// GetComponent returns the package path of the calling function +// The argument upperLayer specifies the number of stack frames to ascend. +// NOTE: This function is time-consuming and please do not use it in high qps scenarios. +func GetComponent(upperLayer int) Component { + // Get the program counter for the calling function + pc, _, _, ok := runtime.Caller(upperLayer + 1) + if !ok { + return "unknown" + } + + // Retrieve the full function name, including the package path + fullFuncName := runtime.FuncForPC(pc).Name() + + // Separates the package and function + lastSlash := strings.LastIndex(fullFuncName, ".") + + // Extract the package name + return Component(fullFuncName[:lastSlash]) +} diff --git a/client/caller/caller_test.go b/client/caller/caller_test.go new file mode 100644 index 00000000000..a6ab51ef700 --- /dev/null +++ b/client/caller/caller_test.go @@ -0,0 +1,36 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package caller + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestGetComponent(t *testing.T) { + re := require.New(t) + + re.Equal(Component("github.com/tikv/pd/client/caller"), GetComponent(0)) + re.Equal(Component("testing"), GetComponent(1)) + re.Equal(Component("runtime"), GetComponent(2)) + re.Equal(Component("unknown"), GetComponent(3)) +} + +func TestGetCallerID(t *testing.T) { + re := require.New(t) + + re.Equal(ID("caller.test"), GetCallerID()) +} diff --git a/client/client.go b/client/client.go index 8a79e9663c5..a9f5a6fae0f 100644 --- a/client/client.go +++ b/client/client.go @@ -16,7 +16,6 @@ package pd import ( "context" - "crypto/tls" "encoding/hex" "fmt" "net/url" @@ -32,6 +31,7 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/prometheus/client_golang/prometheus" + "github.com/tikv/pd/client/caller" "github.com/tikv/pd/client/clients/metastorage" "github.com/tikv/pd/client/errs" "github.com/tikv/pd/client/opt" @@ -142,6 +142,16 @@ type RPCClient interface { // SetExternalTimestamp sets external timestamp SetExternalTimestamp(ctx context.Context, timestamp uint64) error + // WithCallerComponent returns a new RPCClient with the specified caller + // component. Caller component refers to the specific part or module within + // the process. You can set the component in two ways: + // * Define it manually, like `caller.Component("DDL")`. + // * Use the provided helper function, `caller.GetComponent(upperLayer)`. + // The upperLayer parameter specifies the depth of the caller stack, + // where 0 means the current function. Adjust the upperLayer value based + // on your needs. + WithCallerComponent(callerComponent caller.Component) RPCClient + // TSOClient is the TSO client. TSOClient metastorage.Client @@ -212,22 +222,10 @@ func (k *serviceModeKeeper) close() { } type client struct { - keyspaceID uint32 - svrUrls []string - pdSvcDiscovery *pdServiceDiscovery - tokenDispatcher *tokenDispatcher - - // For service mode switching. - serviceModeKeeper - - // For internal usage. - updateTokenConnectionCh chan struct{} + // Caller component refers to the components within the process. + callerComponent caller.Component - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup - tlsCfg *tls.Config - option *opt.Option + inner *innerClient } // SecurityOption records options about tls @@ -275,35 +273,45 @@ func (r *KeyRange) EscapeAsHexStr() (startKeyStr, endKeyStr string) { // NewClient creates a PD client. func NewClient( + callerComponent caller.Component, svrAddrs []string, security SecurityOption, opts ...opt.ClientOption, ) (Client, error) { - return NewClientWithContext(context.Background(), svrAddrs, security, opts...) + return NewClientWithContext(context.Background(), callerComponent, + svrAddrs, security, opts...) } // NewClientWithContext creates a PD client with context. This API uses the default keyspace id 0. func NewClientWithContext( - ctx context.Context, svrAddrs []string, + ctx context.Context, + callerComponent caller.Component, + svrAddrs []string, security SecurityOption, opts ...opt.ClientOption, ) (Client, error) { - return createClientWithKeyspace(ctx, nullKeyspaceID, svrAddrs, security, opts...) + return createClientWithKeyspace(ctx, callerComponent, + nullKeyspaceID, svrAddrs, security, opts...) } // NewClientWithKeyspace creates a client with context and the specified keyspace id. // And now, it's only for test purpose. func NewClientWithKeyspace( - ctx context.Context, keyspaceID uint32, svrAddrs []string, + ctx context.Context, + callerComponent caller.Component, + keyspaceID uint32, svrAddrs []string, security SecurityOption, opts ...opt.ClientOption, ) (Client, error) { if keyspaceID < defaultKeyspaceID || keyspaceID > maxKeyspaceID { return nil, errors.Errorf("invalid keyspace id %d. It must be in the range of [%d, %d]", keyspaceID, defaultKeyspaceID, maxKeyspaceID) } - return createClientWithKeyspace(ctx, keyspaceID, svrAddrs, security, opts...) + return createClientWithKeyspace(ctx, callerComponent, keyspaceID, + svrAddrs, security, opts...) } // createClientWithKeyspace creates a client with context and the specified keyspace id. func createClientWithKeyspace( - ctx context.Context, keyspaceID uint32, svrAddrs []string, + ctx context.Context, + callerComponent caller.Component, + keyspaceID uint32, svrAddrs []string, security SecurityOption, opts ...opt.ClientOption, ) (Client, error) { tlsCfg, err := tlsutil.TLSConfig{ @@ -318,34 +326,27 @@ func createClientWithKeyspace( if err != nil { return nil, err } + clientCtx, clientCancel := context.WithCancel(ctx) c := &client{ - updateTokenConnectionCh: make(chan struct{}, 1), - ctx: clientCtx, - cancel: clientCancel, - keyspaceID: keyspaceID, - svrUrls: svrAddrs, - tlsCfg: tlsCfg, - option: opt.NewOption(), + callerComponent: adjustCallerComponent(callerComponent), + inner: &innerClient{ + keyspaceID: keyspaceID, + svrUrls: svrAddrs, + updateTokenConnectionCh: make(chan struct{}, 1), + ctx: clientCtx, + cancel: clientCancel, + tlsCfg: tlsCfg, + option: opt.NewOption(), + }, } // Inject the client options. for _, opt := range opts { - opt(c.option) - } - - c.pdSvcDiscovery = newPDServiceDiscovery( - clientCtx, clientCancel, &c.wg, c.setServiceMode, - nil, keyspaceID, c.svrUrls, c.tlsCfg, c.option) - if err := c.setup(); err != nil { - c.cancel() - if c.pdSvcDiscovery != nil { - c.pdSvcDiscovery.Close() - } - return nil, err + opt(c.inner.option) } - return c, nil + return c, c.inner.init(nil) } // APIVersion is the API version the server and the client is using. @@ -407,15 +408,19 @@ func (apiCtx *apiContextV2) GetKeyspaceName() (keyspaceName string) { // NewClientWithAPIContext creates a client according to the API context. func NewClientWithAPIContext( - ctx context.Context, apiCtx APIContext, svrAddrs []string, + ctx context.Context, apiCtx APIContext, + callerComponent caller.Component, + svrAddrs []string, security SecurityOption, opts ...opt.ClientOption, ) (Client, error) { apiVersion, keyspaceName := apiCtx.GetAPIVersion(), apiCtx.GetKeyspaceName() switch apiVersion { case V1: - return NewClientWithContext(ctx, svrAddrs, security, opts...) + return NewClientWithContext(ctx, callerComponent, svrAddrs, + security, opts...) case V2: - return newClientWithKeyspaceName(ctx, keyspaceName, svrAddrs, security, opts...) + return newClientWithKeyspaceName(ctx, callerComponent, + keyspaceName, svrAddrs, security, opts...) default: return nil, errors.Errorf("[pd] invalid API version %d", apiVersion) } @@ -423,7 +428,9 @@ func NewClientWithAPIContext( // newClientWithKeyspaceName creates a client with context and the specified keyspace name. func newClientWithKeyspaceName( - ctx context.Context, keyspaceName string, svrAddrs []string, + ctx context.Context, + callerComponent caller.Component, + keyspaceName string, svrAddrs []string, security SecurityOption, opts ...opt.ClientOption, ) (Client, error) { tlsCfg, err := tlsutil.TLSConfig{ @@ -440,18 +447,23 @@ func newClientWithKeyspaceName( } clientCtx, clientCancel := context.WithCancel(ctx) c := &client{ - keyspaceID: nullKeyspaceID, - updateTokenConnectionCh: make(chan struct{}, 1), - ctx: clientCtx, - cancel: clientCancel, - svrUrls: svrAddrs, - tlsCfg: tlsCfg, - option: opt.NewOption(), + callerComponent: adjustCallerComponent(callerComponent), + inner: &innerClient{ + // Create a PD service discovery with null keyspace id, then query the real id with the keyspace name, + // finally update the keyspace id to the PD service discovery for the following interactions. + keyspaceID: nullKeyspaceID, + updateTokenConnectionCh: make(chan struct{}, 1), + ctx: clientCtx, + cancel: clientCancel, + svrUrls: svrAddrs, + tlsCfg: tlsCfg, + option: opt.NewOption(), + }, } // Inject the client options. for _, opt := range opts { - opt(c.option) + opt(c.inner.option) } updateKeyspaceIDFunc := func() error { @@ -459,173 +471,47 @@ func newClientWithKeyspaceName( if err != nil { return err } - c.keyspaceID = keyspaceMeta.GetId() + c.inner.keyspaceID = keyspaceMeta.GetId() // c.keyspaceID is the source of truth for keyspace id. - c.pdSvcDiscovery.SetKeyspaceID(c.keyspaceID) + c.inner.pdSvcDiscovery.SetKeyspaceID(c.inner.keyspaceID) return nil } - // Create a PD service discovery with null keyspace id, then query the real id with the keyspace name, - // finally update the keyspace id to the PD service discovery for the following interactions. - c.pdSvcDiscovery = newPDServiceDiscovery(clientCtx, clientCancel, &c.wg, - c.setServiceMode, updateKeyspaceIDFunc, nullKeyspaceID, c.svrUrls, c.tlsCfg, c.option) - if err := c.setup(); err != nil { - c.cancel() - if c.pdSvcDiscovery != nil { - c.pdSvcDiscovery.Close() - } + if err := c.inner.init(updateKeyspaceIDFunc); err != nil { return nil, err } log.Info("[pd] create pd client with endpoints and keyspace", zap.Strings("pd-address", svrAddrs), zap.String("keyspace-name", keyspaceName), - zap.Uint32("keyspace-id", c.keyspaceID)) + zap.Uint32("keyspace-id", c.inner.keyspaceID)) return c, nil } -func (c *client) setup() error { - // Init the metrics. - if c.option.InitMetrics { - initAndRegisterMetrics(c.option.MetricsLabels) - } - - // Init the client base. - if err := c.pdSvcDiscovery.Init(); err != nil { - return err - } - - // Register callbacks - c.pdSvcDiscovery.AddServingURLSwitchedCallback(c.scheduleUpdateTokenConnection) - - // Create dispatchers - c.createTokenDispatcher() - return nil -} - // Close closes the client. func (c *client) Close() { - c.cancel() - c.wg.Wait() - - c.serviceModeKeeper.close() - c.pdSvcDiscovery.Close() - - if c.tokenDispatcher != nil { - tokenErr := errors.WithStack(errClosing) - c.tokenDispatcher.tokenBatchController.revokePendingTokenRequest(tokenErr) - c.tokenDispatcher.dispatcherCancel() - } -} - -func (c *client) setServiceMode(newMode pdpb.ServiceMode) { - c.Lock() - defer c.Unlock() - - if c.option.UseTSOServerProxy { - // If we are using TSO server proxy, we always use PD_SVC_MODE. - newMode = pdpb.ServiceMode_PD_SVC_MODE - } - - if newMode == c.serviceMode { - return - } - log.Info("[pd] changing service mode", - zap.String("old-mode", c.serviceMode.String()), - zap.String("new-mode", newMode.String())) - c.resetTSOClientLocked(newMode) - oldMode := c.serviceMode - c.serviceMode = newMode - log.Info("[pd] service mode changed", - zap.String("old-mode", oldMode.String()), - zap.String("new-mode", newMode.String())) -} - -// Reset a new TSO client. -func (c *client) resetTSOClientLocked(mode pdpb.ServiceMode) { - // Re-create a new TSO client. - var ( - newTSOCli *tsoClient - newTSOSvcDiscovery ServiceDiscovery - ) - switch mode { - case pdpb.ServiceMode_PD_SVC_MODE: - newTSOCli = newTSOClient(c.ctx, c.option, - c.pdSvcDiscovery, &pdTSOStreamBuilderFactory{}) - case pdpb.ServiceMode_API_SVC_MODE: - newTSOSvcDiscovery = newTSOServiceDiscovery( - c.ctx, metastorage.Client(c), c.pdSvcDiscovery, - c.keyspaceID, c.tlsCfg, c.option) - // At this point, the keyspace group isn't known yet. Starts from the default keyspace group, - // and will be updated later. - newTSOCli = newTSOClient(c.ctx, c.option, - newTSOSvcDiscovery, &tsoTSOStreamBuilderFactory{}) - if err := newTSOSvcDiscovery.Init(); err != nil { - log.Error("[pd] failed to initialize tso service discovery. keep the current service mode", - zap.Strings("svr-urls", c.svrUrls), - zap.String("current-mode", c.serviceMode.String()), - zap.Error(err)) - return - } - case pdpb.ServiceMode_UNKNOWN_SVC_MODE: - log.Warn("[pd] intend to switch to unknown service mode, just return") - return - } - newTSOCli.setup() - // Replace the old TSO client. - oldTSOClient := c.tsoClient - c.tsoClient = newTSOCli - oldTSOClient.close() - // Replace the old TSO service discovery if needed. - oldTSOSvcDiscovery := c.tsoSvcDiscovery - // If newTSOSvcDiscovery is nil, that's expected, as it means we are switching to PD service mode and - // no tso microservice discovery is needed. - c.tsoSvcDiscovery = newTSOSvcDiscovery - // Close the old TSO service discovery safely after both the old client and service discovery are replaced. - if oldTSOSvcDiscovery != nil { - // We are switching from API service mode to PD service mode, so delete the old tso microservice discovery. - oldTSOSvcDiscovery.Close() - } -} - -func (c *client) getTSOClient() *tsoClient { - c.RLock() - defer c.RUnlock() - return c.tsoClient + c.inner.close() } // ResetTSOClient resets the TSO client, only for test. func (c *client) ResetTSOClient() { - c.Lock() - defer c.Unlock() - c.resetTSOClientLocked(c.serviceMode) -} - -func (c *client) getServiceMode() pdpb.ServiceMode { - c.RLock() - defer c.RUnlock() - return c.serviceMode -} - -func (c *client) scheduleUpdateTokenConnection() { - select { - case c.updateTokenConnectionCh <- struct{}{}: - default: - } + c.inner.Lock() + defer c.inner.Unlock() + c.inner.resetTSOClientLocked(c.inner.serviceMode) } // GetClusterID returns the ClusterID. func (c *client) GetClusterID(context.Context) uint64 { - return c.pdSvcDiscovery.GetClusterID() + return c.inner.pdSvcDiscovery.GetClusterID() } // GetLeaderURL returns the leader URL. func (c *client) GetLeaderURL() string { - return c.pdSvcDiscovery.GetServingURL() + return c.inner.pdSvcDiscovery.GetServingURL() } // GetServiceDiscovery returns the client-side service discovery object func (c *client) GetServiceDiscovery() ServiceDiscovery { - return c.pdSvcDiscovery + return c.inner.pdSvcDiscovery } // UpdateOption updates the client option. @@ -636,30 +522,30 @@ func (c *client) UpdateOption(option opt.DynamicOption, value any) error { if !ok { return errors.New("[pd] invalid value type for MaxTSOBatchWaitInterval option, it should be time.Duration") } - if err := c.option.SetMaxTSOBatchWaitInterval(interval); err != nil { + if err := c.inner.option.SetMaxTSOBatchWaitInterval(interval); err != nil { return err } case opt.EnableTSOFollowerProxy: - if c.getServiceMode() != pdpb.ServiceMode_PD_SVC_MODE { + if c.inner.getServiceMode() != pdpb.ServiceMode_PD_SVC_MODE { return errors.New("[pd] tso follower proxy is only supported in PD service mode") } enable, ok := value.(bool) if !ok { return errors.New("[pd] invalid value type for EnableTSOFollowerProxy option, it should be bool") } - c.option.SetEnableTSOFollowerProxy(enable) + c.inner.option.SetEnableTSOFollowerProxy(enable) case opt.EnableFollowerHandle: enable, ok := value.(bool) if !ok { return errors.New("[pd] invalid value type for EnableFollowerHandle option, it should be bool") } - c.option.SetEnableFollowerHandle(enable) + c.inner.option.SetEnableFollowerHandle(enable) case opt.TSOClientRPCConcurrency: value, ok := value.(int) if !ok { return errors.New("[pd] invalid value type for TSOClientRPCConcurrency option, it should be int") } - c.option.SetTSOClientRPCConcurrency(value) + c.inner.option.SetTSOClientRPCConcurrency(value) default: return errors.New("[pd] unsupported client option") } @@ -671,7 +557,7 @@ func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) { start := time.Now() defer func() { cmdDurationGetAllMembers.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() req := &pdpb.GetMembersRequest{Header: c.requestHeader()} protoClient, ctx := c.getClientAndContext(ctx) @@ -688,30 +574,13 @@ func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) { // getClientAndContext returns the leader pd client and the original context. If leader is unhealthy, it returns // follower pd client and the context which holds forward information. func (c *client) getClientAndContext(ctx context.Context) (pdpb.PDClient, context.Context) { - serviceClient := c.pdSvcDiscovery.GetServiceClient() + serviceClient := c.inner.pdSvcDiscovery.GetServiceClient() if serviceClient == nil || serviceClient.GetClientConn() == nil { return nil, ctx } return pdpb.NewPDClient(serviceClient.GetClientConn()), serviceClient.BuildGRPCTargetContext(ctx, true) } -// getClientAndContext returns the leader pd client and the original context. If leader is unhealthy, it returns -// follower pd client and the context which holds forward information. -func (c *client) getRegionAPIClientAndContext(ctx context.Context, allowFollower bool) (ServiceClient, context.Context) { - var serviceClient ServiceClient - if allowFollower { - serviceClient = c.pdSvcDiscovery.getServiceClientByKind(regionAPIKind) - if serviceClient != nil { - return serviceClient, serviceClient.BuildGRPCTargetContext(ctx, !allowFollower) - } - } - serviceClient = c.pdSvcDiscovery.GetServiceClient() - if serviceClient == nil || serviceClient.GetClientConn() == nil { - return nil, ctx - } - return serviceClient, serviceClient.BuildGRPCTargetContext(ctx, !allowFollower) -} - // GetTSAsync implements the TSOClient interface. func (c *client) GetTSAsync(ctx context.Context) TSFuture { defer trace.StartRegion(ctx, "pdclient.GetTSAsync").End() @@ -719,7 +588,7 @@ func (c *client) GetTSAsync(ctx context.Context) TSFuture { span = span.Tracer().StartSpan("pdclient.GetTSAsync", opentracing.ChildOf(span.Context())) defer span.Finish() } - return c.dispatchTSORequestWithRetry(ctx) + return c.inner.dispatchTSORequestWithRetry(ctx) } // GetLocalTSAsync implements the TSOClient interface. @@ -730,46 +599,6 @@ func (c *client) GetLocalTSAsync(ctx context.Context, _ string) TSFuture { return c.GetTSAsync(ctx) } -const ( - dispatchRetryDelay = 50 * time.Millisecond - dispatchRetryCount = 2 -) - -func (c *client) dispatchTSORequestWithRetry(ctx context.Context) TSFuture { - var ( - retryable bool - err error - req *tsoRequest - ) - for i := range dispatchRetryCount { - // Do not delay for the first time. - if i > 0 { - time.Sleep(dispatchRetryDelay) - } - // Get the tsoClient each time, as it may be initialized or switched during the process. - tsoClient := c.getTSOClient() - if tsoClient == nil { - err = errs.ErrClientGetTSO.FastGenByArgs("tso client is nil") - continue - } - // Get a new request from the pool if it's nil or not from the current pool. - if req == nil || req.pool != tsoClient.tsoReqPool { - req = tsoClient.getTSORequest(ctx) - } - retryable, err = tsoClient.dispatchRequest(req) - if !retryable { - break - } - } - if err != nil { - if req == nil { - return newTSORequestFastFail(err) - } - req.tryDone(err) - } - return req -} - // GetTS implements the TSOClient interface. func (c *client) GetTS(ctx context.Context) (physical int64, logical int64, err error) { resp := c.GetTSAsync(ctx) @@ -787,7 +616,7 @@ func (c *client) GetLocalTS(ctx context.Context, _ string) (physical int64, logi // GetMinTS implements the TSOClient interface. func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, err error) { // Handle compatibility issue in case of PD/API server doesn't support GetMinTS API. - serviceMode := c.getServiceMode() + serviceMode := c.inner.getServiceMode() switch serviceMode { case pdpb.ServiceMode_UNKNOWN_SVC_MODE: return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("unknown service mode") @@ -799,7 +628,7 @@ func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, e default: return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("undefined service mode") } - ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() // Call GetMinTS API to get the minimal TS from the API leader. protoClient, ctx := c.getClientAndContext(ctx) @@ -858,7 +687,7 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs var resp *pdpb.GetRegionResponse for _, url := range memberURLs { - conn, err := c.pdSvcDiscovery.GetOrCreateGRPCConn(url) + conn, err := c.inner.pdSvcDiscovery.GetOrCreateGRPCConn(url) if err != nil { log.Error("[pd] can't get grpc connection", zap.String("member-URL", url), errs.ZapError(err)) continue @@ -879,7 +708,7 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs if resp == nil { cmdFailDurationGetRegion.Observe(time.Since(start).Seconds()) - c.pdSvcDiscovery.ScheduleCheckMemberChanged() + c.inner.pdSvcDiscovery.ScheduleCheckMemberChanged() errorMsg := fmt.Sprintf("[pd] can't get region info from member URLs: %+v", memberURLs) return nil, errors.WithStack(errors.New(errorMsg)) } @@ -894,7 +723,7 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegio } start := time.Now() defer func() { cmdDurationGetRegion.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() options := &opt.GetRegionOp{} @@ -906,7 +735,8 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegio RegionKey: key, NeedBuckets: options.NeedBuckets, } - serviceClient, cctx := c.getRegionAPIClientAndContext(ctx, options.AllowFollowerHandle && c.option.GetEnableFollowerHandle()) + serviceClient, cctx := c.inner.getRegionAPIClientAndContext(ctx, + options.AllowFollowerHandle && c.inner.option.GetEnableFollowerHandle()) if serviceClient == nil { return nil, errs.ErrClientGetProtoClient } @@ -933,7 +763,7 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetR } start := time.Now() defer func() { cmdDurationGetPrevRegion.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() options := &opt.GetRegionOp{} @@ -945,7 +775,8 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetR RegionKey: key, NeedBuckets: options.NeedBuckets, } - serviceClient, cctx := c.getRegionAPIClientAndContext(ctx, options.AllowFollowerHandle && c.option.GetEnableFollowerHandle()) + serviceClient, cctx := c.inner.getRegionAPIClientAndContext(ctx, + options.AllowFollowerHandle && c.inner.option.GetEnableFollowerHandle()) if serviceClient == nil { return nil, errs.ErrClientGetProtoClient } @@ -972,7 +803,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt } start := time.Now() defer func() { cmdDurationGetRegionByID.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() options := &opt.GetRegionOp{} @@ -984,7 +815,8 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt RegionId: regionID, NeedBuckets: options.NeedBuckets, } - serviceClient, cctx := c.getRegionAPIClientAndContext(ctx, options.AllowFollowerHandle && c.option.GetEnableFollowerHandle()) + serviceClient, cctx := c.inner.getRegionAPIClientAndContext(ctx, + options.AllowFollowerHandle && c.inner.option.GetEnableFollowerHandle()) if serviceClient == nil { return nil, errs.ErrClientGetProtoClient } @@ -1015,7 +847,7 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, var cancel context.CancelFunc scanCtx := ctx if _, ok := ctx.Deadline(); !ok { - scanCtx, cancel = context.WithTimeout(ctx, c.option.Timeout) + scanCtx, cancel = context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() } options := &opt.GetRegionOp{} @@ -1028,7 +860,8 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, EndKey: endKey, Limit: int32(limit), } - serviceClient, cctx := c.getRegionAPIClientAndContext(scanCtx, options.AllowFollowerHandle && c.option.GetEnableFollowerHandle()) + serviceClient, cctx := c.inner.getRegionAPIClientAndContext(scanCtx, + options.AllowFollowerHandle && c.inner.option.GetEnableFollowerHandle()) if serviceClient == nil { return nil, errs.ErrClientGetProtoClient } @@ -1065,7 +898,7 @@ func (c *client) BatchScanRegions(ctx context.Context, ranges []KeyRange, limit var cancel context.CancelFunc scanCtx := ctx if _, ok := ctx.Deadline(); !ok { - scanCtx, cancel = context.WithTimeout(ctx, c.option.Timeout) + scanCtx, cancel = context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() } options := &opt.GetRegionOp{} @@ -1083,7 +916,8 @@ func (c *client) BatchScanRegions(ctx context.Context, ranges []KeyRange, limit Limit: int32(limit), ContainAllKeyRange: options.OutputMustContainAllKeyRange, } - serviceClient, cctx := c.getRegionAPIClientAndContext(scanCtx, options.AllowFollowerHandle && c.option.GetEnableFollowerHandle()) + serviceClient, cctx := c.inner.getRegionAPIClientAndContext(scanCtx, + options.AllowFollowerHandle && c.inner.option.GetEnableFollowerHandle()) if serviceClient == nil { return nil, errs.ErrClientGetProtoClient } @@ -1161,7 +995,7 @@ func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, e start := time.Now() defer func() { cmdDurationGetStore.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() req := &pdpb.GetStoreRequest{ Header: c.requestHeader(), @@ -1205,7 +1039,7 @@ func (c *client) GetAllStores(ctx context.Context, opts ...opt.GetStoreOption) ( start := time.Now() defer func() { cmdDurationGetAllStores.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() req := &pdpb.GetAllStoresRequest{ Header: c.requestHeader(), @@ -1232,7 +1066,7 @@ func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint6 start := time.Now() defer func() { cmdDurationUpdateGCSafePoint.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() req := &pdpb.UpdateGCSafePointRequest{ Header: c.requestHeader(), @@ -1263,7 +1097,7 @@ func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, start := time.Now() defer func() { cmdDurationUpdateServiceGCSafePoint.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() req := &pdpb.UpdateServiceGCSafePointRequest{ Header: c.requestHeader(), @@ -1296,7 +1130,7 @@ func (c *client) scatterRegionsWithGroup(ctx context.Context, regionID uint64, g start := time.Now() defer func() { cmdDurationScatterRegion.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() req := &pdpb.ScatterRegionRequest{ Header: c.requestHeader(), @@ -1334,7 +1168,7 @@ func (c *client) SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte, } start := time.Now() defer func() { cmdDurationSplitAndScatterRegions.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() options := &opt.RegionsOp{} for _, opt := range opts { @@ -1363,7 +1197,7 @@ func (c *client) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOpe start := time.Now() defer func() { cmdDurationGetOperator.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() req := &pdpb.GetOperatorRequest{ Header: c.requestHeader(), @@ -1384,7 +1218,7 @@ func (c *client) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...o } start := time.Now() defer func() { cmdDurationSplitRegions.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() options := &opt.RegionsOp{} for _, opt := range opts { @@ -1404,7 +1238,9 @@ func (c *client) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...o func (c *client) requestHeader() *pdpb.RequestHeader { return &pdpb.RequestHeader{ - ClusterId: c.pdSvcDiscovery.GetClusterID(), + ClusterId: c.inner.pdSvcDiscovery.GetClusterID(), + CallerId: string(caller.GetCallerID()), + CallerComponent: string(c.callerComponent), } } @@ -1415,7 +1251,7 @@ func (c *client) scatterRegionsWithOptions(ctx context.Context, regionsID []uint for _, opt := range opts { opt(options) } - ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() req := &pdpb.ScatterRegionRequest{ Header: c.requestHeader(), @@ -1453,7 +1289,7 @@ func trimHTTPPrefix(str string) string { // LoadGlobalConfig implements the RPCClient interface. func (c *client) LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]GlobalConfigItem, int64, error) { - ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { @@ -1485,7 +1321,7 @@ func (c *client) StoreGlobalConfig(ctx context.Context, configPath string, items for i, it := range items { resArr[i] = &pdpb.GlobalConfigItem{Name: it.Name, Value: it.Value, Kind: it.EventType, Payload: it.PayLoad} } - ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { @@ -1503,7 +1339,7 @@ func (c *client) WatchGlobalConfig(ctx context.Context, configPath string, revis // TODO: Add retry mechanism // register watch components there globalConfigWatcherCh := make(chan []GlobalConfigItem, 16) - ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { @@ -1552,7 +1388,7 @@ func (c *client) WatchGlobalConfig(ctx context.Context, configPath string, revis // GetExternalTimestamp implements the RPCClient interface. func (c *client) GetExternalTimestamp(ctx context.Context) (uint64, error) { - ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { @@ -1573,7 +1409,7 @@ func (c *client) GetExternalTimestamp(ctx context.Context) (uint64, error) { // SetExternalTimestamp implements the RPCClient interface. func (c *client) SetExternalTimestamp(ctx context.Context, timestamp uint64) error { - ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { @@ -1597,10 +1433,35 @@ func (c *client) respForErr(observer prometheus.Observer, start time.Time, err e if err != nil || header.GetError() != nil { observer.Observe(time.Since(start).Seconds()) if err != nil { - c.pdSvcDiscovery.ScheduleCheckMemberChanged() + c.inner.pdSvcDiscovery.ScheduleCheckMemberChanged() return errors.WithStack(err) } return errors.WithStack(errors.New(header.GetError().String())) } return nil } + +// WithCallerComponent implements the RPCClient interface. +func (c *client) WithCallerComponent(callerComponent caller.Component) RPCClient { + newClient := *c + newClient.callerComponent = callerComponent + return &newClient +} + +// adjustCallerComponent returns the caller component if it is empty, it +// is the upper layer of the pd client. +func adjustCallerComponent(callerComponent caller.Component) caller.Component { + callerComponent = caller.Component(strings.TrimSpace(string(callerComponent))) + if len(callerComponent) != 0 { + return callerComponent + } + for i := range 10 { // limit the loop to 10 iterations to avoid infinite loop + callerComponent = caller.GetComponent(i) + if !strings.Contains(string(callerComponent), "pd/client") { + return callerComponent + } + } + log.Warn("Unknown callerComponent", zap.String("callerComponent", string(callerComponent))) + // If the callerComponent is still in pd/client, we set it to empty. + return "" +} diff --git a/client/client_test.go b/client/client_test.go index 61c538bb48e..234bb2da10a 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/stretchr/testify/require" + "github.com/tikv/pd/client/caller" "github.com/tikv/pd/client/opt" "github.com/tikv/pd/client/utils/testutil" "github.com/tikv/pd/client/utils/tsoutil" @@ -79,7 +80,8 @@ func TestClientCtx(t *testing.T) { start := time.Now() ctx, cancel := context.WithTimeout(context.TODO(), time.Second*3) defer cancel() - _, err := NewClientWithContext(ctx, []string{testClientURL}, SecurityOption{}) + _, err := NewClientWithContext(ctx, caller.TestComponent, + []string{testClientURL}, SecurityOption{}) re.Error(err) re.Less(time.Since(start), time.Second*5) } @@ -87,7 +89,8 @@ func TestClientCtx(t *testing.T) { func TestClientWithRetry(t *testing.T) { re := require.New(t) start := time.Now() - _, err := NewClientWithContext(context.TODO(), []string{testClientURL}, SecurityOption{}, opt.WithMaxErrorRetry(5)) + _, err := NewClientWithContext(context.TODO(), caller.TestComponent, + []string{testClientURL}, SecurityOption{}, opt.WithMaxErrorRetry(5)) re.Error(err) re.Less(time.Since(start), time.Second*10) } diff --git a/client/gc_client.go b/client/gc_client.go index 538538ec50c..2b64cb91c4a 100644 --- a/client/gc_client.go +++ b/client/gc_client.go @@ -41,7 +41,7 @@ func (c *client) UpdateGCSafePointV2(ctx context.Context, keyspaceID uint32, saf start := time.Now() defer func() { cmdDurationUpdateGCSafePointV2.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) req := &pdpb.UpdateGCSafePointV2Request{ Header: c.requestHeader(), KeyspaceId: keyspaceID, @@ -70,7 +70,7 @@ func (c *client) UpdateServiceSafePointV2(ctx context.Context, keyspaceID uint32 start := time.Now() defer func() { cmdDurationUpdateServiceSafePointV2.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) req := &pdpb.UpdateServiceSafePointV2Request{ Header: c.requestHeader(), KeyspaceId: keyspaceID, @@ -99,7 +99,7 @@ func (c *client) WatchGCSafePointV2(ctx context.Context, revision int64) (chan [ Revision: revision, } - ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { diff --git a/client/go.mod b/client/go.mod index a8129722dd3..ba7b077f46d 100644 --- a/client/go.mod +++ b/client/go.mod @@ -10,7 +10,7 @@ require ( github.com/opentracing/opentracing-go v1.2.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20240910154453-b242104f8d31 + github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.8.2 diff --git a/client/go.sum b/client/go.sum index 34bf8b9bc65..e229c47ee96 100644 --- a/client/go.sum +++ b/client/go.sum @@ -46,8 +46,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= -github.com/pingcap/kvproto v0.0.0-20240910154453-b242104f8d31 h1:6BY+3T6Hqpw9UZ/D7Om/xB+Xik3NkkYxBV6qCzUdUvU= -github.com/pingcap/kvproto v0.0.0-20240910154453-b242104f8d31/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= +github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 h1:xYNSJjYNur4Dr5bV+9BXK9n5E0T1zlcAN25XX68+mOg= +github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/client/inner_client.go b/client/inner_client.go new file mode 100644 index 00000000000..47acda56e42 --- /dev/null +++ b/client/inner_client.go @@ -0,0 +1,244 @@ +package pd + +import ( + "context" + "crypto/tls" + "sync" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/log" + "github.com/tikv/pd/client/errs" + "github.com/tikv/pd/client/opt" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +const ( + dispatchRetryDelay = 50 * time.Millisecond + dispatchRetryCount = 2 +) + +type innerClient struct { + keyspaceID uint32 + svrUrls []string + pdSvcDiscovery *pdServiceDiscovery + tokenDispatcher *tokenDispatcher + + // For service mode switching. + serviceModeKeeper + + // For internal usage. + updateTokenConnectionCh chan struct{} + + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + tlsCfg *tls.Config + option *opt.Option +} + +func (c *innerClient) init(updateKeyspaceIDCb updateKeyspaceIDFunc) error { + c.pdSvcDiscovery = newPDServiceDiscovery( + c.ctx, c.cancel, &c.wg, c.setServiceMode, + updateKeyspaceIDCb, c.keyspaceID, c.svrUrls, c.tlsCfg, c.option) + if err := c.setup(); err != nil { + c.cancel() + if c.pdSvcDiscovery != nil { + c.pdSvcDiscovery.Close() + } + return err + } + + return nil +} + +func (c *innerClient) setServiceMode(newMode pdpb.ServiceMode) { + c.Lock() + defer c.Unlock() + + if c.option.UseTSOServerProxy { + // If we are using TSO server proxy, we always use PD_SVC_MODE. + newMode = pdpb.ServiceMode_PD_SVC_MODE + } + + if newMode == c.serviceMode { + return + } + log.Info("[pd] changing service mode", + zap.String("old-mode", c.serviceMode.String()), + zap.String("new-mode", newMode.String())) + c.resetTSOClientLocked(newMode) + oldMode := c.serviceMode + c.serviceMode = newMode + log.Info("[pd] service mode changed", + zap.String("old-mode", oldMode.String()), + zap.String("new-mode", newMode.String())) +} + +// Reset a new TSO client. +func (c *innerClient) resetTSOClientLocked(mode pdpb.ServiceMode) { + // Re-create a new TSO client. + var ( + newTSOCli *tsoClient + newTSOSvcDiscovery ServiceDiscovery + ) + switch mode { + case pdpb.ServiceMode_PD_SVC_MODE: + newTSOCli = newTSOClient(c.ctx, c.option, + c.pdSvcDiscovery, &pdTSOStreamBuilderFactory{}) + case pdpb.ServiceMode_API_SVC_MODE: + newTSOSvcDiscovery = newTSOServiceDiscovery( + c.ctx, c, c.pdSvcDiscovery, + c.keyspaceID, c.tlsCfg, c.option) + // At this point, the keyspace group isn't known yet. Starts from the default keyspace group, + // and will be updated later. + newTSOCli = newTSOClient(c.ctx, c.option, + newTSOSvcDiscovery, &tsoTSOStreamBuilderFactory{}) + if err := newTSOSvcDiscovery.Init(); err != nil { + log.Error("[pd] failed to initialize tso service discovery. keep the current service mode", + zap.Strings("svr-urls", c.svrUrls), + zap.String("current-mode", c.serviceMode.String()), + zap.Error(err)) + return + } + case pdpb.ServiceMode_UNKNOWN_SVC_MODE: + log.Warn("[pd] intend to switch to unknown service mode, just return") + return + } + newTSOCli.setup() + // Replace the old TSO client. + oldTSOClient := c.tsoClient + c.tsoClient = newTSOCli + oldTSOClient.close() + // Replace the old TSO service discovery if needed. + oldTSOSvcDiscovery := c.tsoSvcDiscovery + // If newTSOSvcDiscovery is nil, that's expected, as it means we are switching to PD service mode and + // no tso microservice discovery is needed. + c.tsoSvcDiscovery = newTSOSvcDiscovery + // Close the old TSO service discovery safely after both the old client and service discovery are replaced. + if oldTSOSvcDiscovery != nil { + // We are switching from API service mode to PD service mode, so delete the old tso microservice discovery. + oldTSOSvcDiscovery.Close() + } +} + +func (c *innerClient) scheduleUpdateTokenConnection() { + select { + case c.updateTokenConnectionCh <- struct{}{}: + default: + } +} + +func (c *innerClient) getServiceMode() pdpb.ServiceMode { + c.RLock() + defer c.RUnlock() + return c.serviceMode +} + +func (c *innerClient) getTSOClient() *tsoClient { + c.RLock() + defer c.RUnlock() + return c.tsoClient +} + +func (c *innerClient) close() { + c.cancel() + c.wg.Wait() + + c.serviceModeKeeper.close() + c.pdSvcDiscovery.Close() + + if c.tokenDispatcher != nil { + tokenErr := errors.WithStack(errClosing) + c.tokenDispatcher.tokenBatchController.revokePendingTokenRequest(tokenErr) + c.tokenDispatcher.dispatcherCancel() + } +} + +func (c *innerClient) setup() error { + // Init the metrics. + if c.option.InitMetrics { + initAndRegisterMetrics(c.option.MetricsLabels) + } + + // Init the client base. + if err := c.pdSvcDiscovery.Init(); err != nil { + return err + } + + // Register callbacks + c.pdSvcDiscovery.AddServingURLSwitchedCallback(c.scheduleUpdateTokenConnection) + + // Create dispatchers + c.createTokenDispatcher() + return nil +} + +// getClientAndContext returns the leader pd client and the original context. If leader is unhealthy, it returns +// follower pd client and the context which holds forward information. +func (c *innerClient) getRegionAPIClientAndContext(ctx context.Context, allowFollower bool) (ServiceClient, context.Context) { + var serviceClient ServiceClient + if allowFollower { + serviceClient = c.pdSvcDiscovery.getServiceClientByKind(regionAPIKind) + if serviceClient != nil { + return serviceClient, serviceClient.BuildGRPCTargetContext(ctx, !allowFollower) + } + } + serviceClient = c.pdSvcDiscovery.GetServiceClient() + if serviceClient == nil || serviceClient.GetClientConn() == nil { + return nil, ctx + } + return serviceClient, serviceClient.BuildGRPCTargetContext(ctx, !allowFollower) +} + +// gRPCErrorHandler is used to handle the gRPC error returned by the resource manager service. +func (c *innerClient) gRPCErrorHandler(err error) { + if errs.IsLeaderChange(err) { + c.pdSvcDiscovery.ScheduleCheckMemberChanged() + } +} + +func (c *innerClient) getOrCreateGRPCConn() (*grpc.ClientConn, error) { + cc, err := c.pdSvcDiscovery.GetOrCreateGRPCConn(c.pdSvcDiscovery.getLeaderURL()) + if err != nil { + return nil, err + } + return cc, err +} + +func (c *innerClient) dispatchTSORequestWithRetry(ctx context.Context) TSFuture { + var ( + retryable bool + err error + req *tsoRequest + ) + for i := range dispatchRetryCount { + // Do not delay for the first time. + if i > 0 { + time.Sleep(dispatchRetryDelay) + } + // Get the tsoClient each time, as it may be initialized or switched during the process. + tsoClient := c.getTSOClient() + if tsoClient == nil { + err = errs.ErrClientGetTSO.FastGenByArgs("tso client is nil") + continue + } + // Get a new request from the pool if it's nil or not from the current pool. + if req == nil || req.pool != tsoClient.tsoReqPool { + req = tsoClient.getTSORequest(ctx) + } + retryable, err = tsoClient.dispatchRequest(req) + if !retryable { + break + } + } + if err != nil { + if req == nil { + return newTSORequestFastFail(err) + } + req.tryDone(err) + } + return req +} diff --git a/client/keyspace_client.go b/client/keyspace_client.go index bdef1edec11..3f8cea993c0 100644 --- a/client/keyspace_client.go +++ b/client/keyspace_client.go @@ -38,7 +38,7 @@ type KeyspaceClient interface { // keyspaceClient returns the KeyspaceClient from current PD leader. func (c *client) keyspaceClient() keyspacepb.KeyspaceClient { - if client := c.pdSvcDiscovery.GetServingEndpointClientConn(); client != nil { + if client := c.inner.pdSvcDiscovery.GetServingEndpointClientConn(); client != nil { return keyspacepb.NewKeyspaceClient(client) } return nil @@ -52,7 +52,7 @@ func (c *client) LoadKeyspace(ctx context.Context, name string) (*keyspacepb.Key } start := time.Now() defer func() { cmdDurationLoadKeyspace.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) req := &keyspacepb.LoadKeyspaceRequest{ Header: c.requestHeader(), Name: name, @@ -67,7 +67,7 @@ func (c *client) LoadKeyspace(ctx context.Context, name string) (*keyspacepb.Key if err != nil { cmdFailedDurationLoadKeyspace.Observe(time.Since(start).Seconds()) - c.pdSvcDiscovery.ScheduleCheckMemberChanged() + c.inner.pdSvcDiscovery.ScheduleCheckMemberChanged() return nil, err } @@ -96,7 +96,7 @@ func (c *client) UpdateKeyspaceState(ctx context.Context, id uint32, state keysp } start := time.Now() defer func() { cmdDurationUpdateKeyspaceState.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) req := &keyspacepb.UpdateKeyspaceStateRequest{ Header: c.requestHeader(), Id: id, @@ -112,7 +112,7 @@ func (c *client) UpdateKeyspaceState(ctx context.Context, id uint32, state keysp if err != nil { cmdFailedDurationUpdateKeyspaceState.Observe(time.Since(start).Seconds()) - c.pdSvcDiscovery.ScheduleCheckMemberChanged() + c.inner.pdSvcDiscovery.ScheduleCheckMemberChanged() return nil, err } @@ -140,7 +140,7 @@ func (c *client) GetAllKeyspaces(ctx context.Context, startID uint32, limit uint } start := time.Now() defer func() { cmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) req := &keyspacepb.GetAllKeyspacesRequest{ Header: c.requestHeader(), StartId: startID, @@ -156,7 +156,7 @@ func (c *client) GetAllKeyspaces(ctx context.Context, startID uint32, limit uint if err != nil { cmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds()) - c.pdSvcDiscovery.ScheduleCheckMemberChanged() + c.inner.pdSvcDiscovery.ScheduleCheckMemberChanged() return nil, err } diff --git a/client/meta_storage_client.go b/client/meta_storage_client.go index 3cc24f0bece..6409b6e7a46 100644 --- a/client/meta_storage_client.go +++ b/client/meta_storage_client.go @@ -29,7 +29,7 @@ import ( ) // metaStorageClient gets the meta storage client from current PD leader. -func (c *client) metaStorageClient() meta_storagepb.MetaStorageClient { +func (c *innerClient) metaStorageClient() meta_storagepb.MetaStorageClient { if client := c.pdSvcDiscovery.GetServingEndpointClientConn(); client != nil { return meta_storagepb.NewMetaStorageClient(client) } @@ -51,7 +51,7 @@ func getPrefix(key []byte) []byte { } // Put implements the MetaStorageClient interface. -func (c *client) Put(ctx context.Context, key, value []byte, opts ...opt.MetaStorageOption) (*meta_storagepb.PutResponse, error) { +func (c *innerClient) Put(ctx context.Context, key, value []byte, opts ...opt.MetaStorageOption) (*meta_storagepb.PutResponse, error) { options := &opt.MetaStorageOp{} for _, opt := range opts { opt(options) @@ -71,7 +71,7 @@ func (c *client) Put(ctx context.Context, key, value []byte, opts ...opt.MetaSto Lease: options.Lease, PrevKv: options.PrevKv, } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderURL()) + ctx = grpcutil.BuildForwardContext(ctx, c.pdSvcDiscovery.GetServingURL()) cli := c.metaStorageClient() if cli == nil { cancel() @@ -87,7 +87,7 @@ func (c *client) Put(ctx context.Context, key, value []byte, opts ...opt.MetaSto } // Get implements the MetaStorageClient interface. -func (c *client) Get(ctx context.Context, key []byte, opts ...opt.MetaStorageOption) (*meta_storagepb.GetResponse, error) { +func (c *innerClient) Get(ctx context.Context, key []byte, opts ...opt.MetaStorageOption) (*meta_storagepb.GetResponse, error) { options := &opt.MetaStorageOp{} for _, opt := range opts { opt(options) @@ -110,7 +110,7 @@ func (c *client) Get(ctx context.Context, key []byte, opts ...opt.MetaStorageOpt Limit: options.Limit, Revision: options.Revision, } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderURL()) + ctx = grpcutil.BuildForwardContext(ctx, c.pdSvcDiscovery.GetServingURL()) cli := c.metaStorageClient() if cli == nil { cancel() @@ -126,7 +126,7 @@ func (c *client) Get(ctx context.Context, key []byte, opts ...opt.MetaStorageOpt } // Watch implements the MetaStorageClient interface. -func (c *client) Watch(ctx context.Context, key []byte, opts ...opt.MetaStorageOption) (chan []*meta_storagepb.Event, error) { +func (c *innerClient) Watch(ctx context.Context, key []byte, opts ...opt.MetaStorageOption) (chan []*meta_storagepb.Event, error) { eventCh := make(chan []*meta_storagepb.Event, 100) options := &opt.MetaStorageOp{} for _, opt := range opts { @@ -172,7 +172,7 @@ func (c *client) Watch(ctx context.Context, key []byte, opts ...opt.MetaStorageO return eventCh, err } -func (c *client) respForMetaStorageErr(observer prometheus.Observer, start time.Time, err error, header *meta_storagepb.ResponseHeader) error { +func (c *innerClient) respForMetaStorageErr(observer prometheus.Observer, start time.Time, err error, header *meta_storagepb.ResponseHeader) error { if err != nil || header.GetError() != nil { observer.Observe(time.Since(start).Seconds()) if err != nil { @@ -183,3 +183,18 @@ func (c *client) respForMetaStorageErr(observer prometheus.Observer, start time. } return nil } + +// Put implements the MetaStorageClient interface. +func (c *client) Put(ctx context.Context, key, value []byte, opts ...opt.MetaStorageOption) (*meta_storagepb.PutResponse, error) { + return c.inner.Put(ctx, key, value, opts...) +} + +// Get implements the MetaStorageClient interface. +func (c *client) Get(ctx context.Context, key []byte, opts ...opt.MetaStorageOption) (*meta_storagepb.GetResponse, error) { + return c.inner.Get(ctx, key, opts...) +} + +// Watch implements the MetaStorageClient interface. +func (c *client) Watch(ctx context.Context, key []byte, opts ...opt.MetaStorageOption) (chan []*meta_storagepb.Event, error) { + return c.inner.Watch(ctx, key, opts...) +} diff --git a/client/resource_manager_client.go b/client/resource_manager_client.go index b59bb2a22d3..513f1a1d170 100644 --- a/client/resource_manager_client.go +++ b/client/resource_manager_client.go @@ -69,24 +69,17 @@ func WithRUStats(op *GetResourceGroupOp) { } // resourceManagerClient gets the ResourceManager client of current PD leader. -func (c *client) resourceManagerClient() (rmpb.ResourceManagerClient, error) { - cc, err := c.pdSvcDiscovery.GetOrCreateGRPCConn(c.GetLeaderURL()) +func (c *innerClient) resourceManagerClient() (rmpb.ResourceManagerClient, error) { + cc, err := c.getOrCreateGRPCConn() if err != nil { return nil, err } return rmpb.NewResourceManagerClient(cc), nil } -// gRPCErrorHandler is used to handle the gRPC error returned by the resource manager service. -func (c *client) gRPCErrorHandler(err error) { - if errs.IsLeaderChange(err) { - c.pdSvcDiscovery.ScheduleCheckMemberChanged() - } -} - // ListResourceGroups loads and returns all metadata of resource groups. func (c *client) ListResourceGroups(ctx context.Context, ops ...GetResourceGroupOption) ([]*rmpb.ResourceGroup, error) { - cc, err := c.resourceManagerClient() + cc, err := c.inner.resourceManagerClient() if err != nil { return nil, err } @@ -99,7 +92,7 @@ func (c *client) ListResourceGroups(ctx context.Context, ops ...GetResourceGroup } resp, err := cc.ListResourceGroups(ctx, req) if err != nil { - c.gRPCErrorHandler(err) + c.inner.gRPCErrorHandler(err) return nil, errs.ErrClientListResourceGroup.FastGenByArgs(err.Error()) } resErr := resp.GetError() @@ -111,7 +104,7 @@ func (c *client) ListResourceGroups(ctx context.Context, ops ...GetResourceGroup // GetResourceGroup implements the ResourceManagerClient interface. func (c *client) GetResourceGroup(ctx context.Context, resourceGroupName string, ops ...GetResourceGroupOption) (*rmpb.ResourceGroup, error) { - cc, err := c.resourceManagerClient() + cc, err := c.inner.resourceManagerClient() if err != nil { return nil, err } @@ -125,7 +118,7 @@ func (c *client) GetResourceGroup(ctx context.Context, resourceGroupName string, } resp, err := cc.GetResourceGroup(ctx, req) if err != nil { - c.gRPCErrorHandler(err) + c.inner.gRPCErrorHandler(err) return nil, &errs.ErrClientGetResourceGroup{ResourceGroupName: resourceGroupName, Cause: err.Error()} } resErr := resp.GetError() @@ -146,7 +139,7 @@ func (c *client) ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.Resour } func (c *client) putResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup, typ actionType) (string, error) { - cc, err := c.resourceManagerClient() + cc, err := c.inner.resourceManagerClient() if err != nil { return "", err } @@ -161,7 +154,7 @@ func (c *client) putResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceG resp, err = cc.ModifyResourceGroup(ctx, req) } if err != nil { - c.gRPCErrorHandler(err) + c.inner.gRPCErrorHandler(err) return "", err } resErr := resp.GetError() @@ -173,7 +166,7 @@ func (c *client) putResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceG // DeleteResourceGroup implements the ResourceManagerClient interface. func (c *client) DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error) { - cc, err := c.resourceManagerClient() + cc, err := c.inner.resourceManagerClient() if err != nil { return "", err } @@ -182,7 +175,7 @@ func (c *client) DeleteResourceGroup(ctx context.Context, resourceGroupName stri } resp, err := cc.DeleteResourceGroup(ctx, req) if err != nil { - c.gRPCErrorHandler(err) + c.inner.gRPCErrorHandler(err) return "", err } resErr := resp.GetError() @@ -217,10 +210,10 @@ func (c *client) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBuc req := &tokenRequest{ done: make(chan error, 1), requestCtx: ctx, - clientCtx: c.ctx, + clientCtx: c.inner.ctx, Request: request, } - c.tokenDispatcher.tokenBatchController.tokenRequestCh <- req + c.inner.tokenDispatcher.tokenBatchController.tokenRequestCh <- req grantedTokens, err := req.wait() if err != nil { return nil, err @@ -280,7 +273,7 @@ func (cc *resourceManagerConnectionContext) reset() { } } -func (c *client) createTokenDispatcher() { +func (c *innerClient) createTokenDispatcher() { dispatcherCtx, dispatcherCancel := context.WithCancel(c.ctx) dispatcher := &tokenDispatcher{ dispatcherCancel: dispatcherCancel, @@ -292,7 +285,7 @@ func (c *client) createTokenDispatcher() { c.tokenDispatcher = dispatcher } -func (c *client) handleResourceTokenDispatcher(dispatcherCtx context.Context, tbc *tokenBatchController) { +func (c *innerClient) handleResourceTokenDispatcher(dispatcherCtx context.Context, tbc *tokenBatchController) { defer func() { log.Info("[resource manager] exit resource token dispatcher") c.wg.Done() @@ -354,7 +347,7 @@ func (c *client) handleResourceTokenDispatcher(dispatcherCtx context.Context, tb } } -func (c *client) processTokenRequests(stream rmpb.ResourceManager_AcquireTokenBucketsClient, t *tokenRequest) error { +func (c *innerClient) processTokenRequests(stream rmpb.ResourceManager_AcquireTokenBucketsClient, t *tokenRequest) error { req := t.Request if err := stream.Send(req); err != nil { err = errors.WithStack(err) @@ -376,7 +369,7 @@ func (c *client) processTokenRequests(stream rmpb.ResourceManager_AcquireTokenBu return nil } -func (c *client) tryResourceManagerConnect(ctx context.Context, connection *resourceManagerConnectionContext) error { +func (c *innerClient) tryResourceManagerConnect(ctx context.Context, connection *resourceManagerConnectionContext) error { var ( err error stream rmpb.ResourceManager_AcquireTokenBucketsClient diff --git a/go.mod b/go.mod index 8ccec342a4b..d116d9e3c41 100644 --- a/go.mod +++ b/go.mod @@ -34,7 +34,7 @@ require ( github.com/pingcap/errcode v0.3.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20240910154453-b242104f8d31 + github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 github.com/pingcap/tidb-dashboard v0.0.0-20241104061623-bce95733dad7 diff --git a/go.sum b/go.sum index 0c69a1cacf8..a81f4099019 100644 --- a/go.sum +++ b/go.sum @@ -390,8 +390,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20240910154453-b242104f8d31 h1:6BY+3T6Hqpw9UZ/D7Om/xB+Xik3NkkYxBV6qCzUdUvU= -github.com/pingcap/kvproto v0.0.0-20240910154453-b242104f8d31/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= +github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 h1:xYNSJjYNur4Dr5bV+9BXK9n5E0T1zlcAN25XX68+mOg= +github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index 118512b88b2..3b173751b6d 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -38,6 +38,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/caller" "github.com/tikv/pd/client/opt" "github.com/tikv/pd/client/retry" "github.com/tikv/pd/pkg/core" @@ -850,7 +851,8 @@ func runServer(re *require.Assertions, cluster *tests.TestCluster) []string { } func setupCli(ctx context.Context, re *require.Assertions, endpoints []string, opts ...opt.ClientOption) pd.Client { - cli, err := pd.NewClientWithContext(ctx, endpoints, pd.SecurityOption{}, opts...) + cli, err := pd.NewClientWithContext(ctx, caller.TestComponent, + endpoints, pd.SecurityOption{}, opts...) re.NoError(err) return cli } @@ -1288,6 +1290,19 @@ func (suite *clientTestSuite) TestGetRegionByID() { return reflect.DeepEqual(region, r.Meta) && reflect.DeepEqual(peers[0], r.Leader) }) + + // test WithCallerComponent + testutil.Eventually(re, func() bool { + r, err := suite.client. + WithCallerComponent(caller.GetComponent(0)). + GetRegionByID(context.Background(), regionID) + re.NoError(err) + if r == nil { + return false + } + return reflect.DeepEqual(region, r.Meta) && + reflect.DeepEqual(peers[0], r.Leader) + }) } func (suite *clientTestSuite) TestGetStore() { diff --git a/tests/integrations/client/client_tls_test.go b/tests/integrations/client/client_tls_test.go index 92c25afc1cf..321e3df474c 100644 --- a/tests/integrations/client/client_tls_test.go +++ b/tests/integrations/client/client_tls_test.go @@ -28,6 +28,7 @@ import ( "github.com/stretchr/testify/require" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/caller" "github.com/tikv/pd/client/opt" "github.com/tikv/pd/pkg/utils/grpcutil" "github.com/tikv/pd/pkg/utils/netutil" @@ -162,11 +163,13 @@ func testTLSReload( go func() { for { dctx, dcancel := context.WithTimeout(ctx, time.Second) - cli, err := pd.NewClientWithContext(dctx, endpoints, pd.SecurityOption{ - CAPath: testClientTLSInfo.TrustedCAFile, - CertPath: testClientTLSInfo.CertFile, - KeyPath: testClientTLSInfo.KeyFile, - }, opt.WithGRPCDialOptions(grpc.WithBlock())) + cli, err := pd.NewClientWithContext(dctx, + caller.TestComponent, + endpoints, pd.SecurityOption{ + CAPath: testClientTLSInfo.TrustedCAFile, + CertPath: testClientTLSInfo.CertFile, + KeyPath: testClientTLSInfo.KeyFile, + }, opt.WithGRPCDialOptions(grpc.WithBlock())) if err != nil { errc <- err dcancel() @@ -193,11 +196,13 @@ func testTLSReload( // 6. new requests should trigger listener to reload valid certs dctx, dcancel := context.WithTimeout(ctx, 5*time.Second) - cli, err := pd.NewClientWithContext(dctx, endpoints, pd.SecurityOption{ - CAPath: testClientTLSInfo.TrustedCAFile, - CertPath: testClientTLSInfo.CertFile, - KeyPath: testClientTLSInfo.KeyFile, - }, opt.WithGRPCDialOptions(grpc.WithBlock())) + cli, err := pd.NewClientWithContext(dctx, + caller.TestComponent, + endpoints, pd.SecurityOption{ + CAPath: testClientTLSInfo.TrustedCAFile, + CertPath: testClientTLSInfo.CertFile, + KeyPath: testClientTLSInfo.KeyFile, + }, opt.WithGRPCDialOptions(grpc.WithBlock())) re.NoError(err) dcancel() cli.Close() @@ -206,11 +211,13 @@ func testTLSReload( caData, certData, keyData := loadTLSContent(re, testClientTLSInfo.TrustedCAFile, testClientTLSInfo.CertFile, testClientTLSInfo.KeyFile) ctx1, cancel1 := context.WithTimeout(ctx, 2*time.Second) - cli, err = pd.NewClientWithContext(ctx1, endpoints, pd.SecurityOption{ - SSLCABytes: caData, - SSLCertBytes: certData, - SSLKEYBytes: keyData, - }, opt.WithGRPCDialOptions(grpc.WithBlock())) + cli, err = pd.NewClientWithContext(ctx1, + caller.TestComponent, + endpoints, pd.SecurityOption{ + SSLCABytes: caData, + SSLCertBytes: certData, + SSLKEYBytes: keyData, + }, opt.WithGRPCDialOptions(grpc.WithBlock())) re.NoError(err) defer cli.Close() cancel1() @@ -318,11 +325,13 @@ func TestMultiCN(t *testing.T) { func testAllowedCN(ctx context.Context, endpoints []string, tls transport.TLSInfo) error { ctx1, cancel1 := context.WithTimeout(ctx, 3*time.Second) defer cancel1() - cli, err := pd.NewClientWithContext(ctx1, endpoints, pd.SecurityOption{ - CAPath: tls.TrustedCAFile, - CertPath: tls.CertFile, - KeyPath: tls.KeyFile, - }, opt.WithGRPCDialOptions(grpc.WithBlock())) + cli, err := pd.NewClientWithContext(ctx1, + caller.TestComponent, + endpoints, pd.SecurityOption{ + CAPath: tls.TrustedCAFile, + CertPath: tls.CertFile, + KeyPath: tls.KeyFile, + }, opt.WithGRPCDialOptions(grpc.WithBlock())) if err != nil { return err } diff --git a/tests/integrations/client/gc_client_test.go b/tests/integrations/client/gc_client_test.go index 27912a09550..e3eee89d00d 100644 --- a/tests/integrations/client/gc_client_test.go +++ b/tests/integrations/client/gc_client_test.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/caller" "github.com/tikv/pd/pkg/utils/assertutil" "github.com/tikv/pd/pkg/utils/keypath" "github.com/tikv/pd/pkg/utils/testutil" @@ -77,7 +78,10 @@ func (suite *gcClientTestSuite) SetupSuite() { suite.server = &server.GrpcServer{Server: gsi} re.NoError(err) addr := suite.server.GetAddr() - suite.client, err = pd.NewClientWithContext(suite.server.Context(), []string{addr}, pd.SecurityOption{}) + suite.client, err = pd.NewClientWithContext(suite.server.Context(), + caller.TestComponent, + []string{addr}, pd.SecurityOption{}, + ) re.NoError(err) rootPath := path.Join("/pd", strconv.FormatUint(keypath.ClusterID(), 10)) suite.gcSafePointV2Prefix = path.Join(rootPath, keypath.GCSafePointV2Prefix()) diff --git a/tests/integrations/client/global_config_test.go b/tests/integrations/client/global_config_test.go index c73e714cc2a..72672f3ca81 100644 --- a/tests/integrations/client/global_config_test.go +++ b/tests/integrations/client/global_config_test.go @@ -22,6 +22,7 @@ import ( "time" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/caller" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" @@ -79,7 +80,10 @@ func (suite *globalConfigTestSuite) SetupSuite() { suite.server = &server.GrpcServer{Server: gsi} re.NoError(err) addr := suite.server.GetAddr() - suite.client, err = pd.NewClientWithContext(suite.server.Context(), []string{addr}, pd.SecurityOption{}) + suite.client, err = pd.NewClientWithContext(suite.server.Context(), + caller.TestComponent, + []string{addr}, pd.SecurityOption{}, + ) re.NoError(err) } diff --git a/tests/integrations/go.mod b/tests/integrations/go.mod index 4a90239e86d..2407ecee23e 100644 --- a/tests/integrations/go.mod +++ b/tests/integrations/go.mod @@ -14,7 +14,7 @@ require ( github.com/go-sql-driver/mysql v1.7.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c - github.com/pingcap/kvproto v0.0.0-20240910154453-b242104f8d31 + github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/prometheus/client_golang v1.19.0 github.com/prometheus/client_model v0.6.0 diff --git a/tests/integrations/go.sum b/tests/integrations/go.sum index 1903a580315..e66074ea779 100644 --- a/tests/integrations/go.sum +++ b/tests/integrations/go.sum @@ -383,8 +383,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgWM9fSBIvaxsJHuGP0uM74HXtv3MyyGQ= github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20240910154453-b242104f8d31 h1:6BY+3T6Hqpw9UZ/D7Om/xB+Xik3NkkYxBV6qCzUdUvU= -github.com/pingcap/kvproto v0.0.0-20240910154453-b242104f8d31/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= +github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 h1:xYNSJjYNur4Dr5bV+9BXK9n5E0T1zlcAN25XX68+mOg= +github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/tests/integrations/mcs/resourcemanager/resource_manager_test.go b/tests/integrations/mcs/resourcemanager/resource_manager_test.go index 6a8584c4d68..5688ea8a8ac 100644 --- a/tests/integrations/mcs/resourcemanager/resource_manager_test.go +++ b/tests/integrations/mcs/resourcemanager/resource_manager_test.go @@ -32,6 +32,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/caller" "github.com/tikv/pd/client/resource_group/controller" "github.com/tikv/pd/pkg/mcs/resourcemanager/server" "github.com/tikv/pd/pkg/utils/testutil" @@ -77,7 +78,9 @@ func (suite *resourceManagerClientTestSuite) SetupSuite() { err = suite.cluster.RunInitialServers() re.NoError(err) - suite.client, err = pd.NewClientWithContext(suite.ctx, suite.cluster.GetConfig().GetClientURLs(), pd.SecurityOption{}) + suite.client, err = pd.NewClientWithContext(suite.ctx, + caller.TestComponent, + suite.cluster.GetConfig().GetClientURLs(), pd.SecurityOption{}) re.NoError(err) leader := suite.cluster.GetServer(suite.cluster.WaitLeader()) re.NotNil(leader) @@ -1076,7 +1079,9 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() { re.NoError(tests.RunServers(serverList)) re.NotEmpty(suite.cluster.WaitLeader()) // re-connect client as well - suite.client, err = pd.NewClientWithContext(suite.ctx, suite.cluster.GetConfig().GetClientURLs(), pd.SecurityOption{}) + suite.client, err = pd.NewClientWithContext(suite.ctx, + caller.TestComponent, + suite.cluster.GetConfig().GetClientURLs(), pd.SecurityOption{}) re.NoError(err) cli = suite.client var newGroups []*rmpb.ResourceGroup @@ -1150,7 +1155,9 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupRUConsumption() { suite.cluster.WaitLeader() // re-connect client as cli.Close() - suite.client, err = pd.NewClientWithContext(suite.ctx, suite.cluster.GetConfig().GetClientURLs(), pd.SecurityOption{}) + suite.client, err = pd.NewClientWithContext(suite.ctx, + caller.TestComponent, + suite.cluster.GetConfig().GetClientURLs(), pd.SecurityOption{}) re.NoError(err) cli = suite.client // check ru stats not loss after restart diff --git a/tests/integrations/mcs/testutil.go b/tests/integrations/mcs/testutil.go index eb1e002f656..f130c782363 100644 --- a/tests/integrations/mcs/testutil.go +++ b/tests/integrations/mcs/testutil.go @@ -20,6 +20,7 @@ import ( "github.com/stretchr/testify/require" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/caller" "github.com/tikv/pd/client/opt" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/utils/tsoutil" @@ -29,7 +30,9 @@ import ( func SetupClientWithAPIContext( ctx context.Context, re *require.Assertions, apiCtx pd.APIContext, endpoints []string, opts ...opt.ClientOption, ) pd.Client { - cli, err := pd.NewClientWithAPIContext(ctx, apiCtx, endpoints, pd.SecurityOption{}, opts...) + cli, err := pd.NewClientWithAPIContext(ctx, apiCtx, + caller.TestComponent, + endpoints, pd.SecurityOption{}, opts...) re.NoError(err) return cli } @@ -39,7 +42,9 @@ func SetupClientWithKeyspaceID( ctx context.Context, re *require.Assertions, keyspaceID uint32, endpoints []string, opts ...opt.ClientOption, ) pd.Client { - cli, err := pd.NewClientWithKeyspace(ctx, keyspaceID, endpoints, pd.SecurityOption{}, opts...) + cli, err := pd.NewClientWithKeyspace(ctx, + caller.TestComponent, + keyspaceID, endpoints, pd.SecurityOption{}, opts...) re.NoError(err) return cli } diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index f266a76cb16..008ee8f978d 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -30,6 +30,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/caller" clierrs "github.com/tikv/pd/client/errs" "github.com/tikv/pd/pkg/election" "github.com/tikv/pd/pkg/errs" @@ -449,7 +450,9 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) dispatchClient( re.NoError(err) re.NotNil(member) // Prepare the client for keyspace. - tsoClient, err := pd.NewClientWithKeyspace(suite.ctx, keyspaceID, []string{suite.pdLeaderServer.GetAddr()}, pd.SecurityOption{}) + tsoClient, err := pd.NewClientWithKeyspace(suite.ctx, + caller.TestComponent, + keyspaceID, []string{suite.pdLeaderServer.GetAddr()}, pd.SecurityOption{}) re.NoError(err) re.NotNil(tsoClient) var ( @@ -779,7 +782,9 @@ func TestGetTSOImmediately(t *testing.T) { }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) apiCtx := pd.NewAPIContextV2("keyspace_b") // its keyspace id is 2. - cli, err := pd.NewClientWithAPIContext(ctx, apiCtx, []string{pdAddr}, pd.SecurityOption{}) + cli, err := pd.NewClientWithAPIContext(ctx, apiCtx, + caller.TestComponent, + []string{pdAddr}, pd.SecurityOption{}) re.NoError(err) _, _, err = cli.GetTS(ctx) re.NoError(err) diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go index 77ec760d1c8..8624454aec3 100644 --- a/tests/integrations/mcs/tso/server_test.go +++ b/tests/integrations/mcs/tso/server_test.go @@ -29,6 +29,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/caller" "github.com/tikv/pd/client/opt" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/mcs/discovery" @@ -246,6 +247,7 @@ func NewAPIServerForward(re *require.Assertions) APIServerForward { re.NoError(failpoint.Enable("github.com/tikv/pd/client/usePDServiceMode", "return(true)")) suite.pdClient, err = pd.NewClientWithContext(context.Background(), + caller.TestComponent, []string{suite.backendEndpoints}, pd.SecurityOption{}, opt.WithMaxErrorRetry(1)) re.NoError(err) return suite @@ -610,7 +612,8 @@ func TestTSOServiceSwitch(t *testing.T) { pdLeader := tc.GetServer(leaderName) backendEndpoints := pdLeader.GetAddr() re.NoError(pdLeader.BootstrapCluster()) - pdClient, err := pd.NewClientWithContext(ctx, []string{backendEndpoints}, pd.SecurityOption{}) + pdClient, err := pd.NewClientWithContext(ctx, caller.TestComponent, + []string{backendEndpoints}, pd.SecurityOption{}) re.NoError(err) re.NotNil(pdClient) defer pdClient.Close() diff --git a/tests/integrations/realcluster/cluster_id_test.go b/tests/integrations/realcluster/cluster_id_test.go index 59a56896693..518991fef97 100644 --- a/tests/integrations/realcluster/cluster_id_test.go +++ b/tests/integrations/realcluster/cluster_id_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/caller" "github.com/tikv/pd/client/opt" ) @@ -48,7 +49,7 @@ func (s *clusterIDSuite) TestClientClusterID() { pdEndpoints := getPDEndpoints(s.T()) // Try to create a client with the mixed endpoints. _, err := pd.NewClientWithContext( - ctx, pdEndpoints, + ctx, caller.TestComponent, pdEndpoints, pd.SecurityOption{}, opt.WithMaxErrorRetry(1), ) re.Error(err) diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index 19f8f4cd549..121a61b1986 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -29,6 +29,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/caller" "github.com/tikv/pd/client/opt" "github.com/tikv/pd/client/utils/testutil" bs "github.com/tikv/pd/pkg/basicserver" @@ -149,7 +150,9 @@ func (suite *tsoClientTestSuite) SetupSuite() { func (suite *tsoClientTestSuite) SetupTest() { re := suite.Require() if suite.legacy { - client, err := pd.NewClientWithContext(suite.ctx, suite.getBackendEndpoints(), pd.SecurityOption{}, opt.WithForwardingOption(true)) + client, err := pd.NewClientWithContext(suite.ctx, + caller.TestComponent, + suite.getBackendEndpoints(), pd.SecurityOption{}, opt.WithForwardingOption(true)) re.NoError(err) innerClient, ok := client.(interface{ GetServiceDiscovery() pd.ServiceDiscovery }) re.True(ok) @@ -549,6 +552,7 @@ func TestUpgradingAPIandTSOClusters(t *testing.T) { // Create a pd client in PD mode to let the API leader to forward requests to the TSO cluster. re.NoError(failpoint.Enable("github.com/tikv/pd/client/usePDServiceMode", "return(true)")) pdClient, err := pd.NewClientWithContext(context.Background(), + caller.TestComponent, []string{backendEndpoints}, pd.SecurityOption{}, opt.WithMaxErrorRetry(1)) re.NoError(err) defer pdClient.Close() diff --git a/tools/go.mod b/tools/go.mod index 96dc60ffe94..61205f85bd4 100644 --- a/tools/go.mod +++ b/tools/go.mod @@ -22,7 +22,7 @@ require ( github.com/mattn/go-shellwords v1.0.12 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20240910154453-b242104f8d31 + github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/prometheus/client_golang v1.19.0 github.com/prometheus/common v0.51.1 diff --git a/tools/go.sum b/tools/go.sum index 469f94d7cf3..8dec408a1c8 100644 --- a/tools/go.sum +++ b/tools/go.sum @@ -384,8 +384,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20240910154453-b242104f8d31 h1:6BY+3T6Hqpw9UZ/D7Om/xB+Xik3NkkYxBV6qCzUdUvU= -github.com/pingcap/kvproto v0.0.0-20240910154453-b242104f8d31/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= +github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 h1:xYNSJjYNur4Dr5bV+9BXK9n5E0T1zlcAN25XX68+mOg= +github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/tools/pd-api-bench/main.go b/tools/pd-api-bench/main.go index fbe52aa470b..fa8a9164891 100644 --- a/tools/pd-api-bench/main.go +++ b/tools/pd-api-bench/main.go @@ -34,6 +34,7 @@ import ( "github.com/prometheus/client_golang/prometheus" flag "github.com/spf13/pflag" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/caller" pdHttp "github.com/tikv/pd/client/http" "github.com/tikv/pd/client/opt" "github.com/tikv/pd/client/utils/tlsutil" @@ -376,11 +377,12 @@ func newEtcdClient(cfg *config.Config) *clientv3.Client { // newPDClient returns a pd client. func newPDClient(ctx context.Context, cfg *config.Config) pd.Client { addrs := []string{cfg.PDAddr} - pdCli, err := pd.NewClientWithContext(ctx, addrs, pd.SecurityOption{ - CAPath: cfg.CaPath, - CertPath: cfg.CertPath, - KeyPath: cfg.KeyPath, - }, + pdCli, err := pd.NewClientWithContext(ctx, caller.TestComponent, + addrs, pd.SecurityOption{ + CAPath: cfg.CaPath, + CertPath: cfg.CertPath, + KeyPath: cfg.KeyPath, + }, opt.WithGRPCDialOptions( grpc.WithKeepaliveParams(keepalive.ClientParameters{ Time: keepaliveTime, diff --git a/tools/pd-tso-bench/main.go b/tools/pd-tso-bench/main.go index 393ebb97026..944fdcd91dc 100644 --- a/tools/pd-tso-bench/main.go +++ b/tools/pd-tso-bench/main.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/log" "github.com/prometheus/client_golang/prometheus/promhttp" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/caller" "github.com/tikv/pd/client/opt" "go.uber.org/zap" "google.golang.org/grpc" @@ -444,17 +445,22 @@ func createPDClient(ctx context.Context) (pd.Client, error) { if len(*keyspaceName) > 0 { apiCtx := pd.NewAPIContextV2(*keyspaceName) - pdCli, err = pd.NewClientWithAPIContext(ctx, apiCtx, []string{*pdAddrs}, pd.SecurityOption{ - CAPath: *caPath, - CertPath: *certPath, - KeyPath: *keyPath, - }, opts...) + pdCli, err = pd.NewClientWithAPIContext(ctx, apiCtx, + caller.TestComponent, []string{*pdAddrs}, + pd.SecurityOption{ + CAPath: *caPath, + CertPath: *certPath, + KeyPath: *keyPath, + }, opts...) } else { - pdCli, err = pd.NewClientWithKeyspace(ctx, uint32(*keyspaceID), []string{*pdAddrs}, pd.SecurityOption{ - CAPath: *caPath, - CertPath: *certPath, - KeyPath: *keyPath, - }, opts...) + pdCli, err = pd.NewClientWithKeyspace(ctx, + caller.TestComponent, + uint32(*keyspaceID), []string{*pdAddrs}, + pd.SecurityOption{ + CAPath: *caPath, + CertPath: *certPath, + KeyPath: *keyPath, + }, opts...) } if err != nil { return nil, err