Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#29772][prism] Handle EventTime Timers. #29900

Merged
merged 6 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions sdks/go/pkg/beam/core/timers/timers.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type TimerMap struct {

type timerConfig struct {
Tag string
HoldSet bool
HoldTimestamp mtime.Time
}

Expand All @@ -68,6 +69,7 @@ func WithTag(tag string) timerOptions {
// WithOutputTimestamp sets the output timestamp for the timer.
func WithOutputTimestamp(outputTimestamp time.Time) timerOptions {
return func(tm *timerConfig) {
tm.HoldSet = true
tm.HoldTimestamp = mtime.FromTime(outputTimestamp)
}
}
Expand Down Expand Up @@ -108,7 +110,7 @@ func (et EventTime) Set(p Provider, FiringTimestamp time.Time, opts ...timerOpti
opt(&tc)
}
tm := TimerMap{Family: et.Family, Tag: tc.Tag, FireTimestamp: mtime.FromTime(FiringTimestamp), HoldTimestamp: mtime.FromTime(FiringTimestamp)}
if !tc.HoldTimestamp.ToTime().IsZero() {
if tc.HoldSet {
tm.HoldTimestamp = tc.HoldTimestamp
}
p.Set(tm)
Expand Down Expand Up @@ -142,7 +144,7 @@ func (pt ProcessingTime) Set(p Provider, FiringTimestamp time.Time, opts ...time
opt(&tc)
}
tm := TimerMap{Family: pt.Family, Tag: tc.Tag, FireTimestamp: mtime.FromTime(FiringTimestamp), HoldTimestamp: mtime.FromTime(FiringTimestamp)}
if !tc.HoldTimestamp.ToTime().IsZero() {
if tc.HoldSet {
tm.HoldTimestamp = tc.HoldTimestamp
}

Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/prism/internal/coders.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func pullDecoder(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(io.Reade
}
}

// pullDecoderNoAlloc returns a function that decodes a single eleemnt of the given coder.
// pullDecoderNoAlloc returns a function that decodes a single element of the given coder.
// Intended to only be used as an internal function for pullDecoder, which will use a io.TeeReader
// to extract the bytes.
func pullDecoderNoAlloc(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(io.Reader) {
Expand Down
11 changes: 11 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/engine/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type TentativeData struct {

// state is a map from transformID + UserStateID, to window, to userKey, to datavalues.
state map[LinkID]map[typex.Window]map[string]StateData
// timers is a map from transformID + UserStateID, to window, to userKey, to datavalues.
timers map[LinkID][][]byte
}

// WriteData adds data to a given global collectionID.
Expand All @@ -49,6 +51,15 @@ func (d *TentativeData) WriteData(colID string, data []byte) {
d.Raw[colID] = append(d.Raw[colID], data)
}

// WriteTimers adds timers to the associated transform handler.
func (d *TentativeData) WriteTimers(transformID, familyID string, timers []byte) {
if d.timers == nil {
d.timers = map[LinkID][][]byte{}
}
link := LinkID{Transform: transformID, Local: familyID}
d.timers[link] = append(d.timers[link], timers)
}

func (d *TentativeData) toWindow(wKey []byte) typex.Window {
if len(wKey) == 0 {
return window.GlobalWindow{}
Expand Down
Loading