Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Call GetMetricData api per region instead of per instance #11882

Merged
merged 7 commits into from
Apr 25, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add _bucket to histogram metrics in Prometheus Collector {pull}11578[11578]
- Prevent the docker/memory metricset from processing invalid events before container start {pull}11676[11676]
- Change `add_cloud_metadata` processor to not overwrite `cloud` field when it already exist in the event. {pull}11612[11612] {issue}11305[11305]
- Call GetMetricData api per region instead of per instance. {issue}11820[11820] {pull}11882[11882]

*Packetbeat*

Expand Down
16 changes: 16 additions & 0 deletions x-pack/metricbeat/module/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package aws
import (
"strconv"

"github.com/elastic/beats/libbeat/common"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/aws/defaults"
"github.com/aws/aws-sdk-go-v2/service/ec2"
Expand Down Expand Up @@ -149,3 +151,17 @@ func StringInSlice(str string, list []string) bool {
}
return false
}

// InitEvent initialize mb.Event with basic information like service.name, cloud.provider
func InitEvent(metricsetName string, regionName string) mb.Event {
event := mb.Event{}
event.Service = metricsetName
event.RootFields = common.MapStr{}
event.MetricSetFields = common.MapStr{}
event.RootFields.Put("service.name", metricsetName)
event.RootFields.Put("cloud.provider", "aws")
if regionName != "" {
event.RootFields.Put("cloud.region", regionName)
}
return event
}
12 changes: 6 additions & 6 deletions x-pack/metricbeat/module/aws/ec2/_meta/data.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
"ec2": {
"cpu": {
"credit_balance": 144,
"credit_usage": 0.008328,
"credit_usage": 0.005318,
"surplus_credit_balance": 0,
"surplus_credits_charged": 0,
"total": {
"pct": 0.1000000000000606
"pct": 0.133333333333212
}
},
"diskio": {
Expand Down Expand Up @@ -47,12 +47,12 @@
},
"network": {
"in": {
"bytes": 615.2,
"packets": 6.2
"bytes": 583,
"packets": 4.8
},
"out": {
"bytes": 841.4,
"packets": 6.8
"bytes": 746.4,
"packets": 5
}
},
"status": {
Expand Down
159 changes: 83 additions & 76 deletions x-pack/metricbeat/module/aws/ec2/ec2.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/aws/aws-sdk-go-v2/service/ec2/ec2iface"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/x-pack/metricbeat/module/aws"
)
Expand Down Expand Up @@ -96,24 +95,25 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error {
continue
}

var metricDataQueriesTotal []cloudwatch.MetricDataQuery
for _, instanceID := range instanceIDs {
metricDataQueries := constructMetricQueries(listMetricsOutput, instanceID, m.PeriodInSec)
metricDataQueriesTotal = append(metricDataQueriesTotal, constructMetricQueries(listMetricsOutput, instanceID, m.PeriodInSec)...)
}

// If metricDataQueries, still needs to createCloudWatchEvents.
metricDataOutput := []cloudwatch.MetricDataResult{}
if len(metricDataQueries) != 0 {
// Use metricDataQueries to make GetMetricData API calls
metricDataOutput, err = aws.GetMetricDataResults(metricDataQueries, svcCloudwatch, startTime, endTime)
if err != nil {
err = errors.Wrap(err, "GetMetricDataResults failed, skipping region "+regionName+" for instance "+instanceID)
m.Logger().Error(err.Error())
report.Error(err)
continue
}
// If metricDataQueries, still needs to createCloudWatchEvents.
var metricDataOutput []cloudwatch.MetricDataResult
if len(metricDataQueriesTotal) != 0 {
// Use metricDataQueries to make GetMetricData API calls
metricDataOutput, err = aws.GetMetricDataResults(metricDataQueriesTotal, svcCloudwatch, startTime, endTime)
if err != nil {
err = errors.Wrap(err, "GetMetricDataResults failed, skipping region "+regionName)
m.Logger().Error(err.Error())
report.Error(err)
continue
}

// Create Cloudwatch Events for EC2
event, info, err := createCloudWatchEvents(metricDataOutput, instanceID, instancesOutputs[instanceID], regionName)
events, info, err := createCloudWatchEvents(metricDataOutput, instancesOutputs, regionName)
if info != "" {
m.Logger().Info(info)
}
Expand All @@ -124,9 +124,13 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error {
continue
}

if reported := report.Event(event); !reported {
m.Logger().Debug("Fetch interrupted, failed to emit event")
return nil
for _, event := range events {
if len(event.MetricSetFields) != 0 {
if reported := report.Event(event); !reported {
m.Logger().Debug("Fetch interrupted, failed to emit event")
return nil
}
}
}
}
}
Expand All @@ -135,7 +139,7 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error {
}

func constructMetricQueries(listMetricsOutput []cloudwatch.Metric, instanceID string, periodInSec int) []cloudwatch.MetricDataQuery {
metricDataQueries := []cloudwatch.MetricDataQuery{}
var metricDataQueries []cloudwatch.MetricDataQuery
metricDataQueryEmpty := cloudwatch.MetricDataQuery{}
for i, listMetric := range listMetricsOutput {
metricDataQuery := createMetricDataQuery(listMetric, instanceID, i, periodInSec)
Expand All @@ -147,26 +151,16 @@ func constructMetricQueries(listMetricsOutput []cloudwatch.Metric, instanceID st
return metricDataQueries
}

func createCloudWatchEvents(getMetricDataResults []cloudwatch.MetricDataResult, instanceID string, instanceOutput ec2.Instance, regionName string) (event mb.Event, info string, err error) {
event.Service = metricsetName
event.RootFields = common.MapStr{}
// Cloud fields in ECS
machineType, err := instanceOutput.InstanceType.MarshalValue()
if err != nil {
err = errors.Wrap(err, "instance.InstanceType.MarshalValue failed")
return
func createCloudWatchEvents(getMetricDataResults []cloudwatch.MetricDataResult, instanceOutput map[string]ec2.Instance, regionName string) (map[string]mb.Event, string, error) {
// Initialize events and metricSetFieldResults per instanceID
events := map[string]mb.Event{}
metricSetFieldResults := map[string]map[string]interface{}{}
info := ""
for instanceID := range instanceOutput {
events[instanceID] = aws.InitEvent(metricsetName, regionName)
metricSetFieldResults[instanceID] = map[string]interface{}{}
}

event.RootFields.Put("service.name", metricsetName)
event.RootFields.Put("cloud.provider", "aws")
event.RootFields.Put("cloud.availability_zone", *instanceOutput.Placement.AvailabilityZone)
event.RootFields.Put("cloud.region", regionName)
event.RootFields.Put("cloud.instance.id", instanceID)
event.RootFields.Put("cloud.machine.type", machineType)

// AWS EC2 Metrics
mapOfMetricSetFieldResults := make(map[string]interface{})

// Find a timestamp for all metrics in output
timestamp := aws.FindTimestamp(getMetricDataResults)
if !timestamp.IsZero() {
Expand All @@ -177,56 +171,69 @@ func createCloudWatchEvents(getMetricDataResults []cloudwatch.MetricDataResult,
exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps)
if exists {
labels := strings.Split(*output.Label, " ")
instanceID := labels[0]
machineType, err := instanceOutput[instanceID].InstanceType.MarshalValue()
if err != nil {
return events, info, errors.Wrap(err, "instance.InstanceType.MarshalValue failed")
}
events[instanceID].RootFields.Put("cloud.instance.id", instanceID)
events[instanceID].RootFields.Put("cloud.machine.type", machineType)
events[instanceID].RootFields.Put("cloud.availability_zone", *instanceOutput[instanceID].Placement.AvailabilityZone)

if len(output.Values) > timestampIdx {
mapOfMetricSetFieldResults[labels[1]] = fmt.Sprint(output.Values[timestampIdx])
metricSetFieldResults[instanceID][labels[1]] = fmt.Sprint(output.Values[timestampIdx])
}
}
}
}

resultMetricSetFields, err := aws.EventMapping(mapOfMetricSetFieldResults, schemaMetricSetFields)
if err != nil {
err = errors.Wrap(err, "Error trying to apply schema schemaMetricSetFields in AWS EC2 metricbeat module.")
return
}
instanceStateName, err := instanceOutput[instanceID].State.Name.MarshalValue()
if err != nil {
return events, info, errors.Wrap(err, "instance.State.Name.MarshalValue failed")
}

if len(mapOfMetricSetFieldResults) <= 11 {
info = "Missing Cloudwatch data for instance " + instanceID + ". This is expected for non-running instances or " +
"a new instance during the first data collection. If this shows up multiple times, please recheck the period " +
"setting in config."
}
monitoringState, err := instanceOutput[instanceID].Monitoring.State.MarshalValue()
if err != nil {
return events, info, errors.Wrap(err, "instance.Monitoring.State.MarshalValue failed")
}

instanceStateName, err := instanceOutput.State.Name.MarshalValue()
if err != nil {
err = errors.Wrap(err, "instance.State.Name.MarshalValue failed")
return
}
events[instanceID].MetricSetFields.Put("instance.image.id", *instanceOutput[instanceID].ImageId)
events[instanceID].MetricSetFields.Put("instance.state.name", instanceStateName)
events[instanceID].MetricSetFields.Put("instance.state.code", *instanceOutput[instanceID].State.Code)
events[instanceID].MetricSetFields.Put("instance.monitoring.state", monitoringState)
events[instanceID].MetricSetFields.Put("instance.core.count", *instanceOutput[instanceID].CpuOptions.CoreCount)
events[instanceID].MetricSetFields.Put("instance.threads_per_core", *instanceOutput[instanceID].CpuOptions.ThreadsPerCore)
publicIP := instanceOutput[instanceID].PublicIpAddress
if publicIP != nil {
events[instanceID].MetricSetFields.Put("instance.public.ip", *publicIP)
}

monitoringState, err := instanceOutput.Monitoring.State.MarshalValue()
if err != nil {
err = errors.Wrap(err, "instance.Monitoring.State.MarshalValue failed")
return
}
events[instanceID].MetricSetFields.Put("instance.public.dns_name", *instanceOutput[instanceID].PublicDnsName)
events[instanceID].MetricSetFields.Put("instance.private.dns_name", *instanceOutput[instanceID].PrivateDnsName)
privateIP := instanceOutput[instanceID].PrivateIpAddress
if privateIP != nil {
events[instanceID].MetricSetFields.Put("instance.private.ip", *privateIP)
}

resultMetricSetFields.Put("instance.image.id", *instanceOutput.ImageId)
resultMetricSetFields.Put("instance.state.name", instanceStateName)
resultMetricSetFields.Put("instance.state.code", *instanceOutput.State.Code)
resultMetricSetFields.Put("instance.monitoring.state", monitoringState)
resultMetricSetFields.Put("instance.core.count", *instanceOutput.CpuOptions.CoreCount)
resultMetricSetFields.Put("instance.threads_per_core", *instanceOutput.CpuOptions.ThreadsPerCore)
publicIP := instanceOutput.PublicIpAddress
if publicIP != nil {
resultMetricSetFields.Put("instance.public.ip", *publicIP)
}

}
}
resultMetricSetFields.Put("instance.public.dns_name", *instanceOutput.PublicDnsName)
resultMetricSetFields.Put("instance.private.dns_name", *instanceOutput.PrivateDnsName)
privateIP := instanceOutput.PrivateIpAddress
if privateIP != nil {
resultMetricSetFields.Put("instance.private.ip", *privateIP)

for instanceID, metricSetFieldsPerInstance := range metricSetFieldResults {
if len(metricSetFieldsPerInstance) != 0 {
resultMetricsetFields, err := aws.EventMapping(metricSetFieldsPerInstance, schemaMetricSetFields)
if err != nil {
return events, info, errors.Wrap(err, "EventMapping failed")
}

events[instanceID].MetricSetFields.Update(resultMetricsetFields)
if len(events[instanceID].MetricSetFields) < 5 {
info = "Missing Cloudwatch data for instance " + instanceID + ". This is expected for non-running instances or " +
"a new instance during the first data collection. If this shows up multiple times, please recheck the period " +
"setting in config."
}
}
}

event.MetricSetFields = resultMetricSetFields
return
return events, info, nil
}

func getInstancesPerRegion(svc ec2iface.EC2API) (instanceIDs []string, instancesOutputs map[string]ec2.Instance, err error) {
Expand Down
9 changes: 5 additions & 4 deletions x-pack/metricbeat/module/aws/ec2/ec2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,12 @@ func TestCreateCloudWatchEvents(t *testing.T) {
},
}

event, _, err := createCloudWatchEvents(getMetricDataOutput, instanceID, instancesOutputs[instanceID], mockModuleConfig.DefaultRegion)
events, _, err := createCloudWatchEvents(getMetricDataOutput, instancesOutputs, mockModuleConfig.DefaultRegion)
assert.NoError(t, err)
assert.Equal(t, expectedEvent.RootFields, event.RootFields)
assert.Equal(t, expectedEvent.MetricSetFields["cpu"], event.MetricSetFields["cpu"])
assert.Equal(t, expectedEvent.MetricSetFields["instance"], event.MetricSetFields["instance"])
assert.Equal(t, 1, len(events))
assert.Equal(t, expectedEvent.RootFields, events[instanceID].RootFields)
assert.Equal(t, expectedEvent.MetricSetFields["cpu"], events[instanceID].MetricSetFields["cpu"])
assert.Equal(t, expectedEvent.MetricSetFields["instance"], events[instanceID].MetricSetFields["instance"])
}

func TestConstructMetricQueries(t *testing.T) {
Expand Down