Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Metricbeat] Remove EventFetcher and EventsFetcher interface #11762

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
- The custom beat generator now uses mage instead of python, `mage GenerateCustomBeat` can be used to create a new beat, and `mage vendorUpdate` to update the vendored libbeat in a custom beat. {pull}13610[13610]
- Altered all remaining uses of mapval to use the renamed and enhanced version: https://github.com/elastic/go-lookslike[go-lookslike] instead, which is a separate project. The mapval tree is now gone. {pull}14165[14165]
- Move light modules to OSS. {pull}14369[14369]
- Removing Metricbeat EventFetcher and EventsFetcher interface. Use the reporter interface instead. {pull}[]

==== Bugfixes

Expand Down
82 changes: 0 additions & 82 deletions metricbeat/helper/prometheus/ptest/ptest.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (

"github.com/mitchellh/hashstructure"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/metricbeat/mb"
mbtest "github.com/elastic/beats/metricbeat/mb/testing"

Expand All @@ -47,87 +46,6 @@ type TestCases []struct {
ExpectedFile string
}

// TestMetricSetEventsFetcher goes over the given TestCases and ensures that source Prometheus metrics gets converted
// into the expected events when passed by the given metricset.
// If -update_expected flag is passed, the expected JSON file will be updated with the result
func TestMetricSetEventsFetcher(t *testing.T, module, metricset string, cases TestCases) {
for _, test := range cases {
t.Logf("Testing %s file\n", test.MetricsFile)

file, err := os.Open(test.MetricsFile)
assert.NoError(t, err, "cannot open test file "+test.MetricsFile)

body, err := ioutil.ReadAll(file)
assert.NoError(t, err, "cannot read test file "+test.MetricsFile)

server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
w.Header().Set("Content-Type", "text/plain; charset=ISO-8859-1")
w.Write([]byte(body))
}))

server.Start()
defer server.Close()

config := map[string]interface{}{
"module": module,
"metricsets": []string{metricset},
"hosts": []string{server.URL},
}

f := mbtest.NewEventsFetcher(t, config)
events, err := f.Fetch()
assert.Nil(t, err, "Errors while fetching metrics")

if *expectedFlag {
sort.SliceStable(events, func(i, j int) bool {
h1, _ := hashstructure.Hash(events[i], nil)
h2, _ := hashstructure.Hash(events[j], nil)
return h1 < h2
})
eventsJSON, _ := json.MarshalIndent(events, "", "\t")
err = ioutil.WriteFile(test.ExpectedFile, eventsJSON, 0644)
assert.NoError(t, err)
}

// Read expected events from reference file
expected, err := ioutil.ReadFile(test.ExpectedFile)
if err != nil {
t.Fatal(err)
}

var expectedEvents []common.MapStr
err = json.Unmarshal(expected, &expectedEvents)
if err != nil {
t.Fatal(err)
}

for _, event := range events {
// ensure the event is in expected list
found := -1
for i, expectedEvent := range expectedEvents {
if event.String() == expectedEvent.String() {
found = i
break
}
}
if found > -1 {
expectedEvents = append(expectedEvents[:found], expectedEvents[found+1:]...)
} else {
t.Errorf("Event was not expected: %+v", event)
}
}

if len(expectedEvents) > 0 {
t.Error("Some events were missing:")
for _, e := range expectedEvents {
t.Error(e)
}
t.Fatal()
}
}
}

// TestMetricSet goes over the given TestCases and ensures that source Prometheus metrics gets converted into the expected
// events when passed by the given metricset.
// If -update_expected flag is passed, the expected JSON file will be updated with the result
Expand Down
8 changes: 0 additions & 8 deletions metricbeat/mb/builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,14 +217,6 @@ func mustHaveModule(ms MetricSet, base BaseMetricSet) error {
// of them.
func mustImplementFetcher(ms MetricSet) error {
var ifcs []string
if _, ok := ms.(EventFetcher); ok {
ifcs = append(ifcs, "EventFetcher")
}

if _, ok := ms.(EventsFetcher); ok {
ifcs = append(ifcs, "EventsFetcher")
}

if _, ok := ms.(ReportingMetricSet); ok {
ifcs = append(ifcs, "ReportingMetricSet")
}
Expand Down
14 changes: 0 additions & 14 deletions metricbeat/mb/mb.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,20 +118,6 @@ type Closer interface {
Close() error
}

// EventFetcher is a MetricSet that returns a single event when collecting data.
// Use ReportingMetricSet for new MetricSet implementations.
type EventFetcher interface {
MetricSet
Fetch() (common.MapStr, error)
}

// EventsFetcher is a MetricSet that returns a multiple events when collecting
// data. Use ReportingMetricSet for new MetricSet implementations.
type EventsFetcher interface {
MetricSet
Fetch() ([]common.MapStr, error)
}

// Reporter is used by a MetricSet to report events, errors, or errors with
// metadata. The methods return false if and only if publishing failed because
// the MetricSet is being closed.
Expand Down
54 changes: 4 additions & 50 deletions metricbeat/mb/mb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,14 @@ func (m testModule) ParseHost(host string) (HostData, error) {
return m.hostParser(host)
}

// EventFetcher
// FetchReporter

type testMetricSet struct {
BaseMetricSet
}

func (m *testMetricSet) Fetch() (common.MapStr, error) {
return nil, nil
}

// EventsFetcher

type testMetricSetEventsFetcher struct {
BaseMetricSet
}

func (m *testMetricSetEventsFetcher) Fetch() ([]common.MapStr, error) {
return nil, nil
func (m *testMetricSet) Fetch(reporter ReporterV2) error {
return nil
}

// ReportingFetcher
Expand Down Expand Up @@ -246,46 +236,10 @@ func TestNewModulesMetricSetTypes(t *testing.T) {
r := newTestRegistry(t)

factory := func(base BaseMetricSet) (MetricSet, error) {
return &testMetricSet{base}, nil
}

name := "EventFetcher"
if err := r.AddMetricSet(moduleName, name, factory); err != nil {
t.Fatal(err)
}

t.Run(name+" MetricSet", func(t *testing.T) {
ms := newTestMetricSet(t, r, map[string]interface{}{
"module": moduleName,
"metricsets": []string{name},
})
_, ok := ms.(EventFetcher)
assert.True(t, ok, name+" not implemented")
})

factory = func(base BaseMetricSet) (MetricSet, error) {
return &testMetricSetEventsFetcher{base}, nil
}

name = "EventsFetcher"
if err := r.AddMetricSet(moduleName, name, factory); err != nil {
t.Fatal(err)
}

t.Run(name+" MetricSet", func(t *testing.T) {
ms := newTestMetricSet(t, r, map[string]interface{}{
"module": moduleName,
"metricsets": []string{name},
})
_, ok := ms.(EventsFetcher)
assert.True(t, ok, name+" not implemented")
})

factory = func(base BaseMetricSet) (MetricSet, error) {
return &testMetricSetReportingFetcher{base}, nil
}

name = "ReportingFetcher"
name := "ReportingFetcher"
if err := r.AddMetricSet(moduleName, name, factory); err != nil {
t.Fatal(err)
}
Expand Down
26 changes: 1 addition & 25 deletions metricbeat/mb/module/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,6 @@ func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- beat.Event) {
ms.Run(reporter.V2())
case mb.PushMetricSetV2WithContext:
ms.Run(&channelContext{done}, reporter.V2())
case mb.EventFetcher, mb.EventsFetcher,
mb.ReportingMetricSet, mb.ReportingMetricSetV2, mb.ReportingMetricSetV2Error, mb.ReportingMetricSetV2WithContext:
msw.startPeriodicFetching(&channelContext{done}, reporter)
default:
// Earlier startup stages prevent this from happening.
logp.Err("MetricSet '%s/%s' does not implement an event producing interface",
Expand All @@ -209,6 +206,7 @@ func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- beat.Event) {
// startPeriodicFetching performs an immediate fetch for the MetricSet then it
// begins a continuous timer scheduled loop to fetch data. To stop the loop the
// done channel should be closed.
// TODO: remove?
func (msw *metricSetWrapper) startPeriodicFetching(ctx context.Context, reporter reporter) {
// Indicate that it has been started as periodic fetcher
msw.periodic = true
Expand All @@ -234,10 +232,6 @@ func (msw *metricSetWrapper) startPeriodicFetching(ctx context.Context, reporter
// and log a stack track if one occurs.
func (msw *metricSetWrapper) fetch(ctx context.Context, reporter reporter) {
switch fetcher := msw.MetricSet.(type) {
case mb.EventFetcher:
msw.singleEventFetch(fetcher, reporter)
case mb.EventsFetcher:
msw.multiEventFetch(fetcher, reporter)
case mb.ReportingMetricSet:
reporter.StartFetchTimer()
fetcher.Fetch(reporter.V1())
Expand All @@ -263,24 +257,6 @@ func (msw *metricSetWrapper) fetch(ctx context.Context, reporter reporter) {
}
}

func (msw *metricSetWrapper) singleEventFetch(fetcher mb.EventFetcher, reporter reporter) {
reporter.StartFetchTimer()
event, err := fetcher.Fetch()
reporter.V1().ErrorWith(err, event)
}

func (msw *metricSetWrapper) multiEventFetch(fetcher mb.EventsFetcher, reporter reporter) {
reporter.StartFetchTimer()
events, err := fetcher.Fetch()
if len(events) == 0 {
reporter.V1().ErrorWith(err, nil)
} else {
for _, event := range events {
reporter.V1().ErrorWith(err, event)
}
}
}

// close closes the underlying MetricSet if it implements the mb.Closer
// interface.
func (msw *metricSetWrapper) close() error {
Expand Down
41 changes: 6 additions & 35 deletions metricbeat/mb/module/wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,13 @@ type fakeEventFetcher struct {
mb.BaseMetricSet
}

func (ms *fakeEventFetcher) Fetch() (common.MapStr, error) {
func (ms *fakeEventFetcher) Fetch(reporter mb.ReporterV2) error {
t, _ := time.Parse(time.RFC3339, "2016-05-10T23:27:58.485Z")
return common.MapStr{"@timestamp": common.Time(t), "metric": 1}, nil
reporter.Event(mb.Event{
Timestamp: t,
RootFields: common.MapStr{"metric": 1},
})
return nil
}

func (ms *fakeEventFetcher) Close() error {
Expand Down Expand Up @@ -130,39 +134,6 @@ func newConfig(t testing.TB, moduleConfig interface{}) *common.Config {

// test cases

func TestWrapperOfEventFetcher(t *testing.T) {
hosts := []string{"alpha", "beta"}
c := newConfig(t, map[string]interface{}{
"module": moduleName,
"metricsets": []string{eventFetcherName},
"hosts": hosts,
})

m, err := module.NewWrapper(c, newTestRegistry(t))
if err != nil {
t.Fatal(err)
}

done := make(chan struct{})
output := m.Start(done)

<-output
<-output
close(done)

// Validate that the channel is closed after receiving the two
// initial events.
select {
case _, ok := <-output:
if !ok {
// Channel is closed.
return
} else {
assert.Fail(t, "received unexpected event")
}
}
}

func TestWrapperOfReportingFetcher(t *testing.T) {
hosts := []string{"alpha", "beta"}
c := newConfig(t, map[string]interface{}{
Expand Down
50 changes: 0 additions & 50 deletions metricbeat/mb/testing/data_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,56 +37,6 @@ var (
dataFlag = flag.Bool("data", false, "Write updated data.json files")
)

// WriteEvent fetches a single event writes the output to a ./_meta/data.json
// file.
func WriteEvent(f mb.EventFetcher, t testing.TB) error {
if !*dataFlag {
t.Skip("skip data generation tests")
}

event, err := f.Fetch()
if err != nil {
return err
}

fullEvent := CreateFullEvent(f, event)
WriteEventToDataJSON(t, fullEvent, ".")
return nil
}

// WriteEvents fetches events and writes the first event to a ./_meta/data.json
// file.
func WriteEvents(f mb.EventsFetcher, t testing.TB) error {
return WriteEventsCond(f, t, nil)

}

// WriteEventsCond fetches events and writes the first event that matches the condition
// to a ./_meta/data.json file.
func WriteEventsCond(f mb.EventsFetcher, t testing.TB, cond func(e common.MapStr) bool) error {
if !*dataFlag {
t.Skip("skip data generation tests")
}

events, err := f.Fetch()
if err != nil {
return err
}

if len(events) == 0 {
return fmt.Errorf("no events were generated")
}

event, err := SelectEvent(events, cond)
if err != nil {
return err
}

fullEvent := CreateFullEvent(f, event)
WriteEventToDataJSON(t, fullEvent, "")
return nil
}

// WriteEventsReporterV2 fetches events and writes the first event to a ./_meta/data.json
// file.
func WriteEventsReporterV2(f mb.ReportingMetricSetV2, t testing.TB, path string) error {
Expand Down
Loading