Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Add metrics for remote storage backends #2657

Merged
merged 11 commits into from
Sep 25, 2020
18 changes: 9 additions & 9 deletions src/metrics/rules/active_ruleset.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"github.com/m3db/m3/src/metrics/filters"
"github.com/m3db/m3/src/metrics/metadata"
"github.com/m3db/m3/src/metrics/metric"
metricID "github.com/m3db/m3/src/metrics/metric/id"
metricid "github.com/m3db/m3/src/metrics/metric/id"
mpipeline "github.com/m3db/m3/src/metrics/pipeline"
"github.com/m3db/m3/src/metrics/pipeline/applied"
xerrors "github.com/m3db/m3/src/x/errors"
Expand Down Expand Up @@ -58,17 +58,17 @@ type activeRuleSet struct {
rollupRules []*rollupRule
cutoverTimesAsc []int64
tagsFilterOpts filters.TagsFilterOptions
newRollupIDFn metricID.NewIDFn
isRollupIDFn metricID.MatchIDFn
newRollupIDFn metricid.NewIDFn
isRollupIDFn metricid.MatchIDFn
}

func newActiveRuleSet(
version int,
mappingRules []*mappingRule,
rollupRules []*rollupRule,
tagsFilterOpts filters.TagsFilterOptions,
newRollupIDFn metricID.NewIDFn,
isRollupIDFn metricID.MatchIDFn,
newRollupIDFn metricid.NewIDFn,
isRollupIDFn metricid.MatchIDFn,
) *activeRuleSet {
uniqueCutoverTimes := make(map[int64]struct{})
for _, mappingRule := range mappingRules {
Expand Down Expand Up @@ -311,7 +311,7 @@ func (as *activeRuleSet) toRollupResults(
multiErr = xerrors.NewMultiError()
pipelines = make([]metadata.PipelineMetadata, 0, len(targets))
newRollupIDResults = make([]idWithMatchResults, 0, len(targets))
tagPairs []metricID.TagPair
tagPairs []metricid.TagPair
)

for _, target := range targets {
Expand Down Expand Up @@ -402,7 +402,7 @@ func (as *activeRuleSet) matchRollupTarget(
sortedTagPairBytes []byte,
newName []byte,
rollupTags [][]byte,
tagPairs []metricID.TagPair, // buffer for reuse to generate rollup ID across calls
tagPairs []metricid.TagPair, // buffer for reuse to generate rollup ID across calls
opts matchRollupTargetOptions,
) ([]byte, bool) {
var (
Expand All @@ -415,7 +415,7 @@ func (as *activeRuleSet) matchRollupTarget(
res := bytes.Compare(tagName, rollupTags[currTagIdx])
if res == 0 {
if opts.generateRollupID {
tagPairs = append(tagPairs, metricID.TagPair{Name: tagName, Value: tagVal})
tagPairs = append(tagPairs, metricid.TagPair{Name: tagName, Value: tagVal})
}
currTagIdx++
hasMoreTags = sortedTagIter.Next()
Expand Down Expand Up @@ -443,7 +443,7 @@ func (as *activeRuleSet) matchRollupTarget(
func (as *activeRuleSet) applyIDToPipeline(
sortedTagPairBytes []byte,
pipeline mpipeline.Pipeline,
tagPairs []metricID.TagPair, // buffer for reuse across calls
tagPairs []metricid.TagPair, // buffer for reuse across calls
) (applied.Pipeline, error) {
operations := make([]applied.OpUnion, 0, pipeline.Len())
for i := 0; i < pipeline.Len(); i++ {
Expand Down
6 changes: 3 additions & 3 deletions src/metrics/rules/ruleset.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
merrors "github.com/m3db/m3/src/metrics/errors"
"github.com/m3db/m3/src/metrics/filters"
"github.com/m3db/m3/src/metrics/generated/proto/rulepb"
metricID "github.com/m3db/m3/src/metrics/metric/id"
metricid "github.com/m3db/m3/src/metrics/metric/id"
"github.com/m3db/m3/src/metrics/rules/view"
"github.com/m3db/m3/src/metrics/rules/view/changes"
xerrors "github.com/m3db/m3/src/x/errors"
Expand Down Expand Up @@ -143,8 +143,8 @@ type ruleSet struct {
mappingRules []*mappingRule
rollupRules []*rollupRule
tagsFilterOpts filters.TagsFilterOptions
newRollupIDFn metricID.NewIDFn
isRollupIDFn metricID.MatchIDFn
newRollupIDFn metricid.NewIDFn
isRollupIDFn metricid.MatchIDFn
}

// NewRuleSetFromProto creates a new RuleSet from a proto object.
Expand Down
4 changes: 2 additions & 2 deletions src/query/api/v1/handler/graphite/find_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

"github.com/m3db/m3/src/query/errors"
"github.com/m3db/m3/src/query/graphite/graphite"
graphiteStorage "github.com/m3db/m3/src/query/graphite/storage"
graphitestorage "github.com/m3db/m3/src/query/graphite/storage"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/util/json"
Expand Down Expand Up @@ -97,7 +97,7 @@ func parseFindParamsToQueries(r *http.Request) (
http.StatusBadRequest)
}

matchers, err := graphiteStorage.TranslateQueryToMatchersWithTerminator(query)
matchers, err := graphitestorage.TranslateQueryToMatchersWithTerminator(query)
if err != nil {
return nil, nil, "",
xhttp.NewParseError(fmt.Errorf("invalid 'query': %s", query),
Expand Down
111 changes: 106 additions & 5 deletions src/query/remote/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ package remote
import (
"context"
"io"
"strings"
"sync"
"time"

"github.com/m3db/m3/src/dbnode/encoding"
"github.com/m3db/m3/src/query/block"
Expand All @@ -36,9 +38,44 @@ import (
"github.com/m3db/m3/src/query/storage/m3/consolidators"
"github.com/m3db/m3/src/query/ts/m3db"
"github.com/m3db/m3/src/query/util/logging"
xgrpc "github.com/m3db/m3/src/x/grpc"
"github.com/m3db/m3/src/x/instrument"

"github.com/uber-go/tally"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)

const (
initResultSize = 10
healthCheckInterval = 60 * time.Second
healthCheckTimeout = 10 * time.Second
healthCheckMetricName = "health-check"
healthCheckMetricResultTag = "result"
)

var (
// NB(r): These options tries to ensure we don't let connections go stale
// and cause failed RPCs as a result.
defaultDialOptions = []grpc.DialOption{
grpc.WithDefaultCallOptions(
grpc.WaitForReady(true),
),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
// After a duration of this time if the client doesn't see any activity it
// pings the server to see if the transport is still alive.
// If set below 10s, a minimum value of 10s will be used instead.
Time: 10 * time.Second,
// After having pinged for keepalive check, the client waits for a duration
// of Timeout and if no activity is seen even after that the connection is
// closed.
Timeout: 20 * time.Second,
// If true, client sends keepalive pings even with no active RPCs. If false,
// when there are no active RPCs, Time and Timeout will be ignored and no
// keepalive pings will be sent.
PermitWithoutStream: true,
}),
}
)

// Client is the remote GRPC client.
Expand All @@ -55,40 +92,103 @@ type grpcClient struct {
pools encoding.IteratorPools
poolErr error
opts m3db.Options
closeCh chan struct{}
metrics grpcClientMetrics
}

const initResultSize = 10
type grpcClientMetrics struct {
healthCheckSuccess tally.Counter
healthCheckError tally.Counter
}

func newGRPCClientMetrics(s tally.Scope) grpcClientMetrics {
s = s.SubScope("remote-client")
return grpcClientMetrics{
healthCheckSuccess: s.Tagged(map[string]string{
healthCheckMetricResultTag: "success",
}).Counter(healthCheckMetricName),
healthCheckError: s.Tagged(map[string]string{
healthCheckMetricResultTag: "error",
}).Counter(healthCheckMetricName),
}
}

// NewGRPCClient creates a new remote GRPC client.
func NewGRPCClient(
name string,
addresses []string,
poolWrapper *pools.PoolWrapper,
opts m3db.Options,
instrumentOpts instrument.Options,
additionalDialOpts ...grpc.DialOption,
) (Client, error) {
if len(addresses) == 0 {
return nil, errors.ErrNoClientAddresses
}

// Set name if using a named client.
if remote := strings.TrimSpace(name); remote != "" {
instrumentOpts = instrumentOpts.
SetMetricsScope(instrumentOpts.MetricsScope().Tagged(map[string]string{
"remote-name": remote,
}))
}

scope := instrumentOpts.MetricsScope()
interceptorOpts := xgrpc.InterceptorInstrumentOptions{Scope: scope}

resolver := newStaticResolver(addresses)
balancer := grpc.RoundRobin(resolver)
dialOptions := []grpc.DialOption{
dialOptions := append([]grpc.DialOption{
grpc.WithBalancer(balancer),
grpc.WithInsecure(),
}
grpc.WithUnaryInterceptor(xgrpc.UnaryClientInterceptor(interceptorOpts)),
grpc.WithStreamInterceptor(xgrpc.StreamClientInterceptor(interceptorOpts)),
}, defaultDialOptions...)
dialOptions = append(dialOptions, additionalDialOpts...)
cc, err := grpc.Dial("", dialOptions...)
if err != nil {
return nil, err
}

client := rpc.NewQueryClient(cc)
return &grpcClient{
c := &grpcClient{
client: client,
connection: cc,
poolWrapper: poolWrapper,
opts: opts,
}, nil
closeCh: make(chan struct{}),
metrics: newGRPCClientMetrics(scope),
}
go c.healthCheckUntilClosed()
return c, nil
}

func (c *grpcClient) healthCheckUntilClosed() {
ticker := time.NewTicker(healthCheckInterval)
defer ticker.Stop()

req := &rpc.HealthRequest{}
for {
// Perform immediately so first check isn't delayed.
ctx, cancel := context.WithTimeout(context.Background(),
healthCheckTimeout)
_, err := c.client.Health(ctx, req)
cancel()
if err != nil {
c.metrics.healthCheckError.Inc(1)
} else {
c.metrics.healthCheckSuccess.Inc(1)
}

select {
case <-c.closeCh:
return
case <-ticker.C:
// Continue to next check.
continue
}
}
}

func (c *grpcClient) waitForPools() (encoding.IteratorPools, error) {
Expand Down Expand Up @@ -324,5 +424,6 @@ func (c *grpcClient) CompleteTags(
}

func (c *grpcClient) Close() error {
close(c.closeCh)
return c.connection.Close()
}
7 changes: 5 additions & 2 deletions src/query/remote/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
)

var (
testName = "remote_foo"
errRead = errors.New("read error")
poolsWrapper = pools.NewPoolsWrapper(
pools.BuildIteratorPools(pools.BuildIteratorPoolsOptions{}))
Expand Down Expand Up @@ -176,7 +177,8 @@ func buildClient(t *testing.T, hosts []string) Client {
SetReadWorkerPool(readWorkerPool).
SetTagOptions(models.NewTagOptions())

client, err := NewGRPCClient(hosts, poolsWrapper, opts, grpc.WithBlock())
client, err := NewGRPCClient(testName, hosts, poolsWrapper, opts,
instrument.NewTestOptions(t), grpc.WithBlock())
require.NoError(t, err)
return client
}
Expand Down Expand Up @@ -298,7 +300,8 @@ func TestMultipleClientRpc(t *testing.T) {
func TestEmptyAddressListErrors(t *testing.T) {
addresses := []string{}
opts := m3db.NewOptions()
client, err := NewGRPCClient(addresses, poolsWrapper, opts, grpc.WithBlock())
client, err := NewGRPCClient(testName, addresses, poolsWrapper, opts,
instrument.NewTestOptions(t), grpc.WithBlock())
assert.Nil(t, client)
assert.Equal(t, m3err.ErrNoClientAddresses, err)
}
Expand Down
16 changes: 7 additions & 9 deletions src/query/server/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ import (
"github.com/m3db/m3/src/query/parser/promql"
"github.com/m3db/m3/src/query/policy/filter"
"github.com/m3db/m3/src/query/pools"
tsdbRemote "github.com/m3db/m3/src/query/remote"
tsdbremote "github.com/m3db/m3/src/query/remote"
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/storage/fanout"
"github.com/m3db/m3/src/query/storage/m3"
Expand Down Expand Up @@ -953,18 +953,15 @@ func remoteZoneStorage(
zone config.Remote,
poolWrapper *pools.PoolWrapper,
opts tsdb.Options,
instrumentOpts instrument.Options,
) (storage.Storage, error) {
if len(zone.Addresses) == 0 {
// No addresses; skip.
return nil, nil
}

client, err := tsdbRemote.NewGRPCClient(
zone.Addresses,
poolWrapper,
opts,
)

client, err := tsdbremote.NewGRPCClient(zone.Name, zone.Addresses,
poolWrapper, opts, instrumentOpts)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -994,7 +991,8 @@ func remoteClient(
zap.Strings("addresses", zone.Addresses),
)

remote, err := remoteZoneStorage(zone, poolWrapper, opts)
remote, err := remoteZoneStorage(zone, poolWrapper, opts,
instrumentOpts)
if err != nil {
return nil, false, err
}
Expand All @@ -1015,7 +1013,7 @@ func startGRPCServer(
logger := instrumentOpts.Logger()

logger.Info("creating gRPC server")
server := tsdbRemote.NewGRPCServer(storage,
server := tsdbremote.NewGRPCServer(storage,
queryContextOptions, poolWrapper, instrumentOpts)

if opts.ReflectionEnabled() {
Expand Down
23 changes: 23 additions & 0 deletions src/x/generated/proto/generate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:generate sh -c "$GOPATH/src/$PACKAGE/scripts/proto-gen.sh $PACKAGE/src/x/generated/proto"

package proto
Loading