diff --git a/metricbeat/module/ceph/cluster_disk/cluster_disk.go b/metricbeat/module/ceph/cluster_disk/cluster_disk.go index cc179e714409..37f68f37a6cb 100644 --- a/metricbeat/module/ceph/cluster_disk/cluster_disk.go +++ b/metricbeat/module/ceph/cluster_disk/cluster_disk.go @@ -18,7 +18,6 @@ package cluster_disk import ( - "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" @@ -61,12 +60,23 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { }, nil } -func (m *MetricSet) Fetch() (common.MapStr, error) { +// Fetch methods implements the data gathering and data conversion to the right +// format. It publishes the event which is then forwarded to the output. In case +// of an error set the Error field of mb.Event or simply call report.Error(). +func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { content, err := m.HTTP.FetchContent() + if err != nil { + return err + } + event, err := eventMapping(content) if err != nil { - return nil, err + return err + } + + if reported := reporter.Event(mb.Event{MetricSetFields: event}); !reported { + m.Logger().Debug("error reporting event") } - return eventMapping(content), nil + return nil } diff --git a/metricbeat/module/ceph/cluster_disk/cluster_disk_integration_test.go b/metricbeat/module/ceph/cluster_disk/cluster_disk_integration_test.go index 29253ee93830..8b815dca0f7b 100644 --- a/metricbeat/module/ceph/cluster_disk/cluster_disk_integration_test.go +++ b/metricbeat/module/ceph/cluster_disk/cluster_disk_integration_test.go @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +// +build integration + package cluster_disk import ( @@ -26,9 +28,9 @@ import ( ) func TestData(t *testing.T) { - f := mbtest.NewEventFetcher(t, getConfig()) - err := mbtest.WriteEvent(f, t) - if err != nil { + f := mbtest.NewReportingMetricSetV2Error(t, getConfig()) + + if err := mbtest.WriteEventsReporterV2Error(f, t, ""); err != nil { t.Fatal("write", err) } } diff --git a/metricbeat/module/ceph/cluster_disk/cluster_disk_test.go b/metricbeat/module/ceph/cluster_disk/cluster_disk_test.go index 3aef094ad2a5..b073f4be6b67 100644 --- a/metricbeat/module/ceph/cluster_disk/cluster_disk_test.go +++ b/metricbeat/module/ceph/cluster_disk/cluster_disk_test.go @@ -50,8 +50,13 @@ func TestFetchEventContents(t *testing.T) { "hosts": []string{server.URL}, } - f := mbtest.NewEventFetcher(t, config) - event, err := f.Fetch() + f := mbtest.NewReportingMetricSetV2Error(t, config) + events, errs := mbtest.ReportingFetchV2Error(f) + if len(errs) > 0 { + t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs) + } + assert.NotEmpty(t, events) + event := events[0].MetricSetFields t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event.StringToPrint()) diff --git a/metricbeat/module/ceph/cluster_disk/data.go b/metricbeat/module/ceph/cluster_disk/data.go index 7a2c600cc539..259d712f8c9e 100644 --- a/metricbeat/module/ceph/cluster_disk/data.go +++ b/metricbeat/module/ceph/cluster_disk/data.go @@ -20,8 +20,9 @@ package cluster_disk import ( "encoding/json" + "github.com/pkg/errors" + "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/logp" ) type StatsCluster struct { @@ -39,11 +40,11 @@ type DfRequest struct { Output Output `json:"output"` } -func eventMapping(content []byte) common.MapStr { +func eventMapping(content []byte) (common.MapStr, error) { var d DfRequest err := json.Unmarshal(content, &d) if err != nil { - logp.Err("Error: %+v", err) + return nil, errors.Wrap(err, "could not get DFRequest data") } return common.MapStr{ @@ -56,5 +57,5 @@ func eventMapping(content []byte) common.MapStr { "available": common.MapStr{ "bytes": d.Output.StatsCluster.TotalAvailBytes, }, - } + }, nil }