diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 551ae92a3cb5..ac31d48d9707 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -27,6 +27,7 @@ The list below covers the major changes between 7.0.0-rc2 and master only. - The custom beat generator now uses mage instead of python, `mage GenerateCustomBeat` can be used to create a new beat, and `mage vendorUpdate` to update the vendored libbeat in a custom beat. {pull}13610[13610] - Altered all remaining uses of mapval to use the renamed and enhanced version: https://github.com/elastic/go-lookslike[go-lookslike] instead, which is a separate project. The mapval tree is now gone. {pull}14165[14165] - Move light modules to OSS. {pull}14369[14369] +- Removing Metricbeat EventFetcher and EventsFetcher interface. Use the reporter interface instead. {pull}[] ==== Bugfixes diff --git a/metricbeat/helper/prometheus/ptest/ptest.go b/metricbeat/helper/prometheus/ptest/ptest.go index 4bcadbc5f671..1972a09f9cd7 100644 --- a/metricbeat/helper/prometheus/ptest/ptest.go +++ b/metricbeat/helper/prometheus/ptest/ptest.go @@ -29,7 +29,6 @@ import ( "github.com/mitchellh/hashstructure" - "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/metricbeat/mb" mbtest "github.com/elastic/beats/metricbeat/mb/testing" @@ -47,87 +46,6 @@ type TestCases []struct { ExpectedFile string } -// TestMetricSetEventsFetcher goes over the given TestCases and ensures that source Prometheus metrics gets converted -// into the expected events when passed by the given metricset. -// If -update_expected flag is passed, the expected JSON file will be updated with the result -func TestMetricSetEventsFetcher(t *testing.T, module, metricset string, cases TestCases) { - for _, test := range cases { - t.Logf("Testing %s file\n", test.MetricsFile) - - file, err := os.Open(test.MetricsFile) - assert.NoError(t, err, "cannot open test file "+test.MetricsFile) - - body, err := ioutil.ReadAll(file) - assert.NoError(t, err, "cannot read test file "+test.MetricsFile) - - server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(200) - w.Header().Set("Content-Type", "text/plain; charset=ISO-8859-1") - w.Write([]byte(body)) - })) - - server.Start() - defer server.Close() - - config := map[string]interface{}{ - "module": module, - "metricsets": []string{metricset}, - "hosts": []string{server.URL}, - } - - f := mbtest.NewEventsFetcher(t, config) - events, err := f.Fetch() - assert.Nil(t, err, "Errors while fetching metrics") - - if *expectedFlag { - sort.SliceStable(events, func(i, j int) bool { - h1, _ := hashstructure.Hash(events[i], nil) - h2, _ := hashstructure.Hash(events[j], nil) - return h1 < h2 - }) - eventsJSON, _ := json.MarshalIndent(events, "", "\t") - err = ioutil.WriteFile(test.ExpectedFile, eventsJSON, 0644) - assert.NoError(t, err) - } - - // Read expected events from reference file - expected, err := ioutil.ReadFile(test.ExpectedFile) - if err != nil { - t.Fatal(err) - } - - var expectedEvents []common.MapStr - err = json.Unmarshal(expected, &expectedEvents) - if err != nil { - t.Fatal(err) - } - - for _, event := range events { - // ensure the event is in expected list - found := -1 - for i, expectedEvent := range expectedEvents { - if event.String() == expectedEvent.String() { - found = i - break - } - } - if found > -1 { - expectedEvents = append(expectedEvents[:found], expectedEvents[found+1:]...) - } else { - t.Errorf("Event was not expected: %+v", event) - } - } - - if len(expectedEvents) > 0 { - t.Error("Some events were missing:") - for _, e := range expectedEvents { - t.Error(e) - } - t.Fatal() - } - } -} - // TestMetricSet goes over the given TestCases and ensures that source Prometheus metrics gets converted into the expected // events when passed by the given metricset. // If -update_expected flag is passed, the expected JSON file will be updated with the result diff --git a/metricbeat/mb/builders.go b/metricbeat/mb/builders.go index 04b32f5638aa..f09b439b90ac 100644 --- a/metricbeat/mb/builders.go +++ b/metricbeat/mb/builders.go @@ -217,14 +217,6 @@ func mustHaveModule(ms MetricSet, base BaseMetricSet) error { // of them. func mustImplementFetcher(ms MetricSet) error { var ifcs []string - if _, ok := ms.(EventFetcher); ok { - ifcs = append(ifcs, "EventFetcher") - } - - if _, ok := ms.(EventsFetcher); ok { - ifcs = append(ifcs, "EventsFetcher") - } - if _, ok := ms.(ReportingMetricSet); ok { ifcs = append(ifcs, "ReportingMetricSet") } diff --git a/metricbeat/mb/mb.go b/metricbeat/mb/mb.go index 24475e0e01a6..9b82be98f992 100644 --- a/metricbeat/mb/mb.go +++ b/metricbeat/mb/mb.go @@ -118,20 +118,6 @@ type Closer interface { Close() error } -// EventFetcher is a MetricSet that returns a single event when collecting data. -// Use ReportingMetricSet for new MetricSet implementations. -type EventFetcher interface { - MetricSet - Fetch() (common.MapStr, error) -} - -// EventsFetcher is a MetricSet that returns a multiple events when collecting -// data. Use ReportingMetricSet for new MetricSet implementations. -type EventsFetcher interface { - MetricSet - Fetch() ([]common.MapStr, error) -} - // Reporter is used by a MetricSet to report events, errors, or errors with // metadata. The methods return false if and only if publishing failed because // the MetricSet is being closed. diff --git a/metricbeat/mb/mb_test.go b/metricbeat/mb/mb_test.go index 3e3b21ea30a1..ba365f8b9233 100644 --- a/metricbeat/mb/mb_test.go +++ b/metricbeat/mb/mb_test.go @@ -37,24 +37,14 @@ func (m testModule) ParseHost(host string) (HostData, error) { return m.hostParser(host) } -// EventFetcher +// FetchReporter type testMetricSet struct { BaseMetricSet } -func (m *testMetricSet) Fetch() (common.MapStr, error) { - return nil, nil -} - -// EventsFetcher - -type testMetricSetEventsFetcher struct { - BaseMetricSet -} - -func (m *testMetricSetEventsFetcher) Fetch() ([]common.MapStr, error) { - return nil, nil +func (m *testMetricSet) Fetch(reporter ReporterV2) error { + return nil } // ReportingFetcher @@ -246,46 +236,10 @@ func TestNewModulesMetricSetTypes(t *testing.T) { r := newTestRegistry(t) factory := func(base BaseMetricSet) (MetricSet, error) { - return &testMetricSet{base}, nil - } - - name := "EventFetcher" - if err := r.AddMetricSet(moduleName, name, factory); err != nil { - t.Fatal(err) - } - - t.Run(name+" MetricSet", func(t *testing.T) { - ms := newTestMetricSet(t, r, map[string]interface{}{ - "module": moduleName, - "metricsets": []string{name}, - }) - _, ok := ms.(EventFetcher) - assert.True(t, ok, name+" not implemented") - }) - - factory = func(base BaseMetricSet) (MetricSet, error) { - return &testMetricSetEventsFetcher{base}, nil - } - - name = "EventsFetcher" - if err := r.AddMetricSet(moduleName, name, factory); err != nil { - t.Fatal(err) - } - - t.Run(name+" MetricSet", func(t *testing.T) { - ms := newTestMetricSet(t, r, map[string]interface{}{ - "module": moduleName, - "metricsets": []string{name}, - }) - _, ok := ms.(EventsFetcher) - assert.True(t, ok, name+" not implemented") - }) - - factory = func(base BaseMetricSet) (MetricSet, error) { return &testMetricSetReportingFetcher{base}, nil } - name = "ReportingFetcher" + name := "ReportingFetcher" if err := r.AddMetricSet(moduleName, name, factory); err != nil { t.Fatal(err) } diff --git a/metricbeat/mb/module/wrapper.go b/metricbeat/mb/module/wrapper.go index e0bd7d12dbda..6115a99fb0b5 100644 --- a/metricbeat/mb/module/wrapper.go +++ b/metricbeat/mb/module/wrapper.go @@ -196,9 +196,6 @@ func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- beat.Event) { ms.Run(reporter.V2()) case mb.PushMetricSetV2WithContext: ms.Run(&channelContext{done}, reporter.V2()) - case mb.EventFetcher, mb.EventsFetcher, - mb.ReportingMetricSet, mb.ReportingMetricSetV2, mb.ReportingMetricSetV2Error, mb.ReportingMetricSetV2WithContext: - msw.startPeriodicFetching(&channelContext{done}, reporter) default: // Earlier startup stages prevent this from happening. logp.Err("MetricSet '%s/%s' does not implement an event producing interface", @@ -209,6 +206,7 @@ func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- beat.Event) { // startPeriodicFetching performs an immediate fetch for the MetricSet then it // begins a continuous timer scheduled loop to fetch data. To stop the loop the // done channel should be closed. +// TODO: remove? func (msw *metricSetWrapper) startPeriodicFetching(ctx context.Context, reporter reporter) { // Indicate that it has been started as periodic fetcher msw.periodic = true @@ -234,10 +232,6 @@ func (msw *metricSetWrapper) startPeriodicFetching(ctx context.Context, reporter // and log a stack track if one occurs. func (msw *metricSetWrapper) fetch(ctx context.Context, reporter reporter) { switch fetcher := msw.MetricSet.(type) { - case mb.EventFetcher: - msw.singleEventFetch(fetcher, reporter) - case mb.EventsFetcher: - msw.multiEventFetch(fetcher, reporter) case mb.ReportingMetricSet: reporter.StartFetchTimer() fetcher.Fetch(reporter.V1()) @@ -263,24 +257,6 @@ func (msw *metricSetWrapper) fetch(ctx context.Context, reporter reporter) { } } -func (msw *metricSetWrapper) singleEventFetch(fetcher mb.EventFetcher, reporter reporter) { - reporter.StartFetchTimer() - event, err := fetcher.Fetch() - reporter.V1().ErrorWith(err, event) -} - -func (msw *metricSetWrapper) multiEventFetch(fetcher mb.EventsFetcher, reporter reporter) { - reporter.StartFetchTimer() - events, err := fetcher.Fetch() - if len(events) == 0 { - reporter.V1().ErrorWith(err, nil) - } else { - for _, event := range events { - reporter.V1().ErrorWith(err, event) - } - } -} - // close closes the underlying MetricSet if it implements the mb.Closer // interface. func (msw *metricSetWrapper) close() error { diff --git a/metricbeat/mb/module/wrapper_test.go b/metricbeat/mb/module/wrapper_test.go index 8901937fadc4..a9c766103ae6 100644 --- a/metricbeat/mb/module/wrapper_test.go +++ b/metricbeat/mb/module/wrapper_test.go @@ -57,9 +57,13 @@ type fakeEventFetcher struct { mb.BaseMetricSet } -func (ms *fakeEventFetcher) Fetch() (common.MapStr, error) { +func (ms *fakeEventFetcher) Fetch(reporter mb.ReporterV2) error { t, _ := time.Parse(time.RFC3339, "2016-05-10T23:27:58.485Z") - return common.MapStr{"@timestamp": common.Time(t), "metric": 1}, nil + reporter.Event(mb.Event{ + Timestamp: t, + RootFields: common.MapStr{"metric": 1}, + }) + return nil } func (ms *fakeEventFetcher) Close() error { @@ -130,39 +134,6 @@ func newConfig(t testing.TB, moduleConfig interface{}) *common.Config { // test cases -func TestWrapperOfEventFetcher(t *testing.T) { - hosts := []string{"alpha", "beta"} - c := newConfig(t, map[string]interface{}{ - "module": moduleName, - "metricsets": []string{eventFetcherName}, - "hosts": hosts, - }) - - m, err := module.NewWrapper(c, newTestRegistry(t)) - if err != nil { - t.Fatal(err) - } - - done := make(chan struct{}) - output := m.Start(done) - - <-output - <-output - close(done) - - // Validate that the channel is closed after receiving the two - // initial events. - select { - case _, ok := <-output: - if !ok { - // Channel is closed. - return - } else { - assert.Fail(t, "received unexpected event") - } - } -} - func TestWrapperOfReportingFetcher(t *testing.T) { hosts := []string{"alpha", "beta"} c := newConfig(t, map[string]interface{}{ diff --git a/metricbeat/mb/testing/data_generator.go b/metricbeat/mb/testing/data_generator.go index 7314bfc79e59..e527bd993363 100644 --- a/metricbeat/mb/testing/data_generator.go +++ b/metricbeat/mb/testing/data_generator.go @@ -37,56 +37,6 @@ var ( dataFlag = flag.Bool("data", false, "Write updated data.json files") ) -// WriteEvent fetches a single event writes the output to a ./_meta/data.json -// file. -func WriteEvent(f mb.EventFetcher, t testing.TB) error { - if !*dataFlag { - t.Skip("skip data generation tests") - } - - event, err := f.Fetch() - if err != nil { - return err - } - - fullEvent := CreateFullEvent(f, event) - WriteEventToDataJSON(t, fullEvent, ".") - return nil -} - -// WriteEvents fetches events and writes the first event to a ./_meta/data.json -// file. -func WriteEvents(f mb.EventsFetcher, t testing.TB) error { - return WriteEventsCond(f, t, nil) - -} - -// WriteEventsCond fetches events and writes the first event that matches the condition -// to a ./_meta/data.json file. -func WriteEventsCond(f mb.EventsFetcher, t testing.TB, cond func(e common.MapStr) bool) error { - if !*dataFlag { - t.Skip("skip data generation tests") - } - - events, err := f.Fetch() - if err != nil { - return err - } - - if len(events) == 0 { - return fmt.Errorf("no events were generated") - } - - event, err := SelectEvent(events, cond) - if err != nil { - return err - } - - fullEvent := CreateFullEvent(f, event) - WriteEventToDataJSON(t, fullEvent, "") - return nil -} - // WriteEventsReporterV2 fetches events and writes the first event to a ./_meta/data.json // file. func WriteEventsReporterV2(f mb.ReportingMetricSetV2, t testing.TB, path string) error { diff --git a/metricbeat/mb/testing/modules.go b/metricbeat/mb/testing/modules.go index 12ff0e9a1add..2620651235dd 100644 --- a/metricbeat/mb/testing/modules.go +++ b/metricbeat/mb/testing/modules.go @@ -109,34 +109,6 @@ func NewMetricSet(t testing.TB, config interface{}) mb.MetricSet { return metricset } -// NewEventFetcher instantiates a new EventFetcher using the given -// configuration. The ModuleFactory and MetricSetFactory are obtained from the -// global Registry. -func NewEventFetcher(t testing.TB, config interface{}) mb.EventFetcher { - metricSet := NewMetricSet(t, config) - - fetcher, ok := metricSet.(mb.EventFetcher) - if !ok { - t.Fatal("MetricSet does not implement EventFetcher") - } - - return fetcher -} - -// NewEventsFetcher instantiates a new EventsFetcher using the given -// configuration. The ModuleFactory and MetricSetFactory are obtained from the -// global Registry. -func NewEventsFetcher(t testing.TB, config interface{}) mb.EventsFetcher { - metricSet := NewMetricSet(t, config) - - fetcher, ok := metricSet.(mb.EventsFetcher) - if !ok { - t.Fatal("MetricSet does not implement EventsFetcher") - } - - return fetcher -} - func NewReportingMetricSet(t testing.TB, config interface{}) mb.ReportingMetricSet { metricSet := NewMetricSet(t, config)