Skip to content

Commit

Permalink
loki: add feature flag to enable dataplane-compatible metric frames
Browse files Browse the repository at this point in the history
  • Loading branch information
gabor committed Apr 13, 2023
1 parent 8e03bad commit c19256b
Show file tree
Hide file tree
Showing 36 changed files with 2,333 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ Alpha features might be changed or removed without prior notice.
| `influxdbBackendMigration` | Query InfluxDB InfluxQL without the proxy |
| `clientTokenRotation` | Replaces the current in-request token rotation so that the client initiates the rotation |
| `prometheusDataplane` | Changes responses to from Prometheus to be compliant with the dataplane specification. In particular it sets the numeric Field.Name from 'Value' to the value of the `__name__` label when present. |
| `lokiMetricDataplane` | Changes responses from Loki to be compliant with the dataplane specification. |
| `alertStateHistoryLokiSecondary` | Enable Grafana to write alert state history to an external Loki instance in addition to Grafana annotations. |
| `alertStateHistoryLokiPrimary` | Enable a remote Loki instance as the primary source for state history reads. |
| `alertStateHistoryLokiOnly` | Disable Grafana alerts from emitting annotations when a remote Loki instance is available. |
Expand Down
1 change: 1 addition & 0 deletions packages/grafana-data/src/types/featureToggles.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ export interface FeatureToggles {
influxdbBackendMigration?: boolean;
clientTokenRotation?: boolean;
prometheusDataplane?: boolean;
lokiMetricDataplane?: boolean;
alertStateHistoryLokiSecondary?: boolean;
alertStateHistoryLokiPrimary?: boolean;
alertStateHistoryLokiOnly?: boolean;
Expand Down
6 changes: 6 additions & 0 deletions pkg/services/featuremgmt/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,12 @@ var (
State: FeatureStateAlpha,
Owner: grafanaObservabilityMetricsSquad,
},
{
Name: "lokiMetricDataplane",
Description: "Changes responses from Loki to be compliant with the dataplane specification.",
State: FeatureStateAlpha,
Owner: grafanaObservabilityLogsSquad,
},
{
Name: "alertStateHistoryLokiSecondary",
Description: "Enable Grafana to write alert state history to an external Loki instance in addition to Grafana annotations.",
Expand Down
1 change: 1 addition & 0 deletions pkg/services/featuremgmt/toggles_gen.csv
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ prometheusResourceBrowserCache,alpha,@grafana/observability-metrics,false,false,
influxdbBackendMigration,alpha,@grafana/observability-metrics,false,false,false,true
clientTokenRotation,alpha,@grafana/grafana-authnz-team,false,false,false,false
prometheusDataplane,alpha,@grafana/observability-metrics,false,false,false,false
lokiMetricDataplane,alpha,@grafana/observability-logs,false,false,false,false
alertStateHistoryLokiSecondary,alpha,@grafana/alerting-squad,false,false,false,false
alertStateHistoryLokiPrimary,alpha,@grafana/alerting-squad,false,false,false,false
alertStateHistoryLokiOnly,alpha,@grafana/alerting-squad,false,false,false,false
Expand Down
4 changes: 4 additions & 0 deletions pkg/services/featuremgmt/toggles_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,10 @@ const (
// Changes responses to from Prometheus to be compliant with the dataplane specification. In particular it sets the numeric Field.Name from 'Value' to the value of the `__name__` label when present.
FlagPrometheusDataplane = "prometheusDataplane"

// FlagLokiMetricDataplane
// Changes responses from Loki to be compliant with the dataplane specification.
FlagLokiMetricDataplane = "lokiMetricDataplane"

// FlagAlertStateHistoryLokiSecondary
// Enable Grafana to write alert state history to an external Loki instance in addition to Grafana annotations.
FlagAlertStateHistoryLokiSecondary = "alertStateHistoryLokiSecondary"
Expand Down
4 changes: 2 additions & 2 deletions pkg/tsdb/loki/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func readLokiError(body io.ReadCloser) error {
return makeLokiError(bytes)
}

func (api *LokiAPI) DataQuery(ctx context.Context, query lokiQuery) (data.Frames, error) {
func (api *LokiAPI) DataQuery(ctx context.Context, query lokiQuery, responseOpts ResponseOpts) (data.Frames, error) {
req, err := makeDataRequest(ctx, api.url, query)
if err != nil {
return nil, err
Expand All @@ -170,7 +170,7 @@ func (api *LokiAPI) DataQuery(ctx context.Context, query lokiQuery) (data.Frames
}

iter := jsoniter.Parse(jsoniter.ConfigDefault, resp.Body, 1024)
res := converter.ReadPrometheusStyleResult(iter, converter.Options{MatrixWideSeries: false, VectorWideSeries: false})
res := converter.ReadPrometheusStyleResult(iter, converter.Options{MatrixWideSeries: false, VectorWideSeries: false, Dataplane: responseOpts.metricDataplane})

if res.Error != nil {
return nil, res.Error
Expand Down
12 changes: 6 additions & 6 deletions pkg/tsdb/loki/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestApiLogVolume(t *testing.T) {
require.Equal(t, "Source=logvolhist", req.Header.Get("X-Query-Tags"))
})

_, err := api.DataQuery(context.Background(), lokiQuery{Expr: "", SupportingQueryType: SupportingQueryLogsVolume, QueryType: QueryTypeRange})
_, err := api.DataQuery(context.Background(), lokiQuery{Expr: "", SupportingQueryType: SupportingQueryLogsVolume, QueryType: QueryTypeRange}, ResponseOpts{})
require.NoError(t, err)
require.True(t, called)
})
Expand All @@ -40,7 +40,7 @@ func TestApiLogVolume(t *testing.T) {
require.Equal(t, "Source=logsample", req.Header.Get("X-Query-Tags"))
})

_, err := api.DataQuery(context.Background(), lokiQuery{Expr: "", SupportingQueryType: SupportingQueryLogsSample, QueryType: QueryTypeRange})
_, err := api.DataQuery(context.Background(), lokiQuery{Expr: "", SupportingQueryType: SupportingQueryLogsSample, QueryType: QueryTypeRange}, ResponseOpts{})
require.NoError(t, err)
require.True(t, called)
})
Expand All @@ -52,7 +52,7 @@ func TestApiLogVolume(t *testing.T) {
require.Equal(t, "Source=datasample", req.Header.Get("X-Query-Tags"))
})

_, err := api.DataQuery(context.Background(), lokiQuery{Expr: "", SupportingQueryType: SupportingQueryDataSample, QueryType: QueryTypeRange})
_, err := api.DataQuery(context.Background(), lokiQuery{Expr: "", SupportingQueryType: SupportingQueryDataSample, QueryType: QueryTypeRange}, ResponseOpts{})
require.NoError(t, err)
require.True(t, called)
})
Expand All @@ -64,7 +64,7 @@ func TestApiLogVolume(t *testing.T) {
require.Equal(t, "", req.Header.Get("X-Query-Tags"))
})

_, err := api.DataQuery(context.Background(), lokiQuery{Expr: "", SupportingQueryType: SupportingQueryNone, QueryType: QueryTypeRange})
_, err := api.DataQuery(context.Background(), lokiQuery{Expr: "", SupportingQueryType: SupportingQueryNone, QueryType: QueryTypeRange}, ResponseOpts{})
require.NoError(t, err)
require.True(t, called)
})
Expand Down Expand Up @@ -133,7 +133,7 @@ func TestApiUrlHandling(t *testing.T) {
QueryType: QueryTypeRange,
}

_, err := api.DataQuery(context.Background(), query)
_, err := api.DataQuery(context.Background(), query, ResponseOpts{})
require.NoError(t, err)
require.True(t, called)
})
Expand All @@ -154,7 +154,7 @@ func TestApiUrlHandling(t *testing.T) {
QueryType: QueryTypeInstant,
}

_, err := api.DataQuery(context.Background(), query)
_, err := api.DataQuery(context.Background(), query, ResponseOpts{})
require.NoError(t, err)
require.True(t, called)
})
Expand Down
10 changes: 6 additions & 4 deletions pkg/tsdb/loki/frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

// we adjust the dataframes to be the way frontend & alerting
// wants them.
func adjustFrame(frame *data.Frame, query *lokiQuery) error {
func adjustFrame(frame *data.Frame, query *lokiQuery, setMetricFrameName bool) error {
fields := frame.Fields

if len(fields) < 2 {
Expand All @@ -25,13 +25,13 @@ func adjustFrame(frame *data.Frame, query *lokiQuery) error {
secondField := fields[1]

if secondField.Type() == data.FieldTypeFloat64 {
return adjustMetricFrame(frame, query)
return adjustMetricFrame(frame, query, setMetricFrameName)
} else {
return adjustLogsFrame(frame, query)
}
}

func adjustMetricFrame(frame *data.Frame, query *lokiQuery) error {
func adjustMetricFrame(frame *data.Frame, query *lokiQuery, setFrameName bool) error {
fields := frame.Fields
// we check if the fields are of correct type
if len(fields) != 2 {
Expand All @@ -50,7 +50,9 @@ func adjustMetricFrame(frame *data.Frame, query *lokiQuery) error {
isMetricRange := query.QueryType == QueryTypeRange

name := formatName(labels, query)
frame.Name = name
if setFrameName {
frame.Name = name
}

if frame.Meta == nil {
frame.Meta = &data.FrameMeta{}
Expand Down
132 changes: 85 additions & 47 deletions pkg/tsdb/loki/frame_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,52 +50,68 @@ func TestAdjustFrame(t *testing.T) {
timeNs3 := strconv.FormatInt(time3.UnixNano(), 10)
timeNs4 := strconv.FormatInt(time4.UnixNano(), 10)

frame := data.NewFrame("",
data.NewField("__labels", nil, []json.RawMessage{
json.RawMessage(`{"level":"info"}`),
json.RawMessage(`{"level":"error"}`),
json.RawMessage(`{"level":"error"}`),
json.RawMessage(`{"level":"info"}`),
}),
data.NewField("Time", nil, []time.Time{
time1, time2, time3, time4,
}),
data.NewField("Line", nil, []string{"line1", "line2", "line2", "line3"}),
data.NewField("TS", nil, []string{
timeNs1, timeNs2, timeNs3, timeNs4,
}),
)
makeFrame := func() *data.Frame {
return data.NewFrame("",
data.NewField("__labels", nil, []json.RawMessage{
json.RawMessage(`{"level":"info"}`),
json.RawMessage(`{"level":"error"}`),
json.RawMessage(`{"level":"error"}`),
json.RawMessage(`{"level":"info"}`),
}),
data.NewField("Time", nil, []time.Time{
time1, time2, time3, time4,
}),
data.NewField("Line", nil, []string{"line1", "line2", "line2", "line3"}),
data.NewField("TS", nil, []string{
timeNs1, timeNs2, timeNs3, timeNs4,
}),
)
}

query := &lokiQuery{
Expr: `{type="important"}`,
QueryType: QueryTypeRange,
RefID: "A",
}

err := adjustFrame(frame, query)
require.NoError(t, err)
verifyFrame := func(frame *data.Frame) {
fields := frame.Fields

require.Equal(t, 5, len(fields))

fields := frame.Fields
idField := fields[4]
require.Equal(t, "id", idField.Name)
require.Equal(t, data.FieldTypeString, idField.Type())
require.Equal(t, 4, idField.Len())
require.Equal(t, "1641092645000000006_a36f4e1b_A", idField.At(0))
require.Equal(t, "1641092705000000006_1d77c9ca_A", idField.At(1))
require.Equal(t, "1641092705000000006_1d77c9ca_1_A", idField.At(2))
require.Equal(t, "1641092765000000006_948c1a7d_A", idField.At(3))
}

frame := makeFrame()

require.Equal(t, 5, len(fields))
err := adjustFrame(frame, query, true)
require.NoError(t, err)
verifyFrame(frame)

idField := fields[4]
require.Equal(t, "id", idField.Name)
require.Equal(t, data.FieldTypeString, idField.Type())
require.Equal(t, 4, idField.Len())
require.Equal(t, "1641092645000000006_a36f4e1b_A", idField.At(0))
require.Equal(t, "1641092705000000006_1d77c9ca_A", idField.At(1))
require.Equal(t, "1641092705000000006_1d77c9ca_1_A", idField.At(2))
require.Equal(t, "1641092765000000006_948c1a7d_A", idField.At(3))
frame = makeFrame() // we need to reset the frame, because adjustFrame mutates it
err = adjustFrame(frame, query, false)
require.NoError(t, err)
verifyFrame(frame)
})

t.Run("naming inside metric fields should be correct", func(t *testing.T) {
field1 := data.NewField("", nil, make([]time.Time, 0))
field2 := data.NewField("", nil, make([]float64, 0))
field2.Labels = data.Labels{"app": "Application", "tag2": "tag2"}

frame := data.NewFrame("test", field1, field2)
frame.SetMeta(&data.FrameMeta{Type: data.FrameTypeTimeSeriesMulti})
makeFrame := func() *data.Frame {
field1 := data.NewField("", nil, make([]time.Time, 0))
field2 := data.NewField("", nil, make([]float64, 0))
field2.Labels = data.Labels{"app": "Application", "tag2": "tag2"}

frame := data.NewFrame("", field1, field2)
frame.SetMeta(&data.FrameMeta{Type: data.FrameTypeTimeSeriesMulti})
return frame
}

query := &lokiQuery{
Expr: "up(ALERTS)",
Expand All @@ -104,13 +120,23 @@ func TestAdjustFrame(t *testing.T) {
Step: time.Second * 42,
}

err := adjustFrame(frame, query)
frame := makeFrame()
err := adjustFrame(frame, query, true)
require.NoError(t, err)

require.Equal(t, frame.Name, "legend Application")
require.Equal(t, frame.Meta.ExecutedQueryString, "Expr: up(ALERTS)\nStep: 42s")
require.Equal(t, frame.Fields[0].Config.Interval, float64(42000))
require.Equal(t, frame.Fields[1].Config.DisplayNameFromDS, "legend Application")

frame = makeFrame()
err = adjustFrame(frame, query, false)
require.NoError(t, err)

require.Equal(t, frame.Name, "")
require.Equal(t, frame.Meta.ExecutedQueryString, "Expr: up(ALERTS)\nStep: 42s")
require.Equal(t, frame.Fields[0].Config.Interval, float64(42000))
require.Equal(t, frame.Fields[1].Config.DisplayNameFromDS, "legend Application")
})

t.Run("should set interval-attribute in response", func(t *testing.T) {
Expand All @@ -119,24 +145,36 @@ func TestAdjustFrame(t *testing.T) {
QueryType: QueryTypeRange,
}

field1 := data.NewField("", nil, make([]time.Time, 0))
field2 := data.NewField("", nil, make([]float64, 0))
makeFrame := func() *data.Frame {
field1 := data.NewField("", nil, make([]time.Time, 0))
field2 := data.NewField("", nil, make([]float64, 0))

frame := data.NewFrame("test", field1, field2)
frame.SetMeta(&data.FrameMeta{Type: data.FrameTypeTimeSeriesMulti})
frame := data.NewFrame("", field1, field2)
frame.SetMeta(&data.FrameMeta{Type: data.FrameTypeTimeSeriesMulti})
return frame
}

err := adjustFrame(frame, query)
require.NoError(t, err)
verifyFrame := func(frame *data.Frame) {
// to keep the test simple, we assume the
// first field is the time-field
timeField := frame.Fields[0]
require.NotNil(t, timeField)
require.Equal(t, data.FieldTypeTime, timeField.Type())

// to keep the test simple, we assume the
// first field is the time-field
timeField := frame.Fields[0]
require.NotNil(t, timeField)
require.Equal(t, data.FieldTypeTime, timeField.Type())
timeFieldConfig := timeField.Config
require.NotNil(t, timeFieldConfig)
require.Equal(t, float64(42000), timeFieldConfig.Interval)
}

frame := makeFrame()

timeFieldConfig := timeField.Config
require.NotNil(t, timeFieldConfig)
require.Equal(t, float64(42000), timeFieldConfig.Interval)
err := adjustFrame(frame, query, true)
require.NoError(t, err)
verifyFrame(frame)

err = adjustFrame(frame, query, false)
require.NoError(t, err)
verifyFrame(frame)
})

t.Run("should parse response stats", func(t *testing.T) {
Expand Down
37 changes: 21 additions & 16 deletions pkg/tsdb/loki/framing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,23 +52,28 @@ func TestSuccessResponse(t *testing.T) {
{name: "parse an empty response", filepath: "empty", query: matrixQuery},
}

runTest := func(folder string, path string, query lokiQuery, responseOpts ResponseOpts) {
responseFileName := filepath.Join(folder, path+".json")
goldenFileName := path + ".golden"

//nolint:gosec
bytes, err := os.ReadFile(responseFileName)
require.NoError(t, err)

frames, err := runQuery(context.Background(), makeMockedAPI(http.StatusOK, "application/json", bytes, nil), &query, responseOpts)
require.NoError(t, err)

dr := &backend.DataResponse{
Frames: frames,
Error: err,
}
experimental.CheckGoldenJSONResponse(t, folder, goldenFileName, dr, true)
}

for _, test := range tt {
t.Run(test.name, func(t *testing.T) {
responseFileName := filepath.Join("testdata", test.filepath+".json")
goldenFileName := test.filepath + ".golden"

//nolint:gosec
bytes, err := os.ReadFile(responseFileName)
require.NoError(t, err)

frames, err := runQuery(context.Background(), makeMockedAPI(http.StatusOK, "application/json", bytes, nil), &test.query)
require.NoError(t, err)

dr := &backend.DataResponse{
Frames: frames,
Error: err,
}
experimental.CheckGoldenJSONResponse(t, "testdata", goldenFileName, dr, true)
runTest("testdata", test.filepath, test.query, ResponseOpts{metricDataplane: false})
runTest("testdata_metric_dataplane", test.filepath, test.query, ResponseOpts{metricDataplane: true})
})
}
}
Expand Down Expand Up @@ -118,7 +123,7 @@ func TestErrorResponse(t *testing.T) {

for _, test := range tt {
t.Run(test.name, func(t *testing.T) {
frames, err := runQuery(context.Background(), makeMockedAPI(400, test.contentType, test.body, nil), &lokiQuery{QueryType: QueryTypeRange, Direction: DirectionBackward})
frames, err := runQuery(context.Background(), makeMockedAPI(400, test.contentType, test.body, nil), &lokiQuery{QueryType: QueryTypeRange, Direction: DirectionBackward}, ResponseOpts{})

require.Len(t, frames, 0)
require.Error(t, err)
Expand Down
Loading

0 comments on commit c19256b

Please sign in to comment.