Skip to content

Commit

Permalink
Call GetMetricData api per region instead of per instance (#11882)
Browse files Browse the repository at this point in the history
* Call GetMetricData api per region instead of per instance

* Update changelog

* Don't report event when metricset field is empty

* Make createCloudWatchEvents info more readable

* Make createCloudWatchEvents a method of ec2 metricset

* Replace hardcoded indexes with variables
  • Loading branch information
kaiyan-sheng authored Apr 25, 2019
1 parent 62dc9bf commit e81f325
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 92 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add _bucket to histogram metrics in Prometheus Collector {pull}11578[11578]
- Prevent the docker/memory metricset from processing invalid events before container start {pull}11676[11676]
- Change `add_cloud_metadata` processor to not overwrite `cloud` field when it already exists in the event. {pull}11612[11612] {issue}11305[11305]
- Call GetMetricData api per region instead of per instance. {issue}11820[11820] {pull}11882[11882]

*Packetbeat*

Expand Down
16 changes: 16 additions & 0 deletions x-pack/metricbeat/module/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package aws
import (
"strconv"

"github.com/elastic/beats/libbeat/common"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/aws/defaults"
"github.com/aws/aws-sdk-go-v2/service/ec2"
Expand Down Expand Up @@ -149,3 +151,17 @@ func StringInSlice(str string, list []string) bool {
}
return false
}

// InitEvent builds an mb.Event pre-populated with the fields shared by the
// AWS metricsets. The event's Service and the "service.name" root field are
// set to metricsetName, "cloud.provider" is set to "aws", and "cloud.region"
// is set to regionName only when regionName is non-empty. MetricSetFields
// starts out as an empty map for the caller to fill in.
func InitEvent(metricsetName string, regionName string) mb.Event {
	// Assemble the root fields first so the event can be built in one literal.
	rootFields := common.MapStr{}
	rootFields.Put("service.name", metricsetName)
	rootFields.Put("cloud.provider", "aws")
	if regionName != "" {
		rootFields.Put("cloud.region", regionName)
	}

	return mb.Event{
		Service:         metricsetName,
		RootFields:      rootFields,
		MetricSetFields: common.MapStr{},
	}
}
12 changes: 6 additions & 6 deletions x-pack/metricbeat/module/aws/ec2/_meta/data.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
"ec2": {
"cpu": {
"credit_balance": 144,
"credit_usage": 0.008328,
"credit_usage": 0.005318,
"surplus_credit_balance": 0,
"surplus_credits_charged": 0,
"total": {
"pct": 0.1000000000000606
"pct": 0.133333333333212
}
},
"diskio": {
Expand Down Expand Up @@ -47,12 +47,12 @@
},
"network": {
"in": {
"bytes": 615.2,
"packets": 6.2
"bytes": 583,
"packets": 4.8
},
"out": {
"bytes": 841.4,
"packets": 6.8
"bytes": 746.4,
"packets": 5
}
},
"status": {
Expand Down
166 changes: 86 additions & 80 deletions x-pack/metricbeat/module/aws/ec2/ec2.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@ import (
"github.com/aws/aws-sdk-go-v2/service/ec2/ec2iface"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/x-pack/metricbeat/module/aws"
)

var metricsetName = "ec2"
var (
metricsetName = "ec2"
instanceIDIdx = 0
metricNameIdx = 1
)

// init registers the MetricSet with the central registry as soon as the program
// starts. The New function will be called later to instantiate an instance of
Expand Down Expand Up @@ -96,37 +99,38 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error {
continue
}

var metricDataQueriesTotal []cloudwatch.MetricDataQuery
for _, instanceID := range instanceIDs {
metricDataQueries := constructMetricQueries(listMetricsOutput, instanceID, m.PeriodInSec)
metricDataQueriesTotal = append(metricDataQueriesTotal, constructMetricQueries(listMetricsOutput, instanceID, m.PeriodInSec)...)
}

// If metricDataQueries, still needs to createCloudWatchEvents.
metricDataOutput := []cloudwatch.MetricDataResult{}
if len(metricDataQueries) != 0 {
// Use metricDataQueries to make GetMetricData API calls
metricDataOutput, err = aws.GetMetricDataResults(metricDataQueries, svcCloudwatch, startTime, endTime)
if err != nil {
err = errors.Wrap(err, "GetMetricDataResults failed, skipping region "+regionName+" for instance "+instanceID)
m.Logger().Error(err.Error())
report.Error(err)
continue
}
var metricDataOutput []cloudwatch.MetricDataResult
if len(metricDataQueriesTotal) != 0 {
// Use metricDataQueries to make GetMetricData API calls
metricDataOutput, err = aws.GetMetricDataResults(metricDataQueriesTotal, svcCloudwatch, startTime, endTime)
if err != nil {
err = errors.Wrap(err, "GetMetricDataResults failed, skipping region "+regionName)
m.Logger().Error(err.Error())
report.Error(err)
continue
}

// Create Cloudwatch Events for EC2
event, info, err := createCloudWatchEvents(metricDataOutput, instanceID, instancesOutputs[instanceID], regionName)
if info != "" {
m.Logger().Info(info)
}
events, err := m.createCloudWatchEvents(metricDataOutput, instancesOutputs, regionName)

if err != nil {
m.Logger().Error(err.Error())
report.Error(err)
continue
}

if reported := report.Event(event); !reported {
m.Logger().Debug("Fetch interrupted, failed to emit event")
return nil
for _, event := range events {
if len(event.MetricSetFields) != 0 {
if reported := report.Event(event); !reported {
m.Logger().Debug("Fetch interrupted, failed to emit event")
return nil
}
}
}
}
}
Expand All @@ -135,7 +139,7 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error {
}

func constructMetricQueries(listMetricsOutput []cloudwatch.Metric, instanceID string, periodInSec int) []cloudwatch.MetricDataQuery {
metricDataQueries := []cloudwatch.MetricDataQuery{}
var metricDataQueries []cloudwatch.MetricDataQuery
metricDataQueryEmpty := cloudwatch.MetricDataQuery{}
for i, listMetric := range listMetricsOutput {
metricDataQuery := createMetricDataQuery(listMetric, instanceID, i, periodInSec)
Expand All @@ -147,26 +151,15 @@ func constructMetricQueries(listMetricsOutput []cloudwatch.Metric, instanceID st
return metricDataQueries
}

func createCloudWatchEvents(getMetricDataResults []cloudwatch.MetricDataResult, instanceID string, instanceOutput ec2.Instance, regionName string) (event mb.Event, info string, err error) {
event.Service = metricsetName
event.RootFields = common.MapStr{}
// Cloud fields in ECS
machineType, err := instanceOutput.InstanceType.MarshalValue()
if err != nil {
err = errors.Wrap(err, "instance.InstanceType.MarshalValue failed")
return
func (m *MetricSet) createCloudWatchEvents(getMetricDataResults []cloudwatch.MetricDataResult, instanceOutput map[string]ec2.Instance, regionName string) (map[string]mb.Event, error) {
// Initialize events and metricSetFieldResults per instanceID
events := map[string]mb.Event{}
metricSetFieldResults := map[string]map[string]interface{}{}
for instanceID := range instanceOutput {
events[instanceID] = aws.InitEvent(metricsetName, regionName)
metricSetFieldResults[instanceID] = map[string]interface{}{}
}

event.RootFields.Put("service.name", metricsetName)
event.RootFields.Put("cloud.provider", "aws")
event.RootFields.Put("cloud.availability_zone", *instanceOutput.Placement.AvailabilityZone)
event.RootFields.Put("cloud.region", regionName)
event.RootFields.Put("cloud.instance.id", instanceID)
event.RootFields.Put("cloud.machine.type", machineType)

// AWS EC2 Metrics
mapOfMetricSetFieldResults := make(map[string]interface{})

// Find a timestamp for all metrics in output
timestamp := aws.FindTimestamp(getMetricDataResults)
if !timestamp.IsZero() {
Expand All @@ -177,56 +170,69 @@ func createCloudWatchEvents(getMetricDataResults []cloudwatch.MetricDataResult,
exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps)
if exists {
labels := strings.Split(*output.Label, " ")
instanceID := labels[instanceIDIdx]
machineType, err := instanceOutput[instanceID].InstanceType.MarshalValue()
if err != nil {
return events, errors.Wrap(err, "instance.InstanceType.MarshalValue failed")
}
events[instanceID].RootFields.Put("cloud.instance.id", instanceID)
events[instanceID].RootFields.Put("cloud.machine.type", machineType)
events[instanceID].RootFields.Put("cloud.availability_zone", *instanceOutput[instanceID].Placement.AvailabilityZone)

if len(output.Values) > timestampIdx {
mapOfMetricSetFieldResults[labels[1]] = fmt.Sprint(output.Values[timestampIdx])
metricSetFieldResults[instanceID][labels[metricNameIdx]] = fmt.Sprint(output.Values[timestampIdx])
}
}
}
}

resultMetricSetFields, err := aws.EventMapping(mapOfMetricSetFieldResults, schemaMetricSetFields)
if err != nil {
err = errors.Wrap(err, "Error trying to apply schema schemaMetricSetFields in AWS EC2 metricbeat module.")
return
}
instanceStateName, err := instanceOutput[instanceID].State.Name.MarshalValue()
if err != nil {
return events, errors.Wrap(err, "instance.State.Name.MarshalValue failed")
}

if len(mapOfMetricSetFieldResults) <= 11 {
info = "Missing Cloudwatch data for instance " + instanceID + ". This is expected for non-running instances or " +
"a new instance during the first data collection. If this shows up multiple times, please recheck the period " +
"setting in config."
}
monitoringState, err := instanceOutput[instanceID].Monitoring.State.MarshalValue()
if err != nil {
return events, errors.Wrap(err, "instance.Monitoring.State.MarshalValue failed")
}

instanceStateName, err := instanceOutput.State.Name.MarshalValue()
if err != nil {
err = errors.Wrap(err, "instance.State.Name.MarshalValue failed")
return
}
events[instanceID].MetricSetFields.Put("instance.image.id", *instanceOutput[instanceID].ImageId)
events[instanceID].MetricSetFields.Put("instance.state.name", instanceStateName)
events[instanceID].MetricSetFields.Put("instance.state.code", *instanceOutput[instanceID].State.Code)
events[instanceID].MetricSetFields.Put("instance.monitoring.state", monitoringState)
events[instanceID].MetricSetFields.Put("instance.core.count", *instanceOutput[instanceID].CpuOptions.CoreCount)
events[instanceID].MetricSetFields.Put("instance.threads_per_core", *instanceOutput[instanceID].CpuOptions.ThreadsPerCore)
publicIP := instanceOutput[instanceID].PublicIpAddress
if publicIP != nil {
events[instanceID].MetricSetFields.Put("instance.public.ip", *publicIP)
}

monitoringState, err := instanceOutput.Monitoring.State.MarshalValue()
if err != nil {
err = errors.Wrap(err, "instance.Monitoring.State.MarshalValue failed")
return
}
events[instanceID].MetricSetFields.Put("instance.public.dns_name", *instanceOutput[instanceID].PublicDnsName)
events[instanceID].MetricSetFields.Put("instance.private.dns_name", *instanceOutput[instanceID].PrivateDnsName)
privateIP := instanceOutput[instanceID].PrivateIpAddress
if privateIP != nil {
events[instanceID].MetricSetFields.Put("instance.private.ip", *privateIP)
}

resultMetricSetFields.Put("instance.image.id", *instanceOutput.ImageId)
resultMetricSetFields.Put("instance.state.name", instanceStateName)
resultMetricSetFields.Put("instance.state.code", *instanceOutput.State.Code)
resultMetricSetFields.Put("instance.monitoring.state", monitoringState)
resultMetricSetFields.Put("instance.core.count", *instanceOutput.CpuOptions.CoreCount)
resultMetricSetFields.Put("instance.threads_per_core", *instanceOutput.CpuOptions.ThreadsPerCore)
publicIP := instanceOutput.PublicIpAddress
if publicIP != nil {
resultMetricSetFields.Put("instance.public.ip", *publicIP)
}

}
}
resultMetricSetFields.Put("instance.public.dns_name", *instanceOutput.PublicDnsName)
resultMetricSetFields.Put("instance.private.dns_name", *instanceOutput.PrivateDnsName)
privateIP := instanceOutput.PrivateIpAddress
if privateIP != nil {
resultMetricSetFields.Put("instance.private.ip", *privateIP)

for instanceID, metricSetFieldsPerInstance := range metricSetFieldResults {
if len(metricSetFieldsPerInstance) != 0 {
resultMetricsetFields, err := aws.EventMapping(metricSetFieldsPerInstance, schemaMetricSetFields)
if err != nil {
return events, errors.Wrap(err, "EventMapping failed")
}

events[instanceID].MetricSetFields.Update(resultMetricsetFields)
if len(events[instanceID].MetricSetFields) < 5 {
m.Logger().Info("Missing Cloudwatch data, this is expected for non-running instances" +
" or a new instance during the first data collection. If this shows up multiple times," +
" please recheck the period setting in config. Instance ID: " + instanceID)
}
}
}

event.MetricSetFields = resultMetricSetFields
return
return events, nil
}

func getInstancesPerRegion(svc ec2iface.EC2API) (instanceIDs []string, instancesOutputs map[string]ec2.Instance, err error) {
Expand Down
13 changes: 7 additions & 6 deletions x-pack/metricbeat/module/aws/ec2/ec2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ import (
"github.com/aws/aws-sdk-go-v2/service/ec2/ec2iface"
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/x-pack/metricbeat/module/aws"

"github.com/elastic/beats/libbeat/common"
)

// MockEC2Client struct is used for unit tests.
Expand Down Expand Up @@ -195,11 +194,13 @@ func TestCreateCloudWatchEvents(t *testing.T) {
},
}

event, _, err := createCloudWatchEvents(getMetricDataOutput, instanceID, instancesOutputs[instanceID], mockModuleConfig.DefaultRegion)
metricSet := MetricSet{}
events, err := metricSet.createCloudWatchEvents(getMetricDataOutput, instancesOutputs, mockModuleConfig.DefaultRegion)
assert.NoError(t, err)
assert.Equal(t, expectedEvent.RootFields, event.RootFields)
assert.Equal(t, expectedEvent.MetricSetFields["cpu"], event.MetricSetFields["cpu"])
assert.Equal(t, expectedEvent.MetricSetFields["instance"], event.MetricSetFields["instance"])
assert.Equal(t, 1, len(events))
assert.Equal(t, expectedEvent.RootFields, events[instanceID].RootFields)
assert.Equal(t, expectedEvent.MetricSetFields["cpu"], events[instanceID].MetricSetFields["cpu"])
assert.Equal(t, expectedEvent.MetricSetFields["instance"], events[instanceID].MetricSetFields["instance"])
}

func TestConstructMetricQueries(t *testing.T) {
Expand Down

0 comments on commit e81f325

Please sign in to comment.