Skip to content

Commit

Permalink
Merge pull request #60 from tstromberg/persist-scale
Browse files Browse the repository at this point in the history
Separate persist loop from content update loop
  • Loading branch information
tstromberg authored May 5, 2020
2 parents e0c7be2 + 7ac6be4 commit 8ee4505
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 32 deletions.
2 changes: 1 addition & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func main() {

if *dryRun {
klog.Infof("Updating ...")
if err := u.RunOnce(ctx, true); err != nil {
if _, err := u.RunOnce(ctx, true); err != nil {
klog.Exitf("run failed: %v", err)
}
os.Exit(0)
Expand Down
5 changes: 0 additions & 5 deletions pkg/persist/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,8 @@ func (d *Disk) GetNewerThan(key string, t time.Time) *Thing {
}

func (d *Disk) Save() error {
start := time.Now()
items := d.cache.Items()

klog.Infof("*** Saving %d items to disk cache at %s", len(items), d.path)
defer func() {
klog.Infof("*** disk.Save took %s", time.Since(start))
}()

b := new(bytes.Buffer)
ge := gob.NewEncoder(b)
Expand Down
4 changes: 0 additions & 4 deletions pkg/persist/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,7 @@ func (m *MySQL) GetNewerThan(key string, t time.Time) *Thing {
func (m *MySQL) Save() error {
start := time.Now()
items := m.cache.Items()

klog.Infof("*** Saving %d items to MySQL", len(items))
defer func() {
klog.Infof("*** mysql.Save took %s", time.Since(start))
}()

for k, v := range items {
b := new(bytes.Buffer)
Expand Down
91 changes: 69 additions & 22 deletions pkg/updater/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ type Updater struct {
loopEvery time.Duration
mutex *sync.Mutex
persistFunc PFunc
persistStart time.Time
updateCycles int
}

// recordAccess records stats on collection accesses
Expand Down Expand Up @@ -192,8 +194,9 @@ func (u *Updater) secondLastRequested(id string) time.Time {

func (u *Updater) update(ctx context.Context, s triage.Collection) error {
cutoff := time.Now().Add(minFlushAge * -1)
if u.lastPersist.IsZero() {
klog.Infof("have not yet saved content - will accept stale results")

if u.updateCycles == 0 {
klog.Infof("have not yet completed a cycle - will accept stale results")
cutoff = time.Time{}
}

Expand Down Expand Up @@ -233,7 +236,19 @@ func (u *Updater) RunSingle(ctx context.Context, id string, force bool) (bool, e

// Persist saves results to the persistence layer
func (u *Updater) Persist() error {
u.lastPersist = time.Now()
if !u.persistStart.IsZero() {
return fmt.Errorf("already persisting!")
}

// advisory lock
u.persistStart = time.Now()
klog.Infof("*** Started to persist ...")

defer func() {
klog.Infof("*** Persist complete! Took %s", time.Since(u.persistStart))
u.persistStart = time.Time{}
u.lastPersist = time.Now()
}()

if err := u.persistFunc(); err != nil {
return err
Expand All @@ -242,21 +257,55 @@ func (u *Updater) Persist() error {
return nil
}

// shouldPersist reports whether the caller should start a Persist now.
//
// updated indicates whether the most recent update cycle produced new data.
//
// It returns false while a persist is in flight (persistStart is non-zero).
// Otherwise it returns true when either:
//   - there is new data and more than maxRefresh (plus fuzz) has elapsed
//     since lastPersist, or
//   - more than 4x maxRefresh (plus fuzz) has elapsed since lastPersist,
//     regardless of whether new data was seen.
//
// NOTE(review): persistStart and lastPersist are written by Persist, which
// Loop launches in its own goroutine; the reads here do not take a lock in
// this function - confirm whether callers are expected to synchronize.
func (u *Updater) shouldPersist(updated bool) bool {
	if !u.persistStart.IsZero() {
		// Only log when there is fresh data waiting, to avoid noise on idle ticks.
		if updated {
			klog.Infof("still persisting (%s)...", time.Since(u.persistStart))
		}
		return false
	}

	sinceSave := time.Since(u.lastPersist)

	// Avoid write contention by fuzzing: add a random whole number of seconds
	// in [0, maxRefresh). rand.Intn panics when its argument is <= 0, so skip
	// the fuzz entirely when maxRefresh is shorter than one second.
	var fuzz time.Duration
	if secs := int(u.maxRefresh.Seconds()); secs > 0 {
		fuzz = time.Duration(rand.Intn(secs)) * time.Second
	}

	cutoff := u.maxRefresh + fuzz
	if updated && sinceSave > cutoff {
		klog.Infof("New data, and %s since cache has been saved (cutoff=%s)", sinceSave, cutoff)
		return true
	}

	// Fallback for a very quiet repository, or bug that keeps us from realizing an update has occurred
	cutoff = (u.maxRefresh * 4) + fuzz
	if sinceSave > cutoff {
		klog.Warningf("No new data, but %s since cache has been saved (cutoff=%s)", sinceSave, cutoff)
		return true
	}

	return false
}

// Run once, optionally forcing an update
func (u *Updater) RunOnce(ctx context.Context, force bool) error {
func (u *Updater) RunOnce(ctx context.Context, force bool) (bool, error) {
updated := false
start := time.Now()

defer func() {
u.lastRun = time.Now()
if updated {
klog.Infof("update cycle #%d took %s", u.updateCycles, time.Since(start))
u.updateCycles++
}
}()

updated := false
if force {
klog.Warningf(">>> RunOnce has force enabled")
} else {
klog.V(3).Infof("RunOnce: force=%v", force)
}

sts, err := u.party.ListCollections()
if err != nil {
return err
return updated, err
}

if u.lastRun.IsZero() {
Expand All @@ -276,23 +325,11 @@ func (u *Updater) RunOnce(ctx context.Context, force bool) error {
}
}

cutoff := u.maxRefresh + time.Duration(rand.Intn(int(u.maxRefresh.Seconds())))*time.Second
sinceSave := time.Since(u.lastPersist)

if updated && sinceSave > cutoff {
klog.Infof("%s since cache has been saved (cutoff=%s)", cutoff, sinceSave)
go func() {
if err := u.Persist(); err != nil {
klog.Errorf("persist failed: %v", err)
}
}()
}

if len(failed) > 0 {
return fmt.Errorf("collections failed: %v", failed)
return updated, fmt.Errorf("collections failed: %v", failed)
}

return nil
return updated, nil
}

// Update loop
Expand All @@ -302,10 +339,20 @@ func (u *Updater) Loop(ctx context.Context) error {
ticker := time.NewTicker(u.loopEvery)
defer ticker.Stop()
for range ticker.C {
err := u.RunOnce(ctx, false)
updated, err := u.RunOnce(ctx, false)
if err != nil {
klog.Errorf("err: %v", err)
}

u.lastRun = time.Now()

if u.shouldPersist(updated) {
go func() {
if err := u.Persist(); err != nil {
klog.Errorf("persist failed: %v", err)
}
}()
}
}
return nil
}

0 comments on commit 8ee4505

Please sign in to comment.