Skip to content

Commit

Permalink
feat(server): add traceDB endpoints to event log (#2516)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgeepc authored May 19, 2023
1 parent efb642f commit 607e502
Show file tree
Hide file tree
Showing 11 changed files with 40 additions and 3 deletions.
4 changes: 3 additions & 1 deletion server/executor/poller_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,9 @@ func (pe DefaultPollerExecutor) ExecuteRequest(request *PollingRequest) (bool, s
}
}

err = pe.eventEmitter.Emit(request.ctx, events.TracePollingStart(request.test.ID, request.run.ID))
endpoints := traceDB.GetEndpoints()
ds, err := pe.dsRepo.Current(request.ctx)
err = pe.eventEmitter.Emit(request.ctx, events.TracePollingStart(request.test.ID, request.run.ID, string(ds.Type), endpoints))
if err != nil {
log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TracePollingStart event: error: %s\n", request.test.ID, request.run.ID, err.Error())
}
Expand Down
1 change: 1 addition & 0 deletions server/executor/poller_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,7 @@ func (db *traceDBMock) GetTraceID() trace.TraceID {
func (db *traceDBMock) Connect(ctx context.Context) error { return nil }
func (db *traceDBMock) Close() error { return nil }
func (db *traceDBMock) Ready() bool { return true }
func (db *traceDBMock) GetEndpoints() string { return "" }
func (db *traceDBMock) TestConnection(ctx context.Context) model.ConnectionResult {
return model.ConnectionResult{}
}
Expand Down
8 changes: 6 additions & 2 deletions server/model/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,14 +173,18 @@ func TraceDataStoreConnectionInfo(testID id.ID, runID int, connectionResult mode
}
}

func TracePollingStart(testID id.ID, runID int) model.TestRunEvent {
func TracePollingStart(testID id.ID, runID int, dsType, endpoints string) model.TestRunEvent {
endpointsDescription := ""
if endpoints != "" {
endpointsDescription = fmt.Sprintf(" with the following endpoints: %s", endpoints)
}
return model.TestRunEvent{
TestID: testID,
RunID: runID,
Stage: model.StageTrace,
Type: "POLLING_START",
Title: "Trace polling started",
Description: "The trace polling process has started",
Description: fmt.Sprintf("The trace polling process has started using %s %s", dsType, endpointsDescription),
CreatedAt: time.Now(),
DataStoreConnection: model.ConnectionResult{},
Polling: model.PollingInfo{
Expand Down
4 changes: 4 additions & 0 deletions server/tracedb/awsxray.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ func (db *awsxrayDB) Close() error {
return nil
}

func (db *awsxrayDB) GetEndpoints() string {
return fmt.Sprintf("xray.%s.amazonaws.com:443", db.region)
}

func (db *awsxrayDB) TestConnection(ctx context.Context) model.ConnectionResult {
url := fmt.Sprintf("xray.%s.amazonaws.com:443", db.region)
tester := connection.NewTester(
Expand Down
4 changes: 4 additions & 0 deletions server/tracedb/elasticsearchdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ func (db *elasticsearchDB) Close() error {
return nil
}

func (db *elasticsearchDB) GetEndpoints() string {
return strings.Join(db.config.Addresses, ", ")
}

func (db *elasticsearchDB) TestConnection(ctx context.Context) model.ConnectionResult {
tester := connection.NewTester(
connection.WithPortLintingTest(connection.PortLinter("ElasticSearch", elasticSearchDefaultPorts(), db.config.Addresses...)),
Expand Down
4 changes: 4 additions & 0 deletions server/tracedb/jaegerdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ func (jtd *jaegerTraceDB) Connect(ctx context.Context) error {
return jtd.dataSource.Connect(ctx)
}

func (jtd *jaegerTraceDB) GetEndpoints() string {
return jtd.dataSource.Endpoint()
}

func (jtd *jaegerTraceDB) TestConnection(ctx context.Context) model.ConnectionResult {
tester := connection.NewTester(
connection.WithPortLintingTest(connection.PortLinter("Jaeger", jaegerDefaultPorts(), jtd.dataSource.Endpoint())),
Expand Down
4 changes: 4 additions & 0 deletions server/tracedb/opensearchdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ func (db *opensearchDB) Close() error {
return nil
}

func (db *opensearchDB) GetEndpoints() string {
return strings.Join(db.config.Addresses, ", ")
}

func (db *opensearchDB) TestConnection(ctx context.Context) model.ConnectionResult {
tester := connection.NewTester(
connection.WithPortLintingTest(connection.PortLinter("OpenSearch", opensearchDefaultPorts(), db.config.Addresses...)),
Expand Down
4 changes: 4 additions & 0 deletions server/tracedb/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ func (tdb *OTLPTraceDB) Close() error {
return nil
}

func (tdb *OTLPTraceDB) GetEndpoints() string {
return ""
}

// GetTraceByID implements TraceDB
func (tdb *OTLPTraceDB) GetTraceByID(ctx context.Context, id string) (model.Trace, error) {
run, err := tdb.db.GetRunByTraceID(ctx, traces.DecodeTraceID(id))
Expand Down
4 changes: 4 additions & 0 deletions server/tracedb/signalfxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ func (db *signalfxDB) Close() error {
return nil
}

func (db *signalfxDB) GetEndpoints() string {
return fmt.Sprintf("%s:%s", db.getURL(), "443")
}

func (db *signalfxDB) TestConnection(ctx context.Context) model.ConnectionResult {
url := fmt.Sprintf("%s:%s", db.getURL(), "443")
tester := connection.NewTester(
Expand Down
4 changes: 4 additions & 0 deletions server/tracedb/tempodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ func (tdb *tempoTraceDB) Connect(ctx context.Context) error {
return tdb.dataSource.Connect(ctx)
}

func (ttd *tempoTraceDB) GetEndpoints() string {
return ttd.dataSource.Endpoint()
}

func (ttd *tempoTraceDB) TestConnection(ctx context.Context) model.ConnectionResult {
tester := connection.NewTester(
connection.WithPortLintingTest(connection.PortLinter("Tempo", tempoDefaultPorts(), ttd.dataSource.Endpoint())),
Expand Down
2 changes: 2 additions & 0 deletions server/tracedb/tracedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type TraceDB interface {
GetTraceID() trace.TraceID
GetTraceByID(ctx context.Context, traceID string) (model.Trace, error)
Close() error
GetEndpoints() string
}

type TestableTraceDB interface {
Expand All @@ -39,6 +40,7 @@ func (db *noopTraceDB) Connect(ctx context.Context) error { return nil }
func (db *noopTraceDB) Close() error { return nil }
func (db *noopTraceDB) ShouldRetry() bool { return false }
func (db *noopTraceDB) Ready() bool { return true }
func (db *noopTraceDB) GetEndpoints() string { return "" }
func (db *noopTraceDB) TestConnection(ctx context.Context) model.ConnectionResult {
return model.ConnectionResult{}
}
Expand Down

0 comments on commit 607e502

Please sign in to comment.