From 8e69e5e49180d72b09efafbe9c4209e78931d27d Mon Sep 17 00:00:00 2001 From: b5 Date: Wed, 10 Feb 2021 15:46:04 -0500 Subject: [PATCH] feat(run): run package defines state of a transform run run.State is a passable, cachable data structure that represents the state of a run. We build one up as events are emitted & can use them for storage upon completion. --- event/event.go | 4 + transform/run/run.go | 168 ++++++++++++++++++++++++++++++++++++++ transform/run/run_test.go | 116 ++++++++++++++++++++++++++ 3 files changed, 288 insertions(+) create mode 100644 transform/run/run.go create mode 100644 transform/run/run_test.go diff --git a/event/event.go b/event/event.go index ae60270c9..c8c454b16 100644 --- a/event/event.go +++ b/event/event.go @@ -164,6 +164,10 @@ func (b *bus) publish(ctx context.Context, typ Type, sessionID string, payload i Payload: payload, } + if b.closed { + return e, ErrBusClosed + } + // TODO(dustmop): Add instrumentation, perhaps to ctx, to make logging / tracing // a single event easier to do. diff --git a/transform/run/run.go b/transform/run/run.go new file mode 100644 index 000000000..afdfffcc4 --- /dev/null +++ b/transform/run/run.go @@ -0,0 +1,168 @@ +// Package run defines metadata about transform script execution +package run + +import ( + "fmt" + "time" + + "github.com/google/uuid" + "github.com/qri-io/qri/event" +) + +// NewID creates a run identifier +func NewID() string { + return uuid.New().String() +} + +// Status enumerates all possible execution states of a transform script or +// step within a script, in relation to the current time. +// Scripts & steps that have completed are broken into categories based on exit +// state +type Status string + +const ( + // RSWaiting indicates a script/step that has yet to start + RSWaiting = Status("waiting") + // RSRunning indicates a script/step is currently executing + RSRunning = Status("running") + // RSSucceeded indicates a script/step has completed without error + RSSucceeded = Status("succeeded") + // RSFailed indicates a script/step completed & exited when an unexpected error + // occured + RSFailed = Status("failed") + // RSUnchanged indicates a script completed but no changes were found + // since the last version of the script succeeded + RSUnchanged = Status("unchanged") + // RSSkipped indicates a script/step was not executed + RSSkipped = Status("skipped") +) + +// State is a passable, cachable data structure that describes the execution of +// a transform. State structs can act as a sink of transform events, collapsing +// the state transition of multiple transform events into a single structure +type State struct { + ID string `json:"id"` + Number int `json:"number"` + Status Status `json:"status"` + Message string `json:"message"` + StartTime *time.Time `json:"startTime"` + StopTime *time.Time `json:"stopTime"` + Duration int `json:"duration"` + Steps []*StepState `json:"steps"` +} + +// NewState is a simple constructor to remind package consumers that state +// structs must be initialized with an identifier to act as a sink of transform +// events +func NewState(id string) *State { + return &State{ + ID: id, + } +} + +// AddTransformEvent alters state based on a given event +func (rs *State) AddTransformEvent(e event.Event) error { + if rs.ID != e.SessionID { + // silently ignore session ID mismatch + return nil + } + + switch e.Type { + case event.ETTransformStart: + rs.Status = RSRunning + rs.StartTime = toTimePointer(e.Timestamp) + return nil + case event.ETTransformStop: + rs.StopTime = toTimePointer(e.Timestamp) + if tl, ok := e.Payload.(event.TransformLifecycle); ok { + rs.Status = Status(tl.Status) + } + if rs.StartTime != nil && rs.StopTime != nil { + rs.Duration = int(rs.StopTime.Sub(*rs.StartTime)) + } + return nil + case event.ETTransformStepStart: + s, err := NewStepStateFromEvent(e) + if err != nil { + return err + } + s.Status = RSRunning + s.StartTime = toTimePointer(e.Timestamp) + rs.Steps = append(rs.Steps, s) + return nil + case event.ETTransformStepStop: + step, err := rs.lastStep() + if err != nil { + return err + } + step.StopTime = toTimePointer(e.Timestamp) + if tsl, ok := e.Payload.(event.TransformStepLifecycle); ok { + step.Status = Status(tsl.Status) + } else { + step.Status = RSFailed + } + if step.StartTime != nil && step.StopTime != nil { + step.Duration = int(step.StopTime.Sub(*step.StartTime)) + } + return nil + case event.ETTransformStepSkip: + s, err := NewStepStateFromEvent(e) + if err != nil { + return err + } + s.Status = RSSkipped + rs.Steps = append(rs.Steps, s) + return nil + case event.ETTransformPrint, + event.ETTransformError, + event.ETTransformDatasetPreview: + return rs.appendStepOutputLog(e) + } + return fmt.Errorf("unexpected event type: %q", e.Type) +} + +func (rs *State) lastStep() (*StepState, error) { + if len(rs.Steps) > 0 { + return rs.Steps[len(rs.Steps)-1], nil + } + return nil, fmt.Errorf("expected step to exist") +} + +func (rs *State) appendStepOutputLog(e event.Event) error { + step, err := rs.lastStep() + if err != nil { + return err + } + + step.Output = append(step.Output, e) + return nil +} + +// StepState describes the execution of a transform step +type StepState struct { + Name string `json:"name"` + Category string `json:"category"` + Status Status `json:"status"` + StartTime *time.Time `json:"startTime"` + StopTime *time.Time `json:"stopTime"` + Duration int `json:"duration"` + Output []event.Event `json:"output"` +} + +// NewStepStateFromEvent constructs StepState from an event +func NewStepStateFromEvent(e event.Event) (*StepState, error) { + if tsl, ok := e.Payload.(event.TransformStepLifecycle); ok { + return &StepState{ + Name: tsl.Name, + Category: tsl.Category, + Status: Status(tsl.Status), + }, nil + } + return nil, fmt.Errorf("run step event data must be a transform step lifecycle struct") +} + +func toTimePointer(unixnano int64) *time.Time { + // TODO (b5) - we're dropping nanosecond precision here :/ + t := time.Unix(unixnano, 0) + return &t +} diff --git a/transform/run/run_test.go b/transform/run/run_test.go new file mode 100644 index 000000000..999515d0c --- /dev/null +++ b/transform/run/run_test.go @@ -0,0 +1,116 @@ +package run + +import ( + "fmt" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/qri-io/qri/event" +) + +func TestStateAddTransformEvent(t *testing.T) { + runID := NewID() + states := []struct { + e event.Event + r *State + }{ + { + event.Event{Type: event.ETTransformStart, Timestamp: 1609460600090, SessionID: runID, Payload: event.TransformLifecycle{StepCount: 4, Status: "running"}}, + &State{ID: runID, StartTime: toTimePointer(1609460600090), Status: RSRunning}, + }, + { + event.Event{Type: event.ETTransformStepStart, Timestamp: 1609460700090, SessionID: runID, Payload: event.TransformStepLifecycle{Name: "setup"}}, + &State{ID: runID, StartTime: toTimePointer(1609460600090), Status: RSRunning, Steps: []*StepState{ + {Name: "setup", StartTime: toTimePointer(1609460700090), Status: RSRunning}, + }}, + }, + // { + // event.Event{ Type: event.ETVersionPulled, Timestamp: 1609460800090, SessionID: runID, Payload: {"refstring": "rico/presidents@QmFoo", "remote": "https://registy.qri.cloud" }}, + // &State{}, + // }, + { + event.Event{Type: event.ETTransformStepStop, Timestamp: 1609460900090, SessionID: runID, Payload: event.TransformStepLifecycle{Name: "setup", Status: "succeeded"}}, + &State{ID: runID, StartTime: toTimePointer(1609460600090), Status: RSRunning, Steps: []*StepState{ + {Name: "setup", StartTime: toTimePointer(1609460700090), StopTime: toTimePointer(1609460900090), Duration: 200000000000000, Status: RSSucceeded}, + }}, + }, + { + event.Event{Type: event.ETTransformStepStart, Timestamp: 1609461000090, SessionID: runID, Payload: event.TransformStepLifecycle{Name: "download"}}, + &State{ID: runID, StartTime: toTimePointer(1609460600090), Status: RSRunning, Steps: []*StepState{ + {Name: "setup", StartTime: toTimePointer(1609460700090), StopTime: toTimePointer(1609460900090), Duration: 200000000000000, Status: RSSucceeded}, + {Name: "download", StartTime: toTimePointer(1609461000090), Status: RSRunning}, + }}, + }, + { + event.Event{Type: event.ETTransformPrint, Timestamp: 1609461100090, SessionID: runID, Payload: event.TransformMessage{Msg: "oh hai there"}}, + &State{ID: runID, StartTime: toTimePointer(1609460600090), Status: RSRunning, Steps: []*StepState{ + {Name: "setup", StartTime: toTimePointer(1609460700090), StopTime: toTimePointer(1609460900090), Duration: 200000000000000, Status: RSSucceeded}, + {Name: "download", StartTime: toTimePointer(1609461000090), Status: RSRunning, Output: []event.Event{ + {Type: event.ETTransformPrint, Timestamp: 1609461100090, SessionID: runID, Payload: event.TransformMessage{Msg: "oh hai there"}}, + }}, + }}, + }, + // { + // event.Event{ Type: event.ETHttpRequestStart, Timestamp: 1609461200090, SessionID: runID, Payload: {"id": runID, "downloadSize": 230409, "method": "Gevent.ET", "url": "https://registy.qri.cloud" }}, + // &State{}, + // { + // { + // event.Event{ Type: event.ETHttpRequestStop, Timestamp: 1609461300090, SessionID: runID, Payload: {"size": 230409, "method": "Gevent.ET", "url": "https://registy.qri.cloud" }}, + // &State{}, + // }, + { + event.Event{Type: event.ETTransformStepStop, Timestamp: 1609461400090, SessionID: runID, Payload: event.TransformStepLifecycle{Name: "download", Status: "succeeded"}}, + &State{ID: runID, StartTime: toTimePointer(1609460600090), Status: RSRunning, Steps: []*StepState{ + {Name: "setup", StartTime: toTimePointer(1609460700090), StopTime: toTimePointer(1609460900090), Duration: 200000000000000, Status: RSSucceeded}, + {Name: "download", StartTime: toTimePointer(1609461000090), StopTime: toTimePointer(1609461400090), Duration: 400000000000000, Status: RSSucceeded, Output: []event.Event{ + {Type: event.ETTransformPrint, Timestamp: 1609461100090, SessionID: runID, Payload: event.TransformMessage{Msg: "oh hai there"}}, + }}, + }}, + }, + { + event.Event{Type: event.ETTransformStepStart, Timestamp: 1609461500090, SessionID: runID, Payload: event.TransformStepLifecycle{Name: "transform"}}, + &State{ID: runID, StartTime: toTimePointer(1609460600090), Status: RSRunning, Steps: []*StepState{ + {Name: "setup", StartTime: toTimePointer(1609460700090), StopTime: toTimePointer(1609460900090), Duration: 200000000000000, Status: RSSucceeded}, + {Name: "download", StartTime: toTimePointer(1609461000090), StopTime: toTimePointer(1609461400090), Duration: 400000000000000, Status: RSSucceeded, Output: []event.Event{ + {Type: event.ETTransformPrint, Timestamp: 1609461100090, SessionID: runID, Payload: event.TransformMessage{Msg: "oh hai there"}}, + }}, + {Name: "transform", StartTime: toTimePointer(1609461500090), Status: RSRunning}, + }}, + }, + { + event.Event{Type: event.ETTransformStepStop, Timestamp: 1609461600090, SessionID: runID, Payload: event.TransformStepLifecycle{Name: "transform", Status: "succeeded"}}, + &State{ID: runID, StartTime: toTimePointer(1609460600090), Status: RSRunning, Steps: []*StepState{ + {Name: "setup", StartTime: toTimePointer(1609460700090), StopTime: toTimePointer(1609460900090), Duration: 200000000000000, Status: RSSucceeded}, + {Name: "download", StartTime: toTimePointer(1609461000090), StopTime: toTimePointer(1609461400090), Duration: 400000000000000, Status: RSSucceeded, Output: []event.Event{ + {Type: event.ETTransformPrint, Timestamp: 1609461100090, SessionID: runID, Payload: event.TransformMessage{Msg: "oh hai there"}}, + }}, + {Name: "transform", StartTime: toTimePointer(1609461500090), StopTime: toTimePointer(1609461600090), Duration: 100000000000000, Status: RSSucceeded}, + }}, + }, + { + event.Event{Type: event.ETTransformStop, Timestamp: 1609461900090, SessionID: runID, Payload: event.TransformLifecycle{Status: "failed"}}, + &State{ID: runID, StartTime: toTimePointer(1609460600090), StopTime: toTimePointer(1609461900090), Duration: 1300000000000000, Status: RSFailed, Steps: []*StepState{ + {Name: "setup", StartTime: toTimePointer(1609460700090), StopTime: toTimePointer(1609460900090), Duration: 200000000000000, Status: RSSucceeded}, + {Name: "download", StartTime: toTimePointer(1609461000090), StopTime: toTimePointer(1609461400090), Duration: 400000000000000, Status: RSSucceeded, Output: []event.Event{ + {Type: event.ETTransformPrint, Timestamp: 1609461100090, SessionID: runID, Payload: event.TransformMessage{Msg: "oh hai there"}}, + }}, + {Name: "transform", StartTime: toTimePointer(1609461500090), StopTime: toTimePointer(1609461600090), Duration: 100000000000000, Status: RSSucceeded}, + }}, + }, + } + + for i, s := range states { + t.Run(fmt.Sprintf("after_event_%d", i), func(t *testing.T) { + got := NewState(runID) + for j := 0; j <= i; j++ { + if err := got.AddTransformEvent(states[j].e); err != nil { + t.Fatal(err) + } + } + + if diff := cmp.Diff(s.r, got); diff != "" { + t.Errorf("result mismatch. (-want +got):\n%s", diff) + } + }) + } +}