Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Adds cache TTL and routing/caching strategy (per plugin) #639

Merged
merged 1 commit into from
Dec 18, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

241 changes: 197 additions & 44 deletions control/available_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ import (
"github.com/intelsdi-x/gomit"
"github.com/intelsdi-x/snap/control/plugin"
"github.com/intelsdi-x/snap/control/plugin/client"
"github.com/intelsdi-x/snap/control/routing"
"github.com/intelsdi-x/snap/control/strategy"
"github.com/intelsdi-x/snap/core"
"github.com/intelsdi-x/snap/core/control_event"
"github.com/intelsdi-x/snap/core/ctypes"
"github.com/intelsdi-x/snap/core/serror"
)

Expand All @@ -51,12 +53,11 @@ const (
)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason the comments on the variables were removed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I felt they added more clutter than value.

@geauxvirtual: do you feel they should be put back?

var (
// ErrPoolNotFound - error message when the plugin pool not found
ErrPoolNotFound = errors.New("plugin pool not found")
// ErrBadKey - error message when a bad key used
ErrBadKey = errors.New("bad key")
// ErrBadType - error message when a bad plugin type used
ErrBadType = errors.New("bad plugin type")
ErrPoolEmpty = errors.New("plugin pool is empty")
ErrBadKey = errors.New("bad key")
ErrBadType = errors.New("bad plugin type")
ErrBadStrategy = errors.New("bad strategy")

// This defines the maximum running instances of a loaded plugin.
// It is initialized at runtime via the cli.
Expand Down Expand Up @@ -307,6 +308,9 @@ type apPool struct {

// The number of subscriptions per running instance
concurrencyCount int

// The routing and caching strategy declared by the plugin.
strategy strategy.RoutingAndCaching
}

func newPool(key string, plugins ...*availablePlugin) (*apPool, error) {
Expand All @@ -327,20 +331,8 @@ func newPool(key string, plugins ...*availablePlugin) (*apPool, error) {

if len(plugins) > 0 {
for _, plg := range plugins {
plg.id = p.generatePID()
p.plugins[plg.id] = plg
}
// Because plugin metadata is a singleton and immutable (in static code)
// it is safe to take the first item. Reloading an identical plugin
// with new metadata is protected by plugin loading.

// Checking if plugin is exclusive
// (only one instance should be running).
if plugins[0].meta.Exclusive {
p.max = 1
p.insert(plg)
}
// set concurrency count
p.concurrencyCount = plugins[0].meta.ConcurrencyCount
}

return p, nil
Expand All @@ -350,17 +342,46 @@ func (p *apPool) insert(ap *availablePlugin) error {
if ap.pluginType != plugin.CollectorPluginType && ap.pluginType != plugin.ProcessorPluginType && ap.pluginType != plugin.PublisherPluginType {
return ErrBadType
}
ap.id = p.generatePID()
p.plugins[ap.id] = ap

// If an empty pool is created, it does not have
// any available plugins from which to retrieve
// concurrency count or exclusivity. We ensure it
// is set correctly on an insert.
if len(p.plugins) == 0 {
if err := p.applyPluginMeta(ap); err != nil {
return err
}
}

ap.id = p.generatePID()
p.plugins[ap.id] = ap

return nil
}

func (p *apPool) applyPluginMeta(ap *availablePlugin) error {
// Checking if plugin is exclusive
// (only one instance should be running).
if ap.meta.Exclusive {
p.max = 1
}

// Set the cache TTL
cacheTTL := strategy.GlobalCacheExpiration
if ap.meta.CacheTTL != 0 {
cacheTTL = ap.meta.CacheTTL
}

// Set the routing and caching strategy
switch ap.meta.RoutingStrategy {
case plugin.DefaultRouting:
p.strategy = strategy.NewLRU(cacheTTL)
default:
return ErrBadStrategy
}

// set concurrency count
p.concurrencyCount = ap.meta.ConcurrencyCount

return nil
}

Expand Down Expand Up @@ -484,17 +505,17 @@ func (p *apPool) subscriptionCount() int {
return len(p.subs)
}

func (p *apPool) selectAP(strat RoutingStrategy) (*availablePlugin, serror.SnapError) {
func (p *apPool) selectAP() (*availablePlugin, serror.SnapError) {
p.RLock()
defer p.RUnlock()

sp := make([]routing.SelectablePlugin, p.count())
sp := make([]strategy.SelectablePlugin, p.count())
i := 0
for _, plg := range p.plugins {
sp[i] = plg
i++
}
sap, err := strat.Select(p, sp)
sap, err := p.strategy.Select(sp)
if err != nil || sap == nil {
return nil, serror.New(err)
}
Expand All @@ -506,10 +527,6 @@ func (p *apPool) generatePID() uint32 {
return p.pidCounter
}

func (p *apPool) release() {
p.RUnlock()
}

func (p *apPool) moveSubscriptions(to *apPool) []subscription {
var subs []subscription

Expand All @@ -532,24 +549,49 @@ type subscription struct {
taskID string
}

func (p *apPool) CheckCache(mts []core.Metric) ([]core.Metric, []core.Metric) {
return p.strategy.CheckCache(mts)
}

func (p *apPool) UpdateCache(mts []core.Metric) {
p.strategy.UpdateCache(mts)
}

func (p *apPool) CacheHits(ns string, ver int) (uint64, error) {
return p.strategy.CacheHits(ns, ver)
}

func (p *apPool) CacheMisses(ns string, ver int) (uint64, error) {
return p.strategy.CacheMisses(ns, ver)
}
func (p *apPool) AllCacheHits() uint64 {
return p.strategy.AllCacheHits()
}

func (p *apPool) AllCacheMisses() uint64 {
return p.strategy.AllCacheMisses()
}

func (p *apPool) CacheTTL() (time.Duration, error) {
if len(p.plugins) == 0 {
return 0, ErrPoolEmpty
}
return p.strategy.CacheTTL(), nil
}

type availablePlugins struct {
// Used to coordinate operations on the table.
*sync.RWMutex

// the strategy used to select a plugin for execution
routingStrategy RoutingStrategy

// table holds all the plugin pools.
// The Pools' primary keys are equal to
// {plugin_type}:{plugin_name}:{plugin_version}
table map[string]*apPool
}

func newAvailablePlugins(routingStrategy RoutingStrategy) *availablePlugins {
func newAvailablePlugins() *availablePlugins {
return &availablePlugins{
RWMutex: &sync.RWMutex{},
table: make(map[string]*apPool),
routingStrategy: routingStrategy,
RWMutex: &sync.RWMutex{},
table: make(map[string]*apPool),
}
}

Expand Down Expand Up @@ -606,16 +648,127 @@ func (ap *availablePlugins) getPool(key string) (*apPool, serror.SnapError) {
return pool, nil
}

func (ap *availablePlugins) holdPool(key string) (*apPool, serror.SnapError) {
pool, err := ap.getPool(key)
func (ap *availablePlugins) collectMetrics(pluginKey string, metricTypes []core.Metric) ([]core.Metric, error) {
var results []core.Metric
pool, serr := ap.getPool(pluginKey)
if serr != nil {
return nil, serr
}
if pool == nil {
return nil, serror.New(ErrPoolNotFound, map[string]interface{}{"pool-key": pluginKey})
}

metricsToCollect, metricsFromCache := pool.CheckCache(metricTypes)

if len(metricsToCollect) == 0 {
return metricsFromCache, nil
}

pool.RLock()
defer pool.RUnlock()
p, serr := pool.selectAP()
if serr != nil {
return nil, serr
}

// cast client to PluginCollectorClient
cli, ok := p.client.(client.PluginCollectorClient)
if !ok {
return nil, serror.New(errors.New("unable to cast client to PluginCollectorClient"))
}

// collect metrics
metrics, err := cli.CollectMetrics(metricsToCollect)
if err != nil {
return nil, err
return nil, serror.New(err)
}

if pool != nil {
pool.RLock()
pool.UpdateCache(metrics)

results = make([]core.Metric, len(metricsFromCache)+len(metrics))
idx := 0
for _, m := range metrics {
results[idx] = m
idx++
}
return pool, nil
for _, m := range metricsFromCache {
results[idx] = m
idx++
}

// update plugin stats
p.hitCount++
p.lastHitTime = time.Now()

return metrics, nil
}

func (ap *availablePlugins) publishMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue) []error {
var errs []error
key := strings.Join([]string{plugin.PublisherPluginType.String(), pluginName, strconv.Itoa(pluginVersion)}, ":")
pool, serr := ap.getPool(key)
if serr != nil {
errs = append(errs, serr)
return errs
}
if pool == nil {
return []error{serror.New(ErrPoolNotFound, map[string]interface{}{"pool-key": key})}
}

pool.RLock()
defer pool.RUnlock()
p, err := pool.selectAP()
if err != nil {
errs = append(errs, err)
return errs
}

cli, ok := p.client.(client.PluginPublisherClient)
if !ok {
return []error{errors.New("unable to cast client to PluginPublisherClient")}
}

errp := cli.Publish(contentType, content, config)
if errp != nil {
return []error{errp}
}
p.hitCount++
p.lastHitTime = time.Now()
return nil
}

func (ap *availablePlugins) processMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue) (string, []byte, []error) {
var errs []error
key := strings.Join([]string{plugin.ProcessorPluginType.String(), pluginName, strconv.Itoa(pluginVersion)}, ":")
pool, serr := ap.getPool(key)
if serr != nil {
errs = append(errs, serr)
return "", nil, errs
}
if pool == nil {
return "", nil, []error{serror.New(ErrPoolNotFound, map[string]interface{}{"pool-key": key})}
}

pool.RLock()
defer pool.RUnlock()
p, err := pool.selectAP()
if err != nil {
errs = append(errs, err)
return "", nil, errs
}

cli, ok := p.client.(client.PluginProcessorClient)
if !ok {
return "", nil, []error{errors.New("unable to cast client to PluginProcessorClient")}
}

ct, c, errp := cli.Process(contentType, content, config)
if errp != nil {
return "", nil, []error{errp}
}
p.hitCount++
p.lastHitTime = time.Now()
return ct, c, nil
}

func (ap *availablePlugins) findLatestPool(pType, name string) (*apPool, serror.SnapError) {
Expand Down Expand Up @@ -664,7 +817,7 @@ func (ap *availablePlugins) selectAP(key string) (*availablePlugin, serror.SnapE
return nil, err
}

return pool.selectAP(ap.routingStrategy)
return pool.selectAP()
}

func (ap *availablePlugins) pools() map[string]*apPool {
Expand Down
Loading