Skip to content

Commit

Permalink
Add storage based ingester to m3coordinator (#1038)
Browse files Browse the repository at this point in the history
  • Loading branch information
cw9 authored Oct 9, 2018
1 parent 440b416 commit 2ffc763
Show file tree
Hide file tree
Showing 16 changed files with 629 additions and 38 deletions.
55 changes: 31 additions & 24 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ import:
- services
- integration/etcd

- package: github.com/m3db/m3msg
version: 4680d9b45286826f87b134a4559b11d795786eaf

- package: github.com/m3db/m3metrics
version: 5c6aed344a0c209277d36334f0a824d9f3bb835b

- package: github.com/m3db/m3ctl
version: acc762bfdd42ecb192d34e48fa7ca1fd7ee088ac

- package: github.com/m3db/m3msg
version: 4851e2719e06b15f1fc247e1d00339192963990e

- package: github.com/m3db/bitset
version: 07973db6b78acb62ac207d0538055e874b49d90d

Expand Down
2 changes: 2 additions & 0 deletions scripts/development/m3_stack/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ services:
expose:
- "7201"
- "7203"
- "7507"
ports:
- "0.0.0.0:7201:7201"
- "0.0.0.0:7203:7203"
- "0.0.0.0:7507:7507"
networks:
- backend
build:
Expand Down
15 changes: 15 additions & 0 deletions scripts/development/m3_stack/m3coordinator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,18 @@ clusters:
jitter: true
backgroundHealthCheckFailLimit: 4
backgroundHealthCheckFailThrottleFactor: 0.5

ingest:
ingester:
workerPoolSize: 100
opPool:
size: 100
retry:
maxRetries: 3
jitter: true
m3msg:
server:
listenAddress: 0.0.0.0:7507
retry:
maxBackoff: 10s
jitter: true
3 changes: 2 additions & 1 deletion src/cmd/services/m3coordinator/downsample/flush_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/ts"
"github.com/m3db/m3/src/x/convert"
"github.com/m3db/m3/src/x/serialize"
"github.com/m3db/m3metrics/metric/aggregated"
"github.com/m3db/m3x/instrument"
Expand Down Expand Up @@ -145,7 +146,7 @@ func (w *downsamplerFlushHandlerWriter) Write(
Timestamp: time.Unix(0, mp.TimeNanos),
Value: mp.Value,
}},
Unit: mp.StoragePolicy.Resolution().Precision,
Unit: convert.UnitForM3DB(mp.StoragePolicy.Resolution().Precision),
Attributes: storage.Attributes{
MetricsType: storage.AggregatedMetricsType,
Retention: mp.StoragePolicy.Retention().Duration(),
Expand Down
84 changes: 84 additions & 0 deletions src/cmd/services/m3coordinator/ingest/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package ingest

import (
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/x/serialize"
"github.com/m3db/m3x/instrument"
"github.com/m3db/m3x/pool"
"github.com/m3db/m3x/retry"
xsync "github.com/m3db/m3x/sync"
)

// Configuration configs the ingester.
type Configuration struct {
WorkerPoolSize int `yaml:"workerPoolSize"`
OpPool pool.ObjectPoolConfiguration `yaml:"opPool"`
Retry retry.Configuration `yaml:"retry"`
}

// NewIngester creates an ingester with an appender.
func (cfg Configuration) NewIngester(
appender storage.Appender,
instrumentOptions instrument.Options,
) (*Ingester, error) {
opts, err := cfg.newOptions(appender, instrumentOptions)
if err != nil {
return nil, err
}
return NewIngester(opts), nil
}

func (cfg Configuration) newOptions(
appender storage.Appender,
instrumentOptions instrument.Options,
) (Options, error) {
scope := instrumentOptions.MetricsScope().Tagged(
map[string]string{"component": "ingester"},
)
workers, err := xsync.NewPooledWorkerPool(
cfg.WorkerPoolSize,
xsync.NewPooledWorkerPoolOptions().
SetInstrumentOptions(instrumentOptions),
)
if err != nil {
return Options{}, err
}

workers.Init()
tagDecoderPool := serialize.NewTagDecoderPool(
serialize.NewTagDecoderOptions(),
pool.NewObjectPoolOptions().
SetInstrumentOptions(instrumentOptions.
SetMetricsScope(instrumentOptions.MetricsScope().
SubScope("tag-decoder-pool"))),
)
tagDecoderPool.Init()
return Options{
Appender: appender,
Workers: workers,
PoolOptions: cfg.OpPool.NewObjectPoolOptions(instrumentOptions),
TagDecoderPool: tagDecoderPool,
RetryOptions: cfg.Retry.NewOptions(scope),
InstrumentOptions: instrumentOptions,
}, nil
}
Loading

0 comments on commit 2ffc763

Please sign in to comment.